Coverage for src/python/ensembl/io/genomio/manifest/check_integrity.py: 11%
173 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Compare the genomic data in a DNA FASTA file, seq_region JSON, gene models GFF3 and peptide FASTA
16to ensure their contents are in sync.
17"""
19__all__ = ["IntegrityTool"]
21import logging
22from pathlib import Path
23import re
24from typing import Any
26import ensembl.io.genomio
27from ensembl.io.genomio.manifest.manifest_stats import InvalidIntegrityError, ManifestStats
28from ensembl.utils.argparse import ArgumentParser
29from ensembl.utils.logging import init_logging_with_args
32class IntegrityTool:
33 """Check the integrity of sequence and annotation files in the genome"""
35 def __init__(
36 self,
37 manifest_file: Path,
38 ignore_final_stops: bool = False,
39 no_fail: bool = False,
40 ) -> None:
41 self.manifest = ManifestStats(manifest_file)
42 self.ignore_final_stops = False
43 self.set_ignore_final_stops(ignore_final_stops)
44 self.errors: list[str] = []
45 self.no_fail = no_fail
47 def add_errors(self, errors: list[str] | str) -> None:
48 """Store the given errors (list or single string) in the list of all errors."""
49 if isinstance(errors, str):
50 self.errors.append(errors)
51 else:
52 self.errors += errors
54 def check_integrity(self) -> None:
55 """Load files listed in the manifest.json and check the integrity.
56 Check if the files are correct by verifying the MD5 hash.
57 Check if translation, functional annotation and sequence region ids
58 and lengths are consistent with the information in gff.
59 Compare sequence length from fasta_dna file to seq_region.json metadata.
60 """
62 # Load the manifest integrity counts
63 manifest = self.manifest
64 manifest.prepare_integrity_data()
66 genome = manifest.genome
68 dna = manifest.get_lengths("dna_sequences")
69 pep = manifest.get_lengths("peptide_sequences")
70 seq_lengths = manifest.get_lengths("seq_regions")
71 seq_circular = manifest.get_circular("seq_regions")
73 agp_seqr = manifest.get_lengths("agp")
75 # Then, run the checks
76 self._check_genome(genome)
78 if self.manifest.errors:
79 errors_str = "\n".join(self.manifest.errors)
80 raise InvalidIntegrityError(f"Manifest files parsing failed:\n{errors_str}")
82 # Check gff3
83 if manifest.has_lengths("gff3_genes"):
84 gff_genes = manifest.get_lengths("gff3_genes")
85 gff_seq_regions = manifest.get_lengths("gff3_seq_regions")
86 gff_translations = manifest.get_lengths("gff3_translations")
87 gff_all_translations = manifest.get_lengths("gff3_all_translations")
88 gff_transposable_elements = manifest.get_lengths("gff3_transposable_elements")
90 ann_genes = manifest.get_lengths("ann_genes")
91 ann_translations = manifest.get_lengths("ann_translations")
92 ann_transposable_elements = manifest.get_lengths("ann_transposable_elements")
94 # Check fasta_pep.fa integrity
95 # The sequence length and id retrieved from the fasta_pep file
96 # and compared to the translated CDS id and length in the gff
97 # We do not compare the peptide lengths because of sequence edits
98 if pep:
99 tr_errors = self.check_lengths(
100 pep, gff_translations, "Fasta translations vs gff", special_diff=True
101 )
102 if len(tr_errors) > 0:
103 # The pseudo CDSs are included in this check
104 # Pseudo CDSs are not translated, if the pseudo translation ids are not ignored
105 # in the gff it will give an error
106 tr_errors_all = self.check_lengths(
107 pep,
108 gff_all_translations,
109 "Fasta translations vs gff (include pseudo CDS)",
110 special_diff=True,
111 )
112 if tr_errors_all:
113 self.add_errors(tr_errors)
114 self.add_errors(tr_errors_all)
116 # Check functional_annotation.json integrity
117 # Gene ids, translated CDS ids and translated CDSs
118 # including pseudogenes are compared to the gff
119 if ann_genes:
120 self.add_errors(self.check_ids(ann_genes, gff_genes, "Gene ids metadata vs gff"))
121 tr_id_errors = self.check_ids(
122 ann_translations, gff_translations, "Translation ids metadata vs gff"
123 )
124 if tr_id_errors:
125 tr_id_errors_all = self.check_ids(
126 ann_translations,
127 gff_all_translations,
128 "Translation ids metadata vs gff (include pseudo CDS)",
129 )
130 if tr_id_errors_all:
131 self.add_errors(tr_id_errors)
132 self.add_errors(tr_id_errors_all)
133 self.add_errors(
134 self.check_ids(
135 ann_transposable_elements,
136 gff_transposable_elements,
137 "TE ids metadata vs gff",
138 )
139 )
141 self.check_seq_region_lengths(
142 seq_lengths, gff_seq_regions, "seq_regions JSON vs GFF3 lengths", seq_circular
143 )
145 self.check_seq_region_lengths(seq_lengths, dna, "seq_regions JSON vs DNA lengths")
146 self.check_seq_region_lengths(seq_lengths, agp_seqr, "seq_regions JSON vs AGPs lengths")
148 if self.errors:
149 errors_str = "\n".join(self.errors)
150 message = f"Integrity test failed:\n{errors_str}"
151 if self.no_fail:
152 print(message)
153 else:
154 raise InvalidIntegrityError(message)
156 def set_ignore_final_stops(self, ignore_final_stops: bool) -> None:
157 """Set ignore_final_stops (when calculating peptide length) for this tool and the manifest."""
158 self.ignore_final_stops = ignore_final_stops
159 self.manifest.ignore_final_stops = ignore_final_stops
161 def _check_genome(self, genome: dict[str, Any]) -> None:
162 """Check if the accession is correct in genome.json."""
163 genome_accession = genome.get("assembly", {}).get("accession", "")
164 if not genome_accession:
165 return
166 if not re.match(r"GC[AF]_\d{9}(\.\d+)?", genome_accession):
167 self.add_errors(f"Genome assembly accession is wrong: '{genome_accession}'")
169 def check_ids(self, list1: dict[str, Any], list2: dict[str, Any], name: str) -> list[str]:
170 """Compare the ids in list1 and list2.
172 Args:
173 list1: Sequence IDs retrieved from `functional.json`.
174 list2: Sequence IDs retrieved from the GFF3 file.
175 name: Source name.
177 Return:
178 List of message errors of sequence IDs found only in one of the lists provided.
179 """
181 only1 = []
182 only2 = []
183 common = []
185 for item_id in list1:
186 if item_id in list2:
187 common.append(item_id)
188 else:
189 only1.append(item_id)
190 for item_id in list2:
191 if item_id not in common:
192 only2.append(item_id)
194 errors = []
195 if common:
196 logging.info(f"{len(common)} common elements in {name}")
197 if only1:
198 errors.append(f"{len(only1)} only in first list in {name} (first: {only1[0]})")
199 logging.debug(f"{len(only1)} only in first list in {name}")
200 if only2:
201 errors.append(f"{len(only2)} only in second list in {name} (first: {only2[0]})")
202 logging.debug(f"{len(only1)} only in second list in {name}")
204 return errors
206 def check_lengths(
207 self,
208 list1: dict[str, int],
209 list2: dict[str, int],
210 name: str,
211 *,
212 allowed_len_diff: int | None = None,
213 special_diff: bool = False,
214 ) -> list[str]:
215 """Check the difference in ids and length between list1 and list2.
216 There are a few special cases here where we allow a certain asymmetry
217 by changing the values of the arguments.
219 Args:
220 list1: dict containing length and id of the sequence from fasta files.
221 list2: dict containing length and id in the retrieved from the gff.
222 name: string
224 allowed_len_diff : None to to not accept differences in length between list1 and list2.
225 The value can be changed based on how much difference in sequence length we are wanting to accept.
227 special_diff: set as False when no special length difference is expected between the lists.
228 This can be changed if we want to report common sequences with 1 BP difference.
230 Returns:
231 Error if there is a difference in length or ids between the lists.
232 """
234 # check list differences, checks if abs(values diff) < allowed_len_diff
236 set1 = frozenset(list1)
237 set2 = frozenset(list2)
238 list1_2 = list(set1 - set2)
239 list2_1 = list(set2 - set1)
241 errors = []
242 if len(list1_2) > 0:
243 errors.append(f"{name}: {len(list1_2)} from the first list only (i.e. {list1_2[0]})")
244 if len(list2_1) > 0:
245 errors.append(f"{name}: {len(list2_1)} from the second list only (i.e. {list2_1[0]})")
247 common_len = 0
248 if allowed_len_diff is None:
249 common_len = len(set1 & set2)
250 else:
251 # check for the sequence length difference
252 diff_len_list: list[str] = []
253 diff_len_special_list: list[str] = []
254 for e in set1 & set2:
255 dl12 = list1[e] - list2[e]
256 if abs(dl12) <= allowed_len_diff:
257 common_len += 1
258 else:
259 _dlist = diff_len_list
260 # Special case: 1 AA /BP shorter,
261 # so assuming the stop codon is not included in the CDS (when it should be)
262 if dl12 == 1 and special_diff:
263 _dlist = diff_len_special_list
264 _dlist.append(f"{e}: {list1[e]}, {list2[e]}")
265 if diff_len_special_list:
266 errors.append(
267 (
268 f"{len(diff_len_special_list)} common elements with one BP/AA length diff for {name}"
269 f"(e.g. {diff_len_special_list[0]})"
270 )
271 )
272 if diff_len_list:
273 errors.append(
274 (
275 f"{len(diff_len_list)} common elements with length diff for {name}"
276 f"(e.g. {diff_len_list[0]})"
277 )
278 )
279 if common_len > 0:
280 logging.warning(f"{common_len} common elements between lists for {name}")
282 return errors
284 def check_seq_region_lengths(
285 self,
286 seqrs: dict[str, Any] | None,
287 feats: dict[str, Any] | None,
288 name: str,
289 circular: dict[str, Any] | None = None,
290 ) -> None:
291 """Check the integrity of seq_region.json file by comparing the length of the sequence
292 to fasta files and the gff.
294 Seq_region file is in json format containing the metadata of the sequence.
295 It contains sequence id, length, location and the synonyms for the sequence name
296 from different sources.
298 Args:
299 seqs: Sequence name and length retrieved from seq_region.json file.
300 feats: Sequence name and length retrieved from the fasta and gff file.
301 name: Name of the check to show in the logs.
302 circular: Whether any sequence is circular.
304 Returns:
305 Error if there are common sequences with difference in ids
306 and if the sequences are not consistent in the files.
307 """
308 if not seqrs or not feats:
309 return
310 comp = self._compare_seqs(seqrs, feats, circular)
312 common = comp["common"]
313 diff = comp["diff"]
314 diff_circular = comp["diff_circular"]
315 only_seqr = comp["only_seqr"]
316 only_feat = comp["only_feat"]
318 if common:
319 logging.info(f"{len(common)} common elements in {name}")
320 if diff_circular:
321 example = diff_circular[0]
322 logging.info(f"{len(diff_circular)} differences for circular elements in {name} (e.g. {example})")
323 if diff:
324 self.add_errors(f"{len(diff)} common elements with higher length in {name} (e.g. {diff[0]})")
325 if only_seqr:
326 # Not an error!
327 logging.info(f"{len(only_seqr)} only in seq_region list in {name} (first: {only_seqr[0]})")
328 if only_feat:
329 self.add_errors(f"{len(only_feat)} only in second list in {name} (first: {only_feat[0]})")
331 def _compare_seqs(
332 self, seqrs: dict[str, Any], feats: dict[str, Any], circular: dict[str, Any] | None = None
333 ) -> dict[str, list[str]]:
334 """Give the intersection and other comparison between two groups of sequences.
336 Args:
337 seqs: Sequence name and length retrieved from seq_region.json file.
338 feats: Sequence name and length retrieved from the fasta and gff file.
339 circular: Whether any sequence is circular.
341 Returns: Dict with 5 stats:
342 common: Common elements.
343 only_seqr: Elements only in the first one.
344 only_feat: Elements only in the second one.
345 diff: Elements that differ.
346 diff_circular: Elements that differ in a circular sequence.
348 """
349 comp: dict[str, list[str]] = {
350 "common": [],
351 "only_seqr": [],
352 "only_feat": [],
353 "diff": [],
354 "diff_circular": [],
355 }
357 for seq_id in seqrs:
358 if seq_id in feats:
359 # Check that feature is within the seq_region length
360 if feats[seq_id] > seqrs[seq_id]:
361 diff_str = f"{seq_id}: {seqrs[seq_id]} vs {feats[seq_id]}"
362 if circular and circular.get(seq_id, False):
363 comp["diff_circular"].append(diff_str)
364 else:
365 comp["diff"].append(diff_str)
366 else:
367 comp["common"].append(seq_id)
368 else:
369 comp["only_seqr"].append(seq_id)
371 for seq_id in feats:
372 if (
373 seq_id not in comp["common"]
374 and seq_id not in comp["diff"]
375 and seq_id not in comp["diff_circular"]
376 and seq_id not in seqrs
377 ):
378 comp["only_feat"].append(seq_id)
380 return comp
383def main() -> None:
384 """Main entrypoint."""
385 parser = ArgumentParser(description=__doc__)
386 parser.add_argument_src_path("--manifest_file", required=True, help="Manifest file for the data to check")
387 parser.add_argument(
388 "--ignore_final_stops", action="store_true", help="Ignore final stop when calculating peptide length"
389 )
390 parser.add_argument(
391 "--no_fail", action="store_true", help="In case of errors, don't fail but print errors to stdout."
392 )
393 parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
394 parser.add_log_arguments(add_log_file=True)
395 args = parser.parse_args()
396 init_logging_with_args(args)
398 inspector = IntegrityTool(args.manifest_file, args.ignore_final_stops, args.no_fail)
399 inspector.check_integrity()
402if __name__ == "__main__":
403 main()