Subscribe to Kafka Logs
By submitting a routine load job, users can subscribe directly to Kafka message data and synchronize it in near real time.
Palo guarantees that Kafka messages are consumed without loss or duplication, i.e., Exactly-Once consumption semantics.
Preparations
Open Baidu Message Service
Please first activate Baidu Message Service (BMS), a Kafka-based hosted service on Baidu AI Cloud, following the process below.
- Activate the message service according to the BMS Quick Start document.
- Download and unzip the certificate package kafka-key.zip to obtain the following files:
  - ca.pem
  - client.key
  - client.keystore.jks
  - client.pem
  - client.properties
  - client.truststore.jks
- Upload the certificate files to an HTTP server.
  Palo later needs to download these certificates from an HTTP server in order to access Kafka, so they must be uploaded to an HTTP server first. This HTTP server must be accessible from the Leader Node of Palo.
If you do not have a suitable HTTP server, you can use Baidu Object Storage (BOS) instead, as follows:
- Activate the BOS service and create a Bucket according to the documents Start to use and Create Bucket. Note that the Bucket must be in the same region as the Palo cluster.
- Upload the following 3 files to the Bucket:
  - ca.pem
  - client.key
  - client.pem
- On the BOS Bucket file list page, click File Information on the right of each file to obtain its HTTP access link. Set the link validity period to -1, i.e., permanent.
  Note: Do not use an HTTP download address with CDN acceleration; in some cases Palo cannot access such addresses.
Self-built Kafka service
To use a self-built Kafka service, make sure that the Kafka service and the Palo cluster are in the same VPC and that their networks can communicate with each other.
Subscribe to Kafka messages
The Routine Load feature in Palo is used to subscribe to Kafka messages.
First create a routine load job. Through routine scheduling, the job continuously generates a series of tasks, and each task consumes a certain number of messages from Kafka.
Please note the following usage restrictions:
- Both unauthenticated Kafka access and SSL-authenticated Kafka clusters are supported.
- The supported message formats are as follows:
  - CSV text format. Each message occupies one line, and the line must not end with a line break.
  - Json format; refer to Load data in Json format (a minimal sketch follows this list).
- Only Kafka 0.10.0.0 or above is supported.
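For the JSON format, a hedged sketch of a job declaring "format" = "json" is shown below. The table example_tbl, columns k1/k2, and topic my_json_topic are placeholders for illustration; the full set of JSON-related properties (e.g. jsonpaths, strip_outer_array) is described in Load data in Json format and the ROUTINE LOAD manual.

```sql
-- Illustrative only: a routine load job consuming JSON messages.
-- Table, column, and topic names are hypothetical placeholders.
CREATE ROUTINE LOAD example_db.my_json_job ON example_tbl
COLUMNS(k1, k2)
PROPERTIES
(
    "format" = "json",
    "max_batch_interval" = "20"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_json_topic"
);
```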
Access Kafka cluster with SSL authentication
The routine load feature supports both unauthenticated Kafka clusters and SSL-authenticated Kafka clusters.
Accessing an SSL-authenticated Kafka cluster requires the user to provide the certificate file (ca.pem) used to verify the Kafka Broker public key. If the Kafka cluster also enables client authentication, the client public key (client.pem), the client key file (client.key), and the key password must be provided as well. The required files must be uploaded to Palo through the CREATE FILE command, with the catalog name set to kafka. Refer to the CREATE FILE command manual for specific help. Examples are as follows:
- Upload files

  CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
  CREATE FILE "client.key" PROPERTIES("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
  CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
After uploading, view the uploaded files through the SHOW FILES command.
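As a quick check (a minimal sketch; the exact syntax and output columns are described in the SHOW FILES command manual), the three certificates should appear with the catalog kafka:

```sql
-- List the files uploaded via CREATE FILE and confirm that
-- ca.pem, client.key, and client.pem appear under catalog "kafka".
SHOW FILES;
```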
Create a routine load job
Refer to the ROUTINE LOAD command manual for the specific commands for creating routine load jobs. Examples are as follows:
- Access a Kafka cluster without authentication

  CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
  COLUMNS TERMINATED BY ","
  PROPERTIES
  (
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200"
  )
  FROM KAFKA
  (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "property.group.id" = "xxx",
      "property.client.id" = "xxx",
      "property.kafka_default_offsets" = "OFFSET_BEGINNING"
  );
  The properties max_batch_interval / max_batch_rows / max_batch_size control the run cycle of a subtask: the maximum running time, the maximum number of rows consumed, and the maximum amount of data consumed together determine when a subtask's run cycle ends.
- Access a Kafka cluster with SSL authentication

  CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl
  COLUMNS TERMINATED BY ","
  PROPERTIES
  (
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200"
  )
  FROM KAFKA
  (
      "kafka_broker_list" = "broker1:9091,broker2:9091",
      "kafka_topic" = "my_topic",
      "property.security.protocol" = "ssl",
      "property.ssl.ca.location" = "FILE:ca.pem",
      "property.ssl.certificate.location" = "FILE:client.pem",
      "property.ssl.key.location" = "FILE:client.key",
      "property.ssl.key.password" = "abcdefg"
  );
  For Baidu Message Service, the value of property.ssl.key.password can be obtained from the file client.properties.
View load job status
Refer to the SHOW ROUTINE LOAD command documentation for the specific commands and examples for viewing job status.
Refer to the SHOW ROUTINE LOAD TASK command documentation for the specific commands and examples for viewing the running status of a job's tasks.
Only running tasks can be viewed; tasks that have ended or have not yet started cannot be viewed.
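For example, a minimal sketch using the job name from the examples above (the exact filter syntax for SHOW ROUTINE LOAD TASK should be confirmed in its command manual):

```sql
-- Show the status of a specific routine load job.
SHOW ROUTINE LOAD FOR example_db.my_first_job;

-- Show the currently running tasks of that job
-- (the WHERE JobName filter is assumed from the command manual).
SHOW ROUTINE LOAD TASK WHERE JobName = "my_first_job";
```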
Alter job properties
Some properties of a job that has been created can be altered. Refer to the ALTER ROUTINE LOAD command manual for specific instructions.
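As an illustration only (the set of alterable properties is listed in the ALTER ROUTINE LOAD manual, and the job usually must be in a paused state before it can be altered), a batch property could be adjusted like this:

```sql
-- Hedged sketch: adjust one property of the job created above.
-- The job typically must be paused (see Job control below) before altering.
ALTER ROUTINE LOAD FOR example_db.my_first_job
PROPERTIES
(
    "max_batch_rows" = "500000"
);
```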
Job control
The user can stop, pause, and resume a job through the three commands STOP ROUTINE LOAD, PAUSE ROUTINE LOAD, and RESUME ROUTINE LOAD.
Refer to the STOP ROUTINE LOAD, PAUSE ROUTINE LOAD, and RESUME ROUTINE LOAD command documentation for specific commands.
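A minimal sketch of the three operations against the job created above:

```sql
-- Pause the job; consumption stops but the job can be resumed later.
PAUSE ROUTINE LOAD FOR example_db.my_first_job;

-- Resume a paused job.
RESUME ROUTINE LOAD FOR example_db.my_first_job;

-- Stop the job permanently; a stopped job cannot be resumed.
STOP ROUTINE LOAD FOR example_db.my_first_job;
```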
More help
Refer to the ROUTINE LOAD command manual for more detailed syntax and best practices of Routine Load.