Coverage for src/python/ensembl/io/genomio/gff3/simplifier.py: 100%
307 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Standardize the gene model representation of a GFF3 file, and extract the functional annotation
16in a separate file.
17"""
19__all__ = [
20 "Records",
21 "GFFSimplifier",
22]
24from importlib.resources import as_file, files
25import json
26import logging
27from os import PathLike
28from pathlib import Path
29import re
30from typing import List, Optional, Set
32from BCBio import GFF
33from Bio.SeqRecord import SeqRecord
35import ensembl.io.genomio.data.gff3
36from ensembl.io.genomio.utils.json_utils import get_json
37from .extract_annotation import FunctionalAnnotations
38from .id_allocator import StableIDAllocator
39from .restructure import restructure_gene, remove_cds_from_pseudogene
40from .exceptions import GeneSegmentError, GFFParserError, IgnoredFeatureError, UnsupportedFeatureError
41from .features import GFFSeqFeature
44class Records(list):
45 """List of GFF3 SeqRecords."""
47 def from_gff(self, in_gff_path: PathLike, excluded: Optional[List[str]] = None) -> None:
48 """Loads records from a GFF3 file.
50 Args:
51 in_gff_path: Input GFF3 file path.
52 excluded: Record IDs to not load from the GFF3 file.
53 """
54 if excluded is None:
55 excluded = []
56 with Path(in_gff_path).open("r") as in_gff_fh:
57 for record in GFF.parse(in_gff_fh):
58 if record.id in excluded:
59 logging.debug(f"Skip seq_region {record.id} - in exclusion list")
60 continue
61 clean_record = SeqRecord(record.seq, id=record.id)
62 clean_record.features = record.features
63 self.append(clean_record)
65 def to_gff(self, out_gff_path: PathLike) -> None:
66 """Writes the current list of records in a GFF3 file.
68 Args:
69 out_gff_path: Path to GFF3 file where to write the records.
70 """
71 with Path(out_gff_path).open("w") as out_gff_fh:
72 GFF.write(self, out_gff_fh)
75class GFFSimplifier:
76 """Parse a GGF3 file and output a cleaned up GFF3 + annotation json file.
78 Raises:
79 GFFParserError: If an error cannot be automatically fixed.
80 """
82 def __init__(
83 self,
84 genome_path: Optional[PathLike] = None,
85 skip_unrecognized: bool = False,
86 allow_pseudogene_with_cds: bool = False,
87 ):
88 """Create an object that simplifies `SeqFeature` objects.
90 Args:
91 genome_path: Genome metadata file.
92 skip_unrecognized: Do not include unknown biotypes instead of raising an exception.
93 allow_pseudogene_with_cds: Keep CDSs under pseudogenes that have them. Delete them otherwise.
95 Raises:
96 GFFParserError: If a biotype is unknown and `skip_unrecognized` is False.
97 """
98 self.skip_unrecognized = skip_unrecognized
99 self.allow_pseudogene_with_cds = allow_pseudogene_with_cds
101 # Load biotypes
102 source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json")
103 with as_file(source) as biotypes_json:
104 self._biotypes = get_json(biotypes_json)
106 # Load genome metadata
107 self.genome = {}
108 if genome_path:
109 with Path(genome_path).open("r") as genome_fh:
110 self.genome = json.load(genome_fh)
112 self.refseq = False
113 if self.genome and self.genome["assembly"]["accession"].startswith("GCF"):
114 self.refseq = True
116 # Other preparations
117 self.stable_ids = StableIDAllocator()
118 self.stable_ids.set_prefix(self.genome)
119 self.exclude_seq_regions: List[str] = []
120 self.fail_types: Set = set()
122 # Init the actual data we will store
123 self.records = Records()
124 self.annotations = FunctionalAnnotations(self.get_provider_name())
126 def get_provider_name(self) -> str:
127 """Returns the provider name for this genome.
129 If this information is not available, will try to infer it from the assembly accession. Will
130 return "GenBank" otherwise.
131 """
132 provider_name = "GenBank"
133 if self.genome:
134 try:
135 provider_name = self.genome["assembly"]["provider_name"]
136 except KeyError:
137 if self.genome["assembly"]["accession"].startswith("GCF"):
138 provider_name = "RefSeq"
139 else:
140 logging.warning(f"No genome file, using the default provider_name: {provider_name}")
141 return provider_name
143 def simpler_gff3(self, in_gff_path: PathLike) -> None:
144 """Loads a GFF3 from INSDC and rewrites it in a simpler version, whilst also writing a
145 functional annotation file.
146 """
147 self.records.from_gff(in_gff_path, self.exclude_seq_regions)
148 for record in self.records:
149 cleaned_features = []
150 for feature in record.features:
151 split_genes = self.normalize_mirna(feature)
152 if split_genes:
153 cleaned_features += split_genes
154 else:
155 try:
156 clean_feature = self.simpler_gff3_feature(feature)
157 cleaned_features.append(clean_feature)
158 except (UnsupportedFeatureError, IgnoredFeatureError) as err:
159 logging.debug(err.message)
160 record.features = cleaned_features
162 if self.fail_types:
163 fail_errors = "\n ".join(list(self.fail_types))
164 logging.warning(f"Unrecognized types found:\n {fail_errors}")
165 if not self.skip_unrecognized:
166 raise GFFParserError("Unrecognized types found, abort")
168 def simpler_gff3_feature(self, gene: GFFSeqFeature) -> GFFSeqFeature:
169 """Creates a simpler version of a GFF3 feature.
171 Raises:
172 IgnoredFeatureError: If the feature type is ignored.
173 UnsupportedFeatureError: If the feature type is not supported.
174 """
175 # Special cases
176 non_gene = self.normalize_non_gene(gene)
177 if non_gene:
178 return non_gene
179 if gene.type in self._biotypes["gene"]["ignored"]:
180 raise IgnoredFeatureError(f"Ignored type {gene.type} for {gene.id}")
182 # Synonym
183 if gene.type == "protein_coding_gene":
184 gene.type = "gene"
186 # Lone sub-gene features, create a gene
187 gene = self.create_gene_for_lone_transcript(gene)
188 gene = self.create_gene_for_lone_cds(gene)
190 # What to do with unsupported gene types
191 if gene.type not in self._biotypes["gene"]["supported"]:
192 self.fail_types.add(f"gene={gene.type}")
193 raise UnsupportedFeatureError(f"Unsupported type {gene.type} for {gene.id}")
195 # Normalize and store
196 gene = self.normalize_gene(gene)
197 self.annotations.store_gene(gene)
198 return self.clean_gene(gene)
200 def create_gene_for_lone_transcript(self, feat: GFFSeqFeature) -> GFFSeqFeature:
201 """Returns a gene for lone transcripts: 'gene' for tRNA/rRNA/mRNA, and 'ncRNA_gene' for all others.
203 Args:
204 feat: The transcript for which we want to create a gene.
205 """
206 transcript_types = self._biotypes["transcript"]["supported"]
207 if feat.type not in transcript_types:
208 return feat
210 new_type = "ncRNA_gene"
211 if feat.type in ("tRNA", "rRNA", "mRNA"):
212 new_type = "gene"
213 logging.debug(f"Put the transcript {feat.type} in a {new_type} parent feature")
214 new_gene = GFFSeqFeature(feat.location, type=new_type)
215 new_gene.qualifiers["source"] = feat.qualifiers["source"]
216 new_gene.sub_features = [feat]
218 # Use the transcript ID for the gene, and generate a sub ID for the transcript
219 new_gene.id = feat.id
220 new_gene.qualifiers["ID"] = new_gene.id
221 feat.id = self.stable_ids.generate_transcript_id(new_gene.id, 1)
222 feat.qualifiers["ID"] = feat.id
224 # Remove the exon/CDS parent so it is properly updated
225 for subfeat in feat.sub_features:
226 del subfeat.qualifiers["Parent"]
228 # Check if it's a pseudogene
229 if feat.type == "mRNA":
230 is_pseudo = False
231 for subfeat in feat.sub_features:
232 pseudo_qual = subfeat.qualifiers.get("pseudo", [""])[0]
233 if subfeat.type == "CDS" and pseudo_qual == "true":
234 is_pseudo = True
235 del subfeat.qualifiers["pseudo"]
236 break
237 if is_pseudo:
238 new_gene.type = "pseudogene"
240 return new_gene
242 def create_gene_for_lone_cds(self, feat: GFFSeqFeature) -> GFFSeqFeature:
243 """Returns a gene created for a lone CDS.
245 Args:
246 feat: The CDS for which we want to create a gene.
247 """
248 if feat.type != "CDS":
249 return feat
251 logging.debug(f"Put the lone CDS in gene-mRNA parent features for {feat.id}")
253 # Create a transcript, add the CDS
254 transcript = GFFSeqFeature(feat.location, type="mRNA")
255 transcript.qualifiers["source"] = feat.qualifiers["source"]
256 transcript.sub_features = [feat]
258 # Add an exon too
259 exon = GFFSeqFeature(feat.location, type="exon")
260 exon.qualifiers["source"] = feat.qualifiers["source"]
261 transcript.sub_features.append(exon)
263 # Create a gene, add the transcript
264 gene_type = "gene"
265 if ("pseudo" in feat.qualifiers) and (feat.qualifiers["pseudo"][0] == "true"):
266 gene_type = "pseudogene"
267 del feat.qualifiers["pseudo"]
268 new_gene = GFFSeqFeature(feat.location, type=gene_type)
269 new_gene.qualifiers["source"] = feat.qualifiers["source"]
270 new_gene.sub_features = [transcript]
271 new_gene.id = self.stable_ids.generate_gene_id()
272 new_gene.qualifiers["ID"] = new_gene.id
273 transcript.id = self.stable_ids.generate_transcript_id(new_gene.id, 1)
274 transcript.qualifiers["ID"] = transcript.id
276 return new_gene
278 def normalize_non_gene(self, feat: GFFSeqFeature) -> Optional[GFFSeqFeature]:
279 """Returns a normalised "non-gene" or `None` if not applicable.
281 Only transposable elements supported at the moment.
283 Args:
284 feat: Feature to normalise.
286 Raises:
287 NotImplementedError: If the feature is a not supported non-gene.
288 """
290 if feat.type not in self._biotypes["non_gene"]["supported"]:
291 return None
292 if feat.type in ("mobile_genetic_element", "transposable_element"):
293 feat.type = "transposable_element"
294 feat = self._normalize_mobile_genetic_element(feat)
295 # Generate ID if needed
296 feat.id = self.stable_ids.normalize_gene_id(feat, self.refseq)
297 feat.qualifiers["ID"] = feat.id
299 self.annotations.add_feature(feat, "transposable_element")
300 return self.clean_gene(feat)
301 # This is a failsafe in case you add supported non-genes
302 raise NotImplementedError(f"Unsupported non-gene: {feat.type} for {feat.id}")
304 def _normalize_mobile_genetic_element(self, feat: GFFSeqFeature) -> GFFSeqFeature:
305 """Normalize a mobile element if it has a mobile_element_type field."""
306 try:
307 mobile_element_type = feat.qualifiers["mobile_element_type"]
308 except KeyError:
309 logging.warning("No 'mobile_element_type' tag found")
310 return feat
312 # Get the type (and name) from the attrib
313 element_type, _, element_name = mobile_element_type[0].partition(":")
314 description = element_type
315 if element_name:
316 description += f" ({element_name})"
318 # Keep the metadata in the description if the type is known
319 if element_type in ("transposon", "retrotransposon"):
320 if not feat.qualifiers.get("product"):
321 feat.qualifiers["product"] = [description]
322 return feat
323 raise GFFParserError(f"'mobile_element_type' is not a transposon: {element_type}")
325 def clean_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature:
326 """Return the same gene without qualifiers unrelated to the gene structure."""
328 old_gene_qualifiers = gene.qualifiers
329 gene.qualifiers = {"ID": gene.id, "source": old_gene_qualifiers["source"]}
330 for transcript in gene.sub_features:
331 # Replace qualifiers
332 old_transcript_qualifiers = transcript.qualifiers
333 transcript.qualifiers = {
334 "ID": transcript.id,
335 "Parent": gene.id,
336 "source": old_transcript_qualifiers["source"],
337 }
339 for feat in transcript.sub_features:
340 old_qualifiers = feat.qualifiers
341 feat.qualifiers = {
342 "ID": feat.id,
343 "Parent": transcript.id,
344 "source": old_qualifiers["source"],
345 }
346 if feat.type == "CDS":
347 feat.qualifiers["phase"] = old_qualifiers["phase"]
349 return gene
351 def normalize_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature:
352 """Returns a normalized gene structure, separate from the functional elements.
354 Args:
355 gene: Gene object to normalize.
356 functional_annotation: List of feature annotations (appended by this method).
358 """
360 gene.id = self.stable_ids.normalize_gene_id(gene, refseq=self.refseq)
361 restructure_gene(gene)
362 self.normalize_transcripts(gene)
363 self.normalize_pseudogene(gene)
365 return gene
367 def normalize_pseudogene(self, gene: GFFSeqFeature) -> None:
368 """Normalize CDSs if allowed, otherwise remove them."""
369 if gene.type != "pseudogene":
370 return
372 if self.allow_pseudogene_with_cds:
373 self.stable_ids.normalize_pseudogene_cds_id(gene)
374 else:
375 remove_cds_from_pseudogene(gene)
377 def normalize_transcripts(self, gene: GFFSeqFeature) -> None:
378 """Normalizes a transcript."""
380 allowed_transcript_types = self._biotypes["transcript"]["supported"]
381 ignored_transcript_types = self._biotypes["transcript"]["ignored"]
383 transcripts_to_delete = []
384 for count, transcript in enumerate(gene.sub_features):
385 if (
386 transcript.type not in allowed_transcript_types
387 and transcript.type not in ignored_transcript_types
388 ):
389 self.fail_types.add(f"transcript={transcript.type}")
390 logging.warning(
391 f"Unrecognized transcript type: {transcript.type}" f" for {transcript.id} ({gene.id})"
392 )
393 transcripts_to_delete.append(count)
394 continue
396 # New transcript ID
397 transcript_number = count + 1
398 transcript.id = self.stable_ids.generate_transcript_id(gene.id, transcript_number)
400 transcript = self.format_gene_segments(transcript)
402 # EXONS AND CDS
403 transcript = self._normalize_transcript_subfeatures(gene, transcript)
405 if transcripts_to_delete:
406 for elt in sorted(transcripts_to_delete, reverse=True):
407 gene.sub_features.pop(elt)
409 def format_gene_segments(self, transcript: GFFSeqFeature) -> GFFSeqFeature:
410 """Returns the equivalent Ensembl biotype feature for gene segment transcript features.
412 Supported features: "C_gene_segment" and "V_gene_segment".
414 Args:
415 transcript: Gene segment transcript feature.
417 Raises:
418 GeneSegmentError: Unable to get the segment type information from the feature.
419 """
420 if transcript.type not in ("C_gene_segment", "V_gene_segment"):
421 return transcript
423 # Guess the segment type from the transcript attribs
424 seg_type = self._get_segment_type(transcript)
425 if not seg_type:
426 # Get the information from a CDS instead
427 sub_feats: List[GFFSeqFeature] = transcript.sub_features
428 cdss: List[GFFSeqFeature] = list(filter(lambda x: x.type == "CDS", sub_feats))
429 if cdss:
430 seg_type = self._get_segment_type(cdss[0])
431 if not seg_type:
432 raise GeneSegmentError(f"Unable to infer segment from {transcript.id}")
434 # Change V/C_gene_segment into a its corresponding transcript names
435 transcript.type = f"{seg_type}_{transcript.type.replace('_segment', '')}"
436 return transcript
438 def _get_segment_type(self, feature: GFFSeqFeature) -> str:
439 """Infer if a segment is "IG" (immunoglobulin) of "TR" (t-cell) from the feature attribs.
441 Returns an empty string if no segment type info was found.
442 """
444 product = feature.qualifiers.get("standard_name", [""])[0]
445 if not product:
446 product = feature.qualifiers.get("product", [""])[0]
447 if not product:
448 return ""
450 if re.search(r"\b(immunoglobulin|ig)\b", product, flags=re.IGNORECASE):
451 return "IG"
452 if re.search(r"\bt[- _]cell\b", product, flags=re.IGNORECASE):
453 return "TR"
454 return ""
456 def _normalize_transcript_subfeatures(
457 self, gene: GFFSeqFeature, transcript: GFFSeqFeature
458 ) -> GFFSeqFeature:
459 """Returns a transcript with normalized sub-features."""
460 exons_to_delete = []
461 exon_number = 1
462 for tcount, feat in enumerate(transcript.sub_features):
463 if feat.type == "exon":
464 # New exon ID
465 feat.id = f"{transcript.id}-E{exon_number}"
466 exon_number += 1
467 # Replace qualifiers
468 old_exon_qualifiers = feat.qualifiers
469 feat.qualifiers = {"Parent": transcript.id}
470 feat.qualifiers["source"] = old_exon_qualifiers["source"]
471 elif feat.type == "CDS":
472 # New CDS ID
473 feat.id = self.stable_ids.normalize_cds_id(feat.id)
474 if feat.id in ("", gene.id, transcript.id):
475 feat.id = f"{transcript.id}_cds"
476 else:
477 if feat.type in self._biotypes["transcript"]["ignored"]:
478 exons_to_delete.append(tcount)
479 continue
481 self.fail_types.add(f"sub_transcript={feat.type}")
482 logging.warning(
483 f"Unrecognized exon type for {feat.type}: {feat.id}"
484 f" (for transcript {transcript.id} of type {transcript.type})"
485 )
486 exons_to_delete.append(tcount)
487 continue
489 if exons_to_delete:
490 for elt in sorted(exons_to_delete, reverse=True):
491 transcript.sub_features.pop(elt)
492 return transcript
494 def normalize_mirna(self, gene: GFFSeqFeature) -> List[GFFSeqFeature]:
495 """Returns gene representations from a miRNA gene that can be loaded in an Ensembl database.
497 Change the representation from the form `gene[ primary_transcript[ exon, miRNA[ exon ] ] ]`
498 to `ncRNA_gene[ miRNA_primary_transcript[ exon ] ]` and `gene[ miRNA[ exon ] ]`
500 Raises:
501 GFFParserError: If gene has more than 1 transcript, the transcript was not formatted
502 correctly or there are unknown sub-features.
503 """
504 base_id = gene.id
505 transcripts = gene.sub_features
507 # Insert main gene first if needed
508 old_gene = gene
509 if gene.type == "primary_transcript":
510 primary = old_gene
511 gene = GFFSeqFeature(primary.location, type="gene")
512 gene.sub_features = [primary]
513 gene.qualifiers = primary.qualifiers
514 transcripts = gene.sub_features
515 gene.id = f"{base_id}_0"
516 gene.qualifiers["ID"] = gene.id
518 if (len(transcripts) == 0) or (transcripts[0].type != "primary_transcript"):
519 return []
520 if len(transcripts) > 1:
521 raise GFFParserError(f"Gene has too many sub_features for miRNA {gene.id}")
523 # Passed the checks
524 primary = transcripts[0]
525 primary.type = "miRNA_primary_transcript"
526 gene.type = "ncRNA_gene"
527 logging.debug(f"Formatting miRNA gene {gene.id}")
529 new_genes = []
530 new_primary_subfeatures = []
531 num = 1
532 for sub in primary.sub_features:
533 if sub.type == "exon":
534 new_primary_subfeatures.append(sub)
535 elif sub.type == "miRNA":
536 new_gene_id = f"{base_id}_{num}"
537 num += 1
538 new_gene = GFFSeqFeature(sub.location, type="gene", id=new_gene_id)
539 new_gene.qualifiers = {"source": sub.qualifiers["source"], "ID": new_gene_id}
540 new_gene.sub_features = [sub]
541 new_genes.append(new_gene)
542 else:
543 raise GFFParserError(f"Unknown subtypes for miRNA features: {sub.id}")
544 primary.sub_features = new_primary_subfeatures
546 if not new_genes:
547 logging.debug(f"Primary_transcript without miRNA in {gene.id}")
548 all_genes = [gene]
549 else:
550 all_genes = [gene] + new_genes
552 # Normalize like other genes
553 all_genes_cleaned = []
554 for new_gene in all_genes:
555 new_gene = self.normalize_gene(new_gene)
556 self.annotations.store_gene(new_gene)
557 all_genes_cleaned.append(self.clean_gene(new_gene))
558 return all_genes_cleaned