Flink自定义JAR作业

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
所有文档
menu
没有找到结果,请重新输入

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
  • 文档中心
  • arrow
  • 百度流式计算BSC
  • arrow
  • 自定义JAR作业
  • arrow
  • Flink自定义JAR作业
本页目录
  • 1. 开发作业
  • 开发环境
  • 项目结构
  • pom文件
  • demo代码
  • 项目打包
  • 2. 新增资源
  • 3. 新增作业
  • 编辑作业参数
  • 引用jar资源
  • 保存发布作业
  • 4. 运行作业
  • 5. 更新作业

Flink自定义JAR作业

更新时间:2025-08-21

背景

BSC 产品支持用户提交FLINK自定义jar作业,以读KAFKA写BOS为例,其具体步骤如下:

步骤

1. 开发作业

开发环境

使用IDEA进行开发,项目管理使用maven,相关版本为

名称 版本
java 1.8
scala 2.11
flink 1.11.2

项目结构

整体项目结构如下图所示。由于示例中使用了KAFKA,因此将对应kafka配置文件放在resource下一起打包到jar中。

image.png

pom文件

pom文件如下所示,相关的注意事项见文件注释

XML
1<?xml version="1.0" encoding="UTF-8"?>
2<project xmlns="http://maven.apache.org/POM/4.0.0"
3         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5    <modelVersion>4.0.0</modelVersion>
6
7    <groupId>org.example</groupId>
8    <artifactId>flink-demo</artifactId>
9    <version>1.0-SNAPSHOT</version>
10
11    <!--  指定相关依赖的版本号  -->
12    <properties>
13        <maven.compiler.source>8</maven.compiler.source>
14        <maven.compiler.target>8</maven.compiler.target>
15        <flink.version>1.11.2</flink.version>
16        <scala.binary.version>2.11</scala.binary.version>
17        <scope.setting>provided</scope.setting>
18    </properties>
19
20    <dependencies>
21        <!--  1、 bsc运行环境中包含 flink 核心依赖,所以下面涉及到的 flink 核心依赖无需打到项目jar中,
22              在打包的时候需要指定scope为provided
23        -->
24        <!--  flink 核心依赖:flink-table-api-java  -->
25        <dependency>
26            <groupId>org.apache.flink</groupId>
27            <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
28            <version>${flink.version}</version>
29            <scope>${scope.setting}</scope>
30        </dependency>
31
32
33        <!--  2、 bsc运行环境中包含 flink 常用依赖,所以下面涉及到的 flink 其他依赖无需打到项目jar中,
34              在打包的时候需要指定scope为provided
35        -->
36        <dependency>
37            <groupId>org.apache.flink</groupId>
38            <artifactId>flink-json</artifactId>
39            <version>${flink.version}</version>
40            <scope>${scope.setting}</scope>
41        </dependency>
42
43
44        <!--  3、 bsc运行环境中不包含 flink connector依赖,所以下面涉及到的 flink connector 依赖需要打到项目jar中。
45                 注意:
46                   - bos相关的依赖无须额外引用,
47                   - kafka必须使用0.10版本的connector和client
48        -->
49        <dependency>
50            <groupId>org.apache.flink</groupId>
51            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
52            <version>${flink.version}</version>~~~~
53        </dependency>
54
55        <!--  4、 下面可以引用 flink 之外的一些依赖  -->
56        <dependency>
57            <groupId>org.projectlombok</groupId>
58            <artifactId>lombok</artifactId>
59            <version>1.18.24</version>
60        </dependency>
61    </dependencies>
62
63    <build>
64        <plugins>
65            <plugin>
66                <!--  scala编译插件  -->
67                <groupId>org.scala-tools</groupId>
68                <artifactId>maven-scala-plugin</artifactId>
69                <version>2.15.2</version>
70                <executions>
71                    <execution>
72                        <id>scala-compile-first</id>
73                        <goals>
74                            <goal>compile</goal>
75                        </goals>
76                        <configuration>
77                            <includes>
78                                <include>**/*.scala</include>
79                            </includes>
80                        </configuration>
81                    </execution>
82                    <execution>
83                        <id>scala-test-compile</id>
84                        <goals>
85                            <goal>testCompile</goal>
86                        </goals>
87                    </execution>
88                </executions>
89            </plugin>
90
91            <!--  按照上述的逻辑,必须打fat-jar才能把所有依赖提交到bsc中,因此需要使用此打包插件 -->
92            <plugin>
93                <artifactId>maven-assembly-plugin</artifactId>
94                <version>3.0.0</version>
95                <configuration>
96                    <archive>
97                        <manifest>
98                            <mainClass></mainClass>
99                        </manifest>
100                    </archive>
101                    <descriptorRefs>
102                        <descriptorRef>jar-with-dependencies</descriptorRef>
103                    </descriptorRefs>
104                </configuration>
105                <executions>
106                    <execution>
107                        <id>make-assembly</id>
108                        <phase>package</phase>
109                        <goals>
110                            <goal>single</goal>
111                        </goals>
112                    </execution>
113                </executions>
114            </plugin>
115        </plugins>
116    </build>
117
118</project>

demo代码

读kafka写bos的代码如下:

Java
1package com.baidu.bce.bsc.demo.flink;
2
3import lombok.Getter;
4import lombok.Setter;
5import org.apache.flink.api.common.functions.MapFunction;
6import org.apache.flink.api.common.serialization.SimpleStringEncoder;
7import org.apache.flink.api.common.typeinfo.TypeInformation;
8import org.apache.flink.api.java.typeutils.RowTypeInfo;
9import org.apache.flink.api.java.typeutils.TypeExtractor;
10import org.apache.flink.configuration.Configuration;
11import org.apache.flink.core.fs.Path;
12import org.apache.flink.formats.json.JsonRowDeserializationSchema;
13import org.apache.flink.runtime.state.filesystem.FsStateBackend;
14import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
15import org.apache.flink.streaming.api.CheckpointingMode;
16import org.apache.flink.streaming.api.datastream.DataStream;
17import org.apache.flink.streaming.api.environment.CheckpointConfig;
18import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
19import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
20import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
21import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
22import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
23import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
24import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
25import org.apache.flink.table.api.DataTypes;
26import org.apache.flink.table.filesystem.stream.StreamingFileCommitter;
27import org.apache.flink.table.filesystem.stream.StreamingFileCommitter.CommitMessage;
28import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
29import org.apache.flink.table.types.DataType;
30import org.apache.flink.table.types.utils.TypeConversions;
31import org.apache.flink.types.Row;
32import org.apache.kafka.clients.consumer.ConsumerConfig;
33import org.slf4j.Logger;
34
35import java.io.BufferedInputStream;
36import java.io.File;
37import java.io.FileOutputStream;
38import java.io.IOException;
39import java.io.OutputStream;
40import java.net.URL;
41import java.net.URLConnection;
42import java.util.Arrays;
43import java.util.Base64;
44import java.util.LinkedHashMap;
45import java.util.List;
46import java.util.Map;
47import java.util.Properties;
48import java.util.concurrent.TimeUnit;
49import java.util.stream.Collectors;
50import java.util.zip.ZipEntry;
51import java.util.zip.ZipInputStream;
52
53/**
54 * 使用Flink DataStream接口完成读kafka,写bos的jar作业示例demo
55 * 示例中:
56 *   - 读kafka使用SSL协议,用户需自行指定kafka的kafka client参数,并利用demo提供的文件下载、解析能力,读取配置文件和bos上的证书zip包
57 *   - 从kafka读取的数据为json格式,需要用户自行指定schema
58 *   - 写bos进行了按照时间分桶的操作,以及生成commit文件
59 */
60public class Kafka2Bos {
61
62    // 启动日志处理
63    public static Logger logger = org.slf4j.LoggerFactory.getLogger(Kafka2Bos.class);
64
65    // 用于解析json
66    public static ObjectMapper objectMapper = new ObjectMapper();
67
68    public static void main(String[] args) throws Exception {
69
70        // 1. 获取参数
71        /**
72         * args(0) 为BSC为用户指定的checkpoint目录,无法更改。
73         * 如果用户想要使用SPARK的checkpoint功能,只能使用提供的args(0)作为checkpoint目录。
74         */
75        String checkpointLocation = args[0];
76        /**
77         * args[1] 为BSC提供的作业运行参数,以base64编码存储,解码后需要解析成Map再使用
78         * 参数格式:
79         *      key1=value1
80         *      key2=value2
81         * 本代码中示例:
82         *     bootStrapServer=kafka.bj.baidubce.com:9092
83         *     topic=test
84         *     bosEndpoint=https://bj.bcebos.com
85         *     ...
86         */
87        Map<String, String> variables = new LinkedHashMap<>();
88        try {
89            String kvStr = new String(Base64.getDecoder().decode(args[1]));
90            Arrays.stream(kvStr.split("\n")).forEach(kv -> {
91                String[] variable = kv.split("=");
92                variables.put(variable[0], variable[1]);
93            });
94        } catch (Exception e){
95            logger.error("decode job variables failed", e);
96            throw new Exception(e);
97        }
98        /** 为配置参数赋值 */
99        String sourceBootstrapServer = variables.get("bootStrapServer");
100        String sourceTopic = variables.get("topic");
101        String sourceGroupId = variables.get("groupId");
102        String certBosUrl = variables.get("certBosUrl");
103        String bosSink = variables.get("bosSink");
104        String bosDatatimePattern = variables.get("bosDatatimePattern");
105
106        // 2. create StreamExecutionEnvironment
107        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
108        /** 配置一些运行参数,如检查点参数 */
109        env.setStateBackend(new FsStateBackend(checkpointLocation));
110        env.enableCheckpointing(1000);
111        env.getCheckpointConfig().setCheckpointInterval(1000);
112        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
113        env.getCheckpointConfig().enableExternalizedCheckpoints(
114                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
115
116        // 3. 创建 kafka streaming Source
117        // 设置 kafka consumer 参数,并读取 client.properties 中的配置
118        Properties sourceProp = new Properties();
119        sourceProp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, sourceBootstrapServer);
120        sourceProp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, sourceGroupId);
121        ClassLoader classLoader = Kafka2Bos.class.getClassLoader();
122        sourceProp.load(classLoader.getResourceAsStream("client.properties"));
123        /**
124         * 提取kafka value中的数据字段, 配置用户自定义schema
125         * 使用RowTypeInfo的形式来配置所有的类型
126         *
127         * 原始value数据格式由用户上游kafka自定义,只需要能在以下逻辑中可以正确提取数据即可。
128         * 以json格式为例
129         * {
130         *    "stringtype": "lRAhSQgShKn77uD",
131         *    "longtype": 1199158871,
132         *    "floattype": 0.038981155578358462,
133         *    "binarytype": "null",
134         *    "integertype": 1,
135         *    "bytetype": -58,
136         *    "booleantype": true,
137         *    "doubletype": 439147658,
138         *    "shorttype": 13538
139         *  }
140         */
141        List<DataType> dataTypes = Arrays.asList(
142                DataTypes.STRING(),
143                DataTypes.BIGINT(),
144                DataTypes.FLOAT(),
145                DataTypes.BYTES(),
146                DataTypes.INT(),
147                DataTypes.TINYINT(),
148                DataTypes.BOOLEAN(),
149                DataTypes.DOUBLE(),
150                DataTypes.SMALLINT());
151        TypeInformation[] types = dataTypes.stream()
152                .map(type -> TypeConversions.fromDataTypeToLegacyInfo(type))
153                .collect(Collectors.toList())
154                .toArray(new TypeInformation[dataTypes.size()]);
155        String[] filedNames = Arrays.asList(
156                    "stringtype",
157                    "longtype",
158                    "floattype",
159                    "binarytype",
160                    "integertype",
161                    "bytetype",
162                    "booleantype",
163                    "doubletype",
164                    "shorttype")
165                .toArray(new String[dataTypes.size()]);
166        RowTypeInfo typeInfo = new RowTypeInfo(types, filedNames);
167        JsonRowDeserializationSchema schema = new JsonRowDeserializationSchema.Builder(typeInfo).build();
168
169        /** 创建 FlinkKafkaConsumer,使用0.10版本 */
170        FlinkKafkaConsumer010<Row> flinkKafkaConsumer = new FlinkKafkaConsumer010<Row>(sourceTopic, schema, sourceProp) {
171            @Override
172            public void open(Configuration configuration) throws Exception {
173                this.properties.putAll(copySslFileAndGetLocalProperties(sourceProp, certBosUrl));
174                super.open(configuration);
175            }
176        };
177        flinkKafkaConsumer.setStartFromEarliest();
178        DataStream<Row> source = env.addSource(flinkKafkaConsumer);
179
180        // 4. 进行一些数据处理pipeline
181        // 此处举例为只提取需要的字段, 并转换json为字符串
182        /**
183         * 输出数据全部字段,但有输出转储的只有如下三个,分别为:
184         * {
185         *    "stringtype": "lRAhSQgShKn77uD-1",
186         *    "longtype": 1199158871,
187         *    "floattype": 0.038981155578358462
188         * }
189         */
190        DataStream<String> operatorDataStream = source.map(new MapFunction<Row, KafkaValue>() {
191            @Override
192            public KafkaValue map(Row row) throws Exception {
193                KafkaValue kafkaValue = new KafkaValue();
194                kafkaValue.setStringtype(row.getField(0).toString() + "-" + row.getField(4).toString());
195                kafkaValue.setLongtype((long)row.getField(1));
196                kafkaValue.setFloattype((float)row.getField(2));
197                return kafkaValue;
198            }
199        }).map(new MapFunction<KafkaValue, String>() {
200            @Override
201            public String map(KafkaValue kafkaValue) throws Exception {
202                return objectMapper.writeValueAsString(kafkaValue);
203            }
204        });
205
206        // 5. 创建 bos streaming Sink
207        // 这里使用了按照时间分桶的策略,并设置了滚动策略,配置了一些参数
208        StreamingFileSink.RowFormatBuilder bosStreamingFileSinkBuilder = StreamingFileSink
209                .forRowFormat(new Path(bosSink), new SimpleStringEncoder<>())
210                .withBucketAssigner(new DateTimeBucketAssigner(bosDatatimePattern))
211                .withRollingPolicy(DefaultRollingPolicy.builder()
212                        .withRolloverInterval(TimeUnit.MINUTES.toMillis(1))
213                        .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
214                        .withMaxPartSize(128L * 1024L * 1024L)
215                        .build())
216                .withBucketCheckInterval(10000);
217        StreamingFileWriter fileWriter = new StreamingFileWriter(
218                bosStreamingFileSinkBuilder.getBucketCheckInterval(), bosStreamingFileSinkBuilder);
219        // 设置了commit算子,用于标志该时间段数据已经写完
220        DataStream<StreamingFileCommitter.CommitMessage> writerStream = operatorDataStream
221                .transform(StreamingFileWriter.class.getSimpleName(),
222                        TypeExtractor.createTypeInfo(CommitMessage.class),
223                        (OneInputStreamOperator) fileWriter)
224                .setParallelism(1);
225
226        writerStream.addSink(new DiscardingSink()).setParallelism(1);
227
228        // 6. 启动执行
229        env.execute("flink-kafka-to-bos-jar-demo");
230    }
231
232
233    /**
234     * kafka value 对应的对象格式
235     */
236    @Getter
237    @Setter
238    public static class KafkaValue {
239        private String stringtype;
240        private Long longtype;
241        private Float floattype;
242        private String binarytype;
243        private Integer integertype;
244        private Byte bytetype;
245        private Boolean booleantype;
246        private Double doubletype;
247        private Short shorttype;
248    }
249
250    /**
251     * 从bos下载ssl证书压缩包,解压后加入到配置中
252     * 注意:需要先在bos上创建好对应的目录,并设置权限为公共读,并且传入的url应该为普通访问链接,而非CDN加速链接
253     * @param properties
254     * @return
255     */
256    private static Properties copySslFileAndGetLocalProperties(Properties properties, String certBosUrl) {
257        String userDir = System.getProperty("user.dir");
258        /** 尝试3次,如果失败则抛出异常 */
259        int i = 0;
260        for (; i < 3; i++) {
261            try {
262                FileUtil.downloadBosFileAndUnzip(userDir, certBosUrl);
263                break;
264            } catch (IOException e) {
265                logger.error("download bos file fail when try: {}", i, e);
266            }
267        }
268        if (i >= 3) {
269            throw new RuntimeException("download bos file fail");
270        }
271        properties.setProperty("ssl.truststore.location", userDir + "/" + properties.getProperty("ssl.truststore.location"));
272        properties.setProperty("ssl.keystore.location", userDir + "/" + properties.getProperty("ssl.keystore.location"));
273        logger.info("ssl.truststore.location: " + properties.getProperty("ssl.truststore.location"));
274        logger.info("ssl.keystore.location: " + properties.getProperty("ssl.keystore.location"));
275        return properties;
276    }
277
278    /**
279     * bos文件下载工具类
280     */
281    public static class FileUtil {
282
283        private static final String PATH_SEPARATOR = "/";
284
285        /**
286         * 通过url下载bos上的xx.tar.gz(设置为公共读权限),
287         * 并解压至指定路径生成一个文件目录
288         * @param outputDir
289         * @param url
290         */
291        public static void downloadBosFileAndUnzip(String outputDir, String url) throws IOException {
292            URLConnection connection = new URL(url).openConnection();
293            ZipInputStream zipIn = null;
294            try{
295                zipIn = new ZipInputStream(
296                        new BufferedInputStream(connection.getInputStream()));
297                ZipEntry entry;
298                while ((entry = zipIn.getNextEntry()) != null ) {
299                    if (entry.isDirectory()) {
300                        createDirectory(outputDir, entry.getName());
301                    } else {
302                        File tmpFile = new File(outputDir + PATH_SEPARATOR + entry.getName());
303                        OutputStream out = null;
304                        try{
305                            out = new FileOutputStream(tmpFile);
306                            int length = 0;
307                            byte[] b = new byte[2048];
308                            while ((length = zipIn.read(b)) != -1) {
309                                out.write(b, 0, length);
310                            }
311                        } catch (IOException ex){
312                            logger.error("write to {} fail", tmpFile,  ex);
313                        } finally {
314                            if (out != null) {
315                                out.close();
316                            }
317                        }
318                    }
319                }
320            } catch (IOException ex){
321                throw new IOException("解压归档文件出现异常",ex);
322            } finally {
323                try {
324                    if (zipIn != null) {
325                        zipIn.close();
326                    }
327                } catch (IOException ex){
328                    throw new IOException("关闭tarFile出现异常",ex);
329                }
330            }
331        }
332
333        /**
334         * 构建证书目录
335         * @param outputDir
336         * @param subDir
337         */
338        public static void createDirectory(String outputDir, String subDir){
339            File file = new File(outputDir + PATH_SEPARATOR + subDir);
340            if (!file.exists()) {
341                file.mkdir();
342            }
343        }
344
345    }
346}

