Coverage for src/python/ensembl/io/genomio/events/load.py: 0%

141 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Provided a file with events, load them in a core database. 

16 

17See the `load_events` method of `EventCollection` for the events tab file format. 

18""" 

19 

20__all__ = ["IdEvent", "MapSession", "EventCollection"] 

21 

22from dataclasses import dataclass 

23from os import PathLike 

24from pathlib import Path 

25import re 

26import logging 

27from typing import Dict, Generator, List, Optional, Tuple 

28 

29from sqlalchemy.orm import Session 

30 

31import ensembl.io.genomio 

32from ensembl.io.genomio.database import DBConnectionLite 

33from ensembl.core.models import MappingSession, StableIdEvent 

34from ensembl.utils.argparse import ArgumentParser 

35from ensembl.utils.logging import init_logging_with_args 

36 

37 

@dataclass
class IdEvent:
    """One stable ID event, as described by a single line of the events input file."""

    # Fields are declared in the same order as the columns of the events file.
    from_id: str
    to_id: str
    event: str
    release: str
    release_date: str

    def __str__(self) -> str:
        """Render the event as one tab-separated line, in events file column order."""
        columns = (self.from_id, self.to_id, self.event, self.release, self.release_date)
        return "\t".join(columns)

    def is_change(self) -> bool:
        """Tell whether the event is an update of an existing gene.

        Returns:
            True if the event name is one of "iso_gain", "iso_loss", "broken" or "changed".
        """
        return self.event in ("iso_gain", "iso_loss", "broken", "changed")

56 

57 

class MapSession:
    """Lightweight stand-in for one mapping session: a release and the events attached to it."""

    def __init__(self, release: str, release_date: str) -> None:
        """Create a mapping session with no events.

        Args:
            release: Name of the release this mapping session stands for.
            release_date: Date of that release, kept as the string it was given as.
        """
        self.events: List[IdEvent] = []
        self.release_date = release_date
        self.release = release

    def add_event(self, event: IdEvent) -> None:
        """Append `event` to the events of this mapping session."""
        self.events.append(event)

69 

70 

class EventCollection:
    """Collection of stable ID events with loaders and writers for several formats.

    Events can be loaded from an events TSV file or from a gene_diff file, have their `to_id`
    remapped, and then be written to a file or inserted into a core database.

    Attributes:
        events: The events currently held by the collection, in the order they were added.
    """

    # Operators used in gene_diff event strings, and the event name each one stands for.
    _GENE_DIFF_SYMBOLS = {
        "~": "identical",
        "=+": "iso_gain",
        "=-": "iso_loss",
        "=!": "broken",
        "=": "changed",
        ">": "merge",
        "<": "split",
        "+": "new",
    }
    # Longest operators first, so that "=+" is never matched as "=" followed by a stray "+".
    # The capturing group makes re.split() return the operator along with both sides.
    _GENE_DIFF_SPLITTER = re.compile(
        "(" + "|".join(re.escape(symbol) for symbol in sorted(_GENE_DIFF_SYMBOLS, key=len, reverse=True)) + ")"
    )

    def __init__(self) -> None:
        self.events: List[IdEvent] = []

    def load_events(self, input_file: PathLike) -> None:
        """Load events from a tab file, replacing any event already in the collection.

        Expected tab file columns: old_id, new_id, event_name, release, release_date.
        Blank lines are skipped.

        Args:
            input_file: Path to the events tab file.

        Raises:
            ValueError: If a non-blank line does not have exactly 5 tab-separated columns.
        """
        events: List[IdEvent] = []

        with Path(input_file).open("r") as events_fh:
            for line_number, raw_line in enumerate(events_fh, start=1):
                # Only remove the line ending: a leading or trailing tab delimits an empty column
                # (e.g. no old ID), so a full strip() would shift the remaining columns.
                line = raw_line.rstrip("\r\n")
                if not line.strip():
                    continue
                fields = line.split("\t")
                if len(fields) != 5:
                    raise ValueError(
                        f"Expected 5 tab-separated columns but found {len(fields)} "
                        f"at line {line_number} of {input_file}"
                    )
                from_id, to_id, event_name, release, release_date = fields
                events.append(
                    IdEvent(
                        from_id=from_id,
                        to_id=to_id,
                        event=event_name,
                        release=release,
                        release_date=release_date,
                    )
                )
        self.events = events

    def add_deletes(
        self, genes: List[str], release_name: str = "release_name", release_date: str = "release_date"
    ) -> None:
        """Add one "deletion" event (with an empty `to_id`) for each deleted gene.

        Args:
            genes: IDs of the deleted genes.
            release_name: Release to set on the new events.
            release_date: Release date to set on the new events.
        """
        for gene_id in genes:
            event = IdEvent(
                from_id=gene_id, to_id="", event="deletion", release=release_name, release_date=release_date
            )
            self.events.append(event)

    def load_events_from_gene_diff(
        self, input_file: PathLike, release_name: str = "release_name", release_date: str = "release_date"
    ) -> None:
        """Load events from a gene_diff file and append them to the collection.

        Each line must have 3 tab-separated columns, of which only the second one (the event
        string) is used. Blank lines and lines starting with "//" are skipped, as are "identical"
        events and any (from_id, to_id) pair already loaded from this file.

        Args:
            input_file: Path to the gene_diff file.
            release_name: Release to set on the loaded events.
            release_date: Release date to set on the loaded events.

        Raises:
            ValueError: If a line that is not skipped does not have exactly 3 columns.
        """
        loaded_event = set()

        with Path(input_file).open("r") as events_fh:
            for raw_line in events_fh:
                line = raw_line.rstrip("\r\n")
                if not line.strip() or line.startswith("//"):
                    continue
                (_, event_string, _) = line.split("\t")
                for from_id, to_id, event_name in self._parse_gene_diff_event(event_string):
                    # Identical gene: no need to keep in the history
                    if event_name == "identical":
                        continue
                    fingerprint = f"{from_id} {to_id}"
                    if fingerprint in loaded_event:
                        logging.debug(f"Duplicated event, skipped: {fingerprint}")
                        continue
                    loaded_event.add(fingerprint)
                    event = IdEvent(
                        from_id=from_id,
                        to_id=to_id,
                        event=event_name,
                        release=release_name,
                        release_date=release_date,
                    )
                    self.events.append(event)

    def _parse_gene_diff_event(self, event_string: str) -> Generator[Tuple[str, str, str], None, None]:
        """Gets all the pairs of IDs from an event string from gene diff.

        The event string is made of ":"-separated IDs on each side of a single operator, e.g.
        "A:B>C". A string that does not split into exactly 3 parts is logged and yields nothing.

        Args:
            event_string: Event string from the gene_diff file.

        Yields:
            A tuple (from_id, to_id, event_name) for every combination of IDs from both sides.
        """
        parts = self._GENE_DIFF_SPLITTER.split(event_string)
        if len(parts) != 3:
            logging.warning(f"Wrong partition: from '{event_string}' to '{parts}'")
            return
        from_ids, sep, to_ids = parts
        event_name = self._GENE_DIFF_SYMBOLS[sep]

        for from_id in from_ids.split(":"):
            for to_id in to_ids.split(":"):
                yield (from_id, to_id, event_name)

    def remap_to_ids(self, map_dict: Dict[str, str]) -> None:
        """Using a mapping dict, remap the to_id of all events.

        Events without a `to_id` are left alone. Change events (see `IdEvent.is_change`) get their
        `from_id` as `to_id`; all other events get the value mapped to their `to_id`.

        Args:
            map_dict: Mapping from the current `to_id` values to the new ones.

        Raises:
            ValueError: If there are events without map information. Events that could be remapped
                have already been modified when this is raised.
        """
        no_map = 0
        for event in self.events:
            if not event.to_id:
                continue
            if event.is_change():
                event.to_id = event.from_id
            elif event.to_id in map_dict:
                event.to_id = map_dict[event.to_id]
            else:
                logging.info(f"No map for to_id {event.to_id}")
                no_map += 1

        if no_map:
            raise ValueError(f"No map for {no_map} event to_ids")

    def write_events_to_file(self, output_file: PathLike) -> None:
        """Write the events to a file, one tab-separated event per line.

        Args:
            output_file: Path to the file to write (overwritten if it exists).
        """
        with Path(output_file).open("w") as out_fh:
            logging.info(f"Write {len(self.events)} events to {output_file}")
            for event in self.events:
                out_fh.write(f"{event}\n")

    def write_events_to_db(self, session: Session, update: bool = False) -> None:
        """Insert the events in the core database.

        A mapping session is created for each different 'release', using the release date of the
        first event found for that release. All events are stored with the "gene" ID type and
        version 1; empty IDs are stored as NULL.

        Args:
            session: Database session used to add the mapping sessions and the events.
            update: Actually write to the database. Otherwise only log what would be added.
        """
        # First, create mapping_sessions based on the release
        mappings: Dict[str, MapSession] = {}
        for event in self.events:
            release = event.release
            if release not in mappings:
                mappings[release] = MapSession(release, event.release_date)
            mappings[release].add_event(event)

        # Then, add the mapping, and the events for this mapping
        for release, mapping in mappings.items():
            if update:
                logging.info(f"Adding mapping for release {release} ({len(mapping.events)} events)")
                map_session = MappingSession(new_release=mapping.release, created=mapping.release_date)
                session.add(map_session)
                # Flush and refresh so that the generated mapping_session_id can be read below
                session.flush()
                session.refresh(map_session)
                for event in mapping.events:
                    from_id: Optional[str] = event.from_id or None
                    to_id: Optional[str] = event.to_id or None
                    id_event = StableIdEvent(
                        mapping_session_id=map_session.mapping_session_id,
                        old_stable_id=from_id,
                        new_stable_id=to_id,
                        id_type="gene",
                        old_version=1,
                        new_version=1,
                    )
                    session.add(id_event)
                session.commit()
            else:
                logging.info(f"Found mapping for release {release} ({len(mapping.events)} events)")

        if not update:
            logging.info("Run your command again with '--update' to add them")

232 

233 

def main() -> None:
    """Command line entrypoint: read the events from a TSV file and load them in a core database.

    Without `--update`, the database is left untouched and the mappings found are only logged.
    """
    arg_parser = ArgumentParser(description="Load the events in the input file into a core database.")
    arg_parser.add_server_arguments(include_database=True)
    input_file_help = (
        "Input TSV file with events in the format exported by the dumper: old_id, new_id, event_name, "
        "release, date"
    )
    arg_parser.add_argument_src_path("--input_file", required=True, help=input_file_help)
    arg_parser.add_argument("--update", action="store_true", help="Make changes to the database")
    arg_parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    arg_parser.add_log_arguments(add_log_file=True)
    args = arg_parser.parse_args()
    init_logging_with_args(args)

    # Connect to the core database, then read all the events before writing anything
    db_connection = DBConnectionLite(args.url)
    event_collection = EventCollection()
    event_collection.load_events(args.input_file)

    with db_connection.session_scope() as db_session:
        event_collection.write_events_to_db(db_session, args.update)


if __name__ == "__main__":
    main()