人设生成器网站,深圳网站建设公司小江,如何办网站 论坛,西安最新招聘信息今天目录 1 Flume 概述1.1 Flume 定义1.2 Flume 基础架构 2 Flume 安装3 Flume 入门案例3.1 监控端口数据3.2 实时监控单个追加文件3.3 实时监控目录下多个新文件3.4 实时监控目录下的多个追加文件 4 Flume 进阶4.1 Flume 事务4.2 Flume Agent 内部原理4.3 Flume 拓扑结构4.3.1 简单… 目录 1 Flume 概述1.1 Flume 定义1.2 Flume 基础架构 2 Flume 安装3 Flume 入门案例3.1 监控端口数据3.2 实时监控单个追加文件3.3 实时监控目录下多个新文件3.4 实时监控目录下的多个追加文件 4 Flume 进阶4.1 Flume 事务4.2 Flume Agent 内部原理4.3 Flume 拓扑结构4.3.1 简单串联4.3.2 复制和多路复用4.3.3 负载均衡和故障转移4.3.4 聚合 4.4 企业开发案例4.4.1 复制4.4.2 负载均衡4.4.3 故障转移4.4.4 聚合 4.5 自定义 Interceptor4.6 自定义 Source4.7 自定义 Sink4.8 Flume 数据流监控4.8.1 Ganglia 的安装与部署4.8.2 操作 Flume 测试监控 1 Flume 概述
1.1 Flume 定义 Flume 是 Cloudera 公司提供的一个 高可用 的 高可靠 的分布式 的 海量日志采集、聚合 和 传输 的系统。Flume 基于流式架构灵活简单。 这里的日志不是指框架工作运行的日志而是跟业务相关的日志数据如用户行为数据等 Flume 最主要的作用就是实时读取服务器本地磁盘的数据将数据写入到 HDFS。
1.2 Flume 基础架构 Flume 组成架构如下图所示。 1Agent
Agent 是一个JVM 进程它以 事件 的形式将数据从源头送至目的地。 Agent 主要有 3 个部分组成Source、Channel、Sink。
2Source
Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据包括 avro、thrift、exec、jms、spooling directory、netcat、 taildir 、sequence generator、syslog、http、legacy。 Flume中有两种source Pullable SourceTailDirSource就是这种Source这种Source是主动拉取数据而不是由数据源推送过来的这种Source在回滚等待的过程中source不会继续拉取数据。Eventdriven Source这种Source中的数据是由数据源主动不停的提交数据在事务回滚的时候会停止接收数据这时有可能会产生数据丢失这种丢失并不是发生在Flume内部而是发生在Flume和数据源之间。 3Sink
Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。 Flume 与 Flume 之间使用 avro 4Channel
Channel 是位于 Source 和 Sink 之间的 缓冲区。因此Channel 允许 Source 和Sink 运作在不同的速率上。Channel 是线程安全的可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。
Flume 自带两种 ChannelMemory Channel 和 File Channel。
Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失那么 Memory Channel 就不应该使用因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
5Event
Event 是传输单元Flume 数据传输的基本单元以 Event 的形式将数据从源头送至目的地。Event 由 Header 和 Body 两部分组成Header 用来存放该 event 的一些属性为 K-V 结构Body 用来存放该条数据形式为 字节数组。 2 Flume 安装 请移步 Flume 安装与部署
3 Flume 入门案例 Flume 官方文档
3.1 监控端口数据 案例需求
使用 Flume 监听一个端口收集该端口数据并打印到控制台。
需求分析
确定每一个组件的类型 实现步骤
1在 flume 目录下创建 jobs 文件夹并进入 jobs 文件夹
[huweihadoop101 ~]$ cd /opt/module/flume-1.9.0/
[huweihadoop101 flume-1.9.0]$ mkdir jobs
[huweihadoop101 flume-1.9.0]$ cd jobs2新建并编辑 flume-netcat-logger.conf 文件
[huweihadoop101 jobs]$ vim flume-netcat-logger.conf添加如下内容
# Named
# a1表示agent的名称
a1.sources r1 # r1表示a1的Source的名称
a1.channels c1 # c1表示a1的channel的名称
a1.sinks k1 # k1表示a1的Sink的名称# Source
a1.sources.r1.type netcat # 表示a1的输入源类型为netcat端口类型
a1.sources.r1.bind localhost # 表示a1的监听主机
a1.sources.r1.port 6666 # 表示a1的监听端口号# Channel
a1.channels.c1.type memory # 表示a1的channel类型是memory内存型
a1.channels.c1.capacity 10000 # 表示a1的channel总容量是10000个event
a1.channels.c1.transactionCapacity 100 # 表示a1的channel传输时收集到了100条event以后再去提交事务# Sink
a1.sinks.k1.type logger # 表示a1的输出目的地是控制台logger类型# Bind
a1.sources.r1.channels c1 # 表示将r1和c1连接起来
a1.sinks.k1.channel c1 # 表示将k1和c1连接起来3安装 netcat 工具实现客户端发送数据到端口
[huweihadoop101 jobs]$ sudo yum install -y nc4开启 flume 监听端口
[huweihadoop101 jobs]$ flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/jobs/flume-netcat-logger.conf --name a1也可以简写为
[huweihadoop101 jobs]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-netcat-logger.conf -n a1Logger Sink 本质上就是 log4j 的实现默认是往文件里存日志 5使用 netcat 工具向本机的 6666 端口发送内容
重新开启一个 shell 窗口
[huweihadoop101 ~]$ nc localhost 66666查看日志
jobs 文件夹下会多一个 logs 文件夹
[huweihadoop101 jobs]$ cd /opt/module/flume-1.9.0/jobs/logs/
[huweihadoop101 logs]$ cat flume.log7设置将日志打印到控制台
[huweihadoop101 flume-1.9.0]$ cd conf
[huweihadoop101 conf]$ vim log4j.properties但是上述通过配置文件将日志打印控制台的方式并不推荐因为我们并不是每次都需要打印控制台推荐指定参数动态修改
修改4中的命令添加参数 -Dflume.root.loggerINFO,console
[huweihadoop101 jobs]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-netcat-logger.conf -n a1 -Dflume.root.loggerINFO,console此时重新启动5中的窗口再次发送数据就可以在监听窗口的控制台看到发送的数据了
3.2 实时监控单个追加文件 案例需求
实时 监控单个追加文件并将监控到的内容上传到 HDFS 中
需求分析
确定每一个组件的类型 实现步骤
1在 flume 目录下 jobs 文件夹下新建一个要监控文件
[huweihadoop101 jobs]$ touch tail.txt2新建并编辑 flume-exec-hdfs.conf 文件
[huweihadoop101 jobs]$ vim flume-exec-hdfs.conf添加如下内容
#Named
a1.sources r1
a1.channels c1
a1.sinks k1 #Source
a1.sources.r1.type exec
a1.sources.r1.command tail -f /opt/module/flume-1.9.0/jobs/tail.txt#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100#Sink
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path hdfs://hadoop101:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize 100
#设置文件类型可支持压缩
a1.sinks.k1.hdfs.fileType DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount 0#Bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c1这里 Sink 的端口号是 9820跟着教程走写的 8020报错了参考 flume 中sink用hdfs sink报拒绝连接错误hdfs-io 3开启 flume 监听端口
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-exec-hdfs.conf -n a1 -Dflume.root.loggerINFO,console建议加上-Dflume.root.loggerINFO,console方便直观地观察 4向监控文件追加数据
[huweihadoop101 ~]$ cd /opt/module/flume-1.9.0/jobs/
[huweihadoop101 jobs]$ echo a tail.txt
[huweihadoop101 jobs]$ echo b tail.txt观察 hdfs 文件系统 http://hadoop101:9870 3.3 实时监控目录下多个新文件 案例需求
实时监控目录下多个新文件并上传到 HDFS 。
需求分析
确定每一个组件的类型 实现步骤
1在 flume 目录下 jobs 文件夹下新建一个要监控目录
[huweihadoop101 jobs]$ mkdir spooling2新建并编辑 flume-spooling-hdfs.conf 文件
[huweihadoop101 jobs]$ vim flume-spooling-hdfs.conf添加如下内容
#Named
a1.sources r1
a1.channels c1
a1.sinks k1 #Source
a1.sources.r1.type spooldir
a1.sources.r1.spoolDir /opt/module/flume-1.9.0/jobs/spooling
a1.sources.r1.fileSuffix .COMPLETED
a1.sources.r1.ignorePattern .*\.tmp # 忽略后缀名是.tmp的文件#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100#Sink
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path hdfs://hadoop101:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize 100
#设置文件类型可支持压缩
a1.sinks.k1.hdfs.fileType DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount 0#Bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c1 3开启 flume 监听端口
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-spooling-hdfs.conf -n a1 -Dflume.root.loggerINFO,console4新建文件向监控目录中逐个增加文件
[huweihadoop101 jobs]$ touch file1.txt
[huweihadoop101 jobs]$ touch file2.txt
[huweihadoop101 jobs]$ echo file1 file1.txt
[huweihadoop101 jobs]$ echo file2 file2.txt
[huweihadoop101 jobs]$ mv file1.txt ./spooling/
[huweihadoop101 jobs]$ mv file2.txt ./spooling/观察 hdfs 文件系统 http://hadoop101:9870
5再去看 spooling 文件夹下的文件
文件已被添加后缀 .COMPLETED用来区分是不是新文件当添加的文件后缀本身就是.COMPLETEDflume 就不会认为它是新文件就不会采集
[huweihadoop101 jobs]$ cd spooling/
[huweihadoop101 spooling]$ ll
总用量 8
-rw-rw-r--. 1 huwei huwei 6 12月 17 15:20 file1.txt.COMPLETED
-rw-rw-r--. 1 huwei huwei 6 12月 17 15:20 file2.txt.COMPLETED3.4 实时监控目录下的多个追加文件 案例需求
实时监控目录下多个追加文件将内容上传到 HDFS 中。
需求分析
确定每一个组件的类型 实现步骤
1在 flume 目录下 jobs 文件夹下新建一个要监控目录并在其中创建一些文件
[huweihadoop101 jobs]$ mkdir taildir
[huweihadoop101 jobs]$ cd taildir
[huweihadoop101 taildir]$ touch file1.txt
[huweihadoop101 taildir]$ touch file2.txt
[huweihadoop101 taildir]$ touch log1.log
[huweihadoop101 taildir]$ touch log2.log2新建一个存放每一个文件所采集到数据的位置的文件夹
[huweihadoop101 jobs]$ mkdir position3新建并编辑 flume-taildir-hdfs.conf 文件
[huweihadoop101 jobs]$ vim flume-taildir-hdfs.conf添加如下内容
#Named
a1.sources r1
a1.channels c1
a1.sinks k1 #Source
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1 f2 # 将文件分组
a1.sources.r1.filegroups.f1 /opt/module/flume-1.9.0/jobs/taildir/.*\.txt # f1组负责监控.txt文件
a1.sources.r1.filegroups.f2 /opt/module/flume-1.9.0/jobs/taildir/.*\.log # f2组负责监控.log文件
a1.sources.r1.positionFile /opt/module/flume-1.9.0/jobs/position/position.json # 断点续传#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100#Sink
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path hdfs://hadoop101:9820/flume/%Y%m%d/%H
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix logs-
#是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit hour
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize 100
#设置文件类型可支持压缩
a1.sinks.k1.hdfs.fileType DataStream
#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval 60
#设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount 0#Bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c14开启 flume 监听端口
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/flume-taildir-hdfs.conf -n a1 -Dflume.root.loggerINFO,console5开始逐个往监控目录中的文件中追加数据
[huweihadoop101 taildir]$ echo file1 file1.txt
[huweihadoop101 taildir]$ echo file2 file2.txt
[huweihadoop101 taildir]$ echo log1 log1.log
[huweihadoop101 taildir]$ echo log2 log2.log观察 hdfs 文件系统 http://hadoop101:9870
6查看 position.json 文件
[huweihadoop101 position]$ cat position.json{inode:535145,pos:6,file:/opt/module/flume-1.9.0/jobs/taildir/file1.txt},
{inode:535146,pos:6,file:/opt/module/flume-1.9.0/jobs/taildir/file2.txt},
{inode:535147,pos:5,file:/opt/module/flume-1.9.0/jobs/taildir/log1.log},
{inode:535148,pos:5,file:/opt/module/flume-1.9.0/jobs/taildir/log2.log}Taildir Source 维护了一个 json 格式的 position File其会定期的往 position File 中更新每个文件读取到的最新的位置因此能够实现断点续传。 注意Linux中储存文件元数据的区域就叫做 inode每个 inode 都有一个号码操作系统用 inode 号码来识别不同的文件Unix/Linux 系统内部不使用文件名而使用 inode 号码来识别文件。 4 Flume 进阶
4.1 Flume 事务 Put 事务流程
doPut将批数据先写入临时缓冲区 putListdoCommit检查channel 内存队列是否足够doRollbackchannel 内存队列空间不足回滚数据把数据直接丢掉给Source端抛出异常Source端会重新采集这一批数据
Take 事务
doTake将数据提取到临时缓冲区 takeList并将数据发送到 HDFSdoCommit如果数据全部发送成功则清除临时缓冲区 takeListdoRollback数据发送过程中如果出现异常回滚数据将临时缓冲区 takeList 中的数据归还给 channel 内存队列如果 takeList处理到一半出现异常则可能会导致数据重复
4.2 Flume Agent 内部原理 1Channel Selector
ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型分别是 Replicating复制和 Multiplexing多路复用。
ReplicatingChannelSelector 默认会将同一个 Event 发往所有的 ChannelMultiplexingChannelSelector 会根据相应的原则将不同的 Event 发往不同的 Channel。
2SinkProcessor
SinkProcessor 共有三种类型分别是DefaultSinkProcessor、LoadBalancingSinkProcessor和 FailoverSinkProcessor
DefaultSinkProcessor 默认对应的是单个的SinkLoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group LoadBalancingSinkProcessor可以实现负载均衡的功能FailoverSinkProcessor可以错误恢复的功能对应单个的Sink当该Sink出错选择其他的Sink代替其工作
4.3 Flume 拓扑结构
4.3.1 简单串联 这种模式是将多个 flume 顺序连接起来了从最初的 source 开始到最终 sink 传送的目的存储系统。
此模式不建议桥接过多的 flume 数量 flume 数量过多不仅会影响传输速率而且一旦传输过程中某个节点 flume 宕机会影响整个传输系统。
上游的是客户端下游的是服务端所有启动的时候先启动下游的服务端 简单串联的结构我们一般不用 4.3.2 复制和多路复用 单source多channel、sink Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中或者将不同数据分发到不同的 channel 中sink 可以选择传送到不同的目的地。
4.3.3 负载均衡和故障转移 Flume负载均衡或故障转移 Flume 支持使用将多个 sink 逻辑上分到一个 sink 组sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。
4.3.4 聚合 这种模式是我们最常见的也非常实用日常 web 应用通常分布在上百个服务器大者甚至上千个、上万个服务器。产生的日志处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题每台服务器部署一个 flume 采集日志传送到一个集中收集日志的 flume再由此 flume 上传到 hdfs、hive、hbase 等进行日志分析。
4.4 企业开发案例
4.4.1 复制 案例需求
使用 Flume-1 监控文件变动Flume-1 将变动内容传递给 Flume-2Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3Flume-3 负责输出到 Local FileSystem。
需求分析 实现步骤
1在 flume 目录的 jobs 文件夹下 fileroll 文件夹作为 flume3 写入本地文件系统重的路径
[huweihadoop101 ~]$ cd /opt/module/flume-1.9.0/jobs
[huweihadoop101 jobs]$ mkdir fileroll2在 flume 目录的 jobs 文件夹下创建 replication 文件夹
[huweihadoop101 jobs]$ mkdir replication
[huweihadoop101 jobs]$ cd replication3在replication 文件夹中新建并编辑配置文件
flume1.conf
[huweihadoop101 replication]$ vim flume1.conf添加如下内容
#Named
a1.sources r1
a1.channels c1 c2
a1.sinks k1 k2#Source
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1
a1.sources.r1.filegroups.f1 /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile /opt/module/flume-1.9.0/jobs/position/position.json#channel selector
a1.sources.r1.selector.type replicating#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100a1.channels.c2.type memory
a1.channels.c2.capacity 10000
a1.channels.c2.transactionCapacity 100#Sink
a1.sinks.k1.type avro
a1.sinks.k1.hostname localhost
a1.sinks.k1.port 7777a1.sinks.k2.type avro
a1.sinks.k2.hostname localhost
a1.sinks.k2.port 8888#Bind
a1.sources.r1.channels c1 c2
a1.sinks.k1.channel c1
a1.sinks.k2.channel c2 flume2.conf
[huweihadoop101 replication]$ vim flume2.conf添加如下内容
a2.sources r1
a2.channels c1
a2.sinks k1 #Source
a2.sources.r1.type avro
a2.sources.r1.bind localhost
a2.sources.r1.port 7777#Channel
a2.channels.c1.type memory
a2.channels.c1.capacity 10000
a2.channels.c1.transactionCapacity 100#Sink
a2.sinks.k1.type hdfs
a2.sinks.k1.hdfs.path hdfs://hadoop101:9820/flume/%Y%m%d/%H
a2.sinks.k1.hdfs.filePrefix logs-
a2.sinks.k1.hdfs.round true
a2.sinks.k1.hdfs.roundValue 1
a2.sinks.k1.hdfs.roundUnit hour
a2.sinks.k1.hdfs.useLocalTimeStamp true
a2.sinks.k1.hdfs.batchSize 100
a2.sinks.k1.hdfs.fileType DataStream
a2.sinks.k1.hdfs.rollInterval 60
a2.sinks.k1.hdfs.rollSize 134217700
a2.sinks.k1.hdfs.rollCount 0#Bind
a2.sources.r1.channels c1
a2.sinks.k1.channel c1 flume3.conf
[huweihadoop101 replication]$ vim flume3.conf添加如下内容
#Named
a3.sources r1
a3.channels c1
a3.sinks k1 #Source
a3.sources.r1.type avro
a3.sources.r1.bind localhost
a3.sources.r1.port 8888#Channel
a3.channels.c1.type memory
a3.channels.c1.capacity 10000
a3.channels.c1.transactionCapacity 100#Sink
a3.sinks.k1.type file_roll
a3.sinks.k1.sink.directory /opt/module/flume-1.9.0/jobs/fileroll#Bind
a3.sources.r1.channels c1
a3.sinks.k1.channel c1 4启动 flume
开启三个 shell 窗口分别启动 flume3、flume2、flume1 注意先启动下游后启动上游 [huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replication/flume3.conf -n a3 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replication/flume2.conf -n a2 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/replication/flume1.conf -n a1 -Dflume.root.loggerINFO,console5向 Flume-1 的监控文件中追加内容
[huweihadoop101 ~]$ cd /opt/module/flume-1.9.0/jobs/taildir/
[huweihadoop101 taildir]$ echo abcdef file1.txt
[huweihadoop101 taildir]$ echo 123456 file2.txt6查看 flume2、flume3 的输出
观察 hdfs 文件系统 http://hadoop101:9870 观察本地文件系统 每隔30秒就会生成新的文件不管有没有新的数据
4.4.2 负载均衡 案例需求
Flume1 监控端口数据将监控到的内容通过轮询或者随机的方式给到Flume2、Flume3Flume2、Flume3 将内容打印到控制台
需求分析 实现步骤
1在 flume 目录的 jobs 文件夹下创建 loadbalance 文件夹
[huweihadoop101 jobs]$ mkdir loadbalance2在 loadbalance 文件夹中新建并编辑配置文件
flume1.conf
[huweihadoop101 loadbalance]$ vim flume1.conf添加如下内容
#Named
a1.sources r1
a1.channels c1
a1.sinks k1 k2#Source
a1.sources.r1.type netcat
a1.sources.r1.bind localhost
a1.sources.r1.port 6666#channel selector
a1.sources.r1.selector.type replicating#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100#Sink
a1.sinks.k1.type avro
a1.sinks.k1.hostname localhost
a1.sinks.k1.port 7777a1.sinks.k2.type avro
a1.sinks.k2.hostname localhost
a1.sinks.k2.port 8888#Sink processor
a1.sinkgroups g1
a1.sinkgroups.g1.sinks k1 k2
a1.sinkgroups.g1.processor.type load_balance
a1.sinkgroups.g1.processor.selector random #Bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c1
a1.sinks.k2.channel c1flume2.conf
[huweihadoop101 loadbalance]$ vim flume2.conf添加如下内容
a2.sources r1
a2.channels c1
a2.sinks k1 #Source
a2.sources.r1.type avro
a2.sources.r1.bind localhost
a2.sources.r1.port 7777#Channel
a2.channels.c1.type memory
a2.channels.c1.capacity 10000
a2.channels.c1.transactionCapacity 100#Sink
a2.sinks.k1.type logger#Bind
a2.sources.r1.channels c1
a2.sinks.k1.channel c1 flume3.conf
[huweihadoop101 loadbalance]$ vim flume3.conf添加如下内容
#Named
a3.sources r1
a3.channels c1
a3.sinks k1 #Source
a3.sources.r1.type avro
a3.sources.r1.bind localhost
a3.sources.r1.port 8888#Channel
a3.channels.c1.type memory
a3.channels.c1.capacity 10000
a3.channels.c1.transactionCapacity 100#Sink
a3.sinks.k1.type logger#Bind
a3.sources.r1.channels c1
a3.sinks.k1.channel c1 3启动 flume
开启三个 shell 窗口分别启动 flume3、flume2、flume1 注意先启动下游后启动上游 [huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalance/flume3.conf -n a3 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalance/flume2.conf -n a2 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/loadbalance/flume1.conf -n a1 -Dflume.root.loggerINFO,console4使用 netcat 工具向本机的 6666 端口发送内容
[huweihadoop101 loadbalance]$ nc localhost 6666观察 flume2、flume3
4.4.3 故障转移 案例需求
Flume1 监控端口数据将监控到的内容发送给 active 的 sinkFlume2、Flume3 将内容打印到控制台
需求分析 实现步骤
1在 flume 目录的 jobs 文件夹下创建 failover 文件夹
[huweihadoop101 jobs]$ mkdir failover2在 failover 文件夹中新建并编辑配置文件
flume1.conf
[huweihadoop101 failover]$ vim flume1.conf添加如下内容
#Named
a1.sources r1
a1.channels c1
a1.sinks k1 k2#Source
a1.sources.r1.type netcat
a1.sources.r1.bind localhost
a1.sources.r1.port 6666#channel selector
a1.sources.r1.selector.type replicating#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100#Sink
a1.sinks.k1.type avro
a1.sinks.k1.hostname localhost
a1.sinks.k1.port 7777a1.sinks.k2.type avro
a1.sinks.k2.hostname localhost
a1.sinks.k2.port 8888#Sink processor
a1.sinkgroups g1
a1.sinkgroups.g1.sinks k1 k2
a1.sinkgroups.g1.processor.type failover
a1.sinkgroups.g1.processor.priority.k1 5
a1.sinkgroups.g1.processor.priority.k2 10#Bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c1
a1.sinks.k2.channel c1sink 代表优先级的数字越大其优先级就越高 优先级高的就是那个 active 的 sink。这里设置 flume3 的优先级更高即 flume3 的输入为 active 的 sink 输出 flume2.conf
[huweihadoop101 failover]$ vim flume2.conf添加如下内容
a2.sources r1
a2.channels c1
a2.sinks k1 #Source
a2.sources.r1.type avro
a2.sources.r1.bind localhost
a2.sources.r1.port 7777#Channel
a2.channels.c1.type memory
a2.channels.c1.capacity 10000
a2.channels.c1.transactionCapacity 100#Sink
a2.sinks.k1.type logger#Bind
a2.sources.r1.channels c1
a2.sinks.k1.channel c1 flume3.conf
[huweihadoop101 failover]$ vim flume3.conf添加如下内容
#Named
a3.sources r1
a3.channels c1
a3.sinks k1 #Source
a3.sources.r1.type avro
a3.sources.r1.bind localhost
a3.sources.r1.port 8888#Channel
a3.channels.c1.type memory
a3.channels.c1.capacity 10000
a3.channels.c1.transactionCapacity 100#Sink
a3.sinks.k1.type logger#Bind
a3.sources.r1.channels c1
a3.sinks.k1.channel c1 3启动 flume
开启三个 shell 窗口分别启动 flume3、flume2、flume1 注意先启动下游后启动上游 [huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume3.conf -n a3 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume2.conf -n a2 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/failover/flume1.conf -n a1 -Dflume.root.loggerINFO,console4使用 netcat 工具向本机的 6666 端口发送内容
[huweihadoop101 loadbalance]$ nc localhost 6666可以发现只有 flume3 可以接收到发送的数据当 flume3 故障后再发送数据时此时只有 flume2 可以接收到数据再次启动 flume3 数据又要给到 flume3 了因为其优先级更高。 4.4.4 聚合 案例需求
Flume1 hadoop101监控文件内容Flume2 hadoop102监控端口数据Flume1 和 Flume 2 将监控到的数据发往 Flume3hadoop103 Flume3 将内容打印到控制台。
需求分析 实现步骤
1在 flume 目录的 jobs 文件夹下创建 aggre 文件夹
[huweihadoop101 jobs]$ mkdir aggre2在 failover 文件夹中新建并编辑配置文件
flume1.conf
[huweihadoop101 aggre]$ vim flume1.conf添加如下内容
#Named
a1.sources r1
a1.channels c1
a1.sinks k1#Source
a1.sources.r1.type TAILDIR
a1.sources.r1.filegroups f1
a1.sources.r1.filegroups.f1 /opt/module/flume-1.9.0/jobs/taildir/.*\.txt
a1.sources.r1.positionFile /opt/module/flume-1.9.0/jobs/position/position.json#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100#Sink
a1.sinks.k1.type avro
a1.sinks.k1.hostname hadoop103
a1.sinks.k1.port 8888#Bind
a1.sources.r1.channels c1
a1.sinks.k1.channel c1 flume2.conf
[huweihadoop101 aggre]$ vim flume2.conf添加如下内容
a2.sources r1
a2.channels c1
a2.sinks k1 #Source
a2.sources.r1.type netcat
a2.sources.r1.bind localhost
a2.sources.r1.port 6666#Channel
a2.channels.c1.type memory
a2.channels.c1.capacity 10000
a2.channels.c1.transactionCapacity 100#Sink
a2.sinks.k1.type avro
a2.sinks.k1.hostname hadoop103
a2.sinks.k1.port 8888#Bind
a2.sources.r1.channels c1
a2.sinks.k1.channel c1 flume3.conf
[huweihadoop101 aggre]$ vim flume3.conf添加如下内容
#Named
a3.sources r1
a3.channels c1
a3.sinks k1 #Source
a3.sources.r1.type avro
a3.sources.r1.bind hadoop103
a3.sources.r1.port 8888#Channel
a3.channels.c1.type memory
a3.channels.c1.capacity 10000
a3.channels.c1.transactionCapacity 100#Sink
a3.sinks.k1.type logger#Bind
a3.sources.r1.channels c1
a3.sinks.k1.channel c1 3向其他机器分发 flume
[huweihadoop101 ~]$ xsync /opt/module/flume-1.9.0这里使用的是大数据技术学习笔记三—— Hadoop 的运行模式中编写集群分发脚本 xsync 同时发送环境变量配置
[huweihadoop101 ~]$ sudo xsync /etc/profile.d/my_env.sh使得环境变量生效
[huweihadoop102 ~]$ source /etc/profile
[huweihadoop103 ~]$ source /etc/profile3启动 flume
在 hadoop103、hadoop102、hadoop101分别启动 flume3、flume2、flume1 注意先启动下游后启动上游 [huweihadoop103 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume3.conf -n a3 -Dflume.root.loggerINFO,console
[huweihadoop102 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume2.conf -n a2 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/aggre/flume1.conf -n a1 -Dflume.root.loggerINFO,console后续测试同前文不再赘述 4.5 自定义 Interceptor 案例需求
Flume1 监控端口数据将监控到的数据发往 Flume2 、Flume3 、Flume4 包含“flume”的数据发往 Flume2包含“hadoop”的数据发往 Flume 3其他的数据发往Flume 4
需求分析
在实际的开发中一台服务器产生的日志类型可能有很多种不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构Multiplexing 的原理是根据 event 中 Header 的某个 key 的值将不同的event 发送到不同的 Channel 中所以我们需要 自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予不同的值。 实现步骤
1创建一个 maven 项目并引入以下依赖。
dependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version
/dependency2定义 CustomInterceptor 类并实现 Interceptor 接口然后再定义一个静态内部类用来返回自定义的拦截器对象
public class CustomInterceptor implements Interceptor {Overridepublic void initialize() {}/*** 一个event的处理*/Overridepublic Event intercept(Event event) {// 1. 获取event的headersMapString, String headers event.getHeaders();// 2. 获取event的bodyString body new String(event.getBody());// 3. 判断event的body中是否包含flume、hadoopif (body.contains(flume)){headers.put(title,flume);}else if (body.contains(hadoop)){headers.put(title,hadoop);}return event;}/*** 迭代每一个event进行处理*/Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}/*** 定义一个静态内部类用来返回自定义的拦截器对象*/public static class MyBuilder implements Builder{Overridepublic Interceptor build() {return new CustomInterceptor();}Overridepublic void configure(Context context) {}}
}3package 打成 jar 包 4将打好的 jar 包上传到 flume 目录下的 lib 文件夹下
5在 flume 目录的 jobs 文件夹下创建 multi 文件夹
[huweihadoop101 jobs]$ mkdir multi6在 multi 文件夹中新建并编辑配置文件
flume1.conf
[huweihadoop101 multi]$ vim flume1.conf添加如下内容
#Named
a1.sources r1
a1.channels c1 c2 c3
a1.sinks k1 k2 k3#Source
a1.sources.r1.type netcat
a1.sources.r1.bind localhost
a1.sources.r1.port 5555#channel selector
a1.sources.r1.selector.type multiplexing
a1.sources.r1.selector.header title
a1.sources.r1.selector.mapping.flume c1
a1.sources.r1.selector.mapping.hadoop c2
a1.sources.r1.selector.default c3# Interceptor
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.huwei.flume.CustomInterceptor$MyBuilder#Channel
a1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 100a1.channels.c2.type memory
a1.channels.c2.capacity 10000
a1.channels.c2.transactionCapacity 100a1.channels.c3.type memory
a1.channels.c3.capacity 10000
a1.channels.c3.transactionCapacity 100#Sink
a1.sinks.k1.type avro
a1.sinks.k1.hostname localhost
a1.sinks.k1.port 6666a1.sinks.k2.type avro
a1.sinks.k2.hostname localhost
a1.sinks.k2.port 7777a1.sinks.k3.type avro
a1.sinks.k3.hostname localhost
a1.sinks.k3.port 8888#Bind
a1.sources.r1.channels c1 c2 c3
a1.sinks.k1.channel c1
a1.sinks.k2.channel c2
a1.sinks.k3.channel c3 flume2.conf
[huweihadoop101 failover]$ vim flume2.conf添加如下内容 a2.sources r1
a2.channels c1
a2.sinks k1 #Source
a2.sources.r1.type avro
a2.sources.r1.bind localhost
a2.sources.r1.port 6666#Channel
a2.channels.c1.type memory
a2.channels.c1.capacity 10000
a2.channels.c1.transactionCapacity 100#Sink
a2.sinks.k1.type logger#Bind
a2.sources.r1.channels c1
a2.sinks.k1.channel c1 flume3.conf
[huweihadoop101 multi]$ vim flume3.conf添加如下内容
#Named
a3.sources r1
a3.channels c1
a3.sinks k1 #Source
a3.sources.r1.type avro
a3.sources.r1.bind localhost
a3.sources.r1.port 7777#Channel
a3.channels.c1.type memory
a3.channels.c1.capacity 10000
a3.channels.c1.transactionCapacity 100#Sink
a3.sinks.k1.type logger#Bind
a3.sources.r1.channels c1
a3.sinks.k1.channel c1 flume4.conf
[huweihadoop101 multi]$ vim flume4.conf添加如下内容
#Named
a4.sources r1
a4.channels c1
a4.sinks k1 #Source
a4.sources.r1.type avro
a4.sources.r1.bind localhost
a4.sources.r1.port 8888#Channel
a4.channels.c1.type memory
a4.channels.c1.capacity 10000
a4.channels.c1.transactionCapacity 100#Sink
a4.sinks.k1.type logger#Bind
a4.sources.r1.channels c1
a4.sinks.k1.channel c1 7启动 Flume
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume4.conf -n a4 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume3.conf -n a3 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume2.conf -n a2 -Dflume.root.loggerINFO,console
[huweihadoop101 ~]$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/multi/flume1.conf -n a1 -Dflume.root.loggerINFO,console8发送数据
输入要发送的数据进行测试
[huweihadoop101 ~]$ nc localhost 55554.6 自定义 Source Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多但是有时候并不能满足实际开发当中的需求此时我们就需要根据实际需求自定义某些source。
4.7 自定义 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件。
Sink组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的Sink类型已经很多但是有时候并不能满足实际开发当中的需求此时我们就需要根据实际需求自定义某些 Sink。
4.8 Flume 数据流监控
4.8.1 Ganglia 的安装与部署 Ganglia 由gmond、gmetad 和 gweb 三部分组成。
gmondGanglia Monitoring Daemon是一种轻量级服务安装在每台需要收集指标数据的节点主机上。使用 gmond你可以很容易收集很多系统指标数据如CPU、内存、磁盘、网络和活跃进程的数据等。gmetadGanglia Meta Daemon整合所有信息并将其以 RRD 格式存储至磁盘的服务。gwebGanglia WebGanglia 可视化工具gweb 是一种利用浏览器显示gmetad 所存储数据的PHP前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
1集群规划
hadoop101gweb gmetad gmod
hadoop102gmod
hadoop103gmod2在101 102 103 分别安装 epel-release
[huweihadoop101 ~]$ sudo yum -y install epel-release
[huweihadoop102 ~]$ sudo yum -y install epel-release
[huweihadoop103 ~]$ sudo yum -y install epel-release3在101安装
[huweihadoop101 ~]$ sudo yum -y install ganglia-gmetad
[huweihadoop101 ~]$ sudo yum -y install ganglia-web
[huweihadoop101 ~]$ sudo yum -y install ganglia-gmond4在102 和 103 安装
[huweihadoop102 ~]$ sudo yum -y install ganglia-gmond
[huweihadoop103 ~]$ sudo yum -y install ganglia-gmond5在101修改配置文件 /etc/httpd/conf.d/ganglia.conf
[huweihadoop101 ~]$ sudo vim /etc/httpd/conf.d/ganglia.conf通过windows访问ganglia需要配置Linux对应的主机(windows)ip地址这里需要根据自己的电脑 ip 来配置 6在 101 修改配置文件 /etc/ganglia/gmetad.conf
[huweihadoop101 ~]$ sudo vim /etc/ganglia/gmetad.conf同时注意集群名称“my cluster” 7在101 102 103 修改配置文件 /etc/ganglia/gmond.conf
[huweihadoop101 ~]$ sudo vim /etc/ganglia/gmond.conf
[huweihadoop102 ~]$ sudo vim /etc/ganglia/gmond.conf
[huweihadoop103 ~]$ sudo vim /etc/ganglia/gmond.conf数据发送给 hadoop101 接收来自任意连接的数据 8在101 修改配置文件 /etc/selinux/config
[huweihadoop101 ~]$ sudo vim /etc/selinux/configselinux 本次生效关闭必须重启如果此时不想重启可以临时生效之 sudo setenforce 0 9启动 ganglia
在101 102 103 启动
[huweihadoop101 ~]$ sudo systemctl start gmond
[huweihadoop102 ~]$ sudo systemctl start gmond
[huweihadoop103 ~]$ sudo systemctl start gmond查看服务状态systemctl status gmond 在101 启动
[huweihadoop101 ~]$ sudo systemctl start httpd
[huweihadoop101 ~]$ sudo systemctl start gmetad10打开网页浏览 ganglia 页面
http://hadoop101/ganglia
4.8.2 操作 Flume 测试监控 1启动 flume
[huweihadoop101 ~]$ flume-ng agent \-c $FLUME_HOME/conf \-n a1 \-f $FLUME_HOME/jobs/flume-netcat-logger.conf \-Dflume.root.loggerINFO,console \-Dflume.monitoring.typeganglia \-Dflume.monitoring.hostshadoop101:86492发送数据观察 web 界面变化
[huweihadoop101 ~]$ nc localhost 6666