项目打包

执行 mvn clean package 命令之后,能编译出用于bsc运行的jar包 flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar

image.png

2. 新增资源

进入BSC控制台,选择资源管理,点击新增资源按钮。资源类型需要选择为JOB_FILE/JAR,上传方式与jar包大小相关,可以选择bos上传或者本地上传。

image.png

上传完成之后效果如下:

image.png

3. 新增作业

打开BSC控制台,点击新建作业按钮,新建一个FLINK_STREAM/JAR作业如下图示例。

image.png

编辑作业参数

在作业开发的富文本编辑框中,配置SPARK jar的参数信息如示例:

Bash
1-- 函数完整类名
2main.class=com.baidu.bce.bsc.demo.flink.Kafka2Bos;
3 
4-- 完整主类名JAR包的资源名称
5main.jar=flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar;
6 
7-- 函数参数设置,必须以“main.args.”开头
8main.args.bootStrapServer=kafka.gz.baidubce.com:9092;
9main.args.topic=topictopictopic;
10main.args.groupId=groupgroupgroup;
11main.args.certBosUrl=https://bucket.bj.bcebos.com/kafka-key.zip;
12main.args.bosSink=bos://bucket/object;
13main.args.bosDatatimePattern=yyyy-MM-dd--HH;

引用jar资源

在资源引用栏中选择刚才上传的jar包,点击引用;并将资源详情中的资源原名作为作业参数main.jar的参数

image.png

image.png

保存发布作业

依次点击保存、发布按钮,将作业发布到作业运维列表。

4. 运行作业

切换到作业运维的详情页面,点击启动,选择相应的网络参数并申请启动资源,点击确认。如果使用了检查点,可以选择从上次作业停止时间点启动

image.png

启动之后,可以看到作业运行日志,但jar作业不支持实时监控的查看。

5. 更新作业

如果需要更新作业jar包,需要按照如下步骤执行:

  1. 停止运行中的作业。
  2. 在资源管理列表对相应的jar包发起"新增版本"操作。
  3. 在作业开发页面对相应作业的资源引用执行"解绑-引用"操作。
  4. 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
  5. 重启启动作业。

如果仅仅是需要修改作业参数,可以简化步骤为:

  1. 停止运行中的作业。
  2. 在作业开发页面修改作业参数,对作业执行"保存-发布"操作。
  3. 重启启动作业。

上一篇
常见问题
下一篇
Spark自定义JAR作业