Coverage for src/python/ensembl/io/genomio/events/dump.py: 0%

251 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

15"""Module to dump stable id events from an Ensembl Core database""" 

16 

# Names exported by `from ensembl.io.genomio.events.dump import *`.
__all__ = [
    "IdsSet",
    "DictToIdsSet",
    "BRC4_START_DATE",
    "Pair",
    "UnsupportedEvent",
    "Event",
    "DumpStableIDs",
]

26 

27from datetime import datetime 

28from pathlib import Path 

29from typing import List, Dict, Optional, Set, Tuple 

30import logging 

31 

32from sqlalchemy import select, and_, or_ 

33from sqlalchemy.orm import Session 

34 

35import ensembl.io.genomio 

36from ensembl.core.models import MappingSession, StableIdEvent 

37from ensembl.io.genomio.database import DBConnectionLite 

38from ensembl.utils.argparse import ArgumentParser 

39from ensembl.utils.logging import init_logging_with_args 

40 

41 

# Releases dated strictly after this date are labelled "build <release>" by
# `Event.get_full_release()`; any other release is labelled "pre-BRC4 <release>".
BRC4_START_DATE = datetime(year=2020, month=5, day=1)

# A set of stable ids.
IdsSet = Set[str]
# Maps one stable id to the set of stable ids it is linked to.
DictToIdsSet = Dict[str, Set[str]]

45 

46 

class Pair:
    """Simple old_id - new_id pair representation.

    A missing id (given as `None`) is stored as an empty string, so both attributes are
    always strings.

    Attributes:
        old_id: Stable id in the previous gene set, or "" if there is none.
        new_id: Stable id in the new gene set, or "" if there is none.

    """

    def __init__(self, old_id: Optional[str], new_id: Optional[str]) -> None:
        """Create a pair with an old_id and a new_id if provided"""

        self.old_id = old_id if old_id is not None else ""
        self.new_id = new_id if new_id is not None else ""

    def __repr__(self) -> str:
        """Readable representation, so that error and log messages show the actual ids."""
        return f"{type(self).__name__}(old_id={self.old_id!r}, new_id={self.new_id!r})"

    def has_old_id(self) -> bool:
        """Check if the pair has an old_id"""
        return self.old_id != ""

    def has_new_id(self) -> bool:
        """Check if the pair has a new_id"""
        return self.new_id != ""

    def is_empty(self) -> bool:
        """Test if the current pair has no id."""

        return not (self.has_old_id() or self.has_new_id())

71 

72 

class UnsupportedEvent(ValueError):
    """Raised when a combination of old and new ids does not match any known event type."""

75 

76 

