In theory, event time and processing time can be mixed, but testing the code below reveals a bug: after a single input record the job keeps producing output forever.
Flink's GitHub does not accept bug reports/issues, and after creating an Apache JIRA account it turned out Flink requires yet another account, so I gave up on filing it. Steps to reproduce the bug:
After starting the job in IDEA, send one record to the source Kafka topic:
a,1,1690304400000 
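If kafka-console-producer is not handy, the record can also be pushed with a minimal Scala producer. This is only a sketch; the broker address and topic name t1 are taken from the reproduction code below, so adjust them to your environment:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SendOneRecord {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "172.18.105.147:9092") // broker from the repro code
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send the single record that triggers the endless output.
    producer.send(new ProducerRecord[String, String]("t1", "a,1,1690304400000")).get()
    producer.close()
  }
}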
You can see the job emit output endlessly. In theory mixing time semantics is not recommended, but inside a rich function the two can indeed be mixed and work correctly (a sketch of this follows the reproduction code below). The endless firing here is presumably because the event-time window's end already lies in the past in processing time, so the trigger's processing-time timer, which is capped at the window end, fires immediately every time it is re-registered. Code that reproduces the problem:
package com.yy.flinkWindowAndTrigger

import com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Seconds

object flinkEventWindowAndProcessTriggerBUGLearn {

  def main(args: Array[String]): Unit = {
    // Start Flink with the local web UI
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)

    // val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    env.setParallelism(1)
    env.configure(conf)

    /*
      Kafka input:
      a,1,1690304400000   // 2023-07-26 01:00:00 -> endless output
                          // a,1,1693037756000 (2023-08-26 16:15:56) -> one record per second
      a,1,7200000         // 1970-01-01 10:00:00
    */

    val brokers = "172.18.105.147:9092"
    val source = KafkaSource.builder[String]
      .setBootstrapServers(brokers)
      .setTopics("t1")
      .setGroupId("my-group-23asdf46")
      .setStartingOffsets(OffsetsInitializer.latest())
      // .setDeserializer() takes a KafkaRecordDeserializationSchema
      .setDeserializer(new M1())
      .build()

    val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")

    val s1 = ds1
      .map(_.split(","))
      .map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key, number, event timestamp
      .assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0)))
      .keyBy(_.f1)
      .window(TumblingEventTimeWindows.of(seconds(10)))
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L)))
      .reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))

    s1.print()
    env.execute("KafkaNewSourceAPi")
  }

  // Timestamp/watermark extractor for an out-of-order stream
  class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {
    override def extractTimestamp(element: C1): Long = element.f3
  }

  // key, num, timestamp
  case class C1(f1: String, f2: Int, f3: Long)
}
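To make the "works inside a rich function" remark concrete, here is a minimal sketch (not part of the original job; the class and case-class names are made up) of a KeyedProcessFunction that uses both time domains on one keyed stream, registering an event-time timer and a processing-time timer for every element:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Mirrors C1 from the reproduction code: key, number, event timestamp in ms.
case class Event(key: String, num: Int, ts: Long)

class MixedTimeTimers extends KeyedProcessFunction[String, Event, String] {

  override def processElement(e: Event,
                              ctx: KeyedProcessFunction[String, Event, String]#Context,
                              out: Collector[String]): Unit = {
    // Event-time timer: fires once the watermark passes e.ts + 10 s.
    ctx.timerService().registerEventTimeTimer(e.ts + 10000L)
    // Processing-time timer: fires 10 s of wall-clock time after the element is seen.
    ctx.timerService().registerProcessingTimeTimer(
      ctx.timerService().currentProcessingTime() + 10000L)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, Event, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // ctx.timeDomain() reports which clock (EVENT_TIME or PROCESSING_TIME) fired this callback.
    out.collect(s"timer fired at $timestamp in ${ctx.timeDomain()}")
  }
}

Attached with something like stream.keyBy(_.key).process(new MixedTimeTimers), this gives one event-time callback and one processing-time callback per element, which is the kind of mixing that works fine; the problem above only shows up when a processing-time trigger drives an event-time window.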
Maven pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>FlinkLocalDemo</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>FlinkLocalDemo</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.17.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.8</scala.version>
  </properties>

  <dependencies>
    <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.12.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-avro</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime-web</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
    <dependency>
      <groupId>com.alibaba.fastjson2</groupId>
      <artifactId>fastjson2</artifactId>
      <version>2.0.33</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
      <!-- <version>1.2.17</version> -->
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-common</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- pulls in the Kafka connector (original note mentioned Flink 1.13.0 / Scala 2.12.12) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-csv</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Either... -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- or... -->
    <!-- The following packages are needed for writing SQL in code; none of them can be omitted -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <!-- <scope>test</scope> -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Note: flink-table-planner-loader cannot coexist with flink-table-planner_${scala.binary.version} -->
    <!--
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-jdbc</artifactId>
      <version>3.1.0-1.17</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.11</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- shade plugin: builds the jar -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <version>2.15.2</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <executions>
          <execution>
            <id>scala-compile-first</id>
            <phase>process-resources</phase>
            <goals>
              <goal>add-source</goal>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.5</version>
        <configuration>
          <!-- Optional: uncomment to produce a directly runnable jar -->
          <!--
          <archive>
            <manifest>
              <mainClass>${exec.mainClass}</mainClass>
            </manifest>
          </archive>
          -->
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>11</source>
          <target>11</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>