Coverage for src/python/ensembl/io/genomio/gff3/extract_annotation.py: 96%

149 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Simple representation of gene features functional annotation extracted from a GFF3 file.""" 

16 

17__all__ = [ 

18 "Annotation", 

19 "DuplicateIdError", 

20 "MissingParentError", 

21 "AnnotationError", 

22 "FunctionalAnnotations", 

23] 

24 

25from os import PathLike 

26import logging 

27from pathlib import Path 

28import re 

29from typing import Any, Dict, List, Optional 

30 

31from ensembl.io.genomio.utils.json_utils import print_json 

32from .features import GFFSeqFeature 

33 

34 

35Annotation = Dict[str, Any] 

36 

37_PARENTS = { 

38 "transcript": "gene", 

39 "translation": "transcript", 

40} 

41 

42 

43class DuplicateIdError(Exception): 

44 """Trying to add a feature with an ID already in use.""" 

45 

46 

47class MissingParentError(Exception): 

48 """Trying to add a feature without an expected parent.""" 

49 

50 

51class AnnotationError(Exception): 

52 """If anything wrong happens when recording annotations.""" 

53 

54 

55class FunctionalAnnotations: 

56 """List of annotations extracted from a GFF3 file.""" 

57 

58 ignored_xrefs = {"go", "interpro", "uniprot"} 

59 

60 def __init__(self, provider_name: str = "") -> None: 

61 self.annotations: List[Annotation] = [] 

62 self.provider_name = provider_name 

63 # Annotated features 

64 # Under each feature, each dict's key is a feature ID 

65 self.features: Dict[str, Dict[str, Annotation]] = { 

66 "gene": {}, 

67 "transcript": {}, 

68 "translation": {}, 

69 "transposable_element": {}, 

70 } 

71 # Keep parent info: key is the feature ID, value is the parent ID 

72 self.parents: Dict[str, Dict[str, str]] = { 

73 "gene": {}, 

74 "transcript": {}, 

75 } 

76 

77 def get_xrefs(self, feature: GFFSeqFeature) -> List[Dict[str, Any]]: 

78 """Get the xrefs from the Dbxref field.""" 

79 all_xref: List[Dict[str, str]] = [] 

80 

81 if "Dbxref" in feature.qualifiers: 

82 for xref in feature.qualifiers["Dbxref"]: 

83 dbname, name = xref.split(":", maxsplit=1) 

84 if dbname == "GenBank" and self.provider_name == "RefSeq": 

85 dbname = "RefSeq" 

86 

87 if dbname.lower() in self.ignored_xrefs: 

88 continue 

89 

90 xrefs = {"dbname": dbname, "id": name} 

91 all_xref.append(xrefs) 

92 

93 # Add RefSeq ID xref if it looks like one 

94 if self.provider_name == "RefSeq": 

95 if feature.type == "gene" and feature.id.startswith("LOC"): 

96 xref_dbs = {x["dbname"] for x in all_xref} 

97 if "RefSeq" not in xref_dbs: 

98 all_xref.append({"dbname": "RefSeq", "id": feature.id}) 

99 

100 return all_xref 

101 

102 def get_features(self, feat_type: str) -> Dict[str, Annotation]: 

103 """Get all feature annotations for the requested type.""" 

104 try: 

105 return self.features[feat_type] 

106 except KeyError as err: 

107 raise KeyError(f"No such feature type {feat_type}") from err 

108 

109 def add_parent_link(self, parent_type: str, parent_id: str, child_id: str) -> None: 

110 """Record a parent-child IDs relationship for a given parent biotype.""" 

111 features = self.get_features(parent_type) 

112 if parent_id not in features: 

113 raise MissingParentError(f"Parent {parent_type}:{parent_id} not found for {child_id}") 

114 self.parents[parent_type][child_id] = parent_id 

115 

116 def get_parent(self, parent_type: str, child_id: str) -> str: 

117 """Returns the parent ID of a given child for a given parent biotype.""" 

118 try: 

119 parents = self.parents[parent_type] 

120 except KeyError as err: 

121 raise KeyError(f"Unsupported parent type {parent_type}") from err 

122 

123 parent_id = parents.get(child_id) 

124 if parent_id is None: 

125 raise MissingParentError(f"Can't find {parent_type} parent for {child_id}") 

126 return parent_id 

127 

128 def add_feature( 

129 self, 

130 feature: GFFSeqFeature, 

131 feat_type: str, 

132 parent_id: Optional[str] = None, 

133 all_parent_ids: Optional[List[str]] = None, 

134 ) -> None: 

135 """Add annotation for a feature of a given type. If a parent_id is provided, record the relationship. 

