BROKER LOAD
Description
This command is mainly used to load data from remote storage (such as BOS and HDFS) through the Broker service process.
LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH BROKER broker_name
[broker_properties]
[load_properties];
-
load_label
Each load needs to specify a unique label. The job progress can be viewed later through this label.
[database.]label_name
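For example (a hypothetical label name):
example_db.label_20220330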
-
data_desc1
Used to describe a set of files that need to be loaded.
[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
-
[MERGE|APPEND|DELETE]
The default value is APPEND, which means that this load is an ordinary append write operation. MERGE and DELETE types are only applicable to Unique Key model tables. The MERGE type needs to be used with the
[DELETE ON]
statement to mark the Delete Flag column. The DELETE type means that all data loaded this time is data to be deleted.
-
DATA INFILE
Specify the paths of the files to be loaded. Multiple paths can be specified, and wildcards can be used. The path must ultimately match files; if it only matches a directory, the load will fail.
-
NEGATIVE
This keyword indicates that this load is a batch of "negative" loads. This mode only applies to Aggregate model tables whose value columns use the integer SUM aggregation type. In this mode, the integer values of the SUM aggregate columns in the loaded data are negated. It is mainly used to offset erroneous data loaded earlier, as in the sketch below.
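A minimal sketch of a negative load, assuming a hypothetical bucket, file, and Aggregate model table my_agg_table whose value columns use integer SUM aggregation. Re-loading the erroneous file with NEGATIVE negates the SUM values so that they cancel out the earlier load:

LOAD LABEL example_db.label_negative
(
    DATA INFILE("bos://my_bucket/input/wrong_file.txt")
    NEGATIVE
    INTO TABLE `my_agg_table`
    COLUMNS TERMINATED BY ","
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);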
-
PARTITION(p1, p2, ...)
The user can specify that data should only be loaded into the listed partitions of the table. Data that does not belong to these partitions will be ignored.
-
COLUMNS TERMINATED BY
Specify the column separator. Valid only in CSV format. Only single byte separators can be specified.
-
FORMAT AS
Specify the file type. CSV, PARQUET and ORC formats are supported. Defaults to CSV.
-
column list
Used to specify the order of columns in the original file. For a detailed introduction, please refer to the Mapping, Transformation and Filtering of Columns document. For example:
(k1, k2, tmpk1)
-
COLUMNS FROM PATH AS
Specify the columns extracted from the load file path.
-
PRECEDING FILTER predicate
Pre-filtering condition. The data is first spliced into raw data rows in order according to the column list and COLUMNS FROM PATH AS, and then filtered by the pre-filtering condition. For a detailed introduction, please refer to the Mapping, Transformation and Filtering of Columns document.
-
SET (column_mapping)
Specify the conversion function for the column.
-
WHERE predicate
Filter the loaded data according to the conditions. For a detailed introduction of this part, please refer to Mapping, Transformation and Filtering of Columns document.
-
DELETE ON expr
It needs to be used together with the MERGE load mode, and only applies to tables of the Unique Key model. It is used to specify the column and calculation relationship that represent the Delete Flag in the loaded data.
-
ORDER BY
Only applies to tables of the Unique Key model. It is used to specify the column in the loaded data that represents the Sequence Col, mainly to ensure data order during loading.
-
WITH BROKER broker_name
Specify the name of the Broker service to use. In public cloud Palo, the Broker service name is bos.
-
broker_properties
Specify the information required by the Broker. This information is usually used by the Broker process to access the remote storage system, such as BOS or HDFS. For details, please refer to the Broker document.
( "key1" = "val1", "key2" = "val2", ... )
-
load_properties
Specify parameters related to the load. The following parameters are currently supported (a combined sketch follows this list):
-
timeout
Load timeout, in seconds. Defaults to 4 hours (14400 seconds).
-
max_filter_ratio
Maximum tolerated ratio of filterable data (e.g., rows that are non-standard). Zero tolerance by default. The value range is 0 to 1.
-
exec_mem_limit
Memory limit for the load, in bytes. Defaults to 2 GB.
-
strict_mode
Whether to impose strict restrictions on the data. Defaults to false.
-
timezone
Specify the time zone of some functions affected by time zone, such as
strftime/alignment_timestamp/from_unixtime
etc. For details, please refer to the Time Zone document. If not specified, the "Asia/Shanghai" time zone is used.
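A minimal sketch of the load_properties clause combining the parameters above (the values are illustrative only):

PROPERTIES
(
    "timeout" = "14400",
    "max_filter_ratio" = "0.1",
    "exec_mem_limit" = "2147483648",
    "strict_mode" = "false",
    "timezone" = "Asia/Shanghai"
)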
-
Example
-
Load a batch of data from BOS
LOAD LABEL example_db.label1
(
    DATA INFILE("bos://my_bucket/input/file.txt")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
Load the file file.txt, with columns separated by commas, into the table my_table.
-
Load data from BOS and use wildcards to match two batches of files. Load into two tables respectively.
LOAD LABEL example_db.label2
(
    DATA INFILE("bos://my_bucket/input/file-10*")
    INTO TABLE `my_table1`
    PARTITION (p1)
    COLUMNS TERMINATED BY ","
    (k1, tmp_k2, tmp_k3)
    SET
    (
        k2 = tmp_k2 + 1,
        k3 = tmp_k3 + 1
    ),
    DATA INFILE("bos://my_bucket/input/file-20*")
    INTO TABLE `my_table2`
    COLUMNS TERMINATED BY ","
    (k1, k2, k3)
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
Use wildcard matching to load two batches of files, file-10* and file-20*, into my_table1 and my_table2 respectively. my_table1 is loaded only into partition p1, and the values in the second and third columns of the source files are incremented by 1 before loading.
-
Load a batch of data from HDFS.
LOAD LABEL example_db.label3
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/*/*")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY "\\x01"
)
WITH BROKER my_hdfs_broker
(
    "username" = "",
    "password" = "",
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
Specify the default delimiter \\x01, and use the wildcard * to match all files in all directories under the data directory. Use simple authentication and configure NameNode HA at the same time.
-
Load data in Parquet format and explicitly specify the format as parquet. By default, the format is inferred from the file name suffix.
LOAD LABEL example_db.label4
(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    FORMAT AS "parquet"
    (k1, k2, k3)
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
-
Load the data and extract the partition fields in the file path
LOAD LABEL example_db.label10
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
    INTO TABLE `my_table`
    FORMAT AS "csv"
    (k1, k2, k3)
    COLUMNS FROM PATH AS (city, utc_date)
)
WITH BROKER hdfs
(
    "username" = "hdfs_user",
    "password" = "hdfs_password"
);
The columns in my_table are k1, k2, k3, city, utc_date. The directory hdfs://hdfs_host:hdfs_port/input/ contains the following files:
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv
Each file contains only three columns of data (k1, k2, k3); the two columns city and utc_date are extracted from the file path.
-
Filter the data to be loaded.
LOAD LABEL example_db.label6
(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    (k1, k2, k3)
    PRECEDING FILTER k1 = 1
    SET
    (
        k2 = k2 + 1
    )
    WHERE k1 > k2
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
Only rows where k1 = 1 in the original data, and where k1 > k2 after the transformation, will be loaded.
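As a worked illustration with hypothetical values: a source row with k1 = 1 and k2 = 3 passes the pre-filter (k1 = 1); after the SET clause k2 becomes 4, so the WHERE condition k1 > k2 (1 > 4) fails and the row is filtered out. A source row with k1 = 1 and k2 = -2 passes the pre-filter, k2 becomes -1, the WHERE condition 1 > -1 holds, and the row is loaded.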
-
Load data and extract the time partition field from the file path. The time value contains %3A (':' is not allowed in HDFS paths, so all ':' characters are replaced by %3A).
LOAD LABEL example_db.label7
(
    DATA INFILE("hdfs://host:port/user/data/*/test.txt")
    INTO TABLE `tbl12`
    COLUMNS TERMINATED BY ","
    (k2, k3)
    COLUMNS FROM PATH AS (data_time)
    SET
    (
        data_time = str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
    )
)
WITH BROKER hdfs
(
    "username" = "user",
    "password" = "pass"
);
There are the following files under the path:
/user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
/user/data/data_time=2020-02-18 00%3A00%3A00/test.txt
The table structure is:
data_time DATETIME, k2 INT, k3 INT
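As a worked example, the first file above yields the path value data_time = "2020-02-17 00%3A00%3A00". Because %% in the format string escapes a literal %, the pattern '%Y-%m-%d %H%%3A%i%%3A%s' matches the literal "%3A" sequences, and str_to_date converts the value into the DATETIME 2020-02-17 00:00:00.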
-
Load a batch of data using the MERGE mode, and specify the timeout and maximum filter ratio. Rows in the loaded data whose v2 value is greater than 100 are treated as delete operations against the existing data, and the other rows are loaded normally.
LOAD LABEL example_db.label8
(
    MERGE DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    (k1, k2, k3, v2, v1)
    DELETE ON v2 > 100
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
)
PROPERTIES
(
    "timeout" = "3600",
    "max_filter_ratio" = "0.1"
);
Load using MERGE method.
my_table
must be a Unique Key model table. When the value of the v2 column in the loaded data is greater than 100, the row is treated as a deleted row. The timeout of the load task is 3600 seconds, and the allowed error rate is within 10%.
-
Specify the source_sequence column during load to ensure the replacement order in the UNIQUE_KEYS table:
LOAD LABEL example_db.label9
(
    DATA INFILE("bos://bucket/input/file")
    INTO TABLE `my_table`
    COLUMNS TERMINATED BY ","
    (k1, k2, source_sequence, v1, v2)
    ORDER BY source_sequence
)
WITH BROKER bos
(
    "bos_endpoint" = "http://bj.bcebos.com",
    "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
);
my_table must be a Unique Key model table with a sequence column specified. The data will be ordered according to the value of the source_sequence column in the source data.
Keywords
BROKER, LOAD
Best Practices
-
View load task status
Broker Load is an asynchronous load process. Successful execution of the statement only means that the load task was submitted successfully; it does not mean the data was loaded successfully. The load status should be checked with the SHOW LOAD command.
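For example, a minimal status check, assuming the label example_db.label1 from the first example above:

SHOW LOAD FROM example_db WHERE LABEL = "label1";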
-
Cancel the load task
Load tasks that have been submitted but not yet finished can be cancelled with the CANCEL LOAD command. After cancellation, the data that has already been written will be rolled back and will not take effect.
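For example, a minimal cancellation, again assuming the label example_db.label1:

CANCEL LOAD FROM example_db WHERE LABEL = "label1";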
-
Label, load transaction, multi-table atomicity
All load tasks in Palo are atomic. Atomicity is also guaranteed when multiple tables are loaded within the same load task. In addition, Palo uses the Label mechanism to ensure that loaded data is neither lost nor duplicated. For details, please refer to the Load Transactions and Atomicity document.
-
Column mapping, derived columns and filtering
Palo can support very rich column conversion and filtering operations in load statements. Most built-in functions and UDFs are supported. For how to use this function correctly, please refer to Mapping, transformation and filtering of columns document.
-
Error data filtering
Palo's load task can tolerate some malformed data. Tolerance rate can be set by
max_filter_ratio
, which is 0 by default, meaning that the whole load task fails if any erroneous row exists. If the user wants to ignore some problematic data rows, this parameter can be set to a value between 0 and 1, and Palo will automatically skip rows with an incorrect data format. For how the tolerance ratio is calculated, please refer to the Mapping, Transformation and Filtering of Columns document.
-
Strict mode
strict_mode
property is used to set whether the load task runs in strict mode. This mode affects the results of column mapping, transformation and filtering. For a detailed description of strict mode, please refer to the Strict Mode document.
-
Timeout period
The default timeout for Broker Load is 4 hours, counted from the time the task is submitted. If the task does not finish within the timeout period, it fails.
-
Data volume and task number limit
Broker Load is suitable for loading data within 100 GB in a single load task. Although in theory there is no upper limit on the amount of data loaded in one task, submitting an overly large load leads to a long running time, and the cost of retrying after a failure also increases.
At the same time, limited by cluster size, we restrict the maximum amount of data in one load to (number of ComputeNode nodes) * 3 GB, in order to ensure reasonable utilization of system resources. If a large amount of data needs to be loaded, it is recommended to split it into multiple load tasks.
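For example, assuming a cluster with 10 ComputeNode nodes, a single load task would be limited to roughly 10 * 3 GB = 30 GB, so a 100 GB dataset would be split into at least 4 load tasks.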
Palo also limits the number of load tasks running simultaneously in the cluster, usually ranging from 3 to 10. Load jobs submitted beyond this limit are queued, and the maximum queue length is 100; submissions beyond that are rejected directly. Note that queueing time is also counted towards the total job time, so a job that times out while queued will be cancelled. Therefore, it is recommended to control the frequency of job submission by monitoring the running status of jobs.