-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfetch-mainnet-blocks.py
190 lines (155 loc) · 7.24 KB
/
fetch-mainnet-blocks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import requests
import json
import csv
import time
import logging
import argparse
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# Emit timestamped, level-tagged log lines at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# get_table_rows endpoints; main() runs one worker thread and one height
# window per entry.
API_URLS = [
    "https://rpc-us.exsat.network/v1/chain/get_table_rows",
    "https://as-node.defibox.xyz/v1/chain/get_table_rows",
]

# CSV file the fetched rows are appended to, and the JSON file holding the
# resume state.
OUTPUT_FILE = "block_main.csv"
CHECKPOINT_FILE = "checkpoint-block-main.json"

BATCH_SIZE = 1000          # row limit sent with every request
MAX_RETRIES = 5            # retries allowed after the first failed attempt
INITIAL_RETRY_DELAY = 1    # seconds; doubled on each successive retry
MAX_RETRY_DELAY = 60       # seconds; upper cap for the back-off delay
RANGE_SIZE = 100_000       # number of heights in one window
MAX_ID = 839_999           # highest height that is fetched
def fetch_data(api_url, lower_bound, upper_bound, retry_count=0):
    """POST one ``get_table_rows`` query for the ``blocks`` table.

    Args:
        api_url: Endpoint the request is sent to.
        lower_bound: First key of the query; sent as a string.
        upper_bound: Last key of the query; sent as a string.
        retry_count: Number of retries already used up; the back-off delay
            and the remaining retry budget are derived from it.

    Returns:
        The decoded JSON reply, or ``None`` once ``MAX_RETRIES`` retries have
        failed with a ``requests.RequestException``.
    """
    request_body = {
        "json": True,
        "code": "utxomng.xsat",
        "scope": "utxomng.xsat",
        "table": "blocks",
        "lower_bound": str(lower_bound),
        "upper_bound": str(upper_bound),
        "index_position": 1,
        "key_type": "",
        "limit": str(BATCH_SIZE),
        "reverse": False,
        "show_payer": False
    }

    attempt = retry_count
    while True:
        try:
            reply = requests.post(api_url, json=request_body, timeout=30)
            reply.raise_for_status()
            return reply.json()
        except requests.RequestException as e:
            if attempt >= MAX_RETRIES:
                logging.error(f"Failed to fetch data from {api_url} after {MAX_RETRIES} attempts: {e}")
                return None
            # Exponential back-off, capped at MAX_RETRY_DELAY seconds.
            delay = min(INITIAL_RETRY_DELAY * (2 ** attempt), MAX_RETRY_DELAY)
            logging.warning(f"Request failed for {api_url}. Retrying in {delay:.2f} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})")
            time.sleep(delay)
            attempt += 1
def process_batch(api_url, lower_bound, upper_bound):
    """Fetch one batch of block rows and work out where the next batch starts.

    Args:
        api_url: Endpoint to query.
        lower_bound: First height of the batch.
        upper_bound: Last height of the window being paged through.

    Returns:
        A ``(rows, next_key, more)`` tuple. ``rows`` is a list of dicts with
        the block fields, ``next_key`` is the integer key to continue from
        and ``more`` is the reply's ``more`` flag. When the fetch yields
        nothing the tuple is ``(None, lower_bound, False)``.
    """
    reply = fetch_data(api_url, lower_bound, upper_bound)
    if not reply:
        return None, lower_bound, False

    def to_record(entry):
        # A row either carries its fields under 'data' or is the field
        # mapping itself.
        fields = entry.get('data', entry)
        return {
            'height': int(fields['height']),
            'hash': fields['hash'],
            'cumulative_work': fields['cumulative_work'],
            'version': int(fields['version']),
            'previous_block_hash': fields['previous_block_hash'],
            'merkle': fields['merkle'],
            'timestamp': fields['timestamp'],
            'bits': int(fields['bits']),
            'nonce': int(fields['nonce']),
        }

    rows = [to_record(entry) for entry in reply.get('rows', [])]
    more = reply.get('more', False)

    raw_key = reply.get('next_key', '')
    if raw_key == '':
        logging.warning(f"Empty next_key received for range {lower_bound}-{upper_bound}. Using upper_bound as next_key.")
        raw_key = str(upper_bound)

    try:
        cursor = int(raw_key)
    except ValueError:
        logging.error(f"Invalid next_key received: '{raw_key}'. Using upper_bound as next_key.")
        cursor = upper_bound

    return rows, cursor, more
def save_checkpoint(ranges, total_processed, path=None):
    """Persist the resume state as JSON.

    Args:
        ranges: Per-endpoint ``(lower, upper)`` height windows to continue
            from.
        total_processed: Number of rows written so far.
        path: Destination file; defaults to ``CHECKPOINT_FILE``.
    """
    target = CHECKPOINT_FILE if path is None else path
    # The key has to be "ranges": that is the name main() looks up on resume.
    state = {"ranges": ranges, "total_processed": total_processed}
    # Write to a sibling temp file and rename it over the target so that an
    # interrupted run never leaves a truncated, unparseable checkpoint.
    scratch = f"{os.fspath(target)}.tmp"
    with open(scratch, 'w') as f:
        json.dump(state, f)
    os.replace(scratch, target)
