
          Flink

          Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation. Its core is a distributed streaming dataflow engine written in Java and Scala. Flink executes dataflow programs in a data-parallel and pipelined manner, and its runtime can run both batch processing and stream processing programs. Flink also natively supports the execution of iterative algorithms.
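
          To illustrate this dataflow model, the snippet below is a minimal sketch of a self-contained Flink streaming program written with the Scala DataStream API (the socket host and port are placeholders for illustration only and are not part of the BMR setup):

            import org.apache.flink.streaming.api.scala._

            object SocketWordCount {
              def main(args: Array[String]): Unit = {
                val env = StreamExecutionEnvironment.getExecutionEnvironment
                // Read lines from a text socket (placeholder host/port), split them into
                // words, and maintain a running count per word.
                env.socketTextStream("localhost", 9999)
                  .flatMap(line => line.toLowerCase.split("\\W+"))
                  .filter(word => word.nonEmpty)
                  .map(word => (word, 1))
                  .keyBy(0)
                  .sum(1)
                  .print()
                env.execute("Socket WordCount")
              }
            }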

          Create Clusters

          Log in to the Baidu AI Cloud console, select "Product Service -> Baidu MapReduce BMR", and click "Create Cluster" to enter the cluster creation page. BMR 2.0.0 and above supports the Flink component, which you can select when purchasing the cluster, as the following figure shows:

          (Figure: selecting the Flink component on the cluster creation page)

          Usage Introduction

          1. Remotely log in to the created cluster

            ssh hdfs@$public_ip
            Use the password entered during cluster creation

          2. Run WordCount step (Kerberos authentication not enabled)
          • Upload a file to HDFS

            hdfs dfs -put /etc/hadoop/conf/core-site.xml /tmp

          • Execute the following command to submit the step to YARN:

            flink run --jobmanager yarn-cluster \
            -yn 1 \
            -ytm 1024 \
            -yjm 1024 \
            /opt/bmr/flink/examples/batch/WordCount.jar \
            --input hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/core-site.xml \
            --output hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out
          • After the step is submitted successfully, the final output is as follows:

            (Figure: WordCount step output)
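
          To verify the result from the command line, you can read the output path back from HDFS (if the step ran with a parallelism greater than 1, the output is a directory, so list it and cat the files inside instead):

            hdfs dfs -cat hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out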

          Real-Time Stream Computing (Scala)

          This example uses Flink on BMR to consume data from BMS (Baidu Message Service) for real-time computing. It is written in Scala, with Flink 1.8.2 and an online Kafka version of 2.1. The specific steps are as follows:

          Step 1 Create Topic and Download BMS Certificate

          (For more information, please see Spark Streaming Application Scenario)

          Download the certificate.

          Step 2 Write the Business Code

          package com.baidu.inf.flink
           
          import java.util.Properties
           
          import org.apache.flink.api.common.serialization.SimpleStringSchema
          import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
          import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
          import org.apache.kafka.common.serialization.StringDeserializer
          import org.slf4j.LoggerFactory
           
          object CloudFlinkConsumeKafkaDemo {
            private val logger = LoggerFactory.getLogger(this.getClass)
           
            def main(args: Array[String]): Unit = {
              logger.info("************ Flink Consume Kafka Demo start **************")
              if (args.length < 7) {
                logger.error(" Parameters Are Missing , " +
                  "Needs : <topic> " +
                  "<groupId> " +
                  "<brokerHosts> " +
                  "<truststore_location> " +
                  "<truststore_pass> " +
                  "<keystore_location> " +
                  "<keystore_pass>")
                System.exit(-1)
              }
              val Array(topic, groupId, brokerHosts,
              truststore_location, truststore_pass,
              keystore_location, keystore_pass, _*) = args
           
              val env = StreamExecutionEnvironment.getExecutionEnvironment
              env.setParallelism(2)
              env.getConfig.disableSysoutLogging
           
              // Kafka consumer configuration: BMS requires SSL, so the truststore/keystore
              // properties must point at the certificate files downloaded in Step 1.
              val kafkaProperties = new Properties()
              kafkaProperties.setProperty("bootstrap.servers", brokerHosts)
              kafkaProperties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
              kafkaProperties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
              kafkaProperties.setProperty("group.id", groupId)
              kafkaProperties.setProperty("auto.offset.reset", "latest")
              kafkaProperties.setProperty("serializer.class", "kafka.serializer.StringEncoder")
              kafkaProperties.setProperty("security.protocol", "SSL")
              kafkaProperties.setProperty("ssl.truststore.location", truststore_location)
              kafkaProperties.setProperty("ssl.truststore.password", truststore_pass)
              kafkaProperties.setProperty("ssl.keystore.location", keystore_location)
              kafkaProperties.setProperty("ssl.keystore.password", keystore_pass)
              kafkaProperties.setProperty("enable.auto.commit", "true")
           
              // Create a Kafka source with the universal Flink Kafka connector,
              // deserializing each record as a plain string.
              val ds = env.addSource(
                new FlinkKafkaConsumer[String](topic,
                  new SimpleStringSchema(),
                  kafkaProperties))
           
              ds.print()
              env.execute()
            }
          }

          Step 3 Compile the Code, Package an Executable Jar, and Upload It to the Server

          (Note: make sure the certificate files downloaded in Step 1 exist under the same path on every node of the cluster.)
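
          The job must be packaged as a fat jar that bundles the Kafka connector while leaving the core Flink dependencies provided by the cluster. The jar name below follows Maven's "jar-with-dependencies" assembly convention, but any build tool works; the following build.sbt is only a minimal sketch under that assumption (Flink 1.8.2 on Scala 2.11, with the sbt-assembly plugin enabled in project/plugins.sbt):

            // build.sbt -- minimal sketch for packaging the demo above
            name := "flink-streaming"
            version := "1.0-SNAPSHOT"
            scalaVersion := "2.11.12"

            libraryDependencies ++= Seq(
              // Provided: shipped with the Flink distribution on the BMR cluster.
              "org.apache.flink" %% "flink-streaming-scala" % "1.8.2" % "provided",
              // Bundled into the fat jar: the universal Kafka connector used by FlinkKafkaConsumer.
              "org.apache.flink" %% "flink-connector-kafka" % "1.8.2"
            )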

          Example of running the step:

            flink run --jobmanager yarn-cluster -yn 1 -ytm 1024 -yjm 1024 \
            /root/flink-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar \
            "676c4bb9b72c49c7bd3b089c181af9ec__demo02" "group1" \
            "kafka.fsh.baidubce.com:9091" \
            "/tmp/client.truststore.jks" "kafka" \
            "/tmp/client.keystore.jks" "0yw0ckrt"

          Use an SSH tunnel to log in to the cluster's YARN page (see Use SSH-Tunnel to Access Cluster).

          Find the step's application in the YARN console and click the application name to enter the step details page.
          (On Flink's native web page, click "TaskManagers > Stdout" to view the step's running output.)

          (Figure: step output shown under TaskManagers > Stdout on the Flink page)
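
          If the web page is not convenient, the aggregated container logs (including the TaskManager stdout) can also be fetched on a cluster node after the application has stopped, assuming YARN log aggregation is enabled:

            yarn logs -applicationId <application_id>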

          References

          1. Flink Application Scenario
          2. Release Notes - Flink 1.8