基础使用

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
所有文档
menu
没有找到结果,请重新输入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
  • 文档中心
  • arrow
  • MapReduceBMR
  • arrow
  • 开源组件介绍
  • arrow
  • Flink
  • arrow
  • 基础使用
本页目录
  • 简介
  • 作业提交
  • 查看作业状态
  • 使用示例
  • 第一步 创建Topic并下载百度消息服务的证书
  • 第二步 编写业务代码
  • 第三步 编译代码,打成可执行Jar文件,上传到服务器上
  • 第四步 消息队列中生产一些消息,在Flink作业监控页面上查看对应输出

基础使用

更新时间:2025-08-21

简介

Apache Flink 是一个开源的框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。它能够处理实时数据流和批处理数据,具有高吞吐量、低延迟和容错性强的特点。Flink 能在所有常见的集群环境中运行,并能以内存速度和任意规模进行计算。

常见应用场景:

  1. 事件驱动型应用:
  • 反欺诈:实时检测异常交易行为。
  • 异常检测:监控系统状态,实时发现异常。
  1. 数据分析应用:
  • 流数据分析:实时分析数据流,提取有价值的信息。
  • 实时报表分析:实时生成报表,展示关键指标的变化。
  1. 数据管道与ETL:
  • 实时ETL:Flink 提供丰富的Connector,支持多种数据源和数据Sink,能够实时处理数据管道。
  • 实时数仓:支持分钟级或秒级的数据更新,便于实时查询和分析。

作业提交

  1. 登录百度智能云控制台,选择“产品>MapReduce BMR”,单击“创建集群”,进入集群创建页,可选服务中勾选 Flink 服务。

注意:BMR2.1.1及以上版本支持 Flink。不同BMR版本对应支持的Flink组件版本也不同,具体支持版本以选择BMR版本后可选服务中组件版本为准。

  1. SSH登录集群 ,参考SSH连接到集群。
  2. 执行以下命令,上传文件至HDFS,本示例以 Flink 作业示例 WordCount 为例:
Plain Text
1hdfs dfs -put /etc/hadoop/conf/core-site.xml /tmp
  1. 执行以下命令,提交作业:
Plain Text
1flink run --jobmanager yarn-cluster \
2-yn 1 \
3-ytm 1024 \
4-yjm 1024 \
5/opt/bmr/flink/examples/batch/WordCount.jar \
6--input hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/core-site.xml \
7--output hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out
  1. 结果查看

image (8).png

查看作业状态

  1. 登录百度云控制台,进入 BMR 集群列表,单击集群名称/ID>>集群详情>>相关应用和工具>>Hadoop Yarn Web UI,进入 Web UI。
  2. 单击 Flink 任务的 Application ID。
  3. 进入详情页面后,单击 Tracking URL 的链接。
  4. 进入 Apache Flink Dashboard 页面,即可查看作业的状态。

使用示例

通过BMR的Flink消费百度消息服务BMS。本文以使用Scala为例,Flink版本1.8.2,线上Kafka版本2.1。具体步骤如下:

第一步 创建Topic并下载百度消息服务的证书

(本步骤详情请参考文档 Spark流式应用场景)

下载证书:

image.png

第二步 编写业务代码

Plain Text
1package com.baidu.inf.flink
2 
3import java.util.Properties
4 
5import org.apache.flink.api.common.serialization.SimpleStringSchema
6import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
7import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
8import org.apache.kafka.common.serialization.StringDeserializer
9import org.slf4j.LoggerFactory
10 
11object CloudFlinkConsumeKafkaDemo {
12  private val logger = LoggerFactory.getLogger(this.getClass)
13 
14  def main(args: Array[String]): Unit = {
15    logger.info("************ Flink Consume Kafka Demo start **************")
16    if (args.length < 7) {
17      logger.error(" Parameters Are Missing , " +
18        "Needs : <topic> " +
19        "<groupId> " +
20        "<brokerHosts> " +
21        "<truststore_location> " +
22        "<truststore_pass> " +
23        "<keystore_location> " +
24        "<keystore_pass>")
25      System.exit(-1)
26    }
27    val Array(topic, groupId, brokerHosts,
28    truststore_location, truststore_pass,
29    keystore_location, keystore_pass, _*) = args
30 
31    val env = StreamExecutionEnvironment.getExecutionEnvironment
32    env.setParallelism(2)
33    env.getConfig.disableSysoutLogging
34 
35    val kafkaProperties = new Properties()
36    kafkaProperties.setProperty("bootstrap.servers", brokerHosts)
37    kafkaProperties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
38    kafkaProperties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
39    kafkaProperties.setProperty("group.id", groupId)
40    kafkaProperties.setProperty("auto.offset.reset", "latest")
41    kafkaProperties.setProperty("serializer.class", "kafka.serializer.StringEncoder")
42    kafkaProperties.setProperty("security.protocol", "SSL")
43    kafkaProperties.setProperty("ssl.truststore.location", truststore_location)
44    kafkaProperties.setProperty("ssl.truststore.password", truststore_pass)
45    kafkaProperties.setProperty("ssl.keystore.location", keystore_location)
46    kafkaProperties.setProperty("ssl.keystore.password", keystore_pass)
47    kafkaProperties.setProperty("enable.auto.commit", "true")
48 
49    val ds = env.addSource(
50      new FlinkKafkaConsumer[String](topic,
51        new SimpleStringSchema(),
52        kafkaProperties))
53 
54    ds.print()
55    env.execute()
56  }
57}

第三步 编译代码,打成可执行Jar文件,上传到服务器上

(注:要保证第一步下载的证书文件在集群每个节点上相同的路径下都存在)

运行作业示例:
flink run --jobmanager yarn-cluster -yn 1 -ytm 1024 -yjm 1024 /root/flink-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar "676c4bb9b72c49c7bd3b089c181af9ec__demo02" "group1" "kafka.fsh.baidubce.com:9091" "/tmp/client.truststore.jks" "kafka" "/tmp/client.keystore.jks" "0yw0ckrt"

第四步 消息队列中生产一些消息,在Flink作业监控页面上查看对应输出

通过Tunnel登录到集群的Yarn页面上(通过SSH-Tunnel访问集群)

在yarn console找到对应作业的application的单击application名称,进入作业详情页面:
(在Flink的原生页面上,点击TaskManagers > Stdout,查看作业运行情况)

image.png

上一篇
Paimon
下一篇
Trino