Coverage for src/python/ensembl/io/genomio/assembly/download.py: 91%
161 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-02-21 15:37 +0000
1# See the NOTICE file distributed with this work for additional information
2# regarding copyright ownership.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
"""Download assembly data files from INSDC or RefSeq."""
# Names exported by `from <module> import *`. The private `_download_file` helper and the `main`
# command-line entry-point are not part of this list.
__all__ = [
    "FileDownloadError",
    "FTPConnectionError",
    "UnsupportedFormatError",
    "establish_ftp",
    "md5_files",
    "get_checksums",
    "download_files",
    "get_files_selection",
    "get_root_name",
    "retrieve_assembly_data",
]
30from ftplib import FTP
31import hashlib
32import logging
33from os import PathLike
34from pathlib import Path
35import re
36import time
37from typing import Dict, Optional
39import ensembl.io.genomio
40from ensembl.utils.argparse import ArgumentParser
41from ensembl.utils.logging import init_logging_with_args
# Maps the file-name suffix of every assembly file of interest to the short file-type name used as
# key in the dictionary returned by `get_files_selection`. The functions using this mapping match
# names with `str.endswith` and skip any name ending in `_from_<suffix>`.
_FILE_ENDS = {
    "assembly_report.txt": "report",
    "genomic.fna.gz": "fasta_dna",
    "protein.faa.gz": "fasta_pep",
    "genomic.gff.gz": "gff3_raw",
    "genomic.gbff.gz": "gbff",
}
class FileDownloadError(Exception):
    """When a file download fails or there is a problem with that file.

    Raised in this module when a file cannot be downloaded with a matching MD5 checksum, when the
    downloaded files fail the checksum verification, when the files root name cannot be determined,
    or when no expected file is found in the download directory.
    """
class FTPConnectionError(Exception):
    """Error while initialising an FTP connection.

    NOTE(review): nothing in the visible part of this module raises this exception; it is exported
    in `__all__`, presumably for callers - confirm before removing.
    """
class UnsupportedFormatError(Exception):
    """When a string does not have the expected format.

    Raised by `establish_ftp` when the accession does not match the `GCA_`/`GCF_` pattern it expects.
    """
def establish_ftp(ftp_conn: FTP, ftp_url: str, accession: str) -> FTP:
    """Connect `ftp_conn` to `ftp_url` and move into the directory tree of `accession`.

    The accession (e.g. `GCA_000001405.29`) is split into its `GCA`/`GCF` prefix and the three
    3-digit chunks of its number to build the remote directory
    `genomes/all/<prefix>/<nnn>/<nnn>/<nnn>`. The optional version suffix is not part of that path.

    Args:
        ftp_conn: FTP class object.
        ftp_url: Specific FTP URL in connection request.
        accession: Genome accession required data for download.

    Returns:
        The same `ftp_conn` object, connected, logged in with `ftplib`'s default credentials and
        with its working directory set to the accession directory tree.

    Raises:
        UnsupportedFormatError: If `accession` does not follow INSDC's accession format.
    """
    match = re.match(r"^(GC[AF])_([0-9]{3})([0-9]{3})([0-9]{3})(\.[0-9]+)?$", accession)
    if not match:
        raise UnsupportedFormatError(f"Could not recognize GCA accession format: {accession}")
    gca, part1, part2, part3, _version = match.groups()

    # Remote FTP paths always use "/" as separator, whatever the local operating system is, so
    # the path is rendered explicitly in POSIX form rather than with the local separator.
    sub_dir = Path("genomes", "all", gca, part1, part2, part3).as_posix()

    # Try now to establish connection to remote FTP server
    ftp_conn.connect(ftp_url)
    ftp_conn.login()
    ftp_conn.cwd(sub_dir)

    return ftp_conn
