@@ -38,51 +38,157 @@ def sequence_segments(
38
38
# NB: Mutable sequence of bytes
39
39
# NB: This is over a 1000x (not a typo) speed-up over a byte object
40
40
working_sequence_buffer = bytearray ()
41
+ overlap_buffer = bytearray ()
41
42
42
- # NB: Line buffered reading is probably the best way to handle edge cases
43
- # around sequence ID parsing and newline characters
44
- # NB: fasta_file is assumed to be opened in binary mode (rb)
45
- for fasta_line in fasta_file :
46
- # If we are on a new sequence (sequence ID)
47
- # NB: Assume that either of the delimiters are indicators of a
48
- # new sequence, notably including comments
49
- if fasta_line .startswith (FASTA_FILE_IGNORE_DELIMITERS ): # type: ignore
50
- # Yield the current sequence segment if there is remaining sequence
51
- # NB: We always keep the lookahead/overlap in the working sequence
52
- # buffer, therefore there can only be sequence remaining if it is
53
- # longer than the lookahead/overlap length
54
- if len (working_sequence_buffer ) > sequence_overlap_length :
55
- yield SequenceSegment (current_sequence_id , # type: ignore
56
- bytes (working_sequence_buffer ))
57
-
58
- # Get the new reference sequence name
43
+ sequences = [] # working list of sequences
44
+
45
+ # import debugpy
46
+ # debugpy.listen(5678)
47
+ # debugpy.wait_for_client()
48
+ # debugpy.breakpoint()
49
+ # For every line in the fasta file
50
+ for line in fasta_file :
51
+ line = line .rstrip () # Remove trailing newline
52
+
53
+ # While there is enough working sequence buffer to fill the requested
54
+ # sequence length
55
+ # Create sequences for each segment
56
+ sequences .extend (get_sequences_from_buffer (
57
+ working_sequence_buffer ,
58
+ overlap_buffer ,
59
+ sequence_length ,
60
+ sequence_overlap_length ))
61
+
62
+ # If the current line is a sequence ID
63
+ if line .startswith (FASTA_FILE_IGNORE_DELIMITERS ): # type: ignore
64
+ # Yield the remaining sequences
65
+ for sequence_buffer in get_remaining_sequence_segments (
66
+ current_sequence_id , # type: ignore
67
+ sequences ,
68
+ working_sequence_buffer ,
69
+ overlap_buffer ,
70
+ sequence_length ,
71
+ sequence_overlap_length ):
72
+ yield sequence_buffer
73
+
74
+ # Empty working sequences
75
+ sequences .clear ()
76
+
77
+ # Update the working sequence ID
59
78
# NB: remove leading '>'
60
- current_sequence_id = fasta_line .split ()[0 ][1 :] # type: ignore
61
- # Reset the working sequence buffer
62
- working_sequence_buffer = bytearray ()
79
+ current_sequence_id = line .split ()[0 ][1 :] # type: ignore
80
+ # Clear the overlap buffer
81
+ overlap_buffer .clear ()
82
+ # Clear the working buffer
83
+ working_sequence_buffer .clear ()
63
84
64
- # Otherwise the line we are on is sequence data
85
+ # Otherwise the line is not a sequence ID and is sequence data
65
86
else :
66
- fasta_line = fasta_line .rstrip () # Remove trailing newline
67
- # Add to the working sequence buffer
68
- working_sequence_buffer += fasta_line # type: ignore
69
- # While we have enough sequence buffer to fill a sequence segment
70
- while len (working_sequence_buffer ) >= sequence_length :
71
- yield SequenceSegment (
72
- current_sequence_id , # type: ignore
73
- bytes (working_sequence_buffer [:sequence_length ]))
74
- # Truncate the working sequence buffer by the sequence length
75
- # minus the lookahead
76
- # XXX: Assert that the kmer/sequence length is always larger
77
- # than the lookahead length?
78
- truncate_length = sequence_length - sequence_overlap_length
79
- working_sequence_buffer = \
80
- working_sequence_buffer [truncate_length :]
81
-
82
- # Yield the last sequence segment
83
- # NB: We always keep the lookahead in the working sequence buffer
84
- # So there needs to be check if it is longer the lookahead length
85
- if len (working_sequence_buffer ) > sequence_overlap_length :
86
- yield SequenceSegment (current_sequence_id , # type: ignore
87
- bytes (working_sequence_buffer ),
88
- epilogue = True )
87
+ # If any sequences were created
88
+ if sequences :
89
+ # Create all sequence segments but for the last
90
+ for sequence in sequences [:- 1 ]:
91
+ # Yield a sequence segment without the epilogue flag set
92
+ yield SequenceSegment (current_sequence_id , bytes (sequence ))
93
+ # Carry over the last sequence to the next iteration
94
+ # in case this is the last line and the sequence filled the
95
+ # remaining buffer exactly
96
+ sequences = [sequences [- 1 ]]
97
+
98
+ # Add the sequence line to the working buffer
99
+ working_sequence_buffer += line # type: ignore
100
+
101
+ # Yield the remaining sequences
102
+ for sequence_buffer in get_remaining_sequence_segments (
103
+ current_sequence_id , # type: ignore
104
+ sequences ,
105
+ working_sequence_buffer ,
106
+ overlap_buffer ,
107
+ sequence_length ,
108
+ sequence_overlap_length ):
109
+
110
+ yield sequence_buffer
111
+
112
+
113
def get_sequences_from_buffer(working_sequence_buffer: bytearray,
                              overlap_sequence: bytearray,
                              sequence_length: int,
                              sequence_overlap_length: int) -> list[bytes]:
    """Return overlapping byte sequences cut from the front of a buffer.

    Full-length sequences are produced for as long as the overlap buffer
    plus the working buffer hold at least ``sequence_length`` bytes. When
    the overlap buffer is non-empty it is prepended to the next
    ``sequence_length - sequence_overlap_length`` bytes of the working
    buffer; otherwise the first ``sequence_length`` bytes of the working
    buffer are used on their own.

    Both buffers are modified in place: consumed bytes are removed from the
    front of ``working_sequence_buffer`` and, when
    ``sequence_overlap_length`` is non-zero, ``overlap_sequence`` is
    replaced by the tail of the most recently produced sequence.

    Args:
        working_sequence_buffer: Unconsumed sequence bytes.
        overlap_sequence: Tail of the previously produced sequence, or
            empty if nothing has been produced yet.
        sequence_length: Length of every sequence produced.
        sequence_overlap_length: Number of trailing bytes of one sequence
            that are repeated at the start of the next.

    Returns:
        The sequences produced, in order; empty when the working buffer is
        empty or there is not yet enough data for a full sequence.

    Raises:
        ValueError: If a sequence has to be produced but
            ``sequence_length`` is not greater than
            ``sequence_overlap_length``; the buffer could then never be
            consumed and the loop would not terminate.
    """
    # Nothing new to consume
    if not working_sequence_buffer:
        return []

    # Not enough data for even one full-length sequence yet
    if (len(working_sequence_buffer) + len(overlap_sequence)
            < sequence_length):
        return []

    non_overlap_length = sequence_length - sequence_overlap_length
    # NB: A non-positive stride consumes nothing from the working buffer on
    # each pass, which previously resulted in an endless loop
    if non_overlap_length <= 0:
        raise ValueError(
            "sequence_length must be greater than sequence_overlap_length")

    sequences = []

    while (len(working_sequence_buffer) + len(overlap_sequence) >=
           sequence_length):
        if overlap_sequence:
            # Continue from the previous sequence: overlap + new bytes
            sequence = bytes(
                overlap_sequence +
                working_sequence_buffer[:non_overlap_length])
            bytes_used = non_overlap_length
        else:
            # No previous sequence: take a full-length slice
            sequence = bytes(working_sequence_buffer[:sequence_length])
            bytes_used = sequence_length

        sequences.append(sequence)

        # Remember the tail of this sequence for the next one
        if sequence_overlap_length:
            # NB: Slice assignment (not re-binding) so that the caller's
            # buffer is modified in place
            overlap_sequence[:] = sequence[-sequence_overlap_length:]

        # Drop the consumed bytes from the front of the working buffer
        del working_sequence_buffer[:bytes_used]

    return sequences
155
+
156
+
157
def get_remaining_sequence_segments(sequence_id: bytes,
                                    sequences: list[bytes],
                                    working_sequence_buffer: bytearray,
                                    overlap_buffer: bytearray,
                                    sequence_length: int,
                                    sequence_overlap_length: int):
    """Flush the pending sequences and buffers into sequence segments.

    Any full-length sequences still obtainable from the buffers are
    appended to ``sequences`` (the list is extended in place). If bytes
    remain in the working buffer afterwards, they are appended as one final
    sequence, prefixed with the overlap buffer.

    Returns a list of ``SequenceSegment`` objects built with
    ``sequence_id``, one per pending sequence. The last one is created with
    ``epilogue=True``; the list is empty when there is nothing pending.
    """
    # Drain every full-length sequence the buffers can still provide
    sequences.extend(get_sequences_from_buffer(
        working_sequence_buffer,
        overlap_buffer,
        sequence_length,
        sequence_overlap_length))

    # Whatever is left over forms a final sequence, overlap included
    if working_sequence_buffer:
        sequences.append(bytes(overlap_buffer + working_sequence_buffer))

    # Nothing pending, nothing to emit
    if not sequences:
        return []

    # NB: The last pending sequence is assumed to be the epilogue
    *leading, final = sequences

    segments = [SequenceSegment(sequence_id, bytes(pending))
                for pending in leading]
    segments.append(
        SequenceSegment(sequence_id, bytes(final), epilogue=True))

    return segments
0 commit comments