Coverage for src/python/ensembl/io/genomio/assembly/status.py: 89% (180 statements)
# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15"""Obtain and record the assembly status for a set of INSDC accession(s) using NCBI's datasets CLI tool."""
__all__ = [
    "extract_assembly_metadata",
    "fetch_datasets_reports",
    "fetch_accessions_from_core_dbs",
    "generate_report_tsv",
    "get_assembly_accessions",
    "singularity_image_setter",
]
import csv
from dataclasses import dataclass
import json
import logging
import os
from pathlib import Path
import re

from spython.main import Client
from sqlalchemy.engine import URL
from sqlalchemy import text

import ensembl.io.genomio
from ensembl.io.genomio.utils.json_utils import print_json
from ensembl.utils import StrPath
from ensembl.utils.argparse import ArgumentParser
from ensembl.utils.database import DBConnection
from ensembl.utils.logging import init_logging_with_args

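# Default container image for NCBI's `datasets` CLI, used when no custom version URL is supplied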
DATASETS_SINGULARITY = {
    "datasets_version_url": "docker://ensemblorg/datasets-cli:latest",
}


class UnsupportedFormatError(Exception):
    """When a string does not have the expected format."""

@dataclass
class ReportStructure:
    """Stores key report meta information."""

    species_name: str = ""
    taxon_id: int = 0
    strain: str = "NA"
    assembly_name: str = ""
    assembly_type: str = ""
    accession: str = ""
    paired_assembly: str = "NA"
    last_updated: str = ""
    assembly_status: str = "NA"
    assembly_notes: str = "NA"

    def to_dict(self) -> dict[str, str]:
        """Returns a dictionary representation of this object."""
        return {
            "Species Name": self.species_name,
            "Taxon ID": str(self.taxon_id),
            "Isolate/Strain": self.strain,
            "Asm name": self.assembly_name,
            "Assembly type": self.assembly_type,
            "Asm accession": self.accession,
            "Paired assembly": self.paired_assembly,
            "Asm last updated": self.last_updated,
            "Asm status": self.assembly_status,
            "Asm notes": self.assembly_notes,
        }

    def header(self) -> list[str]:
        """Returns the dictionary keys matching each of the properties of the report."""
        return list(self.to_dict().keys())

    def values(self) -> list[str]:
        """Returns the values of each of the properties of the report."""
        return list(self.to_dict().values())

def singularity_image_setter(sif_cache_dir: Path | None, datasets_version: str | None) -> Client:
    """Parse environment and user-specified variables related to the `datasets` singularity SIF
    container, and define the version and location of the container.

    Args:
        sif_cache_dir: Path in which to locate an existing, or download a new, SIF container image.
        datasets_version: URL of the singularity container (a custom `datasets` version if desired).

    Returns:
        `spython.main.Client` instance of the singularity container image housing `datasets`.
    """
    # Set singularity cache dir from user-defined path or use environment
    if sif_cache_dir and sif_cache_dir.is_dir():
        image_dl_path = sif_cache_dir
        logging.info(f"Using user-defined cache_dir: '{image_dl_path}'")
    elif os.environ.get("NXF_SINGULARITY_CACHEDIR"):
        image_dl_path = Path(os.environ["NXF_SINGULARITY_CACHEDIR"])
        logging.info(
            f"Using preferred nextflow singularity cache dir 'NXF_SINGULARITY_CACHEDIR': {image_dl_path}"
        )
    elif os.environ.get("SINGULARITY_CACHEDIR"):
        image_dl_path = Path(os.environ["SINGULARITY_CACHEDIR"])
        logging.info(
            f"Using the default singularity installation cache dir 'SINGULARITY_CACHEDIR': {image_dl_path}"
        )
    else:
        image_dl_path = Path()
        logging.warning(f"Unable to set singularity cache dir properly, using CWD {image_dl_path}")

    # Set the datasets version URL
    if datasets_version is None:
        container_url = DATASETS_SINGULARITY["datasets_version_url"]
        logging.info(f"Using default 'ncbi datasets' version '{container_url}'")
    else:
        container_url = datasets_version
        logging.info(f"Using user-defined 'ncbi datasets' version '{container_url}'")

    # Pull or load the pre-existing 'datasets' singularity container image.
    datasets_image = Client.pull(container_url, stream=False, pull_folder=image_dl_path, quiet=True)

    return datasets_image

def get_assembly_accessions(src_file: StrPath) -> list[str]:
    """Returns the list of assembly accessions found in the provided file.

    Args:
        src_file: Path to file with one line per INSDC assembly accession.

    Raises:
        UnsupportedFormatError: If an accession does not match the INSDC assembly accession format.
    """
    query_accessions: list[str] = []
    with Path(src_file).open(mode="r") as fin:
        for line in fin.readlines():
            line = line.strip()
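            # Expected INSDC assembly accession format: "GCA_"/"GCF_" + 9 digits + "." + version,
            # e.g. GCA_000001405.29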
            match = re.match(r"^GC[AF]_[0-9]{9}\.[1-9][0-9]*$", line)
            if not match:
                raise UnsupportedFormatError(f"Could not recognize GCA/GCF accession format: {line}")
            query_accessions.append(line)
    return query_accessions

def fetch_accessions_from_core_dbs(src_file: StrPath, server_url: URL) -> dict[str, str]:
    """Obtain the associated INSDC accession given a set of core database names and a database server URL.

    The accession information is obtained from the `meta` table's meta key `assembly.accession`.

    Args:
        src_file: File path with list of core database names.
        server_url: Database server URL.

    Returns:
        Dict of core database names (key) and their corresponding INSDC assembly accession (value).
    """
    core_accn_meta = {}
    database_count = 0
    count_accn_found = 0

    with Path(src_file).open("r") as fin:
        for line in fin.readlines():
            core_db = line.strip()
            database_count += 1
            db_connection_url = server_url.set(database=core_db)
            db_connection = DBConnection(db_connection_url)
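            # Each core database is expected to store exactly one `assembly.accession` meta value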
            with db_connection.begin() as conn:
                query_result = conn.execute(
                    text('SELECT meta_value FROM meta WHERE meta_key = "assembly.accession";')
                ).fetchall()

            if not query_result:
                logging.warning(f"No accessions found in core: {core_db}")
            elif len(query_result) == 1:
                count_accn_found += 1
                asm_accession = query_result.pop()[0]
                logging.info(f"{core_db} -> assembly.accession[{asm_accession}]")
                core_accn_meta[core_db] = asm_accession
            else:
                logging.warning(f"Core {core_db} has {len(query_result)} assembly.accessions")

    logging.info(
        f"From initial input core databases ({database_count}), obtained ({count_accn_found}) accessions"
    )

    return core_accn_meta

def fetch_datasets_reports(
    sif_image: Client, assembly_accessions: dict[str, str], download_directory: StrPath, batch_size: int
) -> dict[str, dict]:
    """Obtain assembly reports in JSON format for each assembly accession via the `datasets` CLI.

    Args:
        sif_image: Instance of `Client.loaded()` singularity image.
        assembly_accessions: Dictionary of accession source to assembly accession pairs.
        download_directory: Directory path to store assembly report JSON files.
        batch_size: Number of assembly accessions to submit to `datasets` in a single batch.

    Returns:
        Dictionary of accession source and its associated assembly report.

    Raises:
        ValueError: If the result returned by `datasets` is not a string.
        RuntimeError: If `datasets` raised an error.
    """
    master_accn_list = list(assembly_accessions.values())
    combined_asm_reports = {}

    # Split the accession list into batches of `batch_size` accessions per call to `datasets`
    list_split = list(range(0, len(master_accn_list), batch_size))
    accn_subsample = [master_accn_list[ind : ind + batch_size] for ind in list_split]
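    # Base `datasets` command; each batch of accessions is appended as positional arguments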
    datasets_command = ["datasets", "summary", "genome", "accession"]
    for accessions in accn_subsample:
        # Make a call to the singularity datasets image, providing a multi-accession query
        client_return = Client.execute(
            image=sif_image, command=datasets_command + accessions, return_result=True, quiet=True
        )
        raw_result = client_return["message"]

        ## Check the result obtained from executing the SIF image on the accession batch:
        # a list means `datasets` returned a result to Client.execute;
        # a str means no `datasets` result was obtained and execution exited with a fatal error.
        if isinstance(raw_result, list):
            result = raw_result[0]
        else:
            result = raw_result
        if not isinstance(result, str):
            raise ValueError("Result obtained from datasets is not a string")
        if re.search("^FATAL", result):
            raise RuntimeError(f"Singularity image execution failed! -> '{result.strip()}'")

        tmp_asm_dict = json.loads(result)
        if not tmp_asm_dict["total_count"]:
            logging.warning(f"No assembly report found for accession(s) {accessions}")
            continue

        logging.info(f"Assembly report obtained for accession(s) {accessions}")
        batch_reports_json = tmp_asm_dict["reports"]
        for assembly_report in batch_reports_json:
            accession = assembly_report["accession"]
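            # Write each individual assembly report to its own JSON file in the download directory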
            asm_json_outfile = Path(download_directory, f"{accession}.asm_report.json")
            print_json(asm_json_outfile, assembly_report)
            # Save assembly report into source key <> report dict
            for src_key, accession_core in assembly_accessions.items():
                if accession == accession_core:
                    combined_asm_reports[src_key] = assembly_report

    return combined_asm_reports

def extract_assembly_metadata(assembly_reports: dict[str, dict]) -> dict[str, ReportStructure]:
    """Parse assembly reports and extract specific key information on status and related fields.

    Args:
        assembly_reports: Mapping of source name to assembly report.

    Returns:
        Parsed assembly report meta, keyed by source.
    """
    parsed_meta = {}
    for source, asm_report in assembly_reports.items():
        asm_meta_info = ReportStructure()

        # Mandatory meta key parsing:
        asm_meta_info.accession = asm_report["accession"]
        asm_meta_info.assembly_name = asm_report["assembly_info"]["assembly_name"]
        asm_meta_info.assembly_type = asm_report["assembly_info"]["assembly_type"]
        asm_meta_info.assembly_status = asm_report["assembly_info"]["assembly_status"]
        asm_meta_info.species_name = asm_report["organism"]["organism_name"]
        asm_meta_info.taxon_id = int(asm_report["organism"]["tax_id"])

        ## Non-mandatory meta key parsing:
        assembly_meta_keys = asm_report["assembly_info"].keys()
        organism_keys = asm_report["organism"].keys()

        # Check for genome_notes:
        if "genome_notes" in assembly_meta_keys:
            complete_notes = ", ".join(asm_report["assembly_info"]["genome_notes"])
            asm_meta_info.assembly_notes = complete_notes

        # Check for biosample:
        if "biosample" in assembly_meta_keys:
            asm_meta_info.last_updated = asm_report["assembly_info"]["biosample"]["last_updated"]

        # Check for paired assembly:
        if "paired_assembly" in assembly_meta_keys:
            asm_meta_info.paired_assembly = asm_report["assembly_info"]["paired_assembly"]["accession"]

        # Check for isolate/strain type:
        if "infraspecific_names" in organism_keys:
            organism_type_keys = asm_report["organism"]["infraspecific_names"].keys()
            if "isolate" in organism_type_keys:
                asm_meta_info.strain = asm_report["organism"]["infraspecific_names"]["isolate"]
            elif "strain" in organism_type_keys:
                asm_meta_info.strain = asm_report["organism"]["infraspecific_names"]["strain"]

        parsed_meta[source] = asm_meta_info

    return parsed_meta

def generate_report_tsv(
    parsed_asm_reports: dict[str, ReportStructure],
    query_type: str,
    output_directory: StrPath = Path(),
    outfile_name: str = "AssemblyStatusReport",
) -> None:
    """Generate and write the assembly report to a TSV file.

    Args:
        parsed_asm_reports: Parsed assembly report meta.
        query_type: Type of query (either core databases or accessions).
        output_directory: Directory to store the report TSV file.
        outfile_name: Name to give to the output TSV file.
    """
    tsv_outfile = Path(output_directory, f"{outfile_name}.tsv")
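    # Derive the column headers from the first parsed report, prepending a column for the query type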
    header_list = next(iter(parsed_asm_reports.values())).header()
    header_list = [query_type.capitalize().replace("_", " ")] + header_list

    with open(tsv_outfile, "w+") as tsv_out:
        writer = csv.writer(tsv_out, delimiter="\t", lineterminator="\n")
        writer.writerow(header_list)
        for core, report_meta in parsed_asm_reports.items():
            final_asm_report = [core] + report_meta.values()
            writer.writerow(final_asm_report)

def main() -> None:
    """Module's entry-point."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    # Create a parser with common arguments to be used by both subparsers
    base_parser = ArgumentParser(add_help=False)
    base_parser.add_argument_dst_path(
        "--reports_dir",
        default=Path("assembly_report_jsons"),
        help="path to folder where the assembly report JSON files are stored",
    )
    base_parser.add_argument(
        "--assembly_report_name",
        metavar="NAME",
        default="AssemblyStatusReport",
        help="file name used for the assembly report TSV output file",
    )
    base_parser.add_argument(
        "--datasets_version_url",
        type=str,
        metavar="URL",
        help="datasets version, e.g. docker://ensemblorg/datasets-cli:latest",
    )
    base_parser.add_argument_src_path(
        "--cache_dir",
        default=Path(os.environ.get("NXF_SINGULARITY_CACHEDIR", "")),
        metavar="SINGULARITY_CACHE",
        help="folder path to a user-generated singularity container housing NCBI's 'datasets' tool",
    )
    base_parser.add_numeric_argument(
        "--datasets_batch_size",
        type=int,
        min_value=1,
        default=100,
        metavar="BATCH_SIZE",
        help="number of accessions requested in one query to datasets",
    )
    base_parser.add_log_arguments(add_log_file=True)
    # Add subparsers with their parent being the base parser with the common arguments
    subparsers = parser.add_subparsers(title="report assembly status from", required=True, dest="src")
    # Specific arguments required when using Ensembl core database names as source
    core_db_parser = subparsers.add_parser(
        "core_db", parents=[base_parser], help="list of Ensembl core databases"
    )
    core_db_parser.add_argument_src_path(
        "--input",
        required=True,
        help="file path with list of Ensembl core database(s) to retrieve query accessions from",
    )
    core_db_parser.add_server_arguments()
    # Specific arguments required when using assembly accessions as source
    accessions_parser = subparsers.add_parser(
        "accession", parents=[base_parser], help="list of INSDC accessions"
    )
    accessions_parser.add_argument_src_path(
        "--input", required=True, help="file path with list of assembly INSDC query accessions"
    )

    args = parser.parse_args()
    init_logging_with_args(args)
    # Get accessions from the list of core databases, or use the user-provided accession list directly
    if args.src == "core_db":
        query_accessions = fetch_accessions_from_core_dbs(args.input, args.url)
    else:
        query_accessions = {x: x for x in get_assembly_accessions(args.input)}

    # Parse the singularity settings and define the SIF image for 'datasets'
    datasets_image = singularity_image_setter(args.cache_dir, args.datasets_version_url)

    # Query 'datasets' for one or more batches of accessions
    assembly_reports = fetch_datasets_reports(
        datasets_image, query_accessions, args.reports_dir, args.datasets_batch_size
    )

    # Extract the key assembly report meta information for reporting status
    key_assembly_report_meta = extract_assembly_metadata(assembly_reports)

    # Produce the final assembly status report TSV from the set of parsed 'datasets' summary reports
    generate_report_tsv(key_assembly_report_meta, args.src, args.reports_dir, args.assembly_report_name)