136 

137 Args: 

138 feature: The feature to create an annotation. 

139 feat_type: Type of the feature to annotate. 

140 parent_id: Parent ID of this feature to keep it linked. 

141 all_parent_ids: All parent IDs to remove from non-informative descriptions. 

142 """ 

143 if all_parent_ids is None: 

144 all_parent_ids = [] 

145 features = self.get_features(feat_type) 

146 if feature.id in features: 

147 raise AnnotationError(f"Feature {feat_type} ID {feature.id} already added") 

148 

149 feature_object = self._generic_feature(feature, feat_type, all_parent_ids) 

150 self.features[feat_type][feature.id] = feature_object 

151 

152 if parent_id: 

153 if feat_type in _PARENTS: 

154 parent_type = _PARENTS[feat_type] 

155 self.add_parent_link(parent_type, parent_id, feature.id) 

156 else: 

157 raise AnnotationError(f"No parent possible for {feat_type} {feature.id}") 

158 

159 def _generic_feature( 

160 self, feature: GFFSeqFeature, feat_type: str, parent_ids: Optional[List[str]] = None 

161 ) -> Dict[str, Any]: 

162 """Create a feature object following the specifications. 

163 

164 Args: 

165 feature: The GFFSeqFeature to add to the list. 

166 feat_type: Feature type of the feature to store (e.g. gene, transcript, translation). 

167 all_parent_ids: All parent IDs to remove from non-informative descriptions. 

168 

169 """ 

170 if parent_ids is None: 170 ↛ 171line 170 didn't jump to line 171 because the condition on line 170 was never true

171 parent_ids = [] 

172 

173 feature_object: Annotation = {"object_type": feat_type, "id": feature.id} 

174 

175 # Description? 

176 for qname in ("description", "product"): 

177 if qname in feature.qualifiers: 

178 description = feature.qualifiers[qname][0] 

179 if self.product_is_informative(description, feat_ids=parent_ids + [feature.id]): 

180 feature_object["description"] = description 

181 break 

182 logging.debug(f"Non informative description for {feature.id}: {description}") 

183 

184 feature_object["xrefs"] = [] 

185 if "Dbxref" in feature.qualifiers: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 all_xref = self.get_xrefs(feature) 

187 feature_object["xrefs"] = all_xref 

188 

189 xref_values = {xref["id"].lower() for xref in feature_object["xrefs"]} 

190 

191 # Synonyms? 

192 # We add synonyms to the external_synonym table 

193 # which is associated with the first xref of that feature type 

194 if "Name" in feature.qualifiers: 

195 feat_name = feature.qualifiers["Name"][0] 

196 if feat_name.lower() != feature.id.lower() and feat_name.lower() not in xref_values: 

197 feature_object["synonyms"] = [feat_name] 

198 

199 # is_pseudogene? 

200 if feature.type.startswith("pseudogen"): 

201 feature_object["is_pseudogene"] = True 

202 

203 # Don't keep empty xref 

204 if not feature_object["xrefs"]: 204 ↛ 206line 204 didn't jump to line 206 because the condition on line 204 was always true

205 del feature_object["xrefs"] 

206 return feature_object 

207 

208 def transfer_descriptions(self) -> None: 

209 """Transfers the feature descriptions in 2 steps: 

210 - from translations to transcripts (if the transcript description is empty) 

211 - from transcripts to genes (same case) 

212 

213 """ 

214 self._transfer_description_up("translation") 

215 self._transfer_description_up("transcript") 

216 

217 def _transfer_description_up(self, child_feature: str) -> None: 

218 """Transfer descriptions from all feature of a given type, up to their parent. 

