一起做网站广州,东莞关键词排名seo,万网网站后台管理,动漫项网站建设项目项目建议书Scala 练习一 将Mysql表数据导入HBase 续第一篇：Java代码将Mysql表数据导入HBase表 源码仓库地址：https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码
一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化…Scala 练习一 将Mysql表数据导入HBase 续第一篇Java代码将Mysql表数据导入HBase表 源码仓库地址https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码
一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化配置信息多条(hbase.zookeeper.quorumip:2181) Configuration conf HBaseConfiguration.create() conf.set(String, String)创建连接多个连接(池化) Connection con ConnectionFactory.createConnection()创建数据表表名: String Table table con.getTable(TableName) def build(): HBase // 初始化配置信息
def initPool(): HBase // 初始化连接池
def finish(): Executor // 完成 返回执行对象Executor特质 对HBase进行操作的方法: 包含如下函数 def exists(tableName: String): Boolean // 验证数据表是否存在
def create(tableName: String, columnFamilies: Seq[String]): Boolean // 创建数据表
def drop(tableName: String): Boolean // 删除数据表
def put(tableName: String, data: util.List[Put]): Boolean // 批量插入数据Jdbc 封装 Jdbc封装 初始化连接 driver : com.mysql.cj.jdbc.Driver 参数url, username, password 创建连接初始化执行器 sql, parameters 创建执行器【初始化参数】执行操作并返回【结果】 DML: 返回影响数据库表行数 DQL: 返回查询的数据集合 EX: 出现异常结果 MyHBase用于实现HBase和 Executor特质 测试数据格式 mysql表 SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

DROP TABLE IF EXISTS test_table_for_hbase;
CREATE TABLE test_table_for_hbase (
  test_id int NULL DEFAULT NULL,
  test_name varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  test_age int NULL DEFAULT NULL,
  test_gender varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
  test_phone varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

INSERT INTO test_table_for_hbase VALUES (1, 'testName1', 26, 'male', '18011111112');
INSERT INTO test_table_for_hbase VALUES (2, 'testName2', 25, 'female', '18011111113');
INSERT INTO test_table_for_hbase VALUES (3, 'testName3', 27, 'male', '18011111114');
INSERT INTO test_table_for_hbase VALUES (4, 'testName4', 35, 'male', '18011111115');
-- .... 省略以下数据部分hbase表 # 创建表 库名:表名, 列族1, 列族2
create 'hbase_test:tranfer_from_mysql', 'baseInfo', 'scoreInfo'
truncate 'hbase_test:tranfer_from_mysql' # 清空hbase_test命名空间下的tranfer_from_mysql表
scan 'hbase_test:tranfer_from_mysql' # 查看表
二、依赖
<dependencies>
  <!-- HBase 驱动 -->
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.3.5</version>
  </dependency>
  <!-- Hadoop -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>3.1.3</version>
  </dependency>
  <!-- mysql -->
  <dependency>
    <groupId>com.mysql</groupId>
    <artifactId>mysql-connector-j</artifactId>
    <version>8.0.33</version>
  </dependency>
  <!-- zookeeper -->
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.3</version>
  </dependency>
</dependencies>
三、测试结果
终端有个日志的小警告(无伤大雅hh)输出为 true
查看hbase表发现数据正常导入 四、源码
scala代码较简单这里直接上源码了去除了部分注释更多请去仓库下载
Executor
package hbase
import org.apache.hadoop.hbase.client.Put
import java.util
/**
 * Table-level operations offered once an HBase connection has been set up.
 *
 * Every method reports its outcome as a `Boolean`.
 */
trait Executor {
  /** Checks whether the table `tableName` exists. */
  def exists(tableName: String): Boolean

  /** Creates the table `tableName` with the given column families. */
  def create(tableName: String, columnFamilies: Seq[String]): Boolean

  /** Deletes the table `tableName`. */
  def drop(tableName: String): Boolean

  /** Inserts all `data` rows into `tableName` as one batch. */
  def put(tableName: String, data: util.List[Put]): Boolean
}
HBase
package hbase
import org.apache.hadoop.hbase.client.Connection
/**
 * Staged builder for an [[Executor]]: `build()`, then `initPool()`, then `finish()`.
 *
 * `statusCode` starts at -1; implementations use it to record which stage has been reached.
 */
trait HBase {
  protected var statusCode: Int = -1

  /** Applies the configuration entries. */
  def build(): HBase

  /**
   * A connection together with a flag telling whether it may currently be handed out.
   *
   * @param available true while the connection is free to be leased
   * @param con       the underlying HBase connection
   */
  case class PoolCon(var available: Boolean, con: Connection) {
    /** Marks this entry as leased and returns it. */
    def out: PoolCon = {
      available = false
      this
    }

    /** Marks this entry as free again. */
    def in: Unit = available = true
  }

  /** Creates the connection(s). */
  def initPool(): HBase

  /** Completes the setup and returns the object used to run operations. */
  def finish(): Executor
}
MyHBase
package hbase.implimport hbase.{Executor, HBase}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder}
import org.apache.hadoop.hbase.exceptions.HBaseException
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util
import scala.collection.mutable.ArrayBuffer