def load_checkpoint():
    """Read the resume state from ``CHECKPOINT_FILE``.

    Returns:
        The decoded JSON content, or ``None`` when the file does not exist.
    """
    try:
        handle = open(CHECKPOINT_FILE, 'r')
    except FileNotFoundError:
        # No checkpoint has been written yet.
        return None
    with handle:
        return json.load(handle)
def main(start_id=None, max_records=None, continue_on_error=False):
    """Fetch block rows from all endpoints in parallel and append them to the CSV.

    The height space is cut into windows of ``RANGE_SIZE`` heights. Every
    endpoint in ``API_URLS`` pages through its own window batch by batch; once
    a window is exhausted the endpoint claims the next window that no endpoint
    has been given yet, so no height is requested twice. After each round the
    rows are appended to ``OUTPUT_FILE`` (sorted by height within the round)
    and the per-endpoint windows are checkpointed.

    Args:
        start_id: First height to fetch. When ``None`` the run resumes from
            the checkpoint if one exists, otherwise it starts at height 1.
        max_records: Stop once the running total of written rows (which
            includes the total restored from a checkpoint) reaches this
            value; ``None`` means no limit.
        continue_on_error: When true, a failed batch is skipped for the
            current round and requested again in the next one instead of
            ending the run.

    Raises:
        Exception: Whatever a batch raised, when ``continue_on_error`` is
            false.
    """
    columns = ['height', 'hash', 'cumulative_work', 'version', 'previous_block_hash',
               'merkle', 'timestamp', 'bits', 'nonce']

    checkpoint = load_checkpoint() if start_id is None else None
    if checkpoint:
        # Checkpoints exist under both key names; accept either one.
        key = "ranges" if "ranges" in checkpoint else "height ranges"
        ranges = [tuple(window) for window in checkpoint[key]]
        total_processed = checkpoint["total_processed"]
        logging.info(f"Resuming from checkpoint. Starting at ranges: {ranges}")
    else:
        start_id = int(start_id or 1)
        ranges = [
            (start_id + i * RANGE_SIZE, min(start_id + (i + 1) * RANGE_SIZE - 1, MAX_ID))
            for i in range(len(API_URLS))
        ]
        total_processed = 0

    start_time = time.time()
    file_exists = os.path.exists(OUTPUT_FILE) and os.path.getsize(OUTPUT_FILE) > 0

    with open(OUTPUT_FILE, 'a', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if not file_exists:
            writer.writerow(columns)

        with ThreadPoolExecutor(max_workers=len(API_URLS)) as executor:
            while max_records is None or total_processed < max_records:
                # Remember which endpoint slot every future belongs to.
                # Exhausted endpoints submit nothing, so a future's position
                # in submission order is not a valid index into `ranges`.
                pending = {}
                for i, api_url in enumerate(API_URLS):
                    lower, upper = ranges[i]
                    if lower > MAX_ID:
                        continue
                    pending[executor.submit(process_batch, api_url, lower, upper)] = i

                all_rows = []
                for future in as_completed(pending):
                    api_index = pending[future]
                    try:
                        rows, next_key, batch_more = future.result()
                    except Exception as e:
                        logging.error(f"Error processing batch: {e}")
                        if not continue_on_error:
                            raise
                        continue

                    if rows is None:
                        if not continue_on_error:
                            logging.error("Critical error occurred. Stopping the process.")
                            return
                        # Window left unchanged: the same batch is requested
                        # again in the next round.
                        continue

                    all_rows.extend(rows)
                    window_end = ranges[api_index][1]
                    if batch_more and next_key <= window_end:
                        # Still inside the current window: continue from next_key.
                        ranges[api_index] = (next_key, window_end)
                    else:
                        # Window finished. Claim the first window beyond every
                        # window handed out so far; stepping to window_end + 1
                        # would re-fetch a window owned by another endpoint.
                        new_lower = max(hi for _, hi in ranges) + 1
                        new_upper = min(new_lower + RANGE_SIZE - 1, MAX_ID)
                        ranges[api_index] = (new_lower, new_upper)

                all_rows.sort(key=lambda row: row['height'])
                writer.writerows([row[name] for name in columns] for row in all_rows)
                total_processed += len(all_rows)
                csvfile.flush()
                # Checkpoint only after the rows are flushed to the CSV.
                save_checkpoint(ranges, total_processed)

                if max_records and total_processed >= max_records:
                    break

                logging.info(f"Processed {total_processed} unique records. Current ranges: {ranges}")
                time.sleep(0.1)

                if all(lower > MAX_ID for lower, _ in ranges):
                    logging.info(f"Reached maximum height of {MAX_ID}. Stopping the process.")
                    break

    end_time = time.time()
    logging.info(f"Finished processing {total_processed} unique records in {end_time - start_time:.2f} seconds")
if __name__ == "__main__":
    # Read the command-line options and hand them to main().
    cli = argparse.ArgumentParser(description="Fetch spring table data")
    cli.add_argument("--start_id", help="Starting ID for data fetch")
    cli.add_argument("--max_records", type=int, help="Maximum number of records to fetch")
    cli.add_argument(
        "--continue_on_error",
        action="store_true",
        help="Continue fetching data even if some requests fail",
    )
    options = cli.parse_args()
    main(
        start_id=options.start_id,
        max_records=options.max_records,
        continue_on_error=options.continue_on_error,
    )