          STREAM LOAD

          Description

          Stream Load transmits and loads data into Palo over the HTTP protocol. This mode is mainly used to upload and load a user's local data. However, it is essentially a general load framework: its HTTP interface can accept not only local files, but also data streamed from memory or a pipeline.

          • Public cloud users must use the HTTP protocol port of the Compute Node (BE), which is 8040 by default.
          • Privatized deployment users can use the HTTP protocol port of the Leader Node (FE), which is 8030 by default. Make sure the network of the client machine can reach the machines where the Compute Nodes are located.

          This document mainly introduces how to use Stream Load through the cURL command.

          curl -XPUT --location-trusted -u user:passwd \
          [-H "header1: xxx" -H "header2: xxx" ...] \
          -T data.txt \
          http://host:port/api/{db}/{table}/_stream_load
          • The HTTP request method is PUT
          • At present, both HTTP chunked and non-chunked uploads are supported. In non-chunked mode, the Header must contain Content-Length to indicate the length of the uploaded content and ensure data integrity.
          • The Header should contain Expect: 100-continue, which avoids unnecessary data transmission in some error scenarios.
          • There are two types of target host:port for the command:

            1. The HTTP protocol port of FE. In this case, FE responds to the request with a 307 redirect to a random BE node, and the final request and data are exchanged directly with that BE node. This method requires that the client and the BE nodes can communicate normally over the network.
            2. The HTTP protocol port of BE. The request interacts directly with the BE node.

            Note: Baidu Cloud Palo users can connect directly to the HTTP protocol port of the Compute Node.

          • Specify the database and table to be loaded in the two path parameters {db} and {table} of the URL.
          • Other parameters of the load task are specified in the Header:

            • label

              Specify a Label for the load task to uniquely identify it. If not specified, the system automatically generates a UUID as the Label.

              -H "label: my_label1"

            • column_separator

              Used to specify the column separator in the load file, which is \t by default. If the separator is an invisible character, prefix it with \x and give it in hexadecimal. For example, the Hive file delimiter \x01 needs to be specified as -H "column_separator:\x01".

              -H "column_separator: ,"

            • columns

              Used to specify the mapping between file columns and table columns, as well as various column transformations. For a detailed introduction, please refer to the Mapping, transformation and filtering of columns document.

              -H "columns: k1, k2, tmpk1, k3 = tmpk1 + 1"

            • where

              Filter the loaded data according to the given conditions. For a detailed introduction, please refer to the Mapping, transformation and filtering of columns document.

              -H "where: k1 > 100"

            • max_filter_ratio

              The maximum tolerated ratio of filterable data (rows that are malformed, etc.). Zero tolerance by default. The value range is 0 to 1.

              -H "max_filter_ratio: 0.01"

            • partitions

              Specify the partitions to load data into.

              -H "partitions: p1, p2"

            • timeout

              Specify the timeout for load, in seconds, which is 600s by default. The setting range is 1s to 14400s.

              -H "timeout: 120"

            • strict_mode

              Whether to apply strict restrictions to the data. The default is false.

              -H "strict_mode: true"

            • timezone

              Specify the time zone used for this load. The default is GMT+8 (East Eight zone). This parameter affects the results of all time zone related functions involved in the load.

              -H "timezone: Asia/Shanghai"

            • exec_mem_limit

              Memory limit for the load, in bytes. The default is 2GB.

              -H "exec_mem_limit: 4294967296"

            • format

              Specify the load data format. csv and json are supported; csv is the default.

              -H "format: json"

            • jsonpaths

              When the loaded data format is json, jsonpaths can be used to specify the fields to extract from the Json data.

              -H "jsonpaths: [\"$.k2\", \"$.k1\"]"

            • strip_outer_array

              When the loaded data format is json, setting strip_outer_array to true indicates that the Json data is presented as an array and each element in the array will be treated as one row of data. The default is false.

              -H "strip_outer_array: true"

            • json_root

              When the loaded data format is json, the user can specify the root node of the Json data with json_root. Palo will extract and parse the elements under this root node. Empty by default.

              -H "json_root: $.RECORDS"

            • merge_type

              The default value is APPEND, which means this load is an ordinary append write operation. The MERGE and DELETE types are only applicable to Unique Key model tables. The MERGE type must be used together with the delete parameter to mark the Delete Flag column. The DELETE type means that all data loaded this time is to be deleted.

              -H "merge_type: MERGE"

            • delete: Meaningful only under MERGE type, used to specify the Delete Flag column and the conditions for marking the delete flag.

              -H "delete: col3 = 1"

            • function_column.sequence_col

              Only applicable to Unique Key model tables. It is used to specify the column in the loaded data that represents the Sequence Col, which mainly ensures data ordering during loading.

              -H "function_column.sequence_col: col3"

            • fuzzy_parse

              When the loaded data is a Json array and the field order of every row in the array is exactly the same, this parameter can be enabled to speed up the load. It is generally used together with strip_outer_array: true. Details can be found in the JSON Format Data Load instructions.

              -H "fuzzy_parse: true"

          Example

          1. Load the local file testData and specify the timeout period

            curl --location-trusted -u admin -H "label:label1" -H "timeout:100" -T testData http://host:port/api/example_db/my_table/_stream_load
          2. Load the local file testData and filter the data according to the conditions

            curl --location-trusted -u admin -H "label:label2" -H "where: k1=20180601" -T testData http://host:port/api/example_db/my_table/_stream_load
          3. Load the local file testData and set the maximum allowable error rate

            curl --location-trusted -u admin -H "label:label3" -H "max_filter_ratio:0.2" -T testData http://host:port/api/example_db/my_table/_stream_load
          4. Load the local file testData and specify the column mapping relationship

            curl --location-trusted -u admin -H "label:label4" -H "max_filter_ratio:0.2" -H "columns: k2, k1, v1" -T testData http://host:port/api/example_db/my_table/_stream_load
          5. Load the local file testData and specify the partition and maximum allowable error rate

            curl --location-trusted -u admin -H "label:label5" -H "max_filter_ratio:0.2" -H "partitions: p1, p2" -T testData http://host:port/api/example_db/my_table/_stream_load
          6. Load using streaming method

            seq 1 10 | awk '{OFS="\t"}{print $1, $1 * 10}' | curl --location-trusted -u admin -T - http://host:port/api/example_db/my_table/_stream_load
          7. Load a table with HLL columns

            curl --location-trusted -u admin -H "columns: k1, k2, v1=hll_hash(k1), v2=hll_empty()" -T testData http://host:port/api/example_db/my_table/_stream_load
          8. Load a table with BITMAP columns

            curl --location-trusted -u admin -H "columns: k1, k2, v1=to_bitmap(k1), v2=bitmap_empty()" -T testData http://host:port/api/example_db/my_table/_stream_load
          9. Load Json data using simple mode, i.e., the field names in the Json data are the column names.

            The table structure is:

            category    varchar(512)
            author      varchar(512)
            title       varchar(512)
            price       double

            Json data:

            {"category":"C++","author":"avc","title":"C++ primer","price":895}

            Load command:

            curl --location-trusted -u admin -H "label:label10" -H "format: json" -T testData http://host:port/api/example_db/my_table/_stream_load
          10. Load Json data and extract fields using jsonpaths

            Json data format:

            {"category":"xuxb111","author":"1avc","title":"SayingsoftheCentury","price":895},

            Use jsonpaths to extract the three fields category, author and price.

            curl --location-trusted -u admin -H "columns: category, price, author" -H "label:123" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -T testData http://host:port/api/example_db/my_table/_stream_load
          11. Load Json data, specify the Json document root node, and flatten the array.

            json data format:

            {
            "RECORDS":[
                {"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
                {"category":"22","author":"2avc","price":895,"timestamp":1589191487},
                {"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
                ]
            }

            Use jsonpaths to extract the three fields category, author and price.

            curl --location-trusted -u admin -H "columns: category, price, author" -H "label:label12" -H "format: json" -H "jsonpaths: [\"$.category\",\"$.price\",\"$.author\"]" -H "strip_outer_array: true" -H "json_root: $.RECORDS" -T testData http://host:port/api/example_db/my_table/_stream_load
          12. Use DELETE mode to delete data with the same keys as this batch of loaded data

            curl --location-trusted -u admin -H "merge_type: DELETE" -T testData http://host:port/api/example_db/my_table/_stream_load
          13. Use MERGE mode: rows in this batch whose flag column matches the delete condition are deleted, and the other rows are appended normally

            curl --location-trusted -u admin -H "column_separator:," -H "columns: siteid, citycode, username, pv, flag" -H "merge_type: MERGE" -H "delete: flag=1" -T testData http://host:port/api/example_db/my_table/_stream_load
          14. Load data into a Unique Key model table that has a Sequence Col column

            curl --location-trusted -u admin -H "columns: k1,k2,source_sequence,v1,v2" -H "function_column.sequence_col: source_sequence" -T testData http://host:port/api/example_db/my_table/_stream_load

          Keywords

          STREAM, LOAD

          Best Practices

          1. View load task status

            Stream Load is a synchronous load process: successful execution of the statement means the data has been loaded successfully. The execution result of the load is returned synchronously in the HTTP response, in Json format.

            {
                "TxnId": 17,
                "Label": "707717c0-271a-44c5-be0b-4e71bfeacaa5",
                "Status": "Success",
                "Message": "OK",
                "NumberTotalRows": 5,
                "NumberLoadedRows": 5,
                "NumberFilteredRows": 0,
                "NumberUnselectedRows": 0,
                "LoadBytes": 28,
                "LoadTimeMs": 27,
                "BeginTxnTimeMs": 0,
                "StreamLoadPutTimeMs": 2,
                "ReadDataTimeMs": 0,
                "WriteDataTimeMs": 3,
                "CommitAndPublishTimeMs": 18
            }

            The field definitions are as follows:

            • TxnId: The ID of the load transaction, automatically generated by the system and globally unique.
            • Label: The load Label; if not specified, the system generates a UUID.
            • Status:

              Load result. Possible values are:

              • Success: The load succeeded and the data is visible.
              • Publish Timeout: The load has also completed, but the data may only become visible after a delay.
              • Label Already Exists: The Label is duplicated and needs to be replaced.
              • Fail: The load failed.
            • ExistingJobStatus:

              The status of the load job corresponding to the existing Label.

              This field is only shown when Status is "Label Already Exists". The user can learn the status of the existing load job for this Label from this field: "RUNNING" means the job is still executing, and "FINISHED" means the job succeeded.

            • Message: Load error message.
            • NumberTotalRows: Total number of rows processed by the load.
            • NumberLoadedRows: Number of rows loaded successfully.
            • NumberFilteredRows: Number of rows whose data quality is unqualified.
            • NumberUnselectedRows: Number of rows filtered out by the where condition.
            • LoadBytes: Number of bytes loaded.
            • LoadTimeMs: Time taken for the load to complete, in milliseconds.
            • BeginTxnTimeMs: Time spent requesting FE to begin a transaction, in milliseconds.
            • StreamLoadPutTimeMs: Time spent requesting the load execution plan from FE, in milliseconds.
            • ReadDataTimeMs: Time spent reading data, in milliseconds.
            • WriteDataTimeMs: Time spent executing the write data operation, in milliseconds.
            • CommitAndPublishTimeMs: Time spent requesting FE to commit and publish the transaction, in milliseconds.
            • ErrorURL: If there are data quality problems, visit this URL to view the specific error rows.
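
            For scripted use, the returned Json can be checked directly. Below is a minimal sketch, assuming the jq tool is available and using placeholder host, port, credentials, and file name:

            # Submit a Stream Load job and capture the Json result
            RESULT=$(curl --location-trusted -u admin:passwd -H "label: my_label1" \
                -T data.txt http://host:8040/api/example_db/my_table/_stream_load)

            # Extract the Status field to decide how to proceed
            STATUS=$(echo "$RESULT" | jq -r '.Status')
            if [ "$STATUS" != "Success" ]; then
                echo "Load not successful: $STATUS"
            fi
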
          2. How to correctly submit a Stream Load job and handle the returned result

            Stream Load is a synchronous load operation, so the user needs to wait synchronously for the command's return result and decide the next step based on it.

            The user's primary concern is the Status field in the returned result.

            If it is Success, everything is normal and subsequent operations can proceed.

            If the returned results contain a large number of Publish Timeout statuses, it may indicate that some resource in the cluster (such as IO) is currently strained, preventing the loaded data from taking effect promptly. A load task in the Publish Timeout state has succeeded and does not need to be retried; however, it is advisable to slow down or pause the submission of new load tasks and observe the cluster load.

            If the returned result is Fail, the problem should be investigated according to the specific reason. Once resolved, the load can be retried with the same Label.

            In some cases, the user's HTTP connection may be disconnected abnormally, so the final return result cannot be obtained. In that case, the load task can be resubmitted with the same Label, and the resubmitted task may yield the following results:

            1. Status is Success, Fail, or Publish Timeout. In this case, it can be processed according to the normal workflow.
            2. Status is Label Already Exists. Continue to check the ExistingJobStatus field. If its value is FINISHED, the load task corresponding to this Label has already succeeded and there is no need to retry. If it is RUNNING, the load task corresponding to this Label is still running; in that case, keep resubmitting with the same Label at regular intervals (for example, 10 seconds) until Status is no longer Label Already Exists or the ExistingJobStatus field value becomes FINISHED. A shell sketch of this logic follows.
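
            This is an illustrative outline only, assuming the jq tool is available and using placeholder host, port, credentials, and file name:

            LABEL="my_label1"
            while true; do
                RESULT=$(curl --location-trusted -u admin:passwd -H "label: $LABEL" \
                    -T data.txt http://host:8040/api/example_db/my_table/_stream_load)
                STATUS=$(echo "$RESULT" | jq -r '.Status')
                case "$STATUS" in
                    Success|Fail|"Publish Timeout")
                        # Final states: handle according to the normal workflow
                        echo "Final status: $STATUS"; break ;;
                    "Label Already Exists")
                        # The earlier job already finished: no need to retry
                        if [ "$(echo "$RESULT" | jq -r '.ExistingJobStatus')" = "FINISHED" ]; then
                            echo "Previous load already succeeded"; break
                        fi
                        # Otherwise the job is still RUNNING: wait, then resubmit with the same Label
                        sleep 10 ;;
                esac
            done
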
          3. Cancel the load task

            Load tasks that have been submitted but not yet finished can be cancelled with the CANCEL LOAD command. After cancellation, the written data is rolled back and does not take effect.
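
            As an illustration, the CANCEL LOAD command can be issued through the MySQL client; the host, database, and label below are placeholders, and 9030 is the default FE MySQL protocol port:

            # Cancel an unfinished load task by its Label (placeholder values)
            mysql -h fe_host -P 9030 -u admin -p -e \
                'CANCEL LOAD FROM example_db WHERE LABEL = "my_label1";'
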

          4. Label, load transaction, multi-table atomicity

            All load tasks in Palo are atomic. Atomicity is also guaranteed when loading multiple tables within the same load task. In addition, Palo uses the Label mechanism to ensure that loaded data is neither lost nor duplicated. For details, see the Load Transaction and Atomicity document.

          5. Column mapping, derived columns and filtering

            Palo supports a very rich set of column transformations and filtering operations in load statements. Most built-in functions and UDFs are supported. For how to use this feature correctly, please refer to the Column mapping, derived columns and filtering document.

          6. Error data filtering

            Palo's load tasks can tolerate a portion of malformed data. The tolerance rate is set through max_filter_ratio, which is 0 by default, meaning the whole load task fails as soon as there is a single erroneous row. If the user wants to ignore some problematic data rows, this parameter can be set to a value between 0 and 1, and Palo will automatically skip rows with an incorrect data format.

            For the calculation of the tolerance rate, please refer to the Mapping, transformation and filtering of columns document.
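
            When rows are filtered, the ErrorURL field of the returned result points to the concrete error rows. A minimal sketch for retrieving it, again assuming jq and placeholder connection details:

            RESULT=$(curl --location-trusted -u admin:passwd -H "max_filter_ratio: 0.1" \
                -T data.txt http://host:8040/api/example_db/my_table/_stream_load)

            # If any rows were filtered, fetch the error detail page
            if [ "$(echo "$RESULT" | jq -r '.NumberFilteredRows // 0')" -gt 0 ]; then
                curl "$(echo "$RESULT" | jq -r '.ErrorURL')"
            fi
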

          7. strict mode

            The strict_mode property sets whether the load task runs in strict mode. This mode affects the results of column mapping, transformation, and filtering. For a detailed description of strict mode, please refer to the Strict Mode document.

          8. Timeout period

            The default timeout for Stream Load is 10 minutes, counted from the time the task is submitted. The task fails if it does not finish within the timeout.

          9. Data volume and task number limit

            Stream Load is suitable for loading data up to a few GB. Because the data is transmitted and processed by a single thread, performance cannot be guaranteed for very large loads. When a large amount of local data needs to be loaded, multiple load tasks can be submitted in parallel, as sketched below.
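
            One possible approach, sketched with placeholder file names and connection details, is to split the local file into chunks and submit each chunk as a separate load task:

            # Split a large file into 1,000,000-line chunks named part_aa, part_ab, ...
            split -l 1000000 bigdata.txt part_

            # Submit each chunk as its own Stream Load task, in parallel
            for f in part_*; do
                curl --location-trusted -u admin:passwd -H "label: load_${f}" \
                    -T "$f" http://host:8040/api/example_db/my_table/_stream_load &
            done
            wait
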

            Palo also limits the number of load tasks running simultaneously in a cluster, usually between 10 and 20. Load jobs submitted beyond that limit are rejected.
