实时消费

日志服务 BLS

  • 功能发布记录
  • 产品描述
    • 产品介绍
    • 使用限制
  • 产品定价
  • 快速入门
    • 简介
    • 安装收集器
    • 创建日志集
    • 创建传输任务
    • 日志分析和报警
    • 创建投递任务
    • 创建日志集
    • 简介
    • 日志分析和报警
    • 创建投递任务
    • 创建传输任务
    • 安装收集器
  • 操作指南
    • 百度智能云环境准备
    • 概览
    • 收集器
      • 主机安装收集器
      • 在k8s环境安装收集器
      • 设置收集器启动参数
      • 收集器管理
      • 收集器发行版本
    • 查询分析
      • 日志查询
      • 检索语法
      • SQL语法
    • 日志采集
      • 传输任务采集
        • 管理传输任务
        • 创建传输任务
      • 使用Kafka协议上传日志
    • 日志管理
      • 日志集管理
      • 日志视图
    • 仪表盘
      • 概述
      • 管理仪表盘
      • 管理仪表盘图表
    • 报警管理
      • 报警概述
      • 报警策略
        • 管理报警策略
        • 触发条件
      • 报警通知
        • 报警通知模版
        • 报警回调
      • 报警执行统计
      • 报警历史
    • 数据处理
      • 定时SQL分析
        • 创建定时SQL分析任务
        • 管理定时SQL分析任务
      • 日志投递
        • 日志投递概述
        • 创建投递任务
        • 管理投递任务
      • 数据加工
        • 数据加工函数总览
        • 事件操作函数
        • 字段值提取函数
        • 字段操作函数
        • 映射富化函数
        • 流程控制函数
      • 实时消费
    • 日志应用
      • 智能诊断
    • 多用户访问控制
  • 开发指南
    • API参考
      • API概述
      • 通用说明
      • 名词解释
      • 接口概览
      • 通用错误码
      • 服务域名
      • API功能发布记录
      • 日志组Project相关接口
        • 创建日志组CreateProject
        • 更新日志组UpdateProject
        • 删除日志组DeleteProject
        • 获取日志组列表ListProject
        • 获取日志组详情DescribeProject
      • 日志集LogStore相关接口
        • 创建日志集CreateLogStore
        • 修改日志集UpdateLogStore
        • 删除日志集DeleteLogStore
        • 获取日志集列表ListLogStore
        • 获取日志集详情DescribeLogStore
        • 批量获取日志集BatchGetLogStore
      • 日志流LogStream相关接口
        • 获取日志流列表ListLogStream
      • 日志记录LogRecord相关接口
        • 检索分析日志QueryLogRecord
        • 直方图接口QueryLogHistogram
        • 推送日志PushLogRecord
        • 获取日志记录PullLogRecord
      • 日志视图相关接口
        • 创建日志视图CreateLogStoreView
        • 修改日志视图UpdateLogStoreView
        • 删除日志视图DeleteLogStoreView
        • 获取日志视图列表ListLogStoreView
        • 获取日志视图详情DescribeLogStoreView
      • 下载日志Download相关接口
        • 创建下载任务CreateDownloadTask
        • 删除下载任务DeleteDownloadTask
        • 获取下载任务列表ListDownloadTask
        • 获取下载任务地址GetDownloadTaskLink
        • 获取下载任务详情DescribeDownloadTask
      • 快速查询FastQuery相关接口
        • 创建快速查询CreateFastQuery
        • 更新快速查询UpdateFastQuery
        • 删除快速查询DeleteFastQuery
        • 获取快速查询列表ListFastQuery
        • 获取快速查询详情DescribeFastQuery
      • 传输任务Task相关接口
        • 创建传输任务CreateTask
        • 更新传输任务UpdateTask
      • 报警Alarm相关接口
        • 创建报警策略CreateAlarmPolicy
        • 启用报警策略EnableAlarmPolicy
        • 获取报警历史详情DescribeAlarmRecord
        • 更新报警策略UpdateAlarmPolicy
        • 删除报警策略DeleteAlarmPolicy
        • 获取报警执行列表ListAlarmExecutions
        • 获取报警策略列表ListAlarmPolicy
        • 获取报警策略详情DescribeAlarmPolicy
        • 获取报警执行统计ListAlarmExecutionStats
        • 检验报警策略执行条件ValidateAlarmCondition
        • 禁用报警策略DisableAlarmPolicy
        • 获取报警历史列表ListAlarmRecord
        • 检验报警策略查询语句ValidateAlarmPolicySQL
      • 日志投递LogShipper相关接口
        • 创建日志投递CreateLogShipper
        • 更新日志投递UpdateLogShipper
        • 删除日志投递任务DeleteSingleLogShipper
        • 批量设置日志投递任务状态BulkSetLogShipperStatus
        • 设置日志投递任务状态SetSingleLogShipperStatus
        • 获取日志投递记录ListLogShipperRecord
        • 批量删除日志投递任务BulkDeleteLogShipper
        • 获取日志投递列表ListLogShipper
        • 获取日志投递详情GetLogShipper
      • 日志集模板相关接口
        • 创建日志集模板CreateLogStoreTemplate
        • 更新日志集模板UpdateLogStoreTemplate
        • 删除日志集模板DeleteLogStoreTemplates
        • 获取日志集模板列表DescribeLogStoreTemplates
        • 获取日志集模板详情DescribeLogStoreTemplate
      • 索引Index相关接口
        • 创建索引CreateIndex
        • 更新索引UpdateIndex
        • 删除索引DeleteIndex
        • 获取索引详情DescribeIndex
      • 兼容Elasticsearch相关接口
        • 检索和分析BLS日志 AsyncSearch
        • 查询索引列表 ResolveIndex
        • 推荐字段取值 TermsEnum
        • 获取索引字段列表 FieldCaps
    • SDK参考
      • SDK隐私政策
      • SDK开发者个人信息保护合规指引
      • Android & ISO SDK下载
      • Go SDK
        • 概述
        • 初始化
        • 安装SDK工具包
        • Project操作
        • 日志集LogStore操作
        • 日志流LogStream操作
        • 日志记录LogRecord操作
        • 日志投递LogShipper操作
        • 快速查询FastQuery操作
        • 下载任务操作
        • 索引Index操作
        • 版本发布记录
      • iOS SDK
        • 概述
        • 快速开始
        • 版本发布记录
      • Java SDK
        • 概述
        • 日志记录LogRecord操作
        • 安装SDK工具包
      • Android SDK
        • 概述
        • 快速开始
        • 版本发布记录
    • 导入SLS采集配置
  • 最佳实践
    • 使用同环比作为报警触发条件
    • 通过Grafana使用BLS
    • 使用Grafana访问日志服务的Elasticsearch兼容接口
    • BLS接入Kibana
  • 常见问题
    • 常见问题总览
    • 配置类问题
    • 故障类问题
  • 日志服务等级协议SLA
