当前位置: 首页 > news >正文

一起做网站广州东莞关键词排名seo

一起做网站广州,东莞关键词排名seo,万网网站后台管理,动漫项网站建设项目项目建议书Scala 练习一 将Mysql表数据导入HBase 续第一篇#xff1a;Java代码将Mysql表数据导入HBase表 源码仓库地址#xff1a;https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码 一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化…Scala 练习一 将Mysql表数据导入HBase 续第一篇Java代码将Mysql表数据导入HBase表 源码仓库地址https://gitee.com/leaf-domain/data-to-hbase 一、整体介绍二、依赖三、测试结果四、源码 一、整体介绍 HBase特质 连接HBase, 创建HBase执行对象 初始化配置信息多条(hbase.zookeeper.quorumip:2181) Configuration conf HBaseConfiguration.create() conf.set(String, String)创建连接多个连接(池化) Connection con ConnectionFactory.createConnection()创建数据表表名: String Table table con.getTable(TableName) def build(): HBase // 初始化配置信息 def initPool(): HBase // 初始化连接池 def finish(): Executor // 完成 返回执行对象Executor特质 对HBase进行操作的方法: 包含如下函数 def exists(tableName: String): Boolean // 验证数据表是否存在 def create(tableName: String, columnFamilies: Seq[String]): Boolean // 创建数据表 def drop(tableName: String): Boolean // 删除数据表 def put(tableName: String, data: util.List[Put]): Boolean // 批量插入数据Jdbc 封装 Jdbc封装 初始化连接 driver : com.mysql.cj.jdbc.Driver 参数url, username, password 创建连接初始化执行器 sql, parameters 创建执行器【初始化参数】执行操作并返回【结果】 DML: 返回影响数据库表行数 DQL: 返回查询的数据集合 EX: 出现异常结果 MyHBase用于实现HBase和 Executor特质 测试数据格式 mysql表 SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS 0;DROP TABLE IF EXISTS test_table_for_hbase; CREATE TABLE test_table_for_hbase (test_id int NULL DEFAULT NULL,test_name varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,test_age int NULL DEFAULT NULL,test_gender varchar(6) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,test_phone varchar(11) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL ) ENGINE InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci ROW_FORMAT Dynamic;INSERT INTO test_table_for_hbase VALUES (1, testName1, 26, male, 18011111112); INSERT INTO test_table_for_hbase VALUES (2, testName2, 25, female, 18011111113); INSERT INTO test_table_for_hbase VALUES (3, testName3, 27, male, 18011111114); INSERT INTO test_table_for_hbase VALUES (4, testName4, 35, male, 18011111115); -- .... 省略以下数据部分hbase表 # 创建表 库名:表名, 列族1, 列族2 create hbase_test:tranfer_from_mysql,baseInfo,scoreInfo truncate hbase_test:tranfer_from_mysql # 清空hbase_test命名空间下的tranfer_from_mysql表 scan hbase_test:tranfer_from_mysql # 查看表二、依赖 dependencies!-- HBase 驱动 --dependencygroupIdorg.apache.hbase/groupIdartifactIdhbase-client/artifactIdversion2.3.5/version/dependency!-- Hadoop --dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-common/artifactIdversion3.1.3/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-auth/artifactIdversion3.1.3/version/dependency!-- mysql --dependencygroupIdcom.mysql/groupIdartifactIdmysql-connector-j/artifactIdversion8.0.33/version/dependency!-- zookeeper --dependencygroupIdorg.apache.zookeeper/groupIdartifactIdzookeeper/artifactIdversion3.6.3/version/dependency /dependencies三、测试结果 终端有个日志的小警告(无伤大雅hh)输出为 true 查看hbase表发现数据正常导入 四、源码 scala代码较简单这里直接上源码了去除了部分注释更多请去仓库下载 Executor package hbase import org.apache.hadoop.hbase.client.Put import java.util trait Executor {def exists(tableName: String): Booleandef create(tableName: String, columnFamilies: Seq[String]): Booleandef drop(tableName: String): Booleandef put(tableName: String, data: util.List[Put]): Boolean } HBase package hbase import org.apache.hadoop.hbase.client.Connection trait HBase {protected var statusCode: Int -1def build(): HBasecase class PoolCon(var available: Boolean, con: Connection) {def out {available falsethis}def in available true}def initPool(): HBasedef finish(): Executor }MyHBase package hbase.implimport hbase.{Executor, HBase} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, ConnectionFactory, Put, TableDescriptorBuilder} import org.apache.hadoop.hbase.exceptions.HBaseException import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}import java.util import scala.collection.mutable.ArrayBufferclass MyHBase (conf: Map[String, String])(pooled: Boolean false, poolSize: Int 3) extends HBase{private lazy val config: Configuration HBaseConfiguration.create()private lazy val pool: ArrayBuffer[PoolCon] ArrayBuffer()override def build(): HBase {if(statusCode -1){conf.foreach(t config.set(t._1, t._2))statusCode 0this}else{throw new HBaseException(build() function must be invoked first)}}override def initPool(): HBase {if(statusCode 0){val POOL_SIZE if (pooled) {if (poolSize 0) 3 else poolSize} else 1for (i - 1 to POOL_SIZE) {pool.append(PoolCon(available true, ConnectionFactory.createConnection(config)))}statusCode 1this}else{throw new HBaseException(initPool() function must be invoked only after build())}}override def finish(): Executor {if (statusCode 1) {statusCode 2new Executor {override def exists(tableName: String): Boolean {var pc: PoolCon nulltry{pc getConval exists pc.con.getAdmin.tableExists(TableName.valueOf(tableName))pc.inexists}catch {case e: Exception e.printStackTrace()false}finally {close(pc)}}override def create(tableName: String, columnFamilies: Seq[String]): Boolean {if (exists(tableName)) {return false}var pc: PoolCon nulltry {pc getConval builder: TableDescriptorBuilder TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))columnFamilies.foreach(cf builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)))pc.con.getAdmin.createTable(builder.build())true} catch {case e: Exception e.printStackTrace()false} finally {close(pc)}}override def drop(tableName: String): Boolean {if(!exists(tableName)){return false}var pc: PoolCon nulltry {pc getConpc.con.getAdmin.deleteTable(TableName.valueOf(tableName))true} catch {case e: Exception e.printStackTrace()false} finally {close(pc)}}override def put(tableName: String, data: util.List[Put]): Boolean {if(!exists(tableName)){return false}var pc: PoolCon nulltry {pc getConpc.con.getTable(TableName.valueOf(tableName)).put(data)true} catch {case e: Exception e.printStackTrace()false} finally {close(pc)}}}}else {throw new HBaseException(finish() function must be invoked only after initPool())}}private def getCon {val left: ArrayBuffer[PoolCon] pool.filter(_.available)if (left.isEmpty) {throw new HBaseException(no available connection)}left.apply(0).out}private def close(con: PoolCon) {if (null ! con) {con.in}} }object MyHBase{def apply(conf: Map[String, String])(poolSize: Int): MyHBase new MyHBase(conf)(true, poolSize) } Jdbc package mysql import java.sql.{Connection, DriverManager, ResultSet, SQLException} import java.util object Jdbc {object Result extends Enumeration {val EX Value(0) val DML Value(1) val DQL Value(2) }// 3种结果(异常DMLDQL)封装case class ResThree(rst: Result.Value) {def to[T : ResThree]: T this.asInstanceOf[T]}class Ex(throwable: Throwable) extends ResThree(Result.EX)object Ex {def apply(throwable: Throwable): Ex new Ex(throwable)}class Dml(affectedRows: Int) extends ResThree(Result.DML) {def update affectedRows}object Dml {def apply(affectedRows: Int): Dml new Dml(affectedRows)}class Dql(set: ResultSet) extends ResThree(Result.DQL) {def generate[T](f: ResultSet T) {val list: util.List[T] new util.ArrayList()while (set.next()) {list.add(f(set))}list}}object Dql {def apply(set: ResultSet): Dql new Dql(set)}// JDBC 函数封装def jdbc(url: String, user: String, password: String)(sql: String, params: Seq[Any] null): ResThree {def con() {// 1.1 显式加载 JDBC 驱动程序只需要一次Class.forName(com.mysql.cj.jdbc.Driver)// 1.2 创建连接对象DriverManager.getConnection(url, user, password)}def pst(con: Connection) {// 2.1 创建执行对象val pst con.prepareStatement(sql)// 2.2 初始化 SQL 参数if (null ! params params.nonEmpty) {params.zipWithIndex.foreach(t pst.setObject(t._2 1, t._1))}pst}try {val connect con()val prepared pst(connect)sql match {case sql if sql.matches(^(insert|INSERT|delete|DELETE|update|UPDATE) .*) Dml(prepared.executeUpdate())case sql if sql.matches(^(select|SELECT) .*) Dql(prepared.executeQuery())case _ Ex(new SQLException(sillegal sql command : $sql))}} catch {case e: Exception Ex(e)}}}Test import hbase.impl.MyHBase import mysql.Jdbc._ import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import java.utilobject Test {def main(args: Array[String]): Unit {// 初始化MySQL JDBC操作函数val jdbcOpr: (String, Seq[Any]) ResThree jdbc(user root,url jdbc:mysql://localhost:3306/test_db_for_bigdata,password 123456)// 执行SQL查询并将结果封装在ResThree对象中val toEntity: ResThree jdbcOpr(select * from test_table_for_hbase where test_id between ? and ?,Seq(2, 4))// 判断ResThree对象中的结果是否为异常if (toEntity.rst Result.EX) {// 如果异常执行异常结果处理toEntity.to[Ex]println(出现异常结果处理)} else {// 如果没有异常将查询结果转换为HBase的Put对象列表val puts: util.List[Put] toEntity.to[Dql].generate(rst {// 创建一个Put对象表示HBase中的一行val put new Put(Bytes.toBytes(rst.getInt(test_id)), // row key设置为test_idSystem.currentTimeMillis() // 设置时间戳)// 向Put对象中添加列值// baseInfo是列族名test_name、test_age、test_gender、test_phone是列名put.addColumn(Bytes.toBytes(baseInfo), Bytes.toBytes(test_name),Bytes.toBytes(rst.getString(test_name)))put.addColumn(Bytes.toBytes(baseInfo), Bytes.toBytes(test_age),Bytes.toBytes(rst.getString(test_age)) // 注意这里假设test_age是字符串类型但通常应为整数类型)put.addColumn(Bytes.toBytes(baseInfo), Bytes.toBytes(test_gender),Bytes.toBytes(rst.getString(test_gender)))put.addColumn(Bytes.toBytes(baseInfo), Bytes.toBytes(test_phone),Bytes.toBytes(rst.getString(test_phone)))// 返回构建好的Put对象put})// 如果有数据需要插入HBaseif (puts.size() 0) {// 初始化HBase连接池并执行Put操作val exe MyHBase(Map(hbase.zookeeper.quorum - single01:2181))(1).build().initPool().finish()// 执行Put操作并返回是否成功val bool exe.put(hbase_test:tranfer_from_mysql, puts)// 打印操作结果println(bool)} else {// 如果没有数据需要插入println(查无数据)}}} }
http://www.dnsts.com.cn/news/9078.html

