Coverage for src/python/ensembl/io/genomio/assembly/download.py: 91%

161 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-02-21 15:37 +0000

1# See the NOTICE file distributed with this work for additional information 

2# regarding copyright ownership. 

3# 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

13# See the License for the specific language governing permissions and 

14# limitations under the License. 

"""Download assembly data files from INSDC or RefSeq."""

16 

17__all__ = [ 

18 "FileDownloadError", 

19 "FTPConnectionError", 

20 "UnsupportedFormatError", 

21 "establish_ftp", 

22 "md5_files", 

23 "get_checksums", 

24 "download_files", 

25 "get_files_selection", 

26 "get_root_name", 

27 "retrieve_assembly_data", 

28] 

29 

30from ftplib import FTP 

31import hashlib 

32import logging 

33from os import PathLike 

34from pathlib import Path 

35import re 

36import time 

37from typing import Dict, Optional 

38 

39import ensembl.io.genomio 

40from ensembl.utils.argparse import ArgumentParser 

41from ensembl.utils.logging import init_logging_with_args 

42 

# Remote file-name suffix -> short label of the file type.
# Only files whose name ends with one of these suffixes are checked and downloaded,
# and the labels are the keys of the dictionary built by `get_files_selection()`.
_FILE_ENDS = {
    # Assembly report (plain text)
    "assembly_report.txt": "report",
    # Gzipped FASTA files
    "genomic.fna.gz": "fasta_dna",
    "protein.faa.gz": "fasta_pep",
    # Gzipped annotation files
    "genomic.gff.gz": "gff3_raw",
    "genomic.gbff.gz": "gbff",
}

50 

51 

class FileDownloadError(Exception):
    """Raised when a file cannot be downloaded, or a downloaded file is missing or invalid."""

54 

55 

class FTPConnectionError(Exception):
    """Raised when an FTP connection cannot be initialised."""

58 

59 

class UnsupportedFormatError(Exception):
    """Raised when a string (e.g. an accession) does not follow the expected format."""

62 

63 

def establish_ftp(ftp_conn: FTP, ftp_url: str, accession: str) -> FTP:
    """Connect `ftp_conn` to `ftp_url` and move into the remote directory of `accession`.

    The remote directory is derived from the accession itself: `GCA_123456789.1` is looked up
    under `genomes/all/GCA/123/456/789`.

    Args:
        ftp_conn: FTP object used to open the connection.
        ftp_url: Host of the FTP server to connect to.
        accession: Genome assembly accession, i.e. `GCA_` or `GCF_` followed by 9 digits and an
            optional `.version` suffix.

    Returns:
        The same `ftp_conn` object, connected, logged in and positioned in the accession directory.

    Raises:
        UnsupportedFormatError: If `accession` does not follow INSDC's accession format.
    """
    match = re.match(r"^(GC[AF])_([0-9]{3})([0-9]{3})([0-9]{3})(\.[0-9]+)?$", accession)
    if not match:
        raise UnsupportedFormatError(f"Could not recognize GCA accession format: {accession}")
    prefix, part1, part2, part3 = match.group(1, 2, 3, 4)
    # Remote FTP paths always use "/" as separator, whatever the local operating system is,
    # so the path is assembled as a plain string rather than with pathlib.
    sub_dir = "/".join(("genomes", "all", prefix, part1, part2, part3))

    # Try now to establish connection to remote FTP server
    ftp_conn.connect(ftp_url)
    ftp_conn.login()
    ftp_conn.cwd(sub_dir)

    return ftp_conn

91 

92 

