当前位置: 首页 > news >正文

建设网站的意义作用是什么意思百度收录网站方法

建设网站的意义作用是什么意思,百度收录网站方法,苏中建设是哪里的,优化搜狗排名DolphinDB 作为一款高性能时序数据库#xff0c;其在实际生产环境中常有数据的清洗、装换以及加载等需求#xff0c;而对于该如何结构化管理好 ETL 作业#xff0c;Airflow 提供了一种很好的思路。本篇教程为生产环境中 ETL 实践需求提供了一个解决方案#xff0c;将 Pytho…DolphinDB 作为一款高性能时序数据库其在实际生产环境中常有数据的清洗、装换以及加载等需求而对于该如何结构化管理好 ETL 作业Airflow 提供了一种很好的思路。本篇教程为生产环境中 ETL 实践需求提供了一个解决方案将 Python Airflow 引入到 DolphinDB 的高可用集群中通过使用 Airflow 所提供的功能来实现更好管理 DolphinDB 数据 ETL 作业整体架构如下: 1. Airflow 1.1 Airflow 简介 Airflow 是一个可编程调度和监控的工作流平台基于有向无环图 (Directed acyclic graph, DAG)Airflow 可以定义一组有依赖的任务按照依赖依次执行。Airflow 提供了丰富的命令行工具用于系统管控而其 web 管理界面同样也可以方便地管控调度任务并且对任务运行状态进行实时监控方便了系统的运维和管理。 1.2 Airflow 部分核心功能 增量加载数据当表或数据集较小时可以整体加载数据。但是随着数据的增长以固定的时间间隔增量提取数据才是 ETL 的常态仅加载一个小时一天一周数据的需求非常普遍。Airflow 可以容易地以特定的时间间隔加载增量数据。处理历史数据在某些情况下您刚刚完成一个新的工作流程并且需要回溯到将新代码投入生产的日期的数据。在这种情况下您只需使用 DAG 中的 start_date 参数以指定开始日期。然后Airflow 将回填任务到开始日期。此外还可以使用 DAG 的参数来处理数周、数月或数年的数据。分区提取的数据通过对数据的目的地进行分区可以并行运行 DAG避免对提取的数据进行锁定并在读取相同数据时优化性能。不再相关的数据可以存档并从数据库中删除。强制幂等约束DAG 运行的结果应始终具有幂等特性。这意味着当您使用相同的参数多次运行某个流程时即使在不同的日期结果也将完全相同。有条件地执行Airflow 具有一些选项可根据之前的实例的成功来控制 DAG 中任务的运行方式。 1.3 DolphinDBOperator DolphinDBOperator 是 Airflow 的 operator 一种通过 DolphinDBOperator 可以在 Airflow 连接 DolphinDB 进行数据写入、查询、计算等操作。DolphinDBOperator 特有的参数有 dolphindb_conn_id: 用于指定 DolphinDB 连接可在 connection 中设置sql: 指定需要运行的 DolphinDB 脚本file_path: 可以指定 DolphinDB dos 文件运行脚本 ​ DolphinDBOperator 使用示例如下 通过 sql 参数指定任务内容运行脚本 //在 DolphinDB 中创建一个共享表 create_parameter_table DolphinDBOperator(task_idcreate_parameter_table,dolphindb_conn_iddolphindb_test,sqlundef(paramTable,SHARED)t table(1:0, paramvalue, [STRING, STRING])share t as paramTable) 通过 file_path 指定 dos 文件运行脚本//CalAlpha001.dos 为 DolphinDB 脚本case1 DolphinDBOperator(task_idcase1,dolphindb_conn_iddolphindb_test,file_pathpath /StreamCalculating/CalAlpha001.dos) 1.4 Airflow 安装部署 硬件环境 硬件名称配置信息主机名HostName外网 IPxxx.xxx.xxx.122操作系统Linux内核版本3.10以上内存64 GBCPUx86_6412核心 软件环境 软件名称版本信息DolphinDB2.00.9Airflow2.5.1python3.7及以上 注 1.本教程使用 SQLite 数据库作为后端存储如果因 SQLite 版本过低无法启动可参考设置数据库升级 SQLlite 或更改默认数据库。 2.在流程开始前建议预先构建 DolphinDB 服务。具体安装方法可以参考 DolphinDB 高可用集群部署教程。也可以参考基于 Docker-Compose 的 DolphinDB 多容器集群部署。主机环境 首先在安装 Airflow 之前要确保主机上安装了 python3 、dolphindb 、dolphindb-operator 三个依赖包。执行以下命令完成对这三个依赖包的安装。 依赖包可从附件中获取。 pip install --force-reinstall dolphindb pip install --force-reinstall dolphindbapi-1.0.0-py3-none-any.whl pip install --force-reinstall apache_Airflow_providers_dolphindb-1.0.0-py3-none-any.whl 本教程使用的 Airflow 的安装包仅提供离线版安装在线版安装会在正式发布后提供安装方式。2. 安装好 airflow.provide.dolphindb 插件后启动 Airflow 部署以及安装 Airflow 详情见官网airflow 快速入门。以下为启动 Airflow 的核心代码: #初始化数据库 airflow db init#创建用户 airflow users create --username admin --firstname Peter --lastname Parker --role Admin --email spidermansuperhero.org --password admin# 守护进程运行 webserve airflow webserver --port 8080 -D# 守护进程运行 scheduler airflow scheduler -D1#初始化数据库 2airflow db init 3 4#创建用户 5airflow users create --username admin --firstname Peter --lastname Parker --role Admin --email spidermansuperhero.org --password admin 6 7# 守护进程运行webserve 8airflow webserver --port 8080 -D 9 10# 守护进程运行scheduler 11airflow scheduler -D 3. 执行以下命令验证 Airflow 是否成功启动 ps -aux|grep airflow 预期输出如下图证明 Airflow 启动成功 4. 启动成功后浏览器中登陆 Airflow 的 web 界面 默认地址http://IP:8080默认账户初始化 db 中创建本文例子中为 admin默认密码初始化 db 中创建, 本文例子中为 admin5. 输入上述创建用户名密码即可进入 Airflow 的 UI 界面如下所示: 6. 填写 DolphinDB 连接信息后连接到 DolphinDB 数据库。 连接成功后在 DolphinDBOperator 中指定 dolphindb_conn_iddolphindb_test即可运行 DolphinDB 脚本。上述准备工作完成后下文以一个股票快照数据的 ETL 过程为例展现 Airflow 如何和 DolphinDB 交互。 2. Airflow 调度对行情数据 ETL 2.1 整体 ETL 架构图 ETL 平台功能模块代码目录结构 功能模块代码目录结构详解 add增量数据 ETL addLoadSnapshot每日新增 Snapshot 原始数据导入addProcessSnapshot增量 Snapshot 处理成 ArrayVector 以及清洗数据addFactor增加合成日 K 及一分钟 K 数据并存储addETL.py构建增量数据 DAGfull全量数据 ETL loadSnapshotSnapshot 建表与导入processSnapshotSnapshot 清洗结果建表将数据处理成 ArrayVector 以及清洗数据并存储Factor创建因子存储表将清洗后数据加工成日 K 以及一分钟 K 数据并存储fullETL.py构建全量数据 DAG外部数据源 - ODS 数据源将原始数据从外部数据源导入 DolphinDB ODS 数据源 - DWD 数据明细清洗原始数据将原始数据中的多档数据清洗成 ArrayVector 并去重 DWD 数据明细 - DWB/DWS 数据汇总: 对清洗后的快照数据进行计算加工合成 K 线数据 注 本教程使用 DolphinDB 中 module 功能以及 DolphinDB 客户端工具进行工程化管理 DolphinDB 脚本详细介绍见 DolphinDB教程: 模块 以及 DolphinDB客户端软件教程。2.2 数据介绍 本教程选取了 2020.01.04 - 2021.01.08 全市场所有股票的 5 天的 level 2 快照数据。以下是快照表在DolphinDB的结构。BidOrderQtyBidPriceBidNumOrdersBidOrdersOfferPriceOfferOrderQtyOfferNumOrders 和 OfferOrders 8个字段分别包含多档数据在 DolphinDB 中采用 ArrayVector 数据类型来保存 字段名字段含义数据类型DolphinDBSecurityID证券代码SYMBOLDateTime日期时间TIMESTAMPPreClosePx昨收价DOUBLEOpenPx开始价DOUBLEHighPx最高价DOUBLELowPx最低价DOUBLELastPx最新价DOUBLETotalVolumeTrade成交总量INTTotalValueTrade成交总金额DOUBLEInstrumentStatus交易状态SYMBOLBidPrice申买十价DOUBLE[]BidOrderQty申买十量INT[]BidNumOrders申买十实际总委托笔 数INT[]BidOrders申买一前 50 笔订单INT[]OfferPrice申卖十价DOUBLE[]OfferOrderQty申卖十量INT[]OfferNumOrders申卖十实际总委托笔 数INT[]OfferOrders申卖一前 50 笔订单INT[]……………… 2.3 DolphinDB 核心清洗脚本介绍 2.3.1 创建分布式库表 创建 snapshot 原始数据存储表 创建存储原始 snapshot 原始数据的库表核心代码如下 module loadSnapshot::createSnapshotTable//创建 snapshot 原始数据存储库表 def createSnapshot(dbName, tbName){login(admin, 123456)if(existsDatabase(dbName)){dropDatabase(dbName)}db1 database(, VALUE, 2020.01.01..2021.01.01)db2 database(, HASH, [SYMBOL, 50])//按天和股票代码组合分区db database(dbName,COMPO,[db1,db2],engineTSDB)colName [SecurityID,DateTime,PreClosePx,OpenPx,HighPx,LowPx,LastPx,TotalVolumeTrade,TotalValueTrade,InstrumentStatus,BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9,BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9,BidNumOrders0,BidNumOrders1,BidNumOrders2,BidNumOrders3,BidNumOrders4,BidNumOrders5,BidNumOrders6,BidNumOrders7,BidNumOrders8,BidNumOrders9,BidOrders0,BidOrders1,BidOrders2,BidOrders3,BidOrders4,BidOrders5,BidOrders6,BidOrders7,BidOrders8,BidOrders9,BidOrders10,BidOrders11,BidOrders12,BidOrders13,BidOrders14,BidOrders15,BidOrders16,BidOrders17,BidOrders18,BidOrders19,BidOrders20,BidOrders21,BidOrders22,BidOrders23,BidOrders24,BidOrders25,BidOrders26,BidOrders27,BidOrders28,BidOrders29,BidOrders30,BidOrders31,BidOrders32,BidOrders33,BidOrders34,BidOrders35,BidOrders36,BidOrders37,BidOrders38,BidOrders39,BidOrders40,BidOrders41,BidOrders42,BidOrders43,BidOrders44,BidOrders45,BidOrders46,BidOrders47,BidOrders48,BidOrders49,OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9,OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9,OfferNumOrders0,OfferNumOrders1,OfferNumOrders2,OfferNumOrders3,OfferNumOrders4,OfferNumOrders5,OfferNumOrders6,OfferNumOrders7,OfferNumOrders8,OfferNumOrders9,OfferOrders0,OfferOrders1,OfferOrders2,OfferOrders3,OfferOrders4,OfferOrders5,OfferOrders6,OfferOrders7,OfferOrders8,OfferOrders9,OfferOrders10,OfferOrders11,OfferOrders12,OfferOrders13,OfferOrders14,OfferOrders15,OfferOrders16,OfferOrders17,OfferOrders18,OfferOrders19,OfferOrders20,OfferOrders21,OfferOrders22,OfferOrders23,OfferOrders24,OfferOrders25,OfferOrders26,OfferOrders27,OfferOrders28,OfferOrders29,OfferOrders30,OfferOrders31,OfferOrders32,OfferOrders33,OfferOrders34,OfferOrders35,OfferOrders36,OfferOrders37,OfferOrders38,OfferOrders39,OfferOrders40,OfferOrders41,OfferOrders42,OfferOrders43,OfferOrders44,OfferOrders45,OfferOrders46,OfferOrders47,OfferOrders48,OfferOrders49,NumTrades,IOPV,TotalBidQty,TotalOfferQty,WeightedAvgBidPx,WeightedAvgOfferPx,TotalBidNumber,TotalOfferNumber,BidTradeMaxDuration,OfferTradeMaxDuration,NumBidOrders,NumOfferOrders,WithdrawBuyNumber,WithdrawBuyAmount,WithdrawBuyMoney,WithdrawSellNumber,WithdrawSellAmount,WithdrawSellMoney,ETFBuyNumber,ETFBuyAmount,ETFBuyMoney,ETFSellNumber,ETFSellAmount,ETFSellMoney]colType [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,INT,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,DOUBLE,INT,INT,DOUBLE,INT,INT,INT,INT,INT,INT]schemaTable table(1:0,colName, colType)db.createPartitionedTable(tableschemaTable, tableNametbName, partitionColumnsDateTimeSecurityID, compressMethods{DateTime:delta}, sortColumnsSecurityIDDateTime, keepDuplicatesALL) } 对于 snapshot 数据本文采用的数据库分区方案是组合分区第一层按天分区第二层对股票代码按 HASH 分50个分区。如何根据数据确定分区方案可参考 DolphinDB 分区数据库教程。 创建清洗后 snapshot 数据存储表 创建清洗后以 Array 格式存储 snapshot 数据的库表核心代码如下 module processSnapshot::createSnapshot_array//创建清洗后的 snapshot 数据存储表 def createProcessTable(dbName, tbName){if(existsDatabase(dbName)){dropDatabase(dbName)}db1 database(, VALUE, 2020.01.01..2021.01.01)db2 database(, HASH, [SYMBOL, 50])//按天和股票代码组合分区db database(dbName,COMPO,[db1,db2],engineTSDB)colName [SecurityID,DateTime,PreClosePx,OpenPx,HighPx,LowPx,LastPx,TotalVolumeTrade,TotalValueTrade,InstrumentStatus,BidPrice,BidOrderQty,BidNumOrders,BidOrders,OfferPrice,OfferOrderQty,OfferNumOrders,OfferOrders,NumTrades,IOPV,TotalBidQty,TotalOfferQty,WeightedAvgBidPx,WeightedAvgOfferPx,TotalBidNumber,TotalOfferNumber,BidTradeMaxDuration,OfferTradeMaxDuration,NumBidOrders,NumOfferOrders,WithdrawBuyNumber,WithdrawBuyAmount,WithdrawBuyMoney,WithdrawSellNumber,WithdrawSellAmount,WithdrawSellMoney,ETFBuyNumber,ETFBuyAmount,ETFBuyMoney,ETFSellNumber,ETFSellAmount,ETFSellMoney]colType [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,SYMBOL,DOUBLE[],INT[],INT[],INT[],DOUBLE[],INT[],INT[],INT[],INT,INT,INT,INT,DOUBLE,DOUBLE,INT,INT,INT,INT,INT,INT,INT,INT,DOUBLE,INT,INT,DOUBLE,INT,INT,INT,INT,INT,INT]schemaTable table(1:0, colName, colType)db.createPartitionedTable(tableschemaTable, tableNametbName, partitionColumnsDateTimeSecurityID, compressMethods{DateTime:delta}, sortColumnsSecurityIDDateTime, keepDuplicatesALL) } 创建 K 线结果存储表 创建分钟级 K 线结果存储表核心代码如下 module Factor::createFactorOneMinute//创建分钟 k 线因子储存表 def createFactorOneMinute(dbName, tbName){if(existsDatabase(dbName)){dropDatabase(dbName)}//按天分区db database(dbName, VALUE, 2021.01.01..2021.01.03,engine TSDB)colName TradeDateTradeTimeSecurityIDOpenHighLowCloseVolumeAmountVwapcolType [DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]tbSchema table(1:0, colName, colType)db.createPartitionedTable(tabletbSchema,tableNametbName,partitionColumnsTradeDate,sortColumnsSecurityIDTradeTime,keepDuplicatesALL) } 创建日级 K 线结果存储表核心代码如下 module Factor::createFactorDaily//创建日 K 线储存表 def createFactorDaily(dbName, tbName){if(existsDatabase(dbName)){dropDatabase(dbName)}//按年分区db database(dbName, RANGE, datetimeAdd(2000.01M,(0..50)*12, M),engine TSDB)colName TradeDateSecurityIDOpenHighLowCloseVolumeAmountVwapcolType [DATE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]tbSchema table(1:0, colName, colType)db.createPartitionedTable(tabletbSchema,tableNametbName,partitionColumnsTradeDate,sortColumnsSecurityIDTradeDate,keepDuplicatesALL) } 2.3.2 数据清洗 本文将 snapshot 数据的清洗主要分为两个部分 将多档价格、委托量以 Array 形式储存数据去重 具体的处理流程及核心代码如下: module processSnapshot::processSnapshotData//将数据组合为 array、去重并统计重复数据量 def mapProcess(mutable t, dbName, tbName){n1 t.size()t select SecurityID, DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, InstrumentStatus, fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice, fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty, fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3, BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders, fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3, BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9, BidOrders10, BidOrders11, BidOrders12, BidOrders13, BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19, BidOrders20, BidOrders21, BidOrders22, BidOrders23, BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29, BidOrders30, BidOrders31, BidOrders32, BidOrders33, BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39, BidOrders40, BidOrders41, BidOrders42, BidOrders43, BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders, fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice, fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferQty, fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3, OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders, fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3, OfferOrders4, OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9, OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13, OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19, OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23, OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29, OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33, OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39, OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43, OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders, NumTrades, IOPV, TotalBidQty, TotalOfferQty, WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber, BidTradeMaxDuration, OfferTradeMaxDuration, NumBidOrders, NumOfferOrders, WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney, WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney, ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney from t where isDuplicated([SecurityID, DateTime], FIRST) falsen2 t.size()loadTable(dbName, tbName).append!(t)return n1,n2 }def process(processDate, dbName_orig, tbName_orig, dbName_process, tbName_process){dataString temporalFormat(processDate, yyyyMMdd)//查询处理日期的数据在数据库中是否存在todayCount exec count(*) from loadTable(dbName_process, tbName_process) where date(DateTime)processDate//如果库里面已经存在当天要处理的数据删除库里面已有数据if(todayCount ! 0){writeLog(Start to delete the process snapshot data, the delete date is: dataString)dropPartition(database(dbName_process), processDate, tbName_process)writeLog(Successfully deleted the process snapshot data, the delete date is: dataString)}//开始处理数据writeLog(Start process Snapshot Data, the datetime is dataString)ds sqlDS(sql(selectsqlCol(*), fromloadTable(dbName_orig,tbName_orig),wheredate(DateTime)processDate))n1,n2mr(ds, mapProcess{, dbName_process, tbName_process}, , , false)if(n1 ! n2){writeLog(ERROR: Duplicated datas exists in dataString , Successfully drop string(n1-n2) pieces of data )}writeLog(Successfully process the snapshot data, the processDate is: dataString) } 2.3.3 清洗行情数据合成 K 线 分钟级 K 线合成并入库, 核心代码如下 module Factor::calFactorOneMinute//合成分钟 K 线并入库 def calFactorOneMinute(dbName, tbName, mutable factorTable){pt loadTable(dbName, tbName)//将数据分为10天一组计算dayList schema(pt).partitionSchema[0]if(dayList.size()10) dayList dayList.cut(10)for(days in dayList){//计算分钟 K 线res select first(LastPX) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*totalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate,minute(DateTime) as TradeTime, SecurityIDwriteLog(Start to append minute factor result , the days is: [ concat(days, ,)])//分钟 K 线入库factorTable.append!(res)writeLog(Successfully append the minute factor result to databse, the days is: [ concat(days, ,)])} } 日级 K 线合成并入库, 核心代码如下 module Factor::calFactorDaily1//合成日 K 线并入库 def calFactorDaily(dbName, tbName, mutable factorTable){pt loadTable(dbName, tbName)//将数据分为10天一组计算dayList schema(pt).partitionSchema[0]if(dayList.size()10) dayList dayList.cut(10)for(days in dayList){//计算日 K 线res select first(LastPX) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*totalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, SecurityID writeLog(Start to append daily factor result , the days is: [ concat(days, ,)])//日 K 线入库factorTable.append!(res)writeLog(Successfully append the daily factor result to databse, the days is: [ concat(days, ,)])} } 2.4 增量数据清洗 增量数据的导入和全量数据的导入整体逻辑相同主要区别如下 全量数据批量导入增量数据每天定时执行全量数据导入通过 DolphinDB submitjob 异步提交任务增量数据导入通过调用 append! 接口同步导入数据加工 K 线全量数据批量加工增量数据只加工当天数据 具体代码差别见附件。 2.5 Airflow 生成 DAG 执行任务 2.5.1 生成一个 DAG 实例 生成全量 DAG 实例的示例如下 with DAG(dag_idETLTest, start_datedatetime(2023, 3, 10), schedule_intervalNone) as dag: dag_id 指定了 DAG 名称需要具有唯一性start_date 设定任务开始日期schedule_interval 指定两次任务的间隔None 表示该任务不自动执行需手动触发。 增量 DAG 示例如下 args {owner : Airflow,start_date : days_ago(1),catchup : False,retries : 0 } with DAG(dag_idaddETLTest, default_args args, schedule_interval0 12 * * *) as dag: 增量 DAG 中 catchup 表示是否进行回填任务操作retries 表示任务失败重试次数schedule_interval “0 12 * * * ” 表示在每天12点 (UTC) 运行任务。schedule 设置详细可参考Scheduling Triggers 2.5.2 获取 Airflow 中的变量 Airflow 中设定的变量值无法直接在 DolphinDB 脚本中获取为了在后续的任务中使用本文通过将 Airflow 中变量写入共享表的方式来实现后续在 DolphinDB 任务读取变量具体代码示例如下 //获取变量值 variable [ETL_dbName_origin, ETL_tbName_origin, ETL_dbName_process,ETL_tbName_process, ETL_dbName_factor,ETL_tbName_factor,ETL_dbName_factor_daily,ETL_tbName_factor_daily,ETL_filedir, ETL_start_date,ETL_end_date] value [] for var in variable:value.append(str(Variable.get(var))) variables ,.join(variable) values ,.join(value)//通过DolphinDBOperator创建共享表并将变量值写入共享表中create_parameter_table DolphinDBOperator(task_idcreate_parameter_table,dolphindb_conn_iddolphindb_test,sqlundef(paramTable,SHARED)t table(1:0, paramvalue, [STRING, STRING])share t as paramTable)given_param DolphinDBOperator(task_idgiven_param,dolphindb_conn_iddolphindb_test,sqlparams split( variables ,,); \values split( values ,,); \for(i in 0..(params.size()-1)){ \insert into paramTable values(params[i], values[i]);}) 运行该任务后可在 DolphinDB GUI 共享变量栏中看到参数表及其值如下图所示 每个 DAG 变量的值可以通过 AirFlow 进行修改点击下图所示位置 DAG 生成后在如下 Web 页面显示 DAG 使用的变量可以动态修改如下所示 下表为本项目中涉及的变量名称及其含义 变量名变量描述变量示例ETL_dbName_factor存储合成的分钟 k 线的库名dfs://oneMinuteFactorETL_tbName_factor存储合成的分钟 k 线的表名oneMinuteFactorETL_dbName_factor_daily存储合成的分钟 k 线的库名dfs://dailyFactorETL_tbName_factor_daily存储合成的日 k 线的表名dailyFactorETL_dbName_origin存储未处理的原始 snapshot 数据的库名dfs://TSDB_snapshot_origETL_tbName_origin存储未处理的原始 snapshot 数据的表名snapshot_origETL_dbName_process存储清洗后 snapshot 数据的库名dfs://TSDB_snapshot_processETL_tbName_process存储清洗后 snapshot 数据的表名snapshot_processETL_filedir原始 snapshot 数据存储路径/home/appadmin/根据实际存放路径修改ETL_start_date全量 ETL 任务中需要处理的原始数据的开始日期2021.01.04ETL_end_date全量 ETL 任务中需要处理的原始数据的结束日期2021.01.04 2.5.3 DolphinDBOperator 执行任务 DolphinDBOperator 全量处理数据 通过 DolphinDBOperator 将上述的数据入库、清洗、计算等设置为 DAG 中的任务 全量处理核心代码如下 loadSnapshot DolphinDBOperator(task_idloadSnapshot,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用 module加载已封装好的建表及入库函数use loadSnapshot::createSnapshotTableuse loadSnapshot::loadSnapshotData//通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName params[ETL_dbName_origin]tbName params[ETL_tbName_origin]startDate date(params[ETL_start_date])endDate date(params[ETL_end_date])fileDir params[ETL_filedir]//结果库表不存在则创建if(not existsDatabase(dbName)){loadSnapshot::createSnapshotTable::createSnapshot(dbName, tbName)}//调用清洗函数后台多进程写入提高写入效率start now()for (loadDate in startDate..endDate){submitJob(loadSnapshotyear(loadDate)monthOfYear(loadDate)dayOfMonth(loadDate), loadSnapshot, loadSnapshot::loadSnapshotData::loadSnapshot{, dbName, tbName, fileDir}, loadDate)}//查看写入任务是否完成以保证后续处理部分数据源完整do{cnt exec count(*) from getRecentJobs() where jobDescloadSnapshot and endTime is null}while(cnt ! 0)//查看导入过程中是否有异常有异常则抛出异常cnt exec count(*) from pnodeRun(getRecentJobs) where jobDescloadSnapshot and errorMsg is not null and receivedTime startif (cnt ! 0){error exec errorMsg from pnodeRun(getRecentJobs) where jobDescloadSnapshot and errorMsg is not null and receivedTime startthrow error[0]})processSnapshot DolphinDBOperator(task_idprocessSnapshot,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用 module加载已封装好的建表及入库函数use processSnapshot::createSnapshot_arrayuse processSnapshot::processSnapshotData//通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName_orig params[ETL_dbName_origin]tbName_orig params[ETL_tbName_origin]dbName_process params[ETL_dbName_process]tbName_process params[ETL_tbName_process]startDate date(params[ETL_start_date])endDate date(params[ETL_end_date])//结果库表不存在则创建if(not existsDatabase(dbName_process)){processSnapshot::createSnapshot_array::createProcessTable(dbName_process, tbName_process)}//后台多进程处理提高处理效率start now()for (processDate in startDate..endDate){submitJob(processSnapshotyear(processDate)monthOfYear(processDate)dayOfMonth(processDate), processSnapshot, processSnapshot::processSnapshotData::process{, dbName_orig, tbName_orig, dbName_process, tbName_process}, processDate)}//查看清洗任务是否完成以保证后续处理部分数据源完整do{cnt exec count(*) from getRecentJobs() where jobDescprocessSnapshot and endTime is null}while(cnt ! 0)//查看清洗过程中是否有异常有异常则抛出异常cnt exec count(*) from pnodeRun(getRecentJobs) where jobDescprocessSnapshot and errorMsg is not null and receivedTime startif (cnt ! 0){error exec errorMsg from pnodeRun(getRecentJobs) where jobDescprocessSnapshot and errorMsg is not null and receivedTime startthrow error[0]})calMinuteFactor DolphinDBOperator(task_idcalMinuteFactor,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用 module加载已封装好的建表及入库函数use Factor::createFactorOneMinuteuse Factor::calFactorOneMinute//通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName params[ETL_dbName_process]tbName params[ETL_tbName_process] dbName_factor params[ETL_dbName_factor]tbName_factor params[ETL_tbName_factor]//结果库表不存在则创建if(not existsDatabase(dbName_factor)){createFactorOneMinute(dbName_factor, tbName_factor)}factorTable loadTable(dbName_factor, tbName_factor)//调用计算函数calFactorOneMinute(dbName, tbName,factorTable))calDailyFactor DolphinDBOperator(task_idcalDailyFactor,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用 module加载已封装好的建表及入库函数use Factor::createFactorDailyuse Factor::calFactorDaily1 //通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName params[ETL_dbName_process]tbName params[ETL_tbName_process] dbName_factor params[ETL_dbName_factor_daily]tbName_factor params[ETL_tbName_factor_daily]//结果库表不存在则创建if(not existsDatabase(dbName_factor)){createFactorDaily(dbName_factor, tbName_factor)}//调用计算函数factorTable loadTable(dbName_factor, tbName_factor)Factor::calFactorDaily1::calFactorDaily(dbName, tbName,factorTable)) 根据任务间的依赖关系构建 DAG示例如下 start_task create_parameter_table given_param loadSnapshot processSnapshot calMinuteFactor calDailyFactor DolphinDBOperator 增量数据入库 增量数据任务构建代码如下 addLoadSnapshot DolphinDBOperator(task_idaddLoadSnapshot,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用module加载已封装好的入库函数use addLoadSnapshot::loadSnapshotData//通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName params[ETL_dbName_origin]tbName params[ETL_tbName_origin]fileDir params[ETL_filedir]//获取交易日历MarketDays getMarketCalendar(CFFEX)//是交易日则进行数据入库if(today() in MarketDays ){fileDir params[ETL_filedir]addLoadSnapshot::loadSnapshotData::loadSnapshot(today(), dbName, tbName, fileDir)})addProcessSnapshot DolphinDBOperator(task_idaddProcessSnapshot,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用module加载已封装好的清洗函数use addProcessSnapshot::processSnapshotData//通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName_orig params[ETL_dbName_origin]tbName_orig params[ETL_tbName_origin]dbName_process params[ETL_dbName_process]tbName_process params[ETL_tbName_process]//获取交易日历MarketDays getMarketCalendar(CFFEX)//是交易日则进行数据处理if(today() in MarketDays ){addProcessSnapshot::processSnapshotData::process(today(), dbName_orig, tbName_orig, dbName_process, tbName_process)})addCalMinuteFactor DolphinDBOperator(task_idaddCalMinuteFactor,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用module加载已封装好的计算函数use addFactor::calFactorOneMinute//通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName params[ETL_dbName_process]tbName params[ETL_tbName_process] dbName_factor params[ETL_dbName_factor]tbName_factor params[ETL_tbName_factor]factorTable loadTable(dbName_factor, tbName_factor)//获取交易日历MarketDays getMarketCalendar(CFFEX)//是交易日则调用计算函数合成分钟K线if(today() in MarketDays ){addFactor::calFactorOneMinute::calFactorOneMinute(dbName, tbName,today(), factorTable)})addCalDailyFactor DolphinDBOperator(task_idaddCalDailyFactor,dolphindb_conn_iddolphindb_test,sqlpnodeRun(clearAllCache)undef(all)go;//使用module加载已封装好的计算函数use addFactor::calFactorDaily1 //通过参数共享表获取参数params dict(paramTable[param], paramTable[value])dbName params[ETL_dbName_process]tbName params[ETL_tbName_process] dbName_factor params[ETL_dbName_factor_daily]tbName_factor params[ETL_tbName_factor_daily]factorTable loadTable(dbName_factor, tbName_factor)//获取交易日历MarketDays getMarketCalendar(CFFEX)//是交易日则调用计算函数合成日K线if(today() in MarketDays ){addFactor::calFactorDaily1::calFactorDaily(dbName, tbName,today(), factorTable)}) 根据任务间的依赖关系构建 DAG示例如下 start_task create_parameter_table given_param addLoadSnapshot addProcessSnapshot addCalMinuteFactor addCalDailyFactor 2.5.4 生成 DAG 根据如下步骤部署项目 第一步 DolphinDB 项目部署 将 DolphinDB 项目中的 addETL 和 fullETL 项目分别导入 DolphinDB GUI (DolphinDB 客户端工具)中 将 addETL 及 fullETL 项目中的 module 模块上传至 Airflow 中已建立连接的 DolphinDB server 中 第二步 python 项目部署 将 python 项目中的 python 脚本放置到 Airflow_install_Dir/airflow/dags 目录下。注意新建的 DAG 任务并不会马上出现在界面上默认需要等待5分钟后刷新也可修改 airflow.cfg 文件中的 dag_dir_list_interval 调整刷新间隔。 第三步 Airflow 变量导入 在 Airflow 网页中进入 Admin--Variables将 Variables.json 文件上传将变量导入 Airflow 中并根据实际情况调整变量值。 第四步 上传原始数据文件 将数据文件上传至服务器并根据数据文件的实际存放路径在 Airflow 中修改 ETL_filedir 变量。如运行增量 ETL 任务需要将数据文件名中的日期改为当前日期如20230330snapshot.csv以避免无数据导致任务失败。 最终实现 DAG 如下所示 全量数据入库 增量数据入库 运行任务后任务实例为绿色代表任务运行成功红色表示任务运行失败橙色则表示该任务所依赖的上游任务运行失败任务未启动。 3. 常见问题解答(FAQ) 3.1 如何捕获 DolphinDB 脚本中的 print 函数打印的信息 DolphinDB 脚本的 print 的信息为标准输出可以在 airflow-scheduler.out 中找到如下图所示3.2 DolphinDB 脚本中的异步作业 submitjob 如何检测其完成状态 通过 DolphinDB 的函数 getRecntJobs 获取后台作业信息, 之后在 DolphinDB DAG 中添加判断逻辑, 代码示例如下 DolphinDBOperator(task_idprocessSnapshot,dolphindb_conn_iddolphindb_test,sql//检查所有任务是否全部完成do{cnt exec count(*) from getRecentJobs() where jobDescprocessSnapshot and endTime is null}while(cnt ! 0)//检查后台任务是否成功失败则抛出异常cnt exec count(*) from pnodeRun(getRecentJobs) where jobDescprocessSnapshot and errorMsg is not null and receivedTime startif (cnt ! 0){error exec errorMsg from pnodeRun(getRecentJobs) where jobDescprocessSnapshot and errorMsg is not null and receivedTime startthrow error[0]}) 3.3 执行 Airflow 中经常遇到连接超时断开该如何处理 当遇到如上问题可能是网络延时导致的可以在建立连接时设置参数如上图在 DolphinDB 连接中设置 KeepAliveTime 及 reconnect 参数即可。 3.4 将 start_date 日期设为当前日期每天运行一次为什么当天不会运行 在 Airflow 中一个定时调度任务的最早开始时间为 start_date scheduler_interval例如start_date 2023.03.16每天调用一次则最早一次任务调度为 2023.03.17所以当天的任务无法执行。 3.5 DolphinDBOperator 任务运行失败如何定位失败原因 任务失败后DolphinDBOperator 会将具体的错误信息打印在日志中可通过查看日志信息定位异常代码并进行修改。查看日志信息步骤如下4. 总结 ​ 本教程从一个常用行情数据 ETL 案例出发着重阐述了如何将 Airflow 调度框架与 DolphinDB 数据库结合起来进行结构化 ETL 作业管理, 以此来节省时间提高效率降低运维成本。同时由于篇幅有限涉及到DolphinDB 和 Airflow 框架的一些其它操作未能更进一步展示用户在使用过程中需要按照实际情况进行调整。也欢迎大家对本教程中可能存在的纰漏和缺陷批评指正。 附件 依赖包pydolphindb-1.0.0-py3-none-any.whlapache_airflow_providers_dolphindb-1.0.0-py3-none-any.whlDolphinDB 工程项目addETLfullETLPython 项目addETL.pyfullETL.py数据文件20210104snapshot.csvAirflow 变量Variables.json
http://www.dnsts.com.cn/news/41104.html

