网站建设方案评审,wordpress与论坛,2345电脑版网址导航,无锡建设信息中心网站#xff08;1#xff09;概述#xff1a; Sink 不断地轮询 Channel 中的事件且批量地移除它们#xff0c;并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前#xff0c;每个 Sink 用 Chan…1概述 Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件。 Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多但是有时候并不能满足实际开发当中的需求此时我们就需要根据实际需求自定义某些 Sink。 自定义 sink 的接口https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink MySink 需要继承 AbstractSink 类并实现 Configurable 接口。 实现相应方法 configure(Context context)//初始化 context读取配置文件内容 process()//从 Channel 读取获取数据event这个方法将被循环调用。 适用于读取 Channel 数据写入 MySQL 或者其他文件系统。
2需求 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后缀可在 flume 任务配置文件中配置。
3分析
步骤 1创建一个 maven 项目并引入以下pom依赖。
dependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version
/dependency2自定义MySink 继承 AbstractSink 类并实现 Configurable 接口并打包将jar包放到/opt/module/flume-1.9.0/lib目录下。
package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger LoggerFactory.getLogger(MySink.class);Overridepublic void configure(Context context) {perfix context.getString(per,per-);subfix context.getString(sub,-sub);}Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel getChannel();Transaction transaction channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event channel.take();if (event ! null)break;}//2.2处理事件logger.info(perfixnew String(event.getBody())subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}
3在/opt/module/flume-1.9.0/job下创建文件夹group6在该文件夹下创建配置文件netcat-flume-mysink.conf。
# Name the components on this agent
#组件声明
a1.sources r1
a1.sinks k1
a1.channels c1# Describe/configure the source
a1.sources.r1.type netcat
a1.sources.r1.bind localhost
a1.sources.r1.port 44444# Describe the sink
a1.sinks.k1.type com.study.sink.MySink
a1.sinks.per per
a1.sinks.sub sub# Use a channel whichbuffers events in memory
a1.channels.c1.type memory
a1.channels.c1.capacity 1000
a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel
a1.sources.r1.channels c1
a1.sinks.k1.channel c14开启任务
bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.loggerINFO,console5开启端口并发送消息
nc localhost 444446结果