Coverage for src/python/ensembl/io/genomio/gff3/simplifier.py: 100%

307 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Standardize the gene model representation of a GFF3 file, and extract the functional annotation 

16in a separate file. 

17""" 

18 

19__all__ = [ 

20 "Records", 

21 "GFFSimplifier", 

22] 

23 

24from importlib.resources import as_file, files 

25import json 

26import logging 

27from os import PathLike 

28from pathlib import Path 

29import re 

30from typing import List, Optional, Set 

31 

32from BCBio import GFF 

33from Bio.SeqRecord import SeqRecord 

34 

35import ensembl.io.genomio.data.gff3 

36from ensembl.io.genomio.utils.json_utils import get_json 

37from .extract_annotation import FunctionalAnnotations 

38from .id_allocator import StableIDAllocator 

39from .restructure import restructure_gene, remove_cds_from_pseudogene 

40from .exceptions import GeneSegmentError, GFFParserError, IgnoredFeatureError, UnsupportedFeatureError 

41from .features import GFFSeqFeature 

42 

43 

44class Records(list): 

45 """List of GFF3 SeqRecords.""" 

46 

47 def from_gff(self, in_gff_path: PathLike, excluded: Optional[List[str]] = None) -> None: 

48 """Loads records from a GFF3 file. 

49 

50 Args: 

51 in_gff_path: Input GFF3 file path. 

52 excluded: Record IDs to not load from the GFF3 file. 

53 """ 

54 if excluded is None: 

55 excluded = [] 

56 with Path(in_gff_path).open("r") as in_gff_fh: 

57 for record in GFF.parse(in_gff_fh): 

58 if record.id in excluded: 

59 logging.debug(f"Skip seq_region {record.id} - in exclusion list") 

60 continue 

61 clean_record = SeqRecord(record.seq, id=record.id) 

62 clean_record.features = record.features 

63 self.append(clean_record) 

64 

65 def to_gff(self, out_gff_path: PathLike) -> None: 

66 """Writes the current list of records in a GFF3 file. 

67 

68 Args: 

69 out_gff_path: Path to GFF3 file where to write the records. 

70 """ 

71 with Path(out_gff_path).open("w") as out_gff_fh: 

72 GFF.write(self, out_gff_fh) 

73 

74 

75class GFFSimplifier: 

76 """Parse a GGF3 file and output a cleaned up GFF3 + annotation json file. 

77 

78 Raises: 

79 GFFParserError: If an error cannot be automatically fixed. 

80 """ 

81 

82 def __init__( 

83 self, 

84 genome_path: Optional[PathLike] = None, 

85 skip_unrecognized: bool = False, 

86 allow_pseudogene_with_cds: bool = False, 

87 ): 

88 """Create an object that simplifies `SeqFeature` objects. 

89 

90 Args: 

91 genome_path: Genome metadata file. 

92 skip_unrecognized: Do not include unknown biotypes instead of raising an exception. 

93 allow_pseudogene_with_cds: Keep CDSs under pseudogenes that have them. Delete them otherwise. 

94 

95 Raises: 

96 GFFParserError: If a biotype is unknown and `skip_unrecognized` is False. 

97 """ 

98 self.skip_unrecognized = skip_unrecognized 

99 self.allow_pseudogene_with_cds = allow_pseudogene_with_cds 

100 

101 # Load biotypes 

102 source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json") 

103 with as_file(source) as biotypes_json: 

104 self._biotypes = get_json(biotypes_json) 

105 

106 # Load genome metadata 

107 self.genome = {} 

108 if genome_path: 

109 with Path(genome_path).open("r") as genome_fh: 

110 self.genome = json.load(genome_fh) 

111 

112 self.refseq = False 

113 if self.genome and self.genome["assembly"]["accession"].startswith("GCF"): 

114 self.refseq = True 

115 

116 # Other preparations 

117 self.stable_ids = StableIDAllocator() 

118 self.stable_ids.set_prefix(self.genome) 

119 self.exclude_seq_regions: List[str] = [] 

120 self.fail_types: Set = set() 

121 

122 # Init the actual data we will store 

123 self.records = Records() 

124 self.annotations = FunctionalAnnotations(self.get_provider_name()) 

125 

126 def get_provider_name(self) -> str: 

127 """Returns the provider name for this genome. 

128 

129 If this information is not available, will try to infer it from the assembly accession. Will 

130 return "GenBank" otherwise. 

