网站对于企业的意义,比较好看的wordpress主题,顺义网站建设报价,100m的网站 数据库1.滚动窗⼝#xff08;TUMBLE#xff09; 
**滚动窗⼝定义#xff1a;**滚动窗⼝将每个元素指定给指定窗⼝⼤⼩的窗⼝#xff0c;滚动窗⼝具有固定⼤⼩#xff0c;且不重叠。 
例如#xff0c;指定⼀个⼤⼩为 5 分钟的滚动窗⼝#xff0c;Flink 将每隔 5 分钟开启⼀个新…1.滚动窗⼝TUMBLE 
**滚动窗⼝定义**滚动窗⼝将每个元素指定给指定窗⼝⼤⼩的窗⼝滚动窗⼝具有固定⼤⼩且不重叠。 
例如指定⼀个⼤⼩为 5 分钟的滚动窗⼝Flink 将每隔 5 分钟开启⼀个新的窗⼝其中每⼀条数都会划分到唯⼀⼀个 5 分钟的窗⼝中。 **应⽤场景**按照⼀分钟对数据进⾏聚合计算⼀分钟内 PVUV 数据。 
**实际案例**分维度分钟级别统计在线⽤户数、总销售额。 
滚动窗⼝在 1.13 版本之前和 1.13 及版本之后有两种 Flink SQL 实现⽅式 
Group Window Aggregation1.13 之前和 Windowing TVF1.13 及之后 
Group Window Aggregation ⽅案⽀持 Batch\Streaming 任务 
-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- ⽤户 iduser_id BIGINT,-- ⽤户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  10,fields.dim.length  1,fields.user_id.min  1,fields.user_id.max  100000,fields.price.min  1,fields.price.max  100000
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH (connector  print
)-- 数据处理逻辑
insert into sink_table
selectdim,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,-- 计算 uv 数count(distinct user_id) as uv,UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval 1 minute) AS STRING)) * 10
from source_table
group bydim,tumble(row_time, interval 1 minute)Group Window Aggregation 滚动窗⼝的 SQL 语法把 tumble window 的声明写在了 group by ⼦句中即 tumble(row_time, interval ‘1’ minute) 第⼀个参数为事件时间的时间戳第⼆个参数为滚动窗⼝⼤⼩。 
Window TVF ⽅案1.13 只⽀持 Streaming 任务 
-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- ⽤户 iduser_id BIGINT,-- ⽤户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  10,fields.dim.length  1,fields.user_id.min  1,fields.user_id.max  100000,fields.price.min  1,fields.price.max  100000
)-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT,sum_price BIGINT,max_price BIGINT,min_price BIGINT,uv BIGINT,window_start bigint
) WITH (connector  print
)-- 数据处理逻辑
insert into sink_table
SELECTdim,UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,count(*) as pv,sum(price) as sum_price,max(price) as max_price,min(price) as min_price,count(distinct user_id) as uv
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL 60 SECOND))
GROUP BY window_start, window_end,dimWindowing TVF 滚动窗⼝的写法把 tumble window 的声明写在了数据源的 Table ⼦句中包含三部分参数 
TABLE(
TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL 60 SECOND)
) 第⼀个参数 TABLE source_table 声明数据源表 
第⼆个参数 DESCRIPTOR(row_time) 声明数据源的时间戳字段 
第三个参数 INTERVAL ‘60’ SECOND 声明滚动窗⼝⼤⼩为 1 min。 
实时场景 SQL 语义 假设 Orders 为 kafkatarget_table 也为 Kafka这个 SQL ⽣成的实时任务在执⾏时会⽣成三个算⼦。 
数据源算⼦From Order 
连接到 Kafka topic数据源算⼦⼀直运⾏实时的从 Order Kafka 中⼀条⼀条的读取数据然后⼀条⼀条发送给下游的 窗⼝聚合算⼦ 
窗⼝聚合算⼦TUMBLE 算⼦ 
接收到上游算⼦发的⼀条⼀条的数据然后将每⼀条数据按照时间戳划分到对应的窗⼝中根据事件时间、处理时间的不同语义进⾏划分上述案例为事件时间事件时间中滚动窗⼝算⼦接收到上游的 Watermark ⼤于窗⼝的结束时间时则说明当前这⼀分钟的滚动窗⼝已经结束了将窗⼝计算完的结果发往下游算⼦⼀条⼀条发给下游 数据汇算⼦  
数据汇算⼦INSERT INTO target_table 
接收到上游发的⼀条⼀条的数据写⼊到 target_table Kafka 中 
注意 事件时间中滚动窗⼝的窗⼝计算触发是由 Watermark 推动的。 
2.滑动窗⼝HOP 
**滑动窗⼝定义**滑动窗⼝是将元素指定给固定⻓度的窗⼝与滚动窗⼝功能⼀样也有窗⼝⼤⼩的概念不⼀样的地⽅在于滑动窗⼝有另⼀个参数控制窗⼝计算的频率滑动窗⼝滑动的步⻓如果滑动的步⻓⼩于窗⼝⼤⼩则滑动窗⼝之间每个窗⼝是可以重叠在这种情况下⼀条数据就会分配到多个窗⼝当中。 
**举例**有 10 分钟⼤⼩的窗⼝滑动步⻓为 5 分钟每 5 分钟会划分⼀次窗⼝这个窗⼝包含的数据是过去 10 分钟内的数据。 **应⽤场景**计算同时在线的数据要求结果的输出频率是 1 分钟⼀次每次计算的数据是过去 5 分钟的数据有的场景下⽤户可能在线但是可能会 2 分钟不活跃但是这也要算在同时在线数据中所以取最近 5 分钟的数据就能计算进去了 
**实际案例**分维度分钟级别同时在线⽤户数1 分钟输出⼀次计算最近 5 分钟的数据Group Window Aggregation、Windowing TVF 两种⽅案 
Group Window Aggregation ⽅案⽀持 Batch\Streaming 任务 
CREATE TABLE source_table (-- 维度数据dim STRING,-- ⽤户 iduser_id BIGINT,-- ⽤户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  10,fields.dim.length  1,fields.user_id.min  1,fields.user_id.max  100000,fields.price.min  1,fields.price.max  100000
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,uv BIGINT,window_start bigint
) WITH (connector  print
);-- 数据处理逻辑
insert into sink_table
SELECT dim,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval 1 minute, interval 5 minute) AS STRING)) * 10,
count(distinct user_id) as uv
FROM source_table
GROUP BY dim, hop(row_time, interval 1 minute, interval 5 minute)Group Window Aggregation 滚动窗⼝的写法把 hop window 的声明写在了 group by ⼦句中即 
hop(row_time, interval 1 minute, interval 5 minute) 第⼀个参数为事件时间的时间戳字段 
第⼆个参数为滑动窗⼝的滑动步⻓ 
第三个参数为滑动窗⼝⼤⼩。 
Windowing TVF ⽅案1.13 只⽀持 Streaming 任务 
-- 数据源表
CREATE TABLE source_table (-- 维度数据dim STRING,-- ⽤户 iduser_id BIGINT,-- ⽤户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  10,fields.dim.length  1,fields.user_id.min  1,fields.user_id.max  100000,fields.price.min  1,fields.price.max  100000
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,uv BIGINT,window_start bigint
) WITH (connector  print
);-- 数据处理逻辑
insert into sink_table
SELECTdim,UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start, count(distinct user_id) as bucket_uv
FROM TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL 1 MINUTES, INTERVAL 5 MINUTES))
GROUP BY window_start, window_end,dimWindowing TVF 滑动窗⼝的写法把 hop window 的声明写在了数据源的 Table ⼦句中即 
TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL 1 MINUTES, INTERVAL 5 MINUTES))第⼀个参数 TABLE source_table 声明数据源表 
第⼆个参数 DESCRIPTOR(row_time) 声明数据源的时间戳 
第三个参数 INTERVAL ‘1’ MINUTES 声明滚动窗⼝滑动步⻓⼤⼩为 1 min。 
第四个参数 INTERVAL ‘5’ MINUTES 声明滚动窗⼝⼤⼩为 5 min。 
3.Session 窗⼝ 
**Session 窗⼝定义**Session 时间窗⼝和滚动、滑动窗⼝不⼀样其没有固定的持续时间如果在定义的间隔期Session Gap内没有新的数据出现则 Session 就会窗⼝关闭。 **实际案例**计算每个⽤户在活跃期间⼀个 Session总共购买的商品数量如果⽤户 5 分钟没有活动则视为 Session 断开 
⽬前 1.13 版本中 Flink SQL 不⽀持 Session 窗⼝的 Window TVF只介绍 Group Window Aggregation ⽅案。 
Group Window Aggregation ⽅案⽀持 Batch\Streaming 任务 
-- 数据源表⽤户购买⾏为记录表
CREATE TABLE source_table (-- 维度数据dim STRING,-- ⽤户 iduser_id BIGINT,-- ⽤户price BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  10,fields.dim.length  1,fields.user_id.min  1,fields.user_id.max  100000,fields.price.min  1,fields.price.max  100000
);-- 数据汇表
CREATE TABLE sink_table (dim STRING,pv BIGINT, -- 购买商品数量window_start bigint
) WITH (connector  print
);-- 数据处理逻辑
insert into sink_table
SELECTdim,UNIX_TIMESTAMP(CAST(session_start(row_time, interval 5 minute) AS STRING)) * 10,count(1) as pv
FROM source_table
GROUP BY dim, session(row_time, interval 5 minute)**注意**上述 SQL 任务是在整个 Session 窗⼝结束之后才会把数据输出Session 窗⼝⽀持 处理时间 和 事件时间但是处理时间只⽀持在 Streaming 任务中运⾏Batch 任务不⽀持。 
Group Window Aggregation 中 Session 窗⼝的写法把 session window 的声明写在了 group by ⼦句中 
session(row_time, interval 5 minute)第⼀个参数为事件时间的时间戳 
第⼆个参数为 Session gap 间隔。 
4.渐进式窗⼝CUMULATE 
**渐进式窗⼝定义1.13 只⽀持 Streaming 任务**渐进式窗⼝可以认为是⾸先开⼀个最⼤窗⼝⼤⼩的滚动窗⼝然后根据⽤户设置的触发的时间间隔将这个滚动窗⼝拆分为多个窗⼝这些窗⼝具有相同的窗⼝起点和不同的窗⼝终点。 
**示例**从每⽇零点到当前这⼀分钟绘制累积 UV其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。 **应⽤场景**周期内累计 PVUV 指标如每天累计到当前这⼀分钟的 PVUV这类指标是⼀段周期内的累计状态。 
**实际案例**每天的截⽌当前分钟的累计 moneysum(money)去重 id 数count(distinct id)每天代表渐进式窗⼝⼤⼩为 1 天分钟代表渐进式窗⼝移动步⻓为分钟级别。 
明细输⼊数据 预期经过渐进式窗⼝计算的输出数据 **特点**每⼀分钟的输出结果都是当天零点累计到当前的结果渐进式窗⼝只有 Windowing TVF ⽅案⽀持。 
Windowing TVF ⽅案1.13 只⽀持 Streaming 任务 
-- 数据源表
CREATE TABLE source_table (-- ⽤户 iduser_id BIGINT,-- ⽤户money BIGINT,-- 事件时间戳row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),-- watermark 设置WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  10,fields.user_id.min  1,fields.user_id.max  100000,fields.money.min  1,fields.money.max  100000
);-- 数据汇表
CREATE TABLE sink_table (window_end bigint,window_start bigint,sum_money BIGINT,count_distinct_id bigint
) WITH (connector  print
);-- 数据处理逻辑
insert into sink_table
SELECTUNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, window_start, sum(money) as sum_money,count(distinct user_id) as count_distinct_id
FROM TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL 60 SECOND, INTERVAL 1 DAY))
GROUP BYwindow_start, window_endWindowing TVF 滚动窗⼝的写法把 cumulate window 的声明写在了数据源的 Table ⼦句中 
TABLE(CUMULATE(TABLE source_table,DESCRIPTOR(row_time),INTERVAL 60 SECOND, INTERVAL 1 DAY)
) 第⼀个参数 TABLE source_table 声明数据源表 
第⼆个参数 DESCRIPTOR(row_time) 声明数据源的时间戳 
第三个参数 INTERVAL ‘60’ SECOND 声明渐进式窗⼝触发的渐进步⻓为 1 min。 
第四个参数 INTERVAL ‘1’ DAY 声明整个渐进式窗⼝的⼤⼩为 1 天到了第⼆天新开⼀个窗⼝重新累计。 
5.Window TVF ⽀持 Grouping Sets、Rollup、Cube 
**应⽤场景**多个维度组合cube计算把每个维度写⼀遍 union all 起来麻烦⽽且会导致⼀个数据源读取多遍。 
⽤ Grouping Sets 将维度组合写在⼀条 SQL 中⽅便且执⾏效率⾼⽬前 Grouping Sets 只在 Window TVF 中⽀持不⽀持 Group Window Aggregation。 
**示例**计算每⽇零点累计到当前这⼀分钟的分汇总、age、sex、agesex 维度的⽤户数。 
-- ⽤户访问明细表
CREATE TABLE source_table (age STRING,sex STRING,user_id BIGINT,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time - INTERVAL 5 SECOND
) WITH (connector  datagen,rows-per-second  1,fields.age.length  1,fields.sex.length  1,fields.user_id.min  1,fields.user_id.max  100000
);CREATE TABLE sink_table (age STRING,sex STRING,uv BIGINT,window_end bigint
) WITH (connector  print
);insert into sink_table
SELECTUNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,if (age is null, ALL, age) as age,if (sex is null, ALL, sex) as sex,count(distinct user_id) as bucket_uv
FROM TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL 5 SECOND, INTERVAL 1 DAY))
GROUP BYwindow_start, window_end,-- grouping sets 写法GROUPING SETS ((), (age), (sex), (age, sex))Flink SQL 中 Grouping Sets 的语法和 Hive SQL 的语法有不同使⽤ Hive SQL 实现上述 SQL 的语义实现如下 
insert into sink_table
SELECTUNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end, if (age is null, ALL, age) as age,if (sex is null, ALL, sex) as sex,count(distinct user_id) as bucket_uv
FROM source_table
GROUP BYage, sex
-- hive sql grouping sets 写法
GROUPING SETS ((), (age), (sex), (age, sex)
)