销售一个产品的网站怎么做的,重庆建设施工安全管理网站,上海百度研发中心,.net网站开发工程师q 
1 概述 
1.1 定义 Flume 是Cloudera 提供的一个高可用的#xff0c;高可靠的#xff0c;分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构#xff0c;灵活简单。  Flume最主要的作用就是#xff0c;实时读取服务器本地磁盘的数据#xff0c;将数据写入到HD…q 
1 概述 
1.1 定义 Flume 是Cloudera 提供的一个高可用的高可靠的分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构灵活简单。  Flume最主要的作用就是实时读取服务器本地磁盘的数据将数据写入到HDFS。 1.2 架构 1.2.1 Agent  Agent是一个JVM进程它以事件的形式将数据从源头送至目的。          Agent主要有3个部分组成Source、Channel、Sink。  
1.2.2 Source    Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据包括avro、thrift、exec、jms、spooling directory、netcat、taildir、sequence generator、syslog、http、legacy。  
1.2.3 Sink  Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。  
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义。  
1.2.4 Channel  Channel是位于Source 和Sink 之间的缓冲区。因此Channel允许 Source和Sink 运作在不同的速率上。Channel 是线程安全的可以同时处理几个 Source 的写入操作和几个Sink的读取操作。          Flume自带两种ChannelMemory Channel和 File Channel。  Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失那么 Memory Channel就不应该使用因为程序死亡、机器宕机或者重启都会导致数据丢失。  File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。  
1.2.5 Event  传输单元Flume 数据传输的基本单元以 Event 的形式将数据从源头送至目的地。 Event由Header 和Body 两部分组成Header用来存放该 event的一些属性为K-V 结构Body用来存放该条数据形式为字节数组。 2 Flume基本操作 
2.1 安装部署 
http://www.apache.org/dyn/closer.lua/flume/1.9.0/apache-flume-1.9.0-tar.gz 
直接解压 
将lib文件夹下的guava-11.0.2.jar删除以兼容 Hadoop 3.1.3 
2.2 案例 
2.2.1 监控端口数据官方案例 
Flume 1.9.0 User Guide — Apache Flume 
使用 Flume监听一个端口收集该端口数据并打印到控制台 1安装netcat工具  [atguiguhadoop102 software]$ sudo yum install -y nc  2判断4444端口是否被占用  [atguiguhadoop102 flume-telnet]$ sudo netstat -nlp | grep 4444 3创建Flume Agent配置文件flume-netcat-logger.conf  4在flume目录下创建 job文件夹并进入job文件夹。  [atguiguhadoop102 flume]$ mkdir job  [atguiguhadoop102 flume]$ cd job/  5在job文件夹下创建 Flume Agent配置文件flume-netcat-logger.conf。  [atguiguhadoop102 job]$ vim flume-netcat-logger.conf  6在flume-netcat-logger.conf文件中添加如下内容 # name the components on this agent 
a1.sources  r1
a1.sinks  k1
a1.channels  c1# Describe/configure the source 
a1.sources.r1.type  netcat
a1.sources.r1.bind  localhost
a1.sources.r1.port  4444# Describe the sink
a1.sinks.k1.type  logger# Use a channel which buffers events in memory 
a1.channels.c1.type  memory
a1.channels.c1.capacity  1000
a1.channels.c1.transactionCapacity  100# Bind the source and sink to the channel 
a1.sources.r1.channels  c1
a1.sinks.k1.channel  c1原神启动   
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.loggerINFO,console
or
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -flume.root.loggerINFO,console --conf/-c表示配置文件存储在 conf/目录 --name/-n表示给 agent 起名为 a1 --conf-file/-fflume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。 -Dflume.root.loggerINFO,console -D 表示 flume 运行时动态修改 flume.root.logge参数属性值并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。  
另一台nc启动 nc localhost 4444 然后发消息  2023-10-25 14:03:53,633 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 30    2.2.2 实时监控单个追加文件 
实时监控 Hive 日志并上传到HDFS中  开启hadoop集群 start-all.sh 开启hive /export/servers/hive/bin/hive --service metastore  nohup /export/servers/hive/bin/hive vim flume-file-hdfs.conf 
# Name the components on this agent 
a2.sources  r2
a2.sinks  k2
a2.channels  c2# Describe/configure the source 
a2.sources.r2.type  exec
a2.sources.r2.command  tail -F /export/server/hive/logs/hive.log要监控拉取的文件# Describe the sink 
a2.sinks.k2.type  hdfs
a2.sinks.k2.hdfs.path  hdfs://hadoop1:8020/flume/%Y%m%d/%H这里的端口要和hadoop配置里hdfs的一样
#上传文件的前缀 
a2.sinks.k2.hdfs.filePrefix  logs-
#是否按照时间滚动文件夹 
a2.sinks.k2.hdfs.round  true
#多少时间单位创建一个新的文件夹 
a2.sinks.k2.hdfs.roundValue  1
#重新定义时间单位 
a2.sinks.k2.hdfs.roundUnit  hour
#是否使用本地时间戳 
a2.sinks.k2.hdfs.useLocalTimeStamp  true
#积攒多少个Event 才flush 到HDFS一次 
a2.sinks.k2.hdfs.batchSize  100
#设置文件类型可支持压缩
a2.sinks.k2.hdfs.fileType  DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval  60
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize  134217700
#文件的滚动与Event数量无关
a2.sinks.k2.hdfs.rollCount  0# Use a channel which buffers events in memory 
a2.channels.c2.type  memory
a2.channels.c2.capacity  1000
a2.channels.c2.transactionCapacity  100# Bind the source and sink to the channel 
a2.sources.r2.channels  c2
a2.sinks.k2.channel  c2启动 bin/flume-ng agent -n a2 -c conf -f job/flume-file-hdfs.conf ctrlZ退出 在HDFS上查看文件。 
2.2.3 实时监控目录下多个新文件  
使用Flume监听整个目录的文件并上传至 HDFS  在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件上传完成的文件会以.COMPLETED结尾被监控文件夹每 500毫秒扫描一次文件变动。  
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.confa3.sources  r3
a3.sinks  k3
a3.channels  c3# Describe/configure the source
a3.sources.r3.type  spooldir
a3.sources.r3.spoolDir  /export/server/flume/upload
a3.sources.r3.fileSuffix  .COMPLETED
a3.sources.r3.fileHeader  true
#忽略所有以.tmp结尾的文件不上传 
a3.sources.r3.ignorePattern  ([^ ]*\.tmp)# Describe the sink 
a3.sinks.k3.type  hdfs
a3.sinks.k3.hdfs.path  hdfs://hadoop1:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀:
a3.sinks.k3.hdfs.filePrefix  upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round  true
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue  1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit  hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp  true
#积攒多少个Event 才flush 到HDFS一次
a3.sinks.k3.hdfs.batchSize  100
#设置文件类型可支持压缩 
a3.sinks.k3.hdfs.fileType  DataStream
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval  60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize  134217700
#文件的滚动与Event数量无关
a3.sinks.k3.hdfs.rollCount  0# Use a channel which buffers events in memory 
a3.channels.c3.type  memory
a3.channels.c3.capacity  1000
a3.channels.c3.transactionCapacity  100# Bind the source and sink to the channel 
a3.sources.r3.channels  c3
a3.sinks.k3.channel  c3 
在/opt/module/flume 目录下创建upload文件夹  
向 upload文件夹中添加文件  2.2.4 实时监控目录下的多个追加文件  Exec source适用于监控一个实时追加的文件不能实现断点续传Spooldir Source适合用于同步新文件但不适合对实时追加日志的文件进行监听并同步而Taildir Source适合用于监听多个实时追加的文件并且能够实现断点续传。 a3.sources  r3 
a3.sinks  k3 
a3.channels  c3 # Describe/configure the source 
a3.sources.r3.type  TAILDIR 
a3.sources.r3.positionFile  /opt/module/flume/tail_dir.json 
a3.sources.r3.filegroups  f1 f2 
a3.sources.r3.filegroups.f1  /opt/module/flume/files/.*file.* 
a3.sources.r3.filegroups.f2  /opt/module/flume/files2/.*log.* # Describe the sink 
a3.sinks.k3.type  hdfs 
a3.sinks.k3.hdfs.path  
hdfs://hadoop102:9820/flume/upload2/%Y%m%d/%H 
#上传文件的前缀 
a3.sinks.k3.hdfs.filePrefix  upload-
#是否按照时间滚动文件夹 
a3.sinks.k3.hdfs.round  true 
#多少时间单位创建一个新的文件夹 
a3.sinks.k3.hdfs.roundValue  1 
#重新定义时间单位 
a3.sinks.k3.hdfs.roundUnit  hour 
#是否使用本地时间戳 
a3.sinks.k3.hdfs.useLocalTimeStamp  true 
#积攒多少个Event 才flush 到HDFS一次 
a3.sinks.k3.hdfs.batchSize  100 
#设置文件类型可支持压缩 
a3.sinks.k3.hdfs.fileType  DataStream 
#多久生成一个新的文件 
a3.sinks.k3.hdfs.rollInterval  60 
#设置每个文件的滚动大小大概是128M 
a3.sinks.k3.hdfs.rollSize  134217700 
#文件的滚动与Event数量无关 
a3.sinks.k3.hdfs.rollCount  0 # Use a channel which buffers events in memory 
a3.channels.c3.type  memory 
a3.channels.c3.capacity  1000 
a3.channels.c3.transactionCapacity  100 # Bind the source and sink to the channel 
a3.sources.r3.channels  c3 
a3.sinks.k3.channel  c3 Taildir 说明    Taildir Source维护了一个 json格式的position File其会定期的往position File中更新每个文件读取到的最新的位置因此能够实现断点续传。Position File的格式如下  3 Flume 高级