最近更新时间:2025-08-18 16:56:29
Logstash 是一个开源的日志处理工具,可以从多个源头收集数据、过滤收集的数据并对数据进行存储作为其他用途。
Logstash 灵活性强,拥有强大的语法分析功能,插件丰富,支持多种输入和输出源。Logstash 作为水平可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。
Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。
inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。
filters:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器包括:grok、mutate、drop 和 clone 等。
outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。
同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。
Logstash 接入 Kafka 的优势
可以异步处理数据:防止突发流量。
解耦:当 Elasticsearch 异常的时候不会影响上游工作。
注意: Logstash 过滤消耗资源,如果部署在生产 server 上会影响其性能。
下载并安装 Logstash,参见 Download Logstash。
下载并安装 JDK 8,参见 Download JDK 8。
已创建Kafka集群
步骤一、获取托管Kafka接入地址
登录金山云KMR控制台。
点击已经创建好的集群进入集群详情列表。
在集群详情页面找到获取集群接入地址。
步骤二、创建Topic
开启公网访问,进入Kafka eagle webui仪表盘界面。
点击Topic,点击创建。
步骤三、接入Kafka
作为inputs 接入
此处将标准输出作为数据终点,将 Kafka 作为数据来源。
input {
kafka {
bootstrap_servers => "xx.xx.xx.xx:xxxx" // kafka 集群接入地址
group_id => "logstash_group" // kafka groupid 名称
topics => ["logstash_test"] // kafka topic 名称
consumer_threads => 3 // 消费线程数,一般与 kafka 分区数一致
auto_offset_reset => "earliest"
}
}
output {
stdout{codec=>rubydebug}
}执行以下命令启动 Logstash,进行消息消费。
./logstash -f input.conf
返回结果如下:
可以看到刚才 Topic 中的数据被消费出来。
纯净模式
