Coverage for src/python/ensembl/io/genomio/manifest/manifest_stats.py: 91%

198 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Register the main stats for all files from a manifest for comparison.""" 

16 

17__all__ = ["InvalidIntegrityError", "ManifestStats", "StatsLengths"] 

18 

19import logging 

20import json 

21from math import floor 

22from os import PathLike 

23from pathlib import Path 

24from typing import Any, TypeAlias 

25 

26from BCBio import GFF 

27from Bio import SeqIO 

28 

29from ensembl.io.genomio.gff3.features import GFFSeqFeature 

30from ensembl.io.genomio.manifest.manifest import Manifest 

31from ensembl.io.genomio.utils import get_json 

32from ensembl.utils.archive import open_gz_file 

33from ensembl.utils import StrPath 

34 

35 

36# Record the lengths of the sequence for features/regions 

37StatsLengths: TypeAlias = dict[str, int] 

38 

39 

40class InvalidIntegrityError(Exception): 

41 """When a file integrity check fails""" 

42 

43 

44class ManifestStats: 

45 """Representation of the main stats of the files in a manifest for comparison. 

46 

47 The stats in question are: 

48 - lengths of sequences (DNA, genes and peptides) 

49 - sequences and features IDs 

50 - sequences circularity 

51 """ 

52 

53 def __init__(self, manifest_path: StrPath, ignore_final_stops: bool = False) -> None: 

54 self.manifest_files = self._get_manifest(manifest_path) 

55 self.genome: dict[str, Any] = {} 

56 

57 self.lengths: dict[str, StatsLengths] = { 

58 "dna_sequences": {}, 

59 "peptide_sequences": {}, 

60 "seq_region_levels": {}, 

61 "annotations": {}, 

62 "agp": {}, 

63 "gff3_seq_regions": {}, 

64 "gff3_genes": {}, 

65 "gff3_translations": {}, 

66 "gff3_all_translations": {}, 

67 "gff3_transposable_elements": {}, 

68 "ann_genes": {}, 

69 "ann_translations": {}, 

70 "ann_transposable_elements": {}, 

71 "seq_regions": {}, 

72 } 

73 

74 self.circular: dict[str, StatsLengths] = { 

75 "seq_regions": {}, 

76 } 

77 

78 self.errors: list[str] = [] 

79 

80 self.ignore_final_stops = ignore_final_stops 

81 

82 def _get_manifest(self, manifest_path: PathLike) -> dict[str, Any]: 

83 """Load the content of a manifest file. 

84 

85 Returns: 

86 Dict: Content of the manifest file. 

87 """ 

88 manifest = Manifest(Path(manifest_path).parent) 

89 manifest_files = manifest.load() 

90 

91 # Replace the {file, md5} dict with the file path 

92 for name in manifest_files: 

93 if "file" in manifest_files[name]: 

94 manifest_files[name] = Path(manifest_path).parent / manifest_files[name]["file"] 

95 else: 

96 for f in manifest_files[name]: 

97 manifest_files[name][f] = Path(manifest_path).parent / manifest_files[name][f]["file"] 

98 return manifest_files 

99 

100 def add_error(self, error: str) -> None: 

101 """Record an error.""" 

102 self.errors.append(error) 

103 

104 def load_seq_regions(self) -> None: 

105 """Retrieve seq_regions lengths and circular information from the seq_region JSON file.""" 

106 

107 if "seq_region" not in self.manifest_files: 

108 return 

109 logging.info("Manifest contains seq_region JSON") 

110 seq_regions = get_json(Path(self.manifest_files["seq_region"])) 

111 seqr_seqlevel = {} 

112 seq_lengths = {} 

113 seq_circular = {} 

114 # Store the length as int 

115 for seq in seq_regions: 

116 seq_lengths[seq["name"]] = int(seq["length"]) 

117 seq_circular[seq["name"]] = seq.get("circular", False) 

118 if seq["coord_system_level"] == "contig": 

119 seqr_seqlevel[seq["name"]] = int(seq["length"]) 

120 # Also record synonyms (in case GFF file uses synonyms) 

121 if "synonyms" in seq: 

122 for synonym in seq["synonyms"]: 

123 seq_lengths[synonym["name"]] = int(seq["length"]) 

124 self.lengths["seq_regions"] = seq_lengths 

125 self.circular["seq_regions"] = seq_circular 

126 

127 def load_peptides_fasta_lengths(self) -> None: 

128 """Retrieve peptides sequences lengths from their FASTA file.""" 

129 if "fasta_pep" not in self.manifest_files: 

130 return 

131 self.lengths["peptide_sequences"] = self._get_fasta_lengths( 

132 self.manifest_files["fasta_pep"], ignore_final_stops=self.ignore_final_stops 

133 ) 

134 

135 def load_dna_fasta_lengths(self) -> None: 

136 """Retrieve DNA sequences lengths from their FASTA file.""" 

137 if "fasta_dna" not in self.manifest_files: 

138 return 

139 self.lengths["dna_sequences"] = self._get_fasta_lengths(self.manifest_files["fasta_dna"]) 

140 

141 def _get_fasta_lengths(self, fasta_path: StrPath, ignore_final_stops: bool = False) -> dict[str, int]: 

142 """Returns every sequence ID and its length from a FASTA file (DNA or peptide). 

