Coverage for src/python/ensembl/io/genomio/manifest/compute_stats.py: 11%
215 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Compute stats from the current genome files associated with the manifest."""
17__all__ = [
18 "BiotypeCounter",
19 "manifest_stats",
20 "StatsError",
21]
23import json
24from pathlib import Path
25from shutil import which
26from statistics import mean
27import subprocess
28from typing import Dict, List, Optional, Set, Union
30from BCBio import GFF
32import ensembl.io.genomio
33from ensembl.utils import StrPath
34from ensembl.utils.archive import open_gz_file
35from ensembl.utils.argparse import ArgumentParser
36from ensembl.utils.logging import init_logging_with_args
39class BiotypeCounter:
40 """A counter for a given biotype, given a list of features."""
42 def __init__(self, count: int = 0, ids: Optional[Set[str]] = None, example: Optional[str] = None) -> None:
43 self.count: int = count
44 if ids is None:
45 ids = set()
46 self.ids: Set[str] = ids
47 if example is None:
48 example = ""
49 self.example: str = example
51 def add_id(self, feature_id: str) -> None:
52 """Add a feature to the counter.
54 Args:
55 feature_id (str): Feature id to add.
56 """
57 self.count += 1
58 self.ids.add(feature_id)
60 def unique_count(self) -> int:
61 """Total number feature ids added to the counter so far.
63 Returns:
64 int: number of features in the counter.
65 """
66 return len(self.ids)
69class StatsError(Exception):
70 """Raised when stats could not be computed."""
73class manifest_stats:
74 """Representation of the statistics of the set of files listed in the manifest file provided."""
76 def __init__(self, manifest_dir: str, accession: Optional[str], datasets_bin: Optional[str]):
77 self.manifest = f"{manifest_dir}/manifest.json"
78 self.accession: Optional[str] = accession
79 self.errors: List[str] = []
80 self.errors_file = Path(manifest_dir) / "stats_diff.log"
81 if datasets_bin is None:
82 datasets_bin = "datasets"
83 self.datasets_bin = datasets_bin
84 self.manifest_parent = manifest_dir
85 self.check_ncbi = False
87 def run(self, stats_path: StrPath) -> None:
88 """Compute stats in the files and output a stats.txt file in the same folder.
90 Raises:
91 StatsError: Could not compute some stats.
92 """
93 manifest = self.get_manifest()
95 stats = []
96 if self.accession is not None:
97 stats.append(self.accession)
99 # Compute the stats from the GFF3 file
100 if "gff3" in manifest:
101 stats += self.get_gff3_stats(Path(manifest["gff3"]))
103 # Compute the stats from the seq_region file
104 if "seq_region" in manifest:
105 stats += self.get_seq_region_stats(Path(manifest["seq_region"]))
107 # Print out the stats in a separate file
108 with Path(stats_path).open("w") as stats_out:
109 stats_out.write("\n".join(stats))
111 # Die if there were errors in stats comparison
112 if self.errors:
113 with self.errors_file.open("w") as errors_fh:
114 for error_line in self.errors:
115 errors_fh.write(error_line)
117 def get_manifest(self) -> Dict:
118 """Get the files metadata from the manifest json file.
120 Returns:
121 Dict: A representation of the manifest json data.
122 """
123 with open(self.manifest) as f_json:
124 manifest = json.load(f_json)
125 manifest_root = self.manifest_parent
127 # Use dir name from the manifest
128 for name in manifest:
129 if "file" in manifest[name]:
130 file_name = manifest[name]["file"]
131 file_name = f"{manifest_root}/{file_name}"
132 manifest[name] = file_name
133 else:
134 for f in manifest[name]:
135 if "file" in manifest[name][f]:
136 file_name = manifest[name][f]["file"]
137 file_name = manifest_root, file_name
138 manifest[name][f] = file_name
140 return manifest
142 def get_seq_region_stats(self, seq_region_path: Path) -> List[str]:
143 """Compute stats from the seq_region json file.
145 Args:
146 seq_region_path (Path): the seq_region json file.
148 Returns:
149 List[str]: Stats from the seq_regions.
150 """
151 with seq_region_path.open("r") as json_file:
152 seq_regions = json.load(json_file)
154 # Get basic data
155 coord_systems: Dict[str, List[int]] = {}
156 circular = 0
157 locations = []
158 codon_tables = []
159 for seqr in seq_regions:
160 # Get readable seq_region name:
161 # either use a Genbank synonym, or just the provided seq_region name
162 genbank = "synonyms" in seqr and [x for x in seqr["synonyms"] if x["source"] == "GenBank"]
163 seqr_name = genbank and genbank[0]["name"] or seqr["name"]
165 # Record the lengths of the elements of each coord_system
166 coord_level = seqr["coord_system_level"]
167 if coord_level not in coord_systems:
168 coord_systems[coord_level] = []
169 coord_systems[coord_level].append(seqr["length"])
171 # Additional metadata records to count
172 if "circular" in seqr:
173 circular += 1
174 if "codon_table" in seqr:
175 codon_tables.append(f"{seqr_name} = {seqr['codon_table']}")
176 if "location" in seqr:
177 locations.append(f"{seqr_name} = {seqr['location']}")
179 # Stats
180 stats: List[str] = []
181 stats.append(seq_region_path.name)
182 stats += self.coord_systems_stats(coord_systems)
183 stats += self.seq_region_special_stats(circular, locations, codon_tables)
184 stats.append("\n")
185 return stats
187 def coord_systems_stats(self, coord_systems: Dict[str, List[int]]) -> List[str]:
188 """For each coord_system compute various stats:
189 - number of sequences
190 - sequence length sum, minimum, maximum, mean
192 Args:
193 coord_systems: Coordinate system dictionary of lengths.
195 Returns:
196 A list with the computed statistics in a printable format.
197 """
198 stats: List[str] = []
199 stats.append(f"Total coord_systems {len(coord_systems)}")
200 for coord_name, lengths in coord_systems.items():
201 stats.append(f"\nCoord_system: {coord_name}")
203 stat_counts: Dict[str, Union[int, float]] = {
204 "Number of sequences": len(lengths),
205 "Sequence length sum": sum(lengths),
206 "Sequence length minimum": min(lengths),
207 "Sequence length maximum": max(lengths),
208 "Sequence length mean": mean(lengths),
209 }
211 for name, count in stat_counts.items():
212 if isinstance(count, int):
213 stats.append(f"{count: 9d}\t{name}")
214 else:
215 stats.append(f"{count: 9f}\t{name}")
216 return stats
218 def seq_region_special_stats(
219 self,
220 circular: int = 0,
221 locations: Optional[List[str]] = None,
222 codon_tables: Optional[List[str]] = None,
223 ) -> List[str]:
224 """Prepare stats in case there are circular regions, specific locations and codon_tables.
225 stats.append(f"{count: 9f}\t{name}")
227 Args:
228 circular: Number of circular regions. Defaults to 0.
229 locations: The regions and their location. Defaults to None.
230 codon_tables: The regions and their codon_table. Defaults to None.
232 Returns:
233 A list with the computed statistics in a printable format.
234 """
235 stats: List[str] = []
236 if circular or locations or codon_tables:
237 stats.append("\nSpecial")
238 if circular:
239 stats.append(f"{circular: 9d}\tcircular sequences")
240 if locations is not None:
241 stats.append(f"{len(locations): 9d} sequences with location")
242 for loc in locations:
243 stats.append(f"\t\t\t{loc}")
244 if codon_tables:
245 stats.append(f"{len(codon_tables): 9d} sequences with codon_table")
246 for table in codon_tables:
247 stats.append(f"\t\t\t{table}")
248 return stats
250 def get_gff3_stats(self, gff3_path: Path) -> List[str]:
251 """Extract the gene models from the GFF3 file and compute stats.
253 Args:
254 gff3_path (Path): the GFF3 file.
256 Returns:
257 List: Stats from the gene model.
258 """
260 biotypes = self.count_biotypes(gff3_path)
261 # Compile final stats
262 stats = self.biotypes_stats(biotypes)
263 stats += self.check_ncbi_stats(biotypes)
264 return stats
266 def count_biotypes(self, gff3_path: Path) -> Dict[str, BiotypeCounter]:
267 """Count the biotypes in a GFF3 file.
269 Args:
270 gff3_path: Path to the GFF3 file.
272 Returns:
273 Dictionary of biotype counters.
274 """
276 biotypes: Dict[str, BiotypeCounter] = {}
278 with open_gz_file(gff3_path) as gff3_handle:
279 for rec in GFF.parse(gff3_handle):
280 for feat1 in rec.features:
281 # Check if the gene contains proteins (CDSs),
282 # and keep a count of all hierarchies (e.g. gene-mRNA-CDS)
283 is_protein = False
284 for feat2 in feat1.sub_features:
285 if feat2.type == "mRNA":
286 types2 = {f.type for f in feat2.sub_features}
287 if "CDS" in types2:
288 is_protein = True
289 manifest_stats.increment_biotype(biotypes, feat2.id, f"{feat1.type}-{feat2.type}")
290 for feat3 in feat2.sub_features:
291 if feat3.type == "exon":
292 continue
293 manifest_stats.increment_biotype(
294 biotypes, feat3.id, f"{feat1.type}-{feat2.type}-{feat3.type}"
295 )
297 # Main categories counts
298 if feat1.type == "pseudogene":
299 manifest_stats.increment_biotype(biotypes, feat1.id, "pseudogene")
300 elif is_protein:
301 manifest_stats.increment_biotype(biotypes, feat1.id, f"PROT_{feat1.type}")
302 else:
303 # Special case, undefined gene-transcript
304 if (
305 feat1.type == "gene"
306 and feat1.sub_features
307 and feat1.sub_features[0].type == "transcript"
308 ):
309 manifest_stats.increment_biotype(biotypes, feat1.id, "OTHER")
310 else:
311 manifest_stats.increment_biotype(biotypes, feat1.id, f"NONPROT_{feat1.type}")
313 # Total
314 if feat1.type in ("gene", "pseudogene"):
315 manifest_stats.increment_biotype(biotypes, feat1.id, "ALL_GENES")
316 return biotypes
318 def biotypes_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]:
319 """Prepare biotype stats in order of their name.
321 Args:
322 biotypes: Biotypes counters.
324 Returns:
325 A list with the computed statistics in a printable format.
326 """
327 sorted_biotypes = {}
328 for name in sorted(biotypes.keys()):
329 data: BiotypeCounter = biotypes[name]
330 sorted_biotypes[name] = data
332 stats = [
333 f"{data.unique_count():>9}\t{biotype:<20}\tID = {data.example}"
334 for (biotype, data) in sorted_biotypes.items()
335 ]
336 return stats
338 def check_ncbi_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]:
339 """Use the dataset tool from NCBI to get stats and compare with what we have"""
340 stats: List[str] = []
341 if not self.check_ncbi:
342 return stats
344 if self.accession is None:
345 return stats
347 accession: str = self.accession
349 datasets_bin = self.datasets_bin
350 if not which(datasets_bin):
351 return stats
353 # Get the dataset summary from NCBI
354 command = [datasets_bin, "summary", "genome", "accession", accession]
355 result_out = subprocess.run(command, stdout=subprocess.PIPE, check=True)
356 result = json.loads(result_out.stdout)
358 # Get stats
359 if "reports" in result:
360 genome = result["reports"][0]
361 if "annotation_info" in genome and "stats" in genome["annotation_info"]:
362 ncbi_stats = genome["annotation_info"]["stats"]
364 if "gene_counts" in ncbi_stats:
365 counts = ncbi_stats["gene_counts"]
366 stats = self.compare_ncbi_counts(biotypes, counts)
367 return stats
369 def compare_ncbi_counts(self, biotypes: Dict[str, BiotypeCounter], ncbi: Dict) -> List[str]:
370 """Compare specific gene stats from NCBI"""
371 stats: List[str] = []
373 maps = [
374 ["total", "ALL_GENES"],
375 ["protein_coding", "PROT_gene"],
376 ["pseudogene", "pseudogene"],
377 ["non_coding", "NONPROT_gene"],
378 ["other", "OTHER"],
379 ]
381 for count_map in maps:
382 ncbi_name, prep_name = count_map
383 ncbi_count = ncbi.get(ncbi_name, 0)
384 prepped: Optional[BiotypeCounter] = biotypes.get(prep_name)
385 prep_count = 0
386 if prepped is not None:
387 prep_count = prepped.count
389 if prep_count != ncbi_count:
390 diff = prep_count - ncbi_count
391 self.errors.append(f"DIFF gene count for {count_map}: {prep_count} - {ncbi_count} = {diff}")
392 else:
393 stats.append(f"Same count for {count_map}: {prep_count}")
395 return stats
397 @staticmethod
398 def increment_biotype(biotypes: Dict[str, BiotypeCounter], feature_id: str, feature_biotype: str) -> None:
399 """Add the feature to their respective biotype counter.
401 Args:
402 biotypes (Dict[str, BiotypeCounter]): All current biotypes, with their counter.
403 feature_id (str): Feature id to be counted.
404 feature_biotype (str): The biotype of the feature.
405 """
406 if feature_biotype not in biotypes:
407 biotypes[feature_biotype] = BiotypeCounter(example=feature_id)
408 biotypes[feature_biotype].add_id(feature_id)
411def main() -> None:
412 """Main entrypoint."""
413 parser = ArgumentParser(
414 description="Compute stats from the current genome files associated with the manifest."
415 )
416 parser.add_argument_src_path(
417 "--manifest_dir", required=True, help="Manifest directory where 'manifest.json' file is located"
418 )
419 parser.add_argument("--accession", help="Sequence accession ID to compare stats with NCBI")
420 parser.add_argument("--datasets_bin", help="Datasets bin status")
421 parser.add_argument_dst_path("--stats_file", help="Output file with the stats")
422 parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
423 parser.add_log_arguments()
424 args = parser.parse_args()
425 init_logging_with_args(args)
427 mstats = manifest_stats(args.manifest_dir, args.accession, args.datasets_bin)
428 if args.accession is not None:
429 mstats.check_ncbi = True
430 stats_file = args.stats_file if args.stats_file is not None else args.manifest_dir / "stats.txt"
431 mstats.run(stats_file)
434if __name__ == "__main__":
435 main()