/**
 * [[HBase]] implementation backed by a small in-memory list of connections.
 *
 * Stages are tracked through `statusCode`: -1 (new) -> 0 (`build`) -> 1 (`initPool`) -> 2 (`finish`).
 * Calling a stage out of order throws an `HBaseException`.
 *
 * NOTE(review): the pooled connections are never closed because neither [[HBase]] nor
 * [[Executor]] exposes a shutdown hook; leasing is also not synchronized.
 *
 * @param conf     configuration entries copied into the HBase `Configuration` by `build()`
 * @param pooled   when false a single connection is created regardless of `poolSize`
 * @param poolSize number of connections to create when `pooled`; non-positive values fall back to 3
 */
class MyHBase(conf: Map[String, String])(pooled: Boolean = false, poolSize: Int = 3) extends HBase {
  private lazy val config: Configuration = HBaseConfiguration.create()
  private lazy val pool: ArrayBuffer[PoolCon] = ArrayBuffer()

  /** Copies every entry of `conf` into the HBase configuration. */
  override def build(): HBase =
    if (statusCode == -1) {
      conf.foreach { case (key, value) => config.set(key, value) }
      statusCode = 0
      this
    } else {
      throw new HBaseException("build() function must be invoked first")
    }

  /** Opens the connection(s) and stores them in the pool. */
  override def initPool(): HBase =
    if (statusCode == 0) {
      val size = if (!pooled) 1 else if (poolSize <= 0) 3 else poolSize
      // Seq.fill takes its element by name, so one connection is opened per slot.
      pool ++= Seq.fill(size)(PoolCon(available = true, ConnectionFactory.createConnection(config)))
      statusCode = 1
      this
    } else {
      throw new HBaseException("initPool() function must be invoked only after build()")
    }

