百度推广网络推广微信网站,戚墅堰建设网站,青岛做网站哪个最好,国外公司网站模板背景
需要通过flink同时向测试和线上的RocketMQ中写入数据
现象
在程序中分别创建了两个MqProducer#xff0c;设置了不同的nameServerAddr#xff0c;分别调用不同的producer向不同环境发消息#xff0c;返回发送成功#xff0c;但是在线上MQ中却查不到数据#xff0…
背景
需要通过flink同时向测试和线上的RocketMQ中写入数据
现象
在程序中分别创建了两个MqProducer设置了不同的nameServerAddr分别调用不同的producer向不同环境发消息返回发送成功但是在线上MQ中却查不到数据测试环境是有的。 代码如下
private DefaultMQProducer testEnvProducer;
private DefaultMQProducer prodEnvProducer;
Override
public void open(Configuration parameters) throws Exception {if (testEnvProducer null) {testEnvProducer new DefaultMQProducer(_test);testEnvProducer.setNamesrvAddr(SINK_ADDRESS);testEnvProducer.start();}if (prodEnvProducer null) {prodEnvProducer new DefaultMQProducer(_prod);prodEnvProducer.setNamesrvAddr(SOURCE_ADDRESS);prodEnvProducer.start();}
}解决过程及方案
由于不了解flink的运行机制尝试将发送MQ的逻辑拆分为两个sink无济于事在中间遇到了创建DefaultMQProducer时设置的是同一个group理论上是不同的环境不会有问题prodProducer在start时却报该group的实例已经创建当时就有点怀疑是不是两个producer是同一个。后又通过在消息体中增加profile明确区分开线上和测试的数据发现应该发送到线上的数据却发送到了测试环境此时断定是两个producer为同一个实例。查看RocketMQ Client源码发现了factory这个参数
那问题大概率就是这个工厂导致的工厂内做了缓存让我们来看一看内部通过构建了ClientId再通过clinetId去缓存中查询是否有对应实例有则直接返回此时我们肯定要看一看构造clientId是否有可定义的参数得知是通过ip及instanceName等参数构造的instanceName又是系统变量那我们需要做的就是在创建producer实例之前先修改该系统变量修改后问题解决
public void open(Configuration parameters) throws Exception {if (testEnvProducer null) {//需要覆盖该环境变量因为mq client有内部缓存使用了该环境变量作为获取client instance的条件详情见 org.apache.rocketmq.client.ClientConfig#buildMQClientIdSystem.setProperty(rocketmq.client.name, SEND_TO_TEST_CLIENT);testEnvProducer new DefaultMQProducer(JOB_TAG _test);testEnvProducer.setNamesrvAddr(SINK_ADDRESS);testEnvProducer.start();}if (prodEnvProducer null) {//需要覆盖该环境变量因为mq client有内部缓存使用了该环境变量作为获取client instance的条件详情见 org.apache.rocketmq.client.ClientConfig#buildMQClientIdSystem.setProperty(rocketmq.client.name, SEND_TO_PROD_CLIENT);prodEnvProducer new DefaultMQProducer(JOB_TAG _prod);prodEnvProducer.setNamesrvAddr(SOURCE_ADDRESS);prodEnvProducer.start();}
}大家在实际开发中如果有这种场景的话也要注意哦