def md5_files(dl_dir: Path, md5_path: Optional[Path] = None, md5_filename: str = "md5checksums.txt") -> bool:
    """Check the downloaded files against the sums listed in an MD5 checksum file.

    Only the files listed in the checksum file whose name ends with one of the `_FILE_ENDS`
    suffixes (and not with `_from_<suffix>`) are checked.

    Args:
        dl_dir: Path location to containing downloaded FTP files.
        md5_path: Full path to an MD5 checksum file.
        md5_filename: Name of a checksum file in the `dl_dir` (used if no `md5_path` is given).

    Returns:
        True if every checked file exists and matches its checksum. False if no checksum could
        be loaded, if a listed file is missing, or if a file has a wrong checksum.
    """
    # Get or set md5 file to user or default setting
    if md5_path is None:
        md5_path = dl_dir / md5_filename

    # Get checksums and compare
    sums = get_checksums(md5_path)
    if not sums:
        return False
    logging.info(f" File sums from {md5_path}: {len(sums)}")
    for dl_file, checksum in sums.items():
        for end in _FILE_ENDS:
            if dl_file.endswith(end) and not dl_file.endswith(f"_from_{end}"):
                file_path = dl_dir / dl_file
                if not file_path.is_file():
                    logging.warning(f" No file {file_path} found")
                    return False
                # Hash the file chunk by chunk: the data files can be far too large to be
                # loaded into memory in one go
                digest = hashlib.md5()
                with file_path.open(mode="rb") as f:
                    for chunk in iter(lambda: f.read(1024 * 1024), b""):
                        digest.update(chunk)
                if digest.hexdigest() != checksum:
                    logging.warning(f" File {file_path} checksum doesn't match")
                    return False
                logging.info(f" File checksum ok {file_path}")
    logging.info(" All checksums OK")
    return True

129 

130 

def get_checksums(checksum_path: Path) -> Dict[str, str]:
    """Return the checksums listed in a checksum file, with file names as keys and sums as values.

    Each line is expected to hold a checksum followed by a file path, separated by whitespace
    (e.g. `<md5>  ./<file name>`). A leading `./` is removed from the path, and files located
    in a subdirectory (i.e. with a `/` left in their path) are not included.

    Args:
        checksum_path: Path location to MD5 checksum file.

    Returns:
        Dictionary of file name to checksum; empty if `checksum_path` is not a file.
    """
    sums: Dict[str, str] = {}
    if not checksum_path.is_file():
        return sums
    with checksum_path.open(mode="r") as fh:
        for line in fh:
            # Split on any run of whitespace: md5sum-style files separate the two columns
            # with two spaces, and the file name itself is kept whole
            fields = line.split(maxsplit=1)
            if not fields:
                continue  # Blank line
            if len(fields) != 2:
                logging.warning(f" Ignoring malformed line in {checksum_path}: {line.strip()}")
                continue
            checksum = fields[0]
            file_path = fields[1].strip()
            if file_path.startswith("./"):
                file_path = file_path[2:]
            if "/" not in file_path:
                sums[file_path] = checksum
    return sums

148 

149 

def download_files(ftp_connection: FTP, accession: str, dl_dir: Path, max_redo: int) -> None:
    """Download the files of an assembly from the current directory of an open FTP connection.

    Every entry of the current remote directory whose name contains `accession` is entered. Its
    `md5checksums.txt` file is downloaded first, then every file whose name ends with one of the
    `_FILE_ENDS` suffixes (and not with `_from_<suffix>`). A warning is logged for each entry
    that does not contain the accession.

    Args:
        ftp_connection: An open FTP connection object
        accession: Genome assembly accession.
        dl_dir: Path to downloaded FTP files.
        max_redo: Maximum FTP connection retry attempts.
    """
    # Get the list of assemblies for this accession
    for ftp_dir, _ in ftp_connection.mlsd():
        # Plain substring test: the accession is not a regular expression, and its "." must
        # not be allowed to match any character
        if accession not in ftp_dir:
            logging.warning(
                f"Could not find accession '{accession}' from ftp {ftp_dir} in open FTP connection"
            )
            continue

        # NOTE(review): the directory is entered with a relative `cwd`, so a second matching
        # entry would be resolved from inside the first one - confirm only one entry can match
        ftp_connection.cwd(ftp_dir)

        # First, get the md5sum file
        md5_file = "md5checksums.txt"
        md5_path = dl_dir / md5_file
        with md5_path.open("wb") as fp:
            ftp_connection.retrbinary(f"RETR {md5_file}", fp.write)
        md5_sums = get_checksums(md5_path)

        # Get all the files
        for ftp_file, _ in ftp_connection.mlsd():
            for end in _FILE_ENDS:
                if ftp_file.endswith(end) and not ftp_file.endswith(f"_from_{end}"):
                    _download_file(ftp_connection, ftp_file, md5_sums, dl_dir, max_redo)

