找加工厂上什么网站,网站开发主管工作内容,展台设计搭建,phpcms v9 网站模板在构建数仓时#xff0c;经常会用到flume接收日志数据#xff0c;通常涉及到的组件为kafka#xff0c;hdfs等。下面以一个flume接收指定topic数据#xff0c;并存入hdfs的案例#xff0c;大致了解下flume相关使用规则。
版本#xff1a;1.9
Source
Kafka Source就是一…在构建数仓时经常会用到flume接收日志数据通常涉及到的组件为kafkahdfs等。下面以一个flume接收指定topic数据并存入hdfs的案例大致了解下flume相关使用规则。
版本1.9
Source
Kafka Source就是一个Apache Kafka消费者它从Kafka的topic中读取消息。 如果运行了多个Kafka Source则可以把它们配置到同一个消费者组以便每个source都读取一组唯一的topic分区。
目前支持Kafka 0.10.1.0以上版本最高已经在Kafka 2.0.1版本上完成了测试这已经是Flume 1.9发行时候的最高的Kafka版本了。 属性名 默认值 解释 channels – 与Source绑定的channel多个用空格分开 type – 组件类型这个是 org.apache.flume.source.kafka.KafkaSource kafka.bootstrap.servers – Source使用的Kafka集群实例列表 kafka.consumer.group.id flume 消费组的唯一标识符。如果有多个source或者Agent设定了相同的ID表示它们是同一个消费者组 kafka.topics – 将要读取消息的目标 Kafka topic 列表多个用逗号分隔 kafka.topics.regex – 会被Kafka Source订阅的 topic 集合的正则表达式。这个参数比 kafka.topics 拥有更高的优先级如果这两个参数同时存在则会覆盖kafka.topics的配置。 batchSize 1000 一批写入 channel 的最大消息数 batchDurationMillis 1000 一个批次写入 channel 之前的最大等待时间毫秒。达到等待时间或者数量达到 batchSize 都会触发写操作。 backoffSleepIncrement 1000 当Kafka topic 显示为空时触发的初始和增量等待时间毫秒。等待时间可以避免对Kafka topic的频繁ping操作。默认的1秒钟对于获取数据比较合适 但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 maxBackoffSleep 5000 Kafka topic 显示为空时触发的最长等待时间毫秒。默认的5秒钟对于获取数据比较合适但是对于使用拦截器时想达到更低的延迟可能就需要配置更低一些。 useFlumeEventFormat false 默认情况下从 Kafka topic 里面读取到的内容直接以字节数组的形式赋值给Event。如果设置为true会以Flume Avro二进制格式进行读取。与Kafka Sink上的同名参数或者 Kafka channel 的parseAsFlumeEvent参数相关联这样以对象的形式处理能使生成端发送过来的Event header信息得以保留。 setTopicHeader true 当设置为 true 时会把存储Event的topic名字存储到header中使用的key就是下面的 topicHeader 的值。 topicHeader topic 如果 setTopicHeader 设置为 true 则定义用于存储接收消息的 topic 使用header key。注意如果与 Kafka Sink 的 topicHeader 参数一起使用的时候要小心避免又循环将消息又发送回 topic。 kafka.consumer.security.protocol PLAINTEXT 设置使用哪种安全协议写入Kafka。可选值SASL_PLAINTEXT 、 SASL_SSL 和 SSL ,有关安全设置的其他信息请参见下文。 more consumer security props 如果使用了SASL_PLAINTEXT、SASL_SSL 或 SSL 等安全协议参考 Kafka security 来为消费者增加安全相关的参数配置 Other Kafka Consumer Properties – 其他一些 Kafka 消费者配置参数。任何 Kafka 支持的消费者参数都可以使用。唯一的要求是使用“kafka.consumer.”这个前缀来配置参数比如 kafka.consumer.auto.offset.reset
必需的参数已用 粗体 标明。
已经弃用的一些属性 属性名 默认值 解释 topic – 改用 kafka.topics groupId flume 改用 kafka.consumer.group.id zookeeperConnect – 自0.9.x起不再受kafka消费者客户端的支持。以后使用kafka.bootstrap.servers与kafka集群建立连接 migrateZookeeperOffsets true 如果找不到Kafka存储的偏移量去Zookeeper中查找偏移量并将它们提交给 Kafka 。 它应该设置为true以支持从旧版本的FlumeKafka客户端无缝迁移。 迁移后可以将其设置为false但通常不需要这样做。 如果在Zookeeper未找到偏移量则可通过kafka.consumer.auto.offset.reset配置如何处理偏移量。可以从 Kafka documentation 查看更多详细信息。 Channel
此处选择memory channel内存 channel 是把 Event 队列存储到内存上队列的最大数量就是 capacity 的设定值。它非常适合对吞吐量有较高要求的场景但也是有代价的当发生故障的时候会丢失当时内存中的所有 Event。 必需的参数已用 粗体 标明。 属性 默认值 解释 type – 组件类型这个是 memory capacity 100 内存中存储 Event 的最大数 transactionCapacity 100 source 或者 sink 每个事务中存取 Event 的操作数量不能比 capacity 大 keep-alive 3 添加或删除一个 Event 的超时时间秒 byteCapacityBufferPercentage 20 指定 Event header 所占空间大小与 channel 中所有 Event 的总大小之间的百分比 byteCapacity Channel 中最大允许存储所有 Event 的总字节数bytes。默认情况下会使用JVM可用内存的80%作为最大可用内存就是JVM启动参数里面配置的-Xmx的值。 计算总字节时只计算 Event 的主体这也是提供 byteCapacityBufferPercentage 配置参数的原因。注意当你在一个 Agent 里面有多个内存 channel 的时候 而且碰巧这些 channel 存储相同的物理 Event例如这些 channel 通过复制机制 复制选择器 接收同一个 source 中的 Event 这时候这些 Event 占用的空间是累加的并不会只计算一次。如果这个值设置为0不限制就会达到200G左右的内部硬件限制。
Sink
HDFS Sink 这个Sink将Event写入Hadoop分布式文件系统也就是HDFS。 目前支持创建文本和序列文件。 它支持两种文件类型的压缩。 可以根据写入的时间、文件大小或Event数量定期滚动文件关闭当前文件并创建新文件。 它还可以根据Event自带的时间戳或系统时间等属性对数据进行分区。 存储文件的HDFS目录路径可以使用格式转义符会由HDFS Sink进行动态地替换以生成用于存储Event的目录或文件名。 使用此Sink需要安装hadoop 以便Flume可以使用Hadoop的客户端与HDFS集群进行通信。 注意 需要使用支持sync() 调用的Hadoop版本 。 属性名 默认值 解释 channel – 与 Sink 连接的 channel type – 组件类型这个是 hdfs hdfs.path – HDFS目录路径例如hdfs://namenode/flume/webdata/ hdfs.filePrefix FlumeData Flume在HDFS文件夹下创建新文件的固定前缀 hdfs.fileSuffix – Flume在HDFS文件夹下创建新文件的后缀比如.avro注意这个“.”不会自动添加需要显式配置 hdfs.inUsePrefix – Flume正在写入的临时文件前缀默认没有 hdfs.inUseSuffix .tmp Flume正在写入的临时文件后缀 hdfs.emptyInUseSuffix false 如果设置为 false 上面的 hdfs.inUseSuffix 参数在写入文件时会生效并且写入完成后会在目标文件上移除 hdfs.inUseSuffix 配置的后缀。如果设置为 true 则上面的 hdfs.inUseSuffix 参数会被忽略写文件时不会带任何后缀 hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件0表示不按照时间来分割文件单位秒 hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件0表示不根据文件大小来分割文件单位字节 hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件0表示不根据 Event 数量来分割文件 hdfs.idleTimeout 0 关闭非活动文件的超时时间0表示禁用自动关闭文件单位秒 hdfs.batchSize 100 向 HDFS 写入内容时每次批量操作的 Event 数量 hdfs.codeC – 压缩算法。可选值gzip 、 bzip2 、 lzo 、 lzop 、 snappy hdfs.fileType SequenceFile 文件格式目前支持 SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 hdfs.maxOpenFiles 5000 允许打开的最大文件数如果超过这个数量最先打开的文件会被关闭 hdfs.minBlockReplicas – 指定每个HDFS块的最小副本数。 如果未指定则使用 classpath 中 Hadoop 的默认配置。 hdfs.writeFormat Writable 文件写入格式。可选值 Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text否则 Apache Impala孵化或 Apache Hive 无法读取这些文件。 hdfs.threadsPoolSize 10 每个HDFS Sink实例操作HDFS IO时开启的线程数open、write 等 hdfs.rollTimerPoolSize 1 每个HDFS Sink实例调度定时文件滚动的线程数 hdfs.kerberosPrincipal – 用于安全访问 HDFS 的 Kerberos 用户主体 hdfs.kerberosKeytab – 用于安全访问 HDFS 的 Kerberos keytab 文件 hdfs.proxyUser 代理名 hdfs.round false 是否应将时间戳向下舍入如果为true则影响除 %t 之外的所有基于时间的转义符 hdfs.roundValue 1 向下舍入小于当前时间的这个值的最高倍单位取决于下面的 hdfs.roundUnit 例子假设当前时间戳是18:32:01hdfs.roundUnit minute 如果roundValue5则时间戳会取为18:30 如果roundValue7则时间戳会取为18:28 如果roundValue10则时间戳会取为18:30 hdfs.roundUnit second 向下舍入的单位可选值 second 、 minute 、 hour hdfs.timeZone Local Time 解析存储目录路径时候所使用的时区名例如America/Los_Angeles、Asia/Shanghai hdfs.useLocalTimeStamp false 使用日期时间转义符时是否使用本地时间戳而不是使用 Event header 中自带的时间戳 hdfs.closeTries 0 开始尝试关闭文件时最大的重命名文件的尝试次数因为打开的文件通常都有个.tmp的后缀写入结束关闭文件时要重命名把后缀去掉。 如果设置为1Sink在重命名失败可能是因为 NameNode 或 DataNode 发生错误后不会重试这样就导致了这个文件会一直保持为打开状态并且带着.tmp的后缀 如果设置为0Sink会一直尝试重命名文件直到成功为止 关闭文件操作失败时这个文件可能仍然是打开状态这种情况数据还是完整的不会丢失只有在Flume重启后文件才会关闭。 hdfs.retryInterval 180 连续尝试关闭文件的时间间隔秒。 每次关闭操作都会调用多次 RPC 往返于 Namenode 因此将此设置得太低会导致 Namenode 上产生大量负载。 如果设置为0或更小则如果第一次尝试失败将不会再尝试关闭文件并且可能导致文件保持打开状态或扩展名为“.tmp”。 serializer TEXT Event 转为文件使用的序列化器。其他可选值有 avro_event 或其他 EventSerializer.Builderinterface 接口的实现类的全限定类名。 serializer.* 根据上面 serializer 配置的类型来根据需要添加序列化器的参数
完整案例如下
a1.sourcesr1
a1.channelsc1
a1.sinksk1a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize 5000
a1.sources.r1.batchDurationMillis 2000
a1.sources.r1.kafka.bootstrap.servers hmcs030:9092,hmcs031:9092,hmcs032:9092
a1.sources.r1.kafka.topics hmcs_network_enterprise_climb
a1.sources.r1.kafka.consumer.group.id hmcs_network_enterprise_climb_group
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.hmcs.interceptor.DecodeInterceptor$Buildera1.channels.c1.type memory
a1.channels.c1.capacity 10000
a1.channels.c1.transactionCapacity 10000
a1.channels.c1.parseAsFlumeEvent falsea1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path hdfs://ns1/flume/enterprise/networkEnterData/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix climbNetworkEnter-
a1.sinks.k1.hdfs.round false
a1.sinks.k1.hdfs.rollInterval 300
a1.sinks.k1.hdfs.rollSize 134217728
a1.sinks.k1.hdfs.rollCount 0
a1.sinks.k1.hdfs.fileTypeDataStreama1.sources.r1.channels c1
a1.sinks.k1.channel c1启动命令如下
nohup /usr/local/flume/bin/flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/job/kafka_memory_hdfs.conf -n a1 -Dflume.root.loggerinfo,console /usr/local/flume/logs/kafka_memory_hdfs.log 21