Flink接入Kafka专享版

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
所有文档
menu
没有找到结果,请重新输入

消息服务 for Kafka Kafka

  • 功能发布记录
  • 共享版
    • 产品定价
    • 产品描述
      • 介绍
    • 快速入门
      • 多用户访问控制
      • 监控报警
      • 操作流程
    • 常见问题
      • 常见问题总览
      • 安全类问题
      • 配置类问题
    • API文档
      • 通用说明
      • 简介
      • 公共头
      • 接口说明
      • 模型定义
      • 服务域名
      • 错误返回
  • 专享版
    • 产品描述
      • 使用限制
      • 基本概念
      • 产品规格
      • 产品优势
      • 产品介绍
      • 应用场景
      • 产品架构
    • 开发指南
      • 访问协议介绍
      • 接入点查看
      • 概述
      • Python示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Go示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • Java示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • PHP示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
      • C++示例
        • VPC网络PLAINTEXT方式生产和消费
        • VPC网络SASL_PLAINTEXT方式生产和消费消息
        • SSL协议生产和消费消息
        • 公网SASL_SSL方式生产和消费消息
    • 快速入门
      • 步骤二:创建主题
      • 步骤一:创建Kafka集群实例
      • 步骤三:配置权限认证
      • 概述
      • 步骤四:访问Kafka集群
        • 使用SASL_SSL协议访问集群
        • 使用PLAINTEXT协议访问集群
        • 使用SASL_PLAINTEXT协议访问集群
        • 使用SSL协议访问集群
    • 操作指南
      • 多用户访问控制
      • 消息查询
      • 标签管理
      • 存储分析
      • 集群日志
      • 操作审计
      • 任务管理
        • 任务类型介绍
        • 查看任务详情
      • 主题管理
        • 主题重新分区
        • 删除主题
        • 创建主题
        • 修改主题分区数
        • 查看主题订阅关系
        • 查看主题分区详情
        • 查看主题详情
        • 修改主题配置
      • 用户管理
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 监控报警
        • 报警策略配置
        • 集群监控
        • 事件策略配置
      • 消费组管理
        • 查看消费组订阅信息
        • 消费组重置位点
        • 删除消费组
      • 集群配置管理
        • 配置参数介绍
        • 查看集群配置详情
        • 创建集群配置
        • 删除集群配置
      • 权限管理
        • 创建权限
        • 删除权限
      • 集群管理
        • 删除集群
        • 查看集群信息
        • 变更访问配置
        • 集群变更
        • 磁盘水位处理
        • 查看Controller信息
        • 重启节点
        • 查看集群接入点
        • 集群启停
    • 最佳实践
      • Flink接入Kafka专享版
      • 如何选择合适的集群规格
      • Logstash接入Kafka专享版
      • Filebeat接入Kafka专享版
      • 业务迁移
    • API参考
      • 更新记录
      • 调用说明
      • 附录
      • 服务域名
      • 错误返回
      • 主题管理接口
        • 查询主题列表
        • 查询主题订阅详情
        • 删除主题
        • 查询主题详情
        • 创建主题
        • 获取订阅主题的消费组列表
        • 查询主题分区详情
        • 变更主题
        • 查询主题分区列表
      • 用户管理接口
        • 查询用户列表
        • 删除用户
        • 重置用户密码
        • 创建用户
      • 消费组管理接口
        • 删除消费组
        • 重置消费组位点
        • 查询消费组列表
        • 查询消费组订阅的主题列表
      • 集群管理接口
        • 查询集群参数
        • 创建集群
        • 释放集群
        • 查询集群节点列表
        • 停止集群
        • 查询集群详情
        • 启动集群
        • 查询集群列表
        • 查询集群接入点
      • 集群变更接口
        • 变更访问配置
        • 扩容磁盘容量
        • 增加节点数量
        • 变更用户安全组
        • 变更节点机型
        • 变更集群配置
        • 集群公网开关
        • 变更存储策略
        • 变更公网带宽
      • 权限管理接口
        • 查询权限列表
        • 创建权限
        • 删除权限
      • 任务管理接口
        • 暂停任务
        • 恢复任务
        • 查询操作详情
        • 查询任务列表
        • 取消任务
        • 查询任务详情
        • 启动任务
      • 集群配置管理接口
        • 查询集群配置列表
        • 查询集群配置详情
        • 创建集群配置
        • 查询集群配置版本列表
        • 新增集群配置版本
        • 查询集群配置版本详情
        • 删除集群配置
    • 产品定价
      • 余额不足提醒和欠费处理
      • 计费说明
      • 变配规则说明
      • 续费说明
  • 文档中心
  • arrow
  • 消息服务 for KafkaKafka
  • arrow
  • 专享版
  • arrow
  • 最佳实践
  • arrow
  • Flink接入Kafka专享版
本页目录
  • 接入准备
  • 1. 购买专享版消息服务for Kafka集群
  • 2. 为购买的集群创建主题
  • 接入步骤
  • 步骤一:获取集群接入点
  • 步骤二:添加Maven配置
  • 步骤三:编写测试代码
  • 生产者示例代码
  • 消费者实例
  • 步骤四:编译并运行
  • 步骤五:查看集群监控

Flink接入Kafka专享版

更新时间:2025-08-21

接入准备

1. 购买专享版消息服务for Kafka集群

开通消息服务 for Kafka服务后,在控制台页面点击创建集群,即可进行购买,详情可参考创建集群。

Flink接入1.png

2. 为购买的集群创建主题

在控制台页面点击集群名称,进入集群详情页面。

在左侧的边栏中点击主题管理,进入主题管理页面。

Flink接入2.png

在主题管理页面点击创建主题,进行主题的创建,详情参考创建主题。

接入步骤

步骤一:获取集群接入点

具体请参考:查看集群接入点。

步骤二:添加Maven配置

XML
1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5    <modelVersion>4.0.0</modelVersion>
6
7    <groupId>org.example</groupId>
8    <artifactId>flink_sdk</artifactId>
9    <version>1.0-SNAPSHOT</version>
10
11    <properties>
12        <maven.compiler.source>1.8</maven.compiler.source>
13        <maven.compiler.target>1.8</maven.compiler.target>
14        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15    </properties>
16
17    <dependencies>
18        <dependency>
19            <groupId>org.apache.kafka</groupId>
20            <artifactId>kafka-clients</artifactId>
21            <version>0.10.2.2</version>
22        </dependency>
23        <dependency>
24            <groupId>org.slf4j</groupId>
25            <artifactId>slf4j-simple</artifactId>
26            <version>1.7.25</version>
27            <scope>compile</scope>
28        </dependency>
29        <dependency>
30            <groupId>org.apache.flink</groupId>
31            <artifactId>flink-java</artifactId>
32            <version>1.6.1</version>
33        </dependency>
34        <dependency>
35            <groupId>org.apache.flink</groupId>
36            <artifactId>flink-streaming-java_2.11</artifactId>
37            <version>1.6.1</version>
38        </dependency>
39        <dependency>
40            <groupId>org.apache.flink</groupId>
41            <artifactId>flink-connector-kafka_2.11</artifactId>
42            <version>1.7.0</version>
43        </dependency>
44    </dependencies>
45
46    <build>
47        <plugins>
48            <plugin>
49                <groupId>org.apache.maven.plugins</groupId>
50                <artifactId>maven-shade-plugin</artifactId>
51                <version>3.2.4</version>
52                <executions>
53                    <execution>
54                        <phase>package</phase>
55                        <goals>
56                            <goal>shade</goal>
57                        </goals>
58                        <configuration>
59                            <transformers>
60                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
61                                    <mainClass>org.example.KafkaConsumerDemo</mainClass>
62                                </transformer>
63                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
64                                    <resource>reference.conf</resource>
65                                </transformer>
66                            </transformers>
67                        </configuration>
68                    </execution>
69                </executions>
70            </plugin>
71        </plugins>
72    </build>
73
74</project>

步骤三:编写测试代码

  • 需要关注并自行修改的参数
参数名 含义
access_point 接入点信息
topic 主题名称
value 消息的具体内容
group_id 消费组id

生产者示例代码

创建KafkaProducerDemo.java文件,具体代码示例如下:

Java
1package org.example;
2
3import org.apache.kafka.clients.producer.KafkaProducer;
4import org.apache.kafka.clients.producer.ProducerConfig;
5import org.apache.kafka.clients.producer.ProducerRecord;
6import org.apache.kafka.clients.producer.RecordMetadata;
7import org.apache.kafka.common.serialization.StringSerializer;
8
9import java.util.Properties;
10
11public class KafkaProducerDemo {
12    public static void main(String args[]) throws Exception {
13
14        //接入点设置
15        String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
16        //填写需要发送消息的主题名称
17        String topic = "topic_name";
18        //填写需要发送消息
19        String value = "test flink message";
20        // 创建配置类
21        Properties props = new Properties();;
22        // 指定kafka服务端所在位置
23        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, access_point);
24        // Kafka消息的序列化方式。
25        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
26        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
27        // 请求的最长等待时间。
28        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
29        // 设置客户端内部重试次数。
30        props.put(ProducerConfig.RETRIES_CONFIG, 5);
31        // 设置客户端内部重试间隔。
32        props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 3000);
33        // 构造 Producer 对象
34        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
35
36        try {
37            // 测试发送 100 条消息
38            for (int i = 0; i < 100; i++) {
39                ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<>(topic, value + ": " + i);
40                kafkaProducer.send(kafkaMessage, (RecordMetadata recordMetadata, Exception e) -> {
41                    if (e == null) {
42                        System.out.println("send success:" + recordMetadata.toString());
43                    } else {
44                        e.printStackTrace();
45                        System.err.println("send fail");
46                    }
47                });
48            }
49        } catch (Exception e) {
50            System.out.println("Something error has happened");
51            e.printStackTrace();
52        } finally {
53            kafkaProducer.close();
54        }
55    }
56}

消费者实例

创建KafkaConsumerDemo.java文件,具体代码示例如下

Java
1package org.example;
2
3import org.apache.flink.api.common.serialization.SimpleStringSchema;
4import org.apache.flink.streaming.api.datastream.DataStream;
5import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
6import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
7
8import java.util.Properties;
9
10
11public class KafkaConsumerDemo {
12    public static void main(String args[]) throws Exception {
13
14        //接入点设置
15        String access_point = "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092";
16        String group_id = "flink_group";
17        
18        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
19        // 创建配置类
20        Properties properties = new Properties();
21        // 设置接入点
22        properties.setProperty("bootstrap.servers", access_point);
23        // 设置消费组id
24        properties.setProperty("group.id", group_id);
25        // FlinkKafkaConsumer的第一个参数填创建的topic名称
26        DataStream<String> stream = env
27                .addSource(new FlinkKafkaConsumer<>("topic_name", new SimpleStringSchema(), properties));
28        // 打印输出
29        stream.print();
30        env.execute();
31    }
32}

步骤四:编译并运行

通过maven工具将上述代码打包后,上传至指定的服务器,并执行以下命令:

Shell
1java -jar flink_sdk-1.0-SNAPSHOT.jar

步骤五:查看集群监控

查看消息是否发送成功或消费成功有两种方式:

  1. 在服务器端查看jar包运行日志。
  2. 在专享版消息服务 for Kafka控制台查看集群监控,获取集群生产、消息情况。

推荐使用第二种方式,下面介绍如何查看集群监控。

(1)在专享版消息服务 for Kafka的控制台页面找到需要连接的集群,点击集群名称进入集群详情页面。

监控信息1.png

(2)页面跳转后,进入左侧边中的集群详情页面。

监控信息2.png

(3)点击左侧边栏中的集群监控,进入集群监控页面。

监控信息3.png

(4)通过查看集群监控页面,提供的不同纬度的监控信息(集群监控、节点监控、主题监控、消费组监控),即可获知集群的生产和消费情况。

集群监控的具体使用请参考:集群监控

监控信息4.png

上一篇
操作指南
下一篇
如何选择合适的集群规格