131 """ 

132 provider_name = "GenBank" 

133 if self.genome: 

134 try: 

135 provider_name = self.genome["assembly"]["provider_name"] 

136 except KeyError: 

137 if self.genome["assembly"]["accession"].startswith("GCF"): 

138 provider_name = "RefSeq" 

139 else: 

140 logging.warning(f"No genome file, using the default provider_name: {provider_name}") 

141 return provider_name 

142 

143 def simpler_gff3(self, in_gff_path: PathLike) -> None: 

144 """Loads a GFF3 from INSDC and rewrites it in a simpler version, whilst also writing a 

145 functional annotation file. 

146 """ 

147 self.records.from_gff(in_gff_path, self.exclude_seq_regions) 

148 for record in self.records: 

149 cleaned_features = [] 

150 for feature in record.features: 

151 split_genes = self.normalize_mirna(feature) 

152 if split_genes: 

153 cleaned_features += split_genes 

154 else: 

155 try: 

156 clean_feature = self.simpler_gff3_feature(feature) 

157 cleaned_features.append(clean_feature) 

158 except (UnsupportedFeatureError, IgnoredFeatureError) as err: 

159 logging.debug(err.message) 

160 record.features = cleaned_features 

161 

162 if self.fail_types: 

163 fail_errors = "\n ".join(list(self.fail_types)) 

164 logging.warning(f"Unrecognized types found:\n {fail_errors}") 

165 if not self.skip_unrecognized: 

166 raise GFFParserError("Unrecognized types found, abort") 

167 

168 def simpler_gff3_feature(self, gene: GFFSeqFeature) -> GFFSeqFeature: 

169 """Creates a simpler version of a GFF3 feature. 

170 

171 Raises: 

172 IgnoredFeatureError: If the feature type is ignored. 

173 UnsupportedFeatureError: If the feature type is not supported. 

174 """ 

175 # Special cases 

176 non_gene = self.normalize_non_gene(gene) 

177 if non_gene: 

178 return non_gene 

179 if gene.type in self._biotypes["gene"]["ignored"]: 

180 raise IgnoredFeatureError(f"Ignored type {gene.type} for {gene.id}") 

181 

182 # Synonym 

183 if gene.type == "protein_coding_gene": 

184 gene.type = "gene" 

185 

186 # Lone sub-gene features, create a gene 

187 gene = self.create_gene_for_lone_transcript(gene) 

188 gene = self.create_gene_for_lone_cds(gene) 

189 

190 # What to do with unsupported gene types 

191 if gene.type not in self._biotypes["gene"]["supported"]: 

192 self.fail_types.add(f"gene={gene.type}") 

193 raise UnsupportedFeatureError(f"Unsupported type {gene.type} for {gene.id}") 

194 

195 # Normalize and store 

196 gene = self.normalize_gene(gene) 

197 self.annotations.store_gene(gene) 

198 return self.clean_gene(gene) 

199 

200 def create_gene_for_lone_transcript(self, feat: GFFSeqFeature) -> GFFSeqFeature: 

201 """Returns a gene for lone transcripts: 'gene' for tRNA/rRNA/mRNA, and 'ncRNA_gene' for all others. 

202 

203 Args: 

204 feat: The transcript for which we want to create a gene. 

205 """ 

206 transcript_types = self._biotypes["transcript"]["supported"] 

207 if feat.type not in transcript_types: 

208 return feat 

209 

210 new_type = "ncRNA_gene" 

211 if feat.type in ("tRNA", "rRNA", "mRNA"): 

212 new_type = "gene" 

213 logging.debug(f"Put the transcript {feat.type} in a {new_type} parent feature") 

214 new_gene = GFFSeqFeature(feat.location, type=new_type) 

215 new_gene.qualifiers["source"] = feat.qualifiers["source"] 

216 new_gene.sub_features = [feat] 

217 

218 # Use the transcript ID for the gene, and generate a sub ID for the transcript 

219 new_gene.id = feat.id 

220 new_gene.qualifiers["ID"] = new_gene.id 

221 feat.id = self.stable_ids.generate_transcript_id(new_gene.id, 1) 

222 feat.qualifiers["ID"] = feat.id 

223 

224 # Remove the exon/CDS parent so it is properly updated 

225 for subfeat in feat.sub_features: 

226 del subfeat.qualifiers["Parent"] 

227 

228 # Check if it's a pseudogene 

229 if feat.type == "mRNA": 

230 is_pseudo = False 

231 for subfeat in feat.sub_features: 

232 pseudo_qual = subfeat.qualifiers.get("pseudo", [""])[0] 

233 if subfeat.type == "CDS" and pseudo_qual == "true": 

234 is_pseudo = True 

235 del subfeat.qualifiers["pseudo"] 

236 break 

237 if is_pseudo: 

238 new_gene.type = "pseudogene" 

239 

240 return new_gene 

241 

242 def create_gene_for_lone_cds(self, feat: GFFSeqFeature) -> GFFSeqFeature: 

243 """Returns a gene created for a lone CDS. 