class Event:
    """Represents a stable id event from one gene set version to another one.

    The event name is derived from the number of old and new ids:
        - new: no old gene, one new gene
        - deletion: one old gene, no new gene
        - change: one old gene to one new gene
        - split: one old gene to several new genes
        - merge: several old genes to one new gene
        - mixed: several old genes to several new genes

    Any other combination is not supported (see `UnsupportedEvent`).

    Attributes:
        from_set: Set of gene ids in the previous gene set.
        to_set: Set of gene ids in the new gene set.
        release: New gene set release name.
        date: Date of the new gene set.
        name: Name of the event (updated every time `get_name()` is called).
        pairs: All pairs of ids recorded for this event.

    Any gene set without a date, or whose date is not after `BRC4_START_DATE`, is dubbed pre-BRC4.

    """

    def __init__(
        self,
        from_list: Optional[Set[str]] = None,
        to_list: Optional[Set[str]] = None,
        release: Optional[str] = None,
        date: Optional[datetime] = None,
    ) -> None:
        """Create a stable id event from a set of old_ids to a set of new_ids.

        Args:
            from_list: Old gene ids; empty/None items are dropped.
            to_list: New gene ids; empty/None items are dropped.
            release: New gene set release name.
            date: Date of the new gene set.

        """

        if from_list is None:
            from_list = set()
        if to_list is None:
            to_list = set()
        # clean_set builds new sets, so the caller's sets are never modified by this event
        self.from_set = self.clean_set(from_list)
        self.to_set = self.clean_set(to_list)
        self.release = release
        self.date = date
        self.name = ""
        self.pairs: List[Pair] = []

    def __str__(self) -> str:
        """String representation of the stable id event (ids are listed in sorted order).

        Raises:
            UnsupportedEvent: if the current ids do not match any known event type.

        """

        from_str = ",".join(sorted(self.from_set))
        to_str = ",".join(sorted(self.to_set))
        return f"From {from_str} to {to_str} = {self.get_name()} in release {self.release}"

    def _date_str(self) -> str:
        """Returns the event date as `YYYY-MM`, or `no_date` if the event has no date."""
        if self.date:
            return self.date.strftime("%Y-%m")
        return "no_date"

    def brc_format_1(self) -> List[str]:
        """Returns a list events, one line per initial ID, in the following TSV format:
        - old gene id
        - event name
        - release
        - release date
        - list of old gene ids in the event (comma-separated)
        - list of new gene ids in the event (comma-separated)

        The two id lists are only filled for merge, split, mixed and change events. A "new" event
        has no initial ID, so a single line is returned with the new gene id in the first column.
        Ids are written in sorted order so that the output is reproducible.

        Raises:
            UnsupportedEvent: if the current ids do not match any known event type.

        """
        release = self.get_full_release()
        date = self._date_str()
        name = self.get_name()

        if name in ("merge", "split", "mixed", "change"):
            id_lists = [",".join(sorted(self.from_set)), ",".join(sorted(self.to_set))]
        else:
            id_lists = ["", ""]

        line_list = [
            "\t".join([identifier, name, release, date, *id_lists]) for identifier in sorted(self.from_set)
        ]

        if name == "new":
            # A "new" event holds exactly one new id (see _name_event): extract the id itself,
            # not the set that contains it, otherwise the join below fails on a non-string item.
            new_id = next(iter(self.to_set))
            line_list.append("\t".join([new_id, name, release, date, "", ""]))
        return line_list

    def brc_format_2(self) -> List[str]:
        """Returns a list of combination of genes, one line per combination of old_id - new_ids, in the
        following TSV format:
        - old gene id
        - new gene id
        - event name
        - release
        - release date

        The lines are built from the recorded pairs (see `add_pair()` and `add_pairs()`).

        Raises:
            UnsupportedEvent: if the current ids do not match any known event type.

        """
        release = self.get_full_release()
        date = self._date_str()
        name = self.get_name()
        return ["\t".join([pair.old_id, pair.new_id, name, release, date]) for pair in self.pairs]

    @staticmethod
    def clean_set(this_list: Set) -> Set:
        """Removes any empty elements from a set.

        Args:
            this_list: set of items, some of which can be empty/None.

        Returns:
            A new set with only the non-empty items.

        """
        return {identifier for identifier in this_list if identifier}

    def add_from(self, from_id: str) -> None:
        """Store an id in the from_set (an empty id is ignored)."""
        if from_id:
            self.from_set.add(from_id)

    def add_to(self, to_id: str) -> None:
        """Store an id in the to_set (an empty id is ignored)."""
        if to_id:
            self.to_set.add(to_id)

    def set_release(self, release: str) -> None:
        """Set the release name of the event"""
        self.release = release

    def set_date(self, date: datetime) -> None:
        """Set the date of the release for this event"""
        self.date = date

    def add_pair(self, pair: Pair) -> None:
        """Keeps a record of this pair.

        Args:
            pair: a Pair to record.

        Raises:
            ValueError: can't add an empty pair.

        """
        if pair.is_empty():
            raise ValueError(f"Expected at least one value in the given pair {pair}")
        self.pairs.append(pair)

    def get_full_release(self) -> str:
        """Returns the expanded release name.

        The release name is prefixed with `build` if the event date is after `BRC4_START_DATE`,
        and with `pre-BRC4` otherwise (including when the event has no date).

        """
        if self.date and self.date > BRC4_START_DATE:
            return f"build {self.release}"
        return f"pre-BRC4 {self.release}"

    def _name_event(self) -> None:
        """Identify the event name based on the old vs new id lists.

        Raises:
            UnsupportedEvent: if the number of old and new ids does not match any known event type
                (the current name is then left untouched).

        """
        if not self.from_set and len(self.to_set) == 1:
            self.name = "new"
        elif not self.to_set and len(self.from_set) == 1:
            self.name = "deletion"
        elif len(self.from_set) == 1 and len(self.to_set) == 1:
            self.name = "change"
        elif len(self.from_set) == 1 and len(self.to_set) > 1:
            self.name = "split"
        elif len(self.from_set) > 1 and len(self.to_set) == 1:
            self.name = "merge"
        elif len(self.from_set) > 1 and len(self.to_set) > 1:
            self.name = "mixed"
        else:
            raise UnsupportedEvent(f"Event {self.from_set} to {self.to_set} is not supported")

    def clean_pairs(self) -> None:
        """Remove the pairs without an old id when the event is not 'new'.

        Raises:
            UnsupportedEvent: if the event has no name yet and its ids do not match any known type.

        """
        if not self.name:
            self._name_event()

        if self.name != "new":
            self.pairs = [pair for pair in self.pairs if pair.has_old_id()]

    def get_name(self) -> str:
        """Retrieve the name for this event, update it beforehand.

        Raises:
            UnsupportedEvent: if the current ids do not match any known event type.

        """
        self._name_event()
        return self.name

    def add_pairs(self, pairs: List[Pair]) -> None:
        """Provided all the pairs, keep those that are used by this event.

        A pair is used by this event if its old id is in the from_set or its new id is in the to_set.

        Args:
            pairs: list of Pair.

        Raises:
            UnsupportedEvent: if a pair is used by this event but the event ids do not match any
                known event type.

        """
        # The ids of the event do not change in this loop, so its name only needs to be computed
        # once. It is computed lazily so that nothing is raised when no pair matches the event.
        name: Optional[str] = None
        for pair in pairs:
            if (pair.has_old_id() and pair.old_id in self.from_set) or (
                pair.has_new_id() and pair.new_id in self.to_set
            ):
                if name is None:
                    name = self.get_name()
                # Core db contains an empty line to signify that an old id has been removed
                # in merge/split/mixed
                if (name != "deletion") and not pair.has_new_id():
                    continue
                self.add_pair(pair)

