Coverage for src/python/ensembl/io/genomio/fasta/chunk.py: 86% (116 statements)
#!/usr/bin/env python3

# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
17"""Split a set of nucleotide sequence(s) (.fasta, .gz) into smaller chunks."""

__all__ = [
    "check_chunk_size_and_tolerance",
    "chunk_fasta",
    "chunk_fasta_stream",
    "get_tolerated_size",
    "prepare_out_dir_for_individuals",
    "split_seq_by_chunk_size",
    "split_seq_by_n",
]


from contextlib import nullcontext
from io import TextIOWrapper
import logging
from pathlib import Path
import re
from typing import Any, Callable, ContextManager, Optional

from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord

import ensembl.io.genomio
from ensembl.utils.archive import open_gz_file
from ensembl.utils.argparse import ArgumentParser
from ensembl.utils.logging import init_logging_with_args


def _on_value_error(msg: str) -> None:
    """Default error handler: raises a ValueError with the provided `msg`.

    Args:
        msg: Message to raise the ValueError with.
    """
    raise ValueError(msg)


def check_chunk_size_and_tolerance(
    chunk_size: int,
    chunk_tolerance: int,
    error_f: Callable[[str], None] = _on_value_error,
) -> None:
    """Check that the chunk size and the tolerance are positive and that the chunk size is not too small.

    Args:
        chunk_size: Chunk size to check.
        chunk_tolerance: Chunk tolerance to check.

    Dies:
        If a check fails, dies with `error_f` (`ValueError` by default, `parser.error` when run as a script).
    """
    if chunk_size < 50_000:
        error_f(f"wrong '--chunk_size' value: '{chunk_size}'. should be at least 50_000. exiting...")
    if chunk_tolerance < 0:
        error_f(f"wrong '--chunk_tolerance' value: '{chunk_tolerance}'. can't be less than 0. exiting...")
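
# Example (illustrative, not part of the original module): with the default
# handler, an undersized chunk size raises ValueError, while valid values pass
# silently.
# >>> check_chunk_size_and_tolerance(10_000, 0)   # raises ValueError
# >>> check_chunk_size_and_tolerance(100_000, 5)  # returns None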


def split_seq_by_n(seq: str, split_pattern: Optional[re.Pattern]) -> list[int]:
    """Split a string into chunks at the positions where the pattern is found.
    `N`s (the pattern) are appended to the chunk on the left.

    The end point of each chunk corresponds to the end of the matching part.

    Args:
        seq: Sequence to be split into chunks.
        split_pattern: Pattern to search for in the sequence.

    Returns:
        List with open coordinates of the chunk ends (or with only the single sequence length).
    """
    seq_len = len(seq)
    if not split_pattern:
        return [seq_len]
    split_points = [m.end() for m in split_pattern.finditer(seq)]
    if not split_points or split_points[-1] != seq_len:
        split_points.append(seq_len)
    return split_points
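
# Example (illustrative, not part of the original module): with a pattern
# matching runs of three or more Ns, each run is kept with the chunk on its
# left, and the final end point is always the full sequence length.
# >>> split_seq_by_n("AAANNNCCC", re.compile("N{3,}"))
# [6, 9]
# >>> split_seq_by_n("AAANNNCCC", None)
# [9]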


def split_seq_by_chunk_size(
    ends: list[int], chunk_size: int, tolerated_size: Optional[int] = None
) -> list[int]:
    """Split a list of end coordinates to form chunks not longer than `chunk_size`.

    Args:
        ends: List of one or more chunk end(s) to split a sequence at.
        chunk_size: Size of chunks to split a sequence into.
        tolerated_size: Threshold to use instead of `chunk_size` to determine when to split a sequence.

    Returns:
        List with open coordinates of the chunk ends (or with only the single sequence length).
    """
    if not ends or chunk_size < 1:
        return ends

    if tolerated_size is None or tolerated_size < chunk_size:
        tolerated_size = chunk_size
    result = []
    offset = 0
    for chunk_end in ends:
        chunk_len = chunk_end - offset
        if chunk_len > tolerated_size:
            # exclude starts, as they are 0 or pushed as previous chunk_ends
            result += list(range(offset, chunk_end, chunk_size))[1:]
        result.append(chunk_end)
        offset = chunk_end
    return result
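
# Example (illustrative, not part of the original module): a single end
# coordinate of 100 with chunk_size 30 gains intermediate cut points every
# 30 bases, keeping the original end as the last chunk boundary.
# >>> split_seq_by_chunk_size([100], 30)
# [30, 60, 90, 100]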


def _individual_file_opener(name: str) -> TextIOWrapper:
    """Open a file for writing an individual chunk sequence.

    Args:
        name: Name of the file to open.
    """
    return open(name, "wt", encoding="utf-8")


def chunk_fasta_stream(
    input_fasta: TextIOWrapper,
    chunk_size: int,
    chunk_size_tolerated: int,
    output_fasta: Optional[TextIOWrapper] | nullcontext[Any],
    individual_file_prefix: Optional[Path],
    *,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
    open_individual: Callable[[str], ContextManager[Any]] = _individual_file_opener,
) -> list[str]:
    """Split an input FASTA TextIOWrapper stream into smaller chunks, first on stretches
    of "N"s and then based on `chunk_size_tolerated`, and store them either in the
    `output_fasta` stream (if valid) or in the files created by invoking the
    `open_individual` callable.

    Args:
        input_fasta: Input FASTA as a TextIOWrapper stream.
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility is allowed, use this as the maximum size of a chunk.
        output_fasta: Output FASTA TextIOWrapper stream to store the chunks in;
            if `None` or falsy, `open_individual` is used (see below).
        individual_file_prefix: A file path prefix, including dirs and filename part, to use as the
            first part of each chunk file name.
        n_sequence_len: Length of the stretch of `N`s to split at; no splitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: A flag to append the 0-based offset (`_off_{offset}`) to the chunk name.
        open_individual: A callable taking a filename as input, used to open the output file for an
            individual contig when `output_fasta` is falsy or `None`; folders should already exist.

    Returns:
        List of AGP lines mapping the original sequences to the produced chunks.
    """
    chunk_size_tolerated = max(chunk_size, chunk_size_tolerated)
    # output agp_lines list
    agp_lines = []

    # make sure not used for n_seq <= 0
    n_split_regex = None
    if n_sequence_len > 0:
        pattern = f"(N{{{n_sequence_len},}})"
        n_split_regex = re.compile(pattern)

    # process stream
    fasta_parser = SeqIO.parse(input_fasta, "fasta")
    for rec_count, rec in enumerate(fasta_parser, start=1):
        rec_name = str(rec.name)

        ends = split_seq_by_n(str(rec.seq), n_split_regex)
        ends = split_seq_by_chunk_size(ends, chunk_size, chunk_size_tolerated)

        offset = 0
        for chunk, chunk_end in enumerate(ends, start=1):
            chunk_name = f"{rec_name}_{chunk_sfx}_{chunk:03d}"
            chunk_file_name = ""
            if individual_file_prefix:
                chunk_file_name = f"{individual_file_prefix}.{rec_count:03d}.{chunk:03d}.fa"
            if append_offset_to_chunk_name:
                chunk_name += f"_off_{offset}"

            rec_from = offset + 1
            rec_to = chunk_end
            chunk_len = chunk_end - offset

            # form agp lines
            agp_line = f"{rec_name}\t{rec_from}\t{rec_to}\t{chunk}\tW\t{chunk_name}\t1\t{chunk_len}\t+"
            agp_lines.append(agp_line)

            # use agp lines as fasta description
            agp_line = agp_line.replace("\t", " ")
            logging.info(f"Dumping {chunk_name} AGP {agp_line}")

            # get slice and put it out
            tmp_record = SeqRecord(
                Seq(rec.seq[offset:chunk_end]),
                id=chunk_name,
                description=f"AGP {agp_line}",
                name="",
            )

            # write to the joined stream if one was provided (wrapped in nullcontext so it is
            # not closed here); otherwise open an individual file per chunk
            with output_fasta and nullcontext(output_fasta) or open_individual(chunk_file_name) as out_file:
                out_file.write(tmp_record.format("fasta"))  # type: ignore[union-attr]

            del tmp_record
            offset = chunk_end

    return agp_lines
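
# Minimal usage sketch (illustrative, not part of the original module): chunking
# an in-memory 200 kb record into two 100 kb pieces written to a joined stream.
# >>> import io
# >>> fasta_in = io.StringIO(">seq1\n" + "ACGT" * 50_000)
# >>> fasta_out = io.StringIO()
# >>> agp = chunk_fasta_stream(fasta_in, 100_000, 100_000, fasta_out, None)
# >>> agp[0]
# 'seq1\t1\t100000\t1\tW\tseq1_ens_chunk_001\t1\t100000\t+'
# >>> fasta_out.getvalue().count(">")  # two chunk records were written
# 2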


def get_tolerated_size(size: int, tolerance: int) -> int:
    """Calculate the maximum tolerated size of a chunk based on the initial size and allowed deviation.

    Args:
        size: Base chunk size.
        tolerance: Percent of allowed deviation as an integer.

    Returns:
        Maximum tolerated chunk size.
    """
    tolerance = max(tolerance, 0)

    tolerated_size = size
    tolerated_size += size * tolerance // 100
    return tolerated_size
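
# Example (illustrative, not part of the original module): a 5% tolerance on a
# 100 kb chunk size tolerates chunks of up to 105 kb before splitting.
# >>> get_tolerated_size(100_000, 5)
# 105000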


def chunk_fasta(
    input_fasta_file: str,
    chunk_size: int,
    chunk_size_tolerated: int,
    out_file_name: str,
    individual_file_prefix: Optional[Path],
    *,
    agp_output_file: Optional[str] = None,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
) -> None:
    """Open `input_fasta_file` and split it into smaller chunks, first on stretches of "N"s
    and then based on `chunk_size_tolerated`, storing them either in `out_file_name` if no
    `individual_file_prefix` is provided, or in one file per chunk whose name starts with
    the non-empty `individual_file_prefix`.

    Args:
        input_fasta_file: Input FASTA file.
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility is allowed, use this as the maximum size of a chunk.
        out_file_name: Output FASTA to store the chunks in if no `individual_file_prefix` is provided.
        individual_file_prefix: A file path prefix, including dirs and filename part, to use as the
            first part of each chunk file name.
        agp_output_file: Output AGP file to store the map of the chunking procedure, if present and non-empty.
        n_sequence_len: Length of the stretch of `N`s to split at; no splitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: Append the 0-based offset in the form of `_off_{offset}` to the chunk name.
    """
    # process input fasta
    with open_gz_file(input_fasta_file) as fasta:
        logging.info(
            f"splitting sequences from '{input_fasta_file}', chunk size {chunk_size:_}, "
            f"splitting on {n_sequence_len} Ns (0 -- disabled)"
        )
        # do not open a joined file if you plan to open many individual ones
        with (
            individual_file_prefix
            and nullcontext(None)
            or open(out_file_name, "wt", encoding="utf-8") as out_file_joined
        ):
            agp_lines = chunk_fasta_stream(
                fasta,
                chunk_size,
                chunk_size_tolerated,
                out_file_joined,
                individual_file_prefix,
                n_sequence_len=n_sequence_len,
                chunk_sfx=chunk_sfx,
                append_offset_to_chunk_name=append_offset_to_chunk_name,
            )

    # dump AGP
    if agp_output_file:
        with open(agp_output_file, "w", encoding="utf-8") as agp_out:
            agp_out.write("\n".join(agp_lines) + "\n")
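
# Minimal usage sketch (illustrative; the file names are hypothetical): split a
# gzipped assembly into <= 100 Mb chunks in one joined FASTA plus an AGP map.
# >>> chunk_fasta(
# ...     "genome.fa.gz", 100_000_000, 105_000_000, "chunks.fna",
# ...     individual_file_prefix=None, agp_output_file="chunks.agp",
# ... )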


def prepare_out_dir_for_individuals(dir_name: Path, file_part: str) -> Optional[Path]:
    """Create `dir_name` (including upstream dirs) and return its path with `file_part` appended.

    Args:
        dir_name: Directory to create.
        file_part: File part to append.

    Returns:
        `dir_name` with `file_part` appended.

    Throws:
        An exception if it is not possible to create the directory.
    """
    file_prefix = None
    if dir_name:
        dir_name.mkdir(parents=True, exist_ok=True)
        file_prefix = Path(dir_name, file_part)
    return file_prefix
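
# Example (illustrative; the paths are hypothetical, and the repr is as shown on
# a POSIX system): create "out/chunks" and get back the per-chunk file prefix.
# >>> prepare_out_dir_for_individuals(Path("out/chunks"), "chunks.fna")
# PosixPath('out/chunks/chunks.fna')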


def main() -> None:
    """Module entry-point."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument_src_path(
        "--fasta_dna",
        required=True,
        metavar="input[.fa | .gz]",
        help="Raw or compressed (.gz) FASTA file with DNA sequences to be split",
    )
    parser.add_argument(
        "--out",
        required=False,
        default="chunks.fna",
        help="Chunks output file",
    )
    parser.add_argument(
        "--individual_out_dir",
        required=False,
        default=None,
        type=Path,
        help="Output directory for writing files with individual chunks to. "
        "If provided, the `--out` value is used as a filename prefix",
    )
    parser.add_argument_dst_path(
        "--agp_output_file",
        required=False,
        default=None,
        help="AGP file with the chunks-to-contigs mapping.",
    )
    parser.add_argument(
        "--chunk_size",
        required=False,
        default=100_000_000,
        metavar="100_000_000",
        type=int,
        help="Maximum chunk size (should be greater than 50k).",
    )
    parser.add_argument(
        "--chunk_sfx",
        required=False,
        default="ens_chunk",
        type=str,
        help="Added to contig ID before chunk number.",
    )
    parser.add_argument(
        "--chunk_tolerance",
        required=False,
        default=0,
        type=int,
        help="Chunk size tolerance percentage. If the to-be-written chunk is longer "
        "than the defined chunk size by less than the specified tolerance percentage, "
        "it will not be split.",
    )
    parser.add_argument(
        "--n_seq",
        required=False,
        default=0,
        type=int,
        help="Split into chunks at positions of at least this number of N characters.",
    )
    parser.add_argument(
        "--add_offset",
        required=False,
        action="store_true",
        help="Append zero-based offset to chunk name ('_off_{offset}').",
    )
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    parser.add_log_arguments(add_log_file=True)
    args = parser.parse_args()
    init_logging_with_args(args)

    check_chunk_size_and_tolerance(args.chunk_size, args.chunk_tolerance, error_f=parser.error)

    chunk_size_tolerated = get_tolerated_size(args.chunk_size, args.chunk_tolerance)

    file_prefix = prepare_out_dir_for_individuals(args.individual_out_dir, args.out or args.fasta_dna)

    chunk_fasta(
        args.fasta_dna,
        args.chunk_size,
        chunk_size_tolerated,
        args.out,
        individual_file_prefix=file_prefix,
        agp_output_file=args.agp_output_file,
        n_sequence_len=args.n_seq,
        chunk_sfx=args.chunk_sfx,
        append_offset_to_chunk_name=args.add_offset,
    )


if __name__ == "__main__":
    main()
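
# Example CLI invocation (an illustrative sketch; the module path is inferred from
# this file's location, and the file names are hypothetical):
#   python -m ensembl.io.genomio.fasta.chunk --fasta_dna genome.fa.gz \
#       --out chunks.fna --chunk_size 100000000 --chunk_tolerance 5 --n_seq 100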