流式应用场景

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
所有文档
menu
没有找到结果,请重新输入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
  • 文档中心
  • arrow
  • MapReduceBMR
  • arrow
  • 场景教程
  • arrow
  • 流式应用场景
本页目录
  • 概览
  • 需求场景
  • 事件流
  • 持续计算
  • 方案概述
  • 数据采集
  • 创建消息服务BMS Topic
  • 安装BLS收集器
  • 创建BLS传输任务
  • 数据计算(Python)
  • 创建BMR Spark集群
  • 下载Spark Kafka Streaming依赖
  • 编写Spark Streaming程序
  • 下载百度消息服务的证书
  • 创建连接Kafka配置文件
  • 提交Streaming作业
  • 查看作业输出
  • 注意事项
  • 数据计算(Scala)
  • 创建BMR Spark集群
  • 下载Spark Kafka Streaming依赖
  • 下载百度消息服务的证书
  • 创建连接Kafka配置文件
  • 提交Streaming作业
  • 查看作业输出
  • 注意事项
  • 相关产品

流式应用场景

更新时间:2025-08-21

概览

实现云上流式场景下数据流打通,方便用户在百度智能云上使用各个产品实现流式需求,实现流式数据处理全流程。

需求场景

事件流

事件流具能够持续产生大量的数据,这类数据最早出现与传统的银行和股票交易领域,也在互联网监控、无线通信网等领域出现、需要以近实时的方式对更新数据流进行复杂分析如趋势分析、预测、监控等。简单来说,事件流采用的是查询保持静态,语句是固定的,数据不断变化的方式。

持续计算

比如对于大型网站的流式数据:网站的访问PV/UV、用户访问了什么内容、搜索了什么内容等,实时的数据计算和分析可以动态实时地刷新用户访问数据,展示网站实时流量的变化情况,分析每天各小时的流量和用户分布情况;比如金融行业,毫秒级延迟的需求至关重要。一些需要实时处理数据的场景也可以应用Flink/Kafka,比如根据用户行为产生的日志文件进行实时分析,对用户进行商品的实时推荐等。

方案概述

本场景应用于数据流式处理,使用到BLS(百度Log Service)、BMS(百度消息服务)以及BMR(MapReduce)三个产品。

整个流程分为数据采集和数据计算两部分。

数据采集

数据采集过程通过BLS以及百度消息服务BMS实现。

创建消息服务BMS Topic

参考文档:创建BMS主题。

目前百度消息服务BMS支持“华北-北京”、“华南-广州”以及“香港2区”三个地区,创建主题前可以根据具体需求选择不同的区域。

安装BLS收集器

参考文档:安装收集器

  1. 选择“收集器安装”,选择相应的操作系统后,点击“复制”;
  2. 登录需要传输日志的主机,在root权限下执行所“复制”的安装命令。

创建BLS传输任务

具体步骤为: 1. 在传输任务列表页面,点击“创建传输任务”,进入创建传输任务页面; 2. 在“任务信息”区,输入任务名称; 3. 在“源端设置”区,根据源数据类型,选择不通的源端类型以及进行相应的配置; 4. 在“目的端设置”区,选择“Kafka”作为日志投递目的; 5. 在“主机列表”区,点击“添加主机”,选择安装好“收集器”的主机; 6. 在“主机列表”区,选择需部署该传输任务的主机,点击“创建”;

详细操作步骤请参考文档:创建传输任务。

目前百度消息服务支持“华北-北京”、“华南-广州”两个地区,创建topic前可以根据具体需求选择不同的区域。

数据计算(Python)

数据计算过程通过BMR的Spark Streaming连接百度消息服务。本文以使用PySpark为例,Spark版本1.6,线上Kafka版本0.10。具体步骤如下:

创建BMR Spark集群

参考文档:创建集群

注意:在“集群配置”区,选择“Spark”内置模板,并将Spark选上。

image.png

下载Spark Kafka Streaming依赖

Plain Text
1# eip可以在BMR Console集群详情页的实例列表获取
2ssh root@eip
3
4# 切换到hdfs用户
5su hdfs
6cd
7
8# 下载依赖
9wget http://bmr-public-bj.bj.bcebos.com/sample/spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar

备注

获取集群登录公网IP

image.png

编写Spark Streaming程序

以Kafka_wordcount为例,使用前请删除文中注释:

Plain Text
1from __future__ import print_function
2
3import sys
4import ConfigParser
5
6from pyspark import SparkContext
7from pyspark.streaming import StreamingContext
8from pyspark.streaming.kafka010 import KafkaUtils
9from pyspark.streaming.kafka010 import PreferConsistent
10from pyspark.streaming.kafka010 import Subscribe
11
12# 读取配置文件
13def read_config(file_name):
14    cf = ConfigParser.ConfigParser()
15    # read config file
16    cf.read(file_name)
17    # read kafka config
18    section = "kafka"
19    opts = cf.options(section)
20    config = {}
21    for opt in opts:
22        # topics should be a list
23        if opt == "topics":
24            config[opt] = str.split(cf.get(section, opt), ",")
25        else:
26            config[opt] = cf.get(section, opt)
27    return config
28
29if __name__ == "__main__":
30    """
31    if len(sys.argv) != 3:
32        print("Usage: kafka_wordcount.py <bootstrap-server> <topic>", file=sys.stderr)
33        exit(-1)
34    """
35    # 建立SparkContext和StreamingContext,demo处理间隔为20s
36    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
37    ssc = StreamingContext(sc, 20)
38
39    # 读取配置文件test.conf,获取连接百度Kafka参数
40    config_file = "test.conf"
41    kafkaParams = read_config(config_file)
42
43    # 建立kafka输入流
44    topics = kafkaParams["topics"]
45    kvs = KafkaUtils.createDirectStream(ssc, PreferConsistent(), Subscribe(topics, kafkaParams))
46    lines = kvs.map(lambda x: x[1])
47    counts = lines.flatMap(lambda line: line.split(" ")) \
48        .map(lambda word: (word, 1)) \
49        .reduceByKey(lambda a, b: a+b)
50    counts.pprint()
51
52    # 启动StreamingContext
53    ssc.start()
54    ssc.awaitTermination()

下载百度消息服务的证书

下载证书:

image.png

创建连接Kafka配置文件

以“test.conf"为例:

Plain Text
1vi test.conf
2
3# 写入如下内容
4[kafka]
5bootstrap.servers = kafka.bj.baidubce.com:9091
6topics = test_for_demo
7group.id = test
8# 以下为SSL配置,根据client.properties中的内容进行更换
9security.protocol = SSL
10ssl.truststore.password = test_truststore_password
11ssl.truststore.location = client.truststore.jks
12ssl.keystore.location = client.keystore.jks
13ssl.keystore.password = test_keystore_password

参数说明:

Plain Text
1bootstrap.servers: kafka服务地址
2topics: 需要消费的topic,如需消费多个topic,以逗号分割,如topic1,topic2,topic3
3group.id: consumer group的id,请不要随意设置,以免与其他用户冲突(kafka服务未来将支持groupId隔离)

更多配置见:http://kafka.apache.org/0100/documentation.html#newconsumerconfigs

提交Streaming作业

按照如上四步,当前目录(hdfs home目录)有五个文件:test.conf、client.truststore.jks、client.keystore.jks、kafka_wordcount.py、spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar:

使用spark-submit提交streaming作业:

Plain Text
1/usr/bin/spark-submit --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks --jars spark-streaming-kafka-0-10-assembly_2.10-1.6.0.jar kafka_wordcount.py

查看作业输出

在“集群详情页”点开“Hadoop Yarn Web UI”,即可打开yarn console:

image.png

在yarn console可以查看对应application的日志,用以查看程序的输出,以kafka_wordcount为例,输出在stdout中:

注意事项

  1. 如果需要停掉某个作业,可以使用“yarn application -kill applicationId”命令,例如:

    Plain Text
    1yarn application -kill application_1488868742896_0002
  2. 同时跑多个作业,请注意修改test.conf中的group.id配置

数据计算(Scala)

数据计算过程通过BMR的Spark Streaming连接百度消息服务BMS。本文以使用Scala为例,Spark版本2.1,线上Kafka版本0.10。具体步骤如下:

创建BMR Spark集群

参考文档:创建集群

注意:在“集群配置”区,选择“Spark2”内置模板,并将Spark选上。

image.png

下载Spark Kafka Streaming依赖

Plain Text
1# eip可以在BMR Console集群详情页的实例列表获取
2ssh root@eip
3
4# 切换到hdfs用户
5su hdfs
6cd
7
8# 下载依赖
9wget https://bmr-public-bj.bj.bcebos.com/sample/original-kafke-read-streaming-1.0-SNAPSHOT.jar

下载百度消息服务的证书

下载证书:

image.png

创建连接Kafka配置文件

以“test.conf"为例:

Plain Text
1vi test.conf
2
3# 写入如下内容
4[kafka]
5bootstrap.servers = kafka.bj.baidubce.com:9091
6topics = test_for_demo
7group.id = test
8# 以下为SSL配置,根据client.properties中的内容进行更换
9security.protocol = SSL
10ssl.truststore.password = test_truststore_password
11ssl.truststore.location = client.truststore.jks
12ssl.keystore.location = client.keystore.jks
13ssl.keystore.password = test_keystore_password

