Real-Time Consumption

Updated at: 2025-11-03

Overview

The log service supports consumption over the Kafka protocol: a logstore can be consumed as if it were a Kafka topic. The following sections describe how to consume log data via the Kafka protocol.

Basic concepts

| Concept | Description |
| --- | --- |
| Consumer group (ConsumerGroup) | A virtual collection of multiple consumers. When log data is consumed at the consumer-group level, all consumers in the group subscribe to the same logstore and jointly consume the data in that logstore. |
| Consumer | A client that consumes data from the log service and belongs to a consumer group. Each consumer within the same consumer group must have a unique name. At any given time, a shard of the logstore is assigned to exactly one consumer in the group, and a single consumer may be responsible for multiple shards. |
| Checkpoint | While a consumer consumes a shard, the shard's current checkpoint (i.e., cursor progress) is continuously recorded and reported to the server. It serves as the starting consumption cursor when the program restarts, ensuring data is not consumed repeatedly. |

Fee

The service is free during the public beta period. The date on which billing will begin will be announced via email, SMS, and in-site messages.

Console operations

Enable real-time consumption

  • Enable real-time consumption: Real-time consumption is enabled per logstore. In the logstore list, choose Operation - More - Enable Real-time Consumption for the target logstore. (Note: Enabling real-time consumption may affect log query performance and cannot be disabled once enabled.)


Real-time consumption task

  • How to consume:

Consumption uses the Kafka protocol and is largely the same as consuming from a native Kafka cluster.

Configure the following parameters in your Kafka client.

| Parameter | Example | Description |
| --- | --- | --- |
| Connection type | SASL_SSL | To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The username is the log service project name and the password is the account key. |
| username | default | Kafka SASL username. Set it to the log service project name. |
| password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL password. Set it to the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}, where ${AccessKey} is your AccessKey and ${SecretKey} is your SecretKey. |
| host | bls-log.bj.baidubce.com:8201 | Initial cluster connection address in the format service address:port, e.g., bls-log.bj.baidubce.com:8201. The service address is the log service endpoint for the current region; choose the endpoint that matches your region (for details, refer to the service domain documentation). The port is fixed at 8201. |
| topic | log-online | Set it to the logstore name. |

An example of Go consumer code is as follows:

Go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/IBM/sarama"
)

type ConsumerHandler struct {
}

func (h *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error {
	for topic, partitions := range s.Claims() {
		fmt.Printf("Assigned topic: %s Assigned partitions: %v\n", topic, partitions)
	}
	log.Println("Consumer setup")
	return nil
}

func (h *ConsumerHandler) Cleanup(s sarama.ConsumerGroupSession) error {
	log.Println("Consumer cleanup")
	return nil
}

func (h *ConsumerHandler) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message: topic=%s partition=%d offset=%d key=%s value=%s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		s.MarkMessage(msg, "") // Manually mark the message as consumed
	}
	return nil
}

func main() {
	config := sarama.NewConfig()
	config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
	config.Metadata.Full = false
	config.Net.SASL.Mechanism = "PLAIN"
	config.Net.SASL.Enable = true
	config.Net.TLS.Enable = true
	// Username: the log service project name
	config.Net.SASL.User = "${project}"
	// Password: the account key, in the format ${AccessKey}#${SecretKey}
	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
	// Address: the BLS endpoint for your region plus port 8201, e.g., bls-log.bj.baidubce.com:8201 for the bj region
	client, err := sarama.NewConsumerGroup([]string{"bls_endpoint:8201"}, "your_group_id", config)
	if err != nil {
		log.Fatalln("Failed to create consumer group:", err)
	}
	defer client.Close()
	for {
		// The topic is the logstore name
		if err := client.Consume(context.Background(), []string{"${topic}"}, &ConsumerHandler{}); err != nil {
			fmt.Println("Error from consumer:", err)
		}
	}
}

Note: When using the Java Kafka client, the kafka-clients version must be between 1.0.0 and 2.3.1, for example:

XML
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.2</version>
</dependency>
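
For reference, the following is a minimal Java consumer sketch that uses the kafka-clients dependency above. It is an illustrative example only: the class name and consumer group id are placeholders, and ${project}, ${AccessKey}#${SecretKey}, the regional endpoint, and ${topic} (the logstore name) must be replaced with your own values, as in the Go example.

Java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BlsKafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Initial connection address: the BLS endpoint for your region plus the fixed port 8201
        props.put("bootstrap.servers", "bls-log.bj.baidubce.com:8201");
        // Consumer group name (placeholder)
        props.put("group.id", "your_group_id");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // SASL_SSL with the PLAIN mechanism, as required by the log service
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        // Username: the log service project name; password: ${AccessKey}#${SecretKey}
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"${project}\" password=\"${AccessKey}#${SecretKey}\";");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // The topic is the logstore name
        consumer.subscribe(Collections.singletonList("${topic}"));
        try {
            while (true) {
                // poll(Duration) requires kafka-clients 2.0 or later (matches the 2.2.2 dependency above)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("topic=%s partition=%d offset=%d value=%s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
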
  • Consumption task list: Real-time consumption progress can be viewed in the real-time consumption module. The left side lists the consumer groups; click a consumer group to see, on the right, the consumption status of the logstore consumed by that group. (Note: Consumer groups that have been inactive for 7 days are automatically removed from the list.)
  • Reset checkpoint: You can reset the checkpoints of all shards belonging to a consumer group, or of a single shard. Stop the consumer process before resetting. The checkpoint can be reset to the earliest position, the latest position, or a specified point in time.


Real-time consumption monitor

The total number of unconsumed messages for a consumer group can be monitored. Select a logstore to view the number of unconsumed messages for that logstore, or select both a logstore and a shard to view the number of unconsumed messages for that shard.