def md5_files(dl_dir: Path, md5_path: Optional[Path] = None, md5_filename: str = "md5checksums.txt") -> bool:
    """
    Check all files checksums with the sums listed in a checksum file, if available.
    Return False if there is no checksum file, or a file is missing, or has a wrong checksum.

    Only the files listed in the checksum file whose name ends with one of the `_FILE_ENDS`
    suffixes (and not with `_from_<suffix>`) are checked; every other entry is ignored.

    Args:
        dl_dir: Path location to containing downloaded FTP files.
        md5_path: Full path to an MD5 checksum file.
        md5_filename: Name of a checksum file in the `dl_dir` (used if no `md5_path` is given).

    Returns:
        True if every checked file exists and matches its checksum, False otherwise (including
        when the checksum file is missing or yields no checksum).
    """
    # Get or set md5 file to user or default setting
    if md5_path is None:
        md5_path = dl_dir / md5_filename

    # Get checksums and compare
    sums = get_checksums(md5_path)
    if not sums:
        return False
    logging.info(f" File sums from {md5_path}: {len(sums)}")

    # Assembly archives can be very large: hash them by chunks instead of loading them in memory
    chunk_size = 1024 * 1024
    for dl_file, checksum in sums.items():
        for end in _FILE_ENDS:
            if not dl_file.endswith(end) or dl_file.endswith(f"_from_{end}"):
                continue
            file_path = dl_dir / dl_file
            if not file_path.is_file():
                logging.warning(f" No file {file_path} found")
                return False
            # Check the file checksum
            digest = hashlib.md5()
            with file_path.open(mode="rb") as f:
                for chunk in iter(lambda: f.read(chunk_size), b""):
                    digest.update(chunk)
            if digest.hexdigest() != checksum:
                logging.warning(f" File {file_path} checksum doesn't match")
                return False
            logging.info(f" File checksum ok {file_path}")
    logging.info(" All checksums OK")
    return True
def get_checksums(checksum_path: Path) -> Dict[str, str]:
    """
    Get a dict of checksums from a file, with file names as keys and sums as values

    Each non-blank line is expected to hold a checksum and a file path separated by whitespace
    (one or more spaces, as in the output of `md5sum`). A leading `./` is removed from the path,
    and files located in a subdirectory (i.e. whose path still contains a `/`) are ignored.

    Args:
        checksum_path: Path location to MD5 checksum file.

    Returns:
        Dictionary of file name to checksum; empty if `checksum_path` is not an existing file.

    Raises:
        ValueError: If a non-blank line does not contain both a checksum and a file path.
    """
    sums: Dict[str, str] = {}
    if not checksum_path.is_file():
        return sums
    with checksum_path.open(mode="r") as fh:
        for line in fh:
            # Split on any run of whitespace, only once so the file path is kept whole
            fields = line.strip().split(maxsplit=1)
            if not fields:
                # Blank line, nothing to parse
                continue
            if len(fields) != 2:
                raise ValueError(f"Malformed checksum line in {checksum_path}: {line!r}")
            checksum, file_path = fields
            # Only drop the "current directory" prefix when it is actually there
            if file_path.startswith("./"):
                file_path = file_path[2:]
            # Keep only the files at the root of the directory
            if "/" not in file_path:
                sums[file_path] = checksum
    return sums
def download_files(ftp_connection: FTP, accession: str, dl_dir: Path, max_redo: int) -> None:
    """
    Given an INSDC accession, download all available files from the ftp to the download dir

    Every entry of the current FTP directory whose name contains `accession` is entered; its
    `md5checksums.txt` file is downloaded first and then every file whose name ends with one of the
    `_FILE_ENDS` suffixes (and not with `_from_<suffix>`) is downloaded. A warning is logged for
    each entry that does not contain the accession.

    Args:
        ftp_connection: An open FTP connection object
        accession: Genome assembly accession.
        dl_dir: Path to downloaded FTP files.
        max_redo: Maximum FTP connection retry attempts.
    """

    # Get the list of assemblies for this accession
    for ftp_dir, _ in ftp_connection.mlsd():
        # Literal match: the accession contains a "." that must not act as a regex wildcard
        if accession in ftp_dir:
            ftp_connection.cwd(ftp_dir)

            # First, get the md5sum file
            md5_file = "md5checksums.txt"
            md5_path = dl_dir / md5_file
            with md5_path.open("wb") as fp:
                ftp_connection.retrbinary(f"RETR {md5_file}", fp.write)
            md5_sums = get_checksums(md5_path)

            # Get all the files
            for ftp_file, _ in ftp_connection.mlsd():
                for end in _FILE_ENDS:
                    if ftp_file.endswith(end) and not ftp_file.endswith(f"_from_{end}"):
                        _download_file(ftp_connection, ftp_file, md5_sums, dl_dir, max_redo)
        else:
            logging.warning(
                f"Could not find accession '{accession}' from ftp {ftp_dir} in open FTP connection"
            )
