Coverage for src/python/ensembl/io/genomio/gff3/gene_merger.py: 100%
69 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Merge split genes in a GFF file."""
17__all__ = [
18 "GFFGeneMerger",
19]
21from importlib.resources import as_file, files
22import logging
23from os import PathLike
24from pathlib import Path
25import re
26from typing import List
28import ensembl.io.genomio.data.gff3
29from ensembl.io.genomio.utils.json_utils import get_json
32class GFFGeneMerger:
33 """Specialized class to merge split genes in a GFF3 file, prior to further parsing."""
35 def __init__(self) -> None:
36 source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json")
37 with as_file(source) as biotypes_json:
38 self._biotypes = get_json(biotypes_json)
40 def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]:
41 """
42 Merge genes in a gff that are split in multiple lines.
44 Args:
45 in_gff_path: Input GFF3 that may have split merge.
46 out_gff_path: Output GFF3 with those genes merged.
48 Returns:
49 List of all merged genes, each represented as a string of the GFF3 lines of all their parts.
50 """
51 to_merge = []
52 merged: List[str] = []
54 with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh:
55 for line in in_gff_fh:
56 # Skip comments
57 if line.startswith("#"):
58 if line.startswith("##FASTA"):
59 logging.warning("This GFF3 file contains FASTA sequences")
60 break
61 out_gff_fh.write(line)
62 else:
63 # Parse one line
64 line = line.rstrip()
65 fields = line.split("\t")
66 attr_fields = fields[8].split(";")
67 attrs = {}
68 for a in attr_fields:
69 (key, value) = a.split("=")
70 attrs[key] = value
72 # Check this is a gene to merge; cache it then
73 if fields[2] in self._biotypes["gene"]["supported"] and (
74 "part" in attrs or "is_ordered" in attrs
75 ):
76 to_merge.append(fields)
78 # If not, merge previous gene if needed, and print the line
79 else:
80 if to_merge:
81 merged_str = []
82 for line_to_merge in to_merge:
83 merged_str.append("\t".join(line_to_merge))
84 merged.append("\n".join(merged_str) + "\n")
86 new_line = self._merge_genes(to_merge)
87 out_gff_fh.write(new_line)
88 to_merge = []
89 out_gff_fh.write(line + "\n")
91 # Print last merged gene if there is one
92 if to_merge:
93 merged_str = []
94 for line_to_merge in to_merge:
95 merged_str.append("\t".join(line_to_merge))
96 merged.append("\n".join(merged_str) + "\n")
98 new_line = self._merge_genes(to_merge)
99 out_gff_fh.write(new_line)
101 logging.debug(f"Merged lines: {len(merged)}")
102 return merged
104 def _merge_genes(self, to_merge: List) -> str:
105 """Returns a single gene gff3 line merged from separate parts.
107 Args:
108 to_merge: List of gff3 lines with gene parts.
110 """
111 min_start = -1
112 max_end = -1
113 for gene in to_merge:
114 start = int(gene[3])
115 end = int(gene[4])
117 if start < min_start or min_start < 0:
118 min_start = start
119 if end > max_end or max_end < 0:
120 max_end = end
122 # Take the first line as template and replace things
123 new_gene = to_merge[0]
124 new_gene[3] = str(min_start)
125 new_gene[4] = str(max_end)
127 attrs = new_gene[8]
128 attrs = attrs.replace(";is_ordered=true", "")
129 attrs = re.sub(r";part=\d+/\d+", "", attrs)
130 new_gene[8] = attrs
132 return "\t".join(new_gene) + "\n"