相关文章:

  • 吉林品牌网站建设商家广告宣传册制作公司
  • 公司网站服务器选择西安有哪些家做网站的公司
  • 绍兴网站设计合肥企业网站制作
  • 订阅号可以做网站吗汕头老城区是什么区
  • 检测 网站优化网站的优点和缺点
  • 网站建设课程心得体会汉化wordpress主题
  • 哪个网站可以做前端项目网页设计在线培训网站有哪些
  • 上海网站推广优化购物网站建设技术难点
  • 商城网站前端更新商品天天做吗wordpress做的好的网站
  • 南昌市会做网站有哪几家尚海整装电话号码
  • 济南市建设工程招标投标协会网站wordpress 评论500
  • 软件开发工程师面试问题网站有源代码如何做seo
  • 网站建设见站分析和准备论文wordpress 内容置顶
  • 建设国际互联网网站惠州做棋牌网站建设有哪些公司
  • 有关网站空间正确的说法是市场营销女生好就业吗?
  • 美乐乐 网站建设seo教学网站
  • 网站建设要备案吗外包加工网手工活
  • 淘宝美工做倒计时图片网站山西做网站推广
  • 莱芜摩托车网站低代码无代码开发平台
  • 建网站素材wordpress上传svg
  • 个人网站备案备注哈尔滨关键词搜索排名
  • 网站建设阿胶膏的作用网站品牌建设流程
  • 天河做网站孝感建设银行网站
  • 广州建设专业网站做网站如何适应分辨率
  • 百度做网站的公司怎样做国际网站平台
  • 网站建设与管理是什么意思北京广告公司名录
  • 银川哪里做网站网站开发培训机构需要多少钱
  • phpcms 专题网站模板seo北京
  • 专业网站建设人工智能研发青岛栈桥门票多少钱
  • nginx wordpress多个站点在人才网站做业务