公司做二手网站的用意,千锋教育培训收费一览表,网站优化毕业设计,网络营销做得好的公司说明 所有的基于时间处理和运行的程序将以同样的节奏同步和执行 TT(TimeTraveller)是一个新的设计#xff0c;它最初会服务与量化过程的大量任务管理#xff1a;分散开发、协同运行。但是很显然#xff0c;TT的功能将远不止于此#xff0c;它将服务大量的#xff0c;基于时…说明 所有的基于时间处理和运行的程序将以同样的节奏同步和执行 TT(TimeTraveller)是一个新的设计它最初会服务与量化过程的大量任务管理分散开发、协同运行。但是很显然TT的功能将远不止于此它将服务大量的基于时间游走特性的各种任务。
内容
1 概念
以量化场景作为假设讨论所需要的功能以及TT在其中所起的作用。
1 数据获取。程序需要在每个时隙启动从接口或者网页获取新的数据然后落库。2 数据流转。这个不在这里讨论假设数据流转的结构和渠道已经固定会有一个程序来专门确保流转。3 数据特征。程序在每个时隙启动回顾过去n个时隙(brick, block)的内容生成当前时隙新的特征。4 数据决策。程序在每个时隙启动回顾过去时隙的内容生成当前时隙的决策。5 过程跟随。程序在每个时隙检查检查采取的行动给出当前时隙的新建议。6 决策评估。在每个时隙评估情况并作出参数改变。…
以上的每一个过程都是 一个相对独立且复杂的过程如果只是case by case的开发最后会发现情况及其混乱难以维护。
所以是否可以有一种机制或工具可以基本上统一以上所有的过程
时隙 假设程序只会在每个时隙启动并处理一次正常情况下最新的决策、行动、数据、信息可能会落后最实时的时隙1~10个。这主要是由流程的深度决定的同时也考虑了几乎无限逻辑复杂度情况下的可同步单位。这个单位目前来看是1分钟。
时间特性
1 顺序性。时间会按照严格统一的顺序向前推进不可能跨时隙跳跃。这意味着要处理的数据顺序和步骤是完全确定的。同时站在某个时隙时不可能观察到未来时隙 技术上说某个程序在t时隙启动只能看到t的时隙数据。2 单向性。只可能从小到大的执行。
【Method】GoBack: 某个过程在执行过程中出错在T时刻时发现并且知道在T’时刻(T’T)是正常的那么我们回到T’状态并重新执行的过程叫GoBack。一个GoBack过程可以是幂等也可以是分叉这个由过程本身决定。当引入了生成机制那么GoBack过程不是幂等的结果可能产生变化。
【Method】Run : 执行过程每次TT实例总是从当前时隙执行到最新时隙。 【Attr】LastRunSlot : 上一次执行运行的时间。
【Attr】LastDataSlot : 上一次处理到的数据时隙(含) 【Attr】LatestDataSlot : 最新的数据时隙这是查询当前数据源得到的。
【Attr】t : 当前执行时的时隙
2 前提条件 TT执行时只考虑一个固定机制这极大的简化了大量工作的同步和协同。但这是建立在一系列机制之上的。 1 DataFlow: 这是由Flask-APScheduler-Celery MongoEngine为基础组成的异步任务调度任务系统系统主要是完成IO密集操作的并行操作通过协程可以在不额外增加CPU开销的情况下大幅提升处理能力。这部分对应着以前的sniffers逻辑上sniffer主要就是嗅探各种数据变化然后进行数据传递。 消息队列(Redis Stream、RabbitMQ)、主库(Mongo)、分析库(ClickHouse)构成了数据流的数据库核心。2 GFGoLite:这个本身是一个全局的、无状态的函数服务。与TT相关的是UCS规范相关的实现当TT需要追溯时间数据时可能需要通过UCS对象(背后是GFGoLite)操作。3 GlobalBuffer: 首先通过较为严格的tiers命名方法确保数据始终可以使用kv的方式存取。一方面是程序(对象)本身的状态信息需要暂存(以便加载时可以恢复执行),另一方面是程序依赖的时间数据需要载入(预载入)以便快速处理。
3 设计
先考虑几个常见的应用场景。
3.1 回测
对于标的A开发了策略S我们需要对S进行回测以确保其效果特别是对于OOB的表现。
回测会指定一套规则集然后执行一个时间区间从头开始执行到尾然后停止。
1 顺序加载数据(Read)。指定了开始和结尾之后通过UCS可以获取brick_list,从而精确获取需要执行的每一个brick。brick数据是结构化的存在于clickhouse中加载速度会非常快。(RedisOrClickHouse)2 执行规则。回测过程读取的全部是结构化数据不包含向量。向量是中间数据(这可能会导致一些delay所以数据处理和特征生成一般在slot的前半部分执行而决策在后半部分执行这样如果调度得当的话还是有可能只落后一个时隙的)。3 回写数据。单次执行完毕后会有对应的行动数据需要写入数据库。数据可以分为两部分一部分是明细数据需要存入数据库作为进一步分析这部分数据推入RedisStream(元数据也顺带推入作为checkpoint)。;另一部分是运行时元数据这部分只要缓存在Redis里即可。任何时刻只要从checkpoint恢复都可以退回到那个时刻重新开始。
回测时数据以拉为主数据在缓存中存在1小时或者一天然后自动删除
3.2 运行时 与回测不同运行时会存在很多空转情况甚至出现依赖错误。 运行时一般会处在等待最新数据流入然后处理的情况。有几个问题是需要考虑的
1 网络连接【偶发失败、挂起、超时】这不是可选项而是必选项。网络连接表现为偶发中断连接完全挂起超时等。2 数据源更新【推和拉】部分数据源未更新全部数据源未更新。数据更多是采取服务端主动推送到缓存中而不是程序直接去数据库取。3 处理逻辑分为导入依赖、数据预处理、数据处理和数据后处理几部分。
服务主动将数据推到缓存中确保服务中存在的缓存数据是最新的。
这里要做的完善的话应该结合类似prometheus之类的工具去做。但是这个目前我没搞所以会考虑一种折中的方式去实现这个。比较明确的是在运行上可以用状态机的方法来控制。 正常 Normal 程序启动后按照既定计划完成了数据更新逻辑计算和结果保存。 Success滞后 Lag 虽然程序完成了执行但是上一个时间点和当前运行时间点的差值大于阈值认为数据出现了滞后。缓慢 Slow : 程序出现了缓慢执行的情况 错误 Error: 程序遇到了无法执行的问题 读入错误 Read 连接错误 Connection超时 OutOfTime 处理错误 Processing 依赖 Dependency主逻辑 Logic 写入错误 Write 连接错误 Connection超时 OutOfTime
3.3 设计
1 TT的初始化分为全新初始化和断点续传(checkpoint)两种2 TT的运行分为固定周期执行(fixed-run)和嗅探执行(sniff-run)两种模式。前者一次性检查数据完整性然后执行后者使用状态机管理一般运行时状态。3 TT采用Tiers方法进行精确编号每一个TT实例将会一直使用这个编号。4 TT需要GLobalBuffer作为默认的元数据保存方法。5 TT需要QManager作为默认的数据保存方法。保存数据日志到stream然后由一个任务来解析这些日志 可以考虑存到clickhouse(之前一般觉得mongo比较合适6 TT需要UCS作为brick的推算方法。这意味着GFGoLite重启会有影响所以TT需要考虑UCS挂掉一会的情况。7 TT将会使用前端管理名称的统一命名将会在元数据表中记录。这意味着TT将使用MongoEnige和Mongo集群(mymeta)
一个简单的hypo如下
from typing import List, Optional,Dict
from pydantic import BaseModel# 测试
from Basefuncs import QManager,UCS,GlobalBuffer# Naming ... 先确保命名通过 - 通过MongoEngine进行操作正确的结果将写入GlobalBuffer# 假设每个实例是从一个名称开始的所以会先从GlobalBuffer中读取配置来初始化# 这个类的目的是构造通用的处理对象
import requests as req
class TimeTraveller(BaseModel):__version__ 1.0meta : Dict {}tiers_name: str tier1.tier2.tier3.option_tier1.option_tier2qmanager_redis_agent_host: str http://172.17.0.1:24021/qmanager_batch_size: int 1000qmanager_q_max_len: int 100000ucs_gfgo_lite_server: str http://172.17.0.1:24090/# global buffer同样需要先设置名称 | 空间名称要和这个叠加起来globalbuffer_server: str http://172.17.0.1:24088/globalbuffer_space_name: str tem_test.test# 这里可以增加校验 | 例如给到空间的一些解释 # __space_description# 执行初始化校验def __init__(self, **para):super().__init__(**para)print(当前版本, self.__version__)# 在初始化时对QManager、UCS和GlobalBuffer进行校验def _check_init_parts(self):pass# 读取数据部分: 用户自定义部分有时候可以为空def read_part(self):pass# 处理数据部分用户自定义通常不会为空def process_part(self):pass# 输出数据部分用户自定义通常不会为空def write_part(self):pass4 实现
1 在开始之前先要建立tt的专属消息管道例如stream_tt_outcome_in和 stream_tt_outcome_out。2 创建一个实例先按规则构造一个名称并随之创造对应的redis_var空间。3 假设策略就是SMA构造对应的函数4 先进行历史区间的静态回测5 再进行最新的运行时测试