182 

183 

def _md5_of_file(file_path: Path, chunk_size: int = 1024 * 1024) -> str:
    """Return the hexadecimal MD5 digest of `file_path`, read in chunks of `chunk_size` bytes."""
    digest = hashlib.md5()
    with file_path.open(mode="rb") as fp:
        for chunk in iter(lambda: fp.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _download_file(
    ftp_connection: FTP, ftp_file: str, md5_sums: Dict[str, str], dl_dir: Path, max_redo: int = 0
) -> None:
    """Download one file from the FTP server, unless a valid local copy already exists.

    A local copy is kept if it matches its expected checksum, or if there is no checksum to
    check it against. Otherwise the file is downloaded, up to `max_redo + 1` times with a pause
    of 3 seconds between attempts, until its checksum matches. A file without a checksum is
    downloaded once and cannot be verified.

    Args:
        ftp_connection: Established connection FTP object.
        ftp_file: Name of ftp file to download.
        md5_sums: Dictionary of key value pairs filename - md5_checksums.
        dl_dir: Path to downloaded FTP files.
        max_redo: Maximum number of connection retry attempts.

    Raises:
        FileDownloadError: If the file could not be downloaded properly within the allowed
            number of attempts.
    """
    expected_sum = md5_sums.get(ftp_file)
    if expected_sum is None:
        logging.warning(f" File not in the md5 checksums: {ftp_file}")
    local_path = Path(dl_dir, ftp_file)

    # File exists? Check md5sum before anything else
    if local_path.is_file():
        if expected_sum is None:
            logging.info(f" Can't check file (no md5sum), using it as is: {local_path}")
            return
        if _md5_of_file(local_path) == expected_sum:
            logging.info(f" File {local_path} is already downloaded properly")
            return

    tries = 0
    while tries <= max_redo:
        tries += 1
        if tries > 1:
            time.sleep(3)

        # Download the file
        logging.info(f" Downloading file {ftp_file}, try {tries}...")
        try:
            with local_path.open(mode="wb") as fp:
                ftp_connection.retrbinary(f"RETR {ftp_file}", fp.write)
        except EOFError:
            # Do not leave a truncated file behind: without a checksum, a later run would
            # accept it as is
            if local_path.is_file():
                local_path.unlink()
            continue

        if expected_sum is None:
            logging.info(f" Downloaded file to {local_path} (no md5sum to check it against)")
            return
        if _md5_of_file(local_path) == expected_sum:
            logging.info(f" Downloaded file properly to {local_path}")
            return

    raise FileDownloadError(f"Could not download file {ftp_file} after {tries} tries")

240 

241 

def get_files_selection(dl_dir: Path) -> Dict[str, str]:
    """Classifies the relevant files found in the download directory by file type.

    Args:
        dl_dir: Local path to downloaded FTP files.

    Returns:
        Dictionary of file type (e.g.`"report"`) as keys and, as values, the path of the
        matching file as returned by `dl_dir.iterdir()`, converted to a string.

    Raises:
        FileDownloadError: If `dl_dir` does not include a file named `*_assembly_report.txt`.
    """
    prefix = get_root_name(dl_dir)
    if prefix == "":
        raise FileDownloadError(f"Could not determine the files root name in {dl_dir}")

    selection = {}
    for entry in dl_dir.iterdir():
        entry_name = entry.name
        for suffix, file_type in _FILE_ENDS.items():
            # A file is selected either because it is exactly "<root name><suffix>", or because
            # it ends with the suffix without being a "_from_<suffix>" file
            matches_root = entry_name == prefix + suffix
            matches_suffix = entry_name.endswith(suffix) and not entry_name.endswith(f"_from_{suffix}")
            if matches_root or matches_suffix:
                selection[file_type] = str(entry)
    return selection

264 

265 

def get_root_name(dl_dir: Path) -> str:
    """Returns the root name, i.e. shared files basename prefix, using the assembly report file as base.

    The root name is whatever precedes `assembly_report.txt` (trailing underscore included) in
    the name of the first file of `dl_dir` named `<root name>assembly_report.txt`.

    Args:
        dl_dir: Path location of downloaded FTP files.

    Returns:
        The root name, or an empty string if `dl_dir` has no assembly report file.
    """
    for dl_file in dl_dir.iterdir():
        # The whole name has to match, with a literal "." before "txt", so that look-alike
        # names (e.g. backup copies of the report) are not mistaken for the report
        matches = re.fullmatch(r"(.+_)assembly_report\.txt", dl_file.name)
        if matches:
            return matches.group(1)
    return ""

279 

280 

def retrieve_assembly_data(
    accession: str,
    download_dir: PathLike,
    max_increment: int = 0,
    max_redo: int = 3,
) -> None:
    """Establishes an FTP connection and downloads a predefined subset of assembly data files from either
    INSDC or RefSeq.

    Nothing is downloaded if the files already in `download_dir` pass their MD5 checks.

    Args:
        accession: Genome assembly accession.
        download_dir: Path to where to download FTP files.
        max_increment: Number of times the accession version is incremented by one after the
            first download, each incremented accession being downloaded as well into
            `download_dir`.
        max_redo: Maximum FTP connection retry attempts.

    Raises:
        FileDownloadError: If no files are downloaded or if any does not match its MD5 checksum.
        UnsupportedFormatError: If the version has to be incremented but `accession` does not
            end with a numeric `.version`.
    """
    download_dir = Path(download_dir)

    # Set and create dedicated dir for download
    download_dir.mkdir(parents=True, exist_ok=True)

    # Download if files don't exist or fail checksum
    if not md5_files(download_dir, None):
        logging.info(" Download the files")

        for increment in range(0, max_increment + 1):
            if increment > 0:
                logging.info(f" Increment accession version once from {accession}")
                # Increment the whole version number (not only its last digit), so that
                # e.g. ".9" becomes ".10" and ".19" becomes ".20"
                base, dot, version = accession.rpartition(".")
                if not dot or not version.isdigit():
                    raise UnsupportedFormatError(
                        f"Could not increment the version of accession without a version: {accession}"
                    )
                accession = f"{base}.{int(version) + 1}"
                download_dir.mkdir(parents=True, exist_ok=True)
            ftp_url = "ftp.ncbi.nlm.nih.gov"
            ftp_instance = FTP()
            try:
                open_ftp_connection = establish_ftp(ftp_instance, ftp_url, accession)
                download_files(open_ftp_connection, accession, download_dir, max_redo)
            finally:
                # Always release the connection, including when the download fails
                ftp_instance.close()

        if not md5_files(download_dir, None):
            raise FileDownloadError("Failed md5sum of downloaded files")

    # Select specific files and give them a name
    files = get_files_selection(download_dir)

    if not files:
        raise FileDownloadError("No file downloaded")

328 

329 

def main() -> None:
    """Command-line entry point: parse the arguments, set up logging and download the files."""
    arg_parser = ArgumentParser(description="Download an assembly data files from INSDC or RefSeq.")
    arg_parser.add_argument("--accession", required=True, help="Genome assembly accession")
    arg_parser.add_argument_dst_path(
        "--download_dir", default=Path.cwd(), help="Folder where the data will be downloaded"
    )
    arg_parser.add_argument("--version", action="version", version=ensembl.io.genomio.__version__)
    arg_parser.add_log_arguments()

    cli_args = arg_parser.parse_args()
    init_logging_with_args(cli_args)

    retrieve_assembly_data(accession=cli_args.accession, download_dir=cli_args.download_dir)