所有文档
menu
没有找到结果,请重新输入

日志服务 BLS

  • 功能发布记录
  • 产品描述
    • 产品介绍
    • 使用限制
  • 产品定价
  • 快速入门
    • 简介
    • 安装收集器
    • 创建日志集
    • 创建传输任务
    • 日志分析和报警
    • 创建投递任务
    • 创建日志集
    • 简介
    • 日志分析和报警
    • 创建投递任务
    • 创建传输任务
    • 安装收集器
  • 操作指南
    • 百度智能云环境准备
    • 概览
    • 收集器
      • 主机安装收集器
      • 在k8s环境安装收集器
      • 设置收集器启动参数
      • 收集器管理
      • 收集器发行版本
    • 查询分析
      • 日志查询
      • 检索语法
      • SQL语法
    • 日志采集
      • 传输任务采集
        • 管理传输任务
        • 创建传输任务
      • 使用Kafka协议上传日志
    • 日志管理
      • 日志集管理
      • 日志视图
    • 仪表盘
      • 概述
      • 管理仪表盘
      • 管理仪表盘图表
    • 报警管理
      • 报警概述
      • 报警策略
        • 管理报警策略
        • 触发条件
      • 报警通知
        • 报警通知模版
        • 报警回调
      • 报警执行统计
      • 报警历史
    • 数据处理
      • 定时SQL分析
        • 创建定时SQL分析任务
        • 管理定时SQL分析任务
      • 日志投递
        • 日志投递概述
        • 创建投递任务
        • 管理投递任务
      • 数据加工
        • 数据加工函数总览
        • 事件操作函数
        • 字段值提取函数
        • 字段操作函数
        • 映射富化函数
        • 流程控制函数
      • 实时消费
    • 日志应用
      • 智能诊断
    • 多用户访问控制
  • 开发指南
    • API参考
      • API概述
      • 通用说明
      • 名词解释
      • 接口概览
      • 通用错误码
      • 服务域名
      • API功能发布记录
      • 日志组Project相关接口
        • 创建日志组CreateProject
        • 更新日志组UpdateProject
        • 删除日志组DeleteProject
        • 获取日志组列表ListProject
        • 获取日志组详情DescribeProject
      • 日志集LogStore相关接口
        • 创建日志集CreateLogStore
        • 修改日志集UpdateLogStore
        • 删除日志集DeleteLogStore
        • 获取日志集列表ListLogStore
        • 获取日志集详情DescribeLogStore
        • 批量获取日志集BatchGetLogStore
      • 日志流LogStream相关接口
        • 获取日志流列表ListLogStream
      • 日志记录LogRecord相关接口
        • 检索分析日志QueryLogRecord
        • 直方图接口QueryLogHistogram
        • 推送日志PushLogRecord
        • 获取日志记录PullLogRecord
      • 日志视图相关接口
        • 创建日志视图CreateLogStoreView
        • 修改日志视图UpdateLogStoreView
        • 删除日志视图DeleteLogStoreView
        • 获取日志视图列表ListLogStoreView
        • 获取日志视图详情DescribeLogStoreView
      • 下载日志Download相关接口
        • 创建下载任务CreateDownloadTask
        • 删除下载任务DeleteDownloadTask
        • 获取下载任务列表ListDownloadTask
        • 获取下载任务地址GetDownloadTaskLink
        • 获取下载任务详情DescribeDownloadTask
      • 快速查询FastQuery相关接口
        • 创建快速查询CreateFastQuery
        • 更新快速查询UpdateFastQuery
        • 删除快速查询DeleteFastQuery
        • 获取快速查询列表ListFastQuery
        • 获取快速查询详情DescribeFastQuery
      • 传输任务Task相关接口
        • 创建传输任务CreateTask
        • 更新传输任务UpdateTask
      • 报警Alarm相关接口
        • 创建报警策略CreateAlarmPolicy
        • 启用报警策略EnableAlarmPolicy
        • 获取报警历史详情DescribeAlarmRecord
        • 更新报警策略UpdateAlarmPolicy
        • 删除报警策略DeleteAlarmPolicy
        • 获取报警执行列表ListAlarmExecutions
        • 获取报警策略列表ListAlarmPolicy
        • 获取报警策略详情DescribeAlarmPolicy
        • 获取报警执行统计ListAlarmExecutionStats
        • 检验报警策略执行条件ValidateAlarmCondition
        • 禁用报警策略DisableAlarmPolicy
        • 获取报警历史列表ListAlarmRecord
        • 检验报警策略查询语句ValidateAlarmPolicySQL
      • 日志投递LogShipper相关接口
        • 创建日志投递CreateLogShipper
        • 更新日志投递UpdateLogShipper
        • 删除日志投递任务DeleteSingleLogShipper
        • 批量设置日志投递任务状态BulkSetLogShipperStatus
        • 设置日志投递任务状态SetSingleLogShipperStatus
        • 获取日志投递记录ListLogShipperRecord
        • 批量删除日志投递任务BulkDeleteLogShipper
        • 获取日志投递列表ListLogShipper
        • 获取日志投递详情GetLogShipper
      • 日志集模板相关接口
        • 创建日志集模板CreateLogStoreTemplate
        • 更新日志集模板UpdateLogStoreTemplate
        • 删除日志集模板DeleteLogStoreTemplates
        • 获取日志集模板列表DescribeLogStoreTemplates
        • 获取日志集模板详情DescribeLogStoreTemplate
      • 索引Index相关接口
        • 创建索引CreateIndex
        • 更新索引UpdateIndex
        • 删除索引DeleteIndex
        • 获取索引详情DescribeIndex
      • 兼容Elasticsearch相关接口
        • 检索和分析BLS日志 AsyncSearch
        • 查询索引列表 ResolveIndex
        • 推荐字段取值 TermsEnum
        • 获取索引字段列表 FieldCaps
    • SDK参考
      • SDK隐私政策
      • SDK开发者个人信息保护合规指引
      • Android & ISO SDK下载
      • Go SDK
        • 概述
        • 初始化
        • 安装SDK工具包
        • Project操作
        • 日志集LogStore操作
        • 日志流LogStream操作
        • 日志记录LogRecord操作
        • 日志投递LogShipper操作
        • 快速查询FastQuery操作
        • 下载任务操作
        • 索引Index操作
        • 版本发布记录
      • iOS SDK
        • 概述
        • 快速开始
        • 版本发布记录
      • Java SDK
        • 概述
        • 日志记录LogRecord操作
        • 安装SDK工具包
      • Android SDK
        • 概述
        • 快速开始
        • 版本发布记录
    • 导入SLS采集配置
  • 最佳实践
    • 使用同环比作为报警触发条件
    • 通过Grafana使用BLS
    • 使用Grafana访问日志服务的Elasticsearch兼容接口
    • BLS接入Kibana
  • 常见问题
    • 常见问题总览
    • 配置类问题
    • 故障类问题
  • 日志服务等级协议SLA
  • 文档中心
  • arrow
  • 日志服务BLS
  • arrow
  • 操作指南
  • arrow
  • 数据处理
  • arrow
  • 实时消费
