网站优化排名易下拉排名,互联网之光博览会第六届,wordpress 最好的seo,网站建设 有必要吗随着时间的积累#xff0c;日志数据会越来越多#xff0c;当您需要查看并分析庞杂的日志数据时#xff0c;可通过FilebeatKafkaLogstashElasticsearch采集日志数据到Elasticsearch中#xff0c;并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。
一、背景信息 … 随着时间的积累日志数据会越来越多当您需要查看并分析庞杂的日志数据时可通过FilebeatKafkaLogstashElasticsearch采集日志数据到Elasticsearch中并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。
一、背景信息
Kafka是一种分布式、高吞吐、可扩展的消息队列服务广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域已成为大数据生态中不可或缺的部分。
在实际应用场景中为了满足大数据实时检索的需求您可以使用Filebeat采集日志数据并输出到Kafka中。Kafka实时接收Filebeat采集的数据并输出到Logstash中。输出到Logstash中的数据在格式或内容上可能不能满足您的需求此时可以通过Logstash的filter插件过滤数据。最后将满足需求的数据输出到Elasticsearch中进行分布式检索并通过Kibana进行数据分析与展示。简单流程如下。 二、操作流程
1、准备工作
完成环境准备包括创建Elasticsearch、Logstash、ECS和消息队列 Kafka 版实例、创建Topic和Consumer Group等。
2、步骤一安装并配置Filebeat 安装并配置Filebeat设置input为系统日志output为Kafka将日志数据采集到Kafka的指定Topic中。
3、步骤二配置Logstash管道
配置Logstash管道的input为Kafkaoutput为阿里云Elasticsearch使用Logstash消费Topic中的数据并传输到阿里云Elasticsearch中。
4、步骤三查看日志消费状态
在消息队列Kafka中查看日志数据的消费的状态验证日志数据是否采集成功。
5、步骤四通过Kibana过滤日志数据
在Kibana控制台的Discover页面通过Filter过滤出Kafka相关的日志。
三、步骤一安装并配置Filebeat 连接ECS服务器。 安装Filebeat。 本文以6.8.5版本为例安装命令如下详细信息请参见Install Filebeat。 curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.5-linux-x86_64.tar.gz
tar xzvf filebeat-6.8.5-linux-x86_64.tar.gz 执行以下命令进入Filebeat安装目录创建并配置filebeat.kafka.yml文件。 cd filebeat-6.8.5-linux-x86_64
vi filebeat.kafka.yml filebeat.kafka.yml配置如下。 filebeat.prospectors:- type: logenabled: truepaths:- /var/log/*.logoutput.kafka:hosts: [alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092]topic: estestversion: 0.10.2 重要 当Filebeat为7.0及以上版本时filebeat.prospectors需要替换为filebeat.inputs。 参数 说明 type 输入类型。设置为log表示输入源为日志。 enabled 设置配置是否生效 true生效 false不生效 paths 需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。 hosts 消息队列Kafka实例的单个接入点可在实例详情页面获取详情请参见查看接入点。由于本文使用的是VPC实例因此使用默认接入点中的任意一个接入点。 topic 日志输出到消息队列Kafka的Topic请指定为您已创建的Topic。 version Kafka的版本可在消息队列Kafka的实例详情页面获取。 重要 不配置此参数会报错。 由于不同版本的Filebeat支持的Kafka版本不同例如8.2及以上版本的Filebeat支持的Kafka版本为2.2.0因此version需要设置为Filebeat支持的Kafka版本否则会出现类似报错Exiting: error initializing publisher: unknown/unsupported kafka vesion 2.2.0 accessing output.kafka.version (source:filebeat.kafka.yml)详细信息请参见version。 启动Filebeat。 ./filebeat -e -c filebeat.kafka.yml
四、步骤二配置Logstash管道
进入阿里云Elasticsearch控制台的Logstash页面。进入目标实例。 在顶部菜单栏处选择地域。在Logstash实例中单击目标实例ID。 在左侧导航栏单击管道管理。 单击创建管道。 在创建管道任务页面输入管道ID并配置管道。 本文使用的管道配置如下。 input {kafka {bootstrap_servers [alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092]group_id es-testtopics [estest]codec json}
}
filter {}
output {elasticsearch {hosts http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200user elasticpassword your_passwordindex kafka‐%{YYYY.MM.dd}}
} 表 1. input参数说明 参数 说明 bootstrap_servers 消息队列Kafka实例的接入点可在实例详情页面获取详情请参见查看接入点。由于本文使用的是VPC实例因此使用默认接入点。 group_id 指定为您已创建的Consumer Group的名称。 topics 指定为您已创建的Topic的名称需要与Filebeat中配置的Topic名称保持一致。 codec 设置为json表示解析JSON格式的字段便于在Kibana中分析。 表 2. output参数说明 参数 说明 hosts 阿里云Elasticsearch的访问地址取值为http://阿里云Elasticsearch实例的私网地址:9200。 说明 您可在阿里云Elasticsearch实例的基本信息页面获取其私网地址详情请参见查看实例的基本信息。 user 访问阿里云Elasticsearch的用户名默认为elastic。您也可以使用自建用户详情请参见通过Elasticsearch X-Pack角色管理实现用户权限管控。 password 访问阿里云Elasticsearch的密码在创建实例时设置。如果忘记密码可进行重置重置密码的注意事项及操作步骤请参见重置实例访问密码。 index 索引名称。设置为kafka‐%{YYYY.MM.dd}表示索引名称以kafka为前缀以日期为后缀例如kafka-2020.05.27。 更多Config配置详情请参见Logstash配置文件说明。 如果您有多topic的数据同步需求需要在kafka中添加新的topic然后在Logstash的管道配置中添加input。示例如下 input {kafka {bootstrap_servers [alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092]group_id es-testtopics [estest]codec json
}kafka {bootstrap_servers [alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092]group_id es-test-2topics [estest_2]codec json
}
} 单击下一步配置管道参数。 参数 说明 管道工作线程 并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时请考虑增大线程数更好地使用CPU处理能力。默认值实例的CPU核数。 管道批大小 单个工作线程在尝试执行Filter和Output前可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量来增大JVM堆大小从而有效使用该值。默认值125。 管道批延迟 创建管道事件批时将过小的批分派给管道工作线程之前要等候每个事件的时长单位为毫秒。默认值50ms。 队列类型 用于事件缓冲的内部排队模型。可选值 MEMORY默认值。基于内存的传统队列。 PERSISTED基于磁盘的ACKed队列持久队列。 队列最大字节数 请确保该值小于您的磁盘总容量。默认值1024 MB。 队列检查点写入数 启用持久性队列时在强制执行检查点之前已写入事件的最大数目。设置为0表示无限制。默认值1024。 警告 配置完成后需要保存并部署才能生效。保存并部署操作会触发实例重启请在不影响业务的前提下继续执行以下步骤。 单击保存或者保存并部署。 保存将管道信息保存在Logstash里并触发实例变更配置不会生效。保存后系统会返回管道管理页面。可在管道列表区域单击操作列下的立即部署触发实例重启使配置生效。 保存并部署保存并且部署后会触发实例重启使配置生效。
五、步骤三查看日志消费状态 进入消息队列Kafka控制台。 参见查看消费状态查看详细消费状态。 预期结果如下
六、步骤四通过Kibana过滤日志数据
登录目标阿里云Elasticsearch实例的Kibana控制台根据页面提示进入Kibana主页。 创建一个索引模式。 在左侧导航栏单击Management。 在Kibana区域单击Index Patterns。 单击Create index pattern。 输入Index pattern本文使用kafka-*单击Next step。 选择Time Filter field name本文选择timestamp单击Create index pattern。 在左侧导航栏单击Discover。 从页面左侧的下拉列表中选择您已创建的索引模式kafka-*。 在页面右上角选择一段时间查看对应时间段内的Filebeat采集的日志数据。 单击Add a filter在Add filter页面中设置过滤条件查看符合对应过滤条件的日志数据。
七、常见问题
Q同步日志数据出现问题管道一直在生效中无法将数据导入Elasticsearch如何解决
A查看Logstash实例的主日志是否有报错根据报错判断原因具体操作请参见查询日志。常见的原因及解决方法如下。 原因 解决方法 Kafka的接入点不正确。 参见查看接入点获取正确的接入点。完成后修改管道配置替换错误接入点。 Logstash与Kafka不在同一VPC下。 重新购买同一VPC下的实例。购买后修改现有管道配置。 说明 VPC实例只能通过专有网络VPC访问 云消息队列 Kafka 版 。 Kafka或Logstash集群的配置太低例如使用了测试版集群。 升级集群规格完成后刷新实例观察变更进度。升级Logstash实例规格的具体操作请参见升配集群升级Kafka实例规格的具体操作请参见升级实例配置。 管道配置中包含了file_extend但没有安裝logstash-output-file_extend插件。 选择以下任意一种方式处理 安装logstash-output-file_extend插件。具体操作请参见 安装或卸载插件。 中断变更等到实例处于变更中断状态后在管道配置中去掉file_extend配置触发重启恢复。