Coverage for src/python/ensembl/io/genomio/manifest/compute_stats.py: 11%

215 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Compute stats from the current genome files associated with the manifest.""" 

16 

17__all__ = [ 

18 "BiotypeCounter", 

19 "manifest_stats", 

20 "StatsError", 

21] 

22 

23import json 

24from pathlib import Path 

25from shutil import which 

26from statistics import mean 

27import subprocess 

28from typing import Dict, List, Optional, Set, Union 

29 

30from BCBio import GFF 

31 

32import ensembl.io.genomio 

33from ensembl.utils import StrPath 

34from ensembl.utils.archive import open_gz_file 

35from ensembl.utils.argparse import ArgumentParser 

36from ensembl.utils.logging import init_logging_with_args 

37 

38 

39class BiotypeCounter: 

40 """A counter for a given biotype, given a list of features.""" 

41 

42 def __init__(self, count: int = 0, ids: Optional[Set[str]] = None, example: Optional[str] = None) -> None: 

43 self.count: int = count 

44 if ids is None: 

45 ids = set() 

46 self.ids: Set[str] = ids 

47 if example is None: 

48 example = "" 

49 self.example: str = example 

50 

51 def add_id(self, feature_id: str) -> None: 

52 """Add a feature to the counter. 

53 

54 Args: 

55 feature_id (str): Feature id to add. 

56 """ 

57 self.count += 1 

58 self.ids.add(feature_id) 

59 

60 def unique_count(self) -> int: 

61 """Total number feature ids added to the counter so far. 

62 

63 Returns: 

64 int: number of features in the counter. 

65 """ 

66 return len(self.ids) 

67 

68 

69class StatsError(Exception): 

70 """Raised when stats could not be computed.""" 

71 

72 

73class manifest_stats: 

74 """Representation of the statistics of the set of files listed in the manifest file provided.""" 

75 

76 def __init__(self, manifest_dir: str, accession: Optional[str], datasets_bin: Optional[str]): 

77 self.manifest = f"{manifest_dir}/manifest.json" 

78 self.accession: Optional[str] = accession 

79 self.errors: List[str] = [] 

80 self.errors_file = Path(manifest_dir) / "stats_diff.log" 

81 if datasets_bin is None: 

82 datasets_bin = "datasets" 

83 self.datasets_bin = datasets_bin 

84 self.manifest_parent = manifest_dir 

85 self.check_ncbi = False 

86 

87 def run(self, stats_path: StrPath) -> None: 

88 """Compute stats in the files and output a stats.txt file in the same folder. 

89 

90 Raises: 

91 StatsError: Could not compute some stats. 

92 """ 

93 manifest = self.get_manifest() 

94 

95 stats = [] 

96 if self.accession is not None: 

97 stats.append(self.accession) 

98 

99 # Compute the stats from the GFF3 file 

100 if "gff3" in manifest: 

101 stats += self.get_gff3_stats(Path(manifest["gff3"])) 

102 

103 # Compute the stats from the seq_region file 

104 if "seq_region" in manifest: 

105 stats += self.get_seq_region_stats(Path(manifest["seq_region"])) 

106 

107 # Print out the stats in a separate file 

108 with Path(stats_path).open("w") as stats_out: 

109 stats_out.write("\n".join(stats)) 

110 

111 # Die if there were errors in stats comparison 

112 if self.errors: 

113 with self.errors_file.open("w") as errors_fh: 

114 for error_line in self.errors: 

115 errors_fh.write(error_line) 

116 

117 def get_manifest(self) -> Dict: 

118 """Get the files metadata from the manifest json file. 

119 

120 Returns: 

121 Dict: A representation of the manifest json data. 

