diff --git "a/docs/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260.md" "b/docs/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260.md" index e69de29..e327a8d 100644 --- "a/docs/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260.md" +++ "b/docs/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260.md" @@ -0,0 +1,640 @@ +# 爬虫入门实战4_高效率的爬虫实现 + +在现代爬虫开发中,提高爬取效率是一个永恒的话题。本文将深入探讨Python中的多进程、多线程和协程三种并发编程方式,分析它们在爬虫开发中的应用,并通过实际案例比较它们的性能表现。 + +## 1. 并发编程概述 + +### 1.1 多进程(Multiprocessing) + +多进程是指在操作系统中同时运行多个独立的进程。每个进程都有自己的内存空间和系统资源。 + +优点: +- 可以充分利用多核CPU +- 绕过Python的全局解释器锁(GIL) +- 进程间内存隔离,更安全 + +缺点: +- 进程创建和切换开销大 +- 进程间通信相对复杂 +- 占用较多系统资源 + +### 1.2 多线程(Multithreading) + +多线程是在同一进程内创建多个线程,共享进程的内存空间。 + +优点: +- 资源占用相对较少 +- 线程间共享内存,通信方便 +- 适合I/O密集型任务 + +缺点: +- 受Python GIL限制,难以充分利用多核CPU +- 需要考虑线程安全问题 +- 调试相对困难 + +### 1.3 协程(Coroutine) + +协程是一种用户态的轻量级线程,通过协作式多任务实现并发。 + +优点: +- 极低的系统开销 +- 高效处理I/O密集型任务 +- 编程模型简单,易于理解 + +缺点: +- 不适合CPU密集型任务 +- 需要特定的库支持(如asyncio) +- 对于长时间运行的I/O操作可能会阻塞事件循环 + +## 2. 并发编程的演变 +> 最新的3.13已经支持编译一个无gil版本的python,后面可能python真的要起飞🛫了 + +Python并发编程的演变历程: +1. 早期:单线程同步编程 +2. Python 2.x:引入threading模块,支持多线程 +3. Python 2.6+:引入multiprocessing模块,支持多进程 +4. Python 3.4+:引入asyncio模块,支持协程 +5. Python 3.5+:引入async/await语法,简化协程编写 + +这种演变反映了开发者对更高效、更易用的并发编程方式的不懈追求。 + +## 3. 最简单的基本示例 + +### 3.1 多进程示例 + +```python +import multiprocessing +import time + +def worker(num): + print(f"Worker {num} started") + time.sleep(2) + print(f"Worker {num} finished") + +if __name__ == "__main__": + processes = [] + for i in range(5): + p = multiprocessing.Process(target=worker, args=(i,)) + processes.append(p) + p.start() + + for p in processes: + p.join() + + print("All processes completed") +``` + +### 3.2 多线程示例 + +```python +import threading +import time + +def worker(num): + print(f"Thread {num} started") + time.sleep(2) + print(f"Thread {num} finished") + +threads = [] +for i in range(5): + t = threading.Thread(target=worker, args=(i,)) + threads.append(t) + t.start() + +for t in threads: + t.join() + +print("All threads completed") +``` + +### 3.3 协程示例 + +```python +import asyncio + +async def worker(num): + print(f"Coroutine {num} started") + await asyncio.sleep(2) + print(f"Coroutine {num} finished") + +async def main(): + tasks = [asyncio.create_task(worker(i)) for i in range(5)] + await asyncio.gather(*tasks) + +asyncio.run(main()) +print("All coroutines completed") +``` + +上述的三个示例,在时间上都是2秒,但是在内存上有所不同,多进程的内存占用最大,多线程次之,协程最小。 + +## 4. 爬虫中的应用 + +在爬虫开发中,这三种并发方式各有其适用场景: + +- 多进程:适合需要绕过GIL、利用多核CPU的场景,如大规模数据处理。 +- 多线程:适合I/O密集型任务,如同时爬取多个网页。 +- 协程:最适合大量并发I/O操作,如高并发的网络请求。 + +## 5. 
+
+## 5. Hands-On Comparison
+
+We will implement the same crawling task — collecting cryptocurrency data from Yahoo Finance — three times, using multiprocessing, multithreading, and coroutines, and then compare how they perform.
+
+If you are not familiar with Yahoo Finance's cryptocurrency data, go back and review the chapter [09_爬虫入门实战2_动态数据提取.md](09_爬虫入门实战2_动态数据提取.md) first.
+
+
+### 5.1 Multiprocessing version
+```python
+# -*- coding: utf-8 -*-
+import csv
+import time
+from typing import Any, Dict, List
+from multiprocessing import Pool, cpu_count
+
+import requests
+from common import SymbolContent, make_req_params_and_headers
+
+HOST = "https://query1.finance.yahoo.com"
+SYMBOL_QUERY_API_URI = "/v1/finance/screener"
+PAGE_SIZE = 100  # allowed values: 25, 50, 100
+
+
+def parse_symbol_content(quote_item: Dict) -> SymbolContent:
+    """
+    Extract one symbol from a raw quote item.
+    :param quote_item: raw quote dict from the API response
+    :return: populated SymbolContent
+    """
+    symbol_content = SymbolContent()
+    symbol_content.symbol = quote_item["symbol"]
+    symbol_content.name = quote_item["shortName"]
+    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
+    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
+    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
+    symbol_content.market_price = quote_item["marketCap"]["fmt"]
+    return symbol_content
+
+
+def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
+    """
+    Shared request helper.
+    :param page_start: pagination start offset
+    :param page_size: number of items per page
+    :return: parsed JSON response
+    """
+    # print(f"[send_request] page_start:{page_start}")
+    req_url = HOST + SYMBOL_QUERY_API_URI
+    common_params, headers, common_payload_data = make_req_params_and_headers()
+    # set the pagination parameters for this request
+    common_payload_data["offset"] = page_start
+    common_payload_data["size"] = page_size
+
+    response = requests.post(url=req_url, params=common_params, json=common_payload_data, headers=headers)
+    if response.status_code != 200:
+        raise Exception(f"Request failed with status {response.status_code}: {response.text}")
+    response_dict: Dict = response.json()
+    return response_dict
+
+
+def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
+    """
+    Fetch currency data for a single page.
+    :param page_start: Page start index.
+    :return: List of SymbolContent for the page.
+    """
+    try:
+        response_dict: Dict = send_request(page_start=page_start, page_size=PAGE_SIZE)
+        symbol_data_list: List[SymbolContent] = [
+            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
+        ]
+        return symbol_data_list
+    except Exception as e:
+        print(f"Error fetching data for page_start={page_start}: {e}")
+        return []
+
+
+def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
+    """
+    Fetch currency data using multiprocessing.
+    :param max_total_count: Maximum total count of currencies.
+    :return: List of all SymbolContent.
+    """
+    with Pool(processes=cpu_count()) as pool:
+        page_starts = list(range(0, max_total_count, PAGE_SIZE))
+        print(f"Issuing {len(page_starts)} requests in total")
+        results = pool.map(fetch_currency_data_single, page_starts)
+
+    # Flatten the list of lists into a single list
+    return [item for sublist in results for item in sublist]
+
+
+def get_max_total_count() -> int:
+    """
+    Get the total number of listed currencies.
+    :return: total count, or 0 on failure
+    """
+    print("Fetching the total number of symbols...")
+    try:
+        response_dict: Dict = send_request(page_start=0, page_size=PAGE_SIZE)
+        total_num: int = response_dict["finance"]["result"][0]["total"]
+        print(f"Found {total_num} symbols")
+        return total_num
+    except Exception as e:
+        print("Error:", e)
+        return 0
+
+
+def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
+    """
+    Save the collected data to a CSV file.
+    :param save_file_name: target file name
+    :param currency_data_list: collected symbols
+    :return:
+    """
+    with open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
+        writer = csv.writer(file)
+        # write the header row
+        writer.writerow(SymbolContent.get_fields())
+        # write one row per symbol
+        for symbol in currency_data_list:
+            writer.writerow([symbol.symbol, symbol.name, symbol.price, symbol.change_price, symbol.change_percent,
+                             symbol.market_price])
+
+
+def run_crawler_mp(save_file_name: str) -> None:
+    """
+    Crawler entry point (multiprocessing version).
+    :param save_file_name:
+    :return:
+    """
+    # step 1: get the total number of records
+    max_total: int = get_max_total_count()
+    # step 2: fetch and parse every page into the data container
+    data_list: List[SymbolContent] = fetch_currency_data_list(max_total)
+    # step 3: save the collected data to CSV
+    save_data_to_csv(save_file_name, data_list)
+
+
+if __name__ == '__main__':
+    start_time = time.time()
+    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
+    run_crawler_mp(save_csv_file_name)
+    end_time = time.time()
+    print(f"Multiprocess run took: {end_time - start_time} seconds")
+```
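+
+One tweak worth knowing about: `pool.map` blocks until every page has returned and preserves submission order. If you don't mind rows arriving out of page order (the CSV then loses its market-cap sorting), `pool.imap_unordered` hands each page over as soon as a worker finishes, which is handy for progress reporting. A sketch of such a variant, reusing the functions above:
+
+```python
+def fetch_currency_data_list_streaming(max_total_count: int) -> List[SymbolContent]:
+    """Variant of fetch_currency_data_list that streams results as they finish."""
+    data: List[SymbolContent] = []
+    page_starts = list(range(0, max_total_count, PAGE_SIZE))
+    with Pool(processes=cpu_count()) as pool:
+        # imap_unordered yields each page's list the moment its worker is done
+        for page_items in pool.imap_unordered(fetch_currency_data_single, page_starts):
+            data.extend(page_items)
+            print(f"collected {len(data)} rows so far")
+    return data
+```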
+
+### 5.2 Multithreading version
+```python
+# -*- coding: utf-8 -*-
+import csv
+import time
+from os import cpu_count
+from typing import Any, Dict, List
+from concurrent.futures import ThreadPoolExecutor
+
+import requests
+from common import SymbolContent, make_req_params_and_headers
+
+HOST = "https://query1.finance.yahoo.com"
+SYMBOL_QUERY_API_URI = "/v1/finance/screener"
+PAGE_SIZE = 100  # allowed values: 25, 50, 100
+
+
+def parse_symbol_content(quote_item: Dict) -> SymbolContent:
+    """
+    Extract one symbol from a raw quote item.
+    :param quote_item: raw quote dict from the API response
+    :return: populated SymbolContent
+    """
+    symbol_content = SymbolContent()
+    symbol_content.symbol = quote_item["symbol"]
+    symbol_content.name = quote_item["shortName"]
+    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
+    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
+    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
+    symbol_content.market_price = quote_item["marketCap"]["fmt"]
+    return symbol_content
+
+
+def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
+    """
+    Shared request helper.
+    :param page_start: pagination start offset
+    :param page_size: number of items per page
+    :return: parsed JSON response
+    """
+    # print(f"[send_request] page_start:{page_start}")
+    req_url = HOST + SYMBOL_QUERY_API_URI
+    common_params, headers, common_payload_data = make_req_params_and_headers()
+    # set the pagination parameters for this request
+    common_payload_data["offset"] = page_start
+    common_payload_data["size"] = page_size
+
+    response = requests.post(url=req_url, params=common_params, json=common_payload_data, headers=headers)
+    if response.status_code != 200:
+        raise Exception(f"Request failed with status {response.status_code}: {response.text}")
+    response_dict: Dict = response.json()
+    return response_dict
+
+
+def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
+    """
+    Fetch currency data for a single page.
+    :param page_start: Page start index.
+    :return: List of SymbolContent for the page.
+    """
+    try:
+        response_dict: Dict = send_request(page_start=page_start, page_size=PAGE_SIZE)
+        return [
+            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
+        ]
+    except Exception as e:
+        print(f"Error fetching data for page_start={page_start}: {e}")
+        return []
+
+
+def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
+    """
+    Fetch currency data using multithreading.
+    :param max_total_count: Maximum total count of currencies.
+    :return: List of all SymbolContent.
+    """
+    # os.cpu_count() can return None, hence the fallback
+    with ThreadPoolExecutor(max_workers=(cpu_count() or 4) * 2) as executor:
+        page_starts = list(range(0, max_total_count, PAGE_SIZE))
+        print(f"Issuing {len(page_starts)} requests in total")
+
+        # executor.map distributes tasks and collects results in submission order
+        results = list(executor.map(fetch_currency_data_single, page_starts))
+
+        # flatten the list of lists
+        return [item for sublist in results for item in sublist]
+
+
+def get_max_total_count() -> int:
+    """
+    Get the total number of listed currencies.
+    :return: total count, or 0 on failure
+    """
+    print("Fetching the total number of symbols...")
+    try:
+        response_dict: Dict = send_request(page_start=0, page_size=PAGE_SIZE)
+        total_num: int = response_dict["finance"]["result"][0]["total"]
+        print(f"Found {total_num} symbols")
+        return total_num
+    except Exception as e:
+        print("Error:", e)
+        return 0
+
+
+def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
+    """
+    Save the collected data to a CSV file.
+    :param save_file_name: target file name
+    :param currency_data_list: collected symbols
+    :return:
+    """
+    with open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
+        writer = csv.writer(file)
+        # write the header row
+        writer.writerow(SymbolContent.get_fields())
+        # write one row per symbol
+        for symbol in currency_data_list:
+            writer.writerow([symbol.symbol, symbol.name, symbol.price, symbol.change_price, symbol.change_percent,
+                             symbol.market_price])
+
+
+def run_crawler_mt(save_file_name: str) -> None:
+    """
+    Crawler entry point (multithreading version).
+    :param save_file_name:
+    :return:
+    """
+    # step 1: get the total number of records
+    max_total: int = get_max_total_count()
+    # step 2: fetch and parse every page into the data container
+    data_list: List[SymbolContent] = fetch_currency_data_list(max_total)
+    # step 3: save the collected data to CSV
+    save_data_to_csv(save_file_name, data_list)
+
+
+if __name__ == '__main__':
+    start_time = time.time()
+    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
+    run_crawler_mt(save_csv_file_name)
+    end_time = time.time()
+    print(f"Multithreaded run took: {end_time - start_time} seconds")
+```
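+
+`executor.map` is the simplest thread-pool API, but it yields results strictly in submission order, so one slow page holds up everything queued behind it. With `submit` plus `as_completed` each page can be handled the moment it lands — a sketch reusing the functions above:
+
+```python
+from concurrent.futures import as_completed
+
+def fetch_currency_data_list_eager(max_total_count: int) -> List[SymbolContent]:
+    """Variant that consumes pages in completion order rather than submission order."""
+    data: List[SymbolContent] = []
+    with ThreadPoolExecutor(max_workers=(cpu_count() or 4) * 2) as executor:
+        futures = [executor.submit(fetch_currency_data_single, p)
+                   for p in range(0, max_total_count, PAGE_SIZE)]
+        for future in as_completed(futures):
+            # fetch_currency_data_single already catches its own exceptions
+            data.extend(future.result())
+    return data
+```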
+
+### 5.3 Coroutine version
+```python
+# -*- coding: utf-8 -*-
+import asyncio
+import time
+from typing import Any, Dict, List
+
+import aiofiles
+import httpx
+from common import SymbolContent, make_req_params_and_headers
+
+HOST = "https://query1.finance.yahoo.com"
+SYMBOL_QUERY_API_URI = "/v1/finance/screener"
+PAGE_SIZE = 100  # allowed values: 25, 50, 100
+
+
+def parse_symbol_content(quote_item: Dict) -> SymbolContent:
+    """
+    Extract one symbol from a raw quote item.
+    :param quote_item: raw quote dict from the API response
+    :return: populated SymbolContent
+    """
+    symbol_content = SymbolContent()
+    symbol_content.symbol = quote_item["symbol"]
+    symbol_content.name = quote_item["shortName"]
+    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
+    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
+    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
+    symbol_content.market_price = quote_item["marketCap"]["fmt"]
+    return symbol_content
+
+
+async def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
+    """
+    Shared request helper.
+    :param page_start: pagination start offset
+    :param page_size: number of items per page
+    :return: parsed JSON response
+    """
+    # print(f"[send_request] page_start:{page_start}")
+    req_url = HOST + SYMBOL_QUERY_API_URI
+    common_params, headers, common_payload_data = make_req_params_and_headers()
+    # set the pagination parameters for this request
+    common_payload_data["offset"] = page_start
+    common_payload_data["size"] = page_size
+
+    async with httpx.AsyncClient() as client:
+        response = await client.post(url=req_url, params=common_params, json=common_payload_data, headers=headers,
+                                     timeout=30)
+        if response.status_code != 200:
+            raise Exception(f"Request failed with status {response.status_code}: {response.text}")
+        response_dict: Dict = response.json()
+        return response_dict
+
+
+async def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
+    """
+    Fetch currency data for a single page.
+    :param page_start: Page start index.
+    :return: List of SymbolContent for the page.
+    """
+    try:
+        response_dict: Dict = await send_request(page_start=page_start, page_size=PAGE_SIZE)
+        return [
+            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
+        ]
+    except Exception as e:
+        print(f"Error fetching data for page_start={page_start}: {e}")
+        return []
+
+
+async def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
+    """
+    Fetch currency data using asyncio.
+    :param max_total_count: Maximum total count of currencies.
+    :return: List of all SymbolContent.
+    """
+    page_starts = list(range(0, max_total_count, PAGE_SIZE))
+    print(f"Issuing {len(page_starts)} requests in total")
+
+    tasks = [fetch_currency_data_single(page_start) for page_start in page_starts]
+    results = await asyncio.gather(*tasks)
+
+    # flatten the list of lists
+    return [item for sublist in results for item in sublist]
+
+
+async def get_max_total_count() -> int:
+    """
+    Get the total number of listed currencies.
+    :return: total count, or 0 on failure
+    """
+    print("Fetching the total number of symbols...")
+    try:
+        response_dict: Dict = await send_request(page_start=0, page_size=PAGE_SIZE)
+        total_num: int = response_dict["finance"]["result"][0]["total"]
+        print(f"Found {total_num} symbols")
+        return total_num
+    except Exception as e:
+        print("Error:", e)
+        return 0
+
+
+async def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
+    """
+    Save the collected data to a CSV file.
+    :param save_file_name: target file name
+    :param currency_data_list: collected symbols
+    :return:
+    """
+    # csv.writer does not accept an aiofiles handle, so rows are joined by hand;
+    # note that a field containing a comma would break this naive CSV output.
+    async with aiofiles.open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
+        # write the header row
+        await file.write(','.join(SymbolContent.get_fields()) + '\n')
+        # write one row per symbol
+        for symbol in currency_data_list:
+            await file.write(f"{symbol.symbol},{symbol.name},{symbol.price},{symbol.change_price},{symbol.change_percent},{symbol.market_price}\n")
+
+
+async def run_crawler_async(save_file_name: str) -> None:
+    """
+    Crawler entry point (async version).
+    :param save_file_name:
+    :return:
+    """
+    # step 1: get the total number of records
+    max_total: int = await get_max_total_count()
+    # step 2: fetch and parse every page into the data container
+    data_list: List[SymbolContent] = await fetch_currency_data_list(max_total)
+    # step 3: save the collected data to CSV
+    await save_data_to_csv(save_file_name, data_list)
+
+
+async def main():
+    """
+    Entry point.
+    :return:
+    """
+    start_time = time.time()
+    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
+    await run_crawler_async(save_csv_file_name)
+    end_time = time.time()
+    print(f"Coroutine (asyncio) run took: {end_time - start_time} seconds")
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+```
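+
+Two easy upgrades to the version above, sketched under the same request/parse helpers: it currently opens a fresh `httpx.AsyncClient` (and thus a fresh connection pool) for every page, and it fires all requests at once, which real APIs often answer with rate limiting. Sharing one client and capping in-flight requests with `asyncio.Semaphore` addresses both; the limit of 10 here is an arbitrary assumption, not a documented Yahoo quota:
+
+```python
+async def fetch_all_pages(max_total_count: int, limit: int = 10) -> List[SymbolContent]:
+    semaphore = asyncio.Semaphore(limit)  # at most `limit` requests in flight
+    async with httpx.AsyncClient(timeout=30) as client:  # one shared connection pool
+
+        async def fetch_page(page_start: int) -> List[SymbolContent]:
+            common_params, headers, payload = make_req_params_and_headers()
+            payload["offset"], payload["size"] = page_start, PAGE_SIZE
+            async with semaphore:
+                resp = await client.post(HOST + SYMBOL_QUERY_API_URI, params=common_params,
+                                         json=payload, headers=headers)
+            resp.raise_for_status()
+            return [parse_symbol_content(q)
+                    for q in resp.json()["finance"]["result"][0]["quotes"]]
+
+        tasks = [fetch_page(p) for p in range(0, max_total_count, PAGE_SIZE)]
+        results = await asyncio.gather(*tasks, return_exceptions=True)
+    # keep successful pages, drop failed ones
+    return [item for page in results if isinstance(page, list) for item in page]
+```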
+
+> Source code for this section: [11_爬虫入门实战4_高效率的爬虫实现](https://github.com/NanmiCoder/CrawlerTutorial/tree/main/%E6%BA%90%E4%BB%A3%E7%A0%81/%E7%88%AC%E8%99%AB%E5%85%A5%E9%97%A8/11_%E7%88%AC%E8%99%AB%E5%85%A5%E9%97%A8%E5%AE%9E%E6%88%984_%E9%AB%98%E6%95%88%E7%8E%87%E7%9A%84%E7%88%AC%E8%99%AB%E5%AE%9E%E7%8E%B0)
+
+### 5.4 Execution Times
+
+
+#### 5.4.1 Multithreading
+```
+Fetching the total number of symbols...
+Found 9967 symbols
+Issuing 100 requests in total
+Multithreaded run took: 7.992658853530884 seconds
+```
+
+
+#### 5.4.2 Multiprocessing
+```
+Fetching the total number of symbols...
+Found 9967 symbols
+Issuing 100 requests in total
+Multiprocess run took: 17.447596073150635 seconds
+```
+
+
+#### 5.4.3 Coroutines
+```
+Fetching the total number of symbols...
+Found 9967 symbols
+Issuing 100 requests in total
+Coroutine (asyncio) run took: 4.690491199493408 seconds
+```
+
+
+## 6. Summary of the Code Above
+> My personal preference is async coroutines: the code reads almost like synchronous code, yet delivers high efficiency.
+
+### 6.1 Multithreading (run_crawler_multi_thread.py)
+Best fit: I/O-bound work such as network requests, because a thread waiting on I/O (e.g. a pending response) yields the CPU to other threads.
+How it works:
+- A ThreadPoolExecutor manages the thread pool.
+- Each task (fetching one page of currency data) is handed to a pool thread.
+- executor.map parallelizes the page fetches and takes care of distributing tasks and collecting results.
+
+### 6.2 Multiprocessing (run_crawler_multi_process.py)
+Best fit: CPU-bound work. Here it is applied to an I/O-bound task, which is usually not the best choice, because inter-process communication is expensive.
+How it works:
+- multiprocessing.Pool creates the process pool.
+- As with threads, pool.map parallelizes the page fetches.
+- Data moves between processes via serialization and deserialization (pickling), which adds overhead.
+
+### 6.3 Coroutines (run_crawler_multi_coroutine.py)
+Best fit: I/O-bound work such as network requests. Coroutines gain efficiency through an event loop and non-blocking I/O.
+How it works:
+- The asyncio library drives the coroutines.
+- httpx.AsyncClient issues asynchronous HTTP requests, so waiting on a response never blocks the rest of the program.
+- asyncio.gather runs the per-page coroutines concurrently.
+
+Overall takeaways:
+- Threads and processes both handle concurrency, but for heavy network I/O they are usually less efficient than coroutines.
+- Coroutines give the best throughput and resource utilization, especially for network-I/O-bound work.
+- Pick a concurrency strategy based on the task type (CPU-bound vs. I/O-bound), the available resources (e.g. CPU core count), and how much complexity the program can afford.
\ No newline at end of file
diff --git "a/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/1_multi_process_demo.py" "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/1_multi_process_demo.py"
new file mode 100644
index 0000000..7dc54c3
--- /dev/null
+++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/1_multi_process_demo.py"
@@ -0,0 +1,22 @@
+# -*- coding: utf-8 -*-
+import multiprocessing
+import time
+
+
+def worker(num):
+    print(f"Worker {num} started")
+    time.sleep(2)
+    print(f"Worker {num} finished")
+
+
+if __name__ == "__main__":
+    processes = []
+    for i in range(5):
+        p = multiprocessing.Process(target=worker, args=(i,))
+        processes.append(p)
+        p.start()
+
+    for p in processes:
+        p.join()
+
+    print("All processes completed")
\ No newline at end of file
diff --git "a/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/2_multi_thread_demo.py" "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/2_multi_thread_demo.py"
new file mode 100644
index 0000000..10104b7
--- /dev/null
+++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/2_multi_thread_demo.py"
@@ -0,0 +1,18 @@
+import threading
+import time
+
+def worker(num):
+    print(f"Thread {num} started")
+    time.sleep(2)
+    print(f"Thread {num} finished")
+
+threads = []
+for i in range(5):
+    t = threading.Thread(target=worker, args=(i,))
+    threads.append(t)
+    t.start()
+
+for t in threads:
+    t.join()
+
threads completed") \ No newline at end of file diff --git "a/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/3_multi_coroutine_demo.py" "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/3_multi_coroutine_demo.py" new file mode 100644 index 0000000..ad6ec01 --- /dev/null +++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/3_multi_coroutine_demo.py" @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +import asyncio + +async def worker(num): + print(f"Coroutine {num} started") + await asyncio.sleep(2) + print(f"Coroutine {num} finished") + +async def main(): + tasks = [asyncio.create_task(worker(i)) for i in range(5)] + await asyncio.gather(*tasks) + +asyncio.run(main()) +print("All coroutines completed") \ No newline at end of file diff --git "a/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/common.py" "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/common.py" new file mode 100644 index 0000000..dda1f12 --- /dev/null +++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/common.py" @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# @Author : relakkes@gmail.com +# @Name : 程序员阿江-Relakkes +# @Time : 2024/4/7 20:54 +# @Desc : 存放一些公共的函数 +from typing import List + + +class SymbolContent: + symbol: str = "" + name: str = "" + price: str = "" # 价格(盘中) + change_price: str = "" # 跌涨价格 + change_percent: str = "" # 跌涨幅 + market_price: str = "" # 市值 + + @classmethod + def get_fields(cls) -> List[str]: + return [key for key in cls.__dict__.keys() if not key.startswith('__') and key != "get_fields"] + + def __str__(self): + return f""" +Symbol: {self.symbol} +Name: {self.name} +Price: {self.price} +Change Price: {self.change_price} +Change Percent: {self.change_percent} +Market Price: {self.market_price} +""" + + +def make_req_params_and_headers(): + headers = { + # cookies是必须的,并且和common_params的crumb参数绑定的。 + 'cookie': 'GUC=AQEBCAFmDYVmOUIdcARM&s=AQAAANxlE2ny&g=Zgw0yA; A1=d=AQABBBB0fGQCEKnzzPnIHq8Lm4HEj-GCp50FEgEBCAGFDWY5Zliia3sB_eMBAAcIEHR8ZOGCp50&S=AQAAAgF-nCWw8AxSZ-gyIaeg4aI; A3=d=AQABBBB0fGQCEKnzzPnIHq8Lm4HEj-GCp50FEgEBCAGFDWY5Zliia3sB_eMBAAcIEHR8ZOGCp50&S=AQAAAgF-nCWw8AxSZ-gyIaeg4aI; 
+        'cookie': 'GUC=AQEBCAFmDYVmOUIdcARM&s=AQAAANxlE2ny&g=Zgw0yA; A1=d=AQABBBB0fGQCEKnzzPnIHq8Lm4HEj-GCp50FEgEBCAGFDWY5Zliia3sB_eMBAAcIEHR8ZOGCp50&S=AQAAAgF-nCWw8AxSZ-gyIaeg4aI; A3=d=AQABBBB0fGQCEKnzzPnIHq8Lm4HEj-GCp50FEgEBCAGFDWY5Zliia3sB_eMBAAcIEHR8ZOGCp50&S=AQAAAgF-nCWw8AxSZ-gyIaeg4aI; axids=gam=y-lf5u4KlE2uJWDQYbXyUTkKMC2GVH7OUj~A&dv360=eS1XSElPM3l4RTJ1SHVVV3hNZVBDeG9aTDlDYXdaQ1dPNX5B&ydsp=y-_wiZU4RE2uIAxUbGalyjvJCoR6Le9iVT~A&tbla=y-gt2Wyc1E2uI4nvAYanhnPTMrhn4c3edZ~A; tbla_id=fde33964-c427-4b9c-b849-90a304938e21-tuctb84a272; cmp=t=1712472060&j=0&u=1YNN; gpp=DBABBg~BVoIgACA.QA; gpp_sid=8; A1S=d=AQABBBB0fGQCEKnzzPnIHq8Lm4HEj-GCp50FEgEBCAGFDWY5Zliia3sB_eMBAAcIEHR8ZOGCp50&S=AQAAAgF-nCWw8AxSZ-gyIaeg4aI&j=WORLD',
+        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36',
+    }
+    common_params = {
+        'crumb': 'UllRf10isbP',
+        'lang': 'en-US',
+        'region': 'US',
+        'formatted': 'true',
+        'corsDomain': 'finance.yahoo.com',
+    }
+    common_payload_data = {
+        'offset': 0,  # pagination start offset
+        'size': 25,  # page size
+        'sortType': 'DESC',
+        'sortField': 'intradaymarketcap',
+        'quoteType': 'CRYPTOCURRENCY',
+        'query': {
+            'operator': 'and',
+            'operands': [
+                {
+                    'operator': 'eq',
+                    'operands': [
+                        'currency',
+                        'USD',
+                    ],
+                },
+                {
+                    'operator': 'eq',
+                    'operands': [
+                        'exchange',
+                        'CCC',
+                    ],
+                },
+            ],
+        },
+        'userId': '',
+        'userIdType': 'guid',
+    }
+    return common_params, headers, common_payload_data
diff --git "a/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_coroutine.py" "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_coroutine.py"
new file mode 100644
index 0000000..356e960
--- /dev/null
+++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_coroutine.py"
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+import asyncio
+import time
+from typing import Any, Dict, List
+
+import aiofiles
+import httpx
+from common import SymbolContent, make_req_params_and_headers
+
+HOST = "https://query1.finance.yahoo.com"
+SYMBOL_QUERY_API_URI = "/v1/finance/screener"
+PAGE_SIZE = 100  # allowed values: 25, 50, 100
+
+
+def parse_symbol_content(quote_item: Dict) -> SymbolContent:
+    """
+    Extract one symbol from a raw quote item.
+    :param quote_item: raw quote dict from the API response
+    :return: populated SymbolContent
+    """
+    symbol_content = SymbolContent()
+    symbol_content.symbol = quote_item["symbol"]
+    symbol_content.name = quote_item["shortName"]
+    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
+    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
+    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
+    symbol_content.market_price = quote_item["marketCap"]["fmt"]
+    return symbol_content
+
+
+async def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
+    """
+    Shared request helper.
+    :param page_start: pagination start offset
+    :param page_size: number of items per page
+    :return: parsed JSON response
+    """
+    # print(f"[send_request] page_start:{page_start}")
+    req_url = HOST + SYMBOL_QUERY_API_URI
+    common_params, headers, common_payload_data = make_req_params_and_headers()
+    # set the pagination parameters for this request
+    common_payload_data["offset"] = page_start
+    common_payload_data["size"] = page_size
+
+    async with httpx.AsyncClient() as client:
+        response = await client.post(url=req_url, params=common_params, json=common_payload_data, headers=headers,
+                                     timeout=30)
+        if response.status_code != 200:
+            raise Exception(f"Request failed with status {response.status_code}: {response.text}")
+        response_dict: Dict = response.json()
+        return response_dict
+
+
+async def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
+    """
+    Fetch currency data for a single page.
+    :param page_start: Page start index.
+    :return: List of SymbolContent for the page.
+    """
+    try:
+        response_dict: Dict = await send_request(page_start=page_start, page_size=PAGE_SIZE)
+        return [
+            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
+        ]
+    except Exception as e:
+        print(f"Error fetching data for page_start={page_start}: {e}")
+        return []
+
+
+async def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
+    """
+    Fetch currency data using asyncio.
+    :param max_total_count: Maximum total count of currencies.
+    :return: List of all SymbolContent.
+    """
+    page_starts = list(range(0, max_total_count, PAGE_SIZE))
+    print(f"Issuing {len(page_starts)} requests in total")
+
+    tasks = [fetch_currency_data_single(page_start) for page_start in page_starts]
+    results = await asyncio.gather(*tasks)
+
+    # flatten the list of lists
+    return [item for sublist in results for item in sublist]
+
+
+async def get_max_total_count() -> int:
+    """
+    Get the total number of listed currencies.
+    :return: total count, or 0 on failure
+    """
+    print("Fetching the total number of symbols...")
+    try:
+        response_dict: Dict = await send_request(page_start=0, page_size=PAGE_SIZE)
+        total_num: int = response_dict["finance"]["result"][0]["total"]
+        print(f"Found {total_num} symbols")
+        return total_num
+    except Exception as e:
+        print("Error:", e)
+        return 0
+
+
+async def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
+    """
+    Save the collected data to a CSV file.
+    :param save_file_name: target file name
+    :param currency_data_list: collected symbols
+    :return:
+    """
+    # csv.writer does not accept an aiofiles handle, so rows are joined by hand;
+    # note that a field containing a comma would break this naive CSV output.
+    async with aiofiles.open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
+        # write the header row
+        await file.write(','.join(SymbolContent.get_fields()) + '\n')
+        # write one row per symbol
+        for symbol in currency_data_list:
+            await file.write(f"{symbol.symbol},{symbol.name},{symbol.price},{symbol.change_price},{symbol.change_percent},{symbol.market_price}\n")
+
+
+async def run_crawler_async(save_file_name: str) -> None:
+    """
+    Crawler entry point (async version).
+    :param save_file_name:
+    :return:
+    """
+    # step 1: get the total number of records
+    max_total: int = await get_max_total_count()
+    # step 2: fetch and parse every page into the data container
+    data_list: List[SymbolContent] = await fetch_currency_data_list(max_total)
+    # step 3: save the collected data to CSV
+    await save_data_to_csv(save_file_name, data_list)
+
+
+async def main():
+    """
+    Entry point.
+    :return:
+    """
+    start_time = time.time()
+    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
+    await run_crawler_async(save_csv_file_name)
+    end_time = time.time()
+    print(f"Coroutine (asyncio) run took: {end_time - start_time} seconds")
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
+
"b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_process.py" new file mode 100644 index 0000000..f3b736d --- /dev/null +++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_process.py" @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +import csv +import time +from typing import Any, Dict, List +from multiprocessing import Pool, cpu_count + +import requests +from common import SymbolContent, make_req_params_and_headers + +HOST = "https://query1.finance.yahoo.com" +SYMBOL_QUERY_API_URI = "/v1/finance/screener" +PAGE_SIZE = 100 # 可选配置(25, 50, 100) + + +def parse_symbol_content(quote_item: Dict) -> SymbolContent: + """ + 数据提取 + :param quote_item: + :return: + """ + symbol_content = SymbolContent() + symbol_content.symbol = quote_item["symbol"] + symbol_content.name = quote_item["shortName"] + symbol_content.price = quote_item["regularMarketPrice"]["fmt"] + symbol_content.change_price = quote_item["regularMarketChange"]["fmt"] + symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"] + symbol_content.market_price = quote_item["marketCap"]["fmt"] + return symbol_content + + +def send_request(page_start: int, page_size: int) -> Dict[str, Any]: + """ + 公共的发送请求的函数 + :param page_start: 分页起始位置 + :param page_size: 每一页的长度 + :return: + """ + # print(f"[send_request] page_start:{page_start}") + req_url = HOST + SYMBOL_QUERY_API_URI + common_params, headers, common_payload_data = make_req_params_and_headers() + # 修改分页变动参数 + common_payload_data["offset"] = page_start + common_payload_data["size"] = page_size + + response = requests.post(url=req_url, params=common_params, json=common_payload_data, headers=headers) + if response.status_code != 200: + raise Exception("发起请求时发生异常,请求发生错误,原因:", response.text) + try: + response_dict: Dict = response.json() + return response_dict + except Exception as e: + raise e + + +def fetch_currency_data_single(page_start: int) -> List[SymbolContent]: + """ + Fetch currency data for a single page. + :param page_start: Page start index. + :return: List of SymbolContent for the page. + """ + try: + response_dict: Dict = send_request(page_start=page_start, page_size=PAGE_SIZE) + symbol_data_list: List[SymbolContent] = [ + parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"] + ] + return symbol_data_list + except Exception as e: + print(f"Error fetching data for page_start={page_start}: {e}") + return [] + + +def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]: + """ + Fetch currency data using multiprocessing. + :param max_total_count: Maximum total count of currencies. + :return: List of all SymbolContent. 
+
+def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
+    """
+    Fetch currency data using multiprocessing.
+    :param max_total_count: Maximum total count of currencies.
+    :return: List of all SymbolContent.
+    """
+    with Pool(processes=cpu_count()) as pool:
+        page_starts = list(range(0, max_total_count, PAGE_SIZE))
+        print(f"Issuing {len(page_starts)} requests in total")
+        results = pool.map(fetch_currency_data_single, page_starts)
+
+    # Flatten the list of lists into a single list
+    return [item for sublist in results for item in sublist]
+
+
+def get_max_total_count() -> int:
+    """
+    Get the total number of listed currencies.
+    :return: total count, or 0 on failure
+    """
+    print("Fetching the total number of symbols...")
+    try:
+        response_dict: Dict = send_request(page_start=0, page_size=PAGE_SIZE)
+        total_num: int = response_dict["finance"]["result"][0]["total"]
+        print(f"Found {total_num} symbols")
+        return total_num
+    except Exception as e:
+        print("Error:", e)
+        return 0
+
+
+def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
+    """
+    Save the collected data to a CSV file.
+    :param save_file_name: target file name
+    :param currency_data_list: collected symbols
+    :return:
+    """
+    with open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
+        writer = csv.writer(file)
+        # write the header row
+        writer.writerow(SymbolContent.get_fields())
+        # write one row per symbol
+        for symbol in currency_data_list:
+            writer.writerow([symbol.symbol, symbol.name, symbol.price, symbol.change_price, symbol.change_percent,
+                             symbol.market_price])
+
+
+def run_crawler_mp(save_file_name: str) -> None:
+    """
+    Crawler entry point (multiprocessing version).
+    :param save_file_name:
+    :return:
+    """
+    # step 1: get the total number of records
+    max_total: int = get_max_total_count()
+    # step 2: fetch and parse every page into the data container
+    data_list: List[SymbolContent] = fetch_currency_data_list(max_total)
+    # step 3: save the collected data to CSV
+    save_data_to_csv(save_file_name, data_list)
+
+
+if __name__ == '__main__':
+    start_time = time.time()
+    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
+    run_crawler_mp(save_csv_file_name)
+    end_time = time.time()
+    print(f"Multiprocess run took: {end_time - start_time} seconds")
\ No newline at end of file
diff --git "a/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_thread.py" "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_thread.py"
new file mode 100644
index 0000000..529e0af
--- /dev/null
+++ "b/\346\272\220\344\273\243\347\240\201/\347\210\254\350\231\253\345\205\245\351\227\250/11_\347\210\254\350\231\253\345\205\245\351\227\250\345\256\236\346\210\2304_\351\253\230\346\225\210\347\216\207\347\232\204\347\210\254\350\231\253\345\256\236\347\216\260/run_crawler_multi_thread.py"
@@ -0,0 +1,141 @@
+# -*- coding: utf-8 -*-
+import csv
+import time
+from os import cpu_count
+from typing import Any, Dict, List
+from concurrent.futures import ThreadPoolExecutor
+
+import requests
+from common import SymbolContent, make_req_params_and_headers
+
+HOST = "https://query1.finance.yahoo.com"
+SYMBOL_QUERY_API_URI = "/v1/finance/screener"
+PAGE_SIZE = 100  # allowed values: 25, 50, 100
+
+
+def parse_symbol_content(quote_item: Dict) -> SymbolContent:
+    """
+    Extract one symbol from a raw quote item.
+    :param quote_item: raw quote dict from the API response
+    :return: populated SymbolContent
+    """
+    symbol_content = SymbolContent()
+    symbol_content.symbol = quote_item["symbol"]
+    symbol_content.name = quote_item["shortName"]
+    symbol_content.price = quote_item["regularMarketPrice"]["fmt"]
+    symbol_content.change_price = quote_item["regularMarketChange"]["fmt"]
+    symbol_content.change_percent = quote_item["regularMarketChangePercent"]["fmt"]
+    symbol_content.market_price = quote_item["marketCap"]["fmt"]
+    return symbol_content
+
+
+def send_request(page_start: int, page_size: int) -> Dict[str, Any]:
+    """
+    Shared request helper.
+    :param page_start: pagination start offset
+    :param page_size: number of items per page
+    :return: parsed JSON response
+    """
+    # print(f"[send_request] page_start:{page_start}")
+    req_url = HOST + SYMBOL_QUERY_API_URI
+    common_params, headers, common_payload_data = make_req_params_and_headers()
+    # set the pagination parameters for this request
+    common_payload_data["offset"] = page_start
+    common_payload_data["size"] = page_size
+
+    response = requests.post(url=req_url, params=common_params, json=common_payload_data, headers=headers)
+    if response.status_code != 200:
+        raise Exception(f"Request failed with status {response.status_code}: {response.text}")
+    response_dict: Dict = response.json()
+    return response_dict
+
+
+def fetch_currency_data_single(page_start: int) -> List[SymbolContent]:
+    """
+    Fetch currency data for a single page.
+    :param page_start: Page start index.
+    :return: List of SymbolContent for the page.
+    """
+    try:
+        response_dict: Dict = send_request(page_start=page_start, page_size=PAGE_SIZE)
+        return [
+            parse_symbol_content(quote) for quote in response_dict["finance"]["result"][0]["quotes"]
+        ]
+    except Exception as e:
+        print(f"Error fetching data for page_start={page_start}: {e}")
+        return []
+
+
+def fetch_currency_data_list(max_total_count: int) -> List[SymbolContent]:
+    """
+    Fetch currency data using multithreading.
+    :param max_total_count: Maximum total count of currencies.
+    :return: List of all SymbolContent.
+    """
+    # os.cpu_count() can return None, hence the fallback
+    with ThreadPoolExecutor(max_workers=(cpu_count() or 4) * 2) as executor:
+        page_starts = list(range(0, max_total_count, PAGE_SIZE))
+        print(f"Issuing {len(page_starts)} requests in total")
+
+        # executor.map distributes tasks and collects results in submission order
+        results = list(executor.map(fetch_currency_data_single, page_starts))
+
+        # flatten the list of lists
+        return [item for sublist in results for item in sublist]
+
+
+def get_max_total_count() -> int:
+    """
+    Get the total number of listed currencies.
+    :return: total count, or 0 on failure
+    """
+    print("Fetching the total number of symbols...")
+    try:
+        response_dict: Dict = send_request(page_start=0, page_size=PAGE_SIZE)
+        total_num: int = response_dict["finance"]["result"][0]["total"]
+        print(f"Found {total_num} symbols")
+        return total_num
+    except Exception as e:
+        print("Error:", e)
+        return 0
+
+
+def save_data_to_csv(save_file_name: str, currency_data_list: List[SymbolContent]) -> None:
+    """
+    Save the collected data to a CSV file.
+    :param save_file_name: target file name
+    :param currency_data_list: collected symbols
+    :return:
+    """
+    with open(save_file_name, mode='w', newline='', encoding='utf-8') as file:
+        writer = csv.writer(file)
+        # write the header row
+        writer.writerow(SymbolContent.get_fields())
+        # write one row per symbol
+        for symbol in currency_data_list:
+            writer.writerow([symbol.symbol, symbol.name, symbol.price, symbol.change_price, symbol.change_percent,
+                             symbol.market_price])
+
+
+def run_crawler_mt(save_file_name: str) -> None:
+    """
+    Crawler entry point (multithreading version).
+    :param save_file_name:
+    :return:
+    """
+    # step 1: get the total number of records
+    max_total: int = get_max_total_count()
+    # step 2: fetch and parse every page into the data container
+    data_list: List[SymbolContent] = fetch_currency_data_list(max_total)
+    # step 3: save the collected data to CSV
+    save_data_to_csv(save_file_name, data_list)
+
+
+if __name__ == '__main__':
+    start_time = time.time()
+    save_csv_file_name = f"symbol_data_{int(start_time)}.csv"
+    run_crawler_mt(save_csv_file_name)
+    end_time = time.time()
+    print(f"Multithreaded run took: {end_time - start_time} seconds")
\ No newline at end of file