Coverage for src/python/ensembl/io/genomio/gff3/extract_annotation.py: 96%
149 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Simple representation of gene features functional annotation extracted from a GFF3 file."""
17__all__ = [
18 "Annotation",
19 "DuplicateIdError",
20 "MissingParentError",
21 "AnnotationError",
22 "FunctionalAnnotations",
23]
25from os import PathLike
26import logging
27from pathlib import Path
28import re
29from typing import Any, Dict, List, Optional
31from ensembl.io.genomio.utils.json_utils import print_json
32from .features import GFFSeqFeature
35Annotation = Dict[str, Any]
37_PARENTS = {
38 "transcript": "gene",
39 "translation": "transcript",
40}
43class DuplicateIdError(Exception):
44 """Trying to add a feature with an ID already in use."""
47class MissingParentError(Exception):
48 """Trying to add a feature without an expected parent."""
51class AnnotationError(Exception):
52 """If anything wrong happens when recording annotations."""
55class FunctionalAnnotations:
56 """List of annotations extracted from a GFF3 file."""
58 ignored_xrefs = {"go", "interpro", "uniprot"}
60 def __init__(self, provider_name: str = "") -> None:
61 self.annotations: List[Annotation] = []
62 self.provider_name = provider_name
63 # Annotated features
64 # Under each feature, each dict's key is a feature ID
65 self.features: Dict[str, Dict[str, Annotation]] = {
66 "gene": {},
67 "transcript": {},
68 "translation": {},
69 "transposable_element": {},
70 }
71 # Keep parent info: key is the feature ID, value is the parent ID
72 self.parents: Dict[str, Dict[str, str]] = {
73 "gene": {},
74 "transcript": {},
75 }
77 def get_xrefs(self, feature: GFFSeqFeature) -> List[Dict[str, Any]]:
78 """Get the xrefs from the Dbxref field."""
79 all_xref: List[Dict[str, str]] = []
81 if "Dbxref" in feature.qualifiers:
82 for xref in feature.qualifiers["Dbxref"]:
83 dbname, name = xref.split(":", maxsplit=1)
84 if dbname == "GenBank" and self.provider_name == "RefSeq":
85 dbname = "RefSeq"
87 if dbname.lower() in self.ignored_xrefs:
88 continue
90 xrefs = {"dbname": dbname, "id": name}
91 all_xref.append(xrefs)
93 # Add RefSeq ID xref if it looks like one
94 if self.provider_name == "RefSeq":
95 if feature.type == "gene" and feature.id.startswith("LOC"):
96 xref_dbs = {x["dbname"] for x in all_xref}
97 if "RefSeq" not in xref_dbs:
98 all_xref.append({"dbname": "RefSeq", "id": feature.id})
100 return all_xref
102 def get_features(self, feat_type: str) -> Dict[str, Annotation]:
103 """Get all feature annotations for the requested type."""
104 try:
105 return self.features[feat_type]
106 except KeyError as err:
107 raise KeyError(f"No such feature type {feat_type}") from err
109 def add_parent_link(self, parent_type: str, parent_id: str, child_id: str) -> None:
110 """Record a parent-child IDs relationship for a given parent biotype."""
111 features = self.get_features(parent_type)
112 if parent_id not in features:
113 raise MissingParentError(f"Parent {parent_type}:{parent_id} not found for {child_id}")
114 self.parents[parent_type][child_id] = parent_id
116 def get_parent(self, parent_type: str, child_id: str) -> str:
117 """Returns the parent ID of a given child for a given parent biotype."""
118 try:
119 parents = self.parents[parent_type]
120 except KeyError as err:
121 raise KeyError(f"Unsupported parent type {parent_type}") from err
123 parent_id = parents.get(child_id)
124 if parent_id is None:
125 raise MissingParentError(f"Can't find {parent_type} parent for {child_id}")
126 return parent_id
128 def add_feature(
129 self,
130 feature: GFFSeqFeature,
131 feat_type: str,
132 parent_id: Optional[str] = None,
133 all_parent_ids: Optional[List[str]] = None,
134 ) -> None:
135 """Add annotation for a feature of a given type. If a parent_id is provided, record the relationship.
137 Args:
138 feature: The feature to create an annotation.
139 feat_type: Type of the feature to annotate.
140 parent_id: Parent ID of this feature to keep it linked.
141 all_parent_ids: All parent IDs to remove from non-informative descriptions.
142 """
143 if all_parent_ids is None:
144 all_parent_ids = []
145 features = self.get_features(feat_type)
146 if feature.id in features:
147 raise AnnotationError(f"Feature {feat_type} ID {feature.id} already added")
149 feature_object = self._generic_feature(feature, feat_type, all_parent_ids)
150 self.features[feat_type][feature.id] = feature_object
152 if parent_id:
153 if feat_type in _PARENTS:
154 parent_type = _PARENTS[feat_type]
155 self.add_parent_link(parent_type, parent_id, feature.id)
156 else:
157 raise AnnotationError(f"No parent possible for {feat_type} {feature.id}")
159 def _generic_feature(
160 self, feature: GFFSeqFeature, feat_type: str, parent_ids: Optional[List[str]] = None
161 ) -> Dict[str, Any]:
162 """Create a feature object following the specifications.
164 Args:
165 feature: The GFFSeqFeature to add to the list.
166 feat_type: Feature type of the feature to store (e.g. gene, transcript, translation).
167 all_parent_ids: All parent IDs to remove from non-informative descriptions.
169 """
170 if parent_ids is None: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true
171 parent_ids = []
173 feature_object: Annotation = {"object_type": feat_type, "id": feature.id}
175 # Description?
176 for qname in ("description", "product"):
177 if qname in feature.qualifiers:
178 description = feature.qualifiers[qname][0]
179 if self.product_is_informative(description, feat_ids=parent_ids + [feature.id]):
180 feature_object["description"] = description
181 break
182 logging.debug(f"Non informative description for {feature.id}: {description}")
184 feature_object["xrefs"] = []
185 if "Dbxref" in feature.qualifiers: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 all_xref = self.get_xrefs(feature)
187 feature_object["xrefs"] = all_xref
189 xref_values = {xref["id"].lower() for xref in feature_object["xrefs"]}
191 # Synonyms?
192 # We add synonyms to the external_synonym table
193 # which is associated with the first xref of that feature type
194 if "Name" in feature.qualifiers:
195 feat_name = feature.qualifiers["Name"][0]
196 if feat_name.lower() != feature.id.lower() and feat_name.lower() not in xref_values:
197 feature_object["synonyms"] = [feat_name]
199 # is_pseudogene?
200 if feature.type.startswith("pseudogen"):
201 feature_object["is_pseudogene"] = True
203 # Don't keep empty xref
204 if not feature_object["xrefs"]: 204 ↛ 206line 204 didn't jump to line 206 because the condition on line 204 was always true
205 del feature_object["xrefs"]
206 return feature_object
208 def transfer_descriptions(self) -> None:
209 """Transfers the feature descriptions in 2 steps:
210 - from translations to transcripts (if the transcript description is empty)
211 - from transcripts to genes (same case)
213 """
214 self._transfer_description_up("translation")
215 self._transfer_description_up("transcript")
217 def _transfer_description_up(self, child_feature: str) -> None:
218 """Transfer descriptions from all feature of a given type, up to their parent.
220 Args:
221 child_feature: Either "translation" (transfer to transcript) or "transcript" (to gene).
223 """
224 children_features = self.get_features(child_feature)
225 parent_type = _PARENTS[child_feature]
226 parent_features = self.get_features(parent_type)
228 # Transfer description from children to their parent
229 for child_id, child in children_features.items():
230 child_description = child.get("description")
231 if child_description is not None:
232 child_description = self._clean_description(child_description)
233 # Check parent
234 parent_id = self.get_parent(parent_type, child_id)
235 parent = parent_features[parent_id]
236 parent_description = parent.get("description")
237 if parent_description is None:
238 parent["description"] = child_description
240 @staticmethod
241 def _clean_description(description: str) -> str:
242 """Returns the description without "transcript variant" information."""
243 variant_re = re.compile(r", transcript variant [A-Z][0-9]+$", re.IGNORECASE)
244 description = re.sub(variant_re, "", description)
245 return description
247 @staticmethod
248 def product_is_informative(product: str, feat_ids: Optional[List[str]] = None) -> bool:
249 """Returns True if the product name contains informative words, False otherwise.
251 It is considered uninformative when the description contains words such as "hypothetical" or
252 or "putative". If feature IDs are provided, consider it uninformative as well (we do not want
253 descriptions to be just the ID).
255 Args:
256 product: A product name.
257 feat_ids: List of feature IDs.
259 """
260 non_informative_words = [
261 "hypothetical",
262 "putative",
263 "uncharacterized",
264 "unspecified",
265 "unknown",
266 r"(of )?unknown function",
267 "conserved",
268 "predicted",
269 "fragment",
270 "product",
271 "function",
272 "protein",
273 "transcript",
274 "gene",
275 "RNA",
276 r"(variant|isoform)( X?\d+)?",
277 r"low quality protein",
278 ]
279 non_informative_re = re.compile(r"|".join(non_informative_words), re.IGNORECASE)
281 # Remove all IDs that are in the description
282 if feat_ids:
283 logging.debug(f"Filter out {feat_ids} from {product}")
284 try:
285 for feat_id in feat_ids:
286 feat_id_re = re.compile(feat_id, re.IGNORECASE)
287 product = re.sub(feat_id_re, "", product)
288 except TypeError as err:
289 raise TypeError(f"Failed to search {feat_id_re} in '{product}'") from err
291 # Remove punctuations
292 punct_re = re.compile(r"[,;: _()-]+")
293 product = re.sub(punct_re, " ", product)
295 # Then remove non informative words
296 product = re.sub(non_informative_re, " ", product)
298 # Anything (informative) left?
299 empty_re = re.compile(r"^[ ]*$")
300 return not bool(empty_re.match(product))
302 def _to_list(self) -> list[Annotation]:
303 all_list: list[Annotation] = []
304 for feat_dict in self.features.values():
305 all_list += feat_dict.values()
306 return all_list
308 def to_json(self, out_path: PathLike) -> None:
309 """Print out the current annotation list in a json file.
311 Args:
312 out_path: JSON file path where to write the data.
314 """
315 self.transfer_descriptions()
316 feats_list = self._to_list()
317 print_json(Path(out_path), feats_list)
319 def store_gene(self, gene: GFFSeqFeature) -> None:
320 """Record the functional_annotations of a gene and its children features."""
321 self.add_feature(gene, "gene")
323 for transcript in gene.sub_features:
324 self.add_feature(transcript, "transcript", gene.id, [gene.id])
325 for feat in transcript.sub_features:
326 if feat.type == "CDS":
327 self.add_feature(feat, "translation", transcript.id, [gene.id, transcript.id])
328 # Store CDS functional annotation only once
329 break