122 """ 

123 with open(self.manifest) as f_json: 

124 manifest = json.load(f_json) 

125 manifest_root = self.manifest_parent 

126 

127 # Use dir name from the manifest 

128 for name in manifest: 

129 if "file" in manifest[name]: 

130 file_name = manifest[name]["file"] 

131 file_name = f"{manifest_root}/{file_name}" 

132 manifest[name] = file_name 

133 else: 

134 for f in manifest[name]: 

135 if "file" in manifest[name][f]: 

136 file_name = manifest[name][f]["file"] 

137 file_name = manifest_root, file_name 

138 manifest[name][f] = file_name 

139 

140 return manifest 

141 

142 def get_seq_region_stats(self, seq_region_path: Path) -> List[str]: 

143 """Compute stats from the seq_region json file. 

144 

145 Args: 

146 seq_region_path (Path): the seq_region json file. 

147 

148 Returns: 

149 List[str]: Stats from the seq_regions. 

150 """ 

151 with seq_region_path.open("r") as json_file: 

152 seq_regions = json.load(json_file) 

153 

154 # Get basic data 

155 coord_systems: Dict[str, List[int]] = {} 

156 circular = 0 

157 locations = [] 

158 codon_tables = [] 

159 for seqr in seq_regions: 

160 # Get readable seq_region name: 

161 # either use a Genbank synonym, or just the provided seq_region name 

162 genbank = "synonyms" in seqr and [x for x in seqr["synonyms"] if x["source"] == "GenBank"] 

163 seqr_name = genbank and genbank[0]["name"] or seqr["name"] 

164 

165 # Record the lengths of the elements of each coord_system 

166 coord_level = seqr["coord_system_level"] 

167 if coord_level not in coord_systems: 

168 coord_systems[coord_level] = [] 

169 coord_systems[coord_level].append(seqr["length"]) 

170 

171 # Additional metadata records to count 

172 if "circular" in seqr: 

173 circular += 1 

174 if "codon_table" in seqr: 

175 codon_tables.append(f"{seqr_name} = {seqr['codon_table']}") 

176 if "location" in seqr: 

177 locations.append(f"{seqr_name} = {seqr['location']}") 

178 

179 # Stats 

180 stats: List[str] = [] 

181 stats.append(seq_region_path.name) 

182 stats += self.coord_systems_stats(coord_systems) 

183 stats += self.seq_region_special_stats(circular, locations, codon_tables) 

184 stats.append("\n") 

185 return stats 

186 

187 def coord_systems_stats(self, coord_systems: Dict[str, List[int]]) -> List[str]: 

188 """For each coord_system compute various stats: 

189 - number of sequences 

190 - sequence length sum, minimum, maximum, mean 

191 

192 Args: 

193 coord_systems: Coordinate system dictionary of lengths. 

194 

195 Returns: 

196 A list with the computed statistics in a printable format. 

197 """ 

198 stats: List[str] = [] 

199 stats.append(f"Total coord_systems {len(coord_systems)}") 

200 for coord_name, lengths in coord_systems.items(): 

201 stats.append(f"\nCoord_system: {coord_name}") 

202 

203 stat_counts: Dict[str, Union[int, float]] = { 

204 "Number of sequences": len(lengths), 

205 "Sequence length sum": sum(lengths), 

206 "Sequence length minimum": min(lengths), 

207 "Sequence length maximum": max(lengths), 

208 "Sequence length mean": mean(lengths), 

209 } 

210 

211 for name, count in stat_counts.items(): 

212 if isinstance(count, int): 

213 stats.append(f"{count: 9d}\t{name}") 

214 else: 

215 stats.append(f"{count: 9f}\t{name}") 

216 return stats 

217 

218 def seq_region_special_stats( 

219 self, 

220 circular: int = 0, 

221 locations: Optional[List[str]] = None, 

222 codon_tables: Optional[List[str]] = None, 

223 ) -> List[str]: 

224 """Prepare stats in case there are circular regions, specific locations and codon_tables. 

225 stats.append(f"{count: 9f}\t{name}") 

226 

227 Args: 

228 circular: Number of circular regions. Defaults to 0. 

229 locations: The regions and their location. Defaults to None. 

230 codon_tables: The regions and their codon_table. Defaults to None. 

231 

232 Returns: 

233 A list with the computed statistics in a printable format. 

