Coverage for src/python/ensembl/io/genomio/fasta/chunk.py: 86%

116 statements  

coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

#!/usr/bin/env python3

# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

17"""Split a set of nucleotide sequence(s) (.fasta, .gz) into smaller chunks.""" 

18 

19__all__ = [ 

20 "check_chunk_size_and_tolerance", 

21 "chunk_fasta", 

22 "chunk_fasta_stream", 

23 "get_tolerated_size", 

24 "prepare_out_dir_for_individuals", 

25 "split_seq_by_chunk_size", 

26 "split_seq_by_n", 

27] 

28 

29 

30from contextlib import nullcontext 

31from io import TextIOWrapper 

32import logging 

33from pathlib import Path 

34import re 

35from typing import Any, Callable, ContextManager, Optional 

36 

37from Bio import SeqIO 

38from Bio.Seq import Seq 

39from Bio.SeqRecord import SeqRecord 

40 

41import ensembl.io.genomio 

42from ensembl.utils.archive import open_gz_file 

43from ensembl.utils.argparse import ArgumentParser 

44from ensembl.utils.logging import init_logging_with_args 

45 

46 


def _on_value_error(msg: str) -> None:
    """Default error handler: raises a `ValueError` with the provided `msg`.

    Args:
        msg: Message to raise the `ValueError` with.
    """
    raise ValueError(msg)


def check_chunk_size_and_tolerance(
    chunk_size: int,
    chunk_tolerance: int,
    error_f: Callable[[str], None] = _on_value_error,
) -> None:
    """Check that the chunk tolerance is not negative and that the chunk size is not too small.

    Args:
        chunk_size: Chunk size to check.
        chunk_tolerance: Chunk tolerance to check.
        error_f: Callable invoked with an error message if a check fails
            (the default raises `ValueError`).

    Dies:
        If a check fails, dies via `error_f` (e.g. `parser.error`).
    """
    if chunk_size < 50_000:
        error_f(f"wrong '--chunk_size' value: '{chunk_size}'. should be at least 50_000. exiting...")
    if chunk_tolerance < 0:
        error_f(f"wrong '--chunk_tolerance' value: '{chunk_tolerance}'. can't be less than 0. exiting...")
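
# Illustrative doctest-style example (not from the original module): with the
# default handler, a too-small chunk size raises ValueError; passing
# `parser.error` instead makes a CLI exit with the same message.
#
#   >>> check_chunk_size_and_tolerance(10_000, 0)
#   Traceback (most recent call last):
#       ...
#   ValueError: wrong '--chunk_size' value: '10000'. should be at least 50_000. exiting...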


def split_seq_by_n(seq: str, split_pattern: Optional[re.Pattern]) -> list[int]:
    """Split a string into chunks at the positions where the pattern is found.
    The `N`s (the pattern matches) are appended to the chunk on their left.

    The end point of each chunk corresponds to the end of the matching part.

    Args:
        seq: Sequence to be split into chunks.
        split_pattern: Pattern to search for in the sequence.

    Returns:
        List with open coordinates of the chunk ends (or with only a single sequence length).
    """
    seq_len = len(seq)
    if not split_pattern:
        return [seq_len]
    split_points = [m.end() for m in split_pattern.finditer(seq)]
    if not split_points or split_points[-1] != seq_len:
        split_points.append(seq_len)
    return split_points
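
# Illustrative doctest-style example (not from the original module): for a
# 9-base sequence with one stretch of Ns, the Ns stay with the chunk on their
# left, and the sequence end is always the last coordinate.
#
#   >>> split_seq_by_n("AAANNNCCC", re.compile("(N{2,})"))
#   [6, 9]
#   >>> split_seq_by_n("AAANNNCCC", None)
#   [9]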


def split_seq_by_chunk_size(
    ends: list[int], chunk_size: int, tolerated_size: Optional[int] = None
) -> list[int]:
    """Split a list of end coordinates to form chunks no longer than
    `chunk_size`.

    Args:
        ends: List of one or more chunk end(s) of the sequence to split.
        chunk_size: Size of chunks to split a sequence into.
        tolerated_size: Threshold to use instead of `chunk_size` to determine when to split a sequence.

    Returns:
        List with open coordinates of the chunk ends (or with only a single sequence length).
    """
    if not ends or chunk_size < 1:
        return ends

    if tolerated_size is None or tolerated_size < chunk_size:
        tolerated_size = chunk_size
    result = []
    offset = 0
    for chunk_end in ends:
        chunk_len = chunk_end - offset
        if chunk_len > tolerated_size:
            # exclude starts, as they are 0 or pushed as previous chunk_ends
            result += list(range(offset, chunk_end, chunk_size))[1:]
        result.append(chunk_end)
        offset = chunk_end
    return result
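
# Illustrative doctest-style example (not from the original module): a single
# 100 bp chunk re-split with `chunk_size=30` gains intermediate break points,
# while a tolerated size of 100 lets the whole chunk through unsplit.
#
#   >>> split_seq_by_chunk_size([100], 30)
#   [30, 60, 90, 100]
#   >>> split_seq_by_chunk_size([100], 90, tolerated_size=100)
#   [100]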


def _individual_file_opener(name: str) -> TextIOWrapper:
    """Open a file for writing an individual chunk sequence.

    Args:
        name: Name of the file to open.
    """
    return open(name, "wt", encoding="utf-8")


def chunk_fasta_stream(
    input_fasta: TextIOWrapper,
    chunk_size: int,
    chunk_size_tolerated: int,
    output_fasta: Optional[TextIOWrapper] | nullcontext[Any],
    individual_file_prefix: Optional[Path],
    *,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
    open_individual: Callable[[str], ContextManager[Any]] = _individual_file_opener,
) -> list[str]:
    """Split an input `TextIOWrapper` FASTA stream into smaller chunks, first on
    stretches of "N"s and then based on `chunk_size_tolerated`, and store them either
    in the `output_fasta` stream (if valid) or in the files created by invoking the
    `open_individual` callable.

    Args:
        input_fasta: Input FASTA as a `TextIOWrapper` stream.
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility is allowed, use this as the maximum size of a chunk.
        output_fasta: Output FASTA `TextIOWrapper` stream to store the chunks into;
            if `None` or falsy, `open_individual` is used (see below).
        individual_file_prefix: A file path prefix (including dirs and filename part) to use as the
            first part of each chunk file name.
        n_sequence_len: Length of the stretch of `N`s to split at; no splitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: A flag to append the 0-based offset (`_off_{offset}`) to the chunk name.
        open_individual: A callable taking a file name as input, used to create the output file for an
            individual chunk when `output_fasta` is falsy or `None`; folders should be preexisting.
    """

    chunk_size_tolerated = max(chunk_size, chunk_size_tolerated)
    # output agp_lines list
    agp_lines = []

    # make sure not used for n_seq <= 0
    n_split_regex = None
    if n_sequence_len > 0:  # coverage: partial branch, this condition was always true in the tests
        pattern = f"(N{{{n_sequence_len},}})"
        n_split_regex = re.compile(pattern)

    # process stream
    fasta_parser = SeqIO.parse(input_fasta, "fasta")
    for rec_count, rec in enumerate(fasta_parser, start=1):
        rec_name = str(rec.name)

        ends = split_seq_by_n(str(rec.seq), n_split_regex)
        ends = split_seq_by_chunk_size(ends, chunk_size, chunk_size_tolerated)

        offset = 0
        for chunk, chunk_end in enumerate(ends, start=1):
            chunk_name = f"{rec_name}_{chunk_sfx}_{chunk:03d}"
            chunk_file_name = ""
            if individual_file_prefix:
                chunk_file_name = f"{individual_file_prefix}.{rec_count:03d}.{chunk:03d}.fa"
            if append_offset_to_chunk_name:  # coverage: partial branch, this condition was always true in the tests
                chunk_name += f"_off_{offset}"

            rec_from = offset + 1
            rec_to = chunk_end
            chunk_len = chunk_end - offset

            # form AGP lines
            agp_line = f"{rec_name}\t{rec_from}\t{rec_to}\t{chunk}\tW\t{chunk_name}\t1\t{chunk_len}\t+"
            agp_lines.append(agp_line)

            # use AGP lines as FASTA description
            agp_line = agp_line.replace("\t", " ")
            logging.info(f"Dumping {chunk_name} AGP {agp_line}")

            # get slice and put it out
            tmp_record = SeqRecord(
                Seq(rec.seq[offset:chunk_end]),
                id=chunk_name,
                description=f"AGP {agp_line}",
                name="",
            )

            # if user specified individual chunks -- store each chunk in its own output file
            with output_fasta and nullcontext(output_fasta) or open_individual(chunk_file_name) as out_file:
                out_file.write(tmp_record.format("fasta"))  # type: ignore[union-attr]

            del tmp_record
            offset = chunk_end

    return agp_lines
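
# Usage sketch (not from the original module; the stream contents are made up):
# a single 50 bp record with chunk_size 20 yields three chunks written to the
# joined output stream, and the returned AGP lines describe the mapping.
#
#   from io import StringIO
#   fasta_in = StringIO(">seq1 test\n" + "A" * 50 + "\n")
#   fasta_out = StringIO()
#   agp = chunk_fasta_stream(fasta_in, 20, 20, fasta_out, None)
#   # agp[0] == "seq1\t1\t20\t1\tW\tseq1_ens_chunk_001\t1\t20\t+"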


def get_tolerated_size(size: int, tolerance: int) -> int:
    """Calculate the maximum tolerated size of a chunk based on the initial size
    and the percentage of allowed deviation.

    Args:
        size: Base chunk size.
        tolerance: Percentage of allowed deviation, as an integer.

    Returns:
        Maximum tolerated chunk size.
    """
    tolerance = max(tolerance, 0)

    tolerated_size = size
    tolerated_size += size * tolerance // 100
    return tolerated_size
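
# Illustrative doctest-style example (not from the original module): a 10%
# tolerance on a 100 kb chunk allows up to 110 kb; negative tolerance is
# clamped to 0.
#
#   >>> get_tolerated_size(100_000, 10)
#   110000
#   >>> get_tolerated_size(100_000, -5)
#   100000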


def chunk_fasta(
    input_fasta_file: str,
    chunk_size: int,
    chunk_size_tolerated: int,
    out_file_name: str,
    individual_file_prefix: Optional[Path],
    *,
    agp_output_file: Optional[str] = None,
    n_sequence_len: int = 0,
    chunk_sfx: str = "ens_chunk",
    append_offset_to_chunk_name: Optional[bool] = None,
) -> None:
    """Open `input_fasta_file` and split it into smaller chunks, first on stretches
    of "N"s and then based on `chunk_size_tolerated`; store the result either in
    `out_file_name` (if no `individual_file_prefix` is provided) or in one file per
    chunk, each starting with the non-empty `individual_file_prefix`.

    Args:
        input_fasta_file: Input FASTA file.
        chunk_size: Size of the chunks to split into.
        chunk_size_tolerated: If more flexibility is allowed, use this as the maximum size of a chunk.
        out_file_name: Output FASTA to store the chunks into if no `individual_file_prefix` is provided.
        individual_file_prefix: A file path prefix (including dirs and filename part) to use as the
            first part of each chunk file name.
        agp_output_file: Output AGP file to store the map for the chunking procedure, if present and non-empty.
        n_sequence_len: Length of the stretch of `N`s to split at; no splitting on `N`s if 0.
        chunk_sfx: A string to put between the original sequence name and the chunk suffix.
        append_offset_to_chunk_name: Append the 0-based offset in the form of `_off_{offset}` to the chunk name.
    """

    # process input fasta
    with open_gz_file(input_fasta_file) as fasta:
        logging.info(
            f"splitting sequences from '{input_fasta_file}', chunk size {chunk_size:_}, \
splitting on {n_sequence_len} Ns (0 -- disabled)"
        )
        # do not open a joined file if you plan to open many individual ones
        with (
            individual_file_prefix
            and nullcontext(None)
            or open(out_file_name, "wt", encoding="utf-8") as out_file_joined
        ):
            agp_lines = chunk_fasta_stream(
                fasta,
                chunk_size,
                chunk_size_tolerated,
                out_file_joined,
                individual_file_prefix,
                n_sequence_len=n_sequence_len,
                chunk_sfx=chunk_sfx,
                append_offset_to_chunk_name=append_offset_to_chunk_name,
            )

        # dump AGP
        if agp_output_file:
            with open(agp_output_file, "w", encoding="utf-8") as agp_out:
                agp_out.write("\n".join(agp_lines) + "\n")
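
# Hypothetical invocation (file names are illustrative, not from the original
# module): split a gzipped assembly into chunks of at most 100 Mb (plus 10%
# tolerance), breaking at runs of 100+ Ns, and keep the AGP mapping.
#
#   chunk_fasta(
#       "genome.fa.gz",
#       100_000_000,
#       get_tolerated_size(100_000_000, 10),
#       "chunks.fna",
#       None,
#       agp_output_file="chunks.agp",
#       n_sequence_len=100,
#   )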


def prepare_out_dir_for_individuals(dir_name: Path, file_part: str) -> Optional[Path]:
    """Create `dir_name` (including upstream dirs) and return its path with `file_part` appended.

    Args:
        dir_name: Directory to create.
        file_part: File part to append.

    Returns:
        `dir_name` with `file_part` appended.

    Raises:
        Exception: If the directory cannot be created.
    """
    file_prefix = None
    if dir_name:  # coverage: partial branch, this condition was always true in the tests
        dir_name.mkdir(parents=True, exist_ok=True)
        file_prefix = Path(dir_name, file_part)
    return file_prefix
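
# Illustrative doctest-style example (not from the original module; paths are
# made up): creates "out/chunks" if needed and returns the prefix to use for
# the individual chunk files (on a POSIX system).
#
#   >>> prepare_out_dir_for_individuals(Path("out/chunks"), "chunks.fna")
#   PosixPath('out/chunks/chunks.fna')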


def main() -> None:
    """Module entry-point."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument_src_path(
        "--fasta_dna",
        required=True,
        metavar="input[.fa | .gz]",
        help="Raw or compressed (.gz) FASTA file with DNA sequences to be split",
    )
    parser.add_argument(
        "--out",
        required=False,
        default="chunks.fna",
        help="Chunks output file",
    )
    parser.add_argument(
        "--individual_out_dir",
        required=False,
        default=None,
        type=Path,
        help="Output directory for writing files with individual chunks to. \
If provided, the `--out` value is used as a filename prefix",
    )
    parser.add_argument_dst_path(
        "--agp_output_file",
        required=False,
        default=None,
        help="AGP file with chunks to contigs mapping.",
    )
    parser.add_argument(
        "--chunk_size",
        required=False,
        default=100_000_000,
        metavar="100_000_000",
        type=int,
        help="Maximum chunk size (should be greater than 50k).",
    )
    parser.add_argument(
        "--chunk_sfx",
        required=False,
        default="ens_chunk",
        type=str,
        help="Added to contig ID before chunk number.",
    )
    parser.add_argument(
        "--chunk_tolerance",
        required=False,
        default=0,
        type=int,
        help="Chunk size tolerance percentage. If the to-be-written chunk is longer \
than the defined chunk size by less than the specified tolerance percentage, \
it will not be split.",
    )
    parser.add_argument(
        "--n_seq",
        required=False,
        default=0,
        type=int,
        help="Split into chunks at positions of at least this number of N characters.",
    )
    parser.add_argument(
        "--add_offset",
        required=False,
        action="store_true",
        help="Append zero-based offset to chunk name ('_off_{offset}').",
    )
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    parser.add_log_arguments(add_log_file=True)
    args = parser.parse_args()
    init_logging_with_args(args)

    check_chunk_size_and_tolerance(args.chunk_size, args.chunk_tolerance, error_f=parser.error)

    chunk_size_tolerated = get_tolerated_size(args.chunk_size, args.chunk_tolerance)

    file_prefix = prepare_out_dir_for_individuals(args.individual_out_dir, args.out or args.fasta_dna)

    chunk_fasta(
        args.fasta_dna,
        args.chunk_size,
        chunk_size_tolerated,
        args.out,
        individual_file_prefix=file_prefix,
        agp_output_file=args.agp_output_file,
        n_sequence_len=args.n_seq,
        chunk_sfx=args.chunk_sfx,
        append_offset_to_chunk_name=args.add_offset,
    )
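
# Example command line (illustrative; only flags defined above are used): write
# individual chunk files under "chunks_dir", breaking at runs of 100+ Ns, with
# a 5% size tolerance and an AGP map of the chunking.
#
#   python -m ensembl.io.genomio.fasta.chunk --fasta_dna genome.fa.gz \
#       --individual_out_dir chunks_dir --out chunk --n_seq 100 \
#       --chunk_tolerance 5 --agp_output_file chunks.agp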


if __name__ == "__main__":
    main()