Hadoop-Streaming

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
所有文档
menu
没有找到结果,请重新输入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
  • 文档中心
  • arrow
  • MapReduceBMR
  • arrow
  • 开源组件介绍
  • arrow
  • Hadoop-Streaming
本页目录
  • Hadoop Streaming简介
  • 程序准备
  • Mapper.py程序:
  • Reducer.py程序:
  • 集群准备
  • 运行作业
  • 查看结果
  • 分布式缓存导入数据
  • 应用场景
  • 支持的文件类型
  • 操作步骤

Hadoop-Streaming

更新时间:2025-08-21

Hadoop Streaming简介

本文以分析Web日志统计每日请求量为例,介绍如何在百度智能云平台使用Hadoop Streaming。

在BMR集群中,您可以使用python、shell、C++等任何您熟悉的编程语言开发Hadoop Streaming作业。Hadoop Streaming是Hadoop提供的编程工具,允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer,它将Mapper和Reducer文件封装成MapReduce作业并提交运行,让您无须打包即可快速实现MapReduce功能,满足复杂的业务需求。

程序准备

Mapper.py程序:

Python
1    #!/usr/bin/env python
2    import sys
3    import re
4    separator = "\\s{1}"
5    ipAddr = "(\\S+)"
6    remoteUser = "(.*?)"
7    timeLocal = "\\[(.*?)\\]"
8    request = "(.*?)"
9    status = "(\\d{3})"
10    bodyBytesSent = "(\\S+)"
11    httpReferer = "(.*?)"
12    httpCookie = "(.*?)"
13    httpUserAgent = "(.*?)"
14    requestTime = "(\\S+)"
15    host = "(\\S+)"
16    msec = "(\\S+)"
17    p = ipAddr + separator + "-" + separator + timeLocal\
18        + separator + request + separator + status + separator\
19        + bodyBytesSent + separator + httpReferer + separator + httpCookie\
20        + separator + remoteUser + separator + httpUserAgent + separator\
21        + requestTime + separator + host + separator + msec
22    for line in sys.stdin:
23        line = line.strip()
24        m = re.match(p, line)
25        if m is not None:
26            print '%s\t%s' % (m.group(2).split(":")[0], 1)

Reducer.py程序:

Python
1    #!/usr/bin/env python
2    import sys
3    from operator import itemgetter
4    sum_result = {}
5    for line in sys.stdin:
6        line = line.strip()
7        key, value = line.split()
8        try:
9            value = int(value)
10            sum_result[key] = sum_result.get(key, 0) + value
11        except ValueError:
12            pass
13    sorted_sum_result = sorted(sum_result.items(), key=itemgetter(0))
14    for key, value in sorted_sum_result:
15        print '%s\t%s'% (key, value)

集群准备

  1. 准备数据,请参考数据准备;
  2. 百度智能云环境准备;
  3. 登录控制台,选择“产品服务->MapReduce BMR”,点击“创建集群”,进入集群创建页,并做如下配置:

    • 设置集群名称
    • 设置管理员密码
    • 关闭日志开关
    • 选择集群版本“BMR1.6.0”
    • 选择集群类型“hadoop”
  4. 请保持集群的其他默认配置不变,点击“完成”可在集群列表页查看已创建的集群,当集群状态由“初始化中”变为“空闲中”时,集群创建成功。

