ROUTINE LOAD
Description
The Routine Load feature allows users to submit a resident load job that continuously reads data from a specified data source and loads it into Palo.
At present, it only supports loading CSV or JSON data from Kafka, using either no authentication or SSL authentication.
Syntax:
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]
- [db.]job_name
  The name of the load job. Within the same database, only one job with the same name can be running.
- tbl_name
  Specifies the name of the table to be loaded.
- merge_type
  The data merge type. The default value is APPEND, which means the loaded data are ordinary append writes. The MERGE and DELETE types are only applicable to Unique Key model tables. The MERGE type must be used together with the [DELETE ON] clause to mark the Delete Flag column. The DELETE type means that all data loaded this time are deleted data.
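  For example, to load with MERGE semantics (as in the conditional-filtering example below):
  WITH MERGE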
- load_properties
  Used to describe the loaded data. It consists of:
  [column_separator], [columns_mapping], [preceding_filter], [where_predicates], [partitions], [DELETE ON], [ORDER BY]
  - column_separator
    Specifies the column separator. The default is \t.
    COLUMNS TERMINATED BY ","
  - columns_mapping
    Used to specify the mapping between columns in the file and columns in the table, as well as various column transformations. For a detailed introduction, please refer to the [Mapping, Transformation and Filtering of Columns] document.
    (k1, k2, tmpk1, k3 = tmpk1 + 1)
  - preceding_filter
    Used to filter the raw data. For a detailed introduction, please refer to the [Mapping, Transformation and Filtering of Columns] document.
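    For example (as used in the second example below):
    PRECEDING FILTER k1 = 1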
  - where_predicates
    Filters the loaded data according to the specified conditions. For a detailed introduction, please refer to the [Mapping, Transformation and Filtering of Columns] document.
    WHERE k1 > 100 and k2 = 1000
  - partitions
    Specifies which partitions of the destination table to load into. If not specified, data is automatically loaded into the corresponding partitions.
    PARTITION(p1, p2, p3)
  - DELETE ON
    Must be used together with the MERGE load mode, and only applies to Unique Key model tables. It specifies the column and computed condition that represent the Delete Flag in the loaded data.
    DELETE ON v3 > 100
  - ORDER BY
    Only applies to Unique Key model tables. It specifies the column representing the Sequence Column in the loaded data, which is mainly used to guarantee data order during loading.
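    For example (as used in the sequence-column example below):
    ORDER BY source_sequence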
- job_properties
  Used to specify general parameters of the routine load job.
  PROPERTIES ( "key1" = "val1", "key2" = "val2" )
  Currently the following parameters are supported:
  - desired_concurrent_number
    The desired degree of concurrency. A routine load job is divided into several subtasks; this parameter specifies the maximum number of subtasks a job may run at the same time. It must be greater than 0, and the default is 3.
    This is not the actual concurrency. The actual concurrency is determined from the number of nodes in the cluster, the cluster load, and the state of the data source.
    "desired_concurrent_number" = "3"
  - max_batch_interval/max_batch_rows/max_batch_size
    These three parameters respectively represent:
    - The maximum execution time of each subtask, in seconds. The range is 5 to 60, and the default is 10.
    - The maximum number of rows read by each subtask. It must be greater than or equal to 200000, and the default is 200000.
    - The maximum number of bytes read by each subtask. The range is 100MB to 1GB, and the default is 100MB.
    These three parameters control the execution time and throughput of a subtask. When any one of them reaches its threshold, the subtask ends.
    "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200"
  - max_error_number
    The maximum number of error rows allowed within the sampling window. It must be greater than or equal to 0. The default is 0, which means no error rows are allowed.
    The sampling window is max_batch_rows * 10. If the number of error rows within the sampling window exceeds max_error_number, the routine job is suspended and manual intervention is required to check for data quality problems. Rows filtered out by the WHERE condition are not counted as error rows.
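    For example (the value 100 here is only illustrative):
    "max_error_number" = "100"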
  - strict_mode
    Whether to enable strict mode; it is disabled by default. If enabled, rows whose non-null source values become NULL after column type conversion are filtered out. Specify it as follows:
    "strict_mode" = "true"
  - timezone
    Specifies the time zone used by the load job. The default is to use the session's timezone parameter. This parameter affects the results of all time-zone-related functions involved in the load.
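    For example (as used in the SSL example below):
    "timezone" = "Africa/Abidjan"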
  - format
    Specifies the format of the loaded data. The default is csv; the json format is also supported.
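    For example (as used in the JSON examples below):
    "format" = "json"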
  - jsonpaths
    When the loaded data format is json, jsonpaths can be used to specify which fields to extract from the JSON data.
    "jsonpaths" = "[\"$.k2\", \"$.k1\"]"
  - strip_outer_array
    When the loaded data format is json, setting strip_outer_array to true indicates that the JSON data is presented as an array and each element of the array is treated as one row of data. The default is false.
    "strip_outer_array" = "true"
  - json_root
    When the loaded data format is json, json_root can be used to specify the root node of the JSON data. Palo extracts the elements of the root node located by json_root for parsing. It is empty by default.
    "json_root" = "$.RECORDS"
- FROM data_source [data_source_properties]
  The data source type. Currently supported:
  FROM KAFKA ( "key1" = "val1", "key2" = "val2" )
  data_source_properties supports the following properties:
  - kafka_broker_list
    The broker connection information of Kafka, in the format ip:port. Multiple brokers are separated by commas.
    "kafka_broker_list" = "broker1:9092,broker2:9092"
  - kafka_topic
    Specifies the Kafka topic to subscribe to.
    "kafka_topic" = "my_topic"
  - kafka_partitions/kafka_offsets
    Specifies the Kafka partitions to subscribe to and the starting offset of each partition.
    An offset can be a specific value greater than or equal to 0, or:
    - OFFSET_BEGINNING: subscribe from the position where data becomes available.
    - OFFSET_END: subscribe from the end.
    If not specified, all partitions under the topic are subscribed to from OFFSET_END by default.
    "kafka_partitions" = "0,1,2,3", "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
  - property
    Specifies custom Kafka parameters. The function is equivalent to the "--property" parameter of the Kafka shell.
    When the value of a parameter is a file, the keyword "FILE:" must be added before the value.
    For information on how to create a file, please refer to the CREATE FILE command document.
    For more supported custom parameters, see the client-side configuration items in the official configuration document of librdkafka. For example:
    "property.client.id" = "12345", "property.ssl.ca.location" = "FILE:ca.pem"
  - The following parameters must be specified when connecting to Kafka using SSL:
    "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"
    Among them, property.security.protocol and property.ssl.ca.location are required, and indicate that the connection method is SSL and the location of the CA certificate.
    If client authentication is enabled on the Kafka server side, the following must also be set:
    "property.ssl.certificate.location", "property.ssl.key.location", "property.ssl.key.password"
    They specify the client's public key, the client's private key, and the password of the private key, respectively.
  - Specifying the default starting offset of Kafka partitions
    If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default. In that case, kafka_default_offsets can be used to specify the starting offset. The default is OFFSET_END, that is, subscribing from the end. Example:
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
Example
- Create a Kafka routine load task named test1 for example_tbl of example_db. Specify the column separator, group.id and client.id, consume all partitions automatically by default, and subscribe from the position where data is available (OFFSET_BEGINNING).
  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  COLUMNS TERMINATED BY ",",
  COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "property.group.id" = "xxx",
      "property.client.id" = "xxx",
      "property.kafka_default_offsets" = "OFFSET_BEGINNING"
  );
- Create a Kafka routine load task named test1 for example_tbl of example_db. The load task is in strict mode.
  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
  PRECEDING FILTER k1 = 1,
  WHERE k1 > 100 and k2 like "%palo%"
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "true"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2,3",
      "kafka_offsets" = "101,0,0,200"
  );
- Load data from a Kafka cluster through SSL authentication, and set the client.id parameter. The load task is in non-strict mode, and the time zone is Africa/Abidjan.
  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
  WHERE k1 > 100 and k2 like "%palo%"
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false",
      "timezone" = "Africa/Abidjan"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "property.security.protocol" = "ssl",
      "property.ssl.ca.location" = "FILE:ca.pem",
      "property.ssl.certificate.location" = "FILE:client.pem",
      "property.ssl.key.location" = "FILE:client.key",
      "property.ssl.key.password" = "abcdefg",
      "property.client.id" = "my_client_id"
  );
- Load JSON format data. By default, field names in the JSON are used as column names for mapping. Load into three partitions, 0, 1 and 2, each with an initial offset of 0.
  CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
  COLUMNS(category, price, author)
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false",
      "format" = "json"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2",
      "kafka_offsets" = "0,0,0"
  );
- Load JSON data, extract fields through jsonpaths, and specify the root node of the JSON document.
  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  COLUMNS(category, author, price, timestamp, dt = from_unixtime(timestamp, '%Y%m%d'))
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false",
      "format" = "json",
      "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
      "json_root" = "$.RECORDS",
      "strip_outer_array" = "true"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2",
      "kafka_offsets" = "0,0,0"
  );
- Create a Kafka routine load task named test1 for example_tbl of example_db, using the MERGE type with conditional filtering and a DELETE ON condition.
  CREATE ROUTINE LOAD example_db.test1 ON example_tbl
  WITH MERGE
  COLUMNS(k1, k2, k3, v1, v2, v3),
  WHERE k1 > 100 and k2 like "%palo%",
  DELETE ON v3 > 100
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200",
      "strict_mode" = "false"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2,3",
      "kafka_offsets" = "101,0,0,200"
  );
- Load data into a Unique Key model table with a sequence column.
  CREATE ROUTINE LOAD example_db.test_job ON example_tbl
  COLUMNS TERMINATED BY ",",
  COLUMNS(k1, k2, source_sequence, v1, v2),
  ORDER BY source_sequence
  PROPERTIES (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "30",
      "max_batch_rows" = "300000",
      "max_batch_size" = "209715200"
  )
  FROM KAFKA (
      "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
      "kafka_topic" = "my_topic",
      "kafka_partitions" = "0,1,2,3",
      "kafka_offsets" = "101,0,0,200"
  );
Keywords
CREATE, ROUTINE, LOAD
Best Practices
TBD