全部文档
当前文档

暂无内容

如果没有找到您期望的内容,请尝试其他搜索词

文档中心

Logstash写数据入kafka

最近更新时间:2025-08-18 16:56:29

Logstash介绍

Logstash 是一个开源的日志处理工具,可以从多个源头收集数据、过滤收集的数据并对数据进行存储作为其他用途。

Logstash 灵活性强,拥有强大的语法分析功能,插件丰富,支持多种输入和输出源。Logstash 作为水平可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。

Logstash 工作原理

Logstash 数据处理可以分为三个阶段:inputs → filters → outputs。

  1. inputs:产生数据来源,例如文件、syslog、redis 和 beats 此类来源。

  2. filters:修改过滤数据, 在 Logstash 数据管道中属于中间环节,可以根据条件去对事件进行更改。一些常见的过滤器包括:grok、mutate、drop 和 clone 等。

  3. outputs:将数据传输到其他地方,一个事件可以传输到多个 outputs,当传输完成后这个事件就结束。Elasticsearch 就是最常见的 outputs。

同时 Logstash 支持编码解码,可以在 inputs 和 outputs 端指定格式。

Logstash 接入 Kafka 的优势

  • 可以异步处理数据:防止突发流量。

  • 解耦:当 Elasticsearch 异常的时候不会影响上游工作。

注意: Logstash 过滤消耗资源,如果部署在生产 server 上会影响其性能。

准备工作

操作步骤

步骤一、获取托管Kafka接入地址

  1. 登录金山云KMR控制台

  2. 点击已经创建好的集群进入集群详情列表。

  3. 在集群详情页面找到获取集群接入地址。

步骤二、创建Topic

  1. 开启公网访问,进入Kafka eagle webui仪表盘界面。

  2. 点击Topic,点击创建。

步骤三、接入Kafka

作为inputs 接入

  1. 执行 bin/logstash-plugin list,查看已经支持的插件是否含有 logstash-input-kafka。

  2. 在 .bin/ 目录下编写配置文件 input.conf。

此处将标准输出作为数据终点,将 Kafka 作为数据来源。

input {
    kafka {
        bootstrap_servers => "xx.xx.xx.xx:xxxx" // kafka 集群接入地址
        group_id => "logstash_group"  // kafka groupid 名称
        topics => ["logstash_test"] // kafka topic 名称
        consumer_threads => 3 // 消费线程数,一般与 kafka 分区数一致
        auto_offset_reset => "earliest"
    }
}
output {
    stdout{codec=>rubydebug}
}
  1. 执行以下命令启动 Logstash,进行消息消费。

./logstash -f input.conf

返回结果如下:

可以看到刚才 Topic 中的数据被消费出来。

文档导读
纯净模式常规模式

纯净模式

点击可全屏预览文档内容
文档反馈