Coverage for src/python/ensembl/io/genomio/gff3/gene_merger.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Merge split genes in a GFF file.""" 

16 

17__all__ = [ 

18 "GFFGeneMerger", 

19] 

20 

21from importlib.resources import as_file, files 

22import logging 

23from os import PathLike 

24from pathlib import Path 

25import re 

26from typing import List 

27 

28import ensembl.io.genomio.data.gff3 

29from ensembl.io.genomio.utils.json_utils import get_json 

30 

31 

32class GFFGeneMerger: 

33 """Specialized class to merge split genes in a GFF3 file, prior to further parsing.""" 

34 

35 def __init__(self) -> None: 

36 source = files(ensembl.io.genomio.data.gff3).joinpath("biotypes.json") 

37 with as_file(source) as biotypes_json: 

38 self._biotypes = get_json(biotypes_json) 

39 

40 def merge(self, in_gff_path: PathLike, out_gff_path: PathLike) -> List[str]: 

41 """ 

42 Merge genes in a gff that are split in multiple lines. 

43 

44 Args: 

45 in_gff_path: Input GFF3 that may have split merge. 

46 out_gff_path: Output GFF3 with those genes merged. 

47 

48 Returns: 

49 List of all merged genes, each represented as a string of the GFF3 lines of all their parts. 

50 """ 

51 to_merge = [] 

52 merged: List[str] = [] 

53 

54 with Path(in_gff_path).open("r") as in_gff_fh, Path(out_gff_path).open("w") as out_gff_fh: 

55 for line in in_gff_fh: 

56 # Skip comments 

57 if line.startswith("#"): 

58 if line.startswith("##FASTA"): 

59 logging.warning("This GFF3 file contains FASTA sequences") 

60 break 

61 out_gff_fh.write(line) 

62 else: 

63 # Parse one line 

64 line = line.rstrip() 

65 fields = line.split("\t") 

66 attr_fields = fields[8].split(";") 

67 attrs = {} 

68 for a in attr_fields: 

69 (key, value) = a.split("=") 

70 attrs[key] = value 

71 

72 # Check this is a gene to merge; cache it then 

73 if fields[2] in self._biotypes["gene"]["supported"] and ( 

74 "part" in attrs or "is_ordered" in attrs 

75 ): 

76 to_merge.append(fields) 

77 

78 # If not, merge previous gene if needed, and print the line 

79 else: 

80 if to_merge: 

81 merged_str = [] 

82 for line_to_merge in to_merge: 

83 merged_str.append("\t".join(line_to_merge)) 

84 merged.append("\n".join(merged_str) + "\n") 

85 

86 new_line = self._merge_genes(to_merge) 

87 out_gff_fh.write(new_line) 

88 to_merge = [] 

89 out_gff_fh.write(line + "\n") 

90 

91 # Print last merged gene if there is one 

92 if to_merge: 

93 merged_str = [] 

94 for line_to_merge in to_merge: 

95 merged_str.append("\t".join(line_to_merge)) 

96 merged.append("\n".join(merged_str) + "\n") 

97 

98 new_line = self._merge_genes(to_merge) 

99 out_gff_fh.write(new_line) 

100 

101 logging.debug(f"Merged lines: {len(merged)}") 

102 return merged 

103 

104 def _merge_genes(self, to_merge: List) -> str: 

105 """Returns a single gene gff3 line merged from separate parts. 

106 

107 Args: 

108 to_merge: List of gff3 lines with gene parts. 

109 

110 """ 

111 min_start = -1 

112 max_end = -1 

113 for gene in to_merge: 

114 start = int(gene[3]) 

115 end = int(gene[4]) 

116 

117 if start < min_start or min_start < 0: 

118 min_start = start 

119 if end > max_end or max_end < 0: 

120 max_end = end 

121 

122 # Take the first line as template and replace things 

123 new_gene = to_merge[0] 

124 new_gene[3] = str(min_start) 

125 new_gene[4] = str(max_end) 

126 

127 attrs = new_gene[8] 

128 attrs = attrs.replace(";is_ordered=true", "") 

129 attrs = re.sub(r";part=\d+/\d+", "", attrs) 

130 new_gene[8] = attrs 

131 

132 return "\t".join(new_gene) + "\n"