相关文章:

  • 网站开发培训 价格设计一个网站策划书
  • 网站论坛 备案免费网站搭建系统
  • 淘宝app官网软件优化网站
  • 网站开发调研问卷建婚恋网站需要多少钱
  • 企业做网站维护价格一个新手怎么做电商运营
  • 上海 企矩 网站建设网站的定位分析
  • 后台网站设计php网站开发视频网站
  • 快速建站用什么网站外链建设可以提升网站权重对吗
  • 建设英文网站wordpress解析优化
  • 卫浴网站模板wordpress算数验证
  • 美团网站建设总体需求与目标旅游网站建设属于什么以及学科
  • wordpress custom post type朝阳区seo
  • 网络推广最好的网站有哪些商丘网站建设制作
  • 深圳做装修网站费用多少钱网站建设教学视频百度云盘
  • 菜鸟必读 网站被入侵后需做的检测 1seo技术秋蝉
  • 常用网站开发工具洛阳市住房与城乡建设部网站
  • 邢台网站设计哪家专业福州搜索优化网站
  • 三九集团如何进行网站建设wordpress页面调用
  • 网站做跳转教程做网站需要申请商标哪些类目
  • 企业网站建设收费汉川建设局网站
  • 获得网站管理员密码犀牛云做网站一年多少钱
  • 电子商务网站设计与...钓鱼网站下载安装
  • 竞猜网站开发香烟网上商城
  • 做网络传销网站犯法吗深圳企业vi设计公司
  • 四川在线北京seo网站推广费用
  • 网页设计茶叶网站建设吉林电商的网络推广
  • 功能性的网站归档系统哈尔滨设计网站建设
  • 做彩票网站服务器hide my wordpress
  • 网站备案必须做前置审批吗江门网站推广哪家好
  • 怎么在子域名建立一个不同的网站wordpress 不同文章不同模板