Uploading Logs Using Kafka Protocol

Updated at: 2025-11-03

The log service supports uploading log data over the Kafka protocol, which means you can use a Kafka Producer SDK or a Kafka-compatible collection tool to send log data directly to the log service. This document describes how to upload logs to the log service via the Kafka protocol after collecting them with a collection tool.

Background

Kafka, as high-throughput message middleware, is commonly used as a message pipeline in self-built log systems: logs are collected from source servers by open-source collection tools, or written directly by a Kafka Producer. The log service accepts log data sent over the Kafka protocol. This capability requires no additional feature activation and no data collection tool installed on the data source side; with a few configuration changes, any Kafka Producer can collect and upload log data to the log service.

Limitations

  • The supported Kafka protocol versions are 2.1.0-2.3.1.
  • Supported compression methods include gzip, snappy, and lz4.
  • To ensure secure log transmission, the SASL_SSL connection protocol must be used. The corresponding username is the project name, and the password is the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}.
  • Logs uploaded via the Kafka protocol must be valid JSON, and the timestamp must be carried in an @timestamp field formatted as 2006-01-02T15:04:05Z07:00; invalid JSON is rejected with an error.
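
For reference, a log record that satisfies these requirements (the same payload used in the SDK examples later in this document) looks like this:

JSON
{
  "@timestamp": "2025-09-10T04:41:12.220Z",
  "level": "info",
  "latency": 0.008858347,
  "status": 200,
  "method": "POST"
}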

Parameter description

When uploading logs via the Kafka protocol, configure the following parameters:

| Parameter | Example | Description |
| --- | --- | --- |
| Connection type | SASL_SSL | To ensure secure log transmission, the SASL_SSL connection protocol must be used. The username is the log service project name, and the password is the account key. |
| username | default | Kafka SASL username. Set it to the log service project name. |
| password | ALTAKOGSZ***#ALTAK9sZr** | Kafka SASL password. Set it to the Baidu AI Cloud account key in the format ${AccessKey}#${SecretKey}, where ${AccessKey} is your AccessKey and ${SecretKey} is your SecretKey. |
| host | bls-log.bj.baidubce.com:8200 | Initial cluster connection address, in the format <service address>:<port>, e.g. bls-log.bj.baidubce.com:8200. The service address is the log service entry point for the current region; choose the entry point that matches your region (see the service address documentation). The port is fixed at 8200. |
| topic | log-online | Set it to the LogStore name. |
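
To quickly validate these parameters before wiring up a collector or SDK, you can point a standard Kafka 2.x console producer at the log service. The sketch below assumes the stock Kafka command-line tools; the file name client.properties and the placeholder values are illustrative.

Plain
# client.properties (placeholder values are illustrative)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${project}" password="${AccessKey}#${SecretKey}";

# Send a JSON log line to the LogStore named log-online
bin/kafka-console-producer.sh --broker-list bls-log.bj.baidubce.com:8200 --topic log-online --producer.config client.properties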

Example

Upload logs via Filebeat

  • Configuration example
    The parameter configuration used in the example can be found in the parameter description.
YAML
## Multiple input sources, writing to different LogStores via the fields parameter
filebeat.inputs:
- type: log
  fields:
    logstore: "filebeat_1"
  paths:
    - /root/log/*.log
- type: log
  fields:
    logstore: "filebeat_2"
  paths:
    - /root/log/*.txt

## Increasing Filebeat's internal queue can improve the log upload rate
## (reference: https://www.elastic.co/docs/reference/beats/filebeat/configuring-internal-queue)
queue.mem:
  events: 100000

## Kafka output configuration
output.kafka:
  hosts: ["${host}"]
  username: "${project}"
  password: "${AccessKey}#${SecretKey}"
  topic: '%{[fields.logstore]}'   # Linked with the input fields to write to multiple different LogStores
  required_acks: 1
  sasl.mechanism: PLAIN
  ssl.enabled: true
  • Limitations

    • Filebeat's log output must be in JSON format.
    • To ensure secure log transmission, ssl.enabled: true and sasl.mechanism: PLAIN must be set, as in the configuration example above.
    • Filebeat's default queue.mem.events value is 3200; setting it to 20000-100000 is recommended to improve log push performance.

Upload logs via Kafka Go SDK

  • Dependencies
Plain
go get github.com/IBM/sarama
  • Code example
Go
package main

import (
	"fmt"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Metadata.Full = false
	config.Net.SASL.Mechanism = "PLAIN"
	config.Net.SASL.Enable = true
	config.Net.TLS.Enable = true
	// username is the project name
	config.Net.SASL.User = "${project}"
	config.Producer.Return.Errors = true
	// password is the Baidu AI Cloud account key, in the format ${AccessKey}#${SecretKey}
	config.Net.SASL.Password = "${AccessKey}#${SecretKey}"
	// hosts is the service address; see the parameter description in this document
	producer, err := sarama.NewAsyncProducer([]string{"${hosts}"}, config)
	if err != nil {
		fmt.Println("new producer error: " + err.Error())
		panic(err)
	}
	// Print any asynchronous send errors
	go func() {
		for e := range producer.Errors() {
			fmt.Println(e)
		}
	}()
	channel := producer.Input()
	for i := 0; i < 10; i++ {
		channel <- &sarama.ProducerMessage{
			// ${logStoreName} is the LogStore name, e.g. log-online
			Topic: "${logStoreName}",
			Value: sarama.StringEncoder("{\"@timestamp\":\"2025-09-10T04:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"),
		}
	}
	time.Sleep(time.Minute)
	producer.Close()
}
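
The Limitations section notes that gzip, snappy, and lz4 compression are supported. If you want the sarama producer above to compress and batch records before sending, the fields below can be set on the same config object; this is an optional tuning sketch, not part of the required setup.

Go
// Optional tuning, applied to the config object created above.
// The service accepts gzip, snappy, and lz4 compression.
config.Producer.Compression = sarama.CompressionGZIP
// Batch records briefly before sending to improve throughput at high log volume.
config.Producer.Flush.Messages = 100
config.Producer.Flush.Frequency = 100 * time.Millisecond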

Upload logs via Kafka Java SDK

  • Dependencies
XML
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.1</version>
</dependency>
  • Code example
Java
package org.wjr.test;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Properties props = new Properties();
        // hosts is the service address; see the parameter description in this document
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${hosts}");
        // Project name
        String username = "${project}";
        // Baidu AI Cloud account key, in the format ${AccessKey}#${SecretKey}
        String password = "${AccessKey}#${SecretKey}";
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule " +
                        "required username=\"" + username + "\" password=\"" + password + "\";");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Call the send method; the topic ("java_test" here) is the LogStore name
        Future<RecordMetadata> meta = producer.send(new ProducerRecord<String, String>("java_test", "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"latency\":0.008858347,\"status\":200,\"method\":\"POST\"}"));
        RecordMetadata recordMetadata = meta.get(100, TimeUnit.SECONDS);
        // Wait for the Kafka producer to finish sending data asynchronously
        Thread.sleep(1000000);
        producer.close();
    }
}
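
The example above blocks on Future.get for a single record. When pushing logs at volume, a non-blocking send with a callback is the more common way to surface delivery errors; the snippet below is a minimal sketch using the same producer and an illustrative payload.

Java
// Non-blocking send: the callback runs once the broker acknowledges (or rejects) the record.
producer.send(
        new ProducerRecord<String, String>("java_test",
                "{\"@timestamp\":\"2025-09-17T02:41:12.220Z\",\"level\":\"info\",\"status\":200}"),
        (metadata, exception) -> {
            if (exception != null) {
                // Delivery failed, e.g. invalid JSON payload or authentication error.
                exception.printStackTrace();
            }
        });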
