使用Kafka协议上传日志

日志服务 BLS

  • 功能发布记录
  • 产品描述
    • 产品介绍
    • 使用限制
  • 产品定价
  • 快速入门
    • 简介
    • 安装收集器
    • 创建日志集
    • 创建传输任务
    • 日志分析和报警
    • 创建投递任务
    • 创建日志集
    • 简介
    • 日志分析和报警
    • 创建投递任务
    • 创建传输任务
    • 安装收集器
  • 操作指南
    • 百度智能云环境准备
    • 概览
    • 收集器
      • 主机安装收集器
      • 在k8s环境安装收集器
      • 设置收集器启动参数
      • 收集器管理
      • 收集器发行版本
    • 查询分析
      • 日志查询
      • 检索语法
      • SQL语法
    • 日志采集
      • 传输任务采集
        • 管理传输任务
        • 创建传输任务
      • 使用Kafka协议上传日志
    • 日志管理
      • 日志集管理
      • 日志视图
    • 仪表盘
      • 概述
      • 管理仪表盘
      • 管理仪表盘图表
    • 报警管理
      • 报警概述
      • 报警策略
        • 管理报警策略
        • 触发条件
      • 报警通知
        • 报警通知模版
        • 报警回调
      • 报警执行统计
      • 报警历史
    • 数据处理
      • 定时SQL分析
        • 创建定时SQL分析任务
        • 管理定时SQL分析任务
      • 日志投递
        • 日志投递概述
        • 创建投递任务
        • 管理投递任务
      • 数据加工
        • 数据加工函数总览
        • 事件操作函数
        • 字段值提取函数
        • 字段操作函数
        • 映射富化函数
        • 流程控制函数
      • 实时消费
    • 日志应用
      • 智能诊断
    • 多用户访问控制
  • 开发指南
    • API参考
      • API概述
      • 通用说明
      • 名词解释
      • 接口概览
      • 通用错误码
      • 服务域名
      • API功能发布记录
      • 日志组Project相关接口
        • 创建日志组CreateProject
        • 更新日志组UpdateProject
        • 删除日志组DeleteProject
        • 获取日志组列表ListProject
        • 获取日志组详情DescribeProject
      • 日志集LogStore相关接口
        • 创建日志集CreateLogStore
        • 修改日志集UpdateLogStore
        • 删除日志集DeleteLogStore
        • 获取日志集列表ListLogStore
        • 获取日志集详情DescribeLogStore
        • 批量获取日志集BatchGetLogStore
      • 日志流LogStream相关接口
        • 获取日志流列表ListLogStream
      • 日志记录LogRecord相关接口
        • 检索分析日志QueryLogRecord
        • 直方图接口QueryLogHistogram
        • 推送日志PushLogRecord
        • 获取日志记录PullLogRecord
      • 日志视图相关接口
        • 创建日志视图CreateLogStoreView
        • 修改日志视图UpdateLogStoreView
        • 删除日志视图DeleteLogStoreView
        • 获取日志视图列表ListLogStoreView
        • 获取日志视图详情DescribeLogStoreView
      • 下载日志Download相关接口
        • 创建下载任务CreateDownloadTask
        • 删除下载任务DeleteDownloadTask
        • 获取下载任务列表ListDownloadTask
        • 获取下载任务地址GetDownloadTaskLink
        • 获取下载任务详情DescribeDownloadTask
      • 快速查询FastQuery相关接口
        • 创建快速查询CreateFastQuery
        • 更新快速查询UpdateFastQuery
        • 删除快速查询DeleteFastQuery
        • 获取快速查询列表ListFastQuery
        • 获取快速查询详情DescribeFastQuery
      • 传输任务Task相关接口
        • 创建传输任务CreateTask
        • 更新传输任务UpdateTask
      • 报警Alarm相关接口
        • 创建报警策略CreateAlarmPolicy
        • 启用报警策略EnableAlarmPolicy
        • 获取报警历史详情DescribeAlarmRecord
        • 更新报警策略UpdateAlarmPolicy
        • 删除报警策略DeleteAlarmPolicy
        • 获取报警执行列表ListAlarmExecutions
        • 获取报警策略列表ListAlarmPolicy
        • 获取报警策略详情DescribeAlarmPolicy
        • 获取报警执行统计ListAlarmExecutionStats
        • 检验报警策略执行条件ValidateAlarmCondition
        • 禁用报警策略DisableAlarmPolicy
        • 获取报警历史列表ListAlarmRecord
        • 检验报警策略查询语句ValidateAlarmPolicySQL
      • 日志投递LogShipper相关接口
        • 创建日志投递CreateLogShipper
        • 更新日志投递UpdateLogShipper
        • 删除日志投递任务DeleteSingleLogShipper
        • 批量设置日志投递任务状态BulkSetLogShipperStatus
        • 设置日志投递任务状态SetSingleLogShipperStatus
        • 获取日志投递记录ListLogShipperRecord
        • 批量删除日志投递任务BulkDeleteLogShipper
        • 获取日志投递列表ListLogShipper
        • 获取日志投递详情GetLogShipper
      • 日志集模板相关接口
        • 创建日志集模板CreateLogStoreTemplate
        • 更新日志集模板UpdateLogStoreTemplate
        • 删除日志集模板DeleteLogStoreTemplates
        • 获取日志集模板列表DescribeLogStoreTemplates
        • 获取日志集模板详情DescribeLogStoreTemplate
      • 索引Index相关接口
        • 创建索引CreateIndex
        • 更新索引UpdateIndex
        • 删除索引DeleteIndex
        • 获取索引详情DescribeIndex
      • 兼容Elasticsearch相关接口
        • 检索和分析BLS日志 AsyncSearch
        • 查询索引列表 ResolveIndex
        • 推荐字段取值 TermsEnum
        • 获取索引字段列表 FieldCaps
    • SDK参考
      • SDK隐私政策
      • SDK开发者个人信息保护合规指引
      • Android & ISO SDK下载
      • Go SDK
        • 概述
        • 初始化
        • 安装SDK工具包
        • Project操作
        • 日志集LogStore操作
        • 日志流LogStream操作
        • 日志记录LogRecord操作
        • 日志投递LogShipper操作
        • 快速查询FastQuery操作
        • 下载任务操作
        • 索引Index操作
        • 版本发布记录
      • iOS SDK
        • 概述
        • 快速开始
        • 版本发布记录
      • Java SDK
        • 概述
        • 日志记录LogRecord操作
        • 安装SDK工具包
      • Android SDK
        • 概述
        • 快速开始
        • 版本发布记录
    • 导入SLS采集配置
  • 最佳实践
    • 使用同环比作为报警触发条件
    • 通过Grafana使用BLS
    • 使用Grafana访问日志服务的Elasticsearch兼容接口
    • BLS接入Kibana
  • 常见问题
    • 常见问题总览
    • 配置类问题
    • 故障类问题
  • 日志服务等级协议SLA