143 

144 An error will be added for every empty id, non-unique id or stop codon found in the FASTA file. 

145 

146 Args: 

147 fasta_path: Path to FASTA file. 

148 ignore_final_stops: Do not include final stop in the total length. 

149 

150 """ 

151 

152 data = {} 

153 non_unique = {} 

154 non_unique_count = 0 

155 empty_id_count = 0 

156 contains_stop_codon = 0 

157 rec_count = 0 

158 for rec in SeqIO.parse(fasta_path, "fasta"): 

159 rec_count += 1 

160 

161 # Flag empty ids 

162 if rec.id == "": 

163 empty_id_count += 1 

164 continue 

165 # Flag redundant ids 

166 if rec.id in data: 

167 non_unique[rec.id] = 1 

168 non_unique_count += 1 

169 # Store sequence id and length 

170 data[rec.id] = len(rec.seq) 

171 stops = rec.seq.count("*") 

172 if stops >= 1 and not rec.seq.endswith("*"): 

173 contains_stop_codon += 1 

174 elif rec.seq.endswith("*") and not ignore_final_stops: 

175 contains_stop_codon += 1 

176 

177 if empty_id_count > 0: 

178 self.add_error(f"{empty_id_count} sequences with empty ids in {fasta_path}") 

179 if non_unique_count > 0: 

180 self.add_error(f"{non_unique_count} non unique sequence ids in {fasta_path}") 

181 if contains_stop_codon > 0: 

182 self.add_error(f"{contains_stop_codon} sequences with stop codons in {fasta_path}") 

183 if rec_count == 0: 

184 self.add_error(f"No sequences found in {fasta_path}") 

185 return data 

186 

187 def load_functional_annotation(self) -> None: 

188 """Load the functional annotation file to retrieve the gene_id and translation id. 

189 

190 The functional annotation file is stored in a JSON format containing the description, id 

191 and object type, eg: "gene", "transcript", "translation". 

192 

193 """ 

194 if "functional_annotation" not in self.manifest_files: 

195 return 

196 logging.info("Manifest contains functional annotation(s)") 

197 

198 # Load the json file 

199 with open(self.manifest_files["functional_annotation"]) as json_file: 

200 data = json.load(json_file) 

201 

202 # Get gene ids and translation ids 

203 genes = {} 

204 translations = {} 

205 transposons = {} 

206 

207 for item in data: 

208 if item["object_type"] == "gene": 

209 genes[item["id"]] = 1 

210 elif item["object_type"] == "translation": 

211 translations[item["id"]] = 1 

212 if item["object_type"] == "transposable_element": 

213 transposons[item["id"]] = 1 

214 

215 stats = { 

216 "ann_genes": genes, 

217 "ann_translations": translations, 

218 "ann_transposable_elements": transposons, 

219 } 

220 self.lengths = {**self.lengths, **stats} 

221 

222 def load_gff3(self) -> None: 

223 """A GFF3 parser is used to retrieve information in the GFF3 file such as 

224 gene and CDS ids and their corresponding lengths. 

225 """ 

226 if "gff3" not in self.manifest_files: 

227 return 

228 logging.info("Manifest contains GFF3 gene annotations") 

229 gff3_path = self.manifest_files["gff3"] 

230 

231 seqs: StatsLengths = {} 

232 genes: StatsLengths = {} 

233 peps: StatsLengths = {} 

234 all_peps: StatsLengths = {} 

235 tes: StatsLengths = {} 

236 

237 with open_gz_file(gff3_path) as gff3_handle: 

238 gff = GFF.parse(gff3_handle) 

239 for seq in gff: 

240 seqs[seq.id] = len(seq.seq) 

241 

242 for feat in seq.features: 

243 feat_length = abs(feat.location.end - feat.location.start) 

244 # Store gene id and length 

245 if feat.type in ["gene", "ncRNA_gene", "pseudogene"]: 

246 self._retrieve_gff_gene_lengths(feat, genes, peps, all_peps) 

247 if feat.type == "transposable_element": 

248 tes[feat.id] = feat_length 

249 

250 stats: dict[str, StatsLengths] = { 

251 "gff3_seq_regions": seqs, 

252 "gff3_genes": genes, 

253 "gff3_translations": peps, 

254 "gff3_all_translations": all_peps, 

255 "gff3_transposable_elements": tes, 

256 } 

257 self.lengths = {**self.lengths, **stats} 

258 

259 def _retrieve_gff_gene_lengths( 

260 self, feat: GFFSeqFeature, genes: StatsLengths, peps: StatsLengths, all_peps: StatsLengths 

261 ) -> None: 

262 """Record genes and peptides lengths from a feature. 

263 

264 Args: 

265 feat : Gene feature to check. 

266 genes: Record of genes lengths to update. 

267 peps: Record of peptides lengths to update. 

