Coverage for src/python/ensembl/io/genomio/events/load.py: 0%
141 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Load the events from an input file into a core database.
17See `EventCollection.load_events` for the expected tab-separated events file format.
18"""
20__all__ = ["IdEvent", "MapSession", "EventCollection"]
22from dataclasses import dataclass
23from os import PathLike
24from pathlib import Path
25import re
26import logging
27from typing import Dict, Generator, List, Optional, Tuple
29from sqlalchemy.orm import Session
31import ensembl.io.genomio
32from ensembl.io.genomio.database import DBConnectionLite
33from ensembl.core.models import MappingSession, StableIdEvent
34from ensembl.utils.argparse import ArgumentParser
35from ensembl.utils.logging import init_logging_with_args
@dataclass
class IdEvent:
    """A single ID history event, as described by one record of the input file.

    All fields are plain strings: the previous ID, the new ID, the event name, and the
    name and date of the release the event belongs to.
    """

    from_id: str
    to_id: str
    event: str
    release: str
    release_date: str

    def __str__(self) -> str:
        """Return the event as one tab-separated line, without a trailing newline."""
        columns = (self.from_id, self.to_id, self.event, self.release, self.release_date)
        return "\t".join(columns)

    def is_change(self) -> bool:
        """Tell whether the event is an update of an existing gene."""
        return self.event in ("iso_gain", "iso_loss", "broken", "changed")
class MapSession:
    """Group of events that share one release, i.e. one mapping session of the input file."""

    def __init__(self, release: str, release_date: str) -> None:
        """Start an empty mapping session for the given release name and date."""
        self.events: List[IdEvent] = []
        self.release_date = release_date
        self.release = release

    def add_event(self, event: IdEvent) -> None:
        """Register one more event under this mapping session."""
        self.events += [event]
class EventCollection:
    """Collection of events with loader/writer in various formats."""

    # Gene diff separators and the event name each one stands for.
    # The two-character symbols are listed before "=" so that the alternation built below
    # tries them first and "=+" is never split as a plain "=".
    _EVENT_SYMBOLS: Dict[str, str] = {
        "~": "identical",
        "=+": "iso_gain",
        "=-": "iso_loss",
        "=!": "broken",
        "=": "changed",
        ">": "merge",
        "<": "split",
        "+": "new",
    }
    # Compiled once; the capturing group makes `split` return the separator as well.
    _EVENT_SPLITTER = re.compile("(" + "|".join(re.escape(symbol) for symbol in _EVENT_SYMBOLS) + ")")

    def __init__(self) -> None:
        self.events: List[IdEvent] = []

    def load_events(self, input_file: PathLike) -> None:
        """Load events from a tab-separated file, replacing the events currently in the collection.

        Expected tab file columns: old_id, new_id, event_name, release, release_date

        Blank lines are skipped.

        Args:
            input_file: Path to the events file.

        Raises:
            ValueError: If a non-blank line does not have exactly 5 tab-separated columns.
        """
        events: List[IdEvent] = []

        with Path(input_file).open("r") as events_fh:
            for raw_line in events_fh:
                # Only remove the line ending: an ID column may be empty, so a leading tab
                # must be kept for the split to yield the right number of columns.
                line = raw_line.rstrip("\r\n")
                if not line.strip():
                    continue
                (from_id, to_id, event_name, release, release_date) = line.split("\t")
                events.append(
                    IdEvent(
                        from_id=from_id,
                        to_id=to_id,
                        event=event_name,
                        release=release,
                        release_date=release_date,
                    )
                )
        self.events = events

    def add_deletes(
        self, genes: List[str], release_name: str = "release_name", release_date: str = "release_date"
    ) -> None:
        """Add deletion events from a list of deleted genes.

        Args:
            genes: IDs of the deleted genes; each one becomes a "deletion" event with an empty to_id.
            release_name: Release name stored in every added event.
            release_date: Release date stored in every added event.
        """
        self.events.extend(
            IdEvent(from_id=gene_id, to_id="", event="deletion", release=release_name, release_date=release_date)
            for gene_id in genes
        )

    def load_events_from_gene_diff(
        self, input_file: PathLike, release_name: str = "release_name", release_date: str = "release_date"
    ) -> None:
        """Load events from a gene_diff file and append them to the collection.

        Each data line has 3 tab-separated columns, of which only the second (the event string)
        is used. Blank lines and lines starting with "//" are skipped, as are "identical" events
        and (from_id, to_id) pairs that were already loaded from this file.

        Args:
            input_file: Path to the gene_diff file.
            release_name: Release name stored in every added event.
            release_date: Release date stored in every added event.

        Raises:
            ValueError: If a data line does not have exactly 3 tab-separated columns.
        """
        seen_pairs = set()

        with Path(input_file).open("r") as events_fh:
            for raw_line in events_fh:
                line = raw_line.rstrip("\r\n")
                if not line.strip() or line.startswith("//"):
                    continue
                (_, event_string, _) = line.split("\t")
                for from_id, to_id, event_name in self._parse_gene_diff_event(event_string):
                    # Identical gene: no need to keep in the history
                    if event_name == "identical":
                        continue
                    fingerprint = f"{from_id} {to_id}"
                    if fingerprint in seen_pairs:
                        logging.debug(f"Duplicated event, skipped: {fingerprint}")
                        continue
                    seen_pairs.add(fingerprint)
                    self.events.append(
                        IdEvent(
                            from_id=from_id,
                            to_id=to_id,
                            event=event_name,
                            release=release_name,
                            release_date=release_date,
                        )
                    )

    def _parse_gene_diff_event(self, event_string: str) -> Generator[Tuple[str, str, str], None, None]:
        """Gets all the pairs of IDs from an event string from gene diff.

        The string is split on a single event symbol; IDs on either side are separated by ":".
        If the string does not split in exactly 3 parts (IDs, symbol, IDs), a warning is logged
        and nothing is yielded.

        Args:
            event_string: Event description, e.g. "A:B>C".

        Yields:
            Tuples (from_id, to_id, event_name) for every combination of IDs from both sides.
        """
        parts = self._EVENT_SPLITTER.split(event_string)
        if len(parts) != 3:
            logging.warning(f"Wrong partition: from '{event_string}' to '{parts}'")
            return
        from_ids, symbol, to_ids = parts
        event_name = self._EVENT_SYMBOLS[symbol]

        for from_id in from_ids.split(":"):
            for to_id in to_ids.split(":"):
                yield (from_id, to_id, event_name)

    def remap_to_ids(self, map_dict: Dict[str, str]) -> None:
        """Using a mapping dict, remap the to_id of all events.

        Events with an empty to_id are left alone. For change events (see `is_change` on the
        event) the to_id is set to the from_id; otherwise the to_id is replaced by its value
        in the mapping.

        Args:
            map_dict: Mapping from the current to_id to the to_id to use instead.

        Raises:
            ValueError: If there are events without map information.
        """
        no_map = 0
        for event in self.events:
            if not event.to_id:
                continue
            if event.is_change():
                event.to_id = event.from_id
            elif event.to_id in map_dict:
                event.to_id = map_dict[event.to_id]
            else:
                logging.info(f"No map for to_id {event.to_id}")
                no_map += 1

        # Report all unmapped IDs in the log before failing
        if no_map:
            raise ValueError(f"No map for {no_map} event to_ids")

    def write_events_to_file(self, output_file: PathLike) -> None:
        """Write the events to a file, one event per line using their string representation.

        Args:
            output_file: Path of the file to write (overwritten if it exists).
        """
        with Path(output_file).open("w") as out_fh:
            logging.info(f"Write {len(self.events)} events to {output_file}")
            for event in self.events:
                out_fh.write(f"{event}\n")

    def write_events_to_db(self, session: Session, update: bool = False) -> None:
        """Insert the events in the core database.

        A mapping session is created for each different 'release'. Every event is stored as a
        gene stable ID event with both versions set to 1; empty IDs are stored as None.

        Args:
            session: Database session used to add and commit the new rows.
            update: If False, nothing is written: only log what was found.
        """
        # First, create mapping_sessions based on the release
        mappings: Dict[str, MapSession] = {}
        for event in self.events:
            release = event.release
            if release not in mappings:
                # The date of the first event seen for a release is used for its mapping session
                mappings[release] = MapSession(release, event.release_date)
            mappings[release].add_event(event)

        # Then, add the mapping, and the events for this mapping
        for release, mapping in mappings.items():
            if not update:
                logging.info(f"Found mapping for release {release} ({len(mapping.events)} events)")
                logging.info("Run your command again with '--update' to add them")
                continue

            logging.info(f"Adding mapping for release {release} ({len(mapping.events)} events)")
            map_session = MappingSession(new_release=mapping.release, created=mapping.release_date)
            session.add(map_session)
            # Flush and refresh so that map_session.mapping_session_id is available for the events
            session.flush()
            session.refresh(map_session)
            for event in mapping.events:
                from_id: Optional[str] = event.from_id
                if from_id == "":
                    from_id = None
                to_id: Optional[str] = event.to_id
                if to_id == "":
                    to_id = None
                id_event = StableIdEvent(
                    mapping_session_id=map_session.mapping_session_id,
                    old_stable_id=from_id,
                    new_stable_id=to_id,
                    id_type="gene",
                    old_version=1,
                    new_version=1,
                )
                session.add(id_event)
            session.commit()
def main() -> None:
    """Command-line entry point.

    Parses the arguments, loads the events from the input file and hands them to
    `EventCollection.write_events_to_db` together with the `--update` flag.
    """
    input_help = (
        "Input TSV file with events in the format exported by the dumper: old_id, new_id, event_name, "
        "release, date"
    )

    arg_parser = ArgumentParser(description="Load the events in the input file into a core database.")
    arg_parser.add_server_arguments(include_database=True)
    arg_parser.add_argument_src_path("--input_file", required=True, help=input_help)
    arg_parser.add_argument("--update", action="store_true", help="Make changes to the database")
    arg_parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    arg_parser.add_log_arguments(add_log_file=True)
    args = arg_parser.parse_args()
    init_logging_with_args(args)

    # Connect first, then read the events, and finally write them within one session scope
    connection = DBConnectionLite(args.url)
    events = EventCollection()
    events.load_events(args.input_file)

    with connection.session_scope() as db_session:
        events.write_events_to_db(db_session, args.update)
# Run the command-line entry point only when this module is executed as a script.
if __name__ == "__main__":
    main()