Coverage for src/python/ensembl/io/genomio/seq_region/collection.py: 100%

120 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""SeqCollection as a collection of seq_regions metadata.""" 

16 

17__all__ = [ 

18 "SeqCollection", 

19 "SeqRegionDict", 

20] 

21 

22import logging 

23from pathlib import Path 

24from typing import Any, Mapping, TypeAlias 

25 

26from Bio import SeqIO 

27import requests 

28 

29from ensembl.io.genomio.seq_region.gbff import GBFFRecord 

30from ensembl.io.genomio.seq_region.exceptions import UnknownMetadata 

31from ensembl.io.genomio.seq_region.mappings import SYNONYM_MAP, MOLECULE_LOCATION, LOCATION_CODON 

32from ensembl.io.genomio.seq_region.report import ReportRecord 

33from ensembl.utils.archive import open_gz_file 

34 

35SeqRegionDict: TypeAlias = dict[str, Any] 

36 

37 

38class SeqCollection: 

39 """Represent a collection of seq_regions metadata.""" 

40 

41 mock: bool 

42 seqs: dict 

43 

44 def __init__(self, mock: bool = False) -> None: 

45 self.seqs = {} 

46 self.mock = mock 

47 

48 def from_gbff(self, gbff_path: Path) -> None: 

49 """Store seq_regions extracted from a GBFF file. 

50 

51 If a seq_region with the same ID exists in the collection, it will be replaced. 

52 """ 

53 with open_gz_file(gbff_path) as gbff_file: 

54 for record in SeqIO.parse(gbff_file, "genbank"): 

55 record_data = GBFFRecord(record) 

56 new_seq: SeqRegionDict = self.make_seqregion_from_gbff(record_data) 

57 name = record.id 

58 merged_seq = self._merge(new_seq, self.seqs.get(name, {})) 

59 self.seqs[name] = merged_seq 

60 

61 def _merge(self, source: SeqRegionDict, destination: SeqRegionDict) -> SeqRegionDict: 

62 """Merge a source dict in a destination dict (only add values or append to lists).""" 

63 if not destination: 

64 return source 

65 for key, value in source.items(): 

66 if isinstance(value, list): 

67 destination[key] += value 

68 else: 

69 destination[key] = value 

70 

71 return destination 

72 

73 @staticmethod 

74 def make_seqregion_from_gbff(record_data: GBFFRecord) -> SeqRegionDict: 

75 """Returns a seq_region dict extracted from a GBFF record.""" 

76 seqr: SeqRegionDict = {"length": len(record_data.record.seq)} # type: ignore[arg-type] 

77 

78 if record_data.is_circular(): 

79 seqr["circular"] = True 

80 

81 # Is there a genetic code defined? 

82 codon_table = record_data.get_codon_table() 

83 if codon_table is not None: 

84 seqr["codon_table"] = codon_table 

85 

86 # Is it an organelle? 

87 location = record_data.get_organelle() 

88 if location is not None: 

89 seqr["location"] = location 

90 

91 # Is there a comment stating the Genbank record this is based on? 

92 genbank_id = record_data.get_genbank_id() 

93 if genbank_id is not None: 

94 seqr["synonyms"] = [{"source": "INSDC", "name": genbank_id}] 

95 

96 return seqr 

97 

98 def from_report(self, report_path: Path, is_refseq: bool = False) -> None: 

99 """Store seq_regions extracted from an INSDC assembly report file. 

100 

101 If a seq_region with the same id exists in the collection, it will be replaced. 

102 

103 Args: 

104 report_path: Path to the sequence regions report file. 

105 is_refseq: True if the source of the report is RefSeq, false if INSDC. 

106 

107 """ 

108 report = ReportRecord(report_path) 

109 for seq_data in report.reader: 

110 new_seq = self.make_seq_region_from_report(seq_data, is_refseq) 

111 name = new_seq["name"] 

112 merged_seq = self._merge(new_seq, self.seqs.get(name, {})) 

113 self.seqs[name] = merged_seq 

114 

115 @staticmethod 

116 def make_seq_region_from_report( 

117 seq_data: dict[str, Any], 

118 is_refseq: bool, 

119 synonym_map: Mapping[str, str] = SYNONYM_MAP, 

120 molecule_location: Mapping[str, str] = MOLECULE_LOCATION, 

121 ) -> SeqRegionDict: 

122 """Returns a sequence region from the information provided. 

123 

124 An empty sequence region will be returned if no accession information is found. 

125 

126 Args: 

127 data: Dict from the report representing one line, where the key is the column name. 

128 is_refseq: True if the source is RefSeq, false if INSDC. 

