微网站免费建设平台,php 网站开发收费,wordpress翻页代码,自己做h5网站背景
在某个场景中#xff0c;需要从Kafka中获取数据#xff0c;经过转换处理后#xff0c;需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错#xff1a;
public static void main(String[] args) {final StreamExecuti…背景
在某个场景中需要从Kafka中获取数据经过转换处理后需要同时sink到多个输出源中(kafka、mysql、hologres)等。两次调用execute, 阿里云Flink vvr引擎报错
public static void main(String[] args) {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet tEnv.createStatementSet();String s LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStreamString dataStream env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table tEnv.fromDataStream(dataStream);table.insertInto(kafka_sink).execute();table.insertInto(kafka_sink_1).execute();streamStatementSet.execute();}Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:110) ~[?:?]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:877) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:756) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:955) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:57) ~[flink-table-api-java-uber-1.15-vvr-6.0.7-1-SNAPSHOT.jar:1.15-vvr-6.0.7-1-SNAPSHOT]解决
使用 StreamStatementSet. 具体参考官网 https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-between-datastream-and-table
改良后的代码
public static void main(String[] args) {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv StreamTableEnvironment.create(env);StreamStatementSet streamStatementSet tEnv.createStatementSet();String s LocalDateTimeUtils.getDateTime(System.currentTimeMillis());DataStreamString dataStream env.fromElements(s, LocalDateTimeUtils.getDateTime(System.currentTimeMillis()));tEnv.executeSql(KAFKA_TABLE_SQL);tEnv.executeSql(KAFKA_TABLE_SQL_1);Table table tEnv.fromDataStream(dataStream);streamStatementSet.addInsert(kafka_sink, table);streamStatementSet.addInsert(kafka_sink_1, table);streamStatementSet.execute();}