Coverage for src/python/ensembl/io/genomio/genbank/extract_data.py: 89%

258 statements  

coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15"""Parse a Genbank file and creates cleaned up files from it: 

16- DNA fasta 

17- Peptide fasta 

18- Gene models GFF3 

19- seq_regions json 

20- genome metadata json 

21 

22Raises: 

23 GBParseError: If the structure of the gb file cannot be parsed. 

24 UnsupportedData: If some data is not as expected. 

25 

26Returns: 

27 json_output: json file with a dict that contains all genome files created. 

28""" 

__all__ = ["GBParseError", "UnsupportedData", "GenomeFiles", "FormattedFilesGenerator"]

from collections import Counter
import json
import logging
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from BCBio import GFF
from Bio import GenBank, SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature

import ensembl.io.genomio
from ensembl.utils.argparse import ArgumentParser
from ensembl.utils.logging import init_logging_with_args


class GBParseError(Exception):
    """Error when parsing the GenBank file."""


class UnsupportedData(Exception):
    """Raised when some expected data is not supported by the current parser."""


class GenomeFiles(dict):
    """
    Store the representation of the genome files created.
    """

    def __init__(self, out_dir: PathLike) -> None:
        super().__init__()
        out_dir = Path(out_dir)
        self["genome"] = out_dir / "genome.json"
        self["seq_region"] = out_dir / "seq_region.json"
        self["fasta_dna"] = out_dir / "dna.fasta"
        self["fasta_pep"] = out_dir / "pep.fasta"
        self["gene_models"] = out_dir / "genes.gff"
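

# Illustrative only (not part of the original module): GenomeFiles simply maps
# logical file names to paths under the given output directory, e.g.
#   files = GenomeFiles("out")
#   files["genome"]       # Path("out/genome.json")
#   files["gene_models"]  # Path("out/genes.gff")
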

class FormattedFilesGenerator:
    """
    Contains a parser to load data from a file, and outputs a set of files that follow our schema
    for input into a core database.
    """

    locations = {
        "mitochondrion": "mitochondrial_chromosome",
        "apicoplast": "apicoplast_chromosome",
        "chloroplast": "chloroplast_chromosome",
        "chromoplast": "chromoplast_chromosome",
        "cyanelle": "cyanelle_chromosome",
        "leucoplast": "leucoplast_chromosome",
    }

    allowed_feat_types = [
        "gene",
        "transcript",
        "tRNA",
        "rRNA",
        "CDS",
    ]

    def __init__(self, prod_name: str, gb_file: PathLike, prefix: str, out_dir: PathLike) -> None:
        self.prefix = prefix
        self.seq_records: List[SeqRecord] = []
        self.prod_name = prod_name
        self.gb_file = gb_file

        # Paths of the output files to generate
        self.files = GenomeFiles(Path(out_dir))

    def parse_genbank(self, gb_file: PathLike) -> None:
        """
        Load the records metadata from a GenBank file

        Args:
            gb_file: Path to the downloaded GenBank file
        """
        organella = self._get_organella(gb_file)
        logging.debug(f"Organella loaded: {organella}")

        with open(gb_file, "r") as gbh:
            for record in SeqIO.parse(gbh, "genbank"):
                # We don't want the record description (especially for the fasta file)
                record.description = ""
                record.organelle = None
                if record.id in organella:  # coverage: condition was always true
                    record.annotations["organelle"] = organella[record.id]
                self.seq_records.append(record)

        if len(self.seq_records) >= 1:  # coverage: condition was always true
            self.format_write_record()
        else:
            logging.warning("No records are found in gb_file")
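
    # Illustrative only (not part of the original module): a minimal programmatic
    # use of the class, with hypothetical paths and names:
    #   gen = FormattedFilesGenerator(
    #       prod_name="my_species", gb_file="organelle.gb", prefix="PFX_", out_dir="out"
    #   )
    #   gen.parse_genbank("organelle.gb")  # writes the files listed in GenomeFiles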

    def format_write_record(self) -> None:
        """
        Generate the prepared files from the GenBank records
        """
        self._format_genome_data()
        self._format_write_genes_gff()
        self._format_write_seq_json()
        self._write_fasta_dna()

    def _get_organella(self, gb_file: PathLike) -> Dict[str, str]:
        """
        Retrieve the organelle from the GenBank file, using the specific GenBank object,
        because SeqIO does not support this field

        Args:
            gb_file: Path to the GenBank file
        """
        organella = {}
        with open(gb_file, "r") as gbh:
            for record in GenBank.parse(gbh):
                accession = record.version
                for q in record.features[0].qualifiers:
                    if q.key == "/organelle=":
                        organelle = q.value.replace('"', "")
                        organella[accession] = organelle
        return organella
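
    # Illustrative only (not part of the original module): _get_organella() maps each
    # record accession to its organelle qualifier. For a GenBank file with a single
    # mitochondrial record "XX_000001.1" (hypothetical accession), it would return
    #   {"XX_000001.1": "mitochondrion"}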

    def _write_fasta_dna(self) -> None:
        """
        Generate a DNA fasta file with all the sequences in the record
        """
        logging.debug(f"Write {len(self.seq_records)} DNA sequences to {self.files['fasta_dna']}")
        with open(self.files["fasta_dna"], "w") as fasta_fh:
            SeqIO.write(self.seq_records, fasta_fh, "fasta")

    def _format_write_genes_gff(self) -> None:
        """
        Extract gene models from the records, and write a GFF3 file and a peptide fasta file.
        Raises GBParseError if the IDs across all the records are not unique.
        """
        peptides: List[SeqRecord] = []
        gff_records: List[SeqRecord] = []
        all_ids: List[str] = []

        for record in self.seq_records:
            new_record, rec_ids, rec_peptides = self._parse_record(record)
            if new_record.features:  # coverage: condition was always true
                gff_records.append(new_record)
            all_ids += rec_ids
            peptides += rec_peptides

        if gff_records:  # coverage: condition was always true
            self._write_genes_gff(gff_records)

        if peptides:  # coverage: condition was always true
            self._write_pep_fasta(peptides)

        logging.debug("Check that IDs are unique")
        count = dict(Counter(all_ids))
        num_duplicates = 0
        for key in count:
            if count[key] > 1:
                num_duplicates += 1
                logging.warning(f"ID {key} is duplicated {count[key]} times")
        if num_duplicates > 0:
            raise GBParseError(f"{num_duplicates} IDs are duplicated")

    def _write_genes_gff(self, gff_records: List[SeqRecord]) -> None:
        """
        Generate the gene models GFF file with the parsed features

        Args:
            gff_records: List of records with features extracted from the GenBank records
        """
        logging.debug(f"Write {len(gff_records)} gene records to {self.files['gene_models']}")
        with self.files["gene_models"].open("w") as gff_fh:
            GFF.write(gff_records, gff_fh)

    def _write_pep_fasta(self, peptides: List[SeqRecord]) -> None:
        """
        Generate a peptide fasta file with the protein ids and sequence

        Args:
            peptides: List of extracted peptide features as records
        """
        logging.debug(f"Write {len(peptides)} peptide sequences to {self.files['fasta_pep']}")
        with self.files["fasta_pep"].open("w") as fasta_fh:
            SeqIO.write(peptides, fasta_fh, "fasta")

    def _parse_record(self, record: SeqRecord) -> Tuple[SeqRecord, List[str], List[SeqRecord]]:
        """
        Parse a GenBank record and extract its supported gene features

        Args:
            record: GenBank record to parse

        Returns:
            A cleaned record with the extracted features, the list of feature IDs,
            and the list of peptide records.
        """
        all_ids: List[str] = []
        peptides: List[SeqRecord] = []
        feats: Dict[str, SeqFeature] = {}

        for feat in record.features:
            # Silently skip any unsupported feature type
            if feat.type not in self.allowed_feat_types:
                continue

            # Create a clean clone of the feature
            gff_qualifiers = feat.qualifiers
            gff_feat = SeqFeature(
                location=feat.location,
                type=feat.type,
                qualifiers=gff_qualifiers,
            )
            # Only genes should have a name: use either the gene or the locus_tag attribute
            gene_name = gff_qualifiers.get("gene", [None])[0]
            if gene_name is None:
                gene_name = gff_qualifiers.get("locus_tag", [None])[0]

            # Parse this gene
            if gene_name is not None:
                gene_feats, gene_ids, gene_peptides = self._parse_gene_feat(gff_feat, gene_name)
                peptides += gene_peptides
                feats = {**feats, **gene_feats}
                all_ids += gene_ids

            # No gene ID: parse it if it is a tRNA or rRNA
            elif gff_feat.type in ("tRNA", "rRNA"):
                rna_feats, rna_ids = self._parse_rna_feat(gff_feat)
                feats = {**feats, **rna_feats}
                all_ids += rna_ids

            # Any other case? Fail here and check if we should support it, or add it to unsupported list
            else:
                raise GBParseError(f"No ID for allowed feature: {feat}")

        new_record = SeqRecord(record.seq, record.id)
        new_record.features = list(feats.values())
        return new_record, all_ids, peptides

    def _parse_gene_feat(
        self, gene_feat: SeqFeature, gene_name: str
    ) -> Tuple[Dict[str, SeqFeature], List[str], List[SeqRecord]]:
        """
        Parse a gene feature from the GenBank file

        Args:
            gene_feat: Gene feature to parse
            gene_name: Gene name associated with the gene feature
        """

        gene_id = self.prefix + gene_name
        gene_qualifiers = gene_feat.qualifiers
        new_feats: Dict[str, Any] = {}
        peptides: List[SeqRecord] = []
        all_ids: List[str] = []

        if gene_feat.type == "gene":
            if "pseudo" in gene_qualifiers:  # coverage: condition was never true
                gene_feat.type = "pseudogene"
            gene_feat.qualifiers["ID"] = gene_id
            gene_feat.qualifiers["Name"] = gene_name
            if "gene" in gene_feat.qualifiers:
                del gene_feat.qualifiers["gene"]
            if "locus_tag" in gene_feat.qualifiers:  # coverage: condition was never true
                del gene_feat.qualifiers["locus_tag"]
            new_feats[str(gene_id)] = gene_feat
            all_ids.append(str(gene_id))

        if gene_feat.type in ("tRNA", "rRNA"):
            tr_id = gene_id + "_t1"
            gene_feat.qualifiers["ID"] = tr_id
            gene_feat.qualifiers["Parent"] = gene_id
            if "gene" in gene_feat.qualifiers:  # coverage: condition was never true
                del gene_feat.qualifiers["gene"]
            if "locus_tag" in gene_feat.qualifiers:  # coverage: condition was always true
                del gene_feat.qualifiers["locus_tag"]
            new_feats[str(tr_id)] = gene_feat
            all_ids.append(str(tr_id))

        if gene_feat.type == "CDS":
            if "pseudo" in gene_qualifiers:
                gene_feat.type = "exon"
            cds_id = gene_id + "_p1"
            tr_id = gene_id + "_t1"
            gene_feat.qualifiers["ID"] = cds_id
            gene_feat.qualifiers["Parent"] = tr_id
            if "gene" in gene_feat.qualifiers:
                del gene_feat.qualifiers["gene"]
            if "locus_tag" in gene_feat.qualifiers:  # coverage: condition was never true
                del gene_feat.qualifiers["locus_tag"]

            # Add fasta to pep fasta file
            if "translation" in gene_qualifiers:
                new_pep_record = SeqRecord(Seq(gene_qualifiers["translation"][0]), id=cds_id)
                peptides.append(new_pep_record)

            # Also create a parent transcript for this translation
            tr_qualifiers = {"ID": tr_id, "Name": gene_name, "Parent": gene_id}
            gff_tr = SeqFeature(
                location=gene_feat.location,
                type="mRNA",
                qualifiers=tr_qualifiers,
            )
            new_feats[str(tr_id)] = gff_tr
            new_feats[str(cds_id)] = gene_feat
            all_ids.append(str(tr_id))
            all_ids.append(str(cds_id))

        return new_feats, all_ids, peptides
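
    # Illustrative only (not part of the original module): for a CDS feature of a
    # gene named "ABC1" parsed with prefix "PFX_" (hypothetical values), the CDS is
    # given ID PFX_ABC1_p1 with parent transcript PFX_ABC1_t1 (created as an mRNA
    # feature), itself parented to gene PFX_ABC1; the translation, if present, is
    # written to the peptide FASTA under the ID PFX_ABC1_p1.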

    def _parse_rna_feat(self, rna_feat: SeqFeature) -> Tuple[Dict[str, SeqFeature], List[str]]:
        """
        Parse an RNA feature

        Args:
            rna_feat: RNA feature found in the record
        """
        new_feats: Dict[str, Any] = {}
        all_ids: List[str] = []

        gff_qualifiers = rna_feat.qualifiers
        feat_name = gff_qualifiers["product"][0]
        gene_id = self.prefix + feat_name

        parts = gene_id.split(" ")
        if len(parts) > 2:
            logging.info(f"Shortening gene_id to {parts[0]}")
            gene_id = parts[0]
        gene_id = self._uniquify_id(gene_id, all_ids)

        feat_id = gene_id + "_t1"
        rna_feat.qualifiers["ID"] = feat_id
        rna_feat.qualifiers["Name"] = feat_name
        rna_feat.qualifiers["Parent"] = gene_id

        # Also create a parent gene for this transcript
        gene_qualifiers = {
            "ID": gene_id,
            "Name": feat_name,
        }
        gff_gene = SeqFeature(
            location=rna_feat.location,
            type="gene",
            qualifiers=gene_qualifiers,
        )
        new_feats[str(gene_id)] = gff_gene
        new_feats[str(feat_id)] = rna_feat
        all_ids.append(str(gene_id))
        all_ids.append(str(feat_id))

        return new_feats, all_ids

    def _uniquify_id(self, gene_id: str, all_ids: List[str]) -> str:
        """
        Ensure the gene ID used is unique, appending a number otherwise, starting at 2

        Args:
            gene_id: ID assigned to the gene
            all_ids: List of all the feature IDs
        """

        new_id = gene_id
        num = 1
        while new_id in all_ids:
            num += 1
            new_id = f"{gene_id}_{num}"
        if gene_id != new_id:
            logging.info(f"Make gene id unique: {gene_id} -> {new_id}")

        return new_id
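
    # Illustrative only (not part of the original module): assuming "PFX_tRNA-Phe"
    # (hypothetical ID) is already present in all_ids, _uniquify_id returns
    # "PFX_tRNA-Phe_2"; if that is also taken, it returns "PFX_tRNA-Phe_3", and so on.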

    def _format_write_seq_json(self) -> None:
        """
        Add the sequence metadata to the seq_region JSON, based on Ensembl requirements
        """
        json_array = []
        for seq in self.seq_records:
            codon_table = self._get_codon_table(seq)
            if codon_table is None:  # coverage: condition was never true
                logging.warning(
                    (
                        "No codon table found. Make sure to change the codon table number in "
                        f"{self.files['seq_region']} manually if it is not the standard codon table."
                    )
                )
                codon_table = 1
            else:
                codon_table = int(codon_table)

            seq_obj: Dict[str, Any] = {
                "name": seq.id,
                "coord_system_level": "chromosome",
                "circular": (seq.annotations["topology"] == "circular"),
                "codon_table": codon_table,
                "length": len(seq.seq),  # type: ignore[arg-type]
            }
            if "organelle" in seq.annotations:  # coverage: condition was always true
                seq_obj["location"] = self._prepare_location(str(seq.annotations["organelle"]))
                if not codon_table:  # coverage: condition was never true
                    logging.warning(
                        (
                            f"'{seq.annotations['organelle']}' is an organelle: "
                            "make sure to change the codon table number "
                            f"in {self.files['seq_region']} manually if it is not the standard codon table"
                        )
                    )

            # Additional attributes for Ensembl
            seq_obj["added_sequence"] = {
                "accession": seq.id,
                "assembly_provider": {
                    "name": "GenBank",
                    "url": "https://www.ncbi.nlm.nih.gov/genbank",
                },
            }
            json_array.append(seq_obj)
        self._write_seq_region_json(json_array)
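
    # Illustrative only (not part of the original module): a single circular
    # mitochondrial record "XX_000001.1" of 16,000 bp with transl_table 2
    # (hypothetical values) would produce a seq_region entry like:
    #   {"name": "XX_000001.1", "coord_system_level": "chromosome", "circular": true,
    #    "codon_table": 2, "length": 16000, "location": "mitochondrial_chromosome",
    #    "added_sequence": {"accession": "XX_000001.1",
    #                       "assembly_provider": {"name": "GenBank",
    #                                             "url": "https://www.ncbi.nlm.nih.gov/genbank"}}}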

    def _write_seq_region_json(self, json_array: List[Dict[str, Any]]) -> None:
        """
        Generate the seq_region.json file with metadata for the sequences

        Args:
            json_array: List of extracted sequences with metadata
        """
        logging.debug(f"Write {len(json_array)} seq_region to {self.files['seq_region']}")
        with open(self.files["seq_region"], "w") as seq_fh:
            seq_fh.write(json.dumps(json_array, indent=4))

    def _get_codon_table(self, seq: SeqRecord) -> Optional[int]:
        """
        Look at the CDS features to see if they have a codon table

        Args:
            seq: SeqRecord from the GenBank file
        """
        for feat in seq.features:
            if feat.type == "CDS":
                qualifiers = feat.qualifiers
                if "transl_table" in qualifiers:  # coverage: condition was always true
                    return qualifiers["transl_table"][0]
                return None
        return None

    def _prepare_location(self, organelle: str) -> str:
        """
        Given an organelle name, returns the SO term corresponding to its location

        Args:
            organelle: Organelle name from the record annotations
        """
        if organelle in self.locations:
            return self.locations[organelle]
        raise UnsupportedData(f"Unknown organelle: {organelle}")
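
    # Illustrative only (not part of the original module): _prepare_location("mitochondrion")
    # returns "mitochondrial_chromosome", while a value absent from the locations dict,
    # such as "nucleomorph", raises UnsupportedData.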

    def _format_genome_data(self) -> None:
        """
        Write a draft for the genome JSON file.
        Only the production_name is needed, but the rest of the fields need to be given
        for the validation of the JSON file.
        """
        prod_name = self.prod_name
        genome_data: Dict[str, Dict[str, Any]] = {
            "species": {
                "production_name": prod_name,
                "taxonomy_id": 0,
            },
            "assembly": {"accession": "GCA_000000000", "version": 1},
            "added_seq": {},
        }

        if not genome_data["species"]["production_name"]:  # coverage: condition was never true
            logging.warning(
                f"Please add the relevant production_name for this genome in {self.files['genome']}"
            )

        ids = [seq.id for seq in self.seq_records]
        genome_data["added_seq"]["region_name"] = ids
        self._write_genome_json(genome_data)

    def _write_genome_json(self, genome_data: Dict[str, Any]) -> None:
        """
        Generate the genome.json file with metadata for the assembly

        Args:
            genome_data: Dict of metadata for the assembly
        """
        logging.debug(f"Write assembly metadata to {self.files['genome']}")
        with open(self.files["genome"], "w") as genome_fh:
            genome_fh.write(json.dumps(genome_data, indent=4))


def main() -> None:
    """Main script entry-point."""
    parser = ArgumentParser(description="Parse a GenBank file and create cleaned up files from it.")
    parser.add_argument_src_path("--gb_file", required=True, help="sequence accession file")
    parser.add_argument("--prefix", required=True, help="prefix to add to every feature ID")
    parser.add_argument("--prod_name", required=True, help="production name for the species")
    parser.add_argument_dst_path(
        "--out_dir", default=Path.cwd(), help="output folder where the generated files will be stored"
    )
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    parser.add_log_arguments(add_log_file=True)
    args = parser.parse_args()
    init_logging_with_args(args)

    gb_extractor = FormattedFilesGenerator(
        prefix=args.prefix, prod_name=args.prod_name, gb_file=args.gb_file, out_dir=args.out_dir
    )
    gb_extractor.parse_genbank(args.gb_file)
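

# Illustrative usage only (not part of the original module); the script name and flag
# values below are hypothetical, assuming main() is exposed as a console-script entry
# point or called directly:
#   extract_genbank_data --gb_file organelle.gb --prefix PFX_ --prod_name my_species --out_dir out/
# After a successful run, out/ would contain genome.json, seq_region.json, dna.fasta,
# pep.fasta, and genes.gff.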