          Migrate Data from MySQL to Kafka Using DTS

          Applicable Scenarios

          This article describes how to use Baidu AI Cloud Data Transmission Service (hereinafter referred to as DTS) to migrate data from a self-built MySQL instance or an RDS for MySQL database instance to a Baidu Messaging System topic or a self-built Kafka cluster.

          Migration Preconditions

          • A MySQL database instance has been created as the migration source, with version 5.5, 5.6, or 5.7.
          • The Kafka cluster or Baidu Messaging System topic that is the target of the migration has been created. Self-built Kafka clusters are supported at version 0.9 or 0.10.

          Source MySQL Migration Privilege Requirements

          When the source is a self-built database, you need to provide a migration account that meets the privilege requirements. You can either authorize an existing account or create a new account for the migration. The authorization statement is as follows:

          GRANT SELECT, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT, SHOW VIEW ON *.* TO 'Migration Account'@'Hostname' IDENTIFIED BY 'Migration Password';

          Fill in the "Migration Account" and "Migration Password" as required. For the "Hostname", refer to the DTS Public Network IP Address Range to restrict access to the DTS server IPs only, or fill in % to allow access from all IPs. After authorization, refresh the system privilege table by executing FLUSH PRIVILEGES; to ensure that the account privileges take effect immediately.
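
          For example, the following statements authorize a migration account named dts_user that may connect from any host (the account name and password here are hypothetical placeholders; substitute your own values):

          GRANT SELECT, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT, SHOW VIEW ON *.* TO 'dts_user'@'%' IDENTIFIED BY 'YourStrongPassword';
          FLUSH PRIVILEGES;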

          Kafka Environment Preparation at the Target

          The target is a Baidu Messaging System topic

          No environment preparation is needed; you can configure the DTS task directly. For the DTS operation steps, see [Operation Steps](https://cloud.baidu.com/doc/DTS/s/fjyzm6rzu#Operation step).

          The target is a self-built Kafka cluster

          Because the DTS service control nodes and the self-built Kafka cluster at the target are network-isolated, you need to configure access rules for the self-built Kafka cluster. You can choose different access methods as needed and configure your Kafka cluster step by step.

          Access your Kafka cluster through the public network

          If you want DTS to access your Kafka cluster through a public network link, you need to configure public network access for each machine in the Kafka cluster. Suppose there are three brokers in the Kafka cluster at the target, with public IPs 106.0.0.1, 106.0.0.2, and 106.0.0.3, and private IPs 172.16.0.1, 172.16.0.2, and 172.16.0.3. You need to make the following changes in each broker's configuration file server.properties; broker1 (public IP: 106.0.0.1, private IP: 172.16.0.1) is used as the example below:

          Listeners
          listeners=INTERNAL://172.16.0.1:9092,PUBLIC://172.16.0.1:19092

          listeners is the configuration item that defines a broker's listeners. The connection information under the INTERNAL tag (172.16.0.1:9092) is used for internal communication between brokers. The private IP (172.16.0.1) is configured here, meaning that network communication between brokers goes through the private network; if you want the communication between brokers to go through the public network link, configure the public IP (106.0.0.1) instead. The connection information under the PUBLIC tag (172.16.0.1:19092) is used for network communication with the public network. Note: The IP configured here should be the same as the IP under the INTERNAL tag, but the ports must be different.

          Advertised.listeners
          advertised.listeners=INTERNAL://172.16.0.1:9092,PUBLIC://106.0.0.1:19092

          advertised.listeners is used to publish a broker's listener information to ZooKeeper for clients or other brokers to query. If advertised.listeners is configured, the information configured by listeners is not published to ZooKeeper. The connection information under the INTERNAL tag (172.16.0.1:9092) is the same as in the listeners configuration above, but the connection information under the PUBLIC tag (106.0.0.1:19092) must be filled in with the public IP.

          Listener.security.protocol.map
          listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:PLAINTEXT

          listener.security.protocol.map is used to configure the security protocol of each listener. Here you can configure different security protocols for different connection methods as needed. In the example, INTERNAL and PUBLIC are both configured with the security protocol that has no access control (PLAINTEXT).
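
          If you need access control on the listener that DTS connects to, you could map the PUBLIC tag to a SASL protocol instead (DTS supports SASL access control only for Kafka 0.10, as noted in the Operation Steps below). A minimal sketch, assuming SASL/PLAIN credentials are configured separately through a JAAS file:

          listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:SASL_PLAINTEXT
          sasl.enabled.mechanisms=PLAIN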

          Inter.broker.listener.name
          inter.broker.listener.name=INTERNAL

          inter.broker.listener.name specifies which tag serves as the internal listener; the listener represented by this tag is used exclusively for communication between brokers in the Kafka cluster. In the example, the field is set to INTERNAL, meaning that communication between brokers goes through the private network.
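
          Putting the four items together, the network-related part of broker1's server.properties in this example reads as follows (configure broker2 and broker3 analogously with their own IPs):

          listeners=INTERNAL://172.16.0.1:9092,PUBLIC://172.16.0.1:19092
          advertised.listeners=INTERNAL://172.16.0.1:9092,PUBLIC://106.0.0.1:19092
          listener.security.protocol.map=INTERNAL:PLAINTEXT,PUBLIC:PLAINTEXT
          inter.broker.listener.name=INTERNAL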

          Start broker

          After completing the above four configuration items, save the changes and go to the Kafka root directory. Execute the following command to start broker1, and then follow the same steps to configure and start broker2 and broker3.

          nohup ./bin/kafka-server-start.sh ./config/server.properties &
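
          Once all three brokers are up, you can optionally verify from a machine outside the cluster that the public listener is reachable, for example with the console producer shipped with Kafka (the topic name test is a hypothetical placeholder for an existing topic):

          ./bin/kafka-console-producer.sh --broker-list 106.0.0.1:19092,106.0.0.2:19092,106.0.0.3:19092 --topic test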

          Access your Kafka cluster through the private network of Baidu AI Cloud

          In addition to self-built instances on the public network, DTS currently also supports Kafka clusters self-built on BCC or BBC. For clusters deployed on Baidu AI Cloud, you can either bind an EIP and let DTS access the Kafka cluster through the public network, or let DTS access the Kafka cluster directly through the Baidu AI Cloud private network.

          For public network access, see the previous section. This section describes how to configure a BBC/BCC self-built Kafka cluster so that DTS can access it through the Baidu AI Cloud private network.

          Query PNET IP

          In the Baidu AI Cloud private network, a PNET IP uniquely identifies a virtual machine instance; DTS uses it to correctly address your Kafka cluster within the private network. Execute the following command on your BBC/BCC instance to obtain the instance's PNET IP.

          curl http://169.254.169.254/2009-04-04/meta-data/public-ipv4

          Here, taking broker1 as an example (PNET IP: 10.0.0.1, private IP: 192.168.0.1), modify the four network communication configuration items in server.properties. For the meaning of each configuration item, see the section on public network access above.

          Listeners
          listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092

          The IP configured under the INTERNAL tag here is the private IP within the Baidu AI Cloud VPC. You can find an instance's private IP on the instance details page of BCC or BBC.

          The listener under the EXTERNAL tag is the connection information for accessing the broker through the PNET IP. Note: The IP configured here should be the same as the IP under the INTERNAL tag, but the ports must be different.

          Advertised.listeners
          advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092

          Here, the advertised.listeners entry corresponding to the EXTERNAL tag is configured as PNET IP:listening port. The content under the INTERNAL tag is the same as in the listeners configuration item.

          Listener.security.protocol.map
          listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT

          Here you can configure different security protocols for different connection methods as needed. In the example, INTERNAL and EXTERNAL are both configured with the security protocol that has no access control (PLAINTEXT).

          Inter.broker.listener.name
          inter.broker.listener.name=INTERNAL

          In the example, the value of the field is set to INTERNAL, indicating that communication between brokers goes through the Baidu AI Cloud VPC subnet.
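
          For reference, the network-related part of broker1's server.properties in this private network example reads as follows:

          listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://192.168.0.1:19092
          advertised.listeners=INTERNAL://192.168.0.1:9092,EXTERNAL://10.0.0.1:19092
          listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          inter.broker.listener.name=INTERNAL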

          Operation Steps

          1. Create Migration Task

          On the DTS management console, click the "Data Migration" tab on the left, and then click the "Create Migration Task" button.

          If your source is an RDS for MySQL instance, the source location is Baidu AI Cloud Database. If it is a self-built MySQL instance on the public network, BBC, or BCC, the source location is Self-built Data Storage. Select Self-built Data Storage as the target location.

          Then click "Next" to complete the purchase configuration. The page jumps to the task list page of the management console. An Unconfigured DTS task is added at the top of the list page, which is the migration task you created. Click "Configure Task" to configure the task. If no new task is found on the list page after the page jumps, it is suggested to wait for a period of time to refresh the page.

          2. Configure Task

          First, enter the task connection configuration page. The figure takes a Baidu AI Cloud database as an example; select the RDS for MySQL instance at the source as required.

          When configuring the connection information at the target, first select the access type according to the access method of the Kafka cluster at the target.

          If the target is a Baidu Messaging System topic, select Baidu Messaging System as the access type, and select the corresponding region and topic ID.

          If the target is a self-built Kafka cluster, as shown in the figure below, select the corresponding access type according to the access method: public network access or Baidu AI Cloud private network access.

          Note: For private network access, the IPs in the broker list must be filled in with PNET IPs.

          Then fill in the other information as required. Note: Currently, DTS only supports Kafka clusters of versions 0.9 and 0.10, and SASL access control can be configured only for version 0.10 clusters.

          Click "Authorize Whitelist and Enter Next", and select the migration object of the instance at the source.

          Click "Save and Pre-check" to complete the creation of task and then view the task status in the task list.

          • If the status column displays "Pre-check Passed", you can select and start the migration task. After the task is started, you can view the migration progress in the task progress column.
          • If the status column displays "Pre-check Failed", click the button next to it to view the reason for the failure and fix it, then re-run the pre-check and start the migration task once it passes.

          For a detailed interpretation of the pre-check items, see Data Migration Operation Guide - Pre-check.

          3. Start Migration

          After the pre-check passes, you can start the task on the task list page.

          4. Check Kafka Cluster Data

          After the task starts, DTS pulls the full baseline data or incremental change data from the database instance at the source and writes it into the designated topic of the Kafka cluster at the target in a fixed format. The specific format is as follows:

          //json structure
          [   //The outermost layer is an array
          {   //The first record; a message contains one or more records
              "TIME":"20180831165311",          //Timestamp
              "GLOBAL_ID":"xxxxxxxx",           //Globally unique ID, consumers can use this ID to deduplicate
              "DATABASE":"db1",           //Database Name
              "TABLE":"tbl1",         //Table Name
              "GROUP_ID":"xxxxxxxxxxx",         //Useless to consumers
              "EVENT_SERVER_ID":"xxxxxx",       //Useless to consumers
              "BINLOG_NAME":"xxxxxx",           //Useless to consumers
              "BINLOG_POS":"xxxxxx",            //Useless to consumers
              "TYPE":"U",                       //Change type, I is insert, U is update, D is delete
              "OLD_VALUES":{                    //"Column name": "Column value" group for each column after change, if the change type is D, OLD_VALUES  does not exist.
                  "key1":"old-value1",
                  "key2":"old-value2",
                  ...
              },
              "NEW_VALUES":{                    //"Column name": "Column value" group for each column after change, if the change type is D, NEW_VALUES does not exist.
                  "key1":"new-value1",
                  "key2":"new-value2",
                  ...
              }
          },  
          {   //The second record
              "TIME":"20180831165311",
              "DATABASE":"db1",
              ...
          }
          ... //More records
          ]
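
          To spot-check the records DTS has written, you can read the target topic with the console consumer shipped with Kafka. A minimal sketch, assuming the public network example above and a hypothetical topic name dts-topic (depending on your Kafka version, you may also need to add the --new-consumer flag):

          ./bin/kafka-console-consumer.sh --bootstrap-server 106.0.0.1:19092 --topic dts-topic --from-beginning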