当前位置: 首页 > news >正文

小地方做外卖网站怎样白帽seo

小地方做外卖网站怎样,白帽seo,设计房子装修的软件,内容营销和传统营销的区别前言 今天的任务是完成流量域最后一个需求、用户域的两个需求以及交易域的部分需求#xff1b; 1、流量域页面浏览各窗口汇总表 任务#xff1a;从 Kafka 页面日志主题读取数据#xff0c;统计当日的首页和商品详情页独立访客数。 注意#xff1a;一般我们谈到访客…前言 今天的任务是完成流量域最后一个需求、用户域的两个需求以及交易域的部分需求 1、流量域页面浏览各窗口汇总表 任务从 Kafka 页面日志主题读取数据统计当日的首页和商品详情页独立访客数。 注意一般我们谈到访客指的是 mid而用户才是 uid 1.1、思路 消费 Kafka dwd_traffic_page_log过滤出 page_id home 或 good_detail 的数据按照 mid 分组使用状态编程为每个 mid 维护两个状态首页的末次访问日期商品详情页的末次访问日期 每新来一条数据就判断它的两个状态是否为 null 如果为 null则给状态赋值如果不为 null则不做操作当两个状态中有一个不为 null 时发送数据到下游开窗聚合实时计算更新报表这里开窗用的是 windowAll() 因为上一步发送下来的数据已经不再是键控流了写出到 clickhouse 1.2、实现 1.2.1、创建 ck 表并创建 Java Bean 首先创建 ck 表结构和前面的表一样主要的字段就是维度 度量值 这里没有粒度因为我们统计的是一个宏观的统计结果信息到 ADS 都不用加工这里的 stt 和 edt 依然是作为 ck 表的 order by 字段防止数据重复ts 字段作为 ck 的版本字段这里 order by 字段取窗口起止时间因为窗口是基于事件时间的所以不用担心任务挂了之后重复消费造成数据重复的问题ck 会自动根据 order by 字段进行去重 create table if not exists dws_traffic_page_view_window (stt DateTime,edt DateTime,home_uv_ct UInt64,good_detail_uv_ct UInt64,ts UInt64 ) engine ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt); 创建 ck 表对应的 JavaBean Data AllArgsConstructor public class TrafficHomeDetailPageViewBean {// 窗口起始时间String stt;// 窗口结束时间String edt;// 首页独立访客数Long homeUvCt;// 商品详情页独立访客数Long goodDetailUvCt;// 时间戳Long ts; } 1.2.2、读取页面日志并过滤出首页与商品详情页 这里我们不仅要过滤还希望尽量顺便把数据转换为 JSONObject 格式所以选用 flatMap 最为合适 过滤出 page_id 为 home 或者 good_detail 的数据 // TODO 3. 读取 dwd_traffic_page_log 的数据String groupId dws_traffic_page_view_window;DataStreamSourceString pageLog env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(dwd_traffic_page_log, groupId));// TODO 4. 转为 json 并过滤出首页和商品详情页SingleOutputStreamOperatorJSONObject filterDS pageLog.flatMap(new FlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {JSONObject jsonObject JSON.parseObject(value);String page_id jsonObject.getJSONObject(page).getString(page_id);if (page_id ! null) {if (page_id.equals(home) || page_id.equals(good_detail)) {out.collect(jsonObject);}}}}); 1.2.3、提取事件时间并生成水位线 // TODO 5. 提取事件时间生成水位线filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerJSONObject() {Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return element.getLong(ts);}})); 1.2.4、状态编程过滤出独立访客 这里使用富函数的 flatMap因为富函数中才有 open在 open 方法中初始化状态、close等方法以及获取上下文对象通过上下文对象给状态描述器设置ttl并初始化等高级操作 这里 flatMap 的输出类型我们设置为之前写好的 ck 表对应的 JavaBean 方便直接插入到 ck中 这里我们同样可以给状态设置一个 TTL 防止长时间访客未访问状态存储浪费这里两个状态任意一个不为 null 即可输出 // TODO 6. 状态编程按照mid分组过滤出独立访客KeyedStreamJSONObject, String keyedStream filterDS.keyBy(json - json.getJSONObject(common).getString(mid));SingleOutputStreamOperatorTrafficHomeDetailPageViewBean trafficHomeDetailDS keyedStream.flatMap(new RichFlatMapFunctionJSONObject, TrafficHomeDetailPageViewBean() {private ValueStateString homeLastVisit;private ValueStateString detailLastVisit;Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig new StateTtlConfig.Builder(Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptorString homeStateDescriptor new ValueStateDescriptor(home-state, String.class);ValueStateDescriptorString detailStateDescriptor new ValueStateDescriptor(detail-state, String.class);// 设置 TTLhomeStateDescriptor.enableTimeToLive(ttlConfig);detailStateDescriptor.enableTimeToLive(ttlConfig);homeLastVisit getRuntimeContext().getState(homeStateDescriptor);detailLastVisit getRuntimeContext().getState(detailStateDescriptor);}Overridepublic void flatMap(JSONObject value, CollectorTrafficHomeDetailPageViewBean out) throws Exception {// 获取状态数据以及当前数据中的日期String curDt DateFormatUtil.toDate(value.getLong(ts));String homeLastDt homeLastVisit.value();String detailLastDt detailLastVisit.value();long homeUvCt 0;long goodDetailUvCt 0;if (homeLastDt null || !homeLastDt.equals(curDt)) {homeUvCt 1;homeLastVisit.update(curDt);}if (detailLastDt null || !detailLastDt.equals(curDt)) {goodDetailUvCt 1;detailLastVisit.update(curDt);}if (homeUvCt 1 || goodDetailUvCt 1) {out.collect(new TrafficHomeDetailPageViewBean(, ,homeUvCt,goodDetailUvCt,value.getLong(ts)));}}});1.2.5、开窗聚合并写入到 clickhouse 这里的窗口函数依旧是先用增量聚合函数再用全量聚合函数获得窗口信息 注意这里的 ts 字段是 clickhouse 表数据的版本字段取系统时间即可 // TODO 7. 开窗windowAll聚合聚合SingleOutputStreamOperatorTrafficHomeDetailPageViewBean resultDS trafficHomeDetailDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunctionTrafficHomeDetailPageViewBean() {Overridepublic TrafficHomeDetailPageViewBean reduce(TrafficHomeDetailPageViewBean value1, TrafficHomeDetailPageViewBean value2) throws Exception {value1.setHomeUvCt(value1.getHomeUvCt() value2.getHomeUvCt());value1.setGoodDetailUvCt(value1.getGoodDetailUvCt() value2.getGoodDetailUvCt());return value1;}}, new AllWindowFunctionTrafficHomeDetailPageViewBean, TrafficHomeDetailPageViewBean, TimeWindow() {Overridepublic void apply(TimeWindow window, IterableTrafficHomeDetailPageViewBean values, CollectorTrafficHomeDetailPageViewBean out) throws Exception {TrafficHomeDetailPageViewBean next values.iterator().next();next.setTs(System.currentTimeMillis());next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));out.collect(next);}});// TODO 8. 写入到 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction(insert into dws_traffic_page_view_window values(?,?,?,?,?)));// TODO 9. 启动任务env.execute(DwsTrafficPageViewWindow); 2、用户域用户登陆各窗口汇总表 任务从 Kafka 页面日志主题读取数据统计七日回流用户和当日独立用户数。 当日独立用户数很好求和上面差不多也是使用状态编程对 uid 保存状态去重即可。接下来我们主要分析七日回流用户怎么求 2.1、思路分析 回流用户定义之前的活跃用户一段时间未活跃流失今日又活跃了。这里要求统计回流用户总数规定当日登陆且自上次登陆之后至少 7 日未登录的用户为回流用户。 1、消费页面浏览主题dwd_traffic_page_log登录用户过滤 用户打开应用自动登录cookie uid ! null last_page_id null 后面这个条件可以过滤掉没必要的数据用户在登录页登录 uid ! null last_page_id login 2、设置水位线、uid 分组之后进行状态编程 判断 lastLoginDt 是否为 null null是今天的独立用户但不是回流用户!null 判断和今天是否相同 相同丢弃不同是今天的独立用户再判断今天-lastLoginDt 8是回流用户不是 2.2、代码实现 2.2.1、创建 ck 表并创建对应 JavaBean 这张表依然没有粒度直接就是统计结果我们去重的字段依然是窗口的起止时间  create table if not exists dws_user_user_login_window (stt DateTime,edt DateTime, back_ct UInt64,uu_ct UInt64,ts UInt64 ) engine ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt); import lombok.AllArgsConstructor; import lombok.Data;Data AllArgsConstructor public class UserLoginBean {// 窗口起始时间String stt;// 窗口终止时间String edt;// 回流用户数Long backCt;// 独立用户数Long uuCt;// 时间戳Long ts; } 2.2.2、 消费 dwd_traffic_page_log 主题 // TODO 3. 读取 dwd_traffic_page_log 的数据String groupId dws_user_user_login_window;DataStreamSourceString pageLog env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(dwd_traffic_page_log, groupId));2.2.3、转换数据流为 JSON 格式并过滤出独立用户 // TODO 4. 转换为 json 格式 过滤出独立用户(uid!null last_page_idnull 或者 uid!null last_page_idlogin)SingleOutputStreamOperatorJSONObject filterDS pageLog.flatMap(new RichFlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {JSONObject jsonObject JSONObject.parseObject(value);String uid jsonObject.getJSONObject(common).getString(uid);String lastPageId jsonObject.getJSONObject(page).getString(last_page_id);if (uid ! null) {if (lastPageId null || lastPageId.equals(login)) {out.collect(jsonObject);}}}}); 2.2.4、提取事件时间生成水位线 // TODO 5. 提取事件时间生成水位线filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerJSONObject() {Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return element.getLong(ts);}})); 2.2.5、使用状态编程过滤出独立用户 // TODO 6. 状态编程过滤出独立用户KeyedStreamJSONObject, String keyedStream filterDS.keyBy(json - json.getJSONObject(common).getString(uid));SingleOutputStreamOperatorUserLoginBean userLoginDS keyedStream.flatMap(new RichFlatMapFunctionJSONObject, UserLoginBean() {private ValueStateString lastLoginDtState;Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig new StateTtlConfig.Builder(Time.days(7)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptorString lastLoginStateDescriptor new ValueStateDescriptorString(last-login, String.class);lastLoginStateDescriptor.enableTimeToLive(ttlConfig);lastLoginDtState getIterationRuntimeContext().getState(lastLoginStateDescriptor);}Overridepublic void flatMap(JSONObject value, CollectorUserLoginBean out) throws Exception {// 本次登录日期Long curTs value.getLong(ts);String curDt DateFormatUtil.toDate(curTs);// 上次登录日期String lastLoginDt lastLoginDtState.value();long uuCt 0L;long backCt 0L;if (lastLoginDt null) {uuCt 1;lastLoginDtState.update(curDt);} else if (!lastLoginDt.equals(curDt)) {uuCt 1;lastLoginDtState.update(curDt);// 判断相差是否 8 天Long lastTs DateFormatUtil.toTs(lastLoginDt);long days (curTs - lastTs) / 1000 / 3600 / 24;backCt days 8 ? 1 : 0;}if (uuCt ! 0) {out.collect(new UserLoginBean(, , backCt, uuCt, curTs));}}});2.2.6、窗口聚合 和上一个需求一样增量聚合函数和全量聚合函数配合着使用 // TODO 6. 窗口聚合SingleOutputStreamOperatorUserLoginBean resultDS userLoginDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce((record1, record2) - {record1.setUuCt(record1.getUuCt() record2.getUuCt());record2.setBackCt(record1.getBackCt() record2.getBackCt());return record1;}, new AllWindowFunctionUserLoginBean, UserLoginBean, TimeWindow() {Overridepublic void apply(TimeWindow window, IterableUserLoginBean values, CollectorUserLoginBean out) throws Exception {UserLoginBean next values.iterator().next();next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));next.setTs(System.currentTimeMillis());out.collect(next);}}); 2.2.7、写出到 clickhouse  // TODO 7. 写入到 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction(insert into dws_user_user_login_window values(?,?,?,?,?)));// TODO 8. 启动任务env.execute(DwsUserUserLoginWindow); 3、用户域用户注册各窗口汇总表 任务从 DWD 层用户注册表中读取数据统计各窗口注册用户数写入 ClickHouse。 这个需求比较简单因为我们之前在 DWD 层已经创建了用户注册事务事实表包含字段user_iddate_idcreate_timets 3.1、代码实现 这里教程中用的是 DataStream API 但是我这里想用 Flink SQL 实现 3.1.1、创建 dwd_user_register 表并生成水位线 注意当原表中有更贴近事件时间的字段时我们就尽量少用 Maxwell 的 ts 字段 // TODO 3. 消费 Kafka dwd_user_register 主题(生成水位线)String groupId dws_user_user_register_window;tableEnv.executeSql(CREATE TABLE dwd_user_register user_id string, date_id string, create_time string, ts string time_ltz AS TO_TIMESTAMP(FROM_UNIXTIME(create_time/1000)), WATERMARK FOR time_ltz AS time_ltz - INTERVAL 2 SECOND ) MyKafkaUtil.getKafkaDDL(dwd_user_register,groupId)); 3.1.2、分组开窗聚合 用 Flink SQL 实现就简单多了这里的聚合逻辑更简单直接 count(*) // TODO 4. 分组,开窗,聚合Table resultTable tableEnv.sqlQuery(SELECT date_format(tumble_start(time_ltz,interval 10 second),yyyy-MM-dd HH:mm:ss) stt, date_format(tumble_end(time_ltz,interval 10 second),yyyy-MM-dd HH:mm:ss) edt, count(*) register_ct, unix_timestamp() ts FROM dwd_user_register GROUP BY tumble(time_ltz,interval 10 second));tableEnv.createTemporaryView(result_table,resultTable); 3.1.3、创建 ck 表及其 Bean create table if not exists dws_user_user_register_window (stt DateTime,edt DateTime,register_ct UInt64,ts UInt64 ) engine ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt); 这里需要把动态表转为流所以我们需要创建一个 Java Bean对应上 ck 表的每个字段 Data AllArgsConstructor public class UserRegisterBean {// 窗口起始时间String stt;// 窗口终止时间String edt;// 注册用户数Long registerCt;// 时间戳Long ts; } 3.1.4、将动态表转为流并写入到 clickhouse // TODO 5. 将动态表转为流并写入到 clickhouseDataStreamUserRegisterBean dataStream tableEnv.toAppendStream(resultTable, UserRegisterBean.class);dataStream.addSink(ClickHouseUtil.getSinkFunction(insert into dws_user_user_register_window values (?,?,?,?)));// TODO 6. 启动任务env.execute(DwsUserUserRegisterWindow); 4、交易域加购各窗口汇总表 任务从 Kafka 读取用户加购明细数据统计每日各窗口加购独立用户数写入 ClickHouse。 4.1、思路分析 思路很简单还是根据 uid 进行 keyby然后使用状态编程维护一个 lastCartAddDate对数据进行判断 如果 lastCartAddDate null 写入状态如果 lastCartAddDate ! null 如果 lastCartAddDate ! curDate 更新状态否则丢弃 4.2、代码实现 这里不多介绍和前面的逻辑都是一样的只说明部分点 我们在生成水位线的时候应该尽可能的生成贴近事件时间的而这里对于加购操作来说它有两种情况 insert就是加购会影响的到 create_time 字段update可能是加购会影响到 operate_time 字段我们在 DWD 层已经过滤过了只要 sku_num 变大就是加购所以这里我们的水位线可以取 operate_time 字段取不到再取 create_time // TODO 3. 读取 dwd_traffic_card_add 的数据String groupId dws_trade_cart_add_uu_window;DataStreamSourceString cartAddLog env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(dwd_trade_cart_add, groupId));//TODO 4. 转为 json 格式并SingleOutputStreamOperatorJSONObject jsonDS cartAddLog.map(JSONObject::parseObject);// TODO 5. 提取事件时间生成水位线jsonDS.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerJSONObject() {Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {String operate_time element.getString(operate_time);if (operate_time ! null){return DateFormatUtil.toTs(operate_time,true);}return DateFormatUtil.toTs(element.getString(create_time));}}));// TODO 6. 按照用户id进行分组 过滤出独立用户KeyedStreamJSONObject, String keyedStream jsonDS.keyBy(json - json.getJSONObject(common).getString(uid));SingleOutputStreamOperatorCartAddUuBean filterDS keyedStream.flatMap(new RichFlatMapFunctionJSONObject, CartAddUuBean() {private ValueStateString lastCartAddDateState;Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig new StateTtlConfig.Builder(Time.days(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build();ValueStateDescriptorString lastCartAddStateDescriptor new ValueStateDescriptorString(last-cart-add, String.class);lastCartAddStateDescriptor.enableTimeToLive(ttlConfig);lastCartAddDateState getRuntimeContext().getState(lastCartAddStateDescriptor);}Overridepublic void flatMap(JSONObject value, CollectorCartAddUuBean out) throws Exception {// 当前的时间戳Long curTs value.getLong(ts);String curDate DateFormatUtil.toDate(curTs);String lastCartAddDate lastCartAddDateState.value();if (lastCartAddDate null || !lastCartAddDate.equals(curDate)) {lastCartAddDateState.update(curDate);out.collect(new CartAddUuBean(,,1L,curTs));}}});// TODO 7. 开窗聚合(补充字段)SingleOutputStreamOperatorCartAddUuBean resultDS filterDS.windowAll(TumblingEventTimeWindows.of(org.apache.flink.streaming.api.windowing.time.Time.seconds(10))).reduce(new ReduceFunctionCartAddUuBean() {Overridepublic CartAddUuBean reduce(CartAddUuBean value1, CartAddUuBean value2) throws Exception {value1.setCartAddUuCt(value1.getCartAddUuCt() value2.getCartAddUuCt());return value1;}}, new AllWindowFunctionCartAddUuBean, CartAddUuBean, TimeWindow() {Overridepublic void apply(TimeWindow window, IterableCartAddUuBean values, CollectorCartAddUuBean out) throws Exception {CartAddUuBean next values.iterator().next();next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));next.setTs(System.currentTimeMillis());out.collect(next);}});// TODO 8. 写出到 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction(insert into dws_trade_cart_add_uu_window values (?,?,?,?)));// TODO 9. 启动任务env.execute(DwsTradeCartAddUuWindow); 5、交易域支付各窗口汇总表 任务从 Kafka 读取交易域支付成功主题数据统计支付成功独立用户数和首次支付成功用户数第一次在平台消费。 5.1、思路分析 如果一个用户是首次支付成功用户既然是历史第一次下单操作必然也是今天的第一次下单那么他必然是今天的支付成功独立用户所以我们只需要通过状态过滤出 lastPayDate null 或者 lastPayDate ! curDt 的用户注意这里的 lastPayDate 不能设置 TTL 因为我们需要知道这个用户历史上有没有支付过所以就不允许状态失效 left join 实现过程 假设 A 表作为主表与 B 表做等值左外联。当 A 表数据进入算子而 B 表数据未至时会先生成一条 B 表字段均为 null 的关联数据ab1其标记为 I。其后B 表数据到来会先将之前的数据撤回即生成一条与 ab1 内容相同但标记为 -D 的数据再生成一条关联后的数据标记为 I。这样生成的动态表对应的流称之为回撤流。 在 DWD 层的订单预处理表dwd_trade_order_pre_process生成过程中会形成回撤流因为它需要对订单明细活动表和订单明细优惠券表进行 left join。而我们这里的支付成功依赖于 DWD 层支付成功事务事实表dwd_trade_pay_detail_suc该表又依赖于 DWD 层的下单事务事实表dwd_trade_order_detail所以这里我们需要考虑回撤流的问题 回撤数据在 Kafka 中以 null 值的形式存在只需要简单判断即可过滤。我们需要考虑的是如何对其余数据去重 order_id 1001 order_detail_id 1001-a order_detail_activity_id: a1SELECT ... FROM order_detail od join order_info oi onod.order_id oi.id left join order_detail_activity oa onod.id oa.order_detail_id 上面我们有一个订单id1001这个订单内只有一个商品并且参与了活动那么由于 order_detail_activity 来得肯定要晚一些所以可能会出现下面这种情况 /- order_id order_detail_id order_detail_activity_id 1001 1001-a null - null null null1001 1001-a a1 我们过滤 null 值指的是过滤上面操作是 - 的数据因为回撤数据在 Kafka 中以 null 值的形式存在。而除了 null 值之外我们还应该过滤掉旧的错误数据由于 order_detail_activity 数据来得晚一些导致flink 直接给字段 order_detail_activity_id 一个 null所以我们应该把这个字段值删除 但是对于这个需求求支付成功的用户数其实我们也可以不做去重放到最后再做去重为什么呢设想如果一个用户下了多个订单而我们的支付成功表的粒度是商品所以数据即使在 left join 之后对相同 order_detail_id 的数据做了去重但是多个订单的话最终还有重复。 考虑到之后还可能遇到需要去重的需求尤其是设计到金额的这里我们还是练习一下如何实现去重 5.2、代码实现 5.2.1、创建 clickhouse 表格及对应的 JavaBean create table if not exists dws_trade_payment_suc_window (stt DateTime,edt DateTime,payment_suc_unique_user_count UInt64,payment_new_user_count UInt64,ts UInt64 ) engine ReplacingMergeTree(ts)partition by toYYYYMMDD(stt)order by (stt, edt); import lombok.AllArgsConstructor; import lombok.Data;Data AllArgsConstructor public class TradePaymentWindowBean {// 窗口起始时间String stt;// 窗口终止时间String edt;// 支付成功独立用户数Long paymentSucUniqueUserCount;// 支付成功新用户数Long paymentSucNewUserCount;// 时间戳Long ts; } 5.2.2、创建时间工具类 为了去重我们需要对每一条数据都设置一个时间因为对于重复数据它们在原始表中的时间字段值都是一样的。 FlinkSQL 提供了几个可以获取当前时间戳的函数 localtimestamp()返回本地时区的当前时间戳返回类型为 TIMESTAMP(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下仅在查询开始时计算一次时间所有数据使用相同的时间。current_timestamp()返回本地时区的当前时间戳返回类型为 TIMESTAMP_LTZ(3)。在流处理模式下会对每条记录计算一次时间。而在批处理模式下仅在查询开始时计算一次时间所有数据使用相同的时间。now()与 current_timestamp 相同。current_row_timestamp()返回本地时区的当前时间戳返回类型为 TIMESTAMP_LTZ(3)。无论在流处理模式还是批处理模式下都会对每行数据计算一次时间。 这里我们使用current_row_timestamp 来作为时间我们需要给订单预处理表中添加 current_row_timestamp() as row_op_ts-- 在建表语句中添加 row_op_ts TIMESTAMP_LTZ(3) 那么下单事务事实表来源于订单预处理表支付成功事务事实表依赖于下单事务事实表搜易当然也应该添加该字段。 import java.util.Comparator;public class TimestampLtz3CompareUtil {public static int compare(String timestamp1, String timestamp2) {// 数据格式 2022-04-01 10:20:47.302Z// 1. 去除末尾的时区标志Z 表示 0 时区String cleanedTime1 timestamp1.substring(0, timestamp1.length() - 1);String cleanedTime2 timestamp2.substring(0, timestamp2.length() - 1);// 2. 提取小于 1秒的部分String[] timeArr1 cleanedTime1.split(\\.);String[] timeArr2 cleanedTime2.split(\\.);String microseconds1 new StringBuilder(timeArr1[timeArr1.length - 1]).append(000).toString().substring(0, 3);String microseconds2 new StringBuilder(timeArr2[timeArr2.length - 1]).append(000).toString().substring(0, 3);int micro1 Integer.parseInt(microseconds1);int micro2 Integer.parseInt(microseconds2);// 3. 提取 yyyy-MM-dd HH:mm:ss 的部分String date1 timeArr1[0];String date2 timeArr2[0];Long ts1 DateFormatUtil.toTs(date1, true);Long ts2 DateFormatUtil.toTs(date2, true);// 4. 获得精确到毫秒的时间戳long microTs1 ts1 * 1000 micro1;long microTs2 ts2 * 1000 micro2;long divTs microTs1 - microTs2;return divTs 0 ? -1 : divTs 0 ? 0 : 1;}public static void main(String[] args) {System.out.println(compare(2022-04-01 11:10:55.040Z,2022-04-01 11:10:55.04Z));} } 5.2.3、读取DWD支付成功事务事实表 读取DWD支付成功事务事实表并转为 JSON 格式然后按照订单明细id进行分组为了对回撤流的数据进行去重根据相同明细id的时间进行判断 // TODO 3. 读取 dwd_trade_pay_detail_suc 的数据String groupId dws_trade_payment_suc_window;DataStreamSourceString paymentSucDS env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(dwd_trade_pay_detail_suc, groupId));// TODO 4. 将数据转为JSON格式SingleOutputStreamOperatorJSONObject jsonDS paymentSucDS.flatMap(new RichFlatMapFunctionString, JSONObject() {Overridepublic void flatMap(String value, CollectorJSONObject out) throws Exception {try {JSONObject jsonObject JSONObject.parseObject(value);out.collect(jsonObject);} catch (Exception e) {// 可以选择输出到侧输出流e.printStackTrace();}}});// TODO 5. 按照订单明细id分组KeyedStreamJSONObject, String keyedStream jsonDS.keyBy(json - json.getString(order_detail_id)); 5.2.4、状态编程对回撤流中的数据去重 这里的回撤流是因为支付成功事务事实表需要用 订单明细 innner join 订单表 left join 订单明细活动 left join 订单明细活动造成的 // TODO 6. 使用状态编程过滤最新数据输出(需要使用状态和定时器所以使用 process)SingleOutputStreamOperatorJSONObject filterDS keyedStream.process(new KeyedProcessFunctionString, JSONObject, JSONObject() {private ValueStateJSONObject lastPaySucDateState;Overridepublic void open(Configuration parameters) throws Exception {lastPaySucDateState getRuntimeContext().getState(new ValueStateDescriptor(last-pay-suc, JSONObject.class));}Overridepublic void processElement(JSONObject value, Context ctx, CollectorJSONObject out) throws Exception {JSONObject state lastPaySucDateState.value();if (state null) {lastPaySucDateState.update(value);// 注册定时器ctx.timerService().registerEventTimeTimer(ctx.timerService().currentProcessingTime() 5000L);} else {String stateRt state.getString(row_op_ts);String curRt value.getString(row_op_ts);int compare TimestampLtz3CompareUtil.compare(stateRt, curRt);if (compare ! 1) { // 状态里的时间小lastPaySucDateState.update(value);}}}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorJSONObject out) throws Exception {super.onTimer(timestamp, ctx, out);// 输出并清空状态数据JSONObject value lastPaySucDateState.value();out.collect(value);lastPaySucDateState.clear();}});5.2.5、提取事件时间并生成水位线 这里选择 callback_time 它是支付成功后的回调时间  // TODO 7. 提取事件时间生成水位线SingleOutputStreamOperatorJSONObject jsonWithWmDS filterDS.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssignerJSONObject() {Overridepublic long extractTimestamp(JSONObject element, long recordTimestamp) {return DateFormatUtil.toTs(element.getString(callback_time), true);}})); 5.2.6、按照 user_id 分组并提取支付成功独立用户数和首次支付成功用户数 // TODO 8. 按照 user_id 分组KeyedStreamJSONObject, String keyedByUidDS jsonWithWmDS.keyBy(json - json.getString(user_id));// TODO 9. 提取独立支付成功用户数和首次支付成功用户数SingleOutputStreamOperatorTradePaymentWindowBean tradePaymentDS keyedByUidDS.flatMap(new RichFlatMapFunctionJSONObject, TradePaymentWindowBean() {private ValueStateString lastDtState;Overridepublic void open(Configuration parameters) throws Exception {lastDtState getRuntimeContext().getState(new ValueStateDescriptorString(lastDt, String.class));}Overridepublic void flatMap(JSONObject value, CollectorTradePaymentWindowBean out) throws Exception {String lastDt lastDtState.value();String curDt value.getString(callback_time).split( )[0];// 当日支付人数long pay 0L;// 首次支付人数long newPay 0L;// 判断状态是否为nullif (lastDt null) {pay 1;newPay 1;lastDtState.update(curDt);} else if (!lastDt.equals(curDt)) {pay 1;lastDtState.update(curDt);}// 写出if (pay 1) {out.collect(new TradePaymentWindowBean(, , newPay, pay, DateFormatUtil.toTs(curDt)));}}}); 5.2.7、开窗聚合并写出到 clickhouse 开窗是为了实时刷新到报表聚合依然是那两个函数增量聚合聚合结果全量聚合补充窗口起止字段 // TODO 10. 开窗聚合SingleOutputStreamOperatorTradePaymentWindowBean resultDS tradePaymentDS.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunctionTradePaymentWindowBean() {Overridepublic TradePaymentWindowBean reduce(TradePaymentWindowBean value1, TradePaymentWindowBean value2) throws Exception {value1.setPaymentSucNewUserCount(value1.getPaymentSucNewUserCount() value2.getPaymentSucNewUserCount());value1.setPaymentSucUniqueUserCount(value1.getPaymentSucUniqueUserCount() value2.getPaymentSucUniqueUserCount());return value1;}}, new AllWindowFunctionTradePaymentWindowBean, TradePaymentWindowBean, TimeWindow() {Overridepublic void apply(TimeWindow window, IterableTradePaymentWindowBean values, CollectorTradePaymentWindowBean out) throws Exception {TradePaymentWindowBean next values.iterator().next();next.setTs(System.currentTimeMillis());next.setStt(DateFormatUtil.toYmdHms(window.getStart()));next.setEdt(DateFormatUtil.toYmdHms(window.getEnd()));out.collect(next);}});// TODO 11. 写出到 clickhouseresultDS.addSink(ClickHouseUtil.getSinkFunction(insert into dws_trade_payment_suc_window values(?,?,?,?,?)));// TODO 12. 启动任务env.execute(DwsTradePaymentSucWindow); 总结 今天的 DWS 层到此为止剩下了还有几个需求估计还得 1~2 天完成这一块要比之前都难一些争取这周日前把实时数仓完结然后下周开始把离线和实时再好好复习一遍
http://www.dnsts.com.cn/news/19322.html