  /**
   * Returns the [[Executor]] that runs operations on the pooled connections.
   *
   * Each operation prints the stack trace and returns false when an exception occurs.
   */
  override def finish(): Executor =
    if (statusCode == 1) {
      statusCode = 2
      new Executor {
        override def exists(tableName: String): Boolean = withLease { pc =>
          val admin = pc.con.getAdmin
          // The caller owns the Admin handle and must close it.
          try admin.tableExists(TableName.valueOf(tableName)) finally admin.close()
        }

        /** Returns false without touching HBase further when the table already exists. */
        override def create(tableName: String, columnFamilies: Seq[String]): Boolean =
          if (exists(tableName)) false
          else withLease { pc =>
            val builder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
            columnFamilies.foreach(cf => builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))
            val admin = pc.con.getAdmin
            try {
              admin.createTable(builder.build())
              true
            } finally admin.close()
          }

        /** Returns false when the table does not exist. */
        override def drop(tableName: String): Boolean =
          if (!exists(tableName)) false
          else withLease { pc =>
            val name = TableName.valueOf(tableName)
            val admin = pc.con.getAdmin
            try {
              // HBase refuses to delete a table that is still enabled.
              if (admin.isTableEnabled(name)) admin.disableTable(name)
              admin.deleteTable(name)
              true
            } finally admin.close()
          }

        /** Returns false when the table does not exist. */
        override def put(tableName: String, data: util.List[Put]): Boolean =
          if (!exists(tableName)) false
          else withLease { pc =>
            val table = pc.con.getTable(TableName.valueOf(tableName))
            try {
              table.put(data)
              true
            } finally table.close()
          }
      }
    } else {
      throw new HBaseException("finish() function must be invoked only after initPool()")
    }

  /** Leases a connection, runs `action`, and always hands the connection back. */
  private def withLease(action: PoolCon => Boolean): Boolean =
    try {
      val leased = getCon
      try action(leased) finally close(leased)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        false
    }

  /** Takes the first free pool entry, or throws when none is free. */
  private def getCon: PoolCon =
    pool
      .find(_.available)
      .getOrElse(throw new HBaseException("no available connection"))
      .out

  /** Returns a leased entry to the pool; a null argument is ignored. */
  private def close(con: PoolCon): Unit = Option(con).foreach(_.in)
}

object MyHBase {
  /** Creates a pooled instance with `poolSize` connections. */
  def apply(conf: Map[String, String])(poolSize: Int): MyHBase = new MyHBase(conf)(true, poolSize)
}
Jdbc
package mysql
import java.sql.{Connection, DriverManager, ResultSet, SQLException}
import java.util
/** Minimal JDBC wrapper that runs one statement and wraps the outcome in a [[Jdbc.ResThree]]. */
object Jdbc {

  /** Tag identifying which of the three outcomes a [[ResThree]] carries. */
  object Result extends Enumeration {
    val EX = Value(0)
    val DML = Value(1)
    val DQL = Value(2)
  }

  /** Common wrapper for the three outcomes (exception, DML, DQL). */
  case class ResThree(rst: Result.Value) {
    /** Unchecked downcast to the concrete outcome type; inspect `rst` first. */
    def to[T <: ResThree]: T = this.asInstanceOf[T]
  }

  /** Outcome carrying the failure that prevented execution. */
  class Ex(throwable: Throwable) extends ResThree(Result.EX) {
    /** The failure that produced this outcome. */
    def cause: Throwable = throwable
  }

  object Ex {
    def apply(throwable: Throwable): Ex = new Ex(throwable)
  }

  /** Outcome of an insert/update/delete. */
  class Dml(affectedRows: Int) extends ResThree(Result.DML) {
    /** Number of rows affected by the statement. */
    def update: Int = affectedRows
  }

  object Dml {
    def apply(affectedRows: Int): Dml = new Dml(affectedRows)
  }

  /**
   * Outcome of a query.
   *
   * @param set     the open result set
   * @param release invoked once `generate` has finished (successfully or not); defaults to a no-op
   */
  class Dql(set: ResultSet, release: () => Unit = () => ()) extends ResThree(Result.DQL) {
    /** Maps every remaining row through `f`, collects the results, then runs `release`. */
    def generate[T](f: ResultSet => T): util.List[T] = {
      val rows: util.List[T] = new util.ArrayList[T]()
      try {
        while (set.next()) {
          rows.add(f(set))
        }
        rows
      } finally release()
    }
  }

  object Dql {
    def apply(set: ResultSet, release: () => Unit = () => ()): Dql = new Dql(set, release)
  }

  // Case-insensitive, tolerant of leading whitespace, and `.` spans line breaks.
  private val DmlPattern = "(?is)^\\s*(insert|delete|update)\\b.*"
  private val DqlPattern = "(?is)^\\s*select\\b.*"