268 all_peps: Record of all peptides lengths to update (include pseudogenes). 

269 

270 """ 

271 gene_id = feat.id 

272 gene_id = gene_id.replace("gene:", "") 

273 genes[gene_id] = abs(feat.location.end - feat.location.start) 

274 # Get CDS id and length 

275 protein_transcripts = { 

276 "mRNA", 

277 "pseudogenic_transcript", 

278 } 

279 ig_transcripts = { 

280 "IG_V_gene", 

281 "IG_C_gene", 

282 "TR_C_gene", 

283 "TR_V_gene", 

284 } 

285 cds_transcripts = protein_transcripts.union(ig_transcripts) 

286 for feat2 in feat.sub_features: 

287 if feat2.type not in cds_transcripts: 

288 continue 

289 length: dict[str, int] = {} 

290 for feat3 in feat2.sub_features: 

291 if feat3.type != "CDS": 

292 continue 

293 pep_id = feat3.id 

294 pep_id = pep_id.replace("CDS:", "") 

295 length.setdefault(pep_id, 0) 

296 length[pep_id] += abs(feat3.location.end - feat3.location.start) 

297 for pep_id, pep_length in length.items(): 

298 # Store length for translations, add pseudo translations separately 

299 pep_length = floor(pep_length / 3) - 1 

300 if feat.type != "pseudogene" and feat2.type in protein_transcripts: 

301 peps[pep_id] = pep_length 

302 all_peps[pep_id] = pep_length 

303 

304 def load_agp_seq_regions(self, agp_dict: dict | None) -> None: 

305 """AGP files describe the assembly of larger sequence objects using smaller objects. 

306 

307 E.g. describes the assembly of scaffolds from contigs. 

308 

309 Args: 

310 agp_dict: Dict containing the information about the sequence. 

311 

312 Note: 

313 AGP file is only used in the older builds, not used for current processing. 

314 """ 

315 if not agp_dict: 315 ↛ 317line 315 didn't jump to line 317 because the condition on line 315 was always true

316 return 

317 logging.info("Manifest contains AGP files") 

318 

319 seqr: StatsLengths = {} 

320 for agp_path in agp_dict.values(): 

321 with open(agp_path, "r") as agph: 

322 for line in agph: 

323 ( 

324 asm_id, 

325 _, # asm_start 

326 asm_end, 

327 _, # asm_part 

328 typ, 

329 cmp_id, 

330 _, # cmp_start 

331 cmp_end, 

332 _, # cmp_strand 

333 ) = line.split("\t") 

334 # Ignore WGS contig 

335 if typ != "W": 

336 continue 

337 

338 # Assembled seq length 

339 if asm_id not in seqr or seqr[asm_id] < int(asm_end): 

340 seqr[asm_id] = int(asm_end) 

341 

342 # Composite seq length 

343 if cmp_id not in seqr or seqr[cmp_id] < int(cmp_end): 

344 seqr[cmp_id] = int(cmp_end) 

345 

346 self.lengths["agp"] = seqr 

347 

348 def load_genome(self) -> None: 

349 """Load the genome data.""" 

350 if "genome" not in self.manifest_files: 

351 return 

352 logging.info("Manifest contains genome JSON") 

353 self.genome = get_json(Path(self.manifest_files["genome"])) 

354 

355 def prepare_integrity_data(self) -> None: # pylint: disable=too-many-branches 

356 """Read all the files and keep a record (IDs and their lengths) for each case to be compared later.""" 

357 self.load_gff3() 

358 self.load_dna_fasta_lengths() 

359 self.load_peptides_fasta_lengths() 

360 self.load_seq_regions() 

361 self.load_functional_annotation() 

362 self.load_agp_seq_regions(self.manifest_files.get("agp")) 

363 self.load_genome() 

364 

365 def has_lengths(self, name: str) -> bool: 

366 """Check if a given name has lengths records. 

367 

368 Args: 

369 name: Name for the lengths to check. 

370 

371 Raises: 

372 KeyError: If the name is not supported. 

373 """ 

374 try: 

375 return bool(self.lengths[name]) 

376 except KeyError as err: 

377 raise KeyError(f"There is no length record for {name}") from err 

378 

379 def get_lengths(self, name: str) -> dict[str, Any]: 

380 """Returns a dict associating IDs with their length from a given file name. 

381 

382 Args: 

383 name: Name for the lengths to get. 

384 

385 Raises: 

386 KeyError: If the name is not supported. 

387 """ 

388 try: 

389 return self.lengths[name] 

390 except KeyError as err: 

391 raise KeyError(f"There is no length record for {name}") from err 

392 

393 def get_circular(self, name: str) -> dict[str, Any]: 

394 """Returns a dict associating IDs with their is_circular flag from a given file name. 

395 

396 Args: 

397 name: Name for the circular data to get. 

398 

399 Raises: 

400 KeyError: If the name is not supported. 

401 """ 

402 try: 

403 return self.circular[name] 

404 except KeyError as err: 

405 raise KeyError(f"No length available for key {name}") from err