234 """ 

235 stats: List[str] = [] 

236 if circular or locations or codon_tables: 

237 stats.append("\nSpecial") 

238 if circular: 

239 stats.append(f"{circular: 9d}\tcircular sequences") 

240 if locations is not None: 

241 stats.append(f"{len(locations): 9d} sequences with location") 

242 for loc in locations: 

243 stats.append(f"\t\t\t{loc}") 

244 if codon_tables: 

245 stats.append(f"{len(codon_tables): 9d} sequences with codon_table") 

246 for table in codon_tables: 

247 stats.append(f"\t\t\t{table}") 

248 return stats 

249 

250 def get_gff3_stats(self, gff3_path: Path) -> List[str]: 

251 """Extract the gene models from the GFF3 file and compute stats. 

252 

253 Args: 

254 gff3_path (Path): the GFF3 file. 

255 

256 Returns: 

257 List: Stats from the gene model. 

258 """ 

259 

260 biotypes = self.count_biotypes(gff3_path) 

261 # Compile final stats 

262 stats = self.biotypes_stats(biotypes) 

263 stats += self.check_ncbi_stats(biotypes) 

264 return stats 

265 

266 def count_biotypes(self, gff3_path: Path) -> Dict[str, BiotypeCounter]: 

267 """Count the biotypes in a GFF3 file. 

268 

269 Args: 

270 gff3_path: Path to the GFF3 file. 

271 

272 Returns: 

273 Dictionary of biotype counters. 

274 """ 

275 

276 biotypes: Dict[str, BiotypeCounter] = {} 

277 

278 with open_gz_file(gff3_path) as gff3_handle: 

279 for rec in GFF.parse(gff3_handle): 

280 for feat1 in rec.features: 

281 # Check if the gene contains proteins (CDSs), 

282 # and keep a count of all hierarchies (e.g. gene-mRNA-CDS) 

283 is_protein = False 

284 for feat2 in feat1.sub_features: 

285 if feat2.type == "mRNA": 

286 types2 = {f.type for f in feat2.sub_features} 

287 if "CDS" in types2: 

288 is_protein = True 

289 manifest_stats.increment_biotype(biotypes, feat2.id, f"{feat1.type}-{feat2.type}") 

290 for feat3 in feat2.sub_features: 

291 if feat3.type == "exon": 

292 continue 

293 manifest_stats.increment_biotype( 

294 biotypes, feat3.id, f"{feat1.type}-{feat2.type}-{feat3.type}" 

295 ) 

296 

297 # Main categories counts 

298 if feat1.type == "pseudogene": 

299 manifest_stats.increment_biotype(biotypes, feat1.id, "pseudogene") 

300 elif is_protein: 

301 manifest_stats.increment_biotype(biotypes, feat1.id, f"PROT_{feat1.type}") 

302 else: 

303 # Special case, undefined gene-transcript 

304 if ( 

305 feat1.type == "gene" 

306 and feat1.sub_features 

307 and feat1.sub_features[0].type == "transcript" 

308 ): 

309 manifest_stats.increment_biotype(biotypes, feat1.id, "OTHER") 

310 else: 

311 manifest_stats.increment_biotype(biotypes, feat1.id, f"NONPROT_{feat1.type}") 

312 

313 # Total 

314 if feat1.type in ("gene", "pseudogene"): 

315 manifest_stats.increment_biotype(biotypes, feat1.id, "ALL_GENES") 

316 return biotypes 

317 

318 def biotypes_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]: 

319 """Prepare biotype stats in order of their name. 

320 

321 Args: 

322 biotypes: Biotypes counters. 

323 

324 Returns: 

325 A list with the computed statistics in a printable format. 

