搜题网站怎么制作,房地产市场调研报告,茂名建设公司网站,合肥网络公司有哪些更多Paimon数据湖内容请关注#xff1a;https://edu.51cto.com/course/35051.html
Paimon提供了两种类型的Catalog#xff1a;Filesystem Catalog和Hive Catalog。
Filesystem Catalog#xff1a;会把元数据信息存储到文件系统里面。Hive Catalog#xff1a;则会把元数据…更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html
Paimon提供了两种类型的CatalogFilesystem Catalog和Hive Catalog。
Filesystem Catalog会把元数据信息存储到文件系统里面。Hive Catalog则会把元数据信息存储到Hive的Metastore里面这样就可以直接在Hive中访问Paimon表了。注意此时也会同时在文件系统中存储一份元数据信息相当于元数据会存储两份这个大家需要特别注意一下。
还有就是我们在使用Hive Catalog的时候Paimon中的数据库名称、表名称以及字段名称都要小写因为这些数据存储到Hive Metastore的时候会统一存储为小写。
下面我们来具体演示一下Paimon如何使用Hive Catalog来存储元数据。
在Flink中操作Paimon的时候想要使用Hive Catalog需要依赖于Flink Hive connector以及hive-exec和flink-table-api-scala-bridge。
flink-table-api-scala-bridge这个依赖我们之前已经添加过了所以只需要添加另外两个即可
!-- flink-hive-connector --
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hive_2.12/artifactIdversion1.15.0/version!--scopeprovided/scope--
/dependency
dependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion3.1.2/versionexclusionsexclusiongroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactId/exclusion/exclusions!--scopeprovided/scope--
/dependency创建packagetech.xuwei.paimon.catalog 创建objectPaimonHiveCatalog
代码如下
package tech.xuwei.paimon.catalogimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** Paimon使用Hive Catalog* Created by xuwei*/
object PaimonHiveCatalog {def main(args: Array[String]): Unit {//创建执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的Catalog--使用Hive CatalogtEnv.executeSql(|CREATE CATALOG paimon_hive_catalog WITH(| typepaimon,| metastore hive,| uri thrift://bigdata04:9083,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_hive_catalog)//创建Paimon表tEnv.executeSql(|CREATE TABLE IF NOT EXISTS p_h_t1(| name STRING,| age INT,| PRIMARY KEY (name) NOT ENFORCED|)|.stripMargin)//向表中插入数据tEnv.executeSql(|INSERT INTO p_h_t1(name,age) VALUES(jack,18),(tom,20)|.stripMargin)}}接下来到bigdata04节点上启动hive的metastore服务。
[rootbigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[rootbigdata04 apache-hive-3.1.2-bin]# nohup bin/hive --service metastore -p 9083 21 /dev/null 然后运行代码PaimonHiveCatalog
代码运行之后可以到先到hdfs中确认一下是否能看到元数据信息
[rootbigdata04 ~]# hdfs dfs -cat /paimon/default.db/p_h_t1/schema/schema-0
{id : 0,fields : [ {id : 0,name : name,type : STRING NOT NULL}, {id : 1,name : age,type : INT} ],highestFieldId : 1,partitionKeys : [ ],primaryKeys : [ name ],options : { }可以发现在hdfs中依然是可以看到的因为我们前面说了使用hive catalog时也会同时在hdfs中存储一份元数据。
最后我们到hive中确认一下 注意由于目前bigdata04节点的环境变量中有HADOOP_CLASSPATH所以直接使用hive客户端会看到很多日志信息所以建议使用hive的beeline客户端。 此时需要先启动hiveserver2服务。
[rootbigdata04 ~]# cd /data/soft/apache-hive-3.1.2-bin/
[rootbigdata04 apache-hive-3.1.2-bin]# bin/hiveserver2使用beeline客户端进行连接
[rootbigdata04 apache-hive-3.1.2-bin]# bin/beeline -u jdbc:hive2://localhost:10000 -n root
0: jdbc:hive2://localhost:10000 show tables;
--------------------
| tab_name |
--------------------
| flink_stu |
| orders |
| p_h_t1 |
| s1 |
| student_favors |
| student_favors_2 |
| student_score |
| student_score_bak |
| t1 |
--------------------
9 rows selected (1.727 seconds)
0: jdbc:hive2://localhost:10000 select * from p_h_t1;
Error: Error while compiling statement: FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.paimon.hive.mapred.PaimonInputFormat (state42000,code40000)此时是可以在hive中查看到p_h_t1这个表的但是在操作这个表的时候会报错提示缺少依赖现在报这个错是正常的等后面我们会有一个单独的小节来讲Paimon和Hive引擎的集成。 目前通过hive catalog可以将paimon的元数据同时存储到hive的metastore中但是还无法在hive中操作paimon的表其实主要是因为缺少一个依赖在这大家先知道这个问题即可。 注意如果我们此时操作的是分区表那么分区信息默认是无法同步到Hive Metastore的。 也就是说默认情况下Paimon不会将新创建的分区同步到Hive Metastore中。我们在Hive中只能看到一个未分区的普通表。
如果想解决这个问题也很简单只需要在paimon的表属性中设置metastore.partitioned-tabletrue即可。
下面开发一个案例 创建objectPaimonHiveCatalogPartitionTable基于PaimonHiveCatalog进行复制。
完整代码如下
package tech.xuwei.paimon.catalogimport org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment/*** Paimon使用Hive Catalog* 操作分区表* Created by xuwei*/
object PaimonHiveCatalogPartitionTable {def main(args: Array[String]): Unit {//创建执行环境val env StreamExecutionEnvironment.getExecutionEnvironmentenv.setRuntimeMode(RuntimeExecutionMode.STREAMING)val tEnv StreamTableEnvironment.create(env)//创建Paimon类型的Catalog--使用Hive CatalogtEnv.executeSql(|CREATE CATALOG paimon_hive_catalog WITH(| typepaimon,| metastore hive,| uri thrift://bigdata04:9083,| warehousehdfs://bigdata01:9000/paimon|)|.stripMargin)tEnv.executeSql(USE CATALOG paimon_hive_catalog)//创建Paimon表tEnv.executeSql(|CREATE TABLE IF NOT EXISTS p_h_par(| id INT,| name STRING,| dt STRING,| PRIMARY KEY (id, dt) NOT ENFORCED|) PARTITIONED BY (dt) WITH(| metastore.partitioned-table true|)|.stripMargin)//向表中插入数据tEnv.executeSql(|INSERT INTO p_h_par(id,name,dt)|VALUES(1,jack,20230101),(2,tom,20230102)|.stripMargin)}}在idea中执行代码。
然后到hive中进行验证可以执行show partitions p_h_par;进行验证。
或者到hive metastore里面进行确认查看mysql中的partitions表这个表里面存储的是分区信息如果能看到分区信息就说明Paimon表的分区信息同步过来了。
这样就说明Paimon表的分区信息同步过来了。
更多Paimon数据湖内容请关注https://edu.51cto.com/course/35051.html