Filebeat接入Kafka专享版

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
所有文档
menu
没有找到结果,请重新输入

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
  • 文档中心
  • arrow
  • 消息服务 for KafkaKafka
  • arrow
  • 专享版
  • arrow
  • 最佳实践
  • arrow
  • Filebeat接入Kafka专享版
本页目录
  • Filebeat
  • 前提条件
  • Filebeat接入
  • 步骤一:获取接入点
  • 步骤二:创建/获取主题
  • 步骤三:Filebeat发送消息
  • 准备配置文件
  • Filebeat 发送消息
  • 步骤四:Filebeat消费消息
  • 准备配置文件
  • Filebeat消费消息
  • 步骤五:查看集群监控

Filebeat接入Kafka专享版

更新时间:2025-08-21

Filebeat

Filebeat是用于转发和集中日志数据的轻量级传送工具。Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或 Logstash进行索引。

Filebeat的工作方式如下:启动Filebeat时,它将启动一个或多个输入,这些输入将在为日志数据指定的位置中查找。对于Filebeat所找到的每个日志,Filebeat都会启动收集器。每个收集器都读取单个日志以获取新内容,并将新日志数据发送到libbeat,libbeat将聚集事件,并将聚集的数据发送到为Filebeat配置的输出。

工作流程图如下:

Filebeat1.png

前提条件

  • 下载并安装Filebeat(参见Download Filebeat)
  • 下载并安装1.8或以上版本 JDK
  • 已创建消息服务 for kafka集群

Filebeat接入

步骤一:获取接入点

具体请参考查看集群接入点

SSL/SASL方式下载证书参考:如何下载证书?。

步骤二:创建/获取主题

具体请参考创建主题、查看主题详情

步骤三:Filebeat发送消息

准备配置文件

进入 Filebeat 的安装目录,创建配置监控文件 filebeat.yml,参数详情可参考Filebeat官网。

YAML
1filebeat.inputs:
2
3- input_type: log 
4
5# 此处为监听文件路径
6  paths:
7    - /var/log/*.log
8#------------------------------- Kafka output ----------------------------------
9output.kafka:
10  # 是否启用
11  enabled: true
12 
13  # The list of Kafka broker addresses from which to fetch the cluster metadata.
14  # The cluster metadata contain the actual Kafka brokers events are published
15  # to
16  # 获取集群元数据的 Kafka 代理地址列表(接入点)
17  hosts: ["x.x.x.x:9095","x.x.x.x:9095","x.x.x.x:9095"]
18 
19  # 用于生成事件的 Kafka 主题, 可以使用任何事件 field 的格式字符串, 若要从文档类型设置主题,请使用 `%{[type]}`.
20  topic: test
21  
22  # 设置sasl协议字段
23  sasl.mechanism: "SCRAM-SHA-512" 
24 
25 # 设置 SSL协议字段
26 #SSL/SASL_SSL方式请下载证书文件: kafka-key.zip
27 #ssl.certificate_authorities:[/***/client_ssl.properties]
28  partition.random:
29    # If enabled, events will only be published to partitions with reachable leaders. Default is false.
30    # reachable_only 设置为true,则事件将仅发布到可用的分区
31    # 必须是 random, round_robin, hash 三种的一种
32    # 默认为 false
33    reachable_only: false
34 
35  # Authentication details. Password is required if username is set.
36  # 身份验证的细节。如果设置了用户名,则需要密码。
37  # 没有设置可保持空值
38  username: ''
39  password: ''
40 
41  # Sets the output compression codec. Must be one of none, snappy and gzip. The default is gzip.
42  # 设置输出压缩编解码器
43  # 必须是 none, snappy 和 gzip 中的一个
44  # 默认是 gzip
45  compression: none
46 
47  # Set the compression level. Currently only gzip provides a compression level between 0 and 9. The default value is chosen by the compression algorithm.
48  # 设置压缩级别
49  # 目前只有 gzip 提供 0 到 9 之间的压缩级别
50  # 默认值由压缩算法选择
51  compression_level: 4
52 
53  # The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. The default value is 1000000 (bytes). This value should be equal to r less than the broker's message.max.bytes.
54  # json编码消息的最大允许大小, 更大的消息将被删除。默认值是 1000000 ( 字节 ) 。这个值应该等于 r,小于 broker 的 message.max.bytes
55  max_message_bytes: 1000000
56  # The ACK reliability level required from broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1.  Note: If set to 0, no ACKs are returned by Kafka. Messages might be lost silently on error.
57  # 代理要求的ACK可靠性级别
58  # 0=无响应,1=等待本地提交,-1=等待所有副本提交
59  # 默认值是1
60  # 注意:如果设置为0,Kafka不会返回任何ack。出错时,消息可能会悄无声息地丢失。
61  required_acks: 1
62  # The configurable ClientID used for logging, debugging, and auditing purposes.  The default is "beats".
63  # 可配置的ClientID,用于日志记录、调试和审计。默认是“beats”。
64  client_id: beats

Filebeat 发送消息

在filebeat安装路径执行如下命令

Shell
1./filebeat -e -c filebeat.yml 

步骤四:Filebeat消费消息

准备配置文件

YAML
1filebeat.inputs:
2- type: kafka
3  hosts:
4    - ip:端口
5    - ip:端口
6    - ip:端口
7    
8  # 身份验证的细节。如果设置了用户名,则需要密码。
9  # 没有设置可保持空值
10  username: ""
11  password: ""
12  #Topic的名称。
13  topics: ["filebeat_test"]
14  #Group的名称。
15  group_id: "filebeat_group"
16  #证书所在位置
17 #SSL/SASL_SSL方式请下载证书文件: kafka-key.zip
18 #ssl.certificate_authorities:[/***/client_ssl.properties]
19
20  ssl.verification_mode: none
21
22output.console:
23  pretty: true

Filebeat消费消息

在filebeat安装路径执行如下命令

Shell
1./filebeat -c ./input.yml 

步骤五:查看集群监控

查看消息是否发送成功或消费成功有两种方式:

  1. 在服务器端查看jar包运行日志。
  2. 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。

推荐使用第二种方式,下面介绍如何查看集群监控。

(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。

监控信息1.png

(2)页面跳转后,进入左侧边中的集群详情页面。

监控信息2.png

(3)点击左侧边栏中的集群监控,进入集群监控页面。

监控信息3.png

(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。

集群监控的具体使用请参考:集群监控

监控信息4.png

上一篇
Logstash接入Kafka专享版
下一篇
业务迁移