301 

302 

class DumpStableIDs:
    """A processor that creates events from pairs of ids and can print those events out.

    Attributes:
        session: a database session (SQLAlchemy ORM `Session`) to retrieve the data from.

    """

    def __init__(self, session: Session) -> None:
        """Create a processor for events"""
        self.session = session

    def get_history(self) -> List[Event]:
        """Retrieve all events from a database.

        For each mapping session, the pairs of ids are loaded and grouped into events, which are
        then tagged with the release name and creation date of that mapping session.

        Returns:
            A list of all events.

        """

        events: List[Event] = []
        for session in self.get_mapping_sessions():
            logging.info("Mapping session %s", session.new_release)
            pairs = self.get_pairs(session.mapping_session_id)
            session_events = self.make_events(pairs)
            for event in session_events:
                event.set_release(session.new_release)
                event.set_date(session.created)
            events += session_events

        return events

    def print_events(self, events: List[Event], output_file: Path) -> None:
        """Print events in a format for BRC (see `Event.brc_format_2()`).

        Nothing is written (the file is not even created) if there are no events.

        Args:
            events: list of events for a given genome.
            output_file: where the events will be printed.

        """
        if not events:
            logging.info("No events to print")
            return
        with output_file.open("w") as out_fh:
            for event in events:
                out_fh.writelines(f"{line}\n" for line in event.brc_format_2())

    def get_mapping_sessions(self) -> List[MappingSession]:
        """Retrieve the mapping sessions from the connected database.

        Returns:
            A list of sessions.

        """
        map_sessions_stmt = select(MappingSession)
        return list(self.session.scalars(map_sessions_stmt).unique().all())

    def get_pairs(self, session_id: int) -> List[Pair]:
        """Retrieve all pair of ids for a given session.

        Only the gene events are retrieved, and only those where the old or the new id is missing,
        or where the old and new ids are different.

        Args:
            session_id: id of a session from the connected database.

        Returns:
            All pairs of IDs.

        """

        id_events_stmt = (
            select(StableIdEvent)
            .where(
                and_(
                    (StableIdEvent.mapping_session_id == session_id),
                    (StableIdEvent.id_type == "gene"),
                    (
                        or_(
                            (StableIdEvent.old_stable_id.is_(None)),
                            (StableIdEvent.new_stable_id.is_(None)),
                            (StableIdEvent.old_stable_id != StableIdEvent.new_stable_id),
                        )
                    ),
                )
            )
            .group_by(
                StableIdEvent.old_stable_id, StableIdEvent.new_stable_id, StableIdEvent.mapping_session_id
            )
        )
        return [
            Pair(row.old_stable_id, row.new_stable_id)
            for row in self.session.scalars(id_events_stmt).unique().all()
        ]

    def make_events(self, pairs: List[Pair]) -> List[Event]:
        """Given a list of pairs, create events.

        Args:
            pairs: list of Pair.

        Return:
            A list of events.

        """

        from_list, to_list = self.get_pairs_from_to(pairs)

        # Create events with those 2 dicts
        events: List[Event] = []
        # Iterate over a snapshot of the initial mapping: `from_list` is replaced in the loop by a
        # copy without the ids already used by an event, which is what the membership test checks
        for old_id, from_old_list in list(from_list.items()):
            if not old_id or old_id not in from_list:
                continue
            event = Event({old_id}, set(from_old_list))
            (event, from_list, to_list) = self.extend_event(event, from_list, to_list)
            event.add_pairs(pairs)
            events.append(event)

        # Remaining events should only be new genes
        for new_id, to_new_list in to_list.items():
            if not new_id:
                continue
            event = Event(set(to_new_list), {new_id})
            event.add_pairs(pairs)
            events.append(event)

        # Finalise the events and count them by name for the log
        stats: Dict[str, int] = {}
        for event in events:
            name = event.get_name()
            event.clean_pairs()
            stats[name] = stats.get(name, 0) + 1

        for stat, value in stats.items():
            logging.info("\t%s = %s", stat, value)

        return events

    @staticmethod
    def get_pairs_from_to(pairs: List[Pair]) -> Tuple[DictToIdsSet, DictToIdsSet]:
        """
        From a list of Pairs, extract a mapping of all ids from a given old id (from_list),
        and a mapping of all ids to a given new id (to_list).

        A missing id is represented by an empty string: it can appear as a key of the mappings, but
        it is never stored in the sets of ids.

        Args:
            pairs: list of Pairs.

        Return:
            Tuple of 2 values:
                from_list
                to_list

        """
        from_list: DictToIdsSet = {}
        to_list: DictToIdsSet = {}
        for pair in pairs:
            old_id = pair.old_id or ""
            new_id = pair.new_id or ""

            # Always create the entry, but only store non-empty ids in it
            new_ids = from_list.setdefault(old_id, set())
            if new_id:
                new_ids.add(new_id)

            old_ids = to_list.setdefault(new_id, set())
            if old_id:
                old_ids.add(old_id)

        return from_list, to_list

    def extend_event(
        self, event: Event, from_list: DictToIdsSet, to_list: DictToIdsSet
    ) -> Tuple[Event, DictToIdsSet, DictToIdsSet]:
        """Given an event, aggregate ids in the 'from' and 'to' sets, to connect the whole group.

        Args:
            event: the event to extend (modified in place).
            from_list: A dict of the from ids, and their corresponding to ids.
            to_list: A dict of the to ids, and their corresponding from ids.

        Returns:
            A tuple of the extended event, and copies of the from_list and to_list from which the
            ids that have been added to the event have been removed (the given dicts are not
            modified).

        """

        extended = True

        # Repeat until a whole pass does not add any id to the event
        while extended:
            extended = False

            # Extend the group in the to ids
            for to_id in event.to_set:
                for to_from_id in to_list.get(to_id, ()):
                    # An empty id is never stored by add_from, so it must not count as an
                    # extension, otherwise this loop would never end
                    if to_from_id and to_from_id not in event.from_set:
                        event.add_from(to_from_id)
                        extended = True

            # Extend the group in the from ids
            for from_id in event.from_set:
                for from_to_id in from_list.get(from_id, ()):
                    # Same as above: ignore empty ids
                    if from_to_id and from_to_id not in event.to_set:
                        event.add_to(from_to_id)
                        extended = True

        # Clean up
        from_list = {from_id: ids for from_id, ids in from_list.items() if from_id not in event.from_set}
        to_list = {to_id: ids for to_id, ids in to_list.items() if to_id not in event.to_set}

        return (event, from_list, to_list)

536 

537 

def main() -> None:
    """Command-line entry point: dump the stable ID events of a core database to a file."""
    arg_parser = ArgumentParser(
        description="Dump the stable ID events from the information available in a core database."
    )
    arg_parser.add_server_arguments(include_database=True)
    arg_parser.add_argument_dst_path("--output_file", required=True, help="Output file")
    arg_parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    arg_parser.add_log_arguments(add_log_file=True)
    cli_args = arg_parser.parse_args()
    init_logging_with_args(cli_args)

    # Load every event from the database, then write them all to the output file
    connection = DBConnectionLite(cli_args.url)
    with connection.session_scope() as db_session:
        events_dumper = DumpStableIDs(db_session)
        all_events = events_dumper.get_history()
        events_dumper.print_events(all_events, cli_args.output_file)

555 

556 

# Only run the command-line entry point when this module is executed as a script.
if __name__ == "__main__":
    main()