本页目录
  • 概述
  • 基本概念
  • 费用
  • 控制台操作
  • 开启实时消费
  • 实时消费任务
  • 实时消费监控

实时消费

更新时间:2025-11-14

概述

日志服务支持通过Kafka协议消费,即可以将一个日志集,当作一个 Kafka Topic 来消费。下面介绍通过 Kafka 协议消费日志数据的相关步骤。

基本概念

概念 说明
消费组(ConsumerGroup) 消费组是多个消费者组成的虚拟集合。以消费组维度消费日志数据时,消费组中的所有消费者订阅同一个日志集,共同消费一个日志集中的数据。
消费者(Consumer) 一个从日志服务中消费数据的客户端,是消费组的组成部分。同一个消费组中的消费者名称唯一。同一时刻,日志集的一个Shard将会分配给消费组中的某一个消费者,一个消费者可能负责多个 Shard。
消费位点(Checkpoint) 一个 Shard 在被一个消费者消费的过程中,会随时记录当前 Shard 的消费位点(即游标进度)并上报服务端,以此来作为程序重启时的起始消费游标,从而保证数据不会被重复消费。

费用

公测期间免费,具体收费时间将通过邮件、短信、站内信等方式通知。

控制台操作

开启实时消费

  • 开启实时消费:按日志集维度开启实时消费,在日志集“操作->更多->开启实时消费”处按日志集维度开启实时消费(注:实时消费开启后可能影响日志查询性能,一经开启不可关闭)