  /** Best-effort close: a failure while closing must not mask the real outcome. */
  private def closeQuietly(resources: AutoCloseable*): Unit =
    resources.foreach { resource =>
      if (resource != null) {
        try resource.close()
        catch { case _: Exception => () }
      }
    }

  /**
   * Runs `sql` against the MySQL database at `url`.
   *
   * @param params positional values bound to the `?` placeholders; null or empty means none
   * @return a `Dml` for insert/update/delete, a `Dql` for select, otherwise an `Ex`.
   *         Any exception raised along the way is returned as an `Ex`.
   *         For a `Dql` the connection stays open until `generate` has been called.
   */
  def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] = null): ResThree = {
    def con(): Connection = {
      // Explicitly load the JDBC driver, then open the connection.
      Class.forName("com.mysql.cj.jdbc.Driver")
      DriverManager.getConnection(url, user, password)
    }

    def pst(con: Connection) = {
      val statement = con.prepareStatement(sql)
      // JDBC parameter indexes are 1-based.
      Option(params).getOrElse(Seq.empty).zipWithIndex.foreach { case (value, index) =>
        statement.setObject(index + 1, value)
      }
      statement
    }

    try {
      val text = Option(sql).getOrElse("")
      // Classify before connecting so rejected SQL never opens a connection.
      if (text.matches(DmlPattern)) {
        val connect = con()
        try {
          val prepared = pst(connect)
          try Dml(prepared.executeUpdate()) finally closeQuietly(prepared)
        } finally closeQuietly(connect)
      } else if (text.matches(DqlPattern)) {
        val connect = con()
        try {
          val prepared = pst(connect)
          Dql(prepared.executeQuery(), () => closeQuietly(prepared, connect))
        } catch {
          case e: Exception =>
            closeQuietly(connect)
            throw e
        }
      } else {
        Ex(new SQLException(s"illegal sql command : $sql"))
      }
    } catch {
      case e: Exception => Ex(e)
    }
  }
}
Test
import hbase.impl.MyHBase
import mysql.Jdbc._
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import java.util

/** Reads rows from the MySQL test table and writes them into the HBase table as one batch. */
object Test {
  def main(args: Array[String]): Unit = {
    // Bind the connection settings once; the result accepts (sql, params).
    val jdbcOpr: (String, Seq[Any]) => ResThree = jdbc(
      user = "root",
      url = "jdbc:mysql://localhost:3306/test_db_for_bigdata",
      password = "123456"
    )
    val toEntity: ResThree = jdbcOpr(
      "select * from test_table_for_hbase where test_id between ? and ?",
      Seq(2, 4)
    )

    if (toEntity.rst == Result.EX) {
      println("出现异常结果处理")
    } else {
      val family = Bytes.toBytes("baseInfo")
      val columns = Seq("test_name", "test_age", "test_gender", "test_phone")

      val puts: util.List[Put] = toEntity.to[Dql].generate { rst =>
        // Row key is test_id; every cell of the row shares one timestamp.
        val put = new Put(Bytes.toBytes(rst.getInt("test_id")), System.currentTimeMillis())
        columns.foreach { column =>
          // All columns are read as strings. They are nullable in MySQL, and
          // Bytes.toBytes cannot take null, so NULL cells are skipped.
          Option(rst.getString(column)).foreach { value =>
            put.addColumn(family, Bytes.toBytes(column), Bytes.toBytes(value))
          }
        }
        put
      }
      // A Put without any cell is rejected by HBase and would fail the whole batch.
      puts.removeIf(p => p.isEmpty)

      if (puts.size() > 0) {
        val exe = MyHBase(Map("hbase.zookeeper.quorum" -> "single01:2181"))(1)
          .build()
          .initPool()
          .finish()
        val bool = exe.put("hbase_test:tranfer_from_mysql", puts)
        println(bool)
      } else {
        println("查无数据")
      }
    }
  }
}