searom.py
import concurrent.futures
import gzip
import os
import time
import urllib.request
from urllib.parse import unquote

from tqdm import tqdm

from settings import configure_logging, SEAROM, DOWNLOAD_AT
from daemon import Daemon

LOG = configure_logging()


class Downloader(Daemon):
    def __init__(self, pidfile: str, url: str):
        super().__init__(pidfile)
        self.max_length = SEAROM.get('MAXIMUM_LENGTH', 10)
        self.failed_chunks = []
        self.downloaded_chunks = []
        self.retries = SEAROM.get('MAXIMUM_RETRIES', 10)
        self.num_chunks = SEAROM.get('MAXIMUM_CHUNKS', 10)
        self.url = url
        self.save_directory = DOWNLOAD_AT

    def run(self):
        self.download_url(self.url, self.save_directory)

    def download_chunk(self, chunk_info):
        chunk_url, chunk_save_path, range_header = chunk_info
        if chunk_info in self.downloaded_chunks:
            LOG.info(f"{chunk_save_path} has already been downloaded")
            if chunk_info in self.failed_chunks:
                self.failed_chunks.remove(chunk_info)
            return
        for _ in range(self.retries):
            try:
                req = urllib.request.Request(chunk_url, headers={"Range": range_header})
                # Ask the server for a compressed response
                req.add_header("Accept-Encoding", "gzip, deflate")
                response = urllib.request.urlopen(req)
                # Transparently decompress if the response is gzip-encoded
                if response.info().get("Content-Encoding") == "gzip":
                    response = gzip.GzipFile(fileobj=response)
                with open(chunk_save_path, 'wb') as file:
                    file.write(response.read())
                self.downloaded_chunks.append(chunk_info)
                return  # Download successful, exit the function
            except Exception as e:
                LOG.info(f"An error occurred during the download: {e}")
                LOG.info("Retrying download...")
                time.sleep(1)  # Wait for 1 second before retrying
        LOG.info(f"Download failed for {chunk_save_path}")
        LOG.info(f"Logging failed download with path {chunk_save_path} and range header {range_header}")
        self.failed_chunks.append(chunk_info)

    def download_url(self, url, save_directory):
        try:
            # Extract the file name from the URL or the response headers
            file_name = os.path.basename(url)
            file_name = self.truncate_filename(file_name, max_length=self.max_length)
            response = urllib.request.urlopen(url)
            headers = dict(response.headers)
            if 'Content-Disposition' in headers:
                disposition = headers['Content-Disposition']
                file_name = disposition.split('filename=')[1].strip('"\'')
                file_name = self.truncate_filename(file_name, max_length=self.max_length)
            save_path = os.path.join(save_directory, file_name)

            # Total file size, used to split the download into byte ranges
            file_size = int(headers["Content-Length"])
            chunk_size = file_size // self.num_chunks

            # Build (url, part path, Range header) tuples for each chunk
            chunks = []
            for i in range(self.num_chunks):
                start_byte = i * chunk_size
                # The last chunk runs to the end of the file ("bytes=N-")
                end_byte = start_byte + chunk_size - 1 if i < self.num_chunks - 1 else ""
                range_header = f"bytes={start_byte}-{end_byte}"
                chunk_url = url if isinstance(url, str) else url.decode()
                chunk_save_path = f"{save_path}.part{i}"
                chunks.append((chunk_url, chunk_save_path, range_header))

            # Download all chunks concurrently with a progress indicator
            with tqdm(total=file_size, unit="B", unit_scale=True, unit_divisor=1024,
                      miniters=1, desc=file_name) as t:
                with concurrent.futures.ThreadPoolExecutor(max_workers=self.num_chunks) as executor:
                    download_futures = [executor.submit(self.download_chunk, chunk_info)
                                        for chunk_info in chunks]
                    for future in concurrent.futures.as_completed(download_futures):
                        t.update(chunk_size)

            # Retry failed chunks, up to self.retries rounds
            allowed_retries = 0
            while self.failed_chunks and allowed_retries < self.retries:
                allowed_retries += 1
                LOG.info("Retrying failed chunks...")
                failed_chunks_copy = self.failed_chunks.copy()
                self.failed_chunks = []
                with tqdm(total=file_size, unit="B", unit_scale=True, unit_divisor=1024,
                          miniters=1, desc=file_name) as t:
                    with concurrent.futures.ThreadPoolExecutor(max_workers=len(failed_chunks_copy)) as executor:
                        retry_download_futures = [executor.submit(self.download_chunk, chunk_info)
                                                  for chunk_info in failed_chunks_copy]
                        for future in concurrent.futures.as_completed(retry_download_futures):
                            t.update(chunk_size)

            # Merge the downloaded chunks into a single file, deleting each part after merging
            with open(save_path, "wb") as outfile:
                for i in range(self.num_chunks):
                    chunk_save_path = f"{save_path}.part{i}"
                    with open(chunk_save_path, "rb") as infile:
                        outfile.write(infile.read())
                    os.remove(chunk_save_path)
            LOG.info("Download completed successfully!")

            # Clean up leftover .crdownload files in the save directory
            for file in os.listdir(self.save_directory):
                if file.endswith(".crdownload"):
                    os.remove(os.path.join(self.save_directory, file))
            LOG.info("CRDownload removed successfully!")
        except Exception as e:
            LOG.info(f"An error occurred during the download: {e}")

    def truncate_filename(self, filename, max_length):
        filename = unquote(filename.split("?filename=")[-1])
        if len(filename) > max_length:
            basename, extension = os.path.splitext(filename)
            return basename[:max_length - len(extension)] + extension
        return filename
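

# Usage sketch: a minimal way this downloader might be launched, assuming the
# local Daemon base class follows the common pidfile-daemon recipe and exposes
# start() (daemonize, then call run()). The pidfile path and URL below are
# illustrative placeholders, not values from this project.
if __name__ == "__main__":
    import sys

    # e.g. python searom.py "https://example.com/some-large-file.iso"
    downloader = Downloader(pidfile="/tmp/searom.pid", url=sys.argv[1])
    downloader.start()  # assumed Daemon API: fork into the background, then run()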