@@ -38,19 +38,25 @@ def __init__(self):
38
38
# Used when storing input that will be processed when requiring to
39
39
# generate a signature:
40
40
41
- self .input_pending_processing : List [int ] = [] # Signed 16-bits, 16 KHz mono samples to be processed
41
+ self .input_pending_processing : List [int ] = []
42
+ # Signed 16-bits, 16 KHz mono samples to be processed
42
43
43
- self .samples_processed : int = 0 # Number of samples processed out of "self.input_pending_processing"
44
+ self .samples_processed : int = 0
45
+ # Number of samples processed out of "self.input_pending_processing"
44
46
45
47
# Used when processing input:
46
48
47
- self .ring_buffer_of_samples : RingBuffer [int ] = RingBuffer (buffer_size = 2048 , default_value = 0 )
49
+ self .ring_buffer_of_samples : RingBuffer [int ] = RingBuffer (buffer_size = 2048 ,
50
+ default_value = 0 )
48
51
49
- self .fft_outputs : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 , default_value = [0. * 1025 ])
50
- # Lists of 1025 floats, premultiplied with a Hanning function before being passed through FFT, computed from
52
+ self .fft_outputs : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 ,
53
+ default_value = [0. * 1025 ])
54
+ # Lists of 1025 floats, premultiplied with a Hanning function before being
55
+ # passed through FFT, computed from
51
56
# the ring buffer every new 128 samples
52
57
53
- self .spread_fft_output : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 , default_value = [0 ] * 1025 )
58
+ self .spread_fft_output : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 ,
59
+ default_value = [0 ] * 1025 )
54
60
55
61
# How much data to send to Shazam at once?
56
62
@@ -83,14 +89,17 @@ def feed_input(self, s16le_mono_samples: List[int]):
83
89
Except if there are no more samples to be consumed, in this case
84
90
we will return None.
85
91
"""
92
+
86
93
def get_next_signature (self ) -> Optional [DecodedMessage ]:
87
94
if len (self .input_pending_processing ) - self .samples_processed < 128 :
88
95
return None
89
96
while (len (self .input_pending_processing ) - self .samples_processed >= 128 and
90
- (self .next_signature .number_samples / self .next_signature .sample_rate_hz < self .MAX_TIME_SECONDS or
91
- sum (len (peaks ) for peaks in self .next_signature .frequency_band_to_sound_peaks .values ())
97
+ (self .next_signature .number_samples / self .next_signature .sample_rate_hz <
98
+ self .MAX_TIME_SECONDS or sum (len (peaks ) for peaks in
99
+ self .next_signature .frequency_band_to_sound_peaks .values ())
92
100
< self .MAX_PEAKS )):
93
- self .process_input (self .input_pending_processing [self .samples_processed :self .samples_processed + 128 ])
101
+ self .process_input (self .input_pending_processing
102
+ [self .samples_processed :self .samples_processed + 128 ])
94
103
self .samples_processed += 128
95
104
96
105
returned_signature = self .next_signature
@@ -100,9 +109,12 @@ def get_next_signature(self) -> Optional[DecodedMessage]:
100
109
self .next_signature .number_samples = 0
101
110
self .next_signature .frequency_band_to_sound_peaks = {}
102
111
103
- self .ring_buffer_of_samples : RingBuffer [int ] = RingBuffer (buffer_size = 2048 , default_value = 0 )
104
- self .fft_outputs : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 , default_value = [0. * 1025 ])
105
- self .spread_fft_output : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 , default_value = [0 ] * 1025 )
112
+ self .ring_buffer_of_samples : RingBuffer [int ] = RingBuffer (buffer_size = 2048 ,
113
+ default_value = 0 )
114
+ self .fft_outputs : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 ,
115
+ default_value = [0. * 1025 ])
116
+ self .spread_fft_output : RingBuffer [List [float ]] = RingBuffer (buffer_size = 256 ,
117
+ default_value = [0 ] * 1025 )
106
118
107
119
return returned_signature
108
120
@@ -114,23 +126,22 @@ def process_input(self, s16le_mono_samples: List[int]):
114
126
115
127
def do_fft (self , batch_of_128_s16le_mono_samples ):
116
128
type_ring = (self .ring_buffer_of_samples .position + len (batch_of_128_s16le_mono_samples ))
117
- self .ring_buffer_of_samples [self .ring_buffer_of_samples .position : type_ring ] = batch_of_128_s16le_mono_samples
129
+ self .ring_buffer_of_samples [
130
+ self .ring_buffer_of_samples .position : type_ring ] = batch_of_128_s16le_mono_samples
118
131
self .ring_buffer_of_samples .position += len (batch_of_128_s16le_mono_samples )
119
132
self .ring_buffer_of_samples .position %= 2048
120
133
self .ring_buffer_of_samples .num_written += len (batch_of_128_s16le_mono_samples )
121
134
122
135
excerpt_from_ring_buffer : list = (
123
- self .ring_buffer_of_samples [self .ring_buffer_of_samples .position :] +
124
- self .ring_buffer_of_samples [:self .ring_buffer_of_samples .position ]
136
+ self .ring_buffer_of_samples [self .ring_buffer_of_samples .position :] +
137
+ self .ring_buffer_of_samples [:self .ring_buffer_of_samples .position ]
125
138
)
126
139
127
140
# The pre multiplication of the array is for applying a windowing function before the DFT
128
141
# (slight rounded Hanning without zeros at edges)
129
142
130
143
fft_results : array = fft .rfft (HANNING_MATRIX * excerpt_from_ring_buffer )
131
144
132
- assert len (fft_results ) == 1025 and len (excerpt_from_ring_buffer ) == 2048 == len (HANNING_MATRIX )
133
-
134
145
fft_results = (fft_results .real ** 2 + fft_results .imag ** 2 ) / (1 << 17 )
135
146
fft_results = maximum (fft_results , 0.0000000001 )
136
147
@@ -185,16 +196,17 @@ def do_peak_recognition(self):
185
196
186
197
# Ensure that the bin is large enough to be a peak
187
198
188
- if ( fft_minus_46 [bin_position ] >= 1 / 64 and
189
- fft_minus_46 [ bin_position ] >= fft_minus_49 [bin_position - 1 ]):
199
+ if fft_minus_46 [bin_position ] >= 1 / 64 and ( fft_minus_46 [ bin_position ] >=
200
+ fft_minus_49 [bin_position - 1 ]):
190
201
191
202
# Ensure that it is a frequency-domain local maximum
192
203
193
204
max_neighbor_in_fft_minus_49 = 0
194
205
195
206
for neighbor_offset in [* range (- 10 , - 3 , 3 ), - 3 , 1 , * range (2 , 9 , 3 )]:
196
- max_neighbor_in_fft_minus_49 = max (fft_minus_49 [bin_position + neighbor_offset ],
197
- max_neighbor_in_fft_minus_49 )
207
+ max_neighbor_in_fft_minus_49 = max (
208
+ fft_minus_49 [bin_position + neighbor_offset ],
209
+ max_neighbor_in_fft_minus_49 )
198
210
199
211
if fft_minus_46 [bin_position ] > max_neighbor_in_fft_minus_49 :
200
212
@@ -216,7 +228,8 @@ def do_peak_recognition(self):
216
228
217
229
fft_number = self .spread_fft_output .num_written - 46
218
230
219
- peak_magnitude = log (max (1 / 64 , fft_minus_46 [bin_position ])) * 1477.3 + 6144
231
+ peak_magnitude = log (max (1 / 64 , fft_minus_46 [bin_position ])
232
+ ) * 1477.3 + 6144
220
233
peak_magnitude_before = log (
221
234
max (1 / 64 , fft_minus_46 [bin_position - 1 ])) * 1477.3 + 6144
222
235
peak_magnitude_after = log (
0 commit comments