image.png

实时消费任务

  • 如何消费:

支持使用kafka协议进行消费,使用上与消费kafka基本一致。

您需要在使用kafka客户端时配置以下参数

参数 示例 说明
连接类型 SASL_SSL 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务日志组名,密码为账号密钥。
username default Kafka SASL 用户名。应配置为日志服务的日志组名。
password ALTAKOGSZ***#ALTAK9sZr** Kafka SASL 用户密码。应配置为百度云账户密钥。格式为 ${AccessKey}#${SecretKey},其中:${AccessKey} 应替换为您的 AccessKey。${SecretKey} 应替换为您的SecretKey
host bls-log.bj.baidubce.com:8201 初始连接的集群地址,格式为服务地址:端口号,例如 bls-log.bj.baidubce.com:8201,其中:服务地址为当前地域下日志服务的服务入口。请根据地域选择正确的服务入口,详细信息请参见服务地址。端口号固定为 8201。
topic log-online 配置为日志集名称

go版本消费代码示例如下:

Go
1import (
2	"context"
3	"fmt"
4	"log"
5	"time"
6
7	"github.com/IBM/sarama"
8	"google.golang.org/grpc"
9)
10
11type ConsumerHandler struct {
12}
13
14func (h *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error {
15	for topic, partitions := range s.Claims() {
16		fmt.Printf("Assigned topic: %s Assigned partitions: %v\n", topic, partitions)
17	}
18	log.Println("Consumer setup")
19	return nil
20}
21
22func (h *ConsumerHandler) Cleanup(s sarama.ConsumerGroupSession) error {
23	log.Println("Consumer cleanup")
24	return nil
25}
26
27func (h *ConsumerHandler) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
28	for msg := range claim.Messages() {
29		fmt.Printf("Message: topic=%s partition=%d offset=%d key=%s value=%s\n",
30			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
31		s.MarkMessage(msg, "") // 手动标记消息为已消费
32	}
33	return nil
34}
35
36func main() {
37	config := sarama.NewConfig()
38	config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
39	config.Metadata.Full = false
40	config.Net.SASL.Mechanism = "PLAIN"
41	config.Net.SASL.Enable = true
42    config.Net.TLS.Enable = true
43	// 填申请的用户名
44	config.Net.SASL.User = "${project}"
45	// 填用户名对应的密码
46	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
47    // 地址填bls的域名+8201端口,例如bj地域填bls-log.bj.baidubce.com:8201
48	client, _ := sarama.NewConsumerGroup([]string{"bls_endpoint:8201"}, "your_group_id", config)
49	for {
50		if err := client.Consume(context.Background(), []string{"${topic}"}, &ConsumerHandler{
51		}); err != nil {
52			fmt.Println("Error from consumer:", err)
53		}
54	}
55}

注意:使用java kafka sdk时,版本需要为1.0.0-2.3.1版本,例如

XML
1<dependency>
2    <groupId>org.apache.kafka</groupId>
3    <artifactId>kafka-clients</artifactId>
4    <version>2.2.2</version>
5</dependency>
  • 消费任务列表:实时消费任务进度在实时消费模块进行查看,左侧显示消费组列表,点击后右侧显示对应消费组消费对应日志集的消费情况。(注:7天消费组没消费,列表自动把消费组删掉)。
  • 重置消费位点:可针对整个消费组或一个日志集对应的shard进行重置,重置时请先关闭消费进程后重置。重置支持选择最早位置,最新位置,或指定时间点

image.png

image.png

实时消费监控

支持查看消费组整体未消费消息条数监控,选择日志集可查看具体某个日志集未消费消息条数监控,选择日志集和Shard后查看具体某个Shard未消费消息条数监控

image.png

上一篇
数据加工
下一篇
日志应用