          BROKER LOAD

          Description

          This command loads data from remote storage (such as BOS and HDFS) through the Broker service process.

          LOAD LABEL load_label
          (
          data_desc1[, data_desc2, ...]
          )
          WITH BROKER broker_name
          [broker_properties]
          [load_properties];
          • load_label

            Each load job must specify a unique label. The job's progress can later be viewed through this label.

            [database.]label_name

          • data_desc1

            Used to describe a set of files that need to be loaded.

            [MERGE|APPEND|DELETE]
            DATA INFILE
            (
            "file_path1"[, file_path2, ...]
            )
            [NEGATIVE]
            INTO TABLE `table_name`
            [PARTITION (p1, p2, ...)]
            [COLUMNS TERMINATED BY "column_separator"]
            [FORMAT AS "file_type"]
            [(column_list)]
            [COLUMNS FROM PATH AS (c1, c2, ...)]
            [PRECEDING FILTER predicate]
            [SET (column_mapping)]
            [WHERE predicate]
            [DELETE ON expr]
            [ORDER BY source_sequence]
            • [MERGE|APPEND|DELETE]

              Defaults to APPEND, which means the load is an ordinary append write. The MERGE and DELETE types apply only to Unique Key model tables. The MERGE type must be used together with the DELETE ON clause, which marks the Delete Flag column. The DELETE type means that every row in this load is treated as a deleted row.

            • DATA INFILE

              Specifies the paths of the files to load. Multiple paths can be given, and wildcards are supported. Each path must ultimately match files; if it matches only a directory, the load fails.

            • NEGATIVE

              Indicates that this load is a "negative" load. It applies only to aggregate tables whose value columns use the integer SUM aggregate type. The integer values of the SUM aggregate columns in the loaded data are negated. It is mainly used to offset incorrect data loaded earlier.

            • PARTITION(p1, p2, ...)

              Loads data only into the specified partitions of the table. Data that does not belong to these partitions is ignored.

            • COLUMNS TERMINATED BY

              Specifies the column separator. Valid only for the CSV format. Only single-byte separators can be specified.

            • FORMAT AS

              Specifies the file type. CSV, PARQUET and ORC are supported. Defaults to CSV.

            • column list

              Specifies the order of the columns in the original file. For details, refer to the Mapping, Transformation and Filtering of Columns document.

              (k1, k2, tmpk1)

            • COLUMNS FROM PATH AS

              Specify the columns extracted from the load file path.

            • PRECEDING FILTER predicate

              Pre-filtering condition. The data is first assembled into original data rows, in order, according to the column list and COLUMNS FROM PATH AS, and is then filtered by the pre-filtering condition. For details, refer to the Mapping, Transformation and Filtering of Columns document.

            • SET (column_mapping)

              Specify the conversion function for the column.

            • WHERE predicate

              Filters the loaded data according to the condition. For details, refer to the Mapping, Transformation and Filtering of Columns document.

            • DELETE ON expr

              Must be used together with the MERGE load type, and only for Unique Key model tables. It specifies the Delete Flag column in the loaded data and the expression that marks a row as deleted.

            • ORDER BY

              Applies only to Unique Key model tables. It specifies the column in the loaded data that represents the Sequence Col, and is mainly used to guarantee data order during loading.

          • WITH BROKER broker_name

            Specifies the name of the Broker service to use. In public cloud Palo, the Broker service name is bos.

          • broker_properties

            Specifies the information the Broker needs, typically to access the remote storage system, such as BOS or HDFS. For details, refer to the Broker document.

            (
                "key1" = "val1",
                "key2" = "val2",
                ...
            )
          • load_properties

            Specify parameters related to the load. Currently the following parameters are supported:

            • timeout

              Load timeout, in seconds. Defaults to 4 hours (14400 seconds).

            • max_filter_ratio

              Maximum tolerated ratio of data that can be filtered out (for example, because it is malformed). Defaults to 0, meaning no tolerance. The value range is 0 to 1.

            • exec_mem_limit

              Memory limit for the load, in bytes. Defaults to 2 GB.

            • strict_mode

              Whether to apply strict restrictions to the data. Defaults to false.

            • timezone

              Specifies the time zone used by functions that are affected by the time zone, such as strftime, alignment_timestamp and from_unixtime. For details, refer to the Time Zone document. If not specified, the "Asia/Shanghai" time zone is used.
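
          The load_properties described above are passed in a PROPERTIES clause after the broker properties. The following is a minimal sketch that sets every supported property explicitly to its documented default; the label label_props is a hypothetical name, and the BOS path and credentials are the placeholders used in the examples of this document.

          ```sql
          -- Sketch only: label_props is a hypothetical label.
          LOAD LABEL example_db.label_props
          (
              DATA INFILE("bos://my_bucket/input/file.txt")
              INTO TABLE `my_table`
              COLUMNS TERMINATED BY ","
          )
          WITH BROKER bos
          (
              "bos_endpoint" = "http://bj.bcebos.com",
              "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
              "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
          )
          PROPERTIES
          (
              "timeout" = "14400",             -- 4 hours, in seconds
              "max_filter_ratio" = "0",        -- no malformed rows tolerated
              "exec_mem_limit" = "2147483648", -- 2 GB, in bytes (2 * 1024^3)
              "strict_mode" = "false",
              "timezone" = "Asia/Shanghai"
          );
          ```

          Because these values equal the defaults, omitting the PROPERTIES clause should behave the same way; in practice only the properties that differ from the defaults need to be listed.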

          Example

          1. Load a batch of data from BOS

            LOAD LABEL example_db.label1
            (
                DATA INFILE("bos://my_bucket/input/file.txt")
                INTO TABLE `my_table`
                COLUMNS TERMINATED BY ","
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );

            Loads the comma-separated file file.txt into the table my_table.

          2. Load data from BOS and use wildcards to match two batches of files. Load into two tables respectively.

            LOAD LABEL example_db.label2
            (
                DATA INFILE("bos://my_bucket/input/file-10*")
                INTO TABLE `my_table1`
                PARTITION (p1)
                COLUMNS TERMINATED BY ","
                (k1, tmp_k2, tmp_k3)
                SET (
                    k2 = tmp_k2 + 1,
                    k3 = tmp_k3 + 1
                )
                DATA INFILE("bos://my_bucket/input/file-20*")
                INTO TABLE `my_table2`
                COLUMNS TERMINATED BY ","
                (k1, k2, k3)
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );

            Uses wildcards to load two batches of files, file-10* and file-20*, into my_table1 and my_table2 respectively. For my_table1, the data is loaded into partition p1, and the values of the second and third columns in the source files are incremented by 1 before loading.

          3. Load a batch of data from HDFS.

            LOAD LABEL example_db.label3
            (
                DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/*/*")
                INTO TABLE `my_table`
                COLUMNS TERMINATED BY "\\x01"
            )
            WITH BROKER my_hdfs_broker
            (
                "username" = "",
                "password" = "",
                "dfs.nameservices" = "my_ha",
                "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
                "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
                "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
                "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            );

            Specifies the delimiter \\x01 (the default delimiter in Hive), and uses the wildcard * to match all files in all directories under the data directory. Uses simple authentication and configures NameNode HA at the same time.

          4. Load Parquet data and explicitly specify parquet as the format. By default, the format is determined by the file suffix.

            LOAD LABEL example_db.label4
            (
                DATA INFILE("bos://bucket/input/file")
                INTO TABLE `my_table`
                FORMAT AS "parquet"
                (k1, k2, k3)
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );
          5. Load the data and extract the partition fields in the file path

            LOAD LABEL example_db.label10
            (
                DATA INFILE("hdfs://hdfs_host:hdfs_port/input/city=beijing/*/*")
                INTO TABLE `my_table`
                FORMAT AS "csv"
                (k1, k2, k3)
                COLUMNS FROM PATH AS (city, utc_date)
            )
            WITH BROKER hdfs
            (
                "username"="hdfs_user",
                "password"="hdfs_password"
            );

            Columns in my_table are k1, k2, k3, city, utc_date.

            The directory hdfs://hdfs_host:hdfs_port/input contains the following files, of which only those under city=beijing match the path pattern in the statement:

            hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-01/0000.csv
            hdfs://hdfs_host:hdfs_port/input/city=beijing/utc_date=2020-10-02/0000.csv
            hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-03/0000.csv
            hdfs://hdfs_host:hdfs_port/input/city=tianji/utc_date=2020-10-04/0000.csv

            Each file contains only three columns of data: k1, k2 and k3. The two columns city and utc_date are extracted from the file path.

          6. Filter the data to be loaded.

            LOAD LABEL example_db.label6
            (
                DATA INFILE("bos://bucket/input/file")
                INTO TABLE `my_table`
                (k1, k2, k3)
                PRECEDING FILTER k1 = 1
                SET (
                    k2 = k2 + 1
                )
                WHERE k1 > k2
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );

            Only rows with k1 = 1 in the original data and, after the transformation, k1 > k2 are loaded.

          7. Load data and extract the time partition field from the file path, where the time contains %3A (':' is not allowed in HDFS paths, so every ':' is replaced by %3A)

            LOAD LABEL example_db.label7
            (
                DATA INFILE("hdfs://host:port/user/data/*/test.txt") 
                INTO TABLE `tbl12`
                COLUMNS TERMINATED BY ","
                (k2,k3)
                COLUMNS FROM PATH AS (data_time)
                SET (
                    data_time=str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s')
                )
            )
            WITH BROKER hdfs
            (
                "username"="user",
                "password"="pass"
            );

            There are the following files under the path:

            /user/data/data_time=2020-02-17 00%3A00%3A00/test.txt
            /user/data/data_time=2020-02-18 00%3A00%3A00/test.txt

            The table structure is:

            data_time DATETIME,
            k2        INT,
            k3        INT
          8. Load a batch of data from BOS using the MERGE type, and specify the timeout and filter ratio. Rows in the table that match loaded rows with v2 greater than 100 are deleted, and the other rows are loaded normally

            LOAD LABEL example_db.label8
            (
                MERGE DATA INFILE("bos://bucket/input/file")
                INTO TABLE `my_table`
                (k1, k2, k3, v2, v1)
                DELETE ON v2 > 100
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
            )
            PROPERTIES
            (
                "timeout" = "3600",
                "max_filter_ratio" = "0.1"
            );

            Loads using the MERGE type. my_table must be a Unique Key table. When the value of the v2 column in a loaded row is greater than 100, the row is treated as a deleted row.

            The timeout of the load task is 3600 seconds, and the allowable error rate is within 10%.

          9. Specify the source_sequence column during load to ensure the replacement order in the UNIQUE_KEYS table:

            LOAD LABEL example_db.label9
            (
                DATA INFILE("bos://bucket/input/file")
                INTO TABLE `my_table`
                COLUMNS TERMINATED BY ","
                (k1,k2,source_sequence,v1,v2)
                ORDER BY source_sequence
            ) 
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );

            my_table must be a Unique Key model table with a Sequence Col specified. Data is ordered according to the value of the source_sequence column in the source data.
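
            The examples above do not cover the DELETE load type or the NEGATIVE keyword described in the syntax section. The following is a minimal sketch of both, derived only from the data_desc syntax. The labels label_delete and label_negative and the table my_agg_table are hypothetical names; my_table is assumed to be a Unique Key table, and my_agg_table is assumed to be an aggregate table whose value column v1 uses the integer SUM aggregate type.

            ```sql
            -- Sketch 1 (hypothetical label): every row in the file is treated
            -- as a deleted row. my_table is assumed to be a Unique Key table.
            LOAD LABEL example_db.label_delete
            (
                DELETE DATA INFILE("bos://bucket/input/file")
                INTO TABLE `my_table`
                COLUMNS TERMINATED BY ","
                (k1, k2, k3, v2, v1)
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );

            -- Sketch 2 (hypothetical label and table): reload a previously
            -- loaded file with NEGATIVE so that the SUM column values are
            -- negated and offset the earlier, incorrect load.
            LOAD LABEL example_db.label_negative
            (
                DATA INFILE("bos://bucket/input/file")
                NEGATIVE
                INTO TABLE `my_agg_table`
                COLUMNS TERMINATED BY ","
                (k1, k2, v1)
            )
            WITH BROKER bos
            (
                "bos_endpoint" = "http://bj.bcebos.com",
                "bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
                "bos_secret_accesskey" = "yyyyyyyyyyyyyyyyyyyyyyyyyy"
            );
            ```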

          Keywords

          BROKER, LOAD

          Best Practices

          1. View load task status

            Broker Load is an asynchronous load process. Successful execution of the statement only means that the load task was submitted successfully, not that the data was loaded successfully. Use the SHOW LOAD command to check the load status.
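
            As a sketch, the status of the job submitted in Example 1 could be checked as follows. The FROM and WHERE LABEL clauses are assumed to match the SHOW LOAD syntax of Apache Doris; consult the SHOW LOAD document for the exact form supported by Palo.

            ```sql
            -- Assumed syntax (as in Apache Doris): look up a load job by label.
            -- In Apache Doris the State column reads FINISHED once the data is
            -- loaded, or CANCELLED if the job failed or was cancelled.
            SHOW LOAD FROM example_db WHERE LABEL = "label1";
            ```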

          2. Cancel the load task

            Load tasks that have been submitted but have not yet finished can be cancelled with the CANCEL LOAD command. After cancellation, any data already written is rolled back and does not take effect.
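
            A minimal sketch of cancelling the job from Example 1 by its label is shown below. The statement form is assumed to match the CANCEL LOAD syntax of Apache Doris; consult the CANCEL LOAD document for the exact form supported by Palo.

            ```sql
            -- Assumed syntax (as in Apache Doris): cancel an unfinished load job by label.
            CANCEL LOAD FROM example_db WHERE LABEL = "label1";
            ```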

          3. Label, load transaction, multi-table atomicity

            All load tasks in Palo are atomic. Atomicity is also guaranteed when multiple tables are loaded in the same load task. In addition, Palo uses the Label mechanism to ensure that loaded data is neither lost nor duplicated. For details, refer to the Load Transactions and Atomicity document.

          4. Column mapping, derived columns and filtering

            Palo supports a rich set of column transformation and filtering operations in load statements. Most built-in functions and UDFs are supported. For how to use this feature correctly, refer to the Mapping, Transformation and Filtering of Columns document.

          5. Error data filtering

            Palo's load tasks can tolerate some malformed data. The tolerance rate is set by max_filter_ratio, which defaults to 0, meaning that a single erroneous row causes the whole load task to fail. To ignore some problematic rows, set this parameter to a value between 0 and 1, and Palo will automatically skip rows with an incorrect data format.

            For how the tolerance rate is calculated, refer to the Mapping, Transformation and Filtering of Columns document.

          6. Strict mode

            The strict_mode property sets whether the load task runs in strict mode. This mode affects the results of column mapping, transformation and filtering. For a detailed description of strict mode, refer to the Strict Mode document.

          7. Timeout period

            The default timeout for Broker Load is 4 hours, counted from the time the task is submitted. If the task is not completed within the timeout period, it fails.

          8. Data volume and task number limit

            Broker Load is suitable for loading up to 100 GB of data in one load task. Although there is theoretically no upper limit on the amount of data in a load task, submitting a very large load leads to a long running time, and the cost of retrying after a failure also increases.

            In addition, to ensure rational use of system resources, the maximum amount of data in one load is limited by the cluster size to the number of ComputeNode nodes * 3 GB. For example, a cluster with 10 ComputeNode nodes can load at most 30 GB in one task. If there is a large amount of data to load, it is recommended to split it across multiple load tasks.

            Palo also limits the number of load tasks running simultaneously in the cluster, usually to between 3 and 10. Load jobs submitted beyond this limit are queued. The maximum queue length is 100; once the queue is full, further submissions are rejected directly. Note that the time spent in the queue counts toward the total job time, and a job that times out is cancelled. It is therefore recommended to monitor job status and control the submission frequency accordingly.