参数说明:

Plain Text
1bootstrap.servers: kafka服务地址
2topics: 需要消费的topic,如需消费多个topic,以逗号分割,如topic1,topic2,topic3
3group.id: consumer group的id,请不要随意设置,以免与其他用户冲突(kafka服务未来将支持groupId隔离)

更多配置见:http://kafka.apache.org/0100/documentation.html#newconsumerconfigs

提交Streaming作业

按照如上四步,当前目录有四个文件:test.conf、client.truststore.jks、client.keystore.jks、

image.png

使用spark-submit提交streaming作业:

Plain Text
1spark-submit --class com.baidu.inf.spark.WordCount --master yarn --deploy-mode cluster  --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-SNAPSHOT.jar "$topics" "$bootstrap.servers" "$group.id"  "$ssl.truststore.password" "$ssl.keystore.password"

(注意:这里请替换掉命令中最后面5个参数值为实际的值,即文件test.conf中描述的字段。)

例子:

image.png

Plain Text
1spark-submit --class com.baidu.inf.spark.WordCount --master yarn --deploy-mode cluster --files test.conf,client.keystore.jks,client.truststore.jks ./original-kafke-read-streaming-1.0-SNAPSHOT.jar "868313b92dbe474b80ee4ef0904df26d__test" "kafka.bj.baidubce.com:9091" "test" "kafka" "k7ynher0"

附上WordCount 示例代码:

Plain Text
1package com.baidu.inf.spark
2
3import org.apache.kafka.common.serialization.StringDeserializer
4import org.apache.spark.SparkConf
5import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
6import org.apache.spark.streaming.kafka010.KafkaUtils
7import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
8import org.apache.spark.streaming.{Seconds, StreamingContext}
9
10object WordCount {
11  def className = this.getClass.getName.stripSuffix("$")
12
13  def main(args: Array[String]): Unit = {
14if (args.length < 4) {
15  System.err.println(
16    s"Usage:Input Params: "
17      + " <topic> "
18      + " <bootstrap.servers> "
19      + " <group.id> "
20      + " <ssl.truststore.password>"
21      + " <ssl.keystore.password>"
22  )
23  sys.exit(1)
24}
25val Array(topic, bootstrap, group, 
26truststore, keystore, _*) = args
27
28val conf = new SparkConf().setAppName(className).setIfMissing("spark.master", "local[2]")
29conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
30
31val ssc = new StreamingContext(conf, Seconds(5))
32
33val kafkaParams = Map[String, Object](
34  "bootstrap.servers" -> bootstrap,
35  "key.deserializer" -> classOf[StringDeserializer].getName,
36  "value.deserializer" -> classOf[StringDeserializer].getName,
37  "group.id" -> group,
38  "auto.offset.reset" -> "latest",
39  "serializer.class" -> "kafka.serializer.StringEncoder",
40  "ssl.truststore.location" -> "client.truststore.jks",
41  "ssl.keystore.location" -> "client.keystore.jks",
42  "security.protocol" -> "SSL",
43  "ssl.truststore.password" -> truststore,
44  "ssl.keystore.password" -> keystore,
45  "enable.auto.commit" -> (true: java.lang.Boolean)
46)
47ssc.sparkContext.setLogLevel("WARN")
48
49val topics = Array(topic)
50// 消费kafka数据
51val stream = KafkaUtils.createDirectStream[String, String](
52  ssc,
53  PreferConsistent,~~~~
54  Subscribe[String, String](topics, kafkaParams)
55).map(record => record.value())
56
57val counts = stream.flatMap(_.split(" "))
58  .map(word => (word, 1))
59  .reduceByKey(_ + _)
60  
61  counts.print()
62  
63  ssc.start()
64  ssc.awaitTermination()
65  ssc.stop(true, true)
66  }
67 }

查看作业输出

在“集群详情页”点开“Hadoop Yarn Web UI”,即可打开yarn console:

image.png

在yarn console可以查看对应application的日志,用以查看程序的输出,以kafka_wordcount为例,输出在stdout中:

image.png

image.png

image.png

image.png

注意事项

  1. 如果需要停掉某个作业,可以使用“yarn application -kill applicationId”命令,例如:

    Plain Text
    1yarn application -kill application_1488868742896_0002
  2. 同时跑多个作业,请注意修改test.conf中的group.id配置

相关产品

BLS(百度Log Service)、BMS(百度消息服务)以及BMR(MapReduce)、弹性公网IP(EIP)。

上一篇
视频专区
下一篇
离线应用场景