从Spark导入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
所有文档
menu
没有找到结果,请重新输入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
  • 文档中心
  • arrow
  • MapReduceBMR
  • arrow
  • 开源组件介绍
  • arrow
  • ClickHouse
  • arrow
  • 数据迁移同步
  • arrow
  • 从Spark导入
本页目录
  • 将Apache Spark与ClickHouse集成
  • Spark连接器
  • 必要条件
  • 兼容性
  • 下载库
  • 作为依赖项导入
  • 使用Spark SQL
  • 启动Spark SQL命令行界面
  • 操作
  • 使用 Spark Shell
  • 启动 Spark Shell
  • Shell基本操作
  • 支持的数据类型
  • 将数据从ClickHouse读取到Spark
  • 将Spark中的数据插入ClickHouse
  • Spark JDBC
  • 读取数据
  • 写入数据

从Spark导入

更新时间:2025-08-21

将Apache Spark与ClickHouse集成

连接Apache Spark和ClickHouse有两种主要方式:

  • Spark连接器-Spark连接器实现了DataSourceV2,并具有自己的目录管理。截至今天,这是集成ClickHouse和Spark的推荐方式。
  • Spark JDBC-使用JDBC数据源集成Spark和ClickHouse。

Spark连接器

此连接器利用ClickHouse特定的优化,如高级分区和谓词下推,来提高查询性能和数据处理。该连接器基于ClickHouse的官方JDBC连接器,并管理自己的目录。

必要条件

  • Java 8 or 17
  • Scala 2.12 or 2.13
  • Apache Spark 3.3 or 3.4 or 3.5

兼容性

版本 兼容的 Spark 版本 ClickHouse JDBC 版本
main Spark 3.3, 3.4, 3.5 0.6.3
0.8.0 Spark 3.3, 3.4, 3.5 0.6.3
0.7.3 Spark 3.3, 3.4 0.4.6
0.6.0 Spark 3.3 0.3.2-patch11
0.5.0 Spark 3.2, 3.3 0.3.2-patch11
0.4.0 Spark 3.2, 3.3 Not depend on
0.3.0 Spark 3.2, 3.3 Not depend on
0.2.1 Spark 3.2 Not depend on
0.1.2 Spark 3.2 Not depend on

下载库

二进制JAR的名称模式是:

Plain Text
1clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

您可以在Maven中央存储库中找到所有可用的已发布JAR,在Sonatype OSS快照存储库中可以找到所有每日构建的SNAPSHOT JAR。

作为依赖项导入

  • Gradle
Plain Text
1dependencies {
2  implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
3  implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
4}

如果要使用SNAPSHOT版本,请添加以下存储库:

Plain Text
1repositries {
2  maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
3}
  • Maven
Plain Text
1<dependency>
2  <groupId>com.clickhouse.spark</groupId>
3  <artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
4  <version>{{ stable_version }}</version>
5</dependency>
6<dependency>
7  <groupId>com.clickhouse</groupId>
8  <artifactId>clickhouse-jdbc</artifactId>
9  <classifier>all</classifier>
10  <version>{{ clickhouse_jdbc_version }}</version>
11  <exclusions>
12    <exclusion>
13      <groupId>*</groupId>
14      <artifactId>*</artifactId>
15    </exclusion>
16  </exclusions>
17</dependency>

如果要使用SNAPSHOT版本,请添加以下存储库。

Plain Text
1<repositories>
2  <repository>
3    <id>sonatype-oss-snapshots</id>
4    <name>Sonatype OSS Snapshots Repository</name>
5    <url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
6  </repository>
7</repositories>

使用Spark SQL

注意:对于仅使用SQL的用例,建议将Apache Kyuubi用于生产环境。

启动Spark SQL命令行界面

Plain Text
1$SPARK_HOME/bin/spark-sql \
2  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
3  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
4  --conf spark.sql.catalog.clickhouse.protocol=http \
5  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
6  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
7  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
8  --conf spark.sql.catalog.clickhouse.database=default \
9  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

以下论点:

Plain Text
1  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

可以替换为以下代码,以避免将JAR复制到Spark客户端节点。

Plain Text
1  --repositories https://{maven-cental-mirror or private-nexus-repo} \
2  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all

操作

基本操作,例如创建数据库、创建表、写表、读表等。

Plain Text
1spark-sql> use clickhouse;
2Time taken: 0.016 seconds
3
4spark-sql> create database if not exists test_db;
5Time taken: 0.022 seconds
6
7spark-sql> show databases;
8default
9system
10test_db
11Time taken: 0.289 seconds, Fetched 3 row(s)
12
13spark-sql> CREATE TABLE test_db.tbl_sql (
14         >   create_time TIMESTAMP NOT NULL,
15         >   m           INT       NOT NULL COMMENT 'part key',
16         >   id          BIGINT    NOT NULL COMMENT 'sort key',
17         >   value       STRING
18         > ) USING ClickHouse
19         > PARTITIONED BY (m)
20         > TBLPROPERTIES (
21         >   engine = 'MergeTree()',
22         >   order_by = 'id',
23         >   settings.index_granularity = 8192
24         > );
25Time taken: 0.242 seconds
26
27spark-sql> insert into test_db.tbl_sql values
28         > (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
29         > (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
30         > as tabl(create_time, m, id, value);
31Time taken: 0.276 seconds
32
33spark-sql> select * from test_db.tbl_sql;
342021-01-01 10:10:10 1   1   1
352022-02-02 10:10:10 2   2   2
36Time taken: 0.116 seconds, Fetched 2 row(s)
37
38spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
39Time taken: 1.028 seconds
40
41spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
42Time taken: 0.462 seconds
43
44spark-sql> select count(*) from test_db.tbl_sql;
456
46Time taken: 1.421 seconds, Fetched 1 row(s)
47
48spark-sql> select * from test_db.tbl_sql;
492021-01-01 10:10:10 1   1   1
502021-01-01 10:10:10 1   1   1
512021-01-01 10:10:10 1   1   1
522022-02-02 10:10:10 2   2   2
532022-02-02 10:10:10 2   2   2
542022-02-02 10:10:10 2   2   2
55Time taken: 0.123 seconds, Fetched 6 row(s)
56
57spark-sql> delete from test_db.tbl_sql where id = 1;
58Time taken: 0.129 seconds
59
60spark-sql> select * from test_db.tbl_sql;
612022-02-02 10:10:10 2   2   2
622022-02-02 10:10:10 2   2   2
632022-02-02 10:10:10 2   2   2
64Time taken: 0.101 seconds, Fetched 3 row(s)

使用 Spark Shell

启动 Spark Shell

Plain Text
1$SPARK_HOME/bin/spark-shell \
2  --conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
3  --conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
4  --conf spark.sql.catalog.clickhouse.protocol=http \
5  --conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
6  --conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
7  --conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
8  --conf spark.sql.catalog.clickhouse.database=default \
9  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

以下论点:

Plain Text
1  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

可以替换为以下代码,以避免将JAR复制到Spark客户端节点。

Plain Text
1  --repositories https://{maven-cental-mirror or private-nexus-repo} \
2  --packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all

Shell基本操作

基本操作,例如创建数据库、创建表、写表、读表等。

Plain Text
1scala> spark.sql("use clickhouse")
2res0: org.apache.spark.sql.DataFrame = []
3
4scala> spark.sql("create database test_db")
5res1: org.apache.spark.sql.DataFrame = []
6
7scala> spark.sql("show databases").show
8+---------+
9|namespace|
10+---------+
11|  default|
12|   system|
13|  test_db|
14+---------+
15
16scala> spark.sql("""
17     | CREATE TABLE test_db.tbl (
18     |   create_time TIMESTAMP NOT NULL,
19     |   m           INT       NOT NULL COMMENT 'part key',
20     |   id          BIGINT    NOT NULL COMMENT 'sort key',
21     |   value       STRING
22     | ) USING ClickHouse
23     | PARTITIONED BY (m)
24     | TBLPROPERTIES (
25     |   engine = 'MergeTree()',
26     |   order_by = 'id',
27     |   settings.index_granularity = 8192
28     | )
29     | """)
30res2: org.apache.spark.sql.DataFrame = []
31
32scala> :paste
33// Entering paste mode (ctrl-D to finish)
34
35spark.createDataFrame(Seq(
36    ("2021-01-01 10:10:10", 1L, "1"),
37    ("2022-02-02 10:10:10", 2L, "2")
38)).toDF("create_time", "id", "value")
39    .withColumn("create_time", to_timestamp($"create_time"))
40    .withColumn("m", month($"create_time"))
41    .select($"create_time", $"m", $"id", $"value")
42    .writeTo("test_db.tbl")
43    .append
44
45// Exiting paste mode, now interpreting.
46
47scala> spark.table("test_db.tbl").show
48+-------------------+---+---+-----+
49|        create_time|  m| id|value|
50+-------------------+---+---+-----+
51|2021-01-01 10:10:10|  1|  1|    1|
52|2022-02-02 10:10:10|  2|  2|    2|
53+-------------------+---+---+-----+
54
55scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
56res3: org.apache.spark.sql.DataFrame = []
57
58scala> spark.table("test_db.tbl").show
59+-------------------+---+---+-----+
60|        create_time|  m| id|value|
61+-------------------+---+---+-----+
62|2022-02-02 10:10:10|  2|  2|    2|
63+-------------------+---+---+-----+

执行ClickHouse原生SQL。

Plain Text
1scala> val options = Map(
2     |     "host" -> "clickhouse",
3     |     "protocol" -> "http",
4     |     "http_port" -> "8123",
5     |     "user" -> "default",
6     |     "password" -> ""
7     | )
8
9scala> val sql = """
10     | |CREATE TABLE test_db.person (
11     | |  id    Int64,
12     | |  name  String,
13     | |  age Nullable(Int32)
14     | |)
15     | |ENGINE = MergeTree()
16     | |ORDER BY id
17     | """.stripMargin
18
19scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options) 
20
21scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
22+---------+---------+-----------+
23|namespace|tableName|isTemporary|
24+---------+---------+-----------+
25|  test_db|   person|      false|
26+---------+---------+-----------+
27
28scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
29root
30 |-- id: long (nullable = false)
31 |-- name: string (nullable = false)
32 |-- age: integer (nullable = true)

支持的数据类型

本节概述了Spark和ClickHouse之间的数据类型映射。下表提供了从ClickHouse读取数据到Spark以及将Spark数据插入ClickHouse时转换数据类型的快速参考。

将数据从ClickHouse读取到Spark

ClickHouse 数据类型 Spark 数据类型 是否支持 是原始的 备注
Nothing NullType ✅ 是
Bool BooleanType ✅ 是
UInt8, Int16 ShortType ✅ 是
Int8 ByteType ✅ 是
UInt16,Int32 IntegerType ✅ 是
UInt32,Int64, UInt64 LongType ✅ 是
Int128,UInt128, Int256, UInt256 DecimalType(38, 0) ✅ 是
Float32 FloatType ✅ 是
Float64 DoubleType ✅ 是
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 StringType ✅ 是
FixedString BinaryType, StringType ✅ 是 由配置控制READ_FIXED_STRING_AS
Decimal DecimalType ✅ 是 精度和规模高达Decimal128
Decimal32 DecimalType(9, scale) ✅ 是
Decimal64 DecimalType(18, scale) ✅ 是
Decimal128 DecimalType(38, scale) ✅ 是
Date, Date32 DateType ✅ 是
DateTime, DateTime32, DateTime64 TimestampType ✅ 是
Array ArrayType ✅ 是 数组元素类型也会被转换
Map MapType ✅ 是 钥匙仅限于StringType
IntervalYear YearMonthIntervalType(Year) ✅ 是
IntervalMonth YearMonthIntervalType(Month) ✅ 是
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond DayTimeIntervalType ✅ 否 使用特定间隔类型
Object ❌
Nested ❌
Tuple ❌
Point ❌
Polygon ❌
MultiPolygon ❌
Ring ❌
IntervalQuarter ❌
IntervalWeek ❌
Decimal256 ❌
AggregateFunction ❌
SimpleAggregateFunction ❌

将Spark中的数据插入ClickHouse

Spark 数据类型 ClickHouse 数据类型 是否支持 Is Primitive 备注
BooleanType UInt8 ✅ Yes
ByteType Int8 ✅ Yes
ShortType Int16 ✅ Yes
IntegerType Int32 ✅ Yes
LongType Int64 ✅ Yes
FloatType Float32 ✅ Yes
DoubleType Float64 ✅ Yes
StringType String ✅ Yes
VarcharType String ✅ Yes
CharType String ✅ Yes
DecimalType Decimal(p, s) ✅ Yes Precision and scale up to Decimal128
DateType Date ✅ Yes
TimestampTypeArrayType (list, tuple, or array) DateTime ✅ Yes
ArrayType (list, tuple, or array) Array ✅ No Array element type is also converted
MapType Map ✅ No Keys are limited toStringType
Object ❌
Nested ❌

Spark JDBC

读取数据

Plain Text
1public static void main(String[] args) {
2        // Initialize Spark session
3        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
4
5        // JDBC connection details
6        String jdbcUrl = "jdbc:ch://localhost:8123/default";
7        Properties jdbcProperties = new Properties();
8        jdbcProperties.put("user", "default");
9        jdbcProperties.put("password", "123456");
10
11        // Load the table from ClickHouse
12        Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);
13
14        // Show the DataFrame
15        df.show();
16
17        // Stop the Spark session
18        spark.stop();
19    }

写入数据

Plain Text
1    public static void main(String[] args) {
2        // Initialize Spark session
3        SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();
4
5        // JDBC connection details
6        String jdbcUrl = "jdbc:ch://localhost:8123/default";
7        Properties jdbcProperties = new Properties();
8        jdbcProperties.put("user", "default");
9        jdbcProperties.put("password", "******");
10        // Create a sample DataFrame
11        StructType schema = new StructType(new StructField[]{
12                DataTypes.createStructField("id", DataTypes.IntegerType, false),
13                DataTypes.createStructField("name", DataTypes.StringType, false)
14        });
15        
16        List<Row> rows = new ArrayList<Row>();
17        rows.add(RowFactory.create(1, "John"));
18        rows.add(RowFactory.create(2, "Doe"));
19
20        Dataset<Row> df = spark.createDataFrame(rows, schema);
21
22        df.write()
23                .mode(SaveMode.Append)
24                .jdbc(jdbcUrl, "my_table", jdbcProperties);
25        // Show the DataFrame
26        df.show();
27
28        // Stop the Spark session
29        spark.stop();
30    }

上一篇
常见问题
下一篇
从Kafka同步数据