Coverage for src/python/ensembl/io/genomio/manifest/manifest_stats.py: 91%
198 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Register the main stats for all files from a manifest for comparison."""
17__all__ = ["InvalidIntegrityError", "ManifestStats", "StatsLengths"]
19import logging
20import json
21from math import floor
22from os import PathLike
23from pathlib import Path
24from typing import Any, TypeAlias
26from BCBio import GFF
27from Bio import SeqIO
29from ensembl.io.genomio.gff3.features import GFFSeqFeature
30from ensembl.io.genomio.manifest.manifest import Manifest
31from ensembl.io.genomio.utils import get_json
32from ensembl.utils.archive import open_gz_file
33from ensembl.utils import StrPath
36# Record the lengths of the sequence for features/regions
37StatsLengths: TypeAlias = dict[str, int]
40class InvalidIntegrityError(Exception):
41 """When a file integrity check fails"""
44class ManifestStats:
45 """Representation of the main stats of the files in a manifest for comparison.
47 The stats in question are:
48 - lengths of sequences (DNA, genes and peptides)
49 - sequences and features IDs
50 - sequences circularity
51 """
53 def __init__(self, manifest_path: StrPath, ignore_final_stops: bool = False) -> None:
54 self.manifest_files = self._get_manifest(manifest_path)
55 self.genome: dict[str, Any] = {}
57 self.lengths: dict[str, StatsLengths] = {
58 "dna_sequences": {},
59 "peptide_sequences": {},
60 "seq_region_levels": {},
61 "annotations": {},
62 "agp": {},
63 "gff3_seq_regions": {},
64 "gff3_genes": {},
65 "gff3_translations": {},
66 "gff3_all_translations": {},
67 "gff3_transposable_elements": {},
68 "ann_genes": {},
69 "ann_translations": {},
70 "ann_transposable_elements": {},
71 "seq_regions": {},
72 }
74 self.circular: dict[str, StatsLengths] = {
75 "seq_regions": {},
76 }
78 self.errors: list[str] = []
80 self.ignore_final_stops = ignore_final_stops
82 def _get_manifest(self, manifest_path: PathLike) -> dict[str, Any]:
83 """Load the content of a manifest file.
85 Returns:
86 Dict: Content of the manifest file.
87 """
88 manifest = Manifest(Path(manifest_path).parent)
89 manifest_files = manifest.load()
91 # Replace the {file, md5} dict with the file path
92 for name in manifest_files:
93 if "file" in manifest_files[name]:
94 manifest_files[name] = Path(manifest_path).parent / manifest_files[name]["file"]
95 else:
96 for f in manifest_files[name]:
97 manifest_files[name][f] = Path(manifest_path).parent / manifest_files[name][f]["file"]
98 return manifest_files
100 def add_error(self, error: str) -> None:
101 """Record an error."""
102 self.errors.append(error)
104 def load_seq_regions(self) -> None:
105 """Retrieve seq_regions lengths and circular information from the seq_region JSON file."""
107 if "seq_region" not in self.manifest_files:
108 return
109 logging.info("Manifest contains seq_region JSON")
110 seq_regions = get_json(Path(self.manifest_files["seq_region"]))
111 seqr_seqlevel = {}
112 seq_lengths = {}
113 seq_circular = {}
114 # Store the length as int
115 for seq in seq_regions:
116 seq_lengths[seq["name"]] = int(seq["length"])
117 seq_circular[seq["name"]] = seq.get("circular", False)
118 if seq["coord_system_level"] == "contig":
119 seqr_seqlevel[seq["name"]] = int(seq["length"])
120 # Also record synonyms (in case GFF file uses synonyms)
121 if "synonyms" in seq:
122 for synonym in seq["synonyms"]:
123 seq_lengths[synonym["name"]] = int(seq["length"])
124 self.lengths["seq_regions"] = seq_lengths
125 self.circular["seq_regions"] = seq_circular
127 def load_peptides_fasta_lengths(self) -> None:
128 """Retrieve peptides sequences lengths from their FASTA file."""
129 if "fasta_pep" not in self.manifest_files:
130 return
131 self.lengths["peptide_sequences"] = self._get_fasta_lengths(
132 self.manifest_files["fasta_pep"], ignore_final_stops=self.ignore_final_stops
133 )
135 def load_dna_fasta_lengths(self) -> None:
136 """Retrieve DNA sequences lengths from their FASTA file."""
137 if "fasta_dna" not in self.manifest_files:
138 return
139 self.lengths["dna_sequences"] = self._get_fasta_lengths(self.manifest_files["fasta_dna"])
141 def _get_fasta_lengths(self, fasta_path: StrPath, ignore_final_stops: bool = False) -> dict[str, int]:
142 """Returns every sequence ID and its length from a FASTA file (DNA or peptide).
144 An error will be added for every empty id, non-unique id or stop codon found in the FASTA file.
146 Args:
147 fasta_path: Path to FASTA file.
148 ignore_final_stops: Do not include final stop in the total length.
150 """
152 data = {}
153 non_unique = {}
154 non_unique_count = 0
155 empty_id_count = 0
156 contains_stop_codon = 0
157 rec_count = 0
158 for rec in SeqIO.parse(fasta_path, "fasta"):
159 rec_count += 1
161 # Flag empty ids
162 if rec.id == "":
163 empty_id_count += 1
164 continue
165 # Flag redundant ids
166 if rec.id in data:
167 non_unique[rec.id] = 1
168 non_unique_count += 1
169 # Store sequence id and length
170 data[rec.id] = len(rec.seq)
171 stops = rec.seq.count("*")
172 if stops >= 1 and not rec.seq.endswith("*"):
173 contains_stop_codon += 1
174 elif rec.seq.endswith("*") and not ignore_final_stops:
175 contains_stop_codon += 1
177 if empty_id_count > 0:
178 self.add_error(f"{empty_id_count} sequences with empty ids in {fasta_path}")
179 if non_unique_count > 0:
180 self.add_error(f"{non_unique_count} non unique sequence ids in {fasta_path}")
181 if contains_stop_codon > 0:
182 self.add_error(f"{contains_stop_codon} sequences with stop codons in {fasta_path}")
183 if rec_count == 0:
184 self.add_error(f"No sequences found in {fasta_path}")
185 return data
187 def load_functional_annotation(self) -> None:
188 """Load the functional annotation file to retrieve the gene_id and translation id.
190 The functional annotation file is stored in a JSON format containing the description, id
191 and object type, eg: "gene", "transcript", "translation".
193 """
194 if "functional_annotation" not in self.manifest_files:
195 return
196 logging.info("Manifest contains functional annotation(s)")
198 # Load the json file
199 with open(self.manifest_files["functional_annotation"]) as json_file:
200 data = json.load(json_file)
202 # Get gene ids and translation ids
203 genes = {}
204 translations = {}
205 transposons = {}
207 for item in data:
208 if item["object_type"] == "gene":
209 genes[item["id"]] = 1
210 elif item["object_type"] == "translation":
211 translations[item["id"]] = 1
212 if item["object_type"] == "transposable_element":
213 transposons[item["id"]] = 1
215 stats = {
216 "ann_genes": genes,
217 "ann_translations": translations,
218 "ann_transposable_elements": transposons,
219 }
220 self.lengths = {**self.lengths, **stats}
222 def load_gff3(self) -> None:
223 """A GFF3 parser is used to retrieve information in the GFF3 file such as
224 gene and CDS ids and their corresponding lengths.
225 """
226 if "gff3" not in self.manifest_files:
227 return
228 logging.info("Manifest contains GFF3 gene annotations")
229 gff3_path = self.manifest_files["gff3"]
231 seqs: StatsLengths = {}
232 genes: StatsLengths = {}
233 peps: StatsLengths = {}
234 all_peps: StatsLengths = {}
235 tes: StatsLengths = {}
237 with open_gz_file(gff3_path) as gff3_handle:
238 gff = GFF.parse(gff3_handle)
239 for seq in gff:
240 seqs[seq.id] = len(seq.seq)
242 for feat in seq.features:
243 feat_length = abs(feat.location.end - feat.location.start)
244 # Store gene id and length
245 if feat.type in ["gene", "ncRNA_gene", "pseudogene"]:
246 self._retrieve_gff_gene_lengths(feat, genes, peps, all_peps)
247 if feat.type == "transposable_element":
248 tes[feat.id] = feat_length
250 stats: dict[str, StatsLengths] = {
251 "gff3_seq_regions": seqs,
252 "gff3_genes": genes,
253 "gff3_translations": peps,
254 "gff3_all_translations": all_peps,
255 "gff3_transposable_elements": tes,
256 }
257 self.lengths = {**self.lengths, **stats}
259 def _retrieve_gff_gene_lengths(
260 self, feat: GFFSeqFeature, genes: StatsLengths, peps: StatsLengths, all_peps: StatsLengths
261 ) -> None:
262 """Record genes and peptides lengths from a feature.
264 Args:
265 feat : Gene feature to check.
266 genes: Record of genes lengths to update.
267 peps: Record of peptides lengths to update.
268 all_peps: Record of all peptides lengths to update (include pseudogenes).
270 """
271 gene_id = feat.id
272 gene_id = gene_id.replace("gene:", "")
273 genes[gene_id] = abs(feat.location.end - feat.location.start)
274 # Get CDS id and length
275 protein_transcripts = {
276 "mRNA",
277 "pseudogenic_transcript",
278 }
279 ig_transcripts = {
280 "IG_V_gene",
281 "IG_C_gene",
282 "TR_C_gene",
283 "TR_V_gene",
284 }
285 cds_transcripts = protein_transcripts.union(ig_transcripts)
286 for feat2 in feat.sub_features:
287 if feat2.type not in cds_transcripts:
288 continue
289 length: dict[str, int] = {}
290 for feat3 in feat2.sub_features:
291 if feat3.type != "CDS":
292 continue
293 pep_id = feat3.id
294 pep_id = pep_id.replace("CDS:", "")
295 length.setdefault(pep_id, 0)
296 length[pep_id] += abs(feat3.location.end - feat3.location.start)
297 for pep_id, pep_length in length.items():
298 # Store length for translations, add pseudo translations separately
299 pep_length = floor(pep_length / 3) - 1
300 if feat.type != "pseudogene" and feat2.type in protein_transcripts:
301 peps[pep_id] = pep_length
302 all_peps[pep_id] = pep_length
304 def load_agp_seq_regions(self, agp_dict: dict | None) -> None:
305 """AGP files describe the assembly of larger sequence objects using smaller objects.
307 E.g. describes the assembly of scaffolds from contigs.
309 Args:
310 agp_dict: Dict containing the information about the sequence.
312 Note:
313 AGP file is only used in the older builds, not used for current processing.
314 """
315 if not agp_dict: 315 ↛ 317line 315 didn't jump to line 317 because the condition on line 315 was always true
316 return
317 logging.info("Manifest contains AGP files")
319 seqr: StatsLengths = {}
320 for agp_path in agp_dict.values():
321 with open(agp_path, "r") as agph:
322 for line in agph:
323 (
324 asm_id,
325 _, # asm_start
326 asm_end,
327 _, # asm_part
328 typ,
329 cmp_id,
330 _, # cmp_start
331 cmp_end,
332 _, # cmp_strand
333 ) = line.split("\t")
334 # Ignore WGS contig
335 if typ != "W":
336 continue
338 # Assembled seq length
339 if asm_id not in seqr or seqr[asm_id] < int(asm_end):
340 seqr[asm_id] = int(asm_end)
342 # Composite seq length
343 if cmp_id not in seqr or seqr[cmp_id] < int(cmp_end):
344 seqr[cmp_id] = int(cmp_end)
346 self.lengths["agp"] = seqr
348 def load_genome(self) -> None:
349 """Load the genome data."""
350 if "genome" not in self.manifest_files:
351 return
352 logging.info("Manifest contains genome JSON")
353 self.genome = get_json(Path(self.manifest_files["genome"]))
355 def prepare_integrity_data(self) -> None: # pylint: disable=too-many-branches
356 """Read all the files and keep a record (IDs and their lengths) for each case to be compared later."""
357 self.load_gff3()
358 self.load_dna_fasta_lengths()
359 self.load_peptides_fasta_lengths()
360 self.load_seq_regions()
361 self.load_functional_annotation()
362 self.load_agp_seq_regions(self.manifest_files.get("agp"))
363 self.load_genome()
365 def has_lengths(self, name: str) -> bool:
366 """Check if a given name has lengths records.
368 Args:
369 name: Name for the lengths to check.
371 Raises:
372 KeyError: If the name is not supported.
373 """
374 try:
375 return bool(self.lengths[name])
376 except KeyError as err:
377 raise KeyError(f"There is no length record for {name}") from err
379 def get_lengths(self, name: str) -> dict[str, Any]:
380 """Returns a dict associating IDs with their length from a given file name.
382 Args:
383 name: Name for the lengths to get.
385 Raises:
386 KeyError: If the name is not supported.
387 """
388 try:
389 return self.lengths[name]
390 except KeyError as err:
391 raise KeyError(f"There is no length record for {name}") from err
393 def get_circular(self, name: str) -> dict[str, Any]:
394 """Returns a dict associating IDs with their is_circular flag from a given file name.
396 Args:
397 name: Name for the circular data to get.
399 Raises:
400 KeyError: If the name is not supported.
401 """
402 try:
403 return self.circular[name]
404 except KeyError as err:
405 raise KeyError(f"No length available for key {name}") from err