所有文档
menu
没有找到结果,请重新输入

日志服务 BLS

  • 功能发布记录
  • 产品描述
    • 产品介绍
    • 使用限制
  • 产品定价
  • 快速入门
    • 简介
    • 安装收集器
    • 创建日志集
    • 创建传输任务
    • 日志分析和报警
    • 创建投递任务
    • 创建日志集
    • 简介
    • 日志分析和报警
    • 创建投递任务
    • 创建传输任务
    • 安装收集器
  • 操作指南
    • 百度智能云环境准备
    • 概览
    • 收集器
      • 主机安装收集器
      • 在k8s环境安装收集器
      • 设置收集器启动参数
      • 收集器管理
      • 收集器发行版本
    • 查询分析
      • 日志查询
      • 检索语法
      • SQL语法
    • 日志采集
      • 传输任务采集
        • 管理传输任务
        • 创建传输任务
      • 使用Kafka协议上传日志
    • 日志管理
      • 日志集管理
      • 日志视图
    • 仪表盘
      • 概述
      • 管理仪表盘
      • 管理仪表盘图表
    • 报警管理
      • 报警概述
      • 报警策略
        • 管理报警策略
        • 触发条件
      • 报警通知
        • 报警通知模版
        • 报警回调
      • 报警执行统计
      • 报警历史
    • 数据处理
      • 定时SQL分析
        • 创建定时SQL分析任务
        • 管理定时SQL分析任务
      • 日志投递
        • 日志投递概述
        • 创建投递任务
        • 管理投递任务
      • 数据加工
        • 数据加工函数总览
        • 事件操作函数
        • 字段值提取函数
        • 字段操作函数
        • 映射富化函数
        • 流程控制函数
      • 实时消费
    • 日志应用
      • 智能诊断
    • 多用户访问控制
  • 开发指南
    • API参考
      • API概述
      • 通用说明
      • 名词解释
      • 接口概览
      • 通用错误码
      • 服务域名
      • API功能发布记录
      • 日志组Project相关接口
        • 创建日志组CreateProject
        • 更新日志组UpdateProject
        • 删除日志组DeleteProject
        • 获取日志组列表ListProject
        • 获取日志组详情DescribeProject
      • 日志集LogStore相关接口
        • 创建日志集CreateLogStore
        • 修改日志集UpdateLogStore
        • 删除日志集DeleteLogStore
        • 获取日志集列表ListLogStore
        • 获取日志集详情DescribeLogStore
        • 批量获取日志集BatchGetLogStore
      • 日志流LogStream相关接口
        • 获取日志流列表ListLogStream
      • 日志记录LogRecord相关接口
        • 检索分析日志QueryLogRecord
        • 直方图接口QueryLogHistogram
        • 推送日志PushLogRecord
        • 获取日志记录PullLogRecord
      • 日志视图相关接口
        • 创建日志视图CreateLogStoreView
        • 修改日志视图UpdateLogStoreView
        • 删除日志视图DeleteLogStoreView
        • 获取日志视图列表ListLogStoreView
        • 获取日志视图详情DescribeLogStoreView
      • 下载日志Download相关接口
        • 创建下载任务CreateDownloadTask
        • 删除下载任务DeleteDownloadTask
        • 获取下载任务列表ListDownloadTask
        • 获取下载任务地址GetDownloadTaskLink
        • 获取下载任务详情DescribeDownloadTask
      • 快速查询FastQuery相关接口
        • 创建快速查询CreateFastQuery
        • 更新快速查询UpdateFastQuery
        • 删除快速查询DeleteFastQuery
        • 获取快速查询列表ListFastQuery
        • 获取快速查询详情DescribeFastQuery
      • 传输任务Task相关接口
        • 创建传输任务CreateTask
        • 更新传输任务UpdateTask
      • 报警Alarm相关接口
        • 创建报警策略CreateAlarmPolicy
        • 启用报警策略EnableAlarmPolicy
        • 获取报警历史详情DescribeAlarmRecord
        • 更新报警策略UpdateAlarmPolicy
        • 删除报警策略DeleteAlarmPolicy
        • 获取报警执行列表ListAlarmExecutions
        • 获取报警策略列表ListAlarmPolicy
        • 获取报警策略详情DescribeAlarmPolicy
        • 获取报警执行统计ListAlarmExecutionStats
        • 检验报警策略执行条件ValidateAlarmCondition
        • 禁用报警策略DisableAlarmPolicy
        • 获取报警历史列表ListAlarmRecord
        • 检验报警策略查询语句ValidateAlarmPolicySQL
      • 日志投递LogShipper相关接口
        • 创建日志投递CreateLogShipper
        • 更新日志投递UpdateLogShipper
        • 删除日志投递任务DeleteSingleLogShipper
        • 批量设置日志投递任务状态BulkSetLogShipperStatus
        • 设置日志投递任务状态SetSingleLogShipperStatus
        • 获取日志投递记录ListLogShipperRecord
        • 批量删除日志投递任务BulkDeleteLogShipper
        • 获取日志投递列表ListLogShipper
        • 获取日志投递详情GetLogShipper
      • 日志集模板相关接口
        • 创建日志集模板CreateLogStoreTemplate
        • 更新日志集模板UpdateLogStoreTemplate
        • 删除日志集模板DeleteLogStoreTemplates
        • 获取日志集模板列表DescribeLogStoreTemplates
        • 获取日志集模板详情DescribeLogStoreTemplate
      • 索引Index相关接口
        • 创建索引CreateIndex
        • 更新索引UpdateIndex
        • 删除索引DeleteIndex
        • 获取索引详情DescribeIndex
      • 兼容Elasticsearch相关接口
        • 检索和分析BLS日志 AsyncSearch
        • 查询索引列表 ResolveIndex
        • 推荐字段取值 TermsEnum
        • 获取索引字段列表 FieldCaps
    • SDK参考
      • SDK隐私政策
      • SDK开发者个人信息保护合规指引
      • Android & ISO SDK下载
      • Go SDK
        • 概述
        • 初始化
        • 安装SDK工具包
        • Project操作
        • 日志集LogStore操作
        • 日志流LogStream操作
        • 日志记录LogRecord操作
        • 日志投递LogShipper操作
        • 快速查询FastQuery操作
        • 下载任务操作
        • 索引Index操作
        • 版本发布记录
      • iOS SDK
        • 概述
        • 快速开始
        • 版本发布记录
      • Java SDK
        • 概述
        • 日志记录LogRecord操作
        • 安装SDK工具包
      • Android SDK
        • 概述
        • 快速开始
        • 版本发布记录
    • 导入SLS采集配置
  • 最佳实践
    • 使用同环比作为报警触发条件
    • 通过Grafana使用BLS
    • 使用Grafana访问日志服务的Elasticsearch兼容接口
    • BLS接入Kibana
  • 常见问题
    • 常见问题总览
    • 配置类问题
    • 故障类问题
  • 日志服务等级协议SLA
  • 文档中心
  • arrow
  • 日志服务BLS
  • arrow
  • 操作指南
  • arrow
  • 日志采集
  • arrow
  • 使用Kafka协议上传日志
