Coverage for src/python/ensembl/io/genomio/events/dump.py: 0%
251 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Module to dump stable id events from an Ensembl Core database"""
# Public names exported by this module.
__all__ = [
    "IdsSet",
    "DictToIdsSet",
    "BRC4_START_DATE",
    "Pair",
    "UnsupportedEvent",
    "Event",
    "DumpStableIDs",
]
27from datetime import datetime
28from pathlib import Path
29from typing import List, Dict, Optional, Set, Tuple
30import logging
32from sqlalchemy import select, and_, or_
33from sqlalchemy.orm import Session
35import ensembl.io.genomio
36from ensembl.core.models import MappingSession, StableIdEvent
37from ensembl.io.genomio.database import DBConnectionLite
38from ensembl.utils.argparse import ArgumentParser
39from ensembl.utils.logging import init_logging_with_args
# Cut-off date used by Event.get_full_release(): a release dated strictly after it is labelled
# "build <release>"; any other release (including one without a date) is labelled "pre-BRC4 <release>".
BRC4_START_DATE = datetime(2020, 5, 1)
# A set of stable ids.
IdsSet = Set[str]
# Mapping from one stable id to the set of stable ids it is linked to.
DictToIdsSet = Dict[str, IdsSet]
class Pair:
    """Simple old_id - new_id pair representation.

    A missing id (`None`) is stored as an empty string, so both attributes are always set.

    Attributes:
        old_id: Stable id in the previous gene set, or "" if there is none.
        new_id: Stable id in the new gene set, or "" if there is none.

    """

    def __init__(self, old_id: Optional[str], new_id: Optional[str]) -> None:
        """Create a pair with an old_id and a new_id if provided"""
        self.old_id = old_id if old_id is not None else ""
        self.new_id = new_id if new_id is not None else ""

    def __repr__(self) -> str:
        """Readable representation, so that a pair shows its ids in logs and error messages."""
        return f"{type(self).__name__}(old_id={self.old_id!r}, new_id={self.new_id!r})"

    def has_old_id(self) -> bool:
        """Check if the pair has an old_id"""
        return self.old_id != ""

    def has_new_id(self) -> bool:
        """Check if the pair has a new_id"""
        return self.new_id != ""

    def is_empty(self) -> bool:
        """Test if the current pair has no id."""
        return not (self.has_old_id() or self.has_new_id())
class UnsupportedEvent(ValueError):
    """Raised when the old and new id sets of an event do not match any supported kind of event."""
class Event:
    """Represents a stable id event from one gene set version to another one. Various events:
    - new genes
    - deleted genes
    - changed genes (one gene to one gene)
    - merged genes (several genes to one)
    - split genes (one gene to several)
    - mixed (several genes to several)

    Attributes:
        from_set: Set of gene ids in the previous gene set.
        to_set: Set of gene ids in the new gene set.
        release: New gene set release name.
        date: Date of the new gene set.
        name: Name of the event (will be updated automatically).
        pairs: All pairs of ids for this event.

    Any gene set that has no date, or whose date is not after `BRC4_START_DATE`, is dubbed pre-BRC4.

    """

    def __init__(
        self,
        from_list: Optional[Set[str]] = None,
        to_list: Optional[Set[str]] = None,
        release: Optional[str] = None,
        date: Optional[datetime] = None,
    ) -> None:
        """Create a stable id event from a set of old_ids to a set of new_ids"""
        if from_list is None:
            from_list = set()
        if to_list is None:
            to_list = set()
        # Always store cleaned copies, so the sets given by the caller are never modified
        self.from_set = self.clean_set(from_list)
        self.to_set = self.clean_set(to_list)
        self.release = release
        self.date = date
        self.name = ""
        self.pairs: List[Pair] = []

    def __str__(self) -> str:
        """String representation of the stable id event.

        Raises:
            UnsupportedEvent: if the ids of the event do not match any supported kind of event.

        """
        # Sort the ids: set order is arbitrary and would make the output non reproducible
        from_str = ",".join(sorted(self.from_set))
        to_str = ",".join(sorted(self.to_set))
        return f"From {from_str} to {to_str} = {self.get_name()} in release {self.release}"

    def _format_date(self) -> str:
        """Returns the date of the event as `YYYY-MM`, or `no_date` if the event has no date."""
        if self.date:
            return self.date.strftime("%Y-%m")
        return "no_date"

    def brc_format_1(self) -> List[str]:
        """Returns a list events, one line per initial ID, in the following TSV format:
        - old gene id
        - event name
        - release
        - release date
        - list of old gene ids in the event (comma-separated)
        - list of new gene ids in the event (comma-separated)

        The two lists of ids are only filled for "merge", "split", "mixed" and "change" events. A "new"
        event has no initial ID, so it gets a single line with the new gene id in the first column.
        Ids are sorted so that the output is reproducible.

        Raises:
            UnsupportedEvent: if the ids of the event do not match any supported kind of event.

        """
        from_ids = sorted(self.from_set)
        from_str = ",".join(from_ids)
        to_str = ",".join(sorted(self.to_set))
        release = self.get_full_release()
        date = self._format_date()
        name = self.get_name()

        if name in ("merge", "split", "mixed", "change"):
            id_lists = [from_str, to_str]
        else:
            id_lists = ["", ""]
        line_list = ["\t".join([identifier, name, release, date] + id_lists) for identifier in from_ids]

        if name == "new":
            # A "new" event has exactly one id in to_set: extract that id (not the set itself)
            new_id = next(iter(self.to_set))
            line_list.append("\t".join([new_id, name, release, date, "", ""]))
        return line_list

    def brc_format_2(self) -> List[str]:
        """Returns a list of combination of genes, one line per combination of old_id - new_ids, in the
        following TSV format:
        - old gene id
        - new gene id
        - event name
        - release
        - release date

        Raises:
            UnsupportedEvent: if the ids of the event do not match any supported kind of event.

        """
        release = self.get_full_release()
        date = self._format_date()
        name = self.get_name()
        return ["\t".join([pair.old_id, pair.new_id, name, release, date]) for pair in self.pairs]

    @staticmethod
    def clean_set(this_list: Set) -> Set:
        """Removes any empty elements from a set.

        Args:
            this_list: set of items, some of which can be empty/None.

        Returns:
            A new set without the empty elements.

        """
        return {identifier for identifier in this_list if identifier}

    def add_from(self, from_id: str) -> None:
        """Store an id in the from_set (an empty id is ignored)."""
        if from_id:
            self.from_set.add(from_id)

    def add_to(self, to_id: str) -> None:
        """Store an id in the to_set (an empty id is ignored)."""
        if to_id:
            self.to_set.add(to_id)

    def set_release(self, release: str) -> None:
        """Set the release name of the event"""
        self.release = release

    def set_date(self, date: datetime) -> None:
        """Set the date of the release for this event"""
        self.date = date

    def add_pair(self, pair: Pair) -> None:
        """Keeps a record of this pair.

        Args:
            pair: a Pair to record.

        Raises:
            ValueError: can't add an empty pair.

        """
        if pair.is_empty():
            raise ValueError(f"Expected at least one value in the given pair {pair}")
        self.pairs.append(pair)

    def get_full_release(self) -> str:
        """Returns the expanded release name, pre-BRC4 or `BRC4 = build`."""
        # The date is tested first, so an event without a date is always pre-BRC4
        if self.date and self.date > BRC4_START_DATE:
            return f"build {self.release}"
        return f"pre-BRC4 {self.release}"

    def _name_event(self) -> None:
        """Identify the event name based on the old vs new id lists.

        Raises:
            UnsupportedEvent: if both sets are empty, or if one set is empty while the other one has
                several ids.

        """
        nb_from = len(self.from_set)
        nb_to = len(self.to_set)
        if nb_from == 0 and nb_to == 1:
            self.name = "new"
        elif nb_from == 1 and nb_to == 0:
            self.name = "deletion"
        elif nb_from == 1 and nb_to == 1:
            self.name = "change"
        elif nb_from == 1 and nb_to > 1:
            self.name = "split"
        elif nb_from > 1 and nb_to == 1:
            self.name = "merge"
        elif nb_from > 1 and nb_to > 1:
            self.name = "mixed"
        else:
            raise UnsupportedEvent(f"Event {self.from_set} to {self.to_set} is not supported")

    def clean_pairs(self) -> None:
        """Remove the empty old pairs when the event is not 'new'."""
        if not self.name:
            self._name_event()

        if self.name != "new":
            self.pairs = [pair for pair in self.pairs if pair.has_old_id()]

    def get_name(self) -> str:
        """Retrieve the name for this event, update it beforehand.

        Raises:
            UnsupportedEvent: if the ids of the event do not match any supported kind of event.

        """
        self._name_event()
        return self.name

    def add_pairs(self, pairs: List[Pair]) -> None:
        """Provided all the pairs, keep those that are used by this event.

        Args:
            pairs: list of Pair.

        """
        for pair in pairs:
            if (pair.has_old_id() and pair.old_id in self.from_set) or (
                pair.has_new_id() and pair.new_id in self.to_set
            ):
                # Core db contains an empty line to signify that an old id has been removed
                # in merge/split/mixed
                name = self.get_name()
                if (name != "deletion") and not pair.has_new_id():
                    continue
                self.add_pair(pair)
class DumpStableIDs:
    """Processor that builds stable id events from the pairs of ids stored in a core database,
    and that can print those events out.

    Attributes:
        session: database session used to retrieve the data from.

    """

    def __init__(self, session: Session) -> None:
        """Create a processor for events"""
        self.session = session

    def get_history(self) -> List:
        """Retrieve all events from a database.

        The events of each mapping session get the new release name and the creation date of
        that session.

        Returns:
            A list of all events.

        """
        all_events: List[Event] = []
        for map_session in self.get_mapping_sessions():
            logging.info(f"Mapping session {map_session.new_release}")
            session_events = self.make_events(self.get_pairs(map_session.mapping_session_id))
            for event in session_events:
                event.set_release(map_session.new_release)
                event.set_date(map_session.created)
            all_events.extend(session_events)
        return all_events

    def print_events(self, events: List[Event], output_file: Path) -> None:
        """Print events in a format for BRC.

        Nothing is written (the file is not even created) if there are no events.

        Args:
            events: list of events for a given genome.
            output_file: where the events will be printed.

        """
        if not events:
            logging.info("No events to print")
            return
        with output_file.open("w") as out_fh:
            for event in events:
                out_fh.writelines(line + "\n" for line in event.brc_format_2())

    def get_mapping_sessions(self) -> List[MappingSession]:
        """Retrieve the mapping sessions from the connected database.

        Returns:
            A list of sessions.

        """
        return list(self.session.scalars(select(MappingSession)).unique().all())

    def get_pairs(self, session_id: int) -> List[Pair]:
        """Retrieve all pair of ids for a given session.

        Only gene events are used, and the events where the old id and the new id are the same
        are ignored.

        Args:
            session_id: id of a session from the connected database.

        Returns:
            All pairs of IDs.

        """
        # Keep an event if one of its ids is missing, or if its id has changed
        id_has_changed = or_(
            (StableIdEvent.old_stable_id.is_(None)),
            (StableIdEvent.new_stable_id.is_(None)),
            (StableIdEvent.old_stable_id != StableIdEvent.new_stable_id),
        )
        wanted_events = and_(
            (StableIdEvent.mapping_session_id == session_id),
            (StableIdEvent.id_type == "gene"),
            (id_has_changed),
        )
        id_events_stmt = (
            select(StableIdEvent)
            .where(wanted_events)
            .group_by(
                StableIdEvent.old_stable_id, StableIdEvent.new_stable_id, StableIdEvent.mapping_session_id
            )
        )
        rows = self.session.scalars(id_events_stmt).unique().all()
        return [Pair(row.old_stable_id, row.new_stable_id) for row in rows]

    def make_events(self, pairs: List[Pair]) -> List:
        """Given a list of pairs, create events.

        Args:
            pairs: list of Pair.

        Returns:
            A list of events.

        """
        from_list, to_list = self.get_pairs_from_to(pairs)

        events: List[Event] = []
        # Iterate over the initial mapping: `from_list` and `to_list` are replaced by smaller
        # mappings each time an event is extended, so an old id that is already part of a previous
        # event is no longer in `from_list` and gets skipped
        initial_from_list = from_list
        for old_id, linked_new_ids in initial_from_list.items():
            if not old_id or old_id not in from_list:
                continue
            event, from_list, to_list = self.extend_event(
                Event({old_id}, set(linked_new_ids)), from_list, to_list
            )
            event.add_pairs(pairs)
            events.append(event)

        # Remaining events should only be new genes
        for new_id, linked_old_ids in to_list.items():
            if not new_id:
                continue
            new_event = Event(set(linked_old_ids), {new_id})
            new_event.add_pairs(pairs)
            events.append(new_event)

        # Clean up the pairs of each event, and count the events of each kind for the log
        stats: Dict[str, int] = {}
        for event in events:
            name = event.get_name()
            event.clean_pairs()
            stats[name] = stats.get(name, 0) + 1

        for stat, value in stats.items():
            logging.info(f"\t{stat} = {value}")

        return events

    @staticmethod
    def get_pairs_from_to(pairs: List[Pair]) -> Tuple[DictToIdsSet, DictToIdsSet]:
        """
        From a list of Pairs, extract a mapping of all ids from a given old id (from_list),
        and a mapping of all ids to a given new id (to_list).

        A missing id is represented by an empty string key; empty ids are removed from the sets
        of linked ids.

        Args:
            pairs: list of Pairs.

        Returns:
            Tuple of 2 values:
                from_list
                to_list

        """
        linked_new_ids: DictToIdsSet = {}
        linked_old_ids: DictToIdsSet = {}
        for pair in pairs:
            old_id = "" if pair.old_id is None else pair.old_id
            new_id = "" if pair.new_id is None else pair.new_id
            linked_new_ids.setdefault(old_id, set()).add(new_id)
            linked_old_ids.setdefault(new_id, set()).add(old_id)

        # Remove empty elements
        from_list = {from_id: Event.clean_set(ids) for from_id, ids in linked_new_ids.items()}
        to_list = {to_id: Event.clean_set(ids) for to_id, ids in linked_old_ids.items()}

        return from_list, to_list

    def extend_event(
        self, event: Event, from_list: DictToIdsSet, to_list: DictToIdsSet
    ) -> Tuple[Event, DictToIdsSet, DictToIdsSet]:
        """Given an event, aggregate ids in the 'from' and 'to' sets, to connect the whole group.

        Args:
            event: the event to extend.
            from_list: A dict of the from ids, and their corresponding to ids.
            to_list: A dict of the to ids, and their corresponding from ids.

        Returns:
            A tuple of the extended event, and the from_list and to_list from which the ids that
            have been added to the event have been removed.

        """
        # Repeat until a whole pass adds no new id to the event
        keep_going = True
        while keep_going:
            keep_going = False

            # Extend the group in the to ids: add the from ids linked to each to id
            for to_id in event.to_set:
                if to_id not in to_list:
                    continue
                for linked_from_id in to_list[to_id]:
                    if linked_from_id not in event.from_set:
                        event.add_from(linked_from_id)
                        keep_going = True

            # Extend the group in the from ids: add the to ids linked to each from id
            for from_id in event.from_set:
                if from_id not in from_list:
                    continue
                for linked_to_id in from_list[from_id]:
                    if linked_to_id not in event.to_set:
                        event.add_to(linked_to_id)
                        keep_going = True

        # Clean up: only keep the ids that are not part of the event
        remaining_from = {
            from_id: to_ids for from_id, to_ids in from_list.items() if from_id not in event.from_set
        }
        remaining_to = {to_id: from_ids for to_id, from_ids in to_list.items() if to_id not in event.to_set}

        return (event, remaining_from, remaining_to)
def main() -> None:
    """Command-line entry point: dump the stable id events of a core database to a file."""
    arg_parser = ArgumentParser(
        description="Dump the stable ID events from the information available in a core database."
    )
    arg_parser.add_server_arguments(include_database=True)
    arg_parser.add_argument_dst_path("--output_file", required=True, help="Output file")
    arg_parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    arg_parser.add_log_arguments(add_log_file=True)
    args = arg_parser.parse_args()
    init_logging_with_args(args)

    # Retrieve all the events within one database session, then write them to the output file
    connection = DBConnectionLite(args.url)
    with connection.session_scope() as session:
        dumper = DumpStableIDs(session)
        history = dumper.get_history()
        dumper.print_events(history, args.output_file)
# Only run the command-line entry point when this file is executed as a script.
if __name__ == "__main__":
    main()