运行作业

  1. 在“产品服务>MapReduce>MapReduce-作业列表”页中,点击“创建作业”,进入创建作业页。
  2. 配置Streaming作业参数:

    • 作业类型:选择“Streaming作业”;
    • 作业名称:输入作业名称,长度不可超过255个字符;
    • Mapper:若使用您自己的代码,请上传程序至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,路径如下: 华北-北京区域的BMR集群对应的样例程序路径:bos://bmr-public-bj/sample/streaming-1.0-mapper.py 华南-广州区域的BMR集群对应的样例程序路径:bos://bmr-public-gz/sample/streaming-1.0-mapper.py
    • Reducer:若使用您自己的代码,请上传程序至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,路径如下: 华北-北京区域的BMR集群对应的样例程序路径:bos://bmr-public-bj/sample/streaming-1.0-reducer.py 华南-广州区域的BMR集群对应的样例程序路径:bos://bmr-public-gz/sample/streaming-1.0-reducer.py
    • BOS输入地址:bos://bmr-public-bj/data/log/accesslog-1k.log
    • BOS输出地址:输出路径必须具有写权限且该路径不能已存在,bos://{your-bucket}/output
    • 失败后操作:继续;
    • 应用程序参数:无。
  3. 在“集群适配”区,选择适配的集群。
  4. 点击“完成”,则作业创建成功;运行中的作业状态会由“等待中”更新为“运行中”,当作业运行完毕后,状态会更新为“已完成”,即可查看到结果了。

查看结果

您可以到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:

用户到bos://{your-bucket}/output路径下查看输出,如果使用系统提供的输入数据和程序,可以打开输出结果看到如下内容:

Plain Text
103/Oct/2015    139
204/Oct/2015    375
305/Oct/2015    372
406/Oct/2015    114

分布式缓存导入数据

分布式缓存是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到运行Map或Reduce任务的各个节点上,并缓存到本地,供用户程序读取使用。在Map或Reduce时,如需访问通用数据,利用分布式缓存可显著提高访问效率。下面以Streaming作业为例说明如何使用Hadoop分布式缓存。

应用场景

分布式缓存通常用在当MapReduce需要依赖一些特定的版本库,或者MapReduce需要访问第三方库或文件时,若将这些库或文件安装到集群各个机器上通常比较麻烦,这时可以使用分布式缓存将其分发到集群各个节点上,程序运行完后,Hadoop自动将其删除。

例如下述的场景,运用分布式缓存会很好的解决问题:

  • 分发字典文件:Map或者Reduce需要用到一些外部字典时,可使用分发字典文件,比如黑白名单、词表等。
  • 自动化软件部署:MapReduce需依赖于特定版本的库时,可使用自动化软件部署,比如依赖于某个版本的PHP解释器,可以运用分布式缓存进行上传分发。

支持的文件类型

分布式缓存支持单个文件和打包文件,支持以下压缩格式:

Plain Text
1* zip
2* tgz
3* tar.gz
4* tar
5* jar

操作步骤

  1. 您需要先将文件上传到BOS,可参考BOS上传Object;
  2. 提交作业时通过指定参数,为缓存文件指定一个软链接,在Mapper和Reducer中通过调用软连接即可访问实际的文件内容;

    • -files:通常用来上传单个文件,可将指定文件分发到各个Task的工作目录下,不做其他处理;如需上传多个文件,文件之间用逗号隔开;
    • -archives:通常用来上传打包文件,可将指定文件分发到各个Task的工作目录下,并对后缀名为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中;如需上传多个打包文件,文件之间用逗号隔开。
文件类型 操作 说明
添加单个文件 输入参数:-files+文件在BOS中的位置+井号(#)+文件在缓存中的软链接名称 -files bos://bucket_name/file_name#file_name
添加打包文件 输入参数:-archives+文件在BOS中的位置+井号(#)+文件在缓存中的软链接名称 -archives bos://bucket_name/archive_name#archive_name

例如:

  • 对于某一个文件mydata.txt,将其上传到BOS文件系统,路径为bos://mybucket/mydata.txt,提交MapReduce Streaming作业时在参数中指定 -files bos://mybucket/mydata.txt#data,在MapTask和ReduceTask中就可以通过 ./data 来访问所需的文件了;
  • 对于某一个文件libA.so,将其打包到mylib.tar.gz文件中,并上传到BOS文件系统,路径为bos://mybucket/mylib.tar.gz,提交MapReduce Streaming作业时在参数中指定 -archives bos://mybucket/mylib.tar.gz#libs,在MapTask和ReduceTask中就可以通过./libs/libA.so来访问所需的文件了。

上一篇
Sqoop
下一篇
Zeppelin