129 synonym_map: Map of INSDC report column names to sequence region field names. 

130 molecule_location: Map of sequence type to SO location. 

131 

132 Raises: 

133 UnknownMetadata: If the sequence role or location is not recognised. 

134 

135 """ 

136 seq_region = {} 

137 

138 # Set accession as the sequence region name 

139 src = "RefSeq" if is_refseq else "GenBank" 

140 accession_id = seq_data.get(f"{src}-Accn", "") 

141 if not accession_id or (accession_id == "na"): 

142 logging.warning(f'No {src} accession ID found for {seq_data["Sequence-Name"]}') 

143 return {} 

144 seq_region["name"] = accession_id 

145 

146 # Add synonyms 

147 synonyms = [] 

148 for field, source in synonym_map.items(): 

149 if (field in seq_data) and (seq_data[field].casefold() != "na"): 

150 synonym = {"source": source, "name": seq_data[field]} 

151 synonyms.append(synonym) 

152 synonyms.sort(key=lambda x: x["source"]) 

153 seq_region["synonyms"] = synonyms 

154 

155 # Add sequence length 

156 field = "Sequence-Length" 

157 if (field in seq_data) and (seq_data[field].casefold() != "na"): 

158 seq_region["length"] = int(seq_data[field]) 

159 

160 # Add coordinate system and location 

161 seq_role = seq_data["Sequence-Role"] 

162 # Scaffold? 

163 if seq_role in ("unplaced-scaffold", "unlocalized-scaffold"): 

164 seq_region["coord_system_level"] = "scaffold" 

165 # Chromosome? Check location 

166 elif seq_role == "assembled-molecule": 

167 seq_region["coord_system_level"] = "chromosome" 

168 location = seq_data["Assigned-Molecule-Location/Type"].lower() 

169 # Get location metadata 

170 try: 

171 seq_region["location"] = molecule_location[location] 

172 except KeyError as exc: 

173 raise UnknownMetadata(f"Unrecognized sequence location: {location}") from exc 

174 else: 

175 raise UnknownMetadata(f"Unrecognized sequence role: {seq_role}") 

176 

177 return seq_region 

178 

179 def remove(self, to_exclude: list[str]) -> None: 

180 """Remove seq_regions based on a provided list of accessions.""" 

181 for seq_name in to_exclude: 

182 if seq_name in self.seqs: 

183 del self.seqs[seq_name] 

184 else: 

185 logging.info(f"Cannot exclude seq not found: {seq_name}") 

186 

187 def add_translation_table(self, location_codon: Mapping[str, int] = LOCATION_CODON) -> None: 

188 """Adds the translation codon table to each sequence region (when missing) based on its location. 

189 

190 Args: 

191 location_codon: Map of known codon tables for known locations. 

192 

193 """ 

194 for seqr in self.seqs.values(): 

195 if "codon_table" in seqr: 

196 continue 

197 if seqr.get("location", "") in location_codon: 

198 seqr["codon_table"] = location_codon[seqr["location"]] 

199 

200 def add_mitochondrial_codon_table(self, taxon_id: int) -> None: 

201 """Adds the mitochondrial codon table to each sequence region (when missing) based on the taxon ID. 

202 

203 If no mitochondrial genetic code can be found for the given taxon ID nothing will be changed. 

204 

205 Args: 

206 taxon_id: The species taxon ID. 

207 

208 """ 

209 if self.mock: 

210 logging.info("Skip mitochondrial codon table: mock") 

211 return 

212 if not taxon_id: 

213 logging.info("Skip mitochondrial codon table: no taxon_id to use") 

214 return 

215 

216 url = f"https://www.ebi.ac.uk/ena/taxonomy/rest/tax-id/{str(taxon_id)}" 

217 response = requests.get(url, headers={"Content-Type": "application/json"}, timeout=60) 

218 response.raise_for_status() 

219 # In case we have been redirected, check for HTML opening tag 

220 if response.text.startswith("<"): 

221 raise ValueError(f"Response from {url} is not JSON") 

222 decoded = response.json() 

223 genetic_code = int(decoded.get("mitochondrialGeneticCode", 0)) 

224 if genetic_code == 0: 

225 logging.warning(f"No mitochondria genetic code found for taxon {taxon_id}") 

226 return 

227 

228 for seqr in self.seqs.values(): 

229 if ("codon_table" not in seqr) and (seqr.get("location", "") == "mitochondrial_chromosome"): 

230 seqr["codon_table"] = genetic_code 

231 

232 def to_list(self) -> list[SeqRegionDict]: 

233 """Returns the sequences as a simple list of `SeqRegionDict` objects.""" 

234 return list(self.seqs.values())