219 

220 Args: 

221 child_feature: Either "translation" (transfer to transcript) or "transcript" (to gene). 

222 

223 """ 

224 children_features = self.get_features(child_feature) 

225 parent_type = _PARENTS[child_feature] 

226 parent_features = self.get_features(parent_type) 

227 

228 # Transfer description from children to their parent 

229 for child_id, child in children_features.items(): 

230 child_description = child.get("description") 

231 if child_description is not None: 

232 child_description = self._clean_description(child_description) 

233 # Check parent 

234 parent_id = self.get_parent(parent_type, child_id) 

235 parent = parent_features[parent_id] 

236 parent_description = parent.get("description") 

237 if parent_description is None: 

238 parent["description"] = child_description 

239 

240 @staticmethod 

241 def _clean_description(description: str) -> str: 

242 """Returns the description without "transcript variant" information.""" 

243 variant_re = re.compile(r", transcript variant [A-Z][0-9]+$", re.IGNORECASE) 

244 description = re.sub(variant_re, "", description) 

245 return description 

246 

247 @staticmethod 

248 def product_is_informative(product: str, feat_ids: Optional[List[str]] = None) -> bool: 

249 """Returns True if the product name contains informative words, False otherwise. 

250 

251 It is considered uninformative when the description contains words such as "hypothetical" or 

252 or "putative". If feature IDs are provided, consider it uninformative as well (we do not want 

253 descriptions to be just the ID). 

254 

255 Args: 

256 product: A product name. 

257 feat_ids: List of feature IDs. 

258 

259 """ 

260 non_informative_words = [ 

261 "hypothetical", 

262 "putative", 

263 "uncharacterized", 

264 "unspecified", 

265 "unknown", 

266 r"(of )?unknown function", 

267 "conserved", 

268 "predicted", 

269 "fragment", 

270 "product", 

271 "function", 

272 "protein", 

273 "transcript", 

274 "gene", 

275 "RNA", 

276 r"(variant|isoform)( X?\d+)?", 

277 r"low quality protein", 

278 ] 

279 non_informative_re = re.compile(r"|".join(non_informative_words), re.IGNORECASE) 

280 

281 # Remove all IDs that are in the description 

282 if feat_ids: 

283 logging.debug(f"Filter out {feat_ids} from {product}") 

284 try: 

285 for feat_id in feat_ids: 

286 feat_id_re = re.compile(feat_id, re.IGNORECASE) 

287 product = re.sub(feat_id_re, "", product) 

288 except TypeError as err: 

289 raise TypeError(f"Failed to search {feat_id_re} in '{product}'") from err 

290 

291 # Remove punctuations 

292 punct_re = re.compile(r"[,;: _()-]+") 

293 product = re.sub(punct_re, " ", product) 

294 

295 # Then remove non informative words 

296 product = re.sub(non_informative_re, " ", product) 

297 

298 # Anything (informative) left? 

299 empty_re = re.compile(r"^[ ]*$") 

300 return not bool(empty_re.match(product)) 

301 

302 def _to_list(self) -> list[Annotation]: 

303 all_list: list[Annotation] = [] 

304 for feat_dict in self.features.values(): 

305 all_list += feat_dict.values() 

306 return all_list 

307 

308 def to_json(self, out_path: PathLike) -> None: 

309 """Print out the current annotation list in a json file. 

310 

311 Args: 

312 out_path: JSON file path where to write the data. 

313 

314 """ 

315 self.transfer_descriptions() 

316 feats_list = self._to_list() 

317 print_json(Path(out_path), feats_list) 

318 

319 def store_gene(self, gene: GFFSeqFeature) -> None: 

320 """Record the functional_annotations of a gene and its children features.""" 

321 self.add_feature(gene, "gene") 

322 

323 for transcript in gene.sub_features: 

324 self.add_feature(transcript, "transcript", gene.id, [gene.id]) 

325 for feat in transcript.sub_features: 

326 if feat.type == "CDS": 

327 self.add_feature(feat, "translation", transcript.id, [gene.id, transcript.id]) 

328 # Store CDS functional annotation only once 

329 break