244 

245 Args: 

246 feat: The CDS for which we want to create a gene. 

247 """ 

248 if feat.type != "CDS": 

249 return feat 

250 

251 logging.debug(f"Put the lone CDS in gene-mRNA parent features for {feat.id}") 

252 

253 # Create a transcript, add the CDS 

254 transcript = GFFSeqFeature(feat.location, type="mRNA") 

255 transcript.qualifiers["source"] = feat.qualifiers["source"] 

256 transcript.sub_features = [feat] 

257 

258 # Add an exon too 

259 exon = GFFSeqFeature(feat.location, type="exon") 

260 exon.qualifiers["source"] = feat.qualifiers["source"] 

261 transcript.sub_features.append(exon) 

262 

263 # Create a gene, add the transcript 

264 gene_type = "gene" 

265 if ("pseudo" in feat.qualifiers) and (feat.qualifiers["pseudo"][0] == "true"): 

266 gene_type = "pseudogene" 

267 del feat.qualifiers["pseudo"] 

268 new_gene = GFFSeqFeature(feat.location, type=gene_type) 

269 new_gene.qualifiers["source"] = feat.qualifiers["source"] 

270 new_gene.sub_features = [transcript] 

271 new_gene.id = self.stable_ids.generate_gene_id() 

272 new_gene.qualifiers["ID"] = new_gene.id 

273 transcript.id = self.stable_ids.generate_transcript_id(new_gene.id, 1) 

274 transcript.qualifiers["ID"] = transcript.id 

275 

276 return new_gene 

277 

278 def normalize_non_gene(self, feat: GFFSeqFeature) -> Optional[GFFSeqFeature]: 

279 """Returns a normalised "non-gene" or `None` if not applicable. 

280 

281 Only transposable elements supported at the moment. 

282 

283 Args: 

284 feat: Feature to normalise. 

285 

286 Raises: 

287 NotImplementedError: If the feature is a not supported non-gene. 

288 """ 

289 

290 if feat.type not in self._biotypes["non_gene"]["supported"]: 

291 return None 

292 if feat.type in ("mobile_genetic_element", "transposable_element"): 

293 feat.type = "transposable_element" 

294 feat = self._normalize_mobile_genetic_element(feat) 

295 # Generate ID if needed 

296 feat.id = self.stable_ids.normalize_gene_id(feat, self.refseq) 

297 feat.qualifiers["ID"] = feat.id 

298 

299 self.annotations.add_feature(feat, "transposable_element") 

300 return self.clean_gene(feat) 

301 # This is a failsafe in case you add supported non-genes 

302 raise NotImplementedError(f"Unsupported non-gene: {feat.type} for {feat.id}") 

303 

304 def _normalize_mobile_genetic_element(self, feat: GFFSeqFeature) -> GFFSeqFeature: 

305 """Normalize a mobile element if it has a mobile_element_type field.""" 

306 try: 

307 mobile_element_type = feat.qualifiers["mobile_element_type"] 

308 except KeyError: 

309 logging.warning("No 'mobile_element_type' tag found") 

310 return feat 

311 

312 # Get the type (and name) from the attrib 

313 element_type, _, element_name = mobile_element_type[0].partition(":") 

314 description = element_type 

315 if element_name: 

316 description += f" ({element_name})" 

317 

318 # Keep the metadata in the description if the type is known 

319 if element_type in ("transposon", "retrotransposon"): 

320 if not feat.qualifiers.get("product"): 

321 feat.qualifiers["product"] = [description] 

322 return feat 

323 raise GFFParserError(f"'mobile_element_type' is not a transposon: {element_type}") 

324 

325 def clean_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature: 

326 """Return the same gene without qualifiers unrelated to the gene structure.""" 

327 

328 old_gene_qualifiers = gene.qualifiers 

329 gene.qualifiers = {"ID": gene.id, "source": old_gene_qualifiers["source"]} 

330 for transcript in gene.sub_features: 

331 # Replace qualifiers 

332 old_transcript_qualifiers = transcript.qualifiers 

333 transcript.qualifiers = { 

334 "ID": transcript.id, 

335 "Parent": gene.id, 

336 "source": old_transcript_qualifiers["source"], 

337 } 

338 

339 for feat in transcript.sub_features: 

340 old_qualifiers = feat.qualifiers 

341 feat.qualifiers = { 

342 "ID": feat.id, 

343 "Parent": transcript.id, 

344 "source": old_qualifiers["source"], 

345 } 

346 if feat.type == "CDS": 

347 feat.qualifiers["phase"] = old_qualifiers["phase"] 

348 

349 return gene 

350 

351 def normalize_gene(self, gene: GFFSeqFeature) -> GFFSeqFeature: 

352 """Returns a normalized gene structure, separate from the functional elements. 