326 """ 

327 sorted_biotypes = {} 

328 for name in sorted(biotypes.keys()): 

329 data: BiotypeCounter = biotypes[name] 

330 sorted_biotypes[name] = data 

331 

332 stats = [ 

333 f"{data.unique_count():>9}\t{biotype:<20}\tID = {data.example}" 

334 for (biotype, data) in sorted_biotypes.items() 

335 ] 

336 return stats 

337 

338 def check_ncbi_stats(self, biotypes: Dict[str, BiotypeCounter]) -> List[str]: 

339 """Use the dataset tool from NCBI to get stats and compare with what we have""" 

340 stats: List[str] = [] 

341 if not self.check_ncbi: 

342 return stats 

343 

344 if self.accession is None: 

345 return stats 

346 

347 accession: str = self.accession 

348 

349 datasets_bin = self.datasets_bin 

350 if not which(datasets_bin): 

351 return stats 

352 

353 # Get the dataset summary from NCBI 

354 command = [datasets_bin, "summary", "genome", "accession", accession] 

355 result_out = subprocess.run(command, stdout=subprocess.PIPE, check=True) 

356 result = json.loads(result_out.stdout) 

357 

358 # Get stats 

359 if "reports" in result: 

360 genome = result["reports"][0] 

361 if "annotation_info" in genome and "stats" in genome["annotation_info"]: 

362 ncbi_stats = genome["annotation_info"]["stats"] 

363 

364 if "gene_counts" in ncbi_stats: 

365 counts = ncbi_stats["gene_counts"] 

366 stats = self.compare_ncbi_counts(biotypes, counts) 

367 return stats 

368 

369 def compare_ncbi_counts(self, biotypes: Dict[str, BiotypeCounter], ncbi: Dict) -> List[str]: 

370 """Compare specific gene stats from NCBI""" 

371 stats: List[str] = [] 

372 

373 maps = [ 

374 ["total", "ALL_GENES"], 

375 ["protein_coding", "PROT_gene"], 

376 ["pseudogene", "pseudogene"], 

377 ["non_coding", "NONPROT_gene"], 

378 ["other", "OTHER"], 

379 ] 

380 

381 for count_map in maps: 

382 ncbi_name, prep_name = count_map 

383 ncbi_count = ncbi.get(ncbi_name, 0) 

384 prepped: Optional[BiotypeCounter] = biotypes.get(prep_name) 

385 prep_count = 0 

386 if prepped is not None: 

387 prep_count = prepped.count 

388 

389 if prep_count != ncbi_count: 

390 diff = prep_count - ncbi_count 

391 self.errors.append(f"DIFF gene count for {count_map}: {prep_count} - {ncbi_count} = {diff}") 

392 else: 

393 stats.append(f"Same count for {count_map}: {prep_count}") 

394 

395 return stats 

396 

397 @staticmethod 

398 def increment_biotype(biotypes: Dict[str, BiotypeCounter], feature_id: str, feature_biotype: str) -> None: 

399 """Add the feature to their respective biotype counter. 

400 

401 Args: 

402 biotypes (Dict[str, BiotypeCounter]): All current biotypes, with their counter. 

403 feature_id (str): Feature id to be counted. 

404 feature_biotype (str): The biotype of the feature. 

405 """ 

406 if feature_biotype not in biotypes: 

407 biotypes[feature_biotype] = BiotypeCounter(example=feature_id) 

408 biotypes[feature_biotype].add_id(feature_id) 

409 

410 

411def main() -> None: 

412 """Main entrypoint.""" 

413 parser = ArgumentParser( 

414 description="Compute stats from the current genome files associated with the manifest." 

415 ) 

416 parser.add_argument_src_path( 

417 "--manifest_dir", required=True, help="Manifest directory where 'manifest.json' file is located" 

418 ) 

419 parser.add_argument("--accession", help="Sequence accession ID to compare stats with NCBI") 

420 parser.add_argument("--datasets_bin", help="Datasets bin status") 

421 parser.add_argument_dst_path("--stats_file", help="Output file with the stats") 

422 parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__) 

423 parser.add_log_arguments() 

424 args = parser.parse_args() 

425 init_logging_with_args(args) 

426 

427 mstats = manifest_stats(args.manifest_dir, args.accession, args.datasets_bin) 

428 if args.accession is not None: 

429 mstats.check_ncbi = True 

430 stats_file = args.stats_file if args.stats_file is not None else args.manifest_dir / "stats.txt" 

431 mstats.run(stats_file) 

432 

433 

434if __name__ == "__main__": 

435 main()