当前位置: 首页 > news >正文

自己个人的网站怎么设计劳务公司

自己个人的网站怎么设计,劳务公司,博兴建设局网站,天津市网站制作建设推广公司需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份#xff0c;但是Flume目前不支持直接向MySQL中写数据#xff0c;所以需要用到自定义Sink#xff0c;自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中#xff0c;使用Flume采集到…需求 1、将Flume采集到的日志数据也同步保存到MySQL中一份但是Flume目前不支持直接向MySQL中写数据所以需要用到自定义Sink自定义一个MysqlSink。 2、日志数据默认在Linux本地的/data/log/user.log日志文件中使用Flume采集到MySQL中到user中。 3、user.log的数据格式如下 2020-01-01 01:10:23,tom,18,beijing 2020-01-01 01:12:09,jack,20,shanghai 2020-01-01 01:13:17,jessic,15,guangzhou 4、mysql中的user表结构如下 CREATE TABLE user (id int(11) NOT NULL AUTO_INCREMENT,name varchar(255),age int(11),city varchar(255),create_time datetime(0),PRIMARY KEY (id) ); 实现  鉴于此可以使用 Exec Source File Channel Custom Mysql Sink 来实现。官方文档如下 Exec Source: https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#exec-sourceFile Channel: https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#file-channelCustom Sink: https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#custom-sink https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink 创建工程 引入依赖 主要是 flume-ng-core 和 mysql-connector-java 依赖其他可不引入。 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.example/groupIdartifactIdflume-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncoding/propertiesdependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version/dependencydependencygroupIdcom.fasterxml.jackson.core/groupIdartifactIdjackson-databind/artifactIdversion2.15.4/version/dependency !-- dependency-- !-- groupIdcom.alibaba/groupId-- !-- artifactIdfastjson/artifactId-- !-- version2.0.25/version-- !-- /dependency-- !-- dependency-- !-- groupIdcn.hutool/groupId-- !-- artifactIdhutool-core/artifactId-- !-- version5.8.27/version-- !-- /dependency--dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.10/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.10/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.33/version/dependency/dependencies/project 编写 Custom Sink package com.example.flumedemo.sink;import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.sql.*; import java.util.ArrayList; import java.util.List;/*** 自定义Sink实现将数据写入到mysql。* p* 注意* 1、编写完成打包后需要把当前jar包和mysql驱动包放到flume下的lib目录下。* 2、linux直接连linux上的mysql最好不要连win上的mysql了避坑。** author liaorj* date 2024/11/14*/ public class MySink extends AbstractSink implements Configurable {private static final Logger logger LoggerFactory.getLogger(MySink.class);private String mysqlUrl;private String username;private String password;private String tableName;//表字段逗号分割。需要和Event body中的数据对应。private String tableFields;Overridepublic void configure(Context context) {mysqlUrl context.getString(mysqlUrl);Preconditions.checkNotNull(mysqlUrl, mysqlUrl required);username context.getString(username);Preconditions.checkNotNull(username, username required);password context.getString(password);Preconditions.checkNotNull(password, password required);tableName context.getString(tableName);Preconditions.checkNotNull(tableName, tableName required);tableFields context.getString(tableFields);Preconditions.checkNotNull(tableFields, tableFields required);}Overridepublic Status process() throws EventDeliveryException {Status status null;//开启事务Channel ch getChannel();Transaction txn ch.getTransaction();txn.begin();Event event null;while (true) {event ch.take();if (event ! null) {break;}}Connection conn null;PreparedStatement stmt null;try {//获取body中的数据String body new String(event.getBody(), Charsets.UTF_8);//如果这两个数组大小不一样则抛异常String[] bodySplit body.split(,);String[] fieldsSplit tableFields.split(,);if (bodySplit.length ! fieldsSplit.length) {//字段数对不上throw new Exception(the number of tableFields is incorrect);}//根据字段数生成对应的问号ListString questionMarkList new ArrayList();for (int i 0; i fieldsSplit.length; i) {questionMarkList.add(?);}String questionMarks String.join(,, questionMarkList);//生成sql并插入数据String formatSql String.format(insert into %s(%s) values(%s), tableName, tableFields, questionMarks);logger.info(-----formatSql{}, formatSql);logger.info(-----mysqlUrl{}, username{}, password{}, mysqlUrl, username, password);DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());conn DriverManager.getConnection(mysqlUrl, username, password);stmt conn.prepareStatement(formatSql);for (int i 0; i bodySplit.length; i) {stmt.setString(i 1, bodySplit[i]);}stmt.executeUpdate();txn.commit();status Status.READY;} catch (Throwable t) {//异常则回滚txn.rollback();status Status.BACKOFF;if (t instanceof Error) {throw (Error) t;} else {throw new EventDeliveryException(t);}} finally {//关闭事务txn.close();//关闭PrepareStatement预处理if (stmt ! null) {try {stmt.close();} catch (SQLException e) {e.printStackTrace();}}//关闭Connection连接if (conn ! null) {try {conn.close();} catch (SQLException e) {e.printStackTrace();}}}return status;} }打包 mvn clean mvn package 打包好后需要把当前jar包和mysql驱动包一起上传到linux上的flume目录下的lib目录中否则会报错驱动找不到。 配置文件 创建配置文件 然后在flume目录下的conf目录下创建配置文件file-to-mysql.conf内容如下注意mysqlUrl/username/password 要修改成自己的。 # example.conf: A single-node Flume configuration# Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type exec a1.sources.r1.command tail -F /data/log/user.log# Describe the sink,custom sink to mysql a1.sinks.k1.type com.example.flumedemo.sink.MySink a1.sinks.k1.mysqlUrl jdbc:mysql://192.168.163.128:3306/flume_demo?useUnicodetruecharacterEncodingUTF-8zeroDateTimeBehaviorconvertToNullallowMultiQueriestrue a1.sinks.k1.username root a1.sinks.k1.password toor a1.sinks.k1.tableName user a1.sinks.k1.tableFields create_time,name,age,city# Use a channel which buffers events in memory a1.channels.c1.type file a1.channels.c1.checkpointDir /data/user/checkpointDir a1.channels.c1.dataDirs /data/user/dataDirs# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 启动flume 切换到flume目录执行 bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-to-mysql.conf -Dflume.root.loggerINFO,console 测试结果 查看flume控制台日志 查看mysql user表已插入数据
http://www.dnsts.com.cn/news/218567.html

