Coverage for src/python/ensembl/io/genomio/gff3/overlaps.py: 17%

94 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Scan a GFF3 file to detect overlapping SeqFeature objects. Default object level => gene.""" 

16 

17__all__ = [ 

18 "summarize_feature_stats", 

19 "identify_feature_overlaps", 

20 "scan_tree", 

21 "get_intervals", 

22] 

23 

24from collections import defaultdict 

25import json 

26import logging 

27from pathlib import Path 

28from pprint import pprint 

29 

30from BCBio import GFF 

31from BCBio.GFF import GFFExaminer 

32from Bio.SeqRecord import SeqRecord 

33from intervaltree import Interval, IntervalTree 

34 

35import ensembl.io.genomio 

36from ensembl.io.genomio.utils import print_json 

37from ensembl.utils.argparse import ArgumentParser 

38from ensembl.utils.logging import init_logging_with_args 

39 

40 

41def summarize_feature_stats(gff_in: Path) -> None: 

42 """Analyse a GFF3 file and produce a summary of its feature types. 

43 

44 Args: 

45 gff_in: User supplied GFF3 input file. 

46 """ 

47 

48 logging.info("Alt processing: Not parsing the GFF3, producing summary feature stats instead!") 

49 

50 examiner = GFFExaminer() 

51 with gff_in.open("r", encoding="utf-8") as input_handle: 

52 pprint(examiner.available_limits(input_handle)) 

53 input_handle.close() 

54 

55 

56def identify_feature_overlaps(gff_in: Path, output_file: Path, isolate_feature: str) -> None: 

57 """Detect overlapping GFF3 SeqFeature objects and dump to a report. 

58 

59 Args: 

60 gff_in: User supplied GFF3 input file. 

61 output_file: Output file to write feature overlaps. 

62 isolate_feature: Sequence feature type to filter by. 

63 """ 

64 logging.info("Processing sequence feature overlaps!") 

65 logging.info(f"Output file = {str(output_file)}") 

66 logging.info(f"Features filtered by type: {isolate_feature}") 

67 

68 gff_type_filter: dict = {"gff_type": [isolate_feature]} 

69 seq_dict: dict = defaultdict(dict) 

70 genes_dict: dict = {} 

71 with gff_in.open("r", encoding="utf-8") as input_handle: 

72 for record in GFF.parse(input_handle, limit_info=gff_type_filter): 

73 seq_name = str(record.id) 

74 if seq_name not in seq_dict: 

75 seq_dict[seq_name]["plus"] = [] 

76 seq_dict[seq_name]["minus"] = [] 

77 

78 get_intervals(record, genes_dict, seq_dict, seq_name) 

79 

80 overlap_count = _write_report(output_file, seq_dict, genes_dict) 

81 

82 result_total_features = f"In total {len(genes_dict)} {isolate_feature} features were scanned." 

83 print(result_total_features) 

84 logging.info(result_total_features) 

85 

86 result_total_overlaps = f"In total {overlap_count} overlaps were detected." 

87 print(result_total_overlaps) 

88 logging.info(result_total_overlaps) 

89 

90 logging.info("Finished all processing.") 

91 

92 

93def scan_tree(feature_intervals: list) -> set: 

94 """Construct an interval tree using supplied genomic intervals, check all elements on the tree against 

95 itself and return any that hit 2 or more intervals (i.e. itself + 1 other) 

96 

97 Args: 

98 feature_intervals: Genome features to examine for coordinate (start/end) overlaps. 

99 

100 Return: 

101 Set of intervals identified in the input GFF3 file that overlap with 2 or more intervals. 

102 """ 

103 

104 interval_sets = set() 

105 traversed_tree = IntervalTree(Interval(*iv) for iv in feature_intervals) 

106 

107 for interval in feature_intervals: 

108 if len(traversed_tree.overlap(interval[0], interval[1])) > 1: 

109 overlap_interval = traversed_tree.overlap(interval[0], interval[1]) 

110 

111 for features in overlap_interval: 

112 interval_sets.add(features.data) 

113 

114 return interval_sets 

115 

116 

117def _write_report(out_file: Path, seq_dict: dict, genes_dict: dict) -> int: 

118 """Write the final overlap report to output file. 

