Baidu AI Cloud
中国站

百度智能云

All Product Document

          Data Warehouse

          ROUTINE-LOAD

          ROUTINE LOAD

          Description

          Routine Load function, which supports users to submit a resident load task and load data into Palo by continuously reading data from the specified data source.

          At present, it only supports loading CSV or Json format data from Kakfa by means of no authentication or SSL authentication.

          Syntax:

          CREATE ROUTINE LOAD [db.]job_name ON tbl_name
          [merge_type]
          [load_properties]
          [job_properties]
          FROM data_source [data_source_properties]
          • [db.]job_name

            The name of the load job. in the same database, only one job with the same name can run.

          • tbl_name

            Specify the name of the table to be loaded.

          • merge_type

            Data merger type. The default value is APPEND, which means that the data loaded are ordinary append write operations. MERGE and DELETE types are only applicable to Unique Key model tables. The MERGE type needs to be used with the [DELETE ON] statement to mark the Delete Flag column. The DELETE type means that all the data loaded this time are deleted data.

          • load_properties

            Used to describe loaded data, which consists of:

            [column_separator],
            [columns_mapping],
            [preceding_filter],
            [where_predicates],
            [partitions],
            [DELETE ON],
            [ORDER BY]
            • column_separator

              Specify column separator, default is \t

              COLUMNS TERMINATED BY ","

            • columns_mapping

              Used to specify the mapping relationship between file columns and columns in tables, and various column transformations. For a detailed introduction of this part, please refer to the [Mapping, Transformation and Filtering of Columns] document.

              (k1, k2, tmpk1, k3 = tmpk1 + 1)

            • preceding_filter

              Filter raw data. For a detailed introduction of this part, please refer to the [Mapping, Transformation and Filtering of Columns] document.

            • where_predicates

              Filter the loaded data according to the conditions. For a detailed introduction of this part, please refer to the [Mapping, Transformation and Filtering of Columns] document.

              WHERE k1 > 100 and k2 = 1000

            • partitions

              Specify which partition of the destination table to load. If not specified, it will be automatically loaded into the corresponding partition.

              PARTITION(p1, p2, p3)

            • DELETE ON

              It needs to be used together with MEREGE load mode, only for tables of Unique Key model. And it is used to specify the column and calculation relation of Delete Flag in loaded data.

              DELETE ON v3 >100

            • ORDER BY

              Only tables for the Unique Key model. It is used to specify the column representing Sequence Col in the loaded data. It is mainly used to ensure data order during loading.

          • job_properties

            Used to specify general parameters for routine load jobs.

            PROPERTIES (
                "key1" = "val1",
                "key2" = "val2"
            )

            At present we support the following parameters:

            1. desired_concurrent_number

              Expected degree of concurrency. A routine load job will be divided into several subtasks. This parameter specifies the maximum number of tasks that a job can perform at the same time. It must be greater than 0 and is 3 by default.

              This concurrency is not the actual concurrency. The actual concurrency will be comprehensively considered through the number of nodes in the cluster, the load situation and the data source situation.

              "desired_concurrent_number" = "3"

            2. max_batch_interval/max_batch_rows/max_batch_size

              These three parameters respectively represent:

              1. Maximum execution time of each subtask, in seconds. It ranges from 5 to 60 and is 10 by default.
              2. The maximum number of rows read per subtask, which must be greater than or equal to 200000 and is 200000 by default.
              3. The maximum number of lines read per subtask. The unit is bytes and the range is 100MB to 1GB. it takes 100MB by default.

              These three parameters are used to control the execution time and throughput of a subtask. When any one reaches the threshold, the task ends.

              "max_batch_interval" = "20",
              "max_batch_rows" = "300000",
              "max_batch_size" = "209715200"
            3. max_error_number

              The maximum number of error rows allowed in the sampling window, which must be greater than or equal to 0. The default value is 0, that is, error lines are not allowed.

              Sampling window is max_batch_rows * 10。That is, if the number of error rows is larger than max_error_number in the sampling window, routine jobs will be suspended, and manual intervention is needed to check data quality problems.

              Rows filtered out by the where condition are not considered as error rows.

            4. strict_mode

              Whether strict mode is turned on or not is turned off by default. If it is turned on, the column type transformation of non-empty original data will be filtered if the result is NULL. Specify the method as follows:

              "strict_mode" = "true"

            5. timezone

              Specify the time zone used by the load job. The default is to use the timezone parameter of Session. This parameter will affect the results of all functions related to time zone involved in load.

            6. format

              Specify the load data format, which is csv by default and supports json format.

            7. jsonpaths

              When the loaded data format is json, you can specify the fields in the extracted Json data through jsonpaths.

              -H "jsonpaths: [\"$.k2\", \"$.k1\"]"

            8. strip_outer_array

              When the loaded data format is json, strip_outer_array is true, which means that Json data is presented in the form of array, and each element in the data will be regarded as a row of data. The default is "False".

              -H "strip_outer_array: true"

            9. json_root

              When the loaded data format is json, you can specify the fields in the root Json data through json_root. Palo will extract the elements of the root node through json_root for parsing. It is empty by default.

              -H "json_root: $.RECORDS"

          • FROM data_source [data_source_properties]

            Data source type. Currently support:

            FROM KAFKA
            (
                "key1" = "val1",
                "key2" = "val2"
            )

            data_source_properties supports the following data source properties:

            1. kafka_broker_list

              Broker connection information for Kafka, in the format ip:host. Multiple broker are separated by commas.

              "kafka_broker_list" = "broker1:9092,broker2:9092"

            2. kafka_topic

              Specify the topic of Kafka to subscribe to.

              "kafka_topic" = "my_topic"

            3. kafka_partitions/kafka_offsets

              Specify the kafka partition that need to be subscribed, and the corresponding starting offset of each partition.

              Offset can specify a specific offset from 0 or more, or:

              • OFFSET_BEGINNING: Subscribe from where there is data.
              • OFFSET_END: Subscribe from the end.

              If not specified, all partition under topic will be subscribed by default from OFFSET_END .

              "kafka_partitions" = "0,1,2,3",
              "kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
            4. property

              Specify custom kafka parameters. The function is equivalent to the "--property" parameter in kafka shell.

              When the value of the parameter is a file, you need to add keyword "FILE:" before the value.

              For information on how to create a file, please refer to CREATE FILE command document.

              See the CONFIGURATION items on the client side in the official configuration document of librdkafka for more supported custom parameters. Such as:

              "property.client.id" = "12345",
              "property.ssl.ca.location" = "FILE:ca.pem"
              1. The following parameters need to be specified when connecting Kafka using SSL:

                "property.security.protocol" = "ssl",
                "property.ssl.ca.location" = "FILE:ca.pem",
                "property.ssl.certificate.location" = "FILE:client.pem",
                "property.ssl.key.location" = "FILE:client.key",
                "property.ssl.key.password" = "abcdefg"

                In which:

                property.security.protocol and property.ssl.ca.location are required to indicate that the connection method is SSL and the location of the CA certificate.

                If client authentication is enabled on Kafka server side, it is also necessary to set:

                "property.ssl.certificate.location"
                "property.ssl.key.location"
                "property.ssl.key.password"

                Used to specify public key, private key and password of private key of client respectively.

              2. Specify the default starting offset of kafka partition

                If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default.

                At this time, the start offset of kafka_default_offsets can be specified, which is OFFSET_END by default, that is, subscribe from the end.

                Example:

                "property.kafka_default_offsets" = "OFFSET_BEGINNING"

          Example

          1. Create a Kafka routine load task named test1 for example_tbl of example_db. Specify column separator, group.id and client.id, automatically consume all partitions by default, and subscribe from the position where there is data (OFFSET_BEGINNING)

            CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS TERMINATED BY ",",
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.group.id" = "xxx",
                "property.client.id" = "xxx",
                "property.kafka_default_offsets" = "OFFSET_BEGINNING"
            );
          2. Create a Kafka routine load task named test1 for example_tbl of example_db. Load task is in strict mode.

            CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
            PRECEDING FILTER k1 = 1,
            WHERE k1 > 100 and k2 like "%palo%"
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );
          3. Load data from Kafka cluster through SSL authentication. Set the client.id parameter at the same time. Load task is in non-strict mode and time zone is Africa/Abidjan

            CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
            WHERE k1 > 100 and k2 like "%palo%"
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false",
                "timezone" = "Africa/Abidjan"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "property.security.protocol" = "ssl",
                "property.ssl.ca.location" = "FILE:ca.pem",
                "property.ssl.certificate.location" = "FILE:client.pem",
                "property.ssl.key.location" = "FILE:client.key",
                "property.ssl.key.password" = "abcdefg",
                "property.client.id" = "my_client_id"
            );
          4. Load Json format data. By default, field names in Json are used as column name mappings. Specify to load into three partitions, 0, 1 and 2, with the initial offset of 0

            CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
            COLUMNS(category,price,author)
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false",
                "format" = "json"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2",
                "kafka_offsets" = "0,0,0"
            );
          5. Load Json data, extract fields through Jsonpaths, and specify the root node of Json document

            CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false",
                "format" = "json",
                "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
                "json_root" = "$.RECORDS"
                "strip_outer_array" = "true"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2",
                "kafka_offsets" = "0,0,0"
            );
          6. Create a Kafka routine load task named test1 for example_tbl of example_db. And use conditional filtering.

            CREATE ROUTINE LOAD example_db.test1 ON example_tbl
            WITH MERGE
            COLUMNS(k1, k2, k3, v1, v2, v3),
            WHERE k1 > 100 and k2 like "%palo%",
            DELETE ON v3 >100
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "20",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200",
                "strict_mode" = "false"
            )
            FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );
          7. Load data into a Unique Key model table with a sequence column

            CREATE ROUTINE LOAD example_db.test_job ON example_tbl
            COLUMNS TERMINATED BY ",",
            COLUMNS(k1,k2,source_sequence,v1,v2),
            ORDER BY source_sequence
            PROPERTIES
            (
                "desired_concurrent_number"="3",
                "max_batch_interval" = "30",
                "max_batch_rows" = "300000",
                "max_batch_size" = "209715200"
            ) FROM KAFKA
            (
                "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
                "kafka_topic" = "my_topic",
                "kafka_partitions" = "0,1,2,3",
                "kafka_offsets" = "101,0,0,200"
            );

          Keywords

          CREATE, ROUTINE, LOAD

          Best Practices

          TBD

          Previous
          RESTORE
          Next
          SELECT-INTO-OUTFILE