左旗网站建设,ico交易网站怎么做,网站域名空间费用,网站地图做关键词排名使用 multiprocessing 模块实现多进程爬取股票网址买卖数据的基本思路是#xff1a;
定义爬虫函数#xff0c;用于从一个或多个股票网址上抓取数据。创建多个进程#xff0c;每个进程执行爬虫函数#xff0c;可能针对不同的股票或不同的网页。使用 multiprocessing.Queue …使用 multiprocessing 模块实现多进程爬取股票网址买卖数据的基本思路是
定义爬虫函数用于从一个或多个股票网址上抓取数据。创建多个进程每个进程执行爬虫函数可能针对不同的股票或不同的网页。使用 multiprocessing.Queue 或 multiprocessing.Manager() 管理共享数据结构以便进程间可以共享爬取的数据。
以下是一个简化的示例展示如何使用 multiprocessing 模块和 requests 库来实现多进程爬取股票数据 # encoding:utf-8
import sys,os,copy,time,traceback,copy
import multiprocessing
# from queue import Queue
import pandas as pd
from loguru import logger
sys.path.append(..)
from QhSetting import QHJSPATH
from QhSpiderObj import QhDFSpider
from QhCsvMode import QHDFDBJSON,QhPdCsvUnique
from QhSpiderTool import QhDbPathJieXiIsMkdir,QhDfDateSort,QhSouHuJiaoYiDate,QhNotNaNdf,\QhDfWeiYiZhi,QhGetTimes
from QhSpiderTool import QhStarEndTime
from QhInterFace import _QhDfMaiMAIDetails,_QhDBToCsvdef worker(num):print(fWorker: {num})# QhStarEndTime
def QhDfMaiMAIDetailsForM(QhCodeList,QhQueue,QhIsCsvFalse):作者阙辉功能获取每日买卖明细# QhCsvPath QHDFDBJSON[QhDfAllStock][QhCsvPath]# QhCsvPath QhDbPathJieXiIsMkdir(QhCsvPath,QHJSPATH)# QhCsvName QHDFDBJSON[QhDfAllStock][QhCsvName]# QhCsvPath {}\{}.format(QhCsvPath,QhCsvName)# QhOldCsvDf pd.read_csv(QhCsvPath,encodinggbk)# QhOldCsvDf.set_index(股票代码,dropFalse,inplaceTrue) #重置索引并保留原列 要先设置所以 否则无法使用at方法# QhOldCsvDf QhOldCsvDf[QhOldCsvDf[交易板块].isin([上证A股,深证A股,北证A股,科创板,创业板])]#[:10]QhUniqueValue QHDFDBJSON[_QhDfMaiMAIDetails][QhUniqueValue]QhJiaoYiDateD QhSouHuJiaoYiDate()[2] # 获取交易日期(YYYY,YYYY-MM,YYYY-MM-DD)QhCsvPath QHDFDBJSON[_QhDfMaiMAIDetails][QhCsvPath]QhCsvName0 QHDFDBJSON[_QhDfMaiMAIDetails][QhCsvName]QhCsvName QhCsvName0.format(QhJiaoYiDateD)QhCsvPathF0 QHDFDBJSON[_QhDfMaiMAIDetails][QhCsvPathF]QhCsvNameF0 QHDFDBJSON[_QhDfMaiMAIDetails][QhCsvNameF]QhCsvPath QhDbPathJieXiIsMkdir(QhCsvPath,QHJSPATH)QhCsvPath {}\{}.format(QhCsvPath,QhCsvName)print(QhCsvPath)QhI 0for QhRow in QhCodeList:try:QhCode01 QhRow[0]QhShiChang QhRow[1]QhCsvPathF copy.deepcopy(QhCsvPathF0)QhCsvNameF QhCsvNameF0.format(QhCode01)QhCsvPathF QhDbPathJieXiIsMkdir(QhCsvPathF,QHJSPATH)QhCsvPathF {}\{}.format(QhCsvPathF,QhCsvNameF)QhCode QhCode01.replace(Q,)QhCodes QhShiChangsecid {}.{}.format(QhCodes,QhCode)QhJieGuoRowDf _QhDfMaiMAIDetails(QhSecidsecid)QhJieGuoRowDf[交易日期01] QhJiaoYiDateDQhQueue.put(QhJieGuoRowDf)print(QhJieGuoRowDf)# 将数据添加后面if QhI 0:QhJieGuoDfNew QhJieGuoRowDf.copy(deepTrue) else:try: # 兼容旧版本处理QhJieGuoDfNew QhJieGuoDfNew._append(QhJieGuoRowDf)except:QhJieGuoDfNew QhJieGuoDfNew.append(QhJieGuoRowDf)_QhDBToCsv(QhCsvPathF,QhUniqueValue,QhJieGuoRowDf,QhDateSort,QhIsCsvTrue)QhI QhI 1except:QhErrMsg traceback.print_exc()logger.error(【买卖竞价数据】获取失败,报错消息\n{QhErrMsg}QueHui!.format(QhErrMsgQhErrMsg))_QhDBToCsv(QhCsvPath,QhUniqueValue,QhJieGuoDfNew,QhDateSort,QhIsCsvTrue)QhI QhI 1# 存储_QhDBToCsv(QhCsvPath,QhUniqueValue,QhJieGuoDfNew,QhDateSort,QhIsCsvQhIsCsv)return QhJieGuoDfNew
if __name__ __main__:# processes []# for i in range(5): # 创建5个进程# p multiprocessing.Process(targetworker, args(i,))# processes.append(p)# p.start() # 启动进程# for process in processes:# process.join() # 等待进程结束QhCsvPath QHDFDBJSON[QhDfAllStock][QhCsvPath]QhCsvPath QhDbPathJieXiIsMkdir(QhCsvPath,QHJSPATH)QhCsvName QHDFDBJSON[QhDfAllStock][QhCsvName]QhCsvPath {}\{}.format(QhCsvPath,QhCsvName)QhOldCsvDf pd.read_csv(QhCsvPath,encodinggbk)QhOldCsvDf.set_index(股票代码,dropFalse,inplaceTrue) #重置索引并保留原列 要先设置所以 否则无法使用at方法QhOldCsvDf QhOldCsvDf[QhOldCsvDf[交易板块].isin([上证A股,深证A股,北证A股,科创板,创业板])][:500]QhOldCsvList []for index, row in QhOldCsvDf.iterrows():# print(row)QhOldCsvListRow []QhCode row[股票代码]QhOldCsvListRow.append(QhCode)QhShiChang row[市场代码]QhOldCsvListRow.append(QhShiChang)QhOldCsvList.append(QhOldCsvListRow)qh_group_count 100processes []QhQueueList []QhTotalTimes QhGetTimes(len(QhOldCsvList),qh_group_count qh_group_count)QhManager multiprocessing.Manager()QhQueue QhManager.Queue() # 设置队列上限为3QhStart 0for QhRow in range(1,QhTotalTimes1):QhPa QhOldCsvList[QhStart:QhRow*qh_group_count]print(QhPa)QhStart QhRow*qh_group_count p multiprocessing.Process(targetQhDfMaiMAIDetailsForM, args(QhPa,QhQueue,False))processes.append(p)# QhQueueList.append(QhQueue)p.start() # 启动进程for process in processes:process.join() # 等待进程结束