STREAM LOAD
Description
Stream Load transmits and loads data into Palo over the HTTP protocol. It is mainly used to upload and load a user's local files. But it is essentially a general load framework: its HTTP interface can receive not only local files, but also data transmitted from memory or from a pipeline to the HTTP port.
- Public cloud users must use the HTTP protocol port of the Compute Node (BE), which is 8040 by default.
- Privatized deployment users can use the HTTP protocol port of the Leader Node (FE), which is 8030 by default. In this case, make sure that the client machine's network can reach the machines where the Compute Nodes are located.
This document mainly describes how to use Stream Load via the curl command.
curl -XPUT --location-trusted -u user:passwd \
[-H "header1: xxx" -H "header2: xxx" ...] \
-T data.txt \
http://host:port/api/{db}/{table}/_stream_load
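For instance, the target can be either an FE or a BE HTTP port. A minimal sketch with hypothetical host names and credentials, using the default ports described below:
# Via the FE port (8030 by default): --location-trusted lets curl re-send
# the credentials when the FE redirects (307) the request to a BE node.
curl --location-trusted -u user:passwd -T data.txt http://fe_host:8030/api/example_db/my_table/_stream_load
# Via the BE port (8040 by default): the request interacts with the BE directly.
curl --location-trusted -u user:passwd -T data.txt http://be_host:8040/api/example_db/my_table/_stream_load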
- The HTTP request method is PUT.
- Both HTTP chunked and non-chunked upload methods are currently supported. In non-chunked mode, the Header must contain Content-Length to identify the length of the uploaded content and ensure data integrity.
- The Header should contain Expect: 100-continue, which avoids unnecessary data transmission in some error scenarios.
- There are two types of target host:port for the command:
  - The HTTP protocol port of an FE. In this case, the FE forwards the request to a randomly chosen BE node via a 307 redirect, and the final request and data communicate directly with that BE node. This method requires that the network between the client and the BE nodes works normally.
  - The HTTP protocol port of a BE. The request interacts with the BE node directly.
  Note: Baidu cloud Palo users can directly connect to the HTTP protocol port of the Compute Node.
- The database and table to be loaded are specified in the two path parameters {db} and {table} of the URL.
- Other parameters of the load task are specified in the Header:
- label
Specify a Label for the load task to uniquely identify the job. If not specified, the system will automatically generate a UUID as the Label.
-H "label: my_label1"
- column_separator
Used to specify the column separator in the load file, which is \t by default. If the separator is an invisible character, prefix it with \x and express it in hexadecimal. For example, the separator \x01 of Hive files needs to be specified as -H "column_separator:\x01".
-H "column_separator: ,"
- columns
Used to specify the mapping between the columns in the file and the columns in the table, as well as various column transformations. For a detailed introduction, please refer to the Mapping, transformation and filtering of columns document.
-H "columns: k1, k2, tmpk1, k3 = tmpk1 + 1"
- where
Used to filter the loaded data according to conditions. For a detailed introduction, please refer to the Mapping, transformation and filtering of columns document.
-H "where: k1 > 100"
- max_filter_ratio
The maximum tolerable ratio of filterable data (rows that are nonstandard, etc.). The default is zero tolerance. The value range is 0 to 1. For example, 0.01 means the task tolerates up to 1% of rows being filtered.
-H "max_filter_ratio: 0.01"
- partitions
Specify the partitions to load data into.
-H "partitions: p1, p2"
- timeout
Specify the load timeout, in seconds. The default is 600 seconds. The settable range is 1 to 14400 seconds.
-H "timeout: 120"
- strict_mode
Whether to apply strict restrictions to the data. The default is false.
-H "strict_mode: true"
- timezone
Specify the time zone used for this load. The default is UTC+8 (East Eighth Time Zone). This parameter affects the results of all time-zone-related functions involved in the load.
-H "timezone: Asia/Shanghai"
- exec_mem_limit
Memory limit of the load, in bytes. The default is 2GB (2147483648 bytes).
-H "exec_mem_limit: 4294967296"
- format
Specify the format of the loaded data. csv and json are supported; the default is csv.
-H "format: json"
- jsonpaths
When the loaded data format is json, the fields to extract from the Json data can be specified through jsonpaths.
-H "jsonpaths: [\"$.k2\", \"$.k1\"]"
- strip_outer_array
When the loaded data format is json, setting strip_outer_array to true indicates that the Json data is presented as an array, and each element in it is treated as one row of data. For example, [{"k1": 1}, {"k1": 2}] is loaded as two rows. The default is false.
-H "strip_outer_array: true"
- json_root
When the loaded data format is json, the user may specify the root node of the Json data through json_root. Palo extracts the elements of the root node through json_root for parsing. It is empty by default.
-H "json_root: $.RECORDS"
- merge_type
The default value is APPEND, which means this load is an ordinary append write operation. The MERGE and DELETE types are only applicable to Unique Key model tables. MERGE must be used together with the delete parameter to mark the Delete Flag column. DELETE means that all data loaded this time is deleted data.
-H "merge_type: MERGE"
- delete
Meaningful only under the MERGE type. Used to specify the Delete Flag column and the condition that marks a row for deletion.
-H "delete: col3 = 1"
- function_column.sequence_col
Only applicable to Unique Key model tables. Used to specify the column in the loaded data that represents the Sequence Col, mainly to ensure data order during loading.
-H "function_column.sequence_col: col3"
- fuzzy_parse
When the loaded data is in Json array format and the field order of every row in the array is exactly the same, this parameter can be enabled to speed up loading. It is generally used in combination with strip_outer_array: true. See the JSON format data load instructions for details.
-H "fuzzy_parse: true"
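The JSON-related headers above can be combined. A sketch with hypothetical file and connection values:
curl --location-trusted -u user:passwd -H "format: json" -H "strip_outer_array: true" -H "fuzzy_parse: true" -T data.json http://host:port/api/example_db/my_table/_stream_load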
Example
- Load the local file testData and specify the timeout period
curl --location-trusted -u admin -H "label:label1" -H "timeout:100" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load the local file testData and filter the data according to conditions
curl --location-trusted -u admin -H "label:label2" -H "where: k1=20180601" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load the local file testData and set the maximum allowable error rate
curl --location-trusted -u admin -H "label:label3" -H "max_filter_ratio:0.2" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load the local file testData and specify the column mapping
curl --location-trusted -u admin -H "label:label4" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load the local file testData and specify the partitions and the maximum allowable error rate
curl --location-trusted -u admin -H "label:label5" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load using streaming (the data is generated by a program and piped to curl via stdin)
seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u admin -T - http://host:port/api/example_db/my_table/_stream_load
- Load a table with HLL columns
curl --location-trusted -u admin -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load a table with BITMAP columns
curl --location-trusted -u admin -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load Json data using simple mode, i.e., the field names in the Json data are the column names.
The table schema is:
category varchar(512)
author varchar(512)
title varchar(512)
price double
Json data:
{"category":"C++","author":"avc","title":"C++ primer","price":895}
Load command:
curl --location-trusted -u admin -H "label:label10" -H "format: json" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load Json data and extract fields using jsonpaths
Json data format:
{"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895},
Use jsonpaths to extract the three fields category, author and price.
curl --location-trusted -u admin -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load Json data, specify the root node of the Json document, and flatten the array.
Json data format:
{
    "RECORDS": [
        {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
        {"category":"22","author":"2avc","price":895,"timestamp":1589191487},
        {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
    ]
}
Use jsonpaths to extract the three fields category, author and price.
curl --location-trusted -u admin -H "columns: category, price, author" -H "label:label12" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/example_db/my_table/_stream_load
- Use DELETE mode to delete the data with the same keys as this batch of loaded data
curl --location-trusted -u admin -H "merge_type: DELETE" -T testData http://host:port/api/example_db/my_table/_stream_load
- Use MERGE mode: delete the rows in this batch whose flag column equals 1, and append the other rows normally
curl --location-trusted -u admin -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/example_db/my_table/_stream_load
- Load data into a Unique Key model table with a Sequence Col column
curl --location-trusted -u admin -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/example_db/my_table/_stream_load
Keywords
STREAM, LOAD
Best Practices
- View load task status
Stream Load is a synchronous load process: successful execution of the statement means the data has been loaded successfully. The execution result of the load is returned synchronously in the HTTP response, presented in Json format:
{ "TxnId": 17, "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5", "Status": "Success", "Message": "OK", "NumberTotalRows": 5, "NumberLoadedRows": 5, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 28, "LoadTimeMs": 27, "BeginTxnTimeMs": 0, "StreamLoadPutTimeMs": 2, "ReadDataTimeMs": 0, "WriteDataTimeMs": 3, "CommitAndPublishTimeMs": 18 }
The field definitions are as follows:
- TxnId: The load transaction ID, automatically generated by the system and globally unique.
- Label: The load Label. If not specified, the system generates a UUID.
- Status: The load result, with the following possible values:
  - Success: The load succeeded and the data is visible.
  - Publish Timeout: The load has also completed, but the data may become visible with a delay.
  - Label Already Exists: The Label is duplicated; a different Label needs to be used.
  - Fail: The load failed.
- ExistingJobStatus: The status of the load job corresponding to the existing Label.
This field is only displayed when the Status is Label Already Exists. It tells the user the status of the existing load job with the same Label: RUNNING means the job is still executing, and FINISHED means the job succeeded.
- Message: Load error message.
- NumberTotalRows: The total number of rows processed by the load.
- NumberLoadedRows: The number of rows loaded successfully.
- NumberFilteredRows: The number of rows whose data quality was unqualified.
- NumberUnselectedRows: The number of rows filtered out by the where condition.
- LoadBytes: The number of bytes loaded.
- LoadTimeMs: The time taken for the load to complete, in milliseconds.
- BeginTxnTimeMs: The time spent requesting FE to begin a transaction, in milliseconds.
- StreamLoadPutTimeMs: The time spent requesting the load-data execution plan from FE, in milliseconds.
- ReadDataTimeMs: The time spent reading the data, in milliseconds.
- WriteDataTimeMs: The time spent executing the write operation, in milliseconds.
- CommitAndPublishTimeMs: The time spent requesting FE to commit and publish the transaction, in milliseconds.
- ErrorURL: If there are data quality problems, visit this URL to view the specific error rows.
- How to correctly submit a Stream Load job and process the returned result
Stream Load is a synchronous load operation, so the user needs to wait synchronously for the result of the command and decide on the next step based on it.
The user's primary concern is the Status field in the returned result.
If it is Success, everything is normal and subsequent operations can proceed.
If the returned results contain a large number of Publish Timeout, it may indicate that some resources (such as IO) in the cluster are currently strained and the loaded data cannot take effect promptly. A load task in Publish Timeout status has already succeeded and does not need to be retried; however, it is advisable to slow down or pause the submission of new load tasks and observe the cluster load.
If the returned result is Fail, the load failed, and the problem should be investigated according to the specific reason. Once resolved, the same Label can be used to retry.
In some cases, the user's HTTP connection may be disconnected abnormally, so that the final result cannot be obtained. In that case, the load task can be resubmitted with the same Label, and the resubmitted task may return one of the following results:
- Status is Success, Fail or Publish Timeout. The result can be processed according to the normal flow.
- Status is Label Already Exists. Then check the ExistingJobStatus field. If its value is FINISHED, the load task corresponding to this Label has succeeded and there is no need to retry. If it is RUNNING, the load task corresponding to this Label is still running; in this case, keep resubmitting with the same Label at regular intervals (for example, 10 seconds) until the Status is no longer Label Already Exists, or the ExistingJobStatus field value becomes FINISHED.
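A minimal sketch of this retry loop, assuming the jq tool is available for parsing the Json response; the connection values, Label and file name are hypothetical:
# Hypothetical values; adjust to the actual cluster and table.
URL="http://be_host:8040/api/example_db/my_table/_stream_load"
LABEL="my_label_retry"
while true; do
    RESP=$(curl -s --location-trusted -u user:passwd \
        -H "label: ${LABEL}" -H "Expect: 100-continue" \
        -T testData "${URL}")
    STATUS=$(echo "${RESP}" | jq -r '.Status')
    if [ "${STATUS}" != "Label Already Exists" ]; then
        echo "Final status: ${STATUS}"    # Success, Fail or Publish Timeout
        break
    fi
    if [ "$(echo "${RESP}" | jq -r '.ExistingJobStatus')" = "FINISHED" ]; then
        echo "The job with this Label has already succeeded"
        break
    fi
    sleep 10    # the existing job is still RUNNING; retry after an interval
done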
- Cancel a load task
Load tasks that have been submitted but not yet finished can be cancelled with the CANCEL LOAD command. After cancellation, the data already written is rolled back and does not take effect.
- Label, load transactions, multi-table atomicity
All load tasks in Palo take effect atomically. Loading multiple tables within the same load task is also guaranteed to be atomic. In addition, Palo uses the Label mechanism to ensure that loaded data is neither lost nor duplicated. For details, see the Load Transaction and Atomicity document.
- Column mapping, derived columns and filtering
Palo supports very rich column transformation and filtering operations in load statements. Most built-in functions and UDFs are supported. For how to use this feature correctly, please refer to the Column mapping, derived columns and filtering document.
- Error data filtering
Palo's load tasks can tolerate a portion of malformed data. The tolerance rate is set via max_filter_ratio, which is 0 by default, meaning that the whole load task fails as soon as there is a single error row. If the user wants to ignore some problematic rows, this parameter can be set to a value between 0 and 1, and Palo will automatically skip rows with an incorrect data format. For the calculation methods of the tolerance rate, please refer to the Mapping, transformation and filtering of columns document.
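A sketch of inspecting filtered rows via the ErrorURL field of the response, assuming jq is available; connection values, Label and file name are hypothetical:
RESP=$(curl -s --location-trusted -u user:passwd \
    -H "label: inspect_errors_1" -H "max_filter_ratio: 0.1" \
    -T testData http://be_host:8040/api/example_db/my_table/_stream_load)
# If any rows were filtered, ErrorURL points at the offending lines.
if [ "$(echo "${RESP}" | jq -r '.NumberFilteredRows')" -gt 0 ]; then
    curl -s "$(echo "${RESP}" | jq -r '.ErrorURL')"
fi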
- Strict mode
The strict_mode property sets whether the load task runs in strict mode. Strict mode affects the results of column mapping, transformation and filtering. For a detailed description, please refer to the Strict Mode document.
- Timeout period
The default timeout for Stream Load is 10 minutes, counted from the time the task is submitted. If the task does not finish within the timeout period, it fails.
- Data volume and task count limits
Stream Load is suitable for loading data up to a few GB. Because the data is transmitted and processed in a single thread, loading performance cannot be guaranteed for very large volumes of data. When a large amount of local data needs to be loaded, it can be split and submitted as multiple load tasks in parallel.
Palo also limits the number of load tasks running simultaneously in the cluster, usually between 10 and 20; load jobs submitted beyond this limit are rejected.
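A sketch of splitting a large file and loading the chunks in parallel; the file name, credentials and host are hypothetical, and the parallelism should stay within the task limit mentioned above:
# Split a large CSV into roughly 1GB chunks without breaking lines apart.
split -C 1G bigdata.csv chunk_
# Submit one Stream Load per chunk in the background, each with its own Label.
for f in chunk_*; do
    curl --location-trusted -u user:passwd \
        -H "label: bigdata_${f}" \
        -T "${f}" \
        http://be_host:8040/api/example_db/my_table/_stream_load &
done
wait    # wait for all background loads to finish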