def _md5_hexdigest(file_path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Returns the MD5 hex digest of `file_path`, read by chunks so large files are not loaded whole."""
    digest = hashlib.md5()
    with file_path.open(mode="rb") as fp:
        for chunk in iter(lambda: fp.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _download_file(
    ftp_connection: FTP, ftp_file: str, md5_sums: Dict[str, str], dl_dir: Path, max_redo: int = 0
) -> None:
    """Downloads individual files from FTP server.

    A local copy that already matches its expected checksum is kept as is. Otherwise the file is
    downloaded up to `max_redo + 1` times (waiting 3 seconds between attempts) until its checksum
    matches. A file without an entry in `md5_sums` cannot be verified: an existing local copy is
    used as is, and a missing one is downloaded and accepted without verification.

    Args:
        ftp_connection: Established connection FTP object.
        ftp_file: Name of ftp file to download.
        md5_sums: Dictionary of key value pairs filename - md5_checksums.
        dl_dir: Path to downloaded FTP files.
        max_redo: Maximum number of connection retry attempts.

    Raises:
        FileDownloadError: If the file could not be properly downloaded after all the attempts.
    """
    expected_sum = md5_sums.get(ftp_file)
    if expected_sum is None:
        logging.warning(f" File not in the md5 checksums: {ftp_file}")
    local_path = Path(dl_dir, ftp_file)

    # File exists? Check md5sum before anything else
    if local_path.is_file():
        if expected_sum is None:
            logging.info(f" Can't check file (no md5sum), using it as is: {local_path}")
            return
        if _md5_hexdigest(local_path) == expected_sum:
            logging.info(f" File {local_path} is already downloaded properly")
            return

    redo = 0
    while redo <= max_redo:
        redo += 1
        if redo > 1:
            time.sleep(3)

        # Download the file
        logging.info(f" Downloading file {ftp_file}, try {redo}...")
        try:
            with local_path.open(mode="wb") as fp:
                ftp_connection.retrbinary(f"RETR {ftp_file}", fp.write)
        except EOFError:
            # Connection dropped during the transfer: try again
            continue

        # Without an expected checksum a completed transfer is the best guarantee available
        if expected_sum is None or _md5_hexdigest(local_path) == expected_sum:
            logging.info(f" Downloaded file properly to {local_path}")
            return

    raise FileDownloadError(f"Could not download file {ftp_file} after {redo} tries")
def get_files_selection(dl_dir: Path) -> Dict[str, str]:
    """Classifies the relevant files found in `dl_dir` by file type.

    A file is relevant when its name ends with one of the `_FILE_ENDS` suffixes without ending with
    `_from_<suffix>`. If several files share the same type, the last one listed is kept.

    Args:
        dl_dir: Local path to downloaded FTP files.

    Returns:
        Dictionary of file type (e.g.`"report"`) as keys and, as values, the file path as yielded by
        `dl_dir.iterdir()` converted to string.

    Raises:
        FileDownloadError: If `dl_dir` tree does not include a file named `*_assembly_report.txt`.
    """
    prefix = get_root_name(dl_dir)
    if prefix == "":
        raise FileDownloadError(f"Could not determine the files root name in {dl_dir}")

    selection = {}
    for candidate in dl_dir.iterdir():
        file_name = candidate.name
        for suffix, file_type in _FILE_ENDS.items():
            # Suffix match that discards derived files such as "*_from_genomic.fna.gz"
            has_suffix = file_name.endswith(suffix) and not file_name.endswith(f"_from_{suffix}")
            if has_suffix or (prefix and file_name == prefix + suffix):
                selection[file_type] = str(candidate)
    return selection
def get_root_name(dl_dir: Path) -> str:
    """Returns the prefix shared by the downloaded files, taken from the assembly report file name.

    The prefix is whatever precedes `assembly_report.txt` (trailing underscore included) in the
    first matching file name found in `dl_dir`.

    Args:
        dl_dir: Path location of downloaded FTP files.

    Returns:
        The root name, or an empty string if no file name matches.
    """
    report_pattern = re.compile("^(.+_)assembly_report.txt")
    for entry in dl_dir.iterdir():
        found = report_pattern.search(entry.name)
        if found:
            return found.group(1)
    return ""
def _increment_accession_version(accession: str) -> str:
    """Returns `accession` with its version (the digits after the last ".") increased by one."""
    base, dot, version = accession.rpartition(".")
    if not dot or not version.isdigit():
        raise ValueError(f"Cannot increment the version of an accession without version: {accession}")
    return f"{base}.{int(version) + 1}"


def retrieve_assembly_data(
    accession: str,
    download_dir: PathLike,
    max_increment: int = 0,
    max_redo: int = 3,
) -> None:
    """Establishes an FTP connection and downloads a predefined subset of assembly data files from either
    INSDC or RefSeq.

    Nothing is downloaded if `download_dir` already contains the files with valid MD5 checksums.

    Args:
        accession: Genome assembly accession.
        download_dir: Path to where to download FTP files.
        max_increment: Number of following accession versions to download as well, after the
            version given in `accession`.
        max_redo: Maximum FTP connection retry attempts.

    Raises:
        FileDownloadError: If no files are downloaded or if any does not match its MD5 checksum.
        ValueError: If `max_increment` is positive and `accession` has no numeric version to increment.
    """
    download_dir = Path(download_dir)

    # Set and create dedicated dir for download
    download_dir.mkdir(parents=True, exist_ok=True)

    # Download if files don't exist or fail checksum
    if not md5_files(download_dir, None):
        logging.info(" Download the files")

        ftp_url = "ftp.ncbi.nlm.nih.gov"
        for increment in range(max_increment + 1):
            if increment > 0:
                logging.info(f" Increment accession version once from {accession}")
                # Parse the whole version number so multi-digit versions are incremented properly
                accession = _increment_accession_version(accession)
            ftp_instance = FTP()
            try:
                open_ftp_connection = establish_ftp(ftp_instance, ftp_url, accession)
                download_files(open_ftp_connection, accession, download_dir, max_redo)
            finally:
                # Always release the connection, even if the download failed
                ftp_instance.close()

        if not md5_files(download_dir, None):
            raise FileDownloadError("Failed md5sum of downloaded files")

    # Select specific files and give them a name
    files = get_files_selection(download_dir)

    if not files:
        raise FileDownloadError("No file downloaded")
def main() -> None:
    """Module's entry-point.

    Parses the command-line arguments, sets up logging from them and downloads the assembly data
    of the requested accession into the requested folder (current working directory by default).
    """
    cli = ArgumentParser(description="Download an assembly data files from INSDC or RefSeq.")
    cli.add_argument("--accession", required=True, help="Genome assembly accession")
    cli.add_argument_dst_path(
        "--download_dir",
        default=Path.cwd(),
        help="Folder where the data will be downloaded",
    )
    cli.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    cli.add_log_arguments()

    options = cli.parse_args()
    init_logging_with_args(options)

    retrieve_assembly_data(options.accession, options.download_dir)