从Flink导入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
所有文档
menu
没有找到结果,请重新输入

MapReduce BMR

  • 发行版本
  • 功能发布记录
  • 产品描述
    • 节点类型说明
    • 产品优势
    • 应用场景
    • 产品功能
    • 产品简介
  • Python-SDK
    • Cluster(集群)
    • BmrClient
    • 异常处理
    • InstanceGroup(实例组)
    • 简介
    • 文档更新记录
    • Step(作业)
    • 版本变更记录
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
  • 开源组件介绍
    • Impala
    • Pig
    • Druid
    • Presto
    • Hue
    • Ooize
    • HBase
    • Kudu
    • Sqoop
    • Hadoop-Streaming
    • Zeppelin
    • Alluxio
    • Kerberos
      • 集群互信配置
      • 概述
    • ClickHouse
      • 常见问题
      • 数据迁移同步
        • 从Spark导入
        • 从Kafka同步数据
        • 将自建ClickHouse数据迁移到云ClickHouse中
        • 从Flink导入
        • 从MySQL导入和同步
        • 从本地数据导入
          • Parquet格式
          • JSON
          • SQL转储
          • CSV and TSV
      • 快速入门
        • 访问模式
        • 客户端登录
        • 创建ClickHouse集群
        • 基础操作
      • 运维相关操作
        • ClickHouse集群扩容
        • ClickHouse集群缩容
        • 日志配置说明
        • 监控告警配置
    • Ranger
      • ranger概述
      • 权限策略配置
    • Paimon
      • Hive示例
      • StarRocks示例
      • 联合查询示例
      • Flink示例
      • Spark示例
    • Flink
      • 基础使用
    • Trino
      • 基础使用
      • 概述
    • Spark
      • 引擎增强
      • 基础使用
    • Hive
      • 开发指南
        • 自定义函数(UDF)
      • 实践操作
        • Hive迁移
        • Hive操作HBase外表
      • 基础使用
        • Hive基础操作
        • Hive连接方式
  • Java-SDK
    • Cluster(集群)
    • 异常
    • BmrClient
    • InstanceGroup(实例组)
    • 日志
    • 文档更新记录
    • 版本更新记录
    • Step(作业)
    • Instance(实例)
    • 快速入门
    • 安装SDK工具包
    • 概述
  • 快速入门
    • 操作流程概览
    • 环境准备
    • 创建集群
    • 数据准备
    • 开发作业
    • 查看结果
    • ClickHouse
      • 导入数据
      • 创建数据库
      • 连接集群
      • 创建表
  • 操作指南
    • 集群模板
    • 服务管理
    • 集群配置
      • 用户管理
      • 弹性伸缩
      • 创建集群
      • 集群安全模式
      • EIP
      • Hive元数据说明
      • 集群审计
      • 配置已有集群
      • 安全组
    • 管理作业
      • 创建作业
      • 诊断、调优
      • 定时任务
      • 查看作业
    • 访问集群
      • 访问集群服务页面
      • 访问集群-openVPN访问集群
      • 使用OpenVPN提交Hadoop作业
      • SSH连接到集群
    • 实践操作
      • 存储数据至HBase
      • 导入数据
      • 编译Maven项目
      • Sqoop导入导出数据
        • 导出数据
    • 权限管理
      • 多用户访问控制
      • 用户管理
    • 集群管理
      • 节点管理
      • 监控报警
      • 集群指标
      • 资源管理
  • 服务等级协议SLA
    • BMR服务等级协议SLA
  • API参考
    • 通用说明
    • 公共头
    • 数据类型
    • 版本更新记录
    • 服务域名
    • 实例操作接口
    • 实例组操作接口
    • 集群操作接口
    • API简介
    • 错误码
  • 常见问题
    • 安全性问题
    • 计费类问题
    • 常见问题总览
    • 性能类问题
    • 配置类问题
    • 故障类问题
  • 视频专区
    • 操作指南
    • 产品介绍
  • 场景教程
    • 流式应用场景
    • 离线应用场景
    • 使用Hive分析网站日志
    • Sqoop应用文档
    • 定时分析日志数据
    • HIVE
      • 不同集群的 Hive 迁移方案
      • Hive 操作 Hbase 外部表
  • 产品定价
    • 转换计费方式
    • 计费项
    • 到期或欠费说明
    • 包年包月计费
    • 续费说明
    • 变更配置计费说明
    • 计费方式
    • 按需计费
    • 账单和用量查询
    • 退款说明
  • 文档中心
  • arrow
  • MapReduceBMR
  • arrow
  • 开源组件介绍
  • arrow
  • ClickHouse
  • arrow
  • 数据迁移同步
  • arrow
  • 从Flink导入
