Coverage for src/python/ensembl/io/genomio/genbank/extract_data.py: 89%
258 statements
coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Parse a Genbank file and creates cleaned up files from it:
16- DNA fasta
17- Peptide fasta
18- Gene models GFF3
19- seq_regions json
20- genome metadata json
22Raises:
23 GBParseError: If the structure of the gb file cannot be parsed.
24 UnsupportedData: If some data is not as expected.
26Returns:
27 json_output: json file with a dict that contains all genome files created.
28"""
30__all__ = ["GBParseError", "UnsupportedData", "GenomeFiles", "FormattedFilesGenerator"]
32from collections import Counter
33import json
34import logging
35from os import PathLike
36from pathlib import Path
37from typing import Any, Dict, List, Optional, Tuple
39from BCBio import GFF
40from Bio import GenBank, SeqIO
41from Bio.Seq import Seq
42from Bio.SeqRecord import SeqRecord
43from Bio.SeqFeature import SeqFeature
45import ensembl.io.genomio
46from ensembl.utils.argparse import ArgumentParser
47from ensembl.utils.logging import init_logging_with_args
50class GBParseError(Exception):
51 """Error when parsing the Genbank file."""
54class UnsupportedData(Exception):
55 """When an expected data is not supported by the current parser."""
58class GenomeFiles(dict):
59 """
60 Store the representation of the genome files created.
61 """
63 def __init__(self, out_dir: PathLike) -> None:
64 super().__init__()
65 out_dir = Path(out_dir)
66 self["genome"] = out_dir / "genome.json"
67 self["seq_region"] = out_dir / "seq_region.json"
68 self["fasta_dna"] = out_dir / "dna.fasta"
69 self["fasta_pep"] = out_dir / "pep.fasta"
70 self["gene_models"] = out_dir / "genes.gff"
73class FormattedFilesGenerator:
74 """
75 Contains a parser to load data from a file, and outputs a set of files that follow our schema
76 for input into a core database.
77 """
79 locations = {
80 "mitochondrion": "mitochondrial_chromosome",
81 "apicoplast": "apicoplast_chromosome",
82 "chloroplast": "chloroplast_chromosome",
83 "chromoplast": "chromoplast_chromosome",
84 "cyanelle": "cyanelle_chromosome",
85 "leucoplast": "leucoplast_chromosome",
86 }
88 allowed_feat_types = [
89 "gene",
90 "transcript",
91 "tRNA",
92 "rRNA",
93 "CDS",
94 ]
96 def __init__(self, prod_name: str, gb_file: PathLike, prefix: str, out_dir: PathLike) -> None:
97 self.prefix = prefix
98 self.seq_records: List[SeqRecord] = []
99 self.prod_name = prod_name
100 self.gb_file = gb_file
102 # Output the gff3 file
103 self.files = GenomeFiles(Path(out_dir))
105 def parse_genbank(self, gb_file: PathLike) -> None:
106 """
107 Load the records and their metadata from a GenBank file
109 Args:
110 gb_file: Path to downloaded genbank file
111 """
112 organella = self._get_organella(gb_file)
113 logging.debug(f"Organella loaded: {organella}")
115 with open(gb_file, "r") as gbh:
116 for record in SeqIO.parse(gbh, "genbank"):
117 # We don't want the record description (especially for the fasta file)
118 record.description = ""
119 record.organelle = None
120 if record.id in organella:    [coverage: 120 ↛ 122, line 120 didn't jump to line 122 because the condition on line 120 was always true]
121 record.annotations["organelle"] = organella[record.id]
122 self.seq_records.append(record)
124 if len(self.seq_records) >= 1:    [coverage: 124 ↛ 127, line 124 didn't jump to line 127 because the condition on line 124 was always true]
125 self.format_write_record()
126 else:
127 logging.warning("No records are found in gb_file")
129 def format_write_record(self) -> None:
130 """
131 Generate the prepared files from genbank record
132 """
133 self._format_genome_data()
134 self._format_write_genes_gff()
135 self._format_write_seq_json()
136 self._write_fasta_dna()
138 def _get_organella(self, gb_file: PathLike) -> Dict[str, str]:
139 """
140 Retrieve the organelle from the genbank file, using the specific GenBank object,
141 because SeqIO does not support this field
143 Args:
144 gb_file: path to genbank file
145 """
146 organella = {}
147 with open(gb_file, "r") as gbh:
148 for record in GenBank.parse(gbh):
149 accession = record.version
150 for q in record.features[0].qualifiers:
151 if q.key == "/organelle=":
152 organelle = q.value.replace('"', "")
153 organella[accession] = organelle
154 return organella
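For illustration, the qualifier appears in the GenBank flat file as /organelle="mitochondrion" on the source feature, and Bio.GenBank exposes it with the key "/organelle=" and a quoted value; the accession below is invented:

    raw_value = '"mitochondrion"'            # q.value as returned by Bio.GenBank
    organelle = raw_value.replace('"', "")   # quotes stripped -> "mitochondrion"
    organella = {"NC_000000.1": organelle}   # keyed by the record VERSION (accession)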
156 def _write_fasta_dna(self) -> None:
157 """
158 Generate a DNA fasta file with all the sequences in the record
159 """
160 logging.debug(f"Write {len(self.seq_records)} DNA sequences to {self.files['fasta_dna']}")
161 with open(self.files["fasta_dna"], "w") as fasta_fh:
162 SeqIO.write(self.seq_records, fasta_fh, "fasta")
164 def _format_write_genes_gff(self) -> None:
165 """
166 Extract gene models from the record, and write a GFF and peptide fasta file.
167 Raises GBParseError if the IDs across all the records are not unique.
168 """
169 peptides: List[SeqRecord] = []
170 gff_records: List[SeqRecord] = []
171 all_ids: List[str] = []
173 for record in self.seq_records:
174 new_record, rec_ids, rec_peptides = self._parse_record(record)
175 if new_record.features:    [coverage: 175 ↛ 177, line 175 didn't jump to line 177 because the condition on line 175 was always true]
176 gff_records.append(new_record)
177 all_ids += rec_ids
178 peptides += rec_peptides
180 if gff_records:    [coverage: 180 ↛ 183, line 180 didn't jump to line 183 because the condition on line 180 was always true]
181 self._write_genes_gff(gff_records)
183 if peptides:    [coverage: 183 ↛ 186, line 183 didn't jump to line 186 because the condition on line 183 was always true]
184 self._write_pep_fasta(peptides)
186 logging.debug("Check that IDs are unique")
187 count = dict(Counter(all_ids))
188 num_duplicates = 0
189 for key in count:
190 if count[key] > 1:
191 num_duplicates += 1
192 logging.warning(f"ID {key} is duplicated {count[key]} times")
193 if num_duplicates > 0:
194 raise GBParseError(f"Some {num_duplicates} IDs are duplicated")
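A minimal illustration of the duplicate-ID check above, using made-up IDs:

    from collections import Counter

    all_ids = ["PFX_geneA", "PFX_geneA_t1", "PFX_geneA", "PFX_geneB"]
    count = dict(Counter(all_ids))
    duplicates = [key for key, num in count.items() if num > 1]
    # duplicates == ["PFX_geneA"], so a GBParseError would be raised here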
196 def _write_genes_gff(self, gff_records: List[SeqRecord]) -> None:
197 """
198 Generate gene_models.gff file with the parsed gff_features
200 Args:
201 gff_records: List of records with features extracted from the record
202 """
203 logging.debug(f"Write {len(gff_records)} gene records to {self.files['gene_models']}")
204 with self.files["gene_models"].open("w") as gff_fh:
205 GFF.write(gff_records, gff_fh)
207 def _write_pep_fasta(self, peptides: List[SeqRecord]) -> None:
208 """
209 Generate a peptide fasta file with the protein ids and sequence
211 Args:
212 peptides: List of extracted peptide features as records
213 """
214 logging.debug(f"Write {len(peptides)} peptide sequences to {self.files['fasta_pep']}")
215 with self.files["fasta_pep"].open("w") as fasta_fh:
216 SeqIO.write(peptides, fasta_fh, "fasta")
218 def _parse_record(self, record: SeqRecord) -> Tuple[SeqRecord, List[str], List[SeqRecord]]:
219 """
220 Parse a record from the GenBank file, extracting its supported features
221 Args:
222 record: GenBank SeqRecord to parse
223 Returns: a cleaned copy of the record, the list of feature IDs, and the extracted peptide records
224 """
225 all_ids: List[str] = []
226 peptides: List[SeqRecord] = []
227 feats: Dict[str, SeqFeature] = {}
229 for feat in record.features:
230 # Silently skip any unsupported feature type
231 if feat.type not in self.allowed_feat_types:
232 continue
234 # Create a clean clone of the feature
235 gff_qualifiers = feat.qualifiers
236 gff_feat = SeqFeature(
237 location=feat.location,
238 type=feat.type,
239 qualifiers=gff_qualifiers,
240 )
241 # Only Genes should have a name: use either attribute gene or locus_tag
242 gene_name = gff_qualifiers.get("gene", [None])[0]
243 if gene_name is None:
244 gene_name = gff_qualifiers.get("locus_tag", [None])[0]
246 # Parse this gene
247 if gene_name is not None:
248 gene_feats, gene_ids, gene_peptides = self._parse_gene_feat(gff_feat, gene_name)
249 peptides += gene_peptides
250 feats = {**feats, **gene_feats}
251 all_ids += gene_ids
253 # No gene ID: parse if it is a tRNA or rRNA
254 elif gff_feat.type in ("tRNA", "rRNA"):
255 rna_feats, rna_ids = self._parse_rna_feat(gff_feat)
256 feats = {**feats, **rna_feats}
257 all_ids += rna_ids
259 # Any other case? Fail here and check if we should support it, or add it to unsupported list
260 else:
261 raise GBParseError(f"No ID for allowed feature: {feat}")
263 new_record = SeqRecord(record.seq, record.id)
264 new_record.features = list(feats.values())
265 return new_record, all_ids, peptides
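A small illustration of the gene/locus_tag fallback used above (qualifier values are invented):

    qualifiers = {"locus_tag": ["LOCUS_0001"]}        # no "gene" qualifier present
    gene_name = qualifiers.get("gene", [None])[0]     # -> None
    if gene_name is None:
        gene_name = qualifiers.get("locus_tag", [None])[0]
    # gene_name == "LOCUS_0001", so the feature goes through _parse_gene_feat()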
267 def _parse_gene_feat(
268 self, gene_feat: SeqFeature, gene_name: str
269 ) -> Tuple[Dict[str, SeqFeature], List[str], List[SeqRecord]]:
270 """
271 Parse a gene feature from the genbank file
273 Args:
274 gene_feat: Gene feature to parse
275 gene_name: Gene name associated with the gene feature
276 """
278 gene_id = self.prefix + gene_name
279 gene_qualifiers = gene_feat.qualifiers
280 new_feats: Dict[str, Any] = {}
281 peptides: List[SeqRecord] = []
282 all_ids: List[str] = []
284 if gene_feat.type == "gene":
285 if "pseudo" in gene_qualifiers: 285 ↛ 286line 285 didn't jump to line 286 because the condition on line 285 was never true
286 gene_feat.type = "pseudogene"
287 gene_feat.qualifiers["ID"] = gene_id
288 gene_feat.qualifiers["Name"] = gene_name
289 if "gene" in gene_feat.qualifiers:
290 del gene_feat.qualifiers["gene"]
291 if "locus_tag" in gene_feat.qualifiers: 291 ↛ 292line 291 didn't jump to line 292 because the condition on line 291 was never true
292 del gene_feat.qualifiers["locus_tag"]
293 new_feats[str(gene_id)] = gene_feat
294 all_ids.append(str(gene_id))
296 if gene_feat.type in ("tRNA", "rRNA"):
297 tr_id = gene_id + "_t1"
298 gene_feat.qualifiers["ID"] = tr_id
299 gene_feat.qualifiers["Parent"] = gene_id
300 if "gene" in gene_feat.qualifiers: 300 ↛ 301line 300 didn't jump to line 301 because the condition on line 300 was never true
301 del gene_feat.qualifiers["gene"]
302 if "locus_tag" in gene_feat.qualifiers: 302 ↛ 304line 302 didn't jump to line 304 because the condition on line 302 was always true
303 del gene_feat.qualifiers["locus_tag"]
304 new_feats[str(tr_id)] = gene_feat
305 all_ids.append(str(tr_id))
307 if gene_feat.type == "CDS":
308 if "pseudo" in gene_qualifiers:
309 gene_feat.type = "exon"
310 cds_id = gene_id + "_p1"
311 tr_id = gene_id + "_t1"
312 gene_feat.qualifiers["ID"] = cds_id
313 gene_feat.qualifiers["Parent"] = tr_id
314 if "gene" in gene_feat.qualifiers:
315 del gene_feat.qualifiers["gene"]
316 if "locus_tag" in gene_feat.qualifiers: 316 ↛ 317line 316 didn't jump to line 317 because the condition on line 316 was never true
317 del gene_feat.qualifiers["locus_tag"]
319 # Add fasta to pep fasta file
320 if "translation" in gene_qualifiers:
321 new_pep_record = SeqRecord(Seq(gene_qualifiers["translation"][0]), id=cds_id)
322 peptides.append(new_pep_record)
324 # Also create a parent transcript for this translation
325 tr_qualifiers = {"ID": tr_id, "Name": gene_name, "Parent": gene_id}
326 gff_tr = SeqFeature(
327 location=gene_feat.location,
328 type="mRNA",
329 qualifiers=tr_qualifiers,
330 )
331 new_feats[str(tr_id)] = gff_tr
332 new_feats[str(cds_id)] = gene_feat
333 all_ids.append(str(tr_id))
334 all_ids.append(str(cds_id))
336 return new_feats, all_ids, peptides
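Putting the ID scheme above together, assuming the prefix "PFX_" and a gene named "cox1" (both invented values):

    gene_id = "PFX_" + "cox1"    # gene ID          -> "PFX_cox1"
    tr_id = gene_id + "_t1"      # transcript ID    -> "PFX_cox1_t1"
    cds_id = gene_id + "_p1"     # CDS/protein ID   -> "PFX_cox1_p1"
    # The peptide FASTA entry built from the /translation qualifier is written
    # under the CDS ID, and an mRNA feature with Parent=gene_id is created for it.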
338 def _parse_rna_feat(self, rna_feat: SeqFeature) -> Tuple[Dict[str, SeqFeature], List[str]]:
339 """
340 Parse an RNA feature
342 Args:
343 rna_feat: RNA feature (tRNA or rRNA) found in the record
344 """
345 new_feats: Dict[str, Any] = {}
346 all_ids: List[str] = []
348 gff_qualifiers = rna_feat.qualifiers
349 feat_name = gff_qualifiers["product"][0]
350 gene_id = self.prefix + feat_name
352 parts = gene_id.split(" ")
353 if len(parts) > 2:
354 logging.info(f"Shortening gene_id to {parts[0]}")
355 gene_id = parts[0]
356 gene_id = self._uniquify_id(gene_id, all_ids)
358 feat_id = gene_id + "_t1"
359 rna_feat.qualifiers["ID"] = feat_id
360 rna_feat.qualifiers["Name"] = feat_name
361 rna_feat.qualifiers["Parent"] = gene_id
363 # Also create a parent gene for this transcript
364 gene_qualifiers = {
365 "ID": gene_id,
366 "Name": feat_name,
367 }
368 gff_gene = SeqFeature(
369 location=rna_feat.location,
370 type="gene",
371 qualifiers=gene_qualifiers,
372 )
373 new_feats[str(gene_id)] = gff_gene
374 new_feats[str(feat_id)] = rna_feat
375 all_ids.append(str(gene_id))
376 all_ids.append(str(feat_id))
378 return new_feats, all_ids
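For a standalone rRNA whose product qualifier is, say, "small subunit ribosomal RNA" (an invented value), the ID handling above works out as follows:

    prefix, product = "PFX_", "small subunit ribosomal RNA"
    gene_id = prefix + product           # "PFX_small subunit ribosomal RNA"
    parts = gene_id.split(" ")
    if len(parts) > 2:
        gene_id = parts[0]               # shortened to "PFX_small"
    feat_id = gene_id + "_t1"            # transcript ID "PFX_small_t1"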
380 def _uniquify_id(self, gene_id: str, all_ids: List[str]) -> str:
381 """
382 Ensure the gene id used is unique:
383 if it is not, append a number, starting at 2
385 Args:
386 gene_id: id assigned to the gene
387 all_ids: list of all the feature ids already used
388 """
390 new_id = gene_id
391 num = 1
392 while new_id in all_ids:
393 num += 1
394 new_id = f"{gene_id}_{num}"
395 if gene_id != new_id:
396 logging.info(f"Make gene id unique: {gene_id} -> {new_id}")
398 return new_id
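A quick sketch of the suffixing behaviour, with invented IDs:

    all_ids = ["PFX_rna", "PFX_rna_2"]
    new_id, num = "PFX_rna", 1
    while new_id in all_ids:
        num += 1
        new_id = f"PFX_rna_{num}"
    # new_id == "PFX_rna_3"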
400 def _format_write_seq_json(self) -> None:
401 """
402 Build the seq_region JSON metadata for each sequence, based on Ensembl requirements
403 """
404 json_array = []
405 for seq in self.seq_records:
406 codon_table = self._get_codon_table(seq)
407 if codon_table is None:    [coverage: 407 ↛ 408, line 407 didn't jump to line 408 because the condition on line 407 was never true]
408 logging.warning(
409 (
410 "No codon table found. Make sure to change the codon table number in "
411 f"{self.files['seq_region']} manually if it is not the standard codon table."
412 )
413 )
414 codon_table = 1
415 else:
416 codon_table = int(codon_table)
418 seq_obj: Dict[str, Any] = {
419 "name": seq.id,
420 "coord_system_level": "chromosome",
421 "circular": (seq.annotations["topology"] == "circular"),
422 "codon_table": codon_table,
423 "length": len(seq.seq), # type: ignore[arg-type]
424 }
425 if "organelle" in seq.annotations: 425 ↛ 437line 425 didn't jump to line 437 because the condition on line 425 was always true
426 seq_obj["location"] = self._prepare_location(str(seq.annotations["organelle"]))
427 if not codon_table:    [coverage: 427 ↛ 428, line 427 didn't jump to line 428 because the condition on line 427 was never true]
428 logging.warning(
429 (
430 f"'{seq.annotations['organelle']}' is an organelle: "
431 "make sure to change the codon table number "
432 f"in {self.files['seq_region']} manually if it is not the standard codon table"
433 )
434 )
436 # Additional attributes for Ensembl
437 seq_obj["added_sequence"] = {
438 "accession": seq.id,
439 "assembly_provider": {
440 "name": "GenBank",
441 "url": "https://www.ncbi.nlm.nih.gov/genbank",
442 },
443 }
444 json_array.append(seq_obj)
445 self._write_seq_region_json(json_array)
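A single seq_region entry produced above looks like the sketch below; the accession, length and codon table are invented, and "location" is only present for organelle sequences:

    seq_region_entry = {
        "name": "NC_000000.1",
        "coord_system_level": "chromosome",
        "circular": True,
        "codon_table": 2,
        "length": 16000,
        "location": "mitochondrial_chromosome",
        "added_sequence": {
            "accession": "NC_000000.1",
            "assembly_provider": {
                "name": "GenBank",
                "url": "https://www.ncbi.nlm.nih.gov/genbank",
            },
        },
    }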
447 def _write_seq_region_json(self, json_array: List[Dict[str, Any]]) -> None:
448 """
449 Generate seq_region.json file with metadata for the sequence
451 Args:
452 json_array: List of extracted sequence with metadata
453 """
454 logging.debug(f"Write {len(json_array)} seq_region to {self.files['seq_region']}")
455 with open(self.files["seq_region"], "w") as seq_fh:
456 seq_fh.write(json.dumps(json_array, indent=4))
458 def _get_codon_table(self, seq: SeqRecord) -> Optional[int]:
459 """
460 Look at the CDS features to see if they have a codon table
462 Args:
463 seq: SeqRecord in the genbank file
464 """
465 for feat in seq.features:
466 if feat.type == "CDS":
467 qualifiers = feat.qualifiers
468 if "transl_table" in qualifiers: 468 ↛ 470line 468 didn't jump to line 470 because the condition on line 468 was always true
469 return qualifiers["transl_table"][0]
470 return None
471 return None
473 def _prepare_location(self, organelle: str) -> str:
474 """
475 Given an organelle name, returns the SO term corresponding to its location
477 Args:
478 organelle: organelle name extracted from the GenBank record
479 """
480 if organelle in self.locations:
481 return self.locations[organelle]
482 raise UnsupportedData(f"Unknown organelle: {organelle}")
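For instance, using the locations mapping defined at the top of the class:

    from ensembl.io.genomio.genbank.extract_data import FormattedFilesGenerator

    assert FormattedFilesGenerator.locations["mitochondrion"] == "mitochondrial_chromosome"
    assert FormattedFilesGenerator.locations["chloroplast"] == "chloroplast_chromosome"
    # Any organelle name missing from the mapping raises UnsupportedData.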
484 def _format_genome_data(self) -> None:
485 """
486 Write a draft for the genome json file
487 Only the production_name is needed, but the rest of the fields need to be given
488 for the validation of the json file
489 """
490 prod_name = self.prod_name
491 genome_data: Dict[str, Dict[str, Any]] = {
492 "species": {
493 "production_name": prod_name,
494 "taxonomy_id": 0,
495 },
496 "assembly": {"accession": "GCA_000000000", "version": 1},
497 "added_seq": {},
498 }
500 if not genome_data["species"]["production_name"]: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true
501 logging.warning(
502 f"Please add the relevant production_name for this genome in {self.files['genome']}"
503 )
505 ids = [seq.id for seq in self.seq_records]
506 genome_data["added_seq"]["region_name"] = ids
507 self._write_genome_json(genome_data)
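The resulting draft genome.json looks like the sketch below; the production name and region name are placeholders, and the taxonomy_id and assembly accession are deliberate dummies meant to be edited manually:

    genome_draft = {
        "species": {"production_name": "species_prod_name", "taxonomy_id": 0},
        "assembly": {"accession": "GCA_000000000", "version": 1},
        "added_seq": {"region_name": ["NC_000000.1"]},
    }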
509 def _write_genome_json(self, genome_data: Dict[str, Any]) -> None:
510 """
511 Generate genome.json file with metadata for the assembly
513 Args:
514 genome_data: Dict of metadata for assembly
515 """
516 logging.debug(f"Write assembly metadata to {self.files['genome']}")
517 with open(self.files["genome"], "w") as genome_fh:
518 genome_fh.write(json.dumps(genome_data, indent=4))
521def main() -> None:
522 """Main script entry-point."""
523 parser = ArgumentParser(description="Parse a GenBank file and create cleaned up files from it.")
524 parser.add_argument_src_path("--gb_file", required=True, help="sequence accession file")
525 parser.add_argument("--prefix", required=True, help="prefix to add to every feature ID")
526 parser.add_argument("--prod_name", required=True, help="production name for the species")
527 parser.add_argument_dst_path(
528 "--out_dir", default=Path.cwd(), help="output folder where the generated files will be stored"
529 )
530 parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
531 parser.add_log_arguments(add_log_file=True)
532 args = parser.parse_args()
533 init_logging_with_args(args)
535 gb_extractor = FormattedFilesGenerator(
536 prefix=args.prefix, prod_name=args.prod_name, gb_file=args.gb_file, out_dir=args.out_dir
537 )
538 gb_extractor.parse_genbank(args.gb_file)
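The script is driven by the command-line flags defined above. How it is exposed on the command line is not shown in this file, so the snippet below simply feeds the same arguments to main() through sys.argv as an illustration (file names and values are invented, and --gb_file must point at an existing file):

    import sys

    from ensembl.io.genomio.genbank.extract_data import main

    sys.argv = [
        "extract_data",
        "--gb_file", "sequence.gb",
        "--prefix", "PFX_",
        "--prod_name", "species_prod_name",
        "--out_dir", "output_dir",
    ]
    main()   # parses sequence.gb and writes the five output files to output_dir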