Overview

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
所有文档
menu
没有找到结果,请重新输入

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
  • 文档中心
  • arrow
  • 百度流式计算BSC
  • arrow
  • SQL
  • arrow
  • DDL 语句
  • arrow
  • Overview
本页目录
  • 支持的 Connectors
  • 如何使用 Connector
  • 字段类型
  • 时间属性
  • 属性如何支持 EVENTTIME 和 PROCTIME
  • EVENTTIME支持的时间字段类型及其对应参数设置
  • 日期格式对照标配表
  • 带有EVENTTIME的 Connector
  • 带有 PROCTIME 的 Connector

Overview

更新时间:2025-08-21

支持的 Connectors

服务类型 SPARK FLINK
Source Sink Source Sink
KAFKA Y Y Y Y
BOS Y Y Y Y
MQTT Y Y Y
RDS Y Y Y Y
ES Y Y
PALO Y Y Y
TSDB Y Y

如何使用 Connector

SQL
1CREATE TABLE source_kafka_table (
2    `field01` STRING,
3    `field02` BIGINT,
4    `field03` FLOAT,
5    `field04` BINARY,
6    `field05` INT,
7    `field06` TINYINT,
8    `field07` BOOLEAN,
9    `field08` DATA,
10    `field09` DOUBLE,
11    `field10` SMALLINT
12) WITH (
13    'connector.type' = 'KAFKA',
14    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
15    'format.encode' = 'JSON',
16    'connector.properties.bootstrap.servers' = 'kafka.gz.baidubce.com:9092',
17    'connector.properties.ssl.filename' = 'kafka-key_gz.zip',
18    'connector.properties.group.id' = 'bsc123',
19    'connector.read.startup.mode' = 'earliest'
20);

字段类型

TYPE SPARK FLINK MAPPING
TINYINT Y Y BTYE / TINYINT
SMALLINT Y Y SHORT / SMALLINT
INT Y Y INT / INTEGER
BIGINT Y Y LONG / BIGINT
FLOAT Y Y FLOAT
DOUBLE Y Y DOUBLE
STRING Y Y STRING / CHAR / VARCHAR
BINARY Y Y BINARY / VARBINARY / BYTES
BOOLEAN Y Y BOOLEAN / BOOL
TIMESTAMP Y Y TIMESTAMP / SQL_TIMESTAMP
DECIMAL Y Y DECIMAL
DATE Y Y DATE / LOCAL_DATE
TIME Y TIME / LOCAL_TIME
ARRAY Y Y ARRAY
MAP Y Y MAP
ROW Y ROW
STRUCT Y STRUCT

时间属性

注意:SPARK 仅仅支持指定 source table 中某一时间类型的列作为 watermark 处理窗口的时间

属性如何支持 EVENTTIME 和 PROCTIME

属性名称 说明 EVENTTIME 设置举例 PROCTIME 设置举例
watermark.field 使用事件时间的字段作为时间提取 'field' ''
watermark.threshold 时间窗口的允许最大延迟设置 '2 seconds',支持单位有:milliseconds,seconds,minutes,hours ''
watermark.field.alias SQL正文中使用的时间别名 'alias' 'proctime'
watermark.field.pattern 设置日期模式进行转换时间戳 'yyyy-MM-dd HH:mm:ss' ''
watermark.field.timezone 设置日期模式进行转换时区 'Asia/Shanghai' ''

EVENTTIME支持的时间字段类型及其对应参数设置

字段类型 watermark.field.pattern watermark.field.timezone 说明
BIGINT 's'、'ms'、'second'、'millisecond' '' 使用的字段为LONG,转化为毫秒
STRING 'yyyy-MM-dd HH:mm:ss' 'Asia/Shanghai' 使用的字段能够通过指定的模式转换为日期
TIMESTAMP '' '' 使用的字段,必须符合TZ格式:2018-05-20T00:08:00Z

日期格式对照标配表

pattern timezone
yyyy-MM-dd'T'HH:mm:ss.SSS Asia/Shanghai
yyyy-MM-dd'T'HH:mm:ss.SSS'Z' UTC
yyyy-MM-dd'T'HH:mm:ss Asia/Shanghai
yyyy-MM-dd'T'HH:mm:ss'Z' UTC
yyyy-MM-dd HH:mm:ss.SSS Asia/Shanghai
yyyy-MM-dd HH:mm:ss.SSS'Z' UTC
yyyy-MM-dd HH:mm:ss Asia/Shanghai
yyyy-MM-dd HH:mm:ss'Z' UTC

带有EVENTTIME的 Connector

使用 EVENTTIME 时,需要指定 source 表中某一列作为时间戳,并配置其 watermark 等时间属性参数。

SPARK
1CREATE TABLE source_kafka_table (
2    `field01` STRING,
3    `field02` TIMESTAMP,  -- SPARK 支持窗口数据类型为 TIMESTAMP
4    `field03` FLOAT,
5    `field04` BINARY,
6    `field05` INT,
7    `field06` TINYINT,
8    `field07` BOOLEAN,
9    `field08` DATA,
10    `field09` DOUBLE,
11    `field10` SMALLINT
12) WITH (
13    'connector.type' = 'KAFKA',
14    'format.encode' = 'JSON',
15    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-source',
16    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
17    'connector.properties.ssl.filename' = 'kafka-key-bj.zip',
18    'watermark.field' = 'field02',
19    'watermark.threshold' = '10 seconds'
20);
21CREATE TABLE sink_kafka_table (
22    `timestamp` TIMESTAMP,
23    `field01` STRING,
24    `count` BIGINT
25) WITH (
26    'connector.type' = 'KAFKA',
27    'format.encode' = 'JSON',
28    'connector.topic' = 'xxxxxxxxxxxx__bsc-test-sink',
29    'connector.properties.bootstrap.servers' = 'kafka.bj.baidubce.com:9091',
30    'connector.properties.ssl.filename' = 'kafka-key-bj.zip'
31);
32INSERT INTO
33    sink_kafka_table
34SELECT
35    window.start AS `timestamp`,
36    `field01`,
37    COUNT(`field05`) AS `count`
38FROM
39    source_kafka_table
40GROUP BY 
41    window(`field02`, "1 MINUTE"),
42    `field01`

带有 PROCTIME 的 Connector

使用进程的处理时间作为时间戳,FLINK不需要指定 source 表中某一列,只需要加入 SET job.streamTimeType = 'PROCESSTIME' 语句即可。

FLINK
1SET job.stream.timeType = 'PROCESSTIME'; -- 通过 SET 语句指定 Flink 使用 PROCTIME
2CREATE TABLE source_mqtt_table (
3    `field01` STRING,
4    `field02` BIGINT,
5    `field03` FLOAT,
6    `field04` BINARY,
7    `field05` INT,
8    `field06` TINYINT,
9    `field07` BOOLEAN,
10    `field08` DATA,
11    `field09` DOUBLE,
12    `field10` SMALLINT
13) WITH (
14    'connector.type' = 'MQTT',
15    'format.encode' = 'JSON',
16    'connector.url' = 'tcp://xxxxxx.mqtt.iot.gz.baidubce.com:1883',
17    'connector.topic' = 'xxxx',
18    'connector.username' = 'xxxxxxxxx/bsc_test',
19    'connector.password' = 'xxxxxxxx',
20    'connector.semantic' = 'at_least_once'
21);

上一篇
ES
下一篇
PALO