怎样用手机搭建网站,杏林建设网站,莒县做网站和微信,网站建设需要哪些证书Flink CDC 自定义函数处理 SQLServer XML类型数据方案
1. 背景
因业务使用SQLServer数据库#xff0c;CDC同步到doris 数仓。对于SQLServer xml类型#xff0c;doris没有相应的字段对应#xff0c;
可以使用json来存储xml数据。需要进行一步转换。从 flink 自定义函数入手…Flink CDC 自定义函数处理 SQLServer XML类型数据方案
1. 背景
因业务使用SQLServer数据库CDC同步到doris 数仓。对于SQLServer xml类型doris没有相应的字段对应
可以使用json来存储xml数据。需要进行一步转换。从 flink 自定义函数入手。
2. 解决方案
SQLServer xml 字段如下
itemsitem lngzh-CN value银行货到付款 /item lngen valueBank transfer on delivery /
/itemsdoris 存储转换后的json内容
{item: [{lng: zh-CN,value: 银行货到付款},{lng: en,value: Bank transfer on delivery}]
}flink 自定义函数代码 import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 将XML转换为JSON*/
public class XmlToJson extends ScalarFunction {private Logger log LoggerFactory.getLogger(XmlToJson.class);/*** 创建XmlMapper对象用于解析XML*/private final XmlMapper xmlMapper new XmlMapper();public String eval(String xml) {// 将XML字符串解析为JsonNode对象JsonNode jsonNode null;try {jsonNode xmlMapper.readTree(xml);} catch (JsonProcessingException e) {log.error(XML解析失败, e);}// 将JsonNode对象转换为JSON字符串return jsonNode.toString();}
}doris 表
-- GName 为json格式
CREATE TABLE table1 (ID int(11) NOT NULL COMMENT 字典表统一ID,Name varchar(600) NULL COMMENT 统一进行字典命名,GName json NULL COMMENT 采用xml存储多语言,
) ENGINEOLAP
UNIQUE KEY(ID)
COMMENT 测试表
DISTRIBUTED BY HASH(ID) BUCKETS AUTO
PROPERTIES (
replication_allocation tag.location.default: 1,
is_being_synced false,
storage_format V2,
enable_unique_key_merge_on_write true,
light_schema_change true,
disable_auto_compaction false,
enable_single_replica_compaction false
);注册自定义函数 sql调用转换
create temporary function xml_to_json as com.zfb.flink.udf.XmlToJson;INSERT INTO flink_doris (ID,Name, GName)
SELECT
ID,Name, xml_to_json(GName), TypeID
FROM table1;
doris json使用
selectjson_extract_string(GName, $.item[0].value) as cn_name,*
fromtable1;