119 

120 Args: 

121 out_file: Output name of file to dump detected feature overlaps. 

122 seq_dict: Sequence features. 

123 genes_dict: Unique (+ | - stranded) overlap features. 

124 

125 Returns: 

126 Count of overlaps detected 

127 """ 

128 overlap_count = 0 

129 overlap_features = [] 

130 

131 for seq_name in seq_dict: 

132 logging.info(f"{seq_name} plus {len(seq_dict[seq_name]['plus'])}") 

133 logging.info(f"{seq_name} minus {len(seq_dict[seq_name]['minus'])}") 

134 

135 positive_hit = scan_tree(seq_dict[seq_name]["plus"]) 

136 logging.info(f"{len(positive_hit)} positive strand overlaps detected") 

137 

138 negative_hit = scan_tree(seq_dict[seq_name]["minus"]) 

139 logging.info(f"{len(negative_hit)} negative strand overlaps detected") 

140 

141 uniq_features: set = positive_hit.union(negative_hit) 

142 overlap_count = overlap_count + len(uniq_features) 

143 

144 for feature in uniq_features: 

145 format_feature = str(genes_dict[feature]).replace("'", '"') 

146 overlap_features.append(json.loads(format_feature)) 

147 print_json(out_file, overlap_features) 

148 

149 return overlap_count 

150 

151 

152def get_intervals(record: SeqRecord, genes_dict: dict, seq_dict: dict, seq_name: str) -> None: 

153 """Extract start/stop feature coordinates for use in creating intervaltree object. 

154 

155 Args: 

156 record: Individual sequence record. 

157 genes_dict: Genes. 

158 seq_dict: Sequences. 

159 seq_name: Feature sequence name. 

160 """ 

161 

162 for feature in record.features: 

163 genes_dict[str(feature.id)] = { 

164 "sequence": f"{record.id}", 

165 "start": f"{int(feature.location.start) + 1}", 

166 "end": f"{int(feature.location.end)}", 

167 "strand": f"{feature.location.strand}", 

168 "name": f"{feature.id}", 

169 } 

170 

171 if feature.location.strand == 1: 

172 seq_dict[seq_name]["plus"].append( 

173 (int(feature.location.start), int(feature.location.end), str(feature.id)) 

174 ) 

175 elif feature.location.strand == -1: 

176 seq_dict[seq_name]["minus"].append( 

177 (int(feature.location.start), int(feature.location.end), str(feature.id)) 

178 ) 

179 else: 

180 logging.critical("Something went wrong with the strand processing!") 

181 

182 

183def main() -> None: 

184 """Module entry-point.""" 

185 parser = ArgumentParser(description=__doc__) 

186 parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__) 

187 # Create parser with common arguments to be used by both subparsers 

188 base_parser = ArgumentParser(add_help=False) 

189 base_parser.add_argument_src_path("--input_gff", required=True, help="path of GFF3 file to process") 

190 base_parser.add_log_arguments(add_log_file=True) 

191 # Add subparsers with their parent being the base parser with the common arguments 

192 subparsers = parser.add_subparsers(title="Parse GFF3 and ", required=True, dest="subcommand") 

193 _ = subparsers.add_parser("stats", parents=[base_parser], help="Provide summary of feature types") 

194 overlaps_parser = subparsers.add_parser("overlaps", parents=[base_parser], help="Find feature overlaps") 

195 overlaps_parser.add_argument_dst_path( 

196 "--output_file", default="feature_overlaps.txt", help="path of output file" 

197 ) 

198 overlaps_parser.add_argument( 

199 "--filter_type", default="gene", help="sequence feature type used for overlap isolation" 

200 ) 

201 

202 args = parser.parse_args() 

203 init_logging_with_args(args) 

204 

205 logging.info("Starting processing...") 

206 logging.info(f"GFF input file = {str(args.input_gff)}") 

207 

208 # Check optional processing param 

209 if args.subcommand == "stats": 

210 summarize_feature_stats(args.input_gff) 

211 else: 

212 identify_feature_overlaps(args.input_gff, args.output_file, args.filter_type) 

213 

214 

215if __name__ == "__main__": 

216 main()