平面设计免费网站,做网站多少钱一张页面,wordpress 打卡插件,石家庄市高新区建设局网站1. Flink的资源和代码优化
1.1 slot资源配置
Flink中具体跑任务的进程叫TaskManager#xff0c;TM进程又会根据配置划分出诺干个TaskSlot#xff0c;它是具体运行SubTask的地方。slot是Flink用来隔离各个subtask的资源集合#xff0c;这里的资源一把指内存#xff0c;TCP…1. Flink的资源和代码优化
1.1 slot资源配置
Flink中具体跑任务的进程叫TaskManagerTM进程又会根据配置划分出诺干个TaskSlot它是具体运行SubTask的地方。slot是Flink用来隔离各个subtask的资源集合这里的资源一把指内存TCP连接和CPU是共享的 关于Slot在TM中的个数设置可以参考如下
如果是Standalone模式建议Slot的数量和TM的Cpu Core一致如果是Yarn模式会根据配置动态申请TM此时TM就是Yarn的Container所有对应Slot的设置要根据yarn中Container最大可申请的Core一致默认是4个
1.2 并行度设置
通常说的并行度是指一个job中并行度最大的那个算子的并行度设置合适的并行度最关键的还是靠多次的调试但可以从以下几个方面考虑设置初始并行度
数据源的并行度对于source/sink端的并行度一般设置成跟source本身的并行度一直即可在消费速度跟不上时出现背压可以减少source的并行度减缓背压。但是不建议source端的并行度大于source本身的并行度这样就会出现并行度空闲的情况浪费资源算子逻辑的复杂度这个比较抽象一般如果处理逻辑较多时或是需要连接外部资源的时候都会提高算子的复杂度此时可适当提高并行度这一块也是需要多次调试的地方系统资源可用性在考虑并行度的时候可以考虑集群的可用资源多少
1.3 SlotSharingGroup
SlotSharingGroup是实现taskslot共享的一种机制在程序中会有一个默认的“default” 共享组不同的subtask可以组成一个完整的job管道放在同一个TaskSlot中运行他们共享这个taskslot中的所有资源这些subtask的数据传递就变的更加的简单提高了运行的效率。不同并行度的相同subtask会被分配到不同的taskslot中这样会分散各个taskslot的运行压力合理利用集群资源 如果一个job中有多个SSG这个job所需要的taskslot就不是等于它最大并行度而是等于各个SSG中各自最大并行度的和 1.4 细粒度资源管理
slot是Flink运行时资源调度和分配的基本单元Flink1.14之后出现细粒度资源管理概念之前的版本中对于TM的资源会平均分配给自己拥有的所有slot这种分配方式会导致一些计算密集性和内存需求大的subtask“挤在”同一个slot中从而影响作业的运行效率而且有些subtask可能需要类似GPU这样的贵重资源之前的粗粒度资源管理方式是不能解决的。细粒度的资源管理是指可以动态的剪切一个slot专门给到对应的subtask运行如下图所示 具体是实现案例如下
//创建SSG共享组对象指定资源配置
SlotSharingGroup ssgB SlotSharingGroup.newBuilder(b).setCpuCores(0.5).setTaskHeapMemoryMB(10).build();
//直接指定SSG共享组对象名称来使用SSG共享组资源
DataStream... ds1 someStream.filter(...).slotSharingGroup(ssgB)1.5 Flink的异步IO
Flink中默认是同步IO向外部组件发布一条查询命令必须要等到第一条回复了才可以继续发送这种情况要是在有大批量数据的时候就会出现阻塞的情况通过异步的IO就可以一次将多个查询发送到外部系统统一得到回复用这个方法来提高效率 需要用异步IO需要外部系统支持异步请求的客户端如Java的Vertx如果不支持可以用线程池模拟异步客户端
1.6 设置barrier对齐和非对齐
checkpoint机制中用barrier来衡量此时是否可以做算子本地的快照存储在多并行度的情况下barrier在上下游传递时会涉及到barrier广播和barrier对齐机制。当上游数据向下游多个并行度发送barrier时需要对barrier进行广播保证下游各个并行度barrier一致当上游多个并行度向下游少量并行度传递barrier时需要对barrier进行对齐因为每个并行度在上游算子处理完对应数据的时间时不一样的所有往下游发barrier的时间也不一样。 如上图所示barrier对齐机制中快的并行度会等待慢的并行度到达之后才能开始做本地的状态快照在等待的过程中来的数据就会被缓存起来如果并行度之间相差很大就会占用很大的内存资源可能会导致内存溢出同时它的等待也会迟缓checkpoint的整体时间还有当数据流被阻塞住处理来不及时会触发反压机制进而更加限制数据流的流动导致barrier在的流速更慢进一步导致checkpoint时间变长进入恶性循环 针对上面的问题Flink中提出barrier的不对齐机制解决这个问题它的运行过程中并不会等待那些barrier流速慢的并行度最快的那个并行度到达之后对应的算子就会做本地的状态快照并把barrier继续往下游发并移除慢的并行度的barrier从细节上来说不对齐机制会有额外的input buffer和output buffer用来缓存流中未处理的数据当barrier达到下游算子的input buffer后Flink会将该barrier插入到该下游算子的output buffer的最前面并把该barrier发送给后面的算子同时当前算子做快照其中包括当前算子的状态已经input/output buffer 已经流式慢的barrier之前的数据全部都会保存在状态后端。 两种机制的优缺点对比
barrier机制 优点状态后端保存的数据少缺点缓存的数据多对Flink的内存压力大checkpoint时间长易触发反压机制 barrier不对其机制 优点大大加快了checkpoint的进程不容易出现反压问题缺点状态后端保存的数据多状态恢复时间长 建议简单的数据处理作业使用轻量级的barrier对齐机制但对于计算复杂容易出现反压checkpoint经常会超时的作业用barrier不对其机制
2. Flink的内存优化
Flink的内存分为Flink的总内存和JVM自身需要的内存Flink总内存又分为对堆内存和堆外内存首先堆堆和堆外都有Framework和Task内存其中堆外还包含托管内存和Network内存这就是Flink内存的模型。其中堆内存是平时跑task的时候用的堆外内存总的来说是为了job的优化提升运行效率而存在的其中托管内存是用于状态后端数据存储等功能使用Network用于各个TM之间网络数据的交互使用。 主要优化方式如下 一般需要注意的是TaskManager的内存因为job的运行主要是在TM上。通常是在程序的配置文件中配置taskmanager.memory.process.size2G,jobmanager.memory.process.size1G,这样的默认配置即可然后就会到预发布环境去测试运行的效率主要的标准有这么几个一个是测试程序在历史峰值的时候会不会出现反压另一个是再提升到历史峰值的20%的数据量如果这两个都不会出现反压就可以。如果出现反压的情况但是不是很严重的情况我们一般会通过调整一些参数再观察结果这个就需要根据自己的程序具体分析比如程序中开启了checkpoint而且状态数据也比较大的程序可以试着调大taskmanager.memory.managed.fraction它是控制托管内存占Flink总内存的比例的增大用于状态后端的内存。如果程序中需要大量数据在各个TM之间传递的话就试着调大taskmanager.memory.network.fraction它是控制network内存在flink内存的占比。到此如果还是被压严重就会试着加大TM的整体内存大小 3. Checkpoint和大状态优化
3.1 Checkpoint的优化
3.1.1 在webui中查看checkpoint的情况 点击running的job再点击checkpoint标签主要是看end to end Duration checkpointd data size full checkpoint data sizeprocessed(persisted) in-flight data这几个重要指标他们分别表示此次CK花的时间产生多少数据缓存了多少数据0表示没有缓存数据
3.1.2 优化
设置CK的存储位置 推荐是写入外部的文件系统中默认是JM的堆内存中CK的模式 主要看任务的需求堆数据的一致性要求高可以使用默认的exactly-once如果堆时延和吞吐要求较高可以使用at-least-once指定CK之间的最小间隔时间 通过观察webui上CK的启动时间如果经常超过CK的设置时间说明CK很繁忙同时也意味这过多的资源被用于CK的过程为了防止这种情况可以试着调整两个CK之间的等待时间让更多的资源用于算子计算来缓解CK的超时问题设置CK并行度 这种方案需要保证Flink集群资源充足才可以不然只会使情况更加恶化设置多并行度的CK意味着CK需要占用更多的资源设置增量的CK 这点对于大状态的作业效果比较明显Flink提供了两类状态后端其中HashMapStateBackend是基于JM的内存存储的全量状态存储还有就是基于EmbeddedRockDBStateBackend的增量状态后端它的数据是存储在TM的本地数据目录开启不对齐CK 当Flink处于严重背压的情况下同时它会缓存大量的数据导致CK的周期加长这种情况下可以试着开启非对齐的CK它允许第一个barrier到达之后就开始做本地快照不等慢的并行度同时放弃落后的并行度往下游传barrier直接把它拉齐这样很大程度上加快了checkpoint的速度开启Changelog Changlog是为了优化增量快照的一种机制在增量快照中RocketDB会定期的对本地全量快照做压缩压缩出来的新数据肯定会大于增量的数据而且上传也是随着checkpoint机制一起执行的如果对于大状态且task比较多的job来说这样压缩后的大状态上传就会变的频繁这就会导致checkpoint整体的效率变低。 Changlog的解决方案 它主要思路是通过增加Changlog记录实时的状态变更日志来辅助增量状态的更新压缩后的大状态它会独立于checkpoint机制单独周期性的上传。具体的实现是在task做本地快照的时候它会往Changlog里写同时也往state tableRocketDB中写然后state table中的数据会周期性的独立上传changlog是实时的上传等state table把对应的状态上传之后就会截断changlog中的数据保证checkpoint Storage只存储一份完整的状态数据Changelog的缺点 由它的机制可以看出会在状态存储系统创建更多的文件同时changlog会给TM带来额外的内存开销在状态恢复的过程中需要额外的重放changlog
3.2 RocksDB优化
在对RocketDB进行优化时我一般会分如下几个层级
调整Flink的manage memory占比 Flink中的manage memory主要是给状态后端使用所以可以调大它在Flink总内存中的占比taskmanage.memory.managed.fraction0.4这是默认值并观察调优效果调整RocketDB读写的性能 经过调整托管内存的效果不明显可以手动控制RocketDB的参数state.backend.rocksdb.memory.managedtrue默认修改成false可以先优化它的读写性能上入手RocketDB的架构跟Hbase的类似一共有memtableblockbuffersstfile三层架构组成数据会先写入memtable中也就是内存中等内存的数据达到一定量时会刷写到磁盘中。在读数据的时候优先会到memtable中读取如果没有会到blockbuffer中读取它时通过布隆过滤器的形式得到的一个缓冲数据如果还是没有找到会到sstfile也就是磁盘上查找。通过以上读写的原理了解可以增加memtable的内存来优化写的性能state.backend.rocksdb.memory.write-buffer-ratio0.5可以通过增大索引和布隆过滤器的内存来优化读的性能stae.backend.rocksdb.memory.high-prio-ratio0.1默认。调整RocketDB更底层的参数 如果上面的两个方式效果都不明显那只能对RocketDB更底层的参数进行调整一般不太建议要进行它的调整建议先开启rocketDB的监控通过监控更好的观察到问题再去调整开启RocketDB监控需要损耗job的整体性能具体的可参照官网慢慢调整
3.3 Task本地恢复
Task本地恢复功能默认是禁止的需要配置“state.backend.local-recoverytrue”来开启Flink默认情况下task是通过到远程持久化存储中拿到对应task的状态用于恢复这样做的优势是状态具备天然的容错性和各个节点都可以访问获取状态信息但是缺点也存在那就是从远程读取效率比较低下这会导致大状态的task恢复时间长。而且很多时候task失败重试时Flink会把task分配到原先节点上继续运行这就为task本地恢复提供了可行性。task本地恢复的原理是对于每个checkpoint每个task不仅将状态写入分布式存储中同时还在task本地存储一份相同的备份对于那些重启的task并且还在被分配在之前的taskmanager节点上运行的task来说不需要从远程读task状态直接可以在本地拿到状态数据用于恢复。 需注意以下几点 1. 如果对应的taskmanager丢失那么task的本地状态也会丢失 2. task本地恢复仅涵盖keyed state不支持算子和定时器状态 3. unaligned checkpoint目前不支持task本地恢复 4. 网络内存优化
4.1 Flink网络传输的过程
Flink网络传输是通过类似生产者-消费者的模式实现如图上游的TM1生产好数据之后会写入ResultPartition中然后会通知Jobmanager上游数据准备好了JM会通知到下游的TM2并找到对应接收此RP缓冲区中的InputChannel然后InputChannel会返回去通知RP可以启动网络传输之后RP会把缓冲区数据交给TM1的网络堆栈通过Netty进行传输到此就完成了上下游数据的传输。 这里需要注意TM1和TM2之间的Netty网络连接是共享的而且是长期存在的 4.2 数据反压机制
Flink的反压机制有两种实现方式一种是老版本的基于TCP的反压机制一种是基于Credit的反压机制。下面介绍下这两种 基于TCP的反压机制 下游的消费者处理数据的速度慢下来之后刚开始的时候上游感知不是很强还是依旧源源不断的产生新的buffer数据然后通过netty往下游传递下游的接收模块InputGate会一直接收直到把InputGate模块下的所有缓冲区占满此时缓冲区无法接收数据发过来的数据就会堵塞在netty底层socket缓冲区中由于产生的数据无法发送就又会把生产端的缓冲区占满到此上游就会感知到来之下游的反压。 需要注意的TCP的反压机制有这么几个缺点 1.上游要感知到下游的反压需要把上下游的数据缓冲区socket缓冲区都占满之后才能知道导致反压链路过长 2.由于socket网络连接是TM级别的一旦被一个task占满之后其它消费正常的task也会被阻塞 基于credit的翻页机制 它主要是解决TCP反压带来的问题的从Flink的1.5版本开始因引入主要的思路如下图所示它通过让接收模块的InputGate和发送模块的ResultPartition直接沟通感知下游此时的消费速度来判断上游是否要触发反压机制。具体实现上来说当ResultPartition模块准备好数据之后会告知下游此次需要传输的数据量是多少个buffer每个buffer32kbInput Gate模块接收到会返回此时自己最多能接收多少个bufferRP接收到反馈之后就会按照要求给下游发数据。此时如果下游可接收的buffer数量为0就说明下游的消费已经停止上游就需要触发反压机制
4.3 网络内存优化
缓存销胀机制 根据当前消费数据的速率来自动的计算一个合理的缓冲区数量来保障Flink在做checkpoint时的效率和job运行整体的性能通过配置taskmanager.network.memory.buffer-debloat.enabledtrue来开启开启销胀机制是需要消耗job的资源的所有如果吞吐不佳的时候可以关闭它缓冲区数量设置建议 如果吞吐不佳时关闭销账机制需要手动调整网络缓冲区个数可以从如下方面入手缓冲区分为独占缓冲区和流动缓冲区独占缓冲区是单独给task用的如果不够程序会报错流动缓冲区是为了处理数据倾斜提供作业性能的缓冲区的计算公式number_of_buffers expected_throughput * buffer_roundtrip / buffer_sizeexpected_throughput表示希望的吞吐量比如320MB/sbuffer_roundtrip表示数据在节点之间的延时一般默认1msbuffer_size表示每个buffer的容量默认是32kb所有想要吞吐量达到320MB/s需要的缓存区数量是number_of_buffers320MB/s*1ms/32KB10
5. Flink的反压优化
反压是指Flink程序下游的消费数据跟不上上游的数据产生速度而触发的一种警告机制它会直接给job带来如下的影响
Flink性能下降 由于下游的消费速度跟不上直接迫使上游的数据来源端产生数据的速度变慢从而导致Flink整体的吞吐量下降性能下降checkpoint时间变成或失败 反压导致整个任务的流速变慢这也将导致checkpoint barrier流经整个数据管道的数据变长从而增加CK的总体时间严重的可能会超时如多次超时则会导致失败内存OOM 在checkpoint的对齐机制中反压会导致部分并发度的barrier到达时间更晚这就意味这快的并发度在等待的过程中需要缓存大量的数据这就有可能导致OOM的出现任务卡住 在下游有窗口计算的时候反压会导致watermark往下游流速变量这会导致窗口迟迟不触发就会出现卡顿现象
5.1 Flink反压问题的定位
禁用Flink任务算子链后再次运行任务根据JobGrap定位出现反压的位置结合webui task的执行情况定位具体的问题点解决对应的问题 一般先根据webui大致定位是哪个算子的问题再webui上先找到没有出现反压的第一个算子它往往是处于繁忙状态颜色是红色的程序中如果出现性能问题往往就是这个算子导致的如下图所示可以具体结合代码进行处理 结合webui执行情况定位具体问题如下图所示1号subtask接收的处理的数据几十倍与其他的subtask这明显就是数据倾斜导致的问题。 此外我们还可以通过火焰图来查看Flink业务逻辑是否出现性能问题它可以把Flink中出现的对象占用CPU的时长大小显示出来我们可以根据业务情况判断此对象是否正常
5.2 Flink反压的原因及优化
资源设置不合理 数据源生产数据速度很快下游消费跟不上此时容易产生反压。 可以通过webui查看每个task内存使用情况适当增大Flink任务资源及并行度来解决问题 突发性数据量激增 数据流突增很可能会触发反压机制如果频繁的激增的话可以适当增加Flink任务的并行度来分摊数据倾斜问题 数据倾斜会导致其中一两个task处理很大量的数据这就容易导致处理不过来触发反压这种情况需要根据业务数据查找倾斜问题点如是因为聚合之后出现大量相同的key导致可以使用多阶段聚合把对应的key打散来解决代码执行效率问题 这个问题可以通过查看火焰图的方式观察到筛选出自己写的类观看使用CPU的时长就能发现问题一般在代码优化方面可以从源头数据采用多并行方式读取外部数据库使用异步IO的方式提高算子并行度避免打状态checkpoint使用不对齐机制多步骤分散业务避免在一个算子内实现复杂逻辑
6. 数据倾斜
数据倾斜往往是由于个别subtask处理绝大多数数据在Flink中往往以反压的形式表现出来然后我们可以通过查看webui里的subtask的指标可以具体观察出哪些发生数据倾斜的subtask然后继续向上游找一直找到发生倾斜的源头分析原因具体解决
6.1 数据倾斜原因和处理方案
数据倾斜一般有两种原因一中是数据源本身有数据倾斜问题比如kafka的某个partition的数据量特别多还有一种是因为进行了KeyBy操作后导致的数据倾斜。
数据本身倾斜 这个可以通过修改Flink上下游的分区策略即可Flink上下游之间并行度一致时默认用forward的分区策略forward分区策略会保持上下游一比一的数据流流转方式这对于那些有复杂逻辑的subtask来说很容易就会出现反压等一些问题。可以通过shuffle()或是rebalance()算子来打散那些出现数据倾斜的分区KeyBy导致数据倾斜 这种倾斜一般使用多阶段聚合的方式来处理首先导致这种倾斜的原因是个别key的数据量特别大通过KeyBy时这些数据都被分配到一个分区里由一个subtask来处理这就会导致反压甚至OOM等一些列问题。然后我们解决这个问题的思路就是把这些大数据量的key打散到多个分区中由多个subtask去处理再在下游对这些subtask的结果再进行处理这样就达到了最终目的。 需要注意的点Flink是实时数据流在进行多阶段聚合的时候不能简单的用聚合函数直接聚合因为它每次会把聚合后的结果往下游传递这样得到的结果是无效的甚至统计的结果会不准比如用sum算子它每次往下游传递的是sum之后的结果等到再次聚合的时候算的是sum结果的结果并不是原来数据的结果了
Flink中的多阶段聚合操作可以用“攒批”聚合之后再发往下游处理或是用开窗的方式把数据聚集在窗口中计算好再往下游发送
7. Tabla和SQL优化
7.1 使用Flilnk SQL的累计窗口
Flink中的滑动窗口和滚动窗口比较适合周期不长要求输出结果的场景比如每10s统计当前的在线人数滚动窗口每10s统计近一个小时的用户在线人数滑动窗口如果要求统计一天内的在线人数每20秒输出一次。这种需求上面的两种窗口就不太合适FlinkSQL提供一种累计窗口是专门解决这个问题的具体实现如下
Table result tableEnv.sqlQuery(select sid,window_start,window_end,sum(duration) as sum_dur from TABLE( CUMULATE(TABLE stationlog_tbl,DESCRIPTOR(time_ltz), INTERVAL 5 SECOND , INTERVAL 1 DAY) ) group by sid,window_start,window_end);其CUMULATE是个表值函数里面的TABLE是对应的表descriptor指定表中的时间列剩下的两个超参数分别指定窗口累积的步长和指定窗口的长度 7.2 利用MiniBatch优化聚合
默认情况下Flink是来一笔数据就聚合一次聚合数据时会读取状态值然后进行聚合操作之后把结果又写回状态中每笔数据都要进行这些操作对整个job的StateBacken性能有一定的要求比如RockDBBackendMinniBatch会在算子的内存中先把数据先存起来等达到一定条件后做触发处理触发前会进行本地局部聚合这样只在需要聚合这批数据的时候才去访问和写入一次状态数据。显然这种方式保证了吞吐量但损失了实时性这就需要开发者自己权衡了 具体实现如下
//3.开启minibatch
//通过flink configuration进行参数设置
TableConfig configuration tableEnv.getConfig();
//开启MiniBatch 优化默认false
configuration.set(table.exec.mini-batch.enabled, true);
//设置5秒时间处理缓冲数据,默认0s
configuration.set(table.exec.mini-batch.allow-latency, 5 s);
//设置每个聚合操作可以缓冲的最大记录数,默认-1开启MiniBatch后必须设置为正值
configuration.set(table.exec.mini-batch.size, 5000);7.3 使用Local-Global优化聚合
Local-global操作类似于MapReduce中的Combine操作在聚合之前会在上游的算子中先进行本地局部聚合把聚合之后的结果往下游发送这样既减少了Shuffle操作时网络拉取的数据量也较少了对状态后端的操作次数当然了这个优化方式是依赖上面提到的MiniBatch优化的要先开启MiniBatch操作才行 具体实现如下
//2.创建TableEnv
StreamTableEnvironment tableEnv StreamTableEnvironment.create(env);//3.开启Local-Global 聚合
//通过flink configuration进行参数设置
TableConfig configuration tableEnv.getConfig();
//开启MiniBatch 优化默认false
configuration.set(table.exec.mini-batch.enabled, true);
//设置5秒时间处理缓冲数据,默认0s
configuration.set(table.exec.mini-batch.allow-latency, 5 s);
//设置每个聚合操作可以缓冲的最大记录数,默认-1开启MiniBatch后必须设置为正值
configuration.set(table.exec.mini-batch.size, 5000);
//设置Local-Global 聚合
configuration.set(table.optimizer.agg-phase-strategy, TWO_PHASE);Local-Global能有效的解决常规的数据倾斜问题 7.4 拆分distinct聚合
distinct是针对去重的聚合操作如果distinct key的值分布稀疏且数据倾斜用Local-Global的优化方式性能提升并不明显因为每个并行可能仍然包含几乎所有的原始记录并且全局聚合将成为瓶颈。这种情况就可以使用distinct聚合来优化其原理如下 它会把聚合分为两个阶段第一个阶段由bucket key组成的key进行group by操作bucket key是使用 hash_code(key)%bucket_num计算所得相当于是给数据加随机前缀进行聚合。第二阶段是由原始的group key进行Shuffle并使用sum聚合来自不同的buckets的值相当于把sql -- 原始sql
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day-- 语句改写成如下形式
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)GROUP BY day7.5 用Filter替代Case when操作
如下的sql是统计总的UVandroid和iPhone的UVwep和other的UV在Flink中三个查询指标都是针对user_id进行去重统计每个查询指标都会维护一个状态实例这样会导致Flink维护状态实例过大
SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT CASE WHEN flag IN (android, iphone) THEN user_id ELSE NULL END) AS app_uv,COUNT(DISTINCT CASE WHEN flag IN (wap, other) THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day如果修改成用Filter的方式Flink Sql的优化器可以识别到相distinct key上不同过滤器参数如上三个count distinct 都在user_id一列上Flink可以只使用一个共享状态实例而不是三个状态实例
SELECTday,COUNT(DISTINCT user_id) AS total_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN (android, iphone)) AS app_uv,COUNT(DISTINCT user_id) FILTER (WHERE flag IN (wap, other)) AS web_uv
FROM T
GROUP BY day可以在webui中对比修改前后checkpoint的大小可以明显的看到filter的方式会少很多