基础使用

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
所有文档
menu
没有找到结果,请重新输入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
  • 文档中心
  • arrow
  • MapReduceBMR
  • arrow
  • 开源组件介绍
  • arrow
  • Spark
  • arrow
  • 基础使用
本页目录
  • Spark SQL 基础操作
  • 启动Spark Shell
  • RDD基础操作
  • PySpark基础操作
  • SparkSQL UDF 基础操作
  • 前提条件
  • 使用Hive UDF

基础使用

更新时间:2025-08-21

Spark SQL 基础操作

Spark SQL允许用户直接运用SQL语句对数据进行操作,在此过程中,Spark会负责对SQL语句进行解析、优化以及执行。

以下示例展示了如何使用Spark SQL进行读取文件。示例如下:

  • 示例1:Spark支持多种数据格式,本示例读取了JSON格式文件的数据,并输出为Parquet格式。
Plain Text
1val peopleDF = spark.read.json("examples/student.json")
2peopleDF.write.parquet("student.parquet")
  • 示例2:借助 SQL 从 parquetFile 表中提取出年龄处于 13 岁至 19 岁区间的年轻人的名字,接着将这些数据转换为 DataFrame。之后,利用 Map 操作把名字处理成可读的形式,最后输出处理后的结果。
Plain Text
1val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
2namesDF.map(attributes => "Name: " + attributes(0)).show()

启动Spark Shell

Spark的Shell是一款功能强大的交互式数据分析工具,它为学习API提供了一种简便途径。在Spark中,你既能够运用Scala语言,也能够使用Python语言。

操作步骤:

  1. 通过SSH方式连接集群,详情请参见登录集群。
  2. 执行以下命令,启动Spark Shell。
Plain Text
1spark-shell

RDD基础操作

Spark是以弹性分布式数据集(RDD)这一概念为核心构建的,RDD是能够进行并行操作且具备容错能力的元素集合。在Spark里,创建RDD有两种途径,一是通过集合来创建,二是借助外部数据集进行构建。像共享文件系统、HDFS、HBase或者任何提供Hadoop InputFormat的数据集,都可用于构建RDD。

  1. 创建RDD示例:
  • 通过集合来创建RDD
Plain Text
1val data = Array(1, 2, 3, 4, 5)
2val distData = sc.parallelize(data)
  • 通过外部数据集构建RDD
Plain Text
1val distFile = sc.textFile("data.txt")

当 RDD 成功构建完成后,可以针对它开展一系列操作,诸如 Map 和 Reduce 等操作。

比如,执行以下代码时,会先从外部存储系统读取一个文本文件,利用该文件构建出一个RDD。接着,借助RDD的Map算子对其进行运算,从而得到文本文件中每一行的长度。最后,再通过Reduce算子进行计算,得出文本文件中各行长度的总和。

Plain Text
1val lines = sc.textFile("data.txt")
2val lineLengths = lines.map(s => s.length)
3val totalLength = lineLengths.reduce((a, b) => a + b)

一般来说,Spark RDD有两种常见操作,分别是Transform操作和Action操作。Transform操作不会马上执行,而是要等到执行Action操作时才会真正执行。

  • Transform操作
操作 描述
map() 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD。
flatMap() 参数是函数,函数应用于RDD每一个元素,拆分元素数据,变成迭代器,返回值是新的RDD。
filter() 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD。
distinct() 没有参数,将RDD里的元素进行去重操作。
union() 参数是RDD,生成包含两个RDD所有元素的新RDD。
intersection() 参数是RDD,求出两个RDD的共同元素。
subtract() 参数是RDD,去掉原RDD里和参数RDD里相同的元素。
cartesian() 参数是RDD,求两个RDD的笛卡尔积。
  • Action操作
操作 描述
collect() 返回RDD所有元素。
count() 返回RDD中的元素个数。
countByValue() 返回各元素在RDD中出现的次数。
reduce() 并行整合所有RDD数据,例如求和操作。
fold(0)(func) 和reduce()功能一样,但是fold带有初始值。
aggregate(0)(seqOp,combop) 和reduce()功能一样,但是返回的RDD数据类型和原RDD不一样。
foreach(func) 对RDD每个元素都是使用特定函数。

PySpark基础操作

  1. SSH登录集群,参考SSH连接到集群
  2. 执行以下命令,进入PySpark交互式环境:
Plain Text
1pyspark
  1. 初始化SparkSession
Plain Text
1from pyspark.sql import SparkSession
2spark = SparkSession.builder.getOrCreate()
  1. 创建DataFrame
Plain Text
1from datetime import datetime, date
2import pandas as pd
3from pyspark.sql import Row
4
5df = spark.createDataFrame([
6    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
7    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
8    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
9],schema='a long, b double, c string, d date, e timestamp')

DataFrame创建完成后,可以通过各种类型的transform算子完成数据计算。

SparkSQL UDF 基础操作

Spark SQL具备众多内建函数,能够满足您的计算需求。同时,您还可以通过创建自定义函数(UDF)来满足各种不同的计算需求。UDF在使用方式上和普通的内建函数相近。本文将为您介绍在Spark SQL中使用Hive自定义函数的具体流程。

前提条件

已在Hive中创建了UDF,详情请参见开发UDF。

使用Hive UDF

  1. 使用文件传输工具,上传生成的JAR包至集群任意目录(本文以test目录为例)。
  2. 上传JAR包至HDFS或BOS(本文以HDFS为例)。

a.通过SSH方式登录集群,详情请参见登录集群。

b.执行以下命令,上传JAR包到HDFS:

Plain Text
1hadoop fs -put /test/hiveudf-1.0-SNAPSHOT.jar /user/hive/warehouse/

您可以通过hadoop fs -ls /user/hive/warehouse/命令,查看是否上传成功。待返回信息如下所示表示上传成功。

Plain Text
1Found 1 items
2-rw-r--r--   1 xx xx 2668 2021-06-09 14:13 /user/hive/warehouse/hiveudf-1.0-SNAPSHOT.jar
  1. 执行以下命令,使用UDF函数。

该函数与内置函数使用方式一样,直接使用函数名称即可访问。

Plain Text
1select myfunc("abc");

返回如下信息。

Plain Text
1OK
2abc:HelloWorld

上一篇
引擎增强
下一篇
Hive