353 

354 Args: 

355 gene: Gene object to normalize. 

356 functional_annotation: List of feature annotations (appended by this method). 

357 

358 """ 

359 

360 gene.id = self.stable_ids.normalize_gene_id(gene, refseq=self.refseq) 

361 restructure_gene(gene) 

362 self.normalize_transcripts(gene) 

363 self.normalize_pseudogene(gene) 

364 

365 return gene 

366 

367 def normalize_pseudogene(self, gene: GFFSeqFeature) -> None: 

368 """Normalize CDSs if allowed, otherwise remove them.""" 

369 if gene.type != "pseudogene": 

370 return 

371 

372 if self.allow_pseudogene_with_cds: 

373 self.stable_ids.normalize_pseudogene_cds_id(gene) 

374 else: 

375 remove_cds_from_pseudogene(gene) 

376 

377 def normalize_transcripts(self, gene: GFFSeqFeature) -> None: 

378 """Normalizes a transcript.""" 

379 

380 allowed_transcript_types = self._biotypes["transcript"]["supported"] 

381 ignored_transcript_types = self._biotypes["transcript"]["ignored"] 

382 

383 transcripts_to_delete = [] 

384 for count, transcript in enumerate(gene.sub_features): 

385 if ( 

386 transcript.type not in allowed_transcript_types 

387 and transcript.type not in ignored_transcript_types 

388 ): 

389 self.fail_types.add(f"transcript={transcript.type}") 

390 logging.warning( 

391 f"Unrecognized transcript type: {transcript.type}" f" for {transcript.id} ({gene.id})" 

392 ) 

393 transcripts_to_delete.append(count) 

394 continue 

395 

396 # New transcript ID 

397 transcript_number = count + 1 

398 transcript.id = self.stable_ids.generate_transcript_id(gene.id, transcript_number) 

399 

400 transcript = self.format_gene_segments(transcript) 

401 

402 # EXONS AND CDS 

403 transcript = self._normalize_transcript_subfeatures(gene, transcript) 

404 

405 if transcripts_to_delete: 

406 for elt in sorted(transcripts_to_delete, reverse=True): 

407 gene.sub_features.pop(elt) 

408 

409 def format_gene_segments(self, transcript: GFFSeqFeature) -> GFFSeqFeature: 

410 """Returns the equivalent Ensembl biotype feature for gene segment transcript features. 

411 

412 Supported features: "C_gene_segment" and "V_gene_segment". 

413 

414 Args: 

415 transcript: Gene segment transcript feature. 

416 

417 Raises: 

418 GeneSegmentError: Unable to get the segment type information from the feature. 

419 """ 

420 if transcript.type not in ("C_gene_segment", "V_gene_segment"): 

421 return transcript 

422 

423 # Guess the segment type from the transcript attribs 

424 seg_type = self._get_segment_type(transcript) 

425 if not seg_type: 

426 # Get the information from a CDS instead 

427 sub_feats: List[GFFSeqFeature] = transcript.sub_features 

428 cdss: List[GFFSeqFeature] = list(filter(lambda x: x.type == "CDS", sub_feats)) 

429 if cdss: 

430 seg_type = self._get_segment_type(cdss[0]) 

431 if not seg_type: 

432 raise GeneSegmentError(f"Unable to infer segment from {transcript.id}") 

433 

434 # Change V/C_gene_segment into a its corresponding transcript names 

435 transcript.type = f"{seg_type}_{transcript.type.replace('_segment', '')}" 

436 return transcript 

437 

438 def _get_segment_type(self, feature: GFFSeqFeature) -> str: 

439 """Infer if a segment is "IG" (immunoglobulin) of "TR" (t-cell) from the feature attribs. 

440 

441 Returns an empty string if no segment type info was found. 