相关文章:

  • 衡水安徽学校网站建设小门户网站开发
  • 网站建设犀牛云成都网站建设怎么样
  • 哈尔滨做网站搭建的网站设计 广州
  • 兰州新区建站亚运村网站建设
  • 商城购物网站建设wordpress主机和xampp
  • 购票网站模板国防教育网站建设说明书
  • 就业指导中心网站建设总结wordpress 主机和域名
  • mockpuls可以做网站吗网上花店网站建设规划书
  • 网站改版 建设方案soho做网站要写品牌吗
  • 淘宝客怎么建立网站快速搭建房屋
  • 平阳网站优化网站被挂黑链怎么删除
  • 东莞在线网站制作平台沈阳专业做网站方案
  • 淘宝客导购网站网站中下拉列表框怎么做
  • 做网站要买什么类型云空间有域名自己做网站
  • 设计好的单位网站开发wordpress只让文章标题
  • 长安做网站价格oa管理系统免费版
  • 做网站的怎么找客户中小网站 架构
  • 济南做网站创意培训计划方案
  • 网站建设青岛外贸网站建设网站优化
  • 大丰做网站价格做公司网站和设计logo
  • 有经验的江苏网站建设高州网站seo
  • 山西做网站的他达拉非是什么药
  • 学校网站官网wordpress update_post_meta
  • 网站做双拼域名什么意思wordpress 文章多图
  • 上海网站seo外包杭州网站建设哪家强
  • 西红门网站建设公司白酒包装设计网站
  • 网站设计创新点怎么写奇墙网站建设
  • 河南省住房城乡建设厅网站本地搭建wordpress
  • 做企业网站排名优化要多少钱官网建设公司
  • 专业官方网站建设手机网站建设过程