MapReduce

Streaming Application Scenario

Scenario Description

This scenario covers streaming data processing with BLS (Baidu Log Service), BMS (Baidu Messaging System), and BMR (Baidu MapReduce).

The whole process consists of data collection and data computing.

Data Collection

Data collection is handled by BLS and Baidu Messaging System (BMS).

Step 1 Create BMS Topic

For more information, please see the documentation: Create BMS Topic.

Currently, BMS supports the "North China - Beijing", "South China - Guangzhou", and "Hong Kong region 2" regions. Select the region you need before creating a topic.

Step 2 Install BLS Collector

For more information, please see the documentation: Install Collector.

  1. Select "Install Collector", choose the operating system, and click "Copy";

  2. Log in to the CVM server whose logs need to be transmitted, and run the copied installation command with root privileges.

Step 3 Create BLS Transmission Task

The specific steps are as follows:

  1. On the transmission task list page, click "Create Transmission Task" to open the task creation page;
  2. Enter a task name in the "Task Information" section;
  3. In the "Source Setup" section, select and configure the source type that matches your source data;
  4. In the "Destination Setup" section, select "Kafka" as the log delivery destination;
  5. In the "CVM Server List" section, click "Add CVM Server" and select a CVM server that has the collector installed;
  6. In the "CVM Server List" section, select the CVM server on which to deploy the transmission task, and click "Create".

For operating steps, please see the documentation: Create Transmission Task.

Currently, BMS supports the "North China - Beijing" and "South China - Guangzhou" regions. Select the region you need before creating a topic.

Data Computing (Python)

The data computing stage uses BMR's Spark Streaming to consume data from Baidu Messaging System. The following example uses PySpark (Spark 1.6 with the online Kafka service at version 0.10). The specific steps are as follows:

Step 1 Create BMR Spark Cluster

For more information, please see the documentation: Create Cluster

Note: In the "Cluster Configuration" section, select the built-in "Spark" template and check Spark.

Step 2 Download Spark Kafka Streaming Dependency

# obtain eip from instance list in BMR Console cluster details page
ssh root@eip

# switch to hdfs user
su hdfs
cd

# download dependency
wget http://bmr-public-bj.bj.bcebos.com/sample/spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar

Remarks

The eip in the ssh command is the cluster's public login IP.

Step 3 Write Spark Streaming Program

The following example is kafka_wordcount.py; delete the comments before use:

from __future__ import print_function

import sys
import ConfigParser

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka010 import KafkaUtils
from pyspark.streaming.kafka010 import PreferConsistent
from pyspark.streaming.kafka010 import Subscribe

# read configuration files
def read_config(file_name):
    cf = ConfigParser.ConfigParser()
    # read config file
    cf.read(file_name)
    # read kafka config
    section = "kafka"
    opts = cf.options(section)
    config = {}
    for opt in opts:
        # topics should be a list; strip spaces so "topic1, topic2" also works
        if opt == "topics":
            config[opt] = [t.strip() for t in cf.get(section, opt).split(",")]
        else:
            config[opt] = cf.get(section, opt)
    return config

if __name__ == "__main__":
    """
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <bootstrap-server> <topic>", file=sys.stderr)
        exit(-1)
    """
    # build SparkContext and StreamingContext, and demo processing interval is 20s
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 20)

    # read configuration file test.conf, and obtain Baidu Kafka connection parameters
    config_file = "test.conf"
    kafkaParams = read_config(config_file)

    # build kafka input stream
    topics = kafkaParams["topics"]
    kvs = KafkaUtils.createDirectStream(ssc, PreferConsistent(), Subscribe(topics, kafkaParams))
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    # enable StreamingContext
    ssc.start()
    ssc.awaitTermination()
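
To see what the flatMap / map / reduceByKey chain computes for a single batch, the following is a minimal Spark-free sketch in plain Python. The function name count_words and the sample batch are hypothetical and are not part of the BMR example; the sketch only mirrors the tokenisation (splitting on a single space) used above.

```python
from collections import Counter


def count_words(lines):
    """Count space-separated words across one batch of message lines."""
    counts = Counter()
    for line in lines:
        # Same tokenisation as the streaming job: split on a single space
        for word in line.split(" "):
            counts[word] += 1
    return dict(counts)


# Hypothetical batch of message values received within one interval
batch = ["hello world", "hello spark"]
print(count_words(batch))
```

In the streaming job the same counting is applied independently to every 20-second batch, so counts are not accumulated across batches.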

Step 4 Download BMS Certificate

Download the certificate.

Step 5 Create Kafka Connection Configuration Files

Take “test.conf” as an example:

vi test.conf

# write following content
[kafka]
bootstrap.servers = kafka.bj.baidubce.com:9091
topics = test_for_demo
group.id = test
# the following SSL configuration is replaced according to content of client.properties
security.protocol = SSL
ssl.truststore.password = test_truststore_password
ssl.truststore.location = client.truststore.jks
ssl.keystore.location = client.keystore.jks
ssl.keystore.password = test_keystore_password

Parameter Description:

bootstrap.servers: the Kafka service address.
topics: the topics to consume, separated by commas, for example, topic1,topic2,topic3.
group.id: the consumer group ID. To avoid conflicts with other users, do not choose it arbitrarily; the Kafka service will support groupId isolation in the future.

For more configurations, please see: http://kafka.apache.org/0100/documentation.html#newconsumerconfigs
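
Before submitting a job, it can help to check that the configuration file parses and contains every key used in the sample. The following is a sketch for Python 3 (the example program above targets Python 2's ConfigParser module). The function load_kafka_config and the REQUIRED_KEYS list are assumptions derived from the sample file, not an official list of mandatory settings.

```python
import configparser

# Assumption: the keys shown in the sample test.conf are all required
REQUIRED_KEYS = (
    "bootstrap.servers", "topics", "group.id", "security.protocol",
    "ssl.truststore.location", "ssl.truststore.password",
    "ssl.keystore.location", "ssl.keystore.password",
)


def load_kafka_config(text):
    """Parse the [kafka] section and return it as a dict with a topic list."""
    # interpolation=None keeps '%' characters in passwords literal
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    config = dict(parser.items("kafka"))
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if missing:
        raise ValueError("missing keys in [kafka]: " + ", ".join(missing))
    # Split the comma-separated topics and drop surrounding spaces
    config["topics"] = [t.strip() for t in config["topics"].split(",")]
    return config


SAMPLE = """
[kafka]
bootstrap.servers = kafka.bj.baidubce.com:9091
topics = topic1, topic2
group.id = test
security.protocol = SSL
ssl.truststore.password = test_truststore_password
ssl.truststore.location = client.truststore.jks
ssl.keystore.location = client.keystore.jks
ssl.keystore.password = test_keystore_password
"""

print(load_kafka_config(SAMPLE)["topics"])
```

Running such a check locally catches a misspelled key before the job reaches the cluster.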

Step 6 Submit Streaming Steps

After the preceding steps, the current directory (the hdfs home directory) contains five files: test.conf, client.truststore.jks, client.keystore.jks, kafka_wordcount.py, and spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar.

Use spark-submit to submit the streaming step:

/usr/bin/spark-submit --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks --jars spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar kafka_wordcount.py
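
As a sketch of how the command above is put together, the following Python snippet checks that the expected files are present and then assembles the same argument list. The helper names missing_files and build_submit_command are hypothetical; only the flags already shown in the command are used.

```python
import os


def missing_files(paths):
    """Return the paths that do not exist as regular files."""
    return [p for p in paths if not os.path.isfile(p)]


def build_submit_command(script, jar, files):
    """Assemble the spark-submit argument list used in this step."""
    return [
        "/usr/bin/spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--files", ",".join(files),  # shipped alongside the application
        "--jars", jar,               # Kafka streaming dependency
        script,
    ]


files = ["test.conf", "client.keystore.jks", "client.truststore.jks"]
jar = "spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar"
script = "kafka_wordcount.py"

absent = missing_files(files + [jar, script])
if absent:
    print("Missing before submit: " + ", ".join(absent))
else:
    print(" ".join(build_submit_command(script, jar, files)))
```

The snippet only prints the command; it does not launch spark-submit itself.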

Step 7 Show Step Outputs

In "Cluster Details Page", click "Hadoop Yarn Web UI" to open the yarn console.

In the yarn console, open the application logs to view the program's output. For kafka_wordcount, the output appears in stdout.

Notes

  1. To stop a step, use the "yarn application -kill applicationId" command, for example:

    yarn application -kill application_1488868742896_0002

  2. If multiple steps run at the same time, give each one a different group.id in test.conf.
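
When the kill command is scripted, a small guard against a mistyped application ID can be useful. The sketch below assumes IDs follow the application_<timestamp>_<sequence> shape seen in the example; the helper kill_command is hypothetical and only builds the argument list without running it.

```python
import re

# Assumption: IDs look like the example, application_<digits>_<digits>
APP_ID_PATTERN = re.compile(r"^application_\d+_\d+$")


def kill_command(application_id):
    """Build the yarn kill command for a well-formed application ID."""
    if not APP_ID_PATTERN.match(application_id):
        raise ValueError("not a YARN application ID: " + application_id)
    return ["yarn", "application", "-kill", application_id]


print(" ".join(kill_command("application_1488868742896_0002")))
```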

Data Computing (Scala)

The data computing stage uses BMR's Spark Streaming to consume data from Baidu Messaging System (BMS). The following example uses Scala (Spark 2.1 with the online Kafka service at version 0.10). The specific steps are as follows:

Step 1 Create BMR Spark Cluster

For more information, please see the documentation: Create Cluster

Note: In the "Cluster Configuration" section, select the built-in "Spark2" template and check Spark.

Step 2 Download Spark Kafka Streaming Dependency

# obtain eip from instance list in BMR Console cluster details page
ssh root@eip

# switch to hdfs user
su hdfs
cd

# download dependency
wget https://bmr-public-bj.bj.bcebos.com/sample/original-kafke-read-streaming-1.0-SNAPSHOT.jar

Step 3 Download BMS Certificate

Download the certificate.

Step 4 Create Kafka Connection Configuration Files

Take “test.conf” for example:

vi test.conf

# write following content
[kafka]
bootstrap.servers = kafka.bj.baidubce.com:9091
topics = test_for_demo
group.id = test
# the following SSL configuration is replaced according to content of client.properties
security.protocol = SSL
ssl.truststore.password = test_truststore_password
ssl.truststore.location = client.truststore.jks
ssl.keystore.location = client.keystore.jks
ssl.keystore.password = test_keystore_password

Parameter Description:

bootstrap.servers: the Kafka service address.
topics: the topics to consume, separated by commas, for example, topic1,topic2,topic3.
group.id: the consumer group ID. To avoid conflicts with other users, do not choose it arbitrarily; the Kafka service will support groupId isolation in the future.

For more configurations, please see: http://kafka.apache.org/0100/documentation.html#newconsumerconfigs

Step 5 Submit Streaming Steps

After the preceding four steps, the current directory contains four files: test.conf, client.truststore.jks, client.keystore.jks, and original-kafke-read-streaming-1.0-SNAPSHOT.jar.

Use spark-submit to submit the streaming step:

spark-submit --class com.baidu.inf.spark.WordCount --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-SNAPSHOT.jar "$topics" "$bootstrap.servers" "$group.id"  "$ssl.truststore.password" "$ssl.keystore.password"

(Note: Replace the last five parameter values in the command with the actual values, i.e., the corresponding fields in test.conf.)

Example:

spark-submit --class com.baidu.inf.spark.WordCount --master yarn --deploy-mode cluster --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-SNAPSHOT.jar "868313b92dbe474b80ee4ef0904df26d__test" "kafka.bj.baidubce.com:9091" "test" "kafka" "k7ynher0"
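
The five positional arguments must be passed in a fixed order: topic, bootstrap servers, group ID, truststore password, keystore password. As a sketch, the following Python helper pulls them from a dictionary of test.conf fields in that order. The names ARGUMENT_ORDER and wordcount_arguments are hypothetical, and the sample values are the placeholders from test.conf rather than real credentials.

```python
# Order of the positional arguments in the spark-submit command
ARGUMENT_ORDER = (
    "topics",
    "bootstrap.servers",
    "group.id",
    "ssl.truststore.password",
    "ssl.keystore.password",
)


def wordcount_arguments(config):
    """Return the five positional arguments in the order the job expects."""
    missing = [key for key in ARGUMENT_ORDER if key not in config]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    return [config[key] for key in ARGUMENT_ORDER]


# Placeholder values taken from the sample test.conf
example = {
    "topics": "test_for_demo",
    "bootstrap.servers": "kafka.bj.baidubce.com:9091",
    "group.id": "test",
    "ssl.truststore.password": "test_truststore_password",
    "ssl.keystore.password": "test_keystore_password",
}
print(wordcount_arguments(example))
```

Note that the WordCount program wraps its first argument in a one-element array, so the value is treated as a single topic name.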

WordCount example code is provided here:

package com.baidu.inf.spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def className = this.getClass.getName.stripSuffix("$")

  def main(args: Array[String]): Unit = {
    // All five arguments are required by the pattern match below
    if (args.length < 5) {
      System.err.println(
        "Usage: Input Params: "
          + " <topic> "
          + " <bootstrap.servers> "
          + " <group.id> "
          + " <ssl.truststore.password>"
          + " <ssl.keystore.password>"
      )
      sys.exit(1)
    }
    val Array(topic, bootstrap, group, truststore, keystore, _*) = args

    val conf = new SparkConf().setAppName(className).setIfMissing("spark.master", "local[2]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    // batch interval is 5s
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootstrap,
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[StringDeserializer].getName,
      "group.id" -> group,
      "auto.offset.reset" -> "latest",
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "ssl.truststore.location" -> "client.truststore.jks",
      "ssl.keystore.location" -> "client.keystore.jks",
      "security.protocol" -> "SSL",
      "ssl.truststore.password" -> truststore,
      "ssl.keystore.password" -> keystore,
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    ssc.sparkContext.setLogLevel("WARN")

    val topics = Array(topic)
    // consume kafka data
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    ).map(record => record.value())

    // split each message into words and count them per batch
    val counts = stream.flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    counts.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop(true, true)
  }
}

Step 6 Show Step Outputs

In "Cluster Details Page", click "Hadoop Yarn Web UI" to open the yarn console.

In the yarn console, open the application logs to view the program's output. For the WordCount example, the output appears in stdout.

Notes

  1. To stop a step, use the "yarn application -kill applicationId" command, for example:

    yarn application -kill application_1488868742896_0002

  2. If multiple steps run at the same time, give each one a different group.id in test.conf.