Real-Time Consumption
Overview
The log service supports consumption via the Kafka protocol, so a logstore can be consumed as if it were a Kafka topic. This section describes how to consume log data via the Kafka protocol.
Basic concepts
| Concepts | Description |
|---|---|
| Consumer group (ConsumerGroup) | A consumer group is a virtual collection of multiple consumers. When log data is consumed at the consumer group level, all consumers in the group subscribe to the same logstore and collectively consume its data. |
| Consumer | A client that consumes data from the log service and is part of a consumer group. Each consumer within the same consumer group must have a unique name. At any given time, a shard of the logstore will be assigned to one consumer in the consumer group, and a single consumer may be responsible for multiple shards. |
| Checkpoint | While a consumer is consuming a shard, the shard's current checkpoint (i.e., the cursor position reached so far) is continuously recorded and reported to the server. When the program restarts, consumption resumes from this cursor, so data is not consumed repeatedly. |
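
The relationship between shards, consumers, and checkpoints can be sketched in a few lines of Go. This is only an illustrative model: the round-robin assignment, the `assignShards` and `resumeFrom` helpers, and the consumer names are assumptions made for the example, not the service's actual balancing logic.

```go
package main

import "fmt"

// assignShards distributes shard IDs across consumers round-robin, so every
// shard has one owner and a consumer may own several shards.
// Illustrative only: the real assignment is decided by the group protocol.
func assignShards(shards []int, consumers []string) map[string][]int {
	owned := make(map[string][]int, len(consumers))
	if len(consumers) == 0 {
		return owned
	}
	for i, shard := range shards {
		c := consumers[i%len(consumers)]
		owned[c] = append(owned[c], shard)
	}
	return owned
}

// checkpoints maps a shard ID to the next cursor position to read.
type checkpoints map[int]int64

// resumeFrom returns the cursor a restarted consumer should start at:
// the saved checkpoint if one exists, otherwise the supplied default.
func (c checkpoints) resumeFrom(shard int, def int64) int64 {
	if pos, ok := c[shard]; ok {
		return pos
	}
	return def
}

func main() {
	owned := assignShards([]int{0, 1, 2}, []string{"consumer-a", "consumer-b"})
	fmt.Println(owned["consumer-a"], owned["consumer-b"]) // [0 2] [1]

	cp := checkpoints{0: 120}
	fmt.Println(cp.resumeFrom(0, 0), cp.resumeFrom(1, 0)) // 120 0
}
```

Three shards shared by two consumers leave one consumer with two shards, and a shard without a saved checkpoint falls back to the default start position.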
Fee
Real-time consumption is free during the public beta period. The date on which billing starts will be announced via email, SMS, and in-site messages.
Console operations
Enable real-time consumption
- Enable real-time consumption: Real-time consumption is enabled per logstore. For the target logstore, go to Operation > More > Enable Real-time Consumption. (Note: Enabling real-time consumption may affect log query performance, and it cannot be disabled once enabled.)

Real-time consumption task
- How to consume:
Consumption uses the Kafka protocol and works largely the same way as consuming from Kafka.
Configure the following parameters in the Kafka client.
| Parameters | Example | Description |
|---|---|---|
| Connection type | SASL_SSL | To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the log service project name, and the password is the account key. |
| username | default | Kafka SASL username. It should be configured as the project name of the log service. |
| password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL user password. It should be configured as the Baidu AI Cloud account key. The format is ${AccessKey}#${SecretKey}, where: ${AccessKey} should be replaced with your AccessKey. ${SecretKey} should be replaced with your SecretKey. |
| host | bls-log.bj.baidubce.com:8201 | Initial cluster connection address, in the form service address:port number. The service address is the log service entry point for the current region; select the entry that matches your region. For details, refer to the service address. The port number is fixed at 8201. |
| topic | log-online | Set this to the logstore name. |
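
As a small sketch of how the password and host values in the table are put together, the Go helpers below join the two key parts with `#` and append the fixed port to a service address. The helper names `saslPassword` and `brokerAddress` and the sample key strings are hypothetical, introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// kafkaPort is the fixed port number stated in the parameter table.
const kafkaPort = 8201

// saslPassword joins the two key parts in the ${AccessKey}#${SecretKey} form.
func saslPassword(accessKey, secretKey string) (string, error) {
	if accessKey == "" || secretKey == "" {
		return "", errors.New("access key and secret key must both be set")
	}
	return accessKey + "#" + secretKey, nil
}

// brokerAddress appends the fixed port to a regional service address.
func brokerAddress(serviceAddress string) string {
	return fmt.Sprintf("%s:%d", serviceAddress, kafkaPort)
}

func main() {
	// Placeholder keys; substitute your own AccessKey and SecretKey.
	pw, err := saslPassword("myAccessKey", "mySecretKey")
	if err != nil {
		panic(err)
	}
	fmt.Println(pw)                                       // myAccessKey#mySecretKey
	fmt.Println(brokerAddress("bls-log.bj.baidubce.com")) // bls-log.bj.baidubce.com:8201
}
```

Rejecting empty key parts up front gives a clearer error than a failed SASL handshake later.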
An example consumer written in Go:

    package main

    import (
        "context"
        "fmt"
        "log"

        "github.com/IBM/sarama"
    )

    // ConsumerHandler implements sarama.ConsumerGroupHandler.
    type ConsumerHandler struct{}

    // Setup runs at the start of a session, before ConsumeClaim.
    func (h *ConsumerHandler) Setup(s sarama.ConsumerGroupSession) error {
        for topic, partitions := range s.Claims() {
            fmt.Printf("Assigned topic: %s Assigned partitions: %v\n", topic, partitions)
        }
        log.Println("Consumer setup")
        return nil
    }

    // Cleanup runs at the end of a session, after all ConsumeClaim calls return.
    func (h *ConsumerHandler) Cleanup(s sarama.ConsumerGroupSession) error {
        log.Println("Consumer cleanup")
        return nil
    }

    // ConsumeClaim reads the messages of one assigned partition.
    func (h *ConsumerHandler) ConsumeClaim(s sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        for msg := range claim.Messages() {
            fmt.Printf("Message: topic=%s partition=%d offset=%d key=%s value=%s\n",
                msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            s.MarkMessage(msg, "") // Manually mark the message as consumed
        }
        return nil
    }

    func main() {
        config := sarama.NewConfig()
        config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()
        config.Metadata.Full = false
        config.Net.SASL.Mechanism = "PLAIN"
        config.Net.SASL.Enable = true
        config.Net.TLS.Enable = true
        // SASL username: the log service project name
        config.Net.SASL.User = "${project}"
        // SASL password: the account key in ${AccessKey}#${SecretKey} form
        config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
        // Address: the BLS domain name plus port 8201, e.g., bls-log.bj.baidubce.com:8201 for the bj region
        client, err := sarama.NewConsumerGroup([]string{"bls_endpoint:8201"}, "your_group_id", config)
        if err != nil {
            log.Fatalf("Failed to create consumer group: %v", err)
        }
        for {
            // Consume returns whenever the session ends, so call it in a loop.
            if err := client.Consume(context.Background(), []string{"${topic}"}, &ConsumerHandler{}); err != nil {
                fmt.Println("Error from consumer:", err)
            }
        }
    }

Note: When using the Java Kafka SDK, the kafka-clients version must be in the range 1.0.0 to 2.3.1, for example:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.2.2</version>
    </dependency>

- Consumption task list: The progress of real-time consumption tasks can be viewed in the real-time consumption module. The left side lists the consumer groups; clicking a consumer group shows, on the right, the consumption status of each logstore that the group consumes. (Note: Consumer groups that have been inactive for 7 days are automatically removed from the list.)
- Reset checkpoint: You can reset the checkpoint for all shards of a consumer group or for a specific shard. Stop the consumer process before resetting. The checkpoint can be reset to the earliest position, the latest position, or a specified point in time.
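
To make the three reset targets concrete, the following Go sketch resolves each one to a position within a single shard. It is a simplified model under stated assumptions: records are represented only by ascending Unix timestamps, "latest" is taken to mean the position after the last existing record, and a time-based reset is taken to mean the first record at or after the requested time. The `resetMode` type and `resolveReset` function are hypothetical names, not part of any SDK.

```go
package main

import (
	"fmt"
	"sort"
)

// resetMode mirrors the three reset choices: earliest, latest, specified time.
type resetMode int

const (
	resetEarliest resetMode = iota
	resetLatest
	resetAtTime
)

// resolveReset returns the index of the first record a consumer would read
// after a reset. timestamps holds each record's time (Unix seconds) in
// ascending order; at is only used with resetAtTime.
func resolveReset(timestamps []int64, mode resetMode, at int64) int {
	switch mode {
	case resetEarliest:
		return 0
	case resetLatest:
		// Assumed semantics: skip everything already written.
		return len(timestamps)
	default:
		// Assumed semantics: first record whose timestamp is at or after `at`.
		return sort.Search(len(timestamps), func(i int) bool {
			return timestamps[i] >= at
		})
	}
}

func main() {
	ts := []int64{100, 110, 120, 130}
	fmt.Println(resolveReset(ts, resetEarliest, 0)) // 0
	fmt.Println(resolveReset(ts, resetLatest, 0))   // 4
	fmt.Println(resolveReset(ts, resetAtTime, 115)) // 2
}
```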


Real-time consumption monitor
The monitor shows the total count of unconsumed messages for a consumer group. Select a logstore to view the count of unconsumed messages for that logstore, or select both a logstore and a shard to view the count for that shard.
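
The three views of the unconsumed count can be thought of as one sum with different filters. The Go sketch below assumes that the unconsumed count of a shard is the gap between its newest position and the group's checkpoint; the `shardProgress` type, the `unconsumed` function, and the `log-audit` logstore name are hypothetical and serve only to illustrate the aggregation.

```go
package main

import "fmt"

// shardProgress holds, for one shard, the position of the newest record and
// the consumer group's checkpoint (the next position to read).
type shardProgress struct {
	Logstore   string
	Shard      int
	Latest     int64
	Checkpoint int64
}

// unconsumed sums Latest-Checkpoint over the shards that match the filter.
// An empty logstore matches every logstore; a negative shard matches every shard.
func unconsumed(progress []shardProgress, logstore string, shard int) int64 {
	var total int64
	for _, p := range progress {
		if logstore != "" && p.Logstore != logstore {
			continue
		}
		if shard >= 0 && p.Shard != shard {
			continue
		}
		if lag := p.Latest - p.Checkpoint; lag > 0 {
			total += lag
		}
	}
	return total
}

func main() {
	progress := []shardProgress{
		{"log-online", 0, 1000, 900},
		{"log-online", 1, 500, 500},
		{"log-audit", 0, 300, 250},
	}
	fmt.Println(unconsumed(progress, "", -1))           // whole group: 150
	fmt.Println(unconsumed(progress, "log-online", -1)) // one logstore: 100
	fmt.Println(unconsumed(progress, "log-online", 0))  // one shard: 100
}
```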

