          Streaming Application Scenario

          Scenario Description

          This scenario covers streaming data processing and uses BLS (Baidu Log Service), BMS (Baidu Messaging System), and BMR (Baidu MapReduce).


          The whole process consists of data collection and data computing.

          Data Collection

          Data collection is implemented with BLS and Baidu Messaging System (BMS).

          Step 1 Create BMS Topic

          For more information, please see the documentation: Create BMS Topic.

          Currently, BMS supports the “North China - Beijing”, “South China - Guangzhou”, and “Hong Kong region 2” regions. Select the region you need before creating a topic.

          Step 2 Install BLS Collector

          For more information, please see the documentation: Install Collector.

          1. Select "Install Collector", select the operating system, and click "Copy".


          2. Log in to the CVM server whose logs need to be transmitted, and run the copied installation command as root.

          Step 3 Create BLS Transmission Task

          The specific steps are as follows:

          1. On the transmission task list page, click "Create Transmission Task" to open the task creation page.
          2. In the "Task Information" section, enter a task name.
          3. In the "Source Setup" section, select and configure the source type according to the type of source data.
          4. In the "Destination Setup" section, select "Kafka" as the log delivery destination.
          5. In the "CVM Server List" section, click "Add CVM Server" and select a CVM server with the collector installed.
          6. In the "CVM Server List" section, select the CVM server on which to deploy the transmission task, and click "Create".

          For operating steps, please see the documentation: Create Transmission Task.

          Data Computing (Python)

          In the data computing process, Spark Streaming on BMR connects to Baidu Messaging System (BMS). The following example uses PySpark (Spark 1.6 with the online Kafka service, version 0.10). The specific steps are as follows:

          Step 1 Create BMR Spark Cluster

          For more information, please see the documentation: Create Cluster

          Note: In "Cluster Configuration" section, select "Spark" built-in template, and check Spark.

          Step 2 Download Spark Kafka Streaming Dependency

          # obtain eip from instance list in BMR Console cluster details page
          ssh root@eip
          # switch to hdfs user
          su hdfs
          # download dependency
          wget http://bmr-public-bj.bj.bcebos.com/sample/spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar


          Obtain cluster login public IP.

          Step 3 Write Spark Streaming Program

          Take kafka_wordcount.py as an example (delete the comments before use):

          from __future__ import print_function
          import sys
          import ConfigParser
          from pyspark import SparkContext
          from pyspark.streaming import StreamingContext
          from pyspark.streaming.kafka010 import KafkaUtils
          from pyspark.streaming.kafka010 import PreferConsistent
          from pyspark.streaming.kafka010 import Subscribe

          # read configuration files
          def read_config(file_name):
              cf = ConfigParser.ConfigParser()
              # read config file
              cf.read(file_name)
              # read kafka config
              section = "kafka"
              opts = cf.options(section)
              config = {}
              for opt in opts:
                  # topics should be a list
                  if opt == "topics":
                      config[opt] = str.split(cf.get(section, opt), ",")
                  else:
                      config[opt] = cf.get(section, opt)
              return config

          if __name__ == "__main__":
              # all connection settings are read from test.conf, so no arguments are expected
              if len(sys.argv) != 1:
                  print("Usage: kafka_wordcount.py (settings are read from test.conf)", file=sys.stderr)
                  sys.exit(-1)
              # build SparkContext and StreamingContext, and demo processing interval is 20s
              sc = SparkContext(appName="PythonStreamingKafkaWordCount")
              ssc = StreamingContext(sc, 20)
              # read configuration file test.conf, and obtain Baidu Kafka connection parameters
              config_file = "test.conf"
              kafkaParams = read_config(config_file)
              # build kafka input stream
              topics = kafkaParams["topics"]
              kvs = KafkaUtils.createDirectStream(ssc, PreferConsistent(), Subscribe(topics, kafkaParams))
              # each record is a (key, value) pair; keep the value
              lines = kvs.map(lambda x: x[1])
              counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a+b)
              # print the word counts of each batch to stdout
              counts.pprint()
              # enable StreamingContext
              ssc.start()
              ssc.awaitTermination()
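
The transformation chain in the program above (flatMap, map, reduceByKey) can be followed on a single batch without a cluster. The following is a minimal sketch in plain Python, not Spark code; the function name `count_words` and the sample lines are hypothetical and only illustrate what each stage contributes.

```python
from collections import defaultdict


def count_words(lines):
    """Mimic flatMap -> map -> reduceByKey on one batch of lines."""
    # flatMap: split every line on single spaces and flatten into one word list
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with the count 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: sum the counts that share the same word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)


# hypothetical batch, standing in for the messages received in one interval
batch = ["hello spark", "hello kafka"]
print(sorted(count_words(batch).items()))
# prints [('hello', 2), ('kafka', 1), ('spark', 1)]
```

In the streaming program the same computation runs once per batch interval, so the counts are per batch rather than cumulative.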

          Step 4 Download BMS Certificate

          Download the certificate.

          Step 5 Create Kafka Connection Configuration Files

          Take “test.conf” as an example:

          vi test.conf
          # write the following content
          [kafka]
          bootstrap.servers = kafka.bj.baidubce.com:9091
          topics = test_for_demo
          group.id = test
          # replace the following SSL settings with the values from client.properties
          security.protocol = SSL
          ssl.truststore.password = test_truststore_password
          ssl.truststore.location = client.truststore.jks
          ssl.keystore.location = client.keystore.jks
          ssl.keystore.password = test_keystore_password

          Parameter Description:

          bootstrap.servers: Address of the Kafka service.
          topics: Topics to consume, separated by commas with no spaces, for example, topic1,topic2,topic3.
          group.id: ID of the consumer group. To avoid conflicts with other users, do not set it arbitrarily; the Kafka service will support groupId isolation in the future.

          For more configurations, please see: http://kafka.apache.org/0100/documentation.html#newconsumerconfigs
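
The way these parameters are read can be tried locally before submitting anything. The sketch below is an assumption-laden illustration, not the program shipped with the sample: it targets Python 3's `configparser` (the module is named `ConfigParser` on Python 2), it assumes the file begins with a `[kafka]` section header because `read_config` looks up the section named "kafka", and the names `parse_kafka_section` and `SAMPLE_CONF` are hypothetical.

```python
import configparser  # Python 3 name; the module is called ConfigParser on Python 2

# hypothetical sample content, modeled on the test.conf shown in this step
SAMPLE_CONF = """\
[kafka]
bootstrap.servers = kafka.bj.baidubce.com:9091
topics = topic1,topic2,topic3
group.id = test
security.protocol = SSL
"""


def parse_kafka_section(text, section="kafka"):
    """Return the options of one section as a dict; 'topics' becomes a list."""
    # interpolation=None keeps '%' characters in passwords literal
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    config = {}
    for opt in parser.options(section):
        value = parser.get(section, opt)
        if opt == "topics":
            # split the comma-separated list and drop stray spaces
            config[opt] = [t.strip() for t in value.split(",") if t.strip()]
        else:
            config[opt] = value
    return config


config = parse_kafka_section(SAMPLE_CONF)
print(config["topics"])             # prints ['topic1', 'topic2', 'topic3']
print(config["bootstrap.servers"])  # prints kafka.bj.baidubce.com:9091
```

Stripping spaces around each topic is a design choice of this sketch; the sample program splits on "," only, so its topic list must not contain spaces.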

          Step 6 Submit Streaming Steps

          After the preceding five steps, the current directory (the hdfs user's home directory) contains five files: test.conf, client.truststore.jks, client.keystore.jks, kafka_wordcount.py, and spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar.

          Use spark-submit to submit the streaming step:

          /usr/bin/spark-submit --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks --jars spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar kafka_wordcount.py
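
Because the command depends on several files being present in the current directory, it can help to check for them and assemble the argument list programmatically. The following is a minimal sketch under that assumption; the helper names `build_submit_command` and `missing_files` are hypothetical, and only the flags already shown in the command above are used.

```python
import os
import shlex


def build_submit_command(script, files, jars, master="yarn", deploy_mode="cluster"):
    """Assemble the spark-submit argument list used in this step."""
    return [
        "/usr/bin/spark-submit",
        "--master", master,
        "--deploy-mode", deploy_mode,
        "--files", ",".join(files),   # shipped to the YARN containers
        "--jars", ",".join(jars),     # added to the classpath
        script,
    ]


def missing_files(paths, directory="."):
    """Return the paths that do not exist as files in the given directory."""
    return [p for p in paths if not os.path.isfile(os.path.join(directory, p))]


files = ["test.conf", "client.keystore.jks", "client.truststore.jks"]
jars = ["spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar"]
cmd = build_submit_command("kafka_wordcount.py", files, jars)

# show the command as it would be typed in a shell
print(" ".join(shlex.quote(part) for part in cmd))
# list anything that still has to be downloaded or created
print("missing:", missing_files(files + jars + ["kafka_wordcount.py"]))
```

The list form can be passed to `subprocess.call` directly, which avoids shell quoting issues in file names.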

          Step 7 Show Step Outputs

          On the cluster details page, click "Hadoop Yarn Web UI" to open the YARN console:


          In the YARN console, open the application logs to view the program’s output. For kafka_wordcount, the output appears in stdout:


          1. To stop a step, use the “yarn application -kill applicationId” command, for example:

            yarn application -kill application_1488868742896_0002

          2. If multiple steps run at the same time, give each one a different group.id in test.conf.
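
One way to handle the second point is to generate a separate copy of the configuration for each step. The sketch below assumes the `key = value` layout of test.conf shown earlier; the function name `with_group_id` and the group names are hypothetical.

```python
def with_group_id(conf_text, group_id):
    """Return conf_text with the value of the group.id line replaced."""
    out = []
    replaced = False
    for line in conf_text.splitlines():
        # the key is everything before the first '='
        key = line.split("=", 1)[0].strip()
        if key == "group.id":
            out.append("group.id = " + group_id)
            replaced = True
        else:
            out.append(line)
    if not replaced:
        raise ValueError("no group.id line found")
    return "\n".join(out) + "\n"


# hypothetical base configuration and group name
base = "topics = test_for_demo\ngroup.id = test\nsecurity.protocol = SSL\n"
print(with_group_id(base, "test_step2"))
```

Each generated text can be written to its own file and passed to the corresponding step through `--files`.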

          Data Computing (Scala)

          In the data computing process, Spark Streaming on BMR connects to Baidu Messaging System (BMS). The following example uses Scala (Spark 2.1 with the online Kafka service, version 0.10). The specific steps are as follows:

          Step 1 Create BMR Spark Cluster

          For more information, please see the documentation: Create Cluster

          Note: In "Cluster Configuration" section, select "Spark2" built-in template, and check Spark.

          Step 2 Download Spark Kafka Streaming Dependency

          # obtain eip from instance list in BMR Console cluster details page
          ssh root@eip
          # switch to hdfs user
          su hdfs
          # download dependency
          wget https://bmr-public-bj.bj.bcebos.com/sample/original-kafke-read-streaming-1.0-SNAPSHOT.jar

          Step 3 Download BMS Certificate

          Download the certificate.

          Step 4 Create Kafka Connection Configuration Files

          Take “test.conf” for example:

          vi test.conf
          # write the following content
          bootstrap.servers = kafka.bj.baidubce.com:9091
          topics = test_for_demo
          group.id = test
          # replace the following SSL settings with the values from client.properties
          security.protocol = SSL
          ssl.truststore.password = test_truststore_password
          ssl.truststore.location = client.truststore.jks
          ssl.keystore.location = client.keystore.jks
          ssl.keystore.password = test_keystore_password

          Step 5 Submit Streaming Steps

          After the preceding four steps, the current directory contains four files: test.conf, client.truststore.jks, client.keystore.jks, and original-kafke-read-streaming-1.0-SNAPSHOT.jar.


          Use spark-submit to submit the streaming step:

          spark-submit --class com.baidu.inf.spark.WordCount --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-SNAPSHOT.jar "$topics" "$bootstrap.servers" "$group.id"  "$ssl.truststore.password" "$ssl.keystore.password"

          Note: Replace the last five parameter values in the command with the actual values of the corresponding fields in test.conf, for example:



          spark-submit --class com.baidu.inf.spark.WordCount --master yarn --deploy-mode cluster --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-SNAPSHOT.jar "868313b92dbe474b80ee4ef0904df26d__test" "kafka.bj.baidubce.com:9091" "test" "kafka" "k7ynher0"
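
Since the five positional parameters must follow a fixed order, they can be derived from the settings in test.conf instead of being typed by hand. This is a minimal sketch, assuming the settings are already available as a dictionary; the names `ARG_ORDER` and `wordcount_args` are hypothetical, and the values are the placeholders from the sample test.conf.

```python
# order expected by the WordCount job, as given in the command above
ARG_ORDER = [
    "topics",
    "bootstrap.servers",
    "group.id",
    "ssl.truststore.password",
    "ssl.keystore.password",
]


def wordcount_args(config):
    """Return the five positional arguments in the order the job expects."""
    missing = [key for key in ARG_ORDER if key not in config]
    if missing:
        raise KeyError("missing settings: " + ", ".join(missing))
    return [config[key] for key in ARG_ORDER]


# placeholder values taken from the sample test.conf
settings = {
    "bootstrap.servers": "kafka.bj.baidubce.com:9091",
    "topics": "test_for_demo",
    "group.id": "test",
    "ssl.truststore.password": "test_truststore_password",
    "ssl.keystore.password": "test_keystore_password",
}
print(wordcount_args(settings))
```

The returned list can be appended to the spark-submit argument list after the jar path.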

          WordCount example code is provided here:

          package com.baidu.inf.spark

          import org.apache.kafka.common.serialization.StringDeserializer
          import org.apache.spark.SparkConf
          import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
          import org.apache.spark.streaming.kafka010.KafkaUtils
          import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
          import org.apache.spark.streaming.{Seconds, StreamingContext}

          object WordCount {
            def className = this.getClass.getName.stripSuffix("$")

            def main(args: Array[String]): Unit = {
              // all five parameters are required
              if (args.length < 5) {
                System.err.println("Usage: Input Params: " +
                  " <topic> " +
                  " <bootstrap.servers> " +
                  " <group.id> " +
                  " <ssl.truststore.password>" +
                  " <ssl.keystore.password>")
                System.exit(1)
              }
              val Array(topic, bootstrap, group, truststore, keystore, _*) = args
              val conf = new SparkConf().setAppName(className).setIfMissing("spark.master", "local[2]")
              conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
              // batch interval is 5s
              val ssc = new StreamingContext(conf, Seconds(5))
              val kafkaParams = Map[String, Object](
                "bootstrap.servers" -> bootstrap,
                "key.deserializer" -> classOf[StringDeserializer].getName,
                "value.deserializer" -> classOf[StringDeserializer].getName,
                "group.id" -> group,
                "auto.offset.reset" -> "latest",
                "serializer.class" -> "kafka.serializer.StringEncoder",
                "ssl.truststore.location" -> "client.truststore.jks",
                "ssl.keystore.location" -> "client.keystore.jks",
                "security.protocol" -> "SSL",
                "ssl.truststore.password" -> truststore,
                "ssl.keystore.password" -> keystore,
                "enable.auto.commit" -> (true: java.lang.Boolean)
              )
              val topics = Array(topic)
              // consume kafka data
              val stream = KafkaUtils.createDirectStream[String, String](
                ssc,
                PreferConsistent,
                Subscribe[String, String](topics, kafkaParams)
              ).map(record => record.value())
              val counts = stream.flatMap(_.split(" "))
                .map(word => (word, 1))
                .reduceByKey(_ + _)
              // print the word counts of each batch to stdout
              counts.print()
              // start the streaming job and block until it terminates
              ssc.start()
              ssc.awaitTermination()
              ssc.stop(true, true)
            }
          }

          Step 6 Show Step Outputs

