销售一个产品的网站怎么做的,重庆建设施工安全管理网站,上海百度研发中心,.net网站开发工程师q
1 概述
1.1 定义 Flume 是Cloudera 提供的一个高可用的#xff0c;高可靠的#xff0c;分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构#xff0c;灵活简单。 Flume最主要的作用就是#xff0c;实时读取服务器本地磁盘的数据#xff0c;将数据写入到HD…q
1 概述
1.1 定义 Flume 是Cloudera 提供的一个高可用的高可靠的分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构灵活简单。 Flume最主要的作用就是实时读取服务器本地磁盘的数据将数据写入到HDFS。 1.2 架构 1.2.1 Agent Agent是一个JVM进程它以事件的形式将数据从源头送至目的。 Agent主要有3个部分组成Source、Channel、Sink。
1.2.2 Source Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。
1.2.3 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。
1.2.4 Channel Channel是位于Source 和Sink 之间的缓冲区。因此Channel允许 Source和Sink 运作在不同的速率上。Channel 是线程安全的可以同时处理几个 Source 的写入操作和几个Sink的读取操作。 Flume自带两种ChannelMemory Channel和 File Channel。 Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失那么 Memory Channel就不应该使用因为程序死亡、机器宕机或者重启都会导致数据丢失。 File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。
1.2.5 Event 传输单元Flume 数据传输的基本单元以 Event 的形式将数据从源头送至目的地。 Event由Header 和Body 两部分组成Header用来存放该 event的一些属性为K-V 结构Body用来存放该条数据形式为字节数组。 2 Flume基本操作
2.1 安装部署
http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-tar.gz
直接解压
将lib文件夹下的guava-11.0.2.jar删除以兼容 Hadoop 3.1.3
2.2 案例
2.2.1 监控端口数据官方案例
Flume 1.9.0 User Guide — Apache Flume
使用 Flume监听一个端口收集该端口数据并打印到控制台 1安装netcat工具 [atguiguhadoop102 software]$ sudo yum install -y nc 2判断4444端口是否被占用 [atguiguhadoop102 flume-telnet]$ sudo netstat -nlp | grep 4444 3创建Flume Agent配置文件flume-netcat-logger.conf 4在flume目录下创建 job文件夹并进入job文件夹。 [atguiguhadoop102 flume]$ mkdir job [atguiguhadoop102 flume]$ cd job/ 5在job文件夹下创建 Flume Agent配置文件flume-netcat-logger.conf。 [atguiguhadoop102 job]$ vim flume-netcat-logger.conf 6在flume-netcat-logger.conf文件中添加如下内容 # name the components on this agent
a1.sources r1
a1.sinks k1
a1.channels c1# Describe/configure the source
a1.sources.r1.type netcat
a1.sources.r1.bind localhost
a1.sources.r1.port 4444# Describe the sink
a1.sinks.k1.type logger# Use a channel which buffers events in memory
a1.channels.c1.type memory
a1.channels.c1.capacity 1000
a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel
a1.sources.r1.channels c1
a1.sinks.k1.channel c1原神启动
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.loggerINFO,console
or
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -flume.root.loggerINFO,console --conf/-c表示配置文件存储在 conf/目录 --name/-n表示给 agent 起名为 a1 --conf-file/-fflume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。 -Dflume.root.loggerINFO,console -D 表示 flume 运行时动态修改 flume.root.logge参数属性值并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。
另一台nc启动 nc localhost 4444 然后发消息 2023-10-25 14:03:53,633 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 30 2.2.2 实时监控单个追加文件
实时监控 Hive 日志并上传到HDFS中 开启hadoop集群 start-all.sh 开启hive /export/servers/hive/bin/hive --service metastore nohup /export/servers/hive/bin/hive vim flume-file-hdfs.conf
# Name the components on this agent
a2.sources r2
a2.sinks k2
a2.channels c2# Describe/configure the source
a2.sources.r2.type exec
a2.sources.r2.command tail -F /export/server/hive/logs/hive.log要监控拉取的文件# Describe the sink
a2.sinks.k2.type hdfs
a2.sinks.k2.hdfs.path hdfs://hadoop1:8020/flume/%Y%m%d/%H这里的端口要和hadoop配置里hdfs的一样
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix logs-
#是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp true
#积攒多少个Event 才flush 到HDFS一次
a2.sinks.k2.hdfs.batchSize 100
#设置文件类型可支持压缩
a2.sinks.k2.hdfs.fileType DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval 60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount 0# Use a channel which buffers events in memory
a2.channels.c2.type memory
a2.channels.c2.capacity 1000
a2.channels.c2.transactionCapacity 100# Bind the source and sink to the channel
a2.sources.r2.channels c2
a2.sinks.k2.channel c2启动 bin/flume-ng agent -n a2 -c conf -f job/flume-file-hdfs.conf ctrlZ退出 在HDFS上查看文件。
2.2.3 实时监控目录下多个新文件
使用Flume监听整个目录的文件并上传至 HDFS 在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件上传完成的文件会以.COMPLETED结尾被监控文件夹每 500毫秒扫描一次文件变动。
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.confa3.sources r3
a3.sinks k3
a3.channels c3# Describe/configure the source
a3.sources.r3.type spooldir
a3.sources.r3.spoolDir /export/server/flume/upload
a3.sources.r3.fileSuffix .COMPLETED
a3.sources.r3.fileHeader true
#忽略所有以.tmp结尾的文件不上传
a3.sources.r3.ignorePattern ([^ ]*\.tmp)# Describe the sink
a3.sinks.k3.type hdfs
a3.sinks.k3.hdfs.path hdfs://hadoop1:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀:
a3.sinks.k3.hdfs.filePrefix upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp true
#积攒多少个Event 才flush 到HDFS一次
a3.sinks.k3.hdfs.batchSize 100
#设置文件类型可支持压缩
a3.sinks.k3.hdfs.fileType DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount 0# Use a channel which buffers events in memory
a3.channels.c3.type memory
a3.channels.c3.capacity 1000
a3.channels.c3.transactionCapacity 100# Bind the source and sink to the channel
a3.sources.r3.channels c3
a3.sinks.k3.channel c3
在/opt/module/flume 目录下创建upload文件夹
向 upload文件夹中添加文件 2.2.4 实时监控目录下的多个追加文件 Exec source适用于监控一个实时追加的文件不能实现断点续传Spooldir Source适合用于同步新文件但不适合对实时追加日志的文件进行监听并同步而Taildir Source适合用于监听多个实时追加的文件并且能够实现断点续传。 a3.sources r3
a3.sinks k3
a3.channels c3 # Describe/configure the source
a3.sources.r3.type TAILDIR
a3.sources.r3.positionFile /opt/module/flume/tail_dir.json
a3.sources.r3.filegroups f1 f2
a3.sources.r3.filegroups.f1 /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 /opt/module/flume/files2/.*log.* # Describe the sink
a3.sinks.k3.type hdfs
a3.sinks.k3.hdfs.path
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp true
#积攒多少个Event 才flush 到HDFS一次
a3.sinks.k3.hdfs.batchSize 100
#设置文件类型可支持压缩
a3.sinks.k3.hdfs.fileType DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval 60
#设置每个文件的滚动大小大概是128M
a3.sinks.k3.hdfs.rollSize 134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory
a3.channels.c3.type memory
a3.channels.c3.capacity 1000
a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel
a3.sources.r3.channels c3
a3.sinks.k3.channel c3 Taildir 说明 Taildir Source维护了一个 json格式的position File其会定期的往position File中更新每个文件读取到的最新的位置因此能够实现断点续传。Position File的格式如下 3 Flume 高级