Uploading Logs Using Kafka Protocol
The log service supports uploading log data over the Kafka protocol, which means you can use a Kafka Producer SDK or a Kafka-compatible collection tool to collect log data and upload it to the log service. This document describes how to upload logs to the log service over the Kafka protocol after collecting them with a collection tool.
Background
Kafka is a high-throughput message middleware that is commonly used as a message pipeline in self-built log collection scenarios: logs are collected from the source servers by open-source collection tools, or written directly by a Kafka Producer. The log service supports uploading log data via the Kafka protocol. This capability does not require activating any additional feature or installing a data collection agent on the data source side; with a few configuration changes, a Kafka Producer can collect and upload log data directly to the log service.
Limitations
- The supported Kafka protocol versions are 2.1.0-2.3.1.
- Supported compression methods include gzip, snappy, and lz4.
- To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the log service project name, and the password is your account key in the format ${AccessKey}#${SecretKey}.
- When uploading logs via the Kafka protocol, the log must be valid JSON and must include an @timestamp field formatted as 2006-01-02T15:04:05Z07:00; an error is returned for an invalid JSON format (see the sketch below).
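The layout 2006-01-02T15:04:05Z07:00 is the RFC3339 reference layout used by Go. As a minimal sketch (field names other than @timestamp are illustrative), a compliant log line can be built like this:

package main
import (
    "encoding/json"
    "fmt"
    "time"
)
func main() {
    // Build one log entry; @timestamp must use the RFC3339 layout required by the log service
    entry := map[string]interface{}{
        "@timestamp": time.Now().Format(time.RFC3339),
        "level":      "info",
        "status":     200,
    }
    // The body sent to the log service must be valid JSON, otherwise an error is returned
    body, _ := json.Marshal(entry)
    fmt.Println(string(body))
}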
Parameter description
When uploading logs via the Kafka protocol, you need to configure the following parameters:
| Parameters | Example | Description |
|---|---|---|
| Connection type | SASL_SSL | To ensure the security of log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the log service project name, and the password is the account key. |
| username | default | Kafka SASL username. It should be configured as the project name of the log service. |
| password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL user password. It should be configured as the Baidu AI Cloud account key. The format is ${AccessKey}#${SecretKey}, where: ${AccessKey} should be replaced with your AccessKey. ${SecretKey} should be replaced with your SecretKey. |
| host | bls-log.bj.baidubce.com:8200 | Initial cluster connection address, formatted as service address: port number, e.g., bls-log.bj.baidubce.com:8200, where the service address is the service entry for the log service in the current region. Please select the correct service entry based on the region. For details, refer to the service address. The port number is fixed at 8200. |
| topic | log-online | Configure it as the logstore name. |
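As an illustration only (the values below are placeholders, not real credentials), the Go sketch shows how the SASL username, password, broker address, and topic are assembled from these parameters:

package main
import "fmt"
func main() {
    // Placeholder values; replace with your own project, keys, regional endpoint, and logstore
    project := "my-project"                // SASL username = log service project name
    accessKey := "your-access-key"         // Baidu AI Cloud AccessKey
    secretKey := "your-secret-key"         // Baidu AI Cloud SecretKey
    host := "bls-log.bj.baidubce.com:8200" // regional service address, fixed port 8200
    logstore := "log-online"               // Kafka topic = logstore name

    username := project
    password := fmt.Sprintf("%s#%s", accessKey, secretKey) // ${AccessKey}#${SecretKey}
    fmt.Println(username, password, host, logstore)
}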
Example
Uploading logs via Filebeat
- Configuration example
The parameter configuration used in the example can be found in the parameter description.
## Multiple input sources, writing to different logstores via the fields parameter
filebeat.inputs:
- type: log
  fields:
    logstore: "filebeat_1"
  paths:
    - /root/log/*.log
- type: log
  fields:
    logstore: "filebeat_2"
  paths:
    - /root/log/*.txt

## Increasing the internal queue of Filebeat can improve the log upload rate (reference documentation: https://www.elastic.co/docs/reference/beats/filebeat/configuring-internal-queue)
queue.mem:
  events: 100000
## Kafka output configuration
output.kafka:
  hosts: ["${host}"]
  username: "${project}"
  password: "${AccessKey}#${SecretKey}"
  topic: '%{[fields.logstore]}' # Linked with the input fields to output to multiple different logstores
  required_acks: 1
  sasl.mechanism: PLAIN
  ssl.enabled: true
- Limitations
- Filebeat's log output must be in JSON format.
- To ensure the security of log transmission, ssl.enabled: true and sasl.mechanism: PLAIN must be set.
- The default queue.mem.events configuration for Filebeat is 3200. It is recommended to set it to 20000-100000 to improve log push performance.
Uploading logs via Kafka Go SDK
- Dependencies
go get github.com/IBM/sarama
- Code example
package main
import (
    "fmt"
    "time"

    "github.com/IBM/sarama"
)
func main() {
    config := sarama.NewConfig()
    config.Metadata.Full = false
    config.Net.SASL.Mechanism = "PLAIN"
    config.Net.SASL.Enable = true
    config.Net.TLS.Enable = true
    // username is the project name
    config.Net.SASL.User = "${project}"
    config.Producer.Return.Errors = true
    // Baidu AI Cloud key, in the format ${AccessKey}#${SecretKey}
    config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
    // ${hosts} is the service address. For detailed instructions, refer to the parameter descriptions in this document
    producer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
    if err != nil {
        fmt.Println("new producer error:" + err.Error())
        panic(err)
    }
    // Print send errors returned by the async producer
    go func() {
        for e := range producer.Errors() {
            fmt.Println(e)
        }
    }()
    channel := producer.Input()
    for i := 0; i < 10; i++ {
        channel <- &sarama.ProducerMessage{
            // ${logStoreName} is the logstore name, e.g., log-online
            Topic: "${logStoreName}",
            Value: sarama.StringEncoder("{\"@timestamp\":\"2025-09-10T04:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"),
        }
    }
    // Give the async producer time to flush before closing
    time.Sleep(time.Minute)
    producer.Close()
}
Uploading logs via Kafka Java SDK
- Dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
- Code example
package org.wjr.test;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Properties props = new Properties();
        // ${hosts} is the service address. For detailed instructions, refer to the parameter descriptions in this document
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
        // Project name
        String username = "${project}";
        // Baidu AI Cloud key, in the format ${AccessKey}#${SecretKey}
        String password = "${AccessKey}#${SecretKey}";
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule " +
                        "required username=\"" + username + "\" password=\"" + password + "\";");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        // Call the send method; the topic ("java_test" here) is the logstore name
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("java_test", "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"));
        // Block until the record is acknowledged (up to 100 seconds)
        RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
        producer.close();
    }
}