442 """ 

443 

444 product = feature.qualifiers.get("standard_name", [""])[0] 

445 if not product: 

446 product = feature.qualifiers.get("product", [""])[0] 

447 if not product: 

448 return "" 

449 

450 if re.search(r"\b(immunoglobulin|ig)\b", product, flags=re.IGNORECASE): 

451 return "IG" 

452 if re.search(r"\bt[- _]cell\b", product, flags=re.IGNORECASE): 

453 return "TR" 

454 return "" 

455 

456 def _normalize_transcript_subfeatures( 

457 self, gene: GFFSeqFeature, transcript: GFFSeqFeature 

458 ) -> GFFSeqFeature: 

459 """Returns a transcript with normalized sub-features.""" 

460 exons_to_delete = [] 

461 exon_number = 1 

462 for tcount, feat in enumerate(transcript.sub_features): 

463 if feat.type == "exon": 

464 # New exon ID 

465 feat.id = f"{transcript.id}-E{exon_number}" 

466 exon_number += 1 

467 # Replace qualifiers 

468 old_exon_qualifiers = feat.qualifiers 

469 feat.qualifiers = {"Parent": transcript.id} 

470 feat.qualifiers["source"] = old_exon_qualifiers["source"] 

471 elif feat.type == "CDS": 

472 # New CDS ID 

473 feat.id = self.stable_ids.normalize_cds_id(feat.id) 

474 if feat.id in ("", gene.id, transcript.id): 

475 feat.id = f"{transcript.id}_cds" 

476 else: 

477 if feat.type in self._biotypes["transcript"]["ignored"]: 

478 exons_to_delete.append(tcount) 

479 continue 

480 

481 self.fail_types.add(f"sub_transcript={feat.type}") 

482 logging.warning( 

483 f"Unrecognized exon type for {feat.type}: {feat.id}" 

484 f" (for transcript {transcript.id} of type {transcript.type})" 

485 ) 

486 exons_to_delete.append(tcount) 

487 continue 

488 

489 if exons_to_delete: 

490 for elt in sorted(exons_to_delete, reverse=True): 

491 transcript.sub_features.pop(elt) 

492 return transcript 

493 

494 def normalize_mirna(self, gene: GFFSeqFeature) -> List[GFFSeqFeature]: 

495 """Returns gene representations from a miRNA gene that can be loaded in an Ensembl database. 

496 

497 Change the representation from the form `gene[ primary_transcript[ exon, miRNA[ exon ] ] ]` 

498 to `ncRNA_gene[ miRNA_primary_transcript[ exon ] ]` and `gene[ miRNA[ exon ] ]` 

499 

500 Raises: 

501 GFFParserError: If gene has more than 1 transcript, the transcript was not formatted 

502 correctly or there are unknown sub-features. 

503 """ 

504 base_id = gene.id 

505 transcripts = gene.sub_features 

506 

507 # Insert main gene first if needed 

508 old_gene = gene 

509 if gene.type == "primary_transcript": 

510 primary = old_gene 

511 gene = GFFSeqFeature(primary.location, type="gene") 

512 gene.sub_features = [primary] 

513 gene.qualifiers = primary.qualifiers 

514 transcripts = gene.sub_features 

515 gene.id = f"{base_id}_0" 

516 gene.qualifiers["ID"] = gene.id 

517 

518 if (len(transcripts) == 0) or (transcripts[0].type != "primary_transcript"): 

519 return [] 

520 if len(transcripts) > 1: 

521 raise GFFParserError(f"Gene has too many sub_features for miRNA {gene.id}") 

522 

523 # Passed the checks 

524 primary = transcripts[0] 

525 primary.type = "miRNA_primary_transcript" 

526 gene.type = "ncRNA_gene" 

527 logging.debug(f"Formatting miRNA gene {gene.id}") 

528 

529 new_genes = [] 

530 new_primary_subfeatures = [] 

531 num = 1 

532 for sub in primary.sub_features: 

533 if sub.type == "exon": 

534 new_primary_subfeatures.append(sub) 

535 elif sub.type == "miRNA": 

536 new_gene_id = f"{base_id}_{num}" 

537 num += 1 

538 new_gene = GFFSeqFeature(sub.location, type="gene", id=new_gene_id) 

539 new_gene.qualifiers = {"source": sub.qualifiers["source"], "ID": new_gene_id} 

540 new_gene.sub_features = [sub] 

541 new_genes.append(new_gene) 

542 else: 

543 raise GFFParserError(f"Unknown subtypes for miRNA features: {sub.id}") 

544 primary.sub_features = new_primary_subfeatures 

545 

546 if not new_genes: 

547 logging.debug(f"Primary_transcript without miRNA in {gene.id}") 

548 all_genes = [gene] 

549 else: 

550 all_genes = [gene] + new_genes 

551 

552 # Normalize like other genes 

553 all_genes_cleaned = [] 

554 for new_gene in all_genes: 

555 new_gene = self.normalize_gene(new_gene) 

556 self.annotations.store_gene(new_gene) 

557 all_genes_cleaned.append(self.clean_gene(new_gene)) 

558 return all_genes_cleaned