Coverage for src/python/ensembl/io/genomio/assembly/status.py: 89% (180 statements; coverage.py v7.6.12, created at 2025-02-21 15:37 +0000)

# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Obtain and record the assembly status for a set of INSDC accession(s) using NCBI's datasets CLI tool."""

__all__ = [
    "extract_assembly_metadata",
    "fetch_datasets_reports",
    "fetch_accessions_from_core_dbs",
    "generate_report_tsv",
    "get_assembly_accessions",
    "singularity_image_setter",
]

import csv
from dataclasses import dataclass
import json
import logging
import os
from pathlib import Path
import re

from spython.main import Client
from sqlalchemy.engine import URL
from sqlalchemy import text

import ensembl.io.genomio
from ensembl.io.genomio.utils.json_utils import print_json
from ensembl.utils import StrPath
from ensembl.utils.argparse import ArgumentParser
from ensembl.utils.database import DBConnection
from ensembl.utils.logging import init_logging_with_args


DATASETS_SINGULARITY = {
    "datasets_version_url": "docker://ensemblorg/datasets-cli:latest",
}


class UnsupportedFormatError(Exception):
    """When a string does not have the expected format."""


@dataclass
class ReportStructure:
    """Stores key report meta information."""

    species_name: str = ""
    taxon_id: int = 0
    strain: str = "NA"
    assembly_name: str = ""
    assembly_type: str = ""
    accession: str = ""
    paired_assembly: str = "NA"
    last_updated: str = ""
    assembly_status: str = "NA"
    assembly_notes: str = "NA"

    def to_dict(self) -> dict[str, str]:
        """Returns a dictionary representation of this object."""
        return {
            "Species Name": self.species_name,
            "Taxon ID": str(self.taxon_id),
            "Isolate/Strain": self.strain,
            "Asm name": self.assembly_name,
            "Assembly type": self.assembly_type,
            "Asm accession": self.accession,
            "Paired assembly": self.paired_assembly,
            "Asm last updated": self.last_updated,
            "Asm status": self.assembly_status,
            "Asm notes": self.assembly_notes,
        }

    def header(self) -> list[str]:
        """Returns the dictionary keys matching each of the properties of the report."""
        return list(self.to_dict().keys())

    def values(self) -> list[str]:
        """Returns the values of each of the properties of the report."""
        return list(self.to_dict().values())
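
# A minimal usage sketch for ReportStructure (values are illustrative, not from a real NCBI report):
#
#     report = ReportStructure(species_name="Homo sapiens", taxon_id=9606, accession="GCA_000001405.29")
#     report.header()  # -> ["Species Name", "Taxon ID", "Isolate/Strain", ...]
#     report.values()  # -> ["Homo sapiens", "9606", "NA", ...]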


def singularity_image_setter(sif_cache_dir: Path | None, datasets_version: str | None) -> Client:
    """Parse environment and user-specified variables related to the `datasets` singularity SIF
    container, and define the version and location of the container.

    Args:
        sif_cache_dir: Path to locate an existing SIF container image, or to download a new one into.
        datasets_version: URL of the singularity container (custom `datasets` version if desired).

    Returns:
        `spython.main.Client` instance of the singularity container image housing `datasets`.
    """

    # Set singularity cache dir from user-defined path, or fall back to the environment
    if sif_cache_dir and sif_cache_dir.is_dir():
        image_dl_path = sif_cache_dir
        logging.info(f"Using user-defined cache_dir: '{image_dl_path}'")
    elif os.environ.get("NXF_SINGULARITY_CACHEDIR"):
        image_dl_path = Path(os.environ["NXF_SINGULARITY_CACHEDIR"])
        logging.info(
            f"Using preferred nextflow singularity cache dir 'NXF_SINGULARITY_CACHEDIR': {image_dl_path}"
        )
    elif os.environ.get("SINGULARITY_CACHEDIR"):
        image_dl_path = Path(os.environ["SINGULARITY_CACHEDIR"])
        logging.info(
            f"Using the default singularity installation cache dir 'SINGULARITY_CACHEDIR': {image_dl_path}"
        )
    else:
        image_dl_path = Path()
        logging.warning(f"Unable to set singularity cache dir properly, using CWD {image_dl_path}")

    # Set the datasets version URL
    if datasets_version is None:
        container_url = DATASETS_SINGULARITY["datasets_version_url"]
        logging.info(f"Using default 'ncbi datasets' version '{container_url}'")
    else:
        container_url = datasets_version
        logging.info(f"Using user-defined 'ncbi datasets' version '{container_url}'")

    # Pull or load pre-existing 'datasets' singularity container image
    datasets_image = Client.pull(container_url, stream=False, pull_folder=image_dl_path, quiet=True)

    return datasets_image
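
# Usage sketch (assumes singularity is installed and the container URL is reachable):
#
#     image = singularity_image_setter(Path("/tmp/sif_cache"), None)
#     # Pulls or reuses docker://ensemblorg/datasets-cli:latest under /tmp/sif_cache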


def get_assembly_accessions(src_file: StrPath) -> list[str]:
    """Returns the list of assembly accessions found in the provided file.

    Args:
        src_file: Path to file with one line per INSDC assembly accession.

    Raises:
        UnsupportedFormatError: If an accession does not match the INSDC assembly accession format.
    """
    query_accessions: list[str] = []
    with Path(src_file).open(mode="r") as fin:
        for line in fin.readlines():
            line = line.strip()
            match = re.match(r"^GC[AF]_[0-9]{9}\.[1-9][0-9]*$", line)
            if not match:
                raise UnsupportedFormatError(f"Could not recognize GCA/GCF accession format: {line}")
            query_accessions.append(line)
    return query_accessions
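
# The regex above accepts INSDC assembly accessions such as "GCA_000001405.29" (GenBank) or
# "GCF_000001405.40" (RefSeq): a "GCA_"/"GCF_" prefix, nine digits, and a non-zero version suffix.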


def fetch_accessions_from_core_dbs(src_file: StrPath, server_url: URL) -> dict[str, str]:
    """Obtain the associated INSDC accession given a set of core database names and a database server URL.

    The accession information is obtained from the `meta` table's meta key `assembly.accession`.

    Args:
        src_file: File path with list of core database names.
        server_url: Database server URL.

    Returns:
        Dict of core database names (key) and their corresponding INSDC assembly accession (value).
    """

    core_accn_meta = {}
    database_count = 0
    count_accn_found = 0

    with Path(src_file).open("r") as fin:
        for line in fin.readlines():
            core_db = line.strip()
            database_count += 1
            db_connection_url = server_url.set(database=core_db)
            db_connection = DBConnection(db_connection_url)
            with db_connection.begin() as conn:
                query_result = conn.execute(
                    text('SELECT meta_value FROM meta WHERE meta_key = "assembly.accession";')
                ).fetchall()

            if not query_result:
                logging.warning(f"No accessions found in core: {core_db}")
            elif len(query_result) == 1:
                count_accn_found += 1
                asm_accession = query_result.pop()[0]
                logging.info(f"{core_db} -> assembly.accession[{asm_accession}]")
                core_accn_meta[core_db] = asm_accession
            else:
                logging.warning(f"Core {core_db} has {len(query_result)} assembly.accessions")

    logging.info(
        f"From initial input core databases ({database_count}), obtained ({count_accn_found}) accessions"
    )

    return core_accn_meta
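
# Sketch of the mapping returned (core database name and accession are hypothetical):
#
#     {"my_species_core_60_110_1": "GCA_000000000.1"}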


def fetch_datasets_reports(
    sif_image: Client, assembly_accessions: dict[str, str], download_directory: StrPath, batch_size: int
) -> dict[str, dict]:
    """Obtain assembly reports in JSON format for each assembly accession via the `datasets` CLI.

    Args:
        sif_image: Instance of `Client.loaded()` singularity image.
        assembly_accessions: Dictionary of accession source <> assembly accession pairs.
        download_directory: Directory path to store assembly report JSON files.
        batch_size: Number of assembly accessions to batch submit to `datasets`.

    Returns:
        Dictionary of accession source and its associated assembly report.

    Raises:
        ValueError: If the result returned by `datasets` is not a string.
        RuntimeError: If an error was raised by `datasets`.
    """
    master_accn_list = list(assembly_accessions.values())
    combined_asm_reports = {}

    # Set the number of combined accessions to query in a single call to datasets
    list_split = list(range(0, len(master_accn_list), batch_size))
    accn_subsample = [master_accn_list[ind : ind + batch_size] for ind in list_split]

    datasets_command = ["datasets", "summary", "genome", "accession"]
    for accessions in accn_subsample:
        # Make call to singularity datasets providing a multi-accession query
        client_return = Client.execute(
            image=sif_image, command=datasets_command + accessions, return_result=True, quiet=True
        )
        raw_result = client_return["message"]

        ## Test what result we have obtained following execution of the SIF image:
        # a list means datasets returned a result to Client.execute();
        # a str means no datasets result was obtained and it exited with a fatal error.
        if isinstance(raw_result, list):
            result = raw_result[0]
        else:
            result = raw_result
        if not isinstance(result, str):
            raise ValueError("Result obtained from datasets is not a string")
        if re.search("^FATAL", result):
            raise RuntimeError(f"Singularity image execution failed! -> '{result.strip()}'")

        tmp_asm_dict = json.loads(result)
        if not tmp_asm_dict["total_count"]:
            logging.warning(f"No assembly report found for accession(s) {accessions}")
            continue

        logging.info(f"Assembly report obtained for accession(s) {accessions}")
        batch_reports_json = tmp_asm_dict["reports"]
        for assembly_report in batch_reports_json:
            accession = assembly_report["accession"]
            asm_json_outfile = Path(download_directory, f"{accession}.asm_report.json")
            print_json(asm_json_outfile, assembly_report)
            # Save assembly report into the source key <> report dict
            for src_key, accession_core in assembly_accessions.items():
                if accession == accession_core:
                    combined_asm_reports[src_key] = assembly_report

    return combined_asm_reports
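
# Batching sketch: with batch_size=2, ["GCA_A", "GCA_B", "GCA_C"] (placeholder accessions) is
# split into [["GCA_A", "GCA_B"], ["GCA_C"]], each batch running the equivalent of:
#
#     datasets summary genome accession GCA_A GCA_B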


def extract_assembly_metadata(assembly_reports: dict[str, dict]) -> dict[str, ReportStructure]:
    """Parse assembly reports and extract specific key information on status and related fields.

    Args:
        assembly_reports: Key-value pairs of source name <> assembly report.

    Returns:
        Parsed assembly report meta (source, meta).
    """
    parsed_meta = {}

    for source, asm_report in assembly_reports.items():
        asm_meta_info = ReportStructure()

        # Mandatory meta key parsing:
        asm_meta_info.accession = asm_report["accession"]
        asm_meta_info.assembly_name = asm_report["assembly_info"]["assembly_name"]
        asm_meta_info.assembly_type = asm_report["assembly_info"]["assembly_type"]
        asm_meta_info.assembly_status = asm_report["assembly_info"]["assembly_status"]
        asm_meta_info.species_name = asm_report["organism"]["organism_name"]
        asm_meta_info.taxon_id = int(asm_report["organism"]["tax_id"])

        ## Non-mandatory meta key parsing:
        assembly_meta_keys = asm_report["assembly_info"].keys()
        organism_keys = asm_report["organism"].keys()

        # Check for genome_notes:
        if "genome_notes" in assembly_meta_keys:
            complete_notes = ", ".join(asm_report["assembly_info"]["genome_notes"])
            asm_meta_info.assembly_notes = complete_notes

        # Check for biosample:
        if "biosample" in assembly_meta_keys:
            asm_meta_info.last_updated = asm_report["assembly_info"]["biosample"]["last_updated"]

        # Check for paired assembly:
        if "paired_assembly" in assembly_meta_keys:
            asm_meta_info.paired_assembly = asm_report["assembly_info"]["paired_assembly"]["accession"]

        # Check for isolate/strain type:
        if "infraspecific_names" in organism_keys:
            organism_type_keys = asm_report["organism"]["infraspecific_names"].keys()
            if "isolate" in organism_type_keys:
                asm_meta_info.strain = asm_report["organism"]["infraspecific_names"]["isolate"]
            elif "strain" in organism_type_keys:
                asm_meta_info.strain = asm_report["organism"]["infraspecific_names"]["strain"]

        parsed_meta[source] = asm_meta_info

    return parsed_meta
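
# Abridged shape of the `datasets` report consumed above (only the keys used by this parser):
#
#     {"accession": ..., "assembly_info": {"assembly_name": ..., "assembly_type": ...,
#      "assembly_status": ..., "genome_notes": [...], "biosample": {"last_updated": ...},
#      "paired_assembly": {"accession": ...}},
#      "organism": {"organism_name": ..., "tax_id": ..., "infraspecific_names": {...}}}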


def generate_report_tsv(
    parsed_asm_reports: dict[str, ReportStructure],
    query_type: str,
    output_directory: StrPath = Path(),
    outfile_name: str = "AssemblyStatusReport",
) -> None:
    """Generate and write the assembly report to a TSV file.

    Args:
        parsed_asm_reports: Parsed assembly report meta.
        query_type: Type of query (either core databases or accessions).
        output_directory: Directory to store the report TSV file.
        outfile_name: Name to give to the output TSV file.
    """
    tsv_outfile = Path(output_directory, f"{outfile_name}.tsv")

    header_list = next(iter(parsed_asm_reports.values())).header()
    header_list = [query_type.capitalize().replace("_", " ")] + header_list

    with open(tsv_outfile, "w+") as tsv_out:
        writer = csv.writer(tsv_out, delimiter="\t", lineterminator="\n")
        writer.writerow(header_list)
        for core, report_meta in parsed_asm_reports.items():
            final_asm_report = [core] + report_meta.values()
            writer.writerow(final_asm_report)
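
# Resulting TSV layout (the first column label derives from query_type, i.e. "Core db" or "Accession"):
#
#     Accession\tSpecies Name\tTaxon ID\tIsolate/Strain\tAsm name\t...\tAsm notes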


def main() -> None:
    """Module's entry-point."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    # Create parser with common arguments to be used by both subparsers
    base_parser = ArgumentParser(add_help=False)
    base_parser.add_argument_dst_path(
        "--reports_dir",
        default=Path("assembly_report_jsons"),
        help="path to folder where the assembly report JSON files are stored",
    )
    base_parser.add_argument(
        "--assembly_report_name",
        metavar="NAME",
        default="AssemblyStatusReport",
        help="file name used for the assembly report TSV output file",
    )
    base_parser.add_argument(
        "--datasets_version_url",
        type=str,
        metavar="URL",
        help="datasets version, e.g. docker://ensemblorg/datasets-cli:latest",
    )
    base_parser.add_argument_src_path(
        "--cache_dir",
        default=Path(os.environ.get("NXF_SINGULARITY_CACHEDIR", "")),
        metavar="SINGULARITY_CACHE",
        help="folder path to user-generated singularity container housing NCBI tool 'datasets'",
    )
    base_parser.add_numeric_argument(
        "--datasets_batch_size",
        type=int,
        min_value=1,
        default=100,
        metavar="BATCH_SIZE",
        help="number of accessions requested in one query to datasets",
    )
    base_parser.add_log_arguments(add_log_file=True)
    # Add subparsers with their parent being the base parser with the common arguments
    subparsers = parser.add_subparsers(title="report assembly status from", required=True, dest="src")
    # Specific arguments required when using Ensembl core database names as source
    core_db_parser = subparsers.add_parser(
        "core_db", parents=[base_parser], help="list of Ensembl core databases"
    )
    core_db_parser.add_argument_src_path(
        "--input",
        required=True,
        help="file path with list of Ensembl core database(s) to retrieve query accessions from",
    )
    core_db_parser.add_server_arguments()
    # Specific arguments required when using assembly accessions as source
    accessions_parser = subparsers.add_parser(
        "accession", parents=[base_parser], help="list of INSDC accessions"
    )
    accessions_parser.add_argument_src_path(
        "--input", required=True, help="file path with list of assembly INSDC query accessions"
    )

    args = parser.parse_args()
    init_logging_with_args(args)

    # Get accessions from the cores list or use the user's accession list directly
    if args.src == "core_db":
        query_accessions = fetch_accessions_from_core_dbs(args.input, args.url)
    else:
        query_accessions = {x: x for x in get_assembly_accessions(args.input)}

    # Parse singularity settings and define the SIF image for 'datasets'
    datasets_image = singularity_image_setter(args.cache_dir, args.datasets_version_url)

    # Datasets query implementation for one or more batched accessions
    assembly_reports = fetch_datasets_reports(
        datasets_image, query_accessions, args.reports_dir, args.datasets_batch_size
    )

    # Extract the key assembly report meta information for reporting status
    key_assembly_report_meta = extract_assembly_metadata(assembly_reports)

    # Produce the final assembly status report TSV from the set of parsed 'datasets' summary reports
    generate_report_tsv(key_assembly_report_meta, args.src, args.reports_dir, args.assembly_report_name)
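
# Example invocations, assuming a console-script entry point wrapping main() (script name
# hypothetical; server options for "core_db" are those added by add_server_arguments()):
#
#     assembly_status accession --input accessions.txt --reports_dir reports/
#     assembly_status core_db --input core_dbs.txt --host <host> --port <port> --user <user>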