相关文章:

  • 1688域名网站青岛气象站建站时间
  • 建设o2o网站不错的免费网站建设
  • 什么网站可以做软件有哪些东西wordpress主题如何用
  • 广东自助建站网站山西发布紧急通知
  • 网站建设硬件支撑网店推广新技术
  • 可以自己设计一个公司的网站一键抓取的网站怎么做
  • 做网站怎样租用虚拟空间网络营销模式下品牌推广研究论文
  • 中国建设银行官网站纪念币wordpress互联网访问
  • 做网站选大公司好还是小公司阿里云 win wordpress 伪静态
  • 甘肃网站建设公司小企业网站建设在哪里找
  • 怎么做网站图标巩义网络建设网站
  • 为什么公司的网站打不开wap建站工具
  • 在机关网站建设会上讲话网站推广临沂
  • 兴宁市住房和城乡规划建设局网站wordpress统计蜘蛛
  • 网站的不同类南通通明建设监理有限公司网站
  • 网站搭建什么意思网络服务许可证
  • 国外网站有备案吗哪里可以做免费网站
  • 上海站群优化网站主机名
  • 阿里巴巴国际站开店流程及费用免费一级域名和空间
  • 徐州网站建设方案优化做婚纱影楼网站的价格
  • 上犹建设局网站建设一个直播网站要多少钱
  • 有货 那样的网站怎么做广州娱乐场所最新通知
  • 2021年有没有人给个网站网络网站建设公司排名
  • 杭州百度做网站多少钱网站搭建服务器
  • 自己做的网站怎么传入外网广告设计与制作基础知识
  • 北京的网站开发公司网站免费正能量入口
  • 怎么做自己的购物网站网站建设的关键技术
  • 零基础学pytho 网站开发怎样打造营销型网站建设
  • 网站不推广如何排名企腾做的网站怎么样
  • 网站服务器转移视频吗家装室内设计