本页目录
  • 背景信息
  • 限制说明
  • 参数说明
  • 示例
  • 通过Filebeat上传日志
  • 通过Kafka Go SDK上传日志
  • 通过Kafka Java SDK上传日志

使用Kafka协议上传日志

更新时间:2025-11-14

日志服务支持通过Kafka协议上传日志数据到服务端,即可以使用Kafka Producer SDK来采集日志数据,并通过Kafka协议上传到日志服务。文本介绍通过采集工具采集到日志后,使用Kafka协议将日志上传到日志服务的操作步骤

背景信息

Kafka 作为高吞吐量的消息中间件,常用于自建日志采集场景中的消息管道。例如,可在日志源服务器通过开源采集工具采集日志,或由 Producer 直接写入日志数据。日志服务支持通过 Kafka 协议上传日志数据。 使用 Kafka 协议上传日志功能,无需额外开启功能,无需在数据源侧安装数据采集工具,基于简单的配置即可实现 Kafka Producer 采集并上传日志信息到日志服务。

限制说明

  • 支持的Kafka协议版本为2.1.0-2.3.1。
  • 支持压缩方式包括 gzip、snappy、lz4。
  • 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志组名称,密码为日志服务账号密钥。
  • 通过Kafka协议上传日志时,要求日志格式必须为合法的JSON格式,且日志时间戳应包含@timestamp字段(格式为2006-01-02T15:04:05Z07:00);对于不合法的JSON格式,会返回错误。

参数说明

使用 Kafka 协议上传日志时,您需要配置以下参数。

参数 示例 说明
连接类型 SASL_SSL 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务日志组名,密码为账号密钥。
username default Kafka SASL 用户名。应配置为日志服务的日志组名。
password ALTAKOGSZ***#ALTAK9sZr** Kafka SASL 用户密码。应配置为百度云账户密钥。格式为 ${AccessKey}#${SecretKey},其中:${AccessKey} 应替换为您的 AccessKey。${SecretKey} 应替换为您的SecretKey
host bls-log.bj.baidubce.com:8200 初始连接的集群地址,格式为服务地址:端口号,例如 bls-log.bj.baidubce.com:8200,其中:服务地址为当前地域下日志服务的服务入口。请根据地域选择正确的服务入口,详细信息请参见服务地址。端口号固定为 8200。
topic log-online 配置为日志集名称

示例

通过Filebeat上传日志

  • 配置示例
    示例中用的的参数配置请参见参数说明
YAML
1## 多输入源,通过fields参数实现写不同的日志集
2- type: log
3  fields:
4    logstore : "filebeat_1"
5  paths:
6    - /root/log/*.log
7- type: log
8  fields:
9    logstore : "filebeat_2"
10  paths:
11    - /root/log/*.txt
12 
13## 增大filebeat内部队列,可提高日志上传速率 (参考文档:https://www.elastic.co/docs/reference/beats/filebeat/configuring-internal-queue)
14queue.mem:
15  events: 100000
16
17## kafka输出配置
18output.kafka:
19  hosts: "[${host}]"
20  username: "${project}"
21  password: "${AccessKey}#${SecretKey}"
22  topic : '%{[fields.logstore]}' #联动Input参数实现输出到多个不同的日志集
23  required_acks: 1
24  sasl.mechanism: PLAIN
25  ssl.enabled: true
  • 限制说明

    • filebeat的日志输出必须使用JSON格式
    • 为保证日志传输的安全性,必须设置sasl.enabled:true和sasl.mechanism:PLAIN
    • filebeat默认配置的queue.mem.events为3200,推荐设置20000-100000,以提高日志推送性能

通过Kafka Go SDK上传日志

  • 依赖
Palin
1go get github.com/IBM/sarama
  • 代码示例
Go
1package main
2
3import (
4	"crypto/tls"
5	"fmt"
6	"log"
7	"time"
8
9	"github.com/IBM/sarama"
10)
11
12func main() {
13	config := sarama.NewConfig()
14	config.Metadata.Full = false
15	config.Net.SASL.Mechanism = "PLAIN"
16	config.Net.SASL.Enable = true
17	config.Net.TLS.Enable = true
18    // username为日志组名称
19	config.Net.SASL.User = "${project}"
20	config.Producer.Return.Errors = true
21    // 百度云的密钥
22	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
23    // hosts为服务地址。具体说明,请参考本文中的参数说明
24	producer, err := sarama.NewAsyncProducer([]string{"{$hosts}"}, config)
25    if err != nil {
26        fmt.Println("new producer error:" + err.Error())
27        panic(err)
28    }
29	go func() {
30		for e := range producer.Errors() {
31			fmt.Println(e)
32		}
33	}()
34	channel := producer.Input()
35	for i := 0; i < 10; i++ {
36		channel <- &sarama.ProducerMessage{
37            // ${logStoreName}为日志集名称,例如log-online
38			Topic: "${logStoreName}",
39			Value: sarama.StringEncoder("{\"@timestamp\":\"2025-09-10T04:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"),
40		}
41	}
42	time.Sleep(time.Minute)
43	producer.Close()
44}

通过Kafka Java SDK上传日志

  • 依赖
XML
1<dependency>
2    <groupId>org.apache.kafka</groupId>
3    <artifactId>kafka-clients</artifactId>
4    <version>2.3.1</version>
5</dependency>
  • 代码示例
Java
1package org.wjr.test;
2
3
4import org.apache.kafka.clients.CommonClientConfigs;
5import org.apache.kafka.clients.consumer.ConsumerConfig;
6import org.apache.kafka.clients.producer.Producer;
7import org.apache.kafka.clients.producer.ProducerConfig;
8import org.apache.kafka.clients.producer.ProducerRecord;
9import org.apache.kafka.clients.producer.RecordMetadata;
10import org.apache.kafka.common.config.SaslConfigs;
11
12import java.util.Properties;
13import java.util.concurrent.ExecutionException;
14import java.util.concurrent.Future;
15import java.util.concurrent.TimeUnit;
16import java.util.concurrent.TimeoutException;
17
18public class Main {
19    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
20        Properties props = new Properties();
21        // hosts为服务地址。具体说明,请参考本文中的参数说明
22        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "${hosts}"); 
23        // 日志组名称
24        String username = "${project}"
25        // 百度云密钥
26        String password = "${AccessKey}#${SecretKey}"
27        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
28        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
29        props.put(SaslConfigs.SASL_JAAS_CONFIG,
30                "org.apache.kafka.common.security.plain.PlainLoginModule " +
31                        "required username=\"" + username + "\" password=\"" + password + "\";");
32        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
33        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
34        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
35        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
36        // 调用 send 方法。
37        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("java_test", "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"));
38        RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
39        // 等待Kafka Producer异步发送数据
40        Thread.sleep(1000000);
41        producer.close();
42    }
43}

上一篇
传输任务采集
下一篇
日志管理