本页目录
  • 数据类型映射
  • Maven依赖关系
  • 使用步骤

从Flink导入

更新时间:2025-08-21

使用Flink ClickHouse 连接器进行导入。

表一 连接器选择

选项 默认 类型 描述
网址 必填 none String 格式的 ClickHouse jdbc url clickhouse://<host>:<port>
用户名 选填 none String 如果指定了“用户名”和“密码”,则必须同时指定。
密码 选填 none String ClickHouse 密码。
数据库名称 选填 default String ClickHouse 数据库名称。
表名 必填 none String ClickHouse 表名。
使用本地 选填 false Boolean 在分布式表引擎的情况下直接读取/写入本地表。
sink.flush 间隔 选填 1000 Integer 最大刷新大小,超过此大小将会刷新数据。
sink.flush 间隔 选填 1s Duration 在此刷新间隔时间内,异步线程将刷新数据。
sink.max-重试次数 选填 3 Integer 将记录写入数据库失败时的最大重试次数。
sink.update 策略 选填 update String 将 UPDATE_AFTER 类型的记录转换为更新/插入语句或者直接丢弃它,可用:更新、插入、丢弃。
sink.partition-策略 选填 balanced String 分区策略:平衡(循环)、哈希(分区键)、随机(随机)。
接收器.分区键 选填 none String 用于哈希策略的分区键。
接收器分片使用表定义 选填 false Boolean 分片策略与分布式表定义一致,若设置为true,则会覆盖sink.partition-strategy和sink.partition-key 的配置。
sink.ignore-删除 选填 true Integer 是否忽略删除语句。
接收器并行性 选填 none String 为接收器定义自定义并行性。
扫描.分区.列 选填 none Integer 用于对输入进行分区的列名。
扫描分区号 选填 none Long 分区的数量。
扫描分区下限 选填 none Long 第一个分区的最小值。
是否忽略主键 选填 true Boolean 使用 ClickHouseCatalog 创建表时是否忽略主键。
特性 选填 none String 这可以设置并传递clickhouse-jdbc配置。
查找缓存 选填 none String 该查询表的缓存策略,包括NONE和PARTIAL(暂不支持FULL)
查找部分缓存访问后过期 选填 none Duration 访问后缓存中的条目过期的持续时间,超过此时间,最旧的行将会过期。
查找部分缓存写入后过期 选填 none Duration 写入后缓存中的条目过期的持续时间,超过此时间,最旧的行将会过期。
查找缓存最大行数 选填 none Long 查找缓存的最大行数,超过此值,最旧的行将会过期。
查找部分缓存缺少键 选填 true Boolean 标记缓存丢失的密钥,默认为 true
查找最大重试数 选填 3 Integer 查找数据库失败时的最大重试次数。

更新/删除数据注意事项:

  1. 分布式表不支持更新/删除语句,如果要使用更新/删除报表,请确保将记录写入本地表或将use local设置为true。
  2. 数据由主键更新和删除,在分区表中使用时请注意这一点。

数据类型映射

Flink 类型 ClickHouse 类型
CHAR String
VARCHAR String / IP / UUID
STRING String / Enum
BOOLEAN UInt8
BYTES FixedString
DECIMAL Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
TINYINT Int8
SMALLINT Int16 / UInt8
INTEGER Int32 / UInt16 / Interval
BIGINT Int64 / UInt32
FLOAT Float32
DOUBLE Float64
DATE Date
TIME DateTime
TIMESTAMP DateTime
TIMESTAMP_LTZ DateTime
INTERVAL_YEAR_MONTH Int32
INTERVAL_DAY_TIME Int64
ARRAY Array
MAP Map
ROW Not supported
MULTISET Not supported
RAW Not supported

Maven依赖关系

该项目未发布到maven中央存储库,在使用之前,需要部署/安装到自己的存储库,步骤如下:

Plain Text
1 # clone the project
2git clone https://github.com/itinycheng/flink-connector-clickhouse.git
3
4 # enter the project directory
5cd flink-connector-clickhouse/
6
7 # display remote branches
8git branch -r
9
10 # checkout the branch you need
11git checkout $branch_name
12
13 # install or deploy the project to our own repository
14mvn clean install -DskipTests
15mvn clean deploy -DskipTests
Plain Text
1<dependency>
2    <groupId>org.apache.flink</groupId>
3    <artifactId>flink-connector-clickhouse</artifactId>
4    <version>1.16.0-SNAPSHOT</version>
5</dependency>

使用步骤

  1. 创建和读/写表:
Plain Text
1-- register a clickhouse table `t_user` in flink sql.
2CREATE TABLE t_user (
3    `user_id` BIGINT,
4    `user_type` INTEGER,
5    `language` STRING,
6    `country` STRING,
7    `gender` STRING,
8    `score` DOUBLE,
9    `list` ARRAY<STRING>,
10    `map` Map<STRING, BIGINT>,
11    PRIMARY KEY (`user_id`) NOT ENFORCED
12) WITH (
13    'connector' = 'clickhouse',
14    'url' = 'clickhouse://{ip}:{port}',
15    'database-name' = 'tutorial',
16    'table-name' = 'users',
17    'sink.batch-size' = '500',
18    'sink.flush-interval' = '1000',
19    'sink.max-retries' = '3'
20);
21
22-- read data from clickhouse 
23SELECT user_id, user_type from t_user;
24
25-- write data into the clickhouse table from the table `T`
26INSERT INTO t_user
27SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
  1. 创建和使用ClickHouseCatalog:
  • Scala
Plain Text
1val tEnv = TableEnvironment.create(setting)
2
3val props = new util.HashMap[String, String]()
4props.put(ClickHouseConfig.DATABASE_NAME, "default")
5props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
6props.put(ClickHouseConfig.USERNAME, "username")
7props.put(ClickHouseConfig.PASSWORD, "password")
8props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
9val cHcatalog = new ClickHouseCatalog("clickhouse", props)
10tEnv.registerCatalog("clickhouse", cHcatalog)
11tEnv.useCatalog("clickhouse")
12
13tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
  • Java
Plain Text
1TableEnvironment tEnv = TableEnvironment.create(setting);
2
3Map<String, String> props = new HashMap<>();
4props.put(ClickHouseConfig.DATABASE_NAME, "default")
5props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
6props.put(ClickHouseConfig.USERNAME, "username")
7props.put(ClickHouseConfig.PASSWORD, "password")
8props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
9Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
10tEnv.registerCatalog("clickhouse", cHcatalog);
11tEnv.useCatalog("clickhouse");
12
13tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
  • SQL
Plain Text
1> CREATE CATALOG clickhouse WITH (
2    'type' = 'clickhouse',
3    'url' = 'clickhouse://127.0.0.1:8123',
4    'username' = 'username',
5    'password' = 'password',
6    'database-name' = 'default',
7    'use-local' = 'false',
8    ...
9);
10
11> USE CATALOG clickhouse;
12> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
13> INSERT INTO `default`.`t_user` SELECT ...;

上一篇
将自建ClickHouse数据迁移到云ClickHouse中
下一篇
从MySQL导入和同步