对接spark-sql

时序时空数据库 TSDB

  • 功能发布记录
  • 产品描述
    • 名词解释
    • 产品概述
    • 数据结构
    • 产品优势
    • 产品功能
    • 系统限制
  • Python-SDK
    • 管理接口
    • 生成查询数据点的预签名URL
    • 写入数据点的Gzip压缩说明
    • 查询操作
    • 写入操作
    • 创建TsdbClient
    • Demo工程下载
    • 版本说明
    • 安装SDK工具包
    • 概述
  • 申请邀请
    • 申请成为邀测用户
  • Java-SDK
    • 管理接口
    • 生成查询数据点的预签名URL
    • 写入数据点的gzip压缩说明
    • 查询操作
    • 写入操作
    • 创建TsdbClient
    • Demo工程下载
    • 版本说明
    • 快速入门
    • 安装SDK工具包
    • 概述
  • SQL参考
    • 支持SQL查询
    • 对接hive-sql
    • 支持MySQL协议
    • 对接spark-sql
  • 快速入门
    • 使用API入门
    • 创建数据库
    • 通过查询面板生成图表
    • 连接数据库
    • 概述
  • 典型实践
    • 物联网设备状态监控存储分析
    • 互联网业务性能监控服务
  • 操作指南
    • 多用户访问控制
    • 数据预处理
    • 时空服务
    • 数据管理
    • 与天工产品对接
    • 数据可视化
    • 数据库管理
    • 插值查询
  • 服务等级协议SLA
    • 时序数据库TSDB服务等级协议SLA(V2.0)
  • Node-SDK
    • 管理接口
    • 生成查询数据点的预签名URL
    • 写入数据点的gzip压缩说明
    • 查询操作
    • 写入操作
    • 创建TsdbClient
    • Demo工程下载
    • 版本说明
    • 快速入门
    • 安装SDK工具包
    • 错误码
    • 概述
  • API参考
    • 介绍
    • 聚合函数
    • 时间单位
    • 附录
    • 更新历史
    • 数据API接口说明
    • 管理API接口说明
    • 分组方式
  • 常见问题
    • 数据库创建及设置
    • 数据点查询
    • 数据管理
    • 用量提示
    • 常见问题总览
    • 数据点写入
  • 产品定价
    • 到期停服处理
    • 预付费
所有文档
menu
没有找到结果,请重新输入

时序时空数据库 TSDB

  • 功能发布记录
  • 产品描述
    • 名词解释
    • 产品概述
    • 数据结构
    • 产品优势
    • 产品功能
    • 系统限制
  • Python-SDK
    • 管理接口
    • 生成查询数据点的预签名URL
    • 写入数据点的Gzip压缩说明
    • 查询操作
    • 写入操作
    • 创建TsdbClient
    • Demo工程下载
    • 版本说明
    • 安装SDK工具包
    • 概述
  • 申请邀请
    • 申请成为邀测用户
  • Java-SDK
    • 管理接口
    • 生成查询数据点的预签名URL
    • 写入数据点的gzip压缩说明
    • 查询操作
    • 写入操作
    • 创建TsdbClient
    • Demo工程下载
    • 版本说明
    • 快速入门
    • 安装SDK工具包
    • 概述
  • SQL参考
    • 支持SQL查询
    • 对接hive-sql
    • 支持MySQL协议
    • 对接spark-sql
  • 快速入门
    • 使用API入门
    • 创建数据库
    • 通过查询面板生成图表
    • 连接数据库
    • 概述
  • 典型实践
    • 物联网设备状态监控存储分析
    • 互联网业务性能监控服务
  • 操作指南
    • 多用户访问控制
    • 数据预处理
    • 时空服务
    • 数据管理
    • 与天工产品对接
    • 数据可视化
    • 数据库管理
    • 插值查询
  • 服务等级协议SLA
    • 时序数据库TSDB服务等级协议SLA(V2.0)
  • Node-SDK
    • 管理接口
    • 生成查询数据点的预签名URL
    • 写入数据点的gzip压缩说明
    • 查询操作
    • 写入操作
    • 创建TsdbClient
    • Demo工程下载
    • 版本说明
    • 快速入门
    • 安装SDK工具包
    • 错误码
    • 概述
  • API参考
    • 介绍
    • 聚合函数
    • 时间单位
    • 附录
    • 更新历史
    • 数据API接口说明
    • 管理API接口说明
    • 分组方式
  • 常见问题
    • 数据库创建及设置
    • 数据点查询
    • 数据管理
    • 用量提示
    • 常见问题总览
    • 数据点写入
  • 产品定价
    • 到期停服处理
    • 预付费
  • 文档中心
  • arrow
  • 时序时空数据库TSDB
  • arrow
  • SQL参考
  • arrow
  • 对接spark-sql
本页目录
  • 1 BMR spark sql
  • 1.1 Spark-tsdb-connector
  • 1.2 作业程序
  • 1.2.1 查询tsdb的通用作业程序
  • 1.2.2 更多参数的作业程序
  • 1.3 创建bmr spark集群
  • 1.4 创建作业
  • 1.5 场景示例
  • 1.5.1 计算风速
  • 1.5.2 计算车辆在时间上的使用情况

对接spark-sql

更新时间:2025-08-22

1 BMR spark sql

1.1 Spark-tsdb-connector

TSDB对接spark sql是通过实现org.apache.spark.rdd.RDD(即Resilient Distributed Dataset)和一些相关的接口,方便用户通过spark来查询TSDB的数据。

Jar下载地址:http://tsdb-bos.gz.bcebos.com/spark-tsdb-connector-all.jar

如果是本地spark集群,请下载jar到本地;如果使用bmr,则上传到bos或者直接使用地址bos://iot-tsdb/spark-tsdb-connector-all.jar。

支持的版本:spark 2.1.0,jdk 1.7。

1.2 作业程序

1.2.1 查询tsdb的通用作业程序

Main class:

Plain Text
1package com.baidu.cloud.bmr.spark;
2 
3import org.apache.spark.SparkConf;
4import org.apache.spark.api.java.JavaSparkContext;
5import org.apache.spark.sql.Dataset;
6import org.apache.spark.sql.Row;
7import org.apache.spark.sql.SQLContext;
8 
9public class TsdbSparkSql {
10 
11    public static void main(String[] args) {
12        if (args.length != 6) {
13            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.TsdbSparkSql <endpoint> <ak> <sk> "
14                    + "<metric> <sql> <output>");
15            System.exit(1);
16        }
17        String endpoint = args[0];
18        String ak = args[1];
19        String sk = args[2];
20        String metric = args[3];
21        String sql = args[4];
22        String output = args[5];
23 
24        SparkConf conf = new SparkConf().setAppName("TsdbSparkSql");
25        JavaSparkContext sc = new JavaSparkContext(conf);
26        SQLContext sqlContext = new SQLContext(sc);
27        Dataset<Row> dataset = sqlContext.read()
28                .format("tsdb")                 // 设置为tsdb源
29                .option("endpoint", endpoint)   // tsdb实例endpoint
30                .option("access_key", ak)       // AK
31                .option("secret_key", sk)       // SK
32                .option("metric_name", metric)  // 对应的metric
33                .load();
34        dataset.registerTempTable(metric);
35        sqlContext.sql(sql).rdd().saveAsTextFile(output);   // 执行sql并存储到output中
36    }
37 
38}

endpoint为IP:PORT格式的情形下:

Plain Text
1package com.baidu.cloud.bmr.spark;
2 
3import org.apache.spark.SparkConf;
4import org.apache.spark.api.java.JavaSparkContext;
5import org.apache.spark.sql.Dataset;
6import org.apache.spark.sql.Row;
7import org.apache.spark.sql.SQLContext;
8 
9public class TsdbSparkSql {
10 
11    public static void main(String[] args) {
12        if (args.length != 8) {
13            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.TsdbSparkSql <endpoint> <host> <grpc_port> <ak> <sk> "
14                    + "<metric> <sql> <output>");
15            System.exit(1);
16        }
17        String endpoint = args[0];
18        String host = args[1];
19        String grpcPort = args[2];
20        String ak = args[3];
21        String sk = args[4];
22        String metric = args[5];
23        String sql = args[6];
24        String output = args[7];
25        SparkConf conf = new SparkConf().setAppName("TsdbSparkSql");
26        JavaSparkContext sc = new JavaSparkContext(conf);
27        SQLContext sqlContext = new SQLContext(sc);
28        Dataset<Row> dataset = sqlContext.read()
29                .format("tsdb")                 // 设置为tsdb源
30                .option("endpoint", endpoint)   // tsdb实例endpoint
31                .option("host", host)           // host
32                .option("grpc_port", grpcPort)  // grpc port
33                .option("access_key", ak)       // AK
34                .option("secret_key", sk)       // SK
35                .option("metric_name", metric)  // 对应的metric
36                .load();
37        dataset.registerTempTable(metric);
38        sqlContext.sql(sql).rdd().saveAsTextFile(output);   // 执行sql并存储到output中
39
40    }
41}

依赖:

Plain Text
1<dependency>
2    <groupId>org.apache.spark</groupId>
3    <artifactId>spark-sql_2.10</artifactId>
4    <version>2.1.2</version>
5</dependency>

需要将程序打包为jar文件,放入bos中,在配置作业时需要用到该文件的bos路径。

1.2.2 更多参数的作业程序

Plain Text
1Main class:
2
3package com.baidu.cloud.bmr.spark;
4 
5import static org.apache.spark.sql.types.DataTypes.DoubleType;
6import static org.apache.spark.sql.types.DataTypes.LongType;
7import static org.apache.spark.sql.types.DataTypes.StringType;
8 
9import org.apache.spark.SparkConf;
10import org.apache.spark.api.java.JavaSparkContext;
11import org.apache.spark.sql.Dataset;
12import org.apache.spark.sql.Row;
13import org.apache.spark.sql.SQLContext;
14import org.apache.spark.sql.types.Metadata;
15import org.apache.spark.sql.types.StructField;
16import org.apache.spark.sql.types.StructType;
17 
18public class TsdbSparkSqlMoreOptions {
19 
20    public static void main(String[] args) {
21        if (args.length != 6) {
22            System.err.println("usage: spark-submit com.baidu.cloud.bmr.spark.TsdbSparkSqlMoreOptions <endpoint> <ak> <sk> "
23                    + "<metric> <sql> <output>");
24            System.exit(1);
25        }
26        String endpoint = args[0];
27        String ak = args[1];
28        String sk = args[2];
29        String metric = args[3];
30        String sql = args[4];
31        String output = args[5];
32 
33        SparkConf conf = new SparkConf().setAppName("TsdbSparkSqlMoreOptions");
34        JavaSparkContext sc = new JavaSparkContext(conf);
35        SQLContext sqlContext = new SQLContext(sc);
36        StructType schema = new StructType(new StructField[] {
37                new StructField("time", LongType, false, Metadata.empty()),     // 设置time列,为long
38                new StructField("value", DoubleType, false, Metadata.empty()),  // 设置value列,为double
39                new StructField("city", StringType, false, Metadata.empty())    // 设置city列,为string
40        });
41        Dataset<Row> dataset = sqlContext.read()
42                .format("tsdb")                 // 设置为tsdb源
43                .schema(schema)                 // 设置自定义schema
44                .option("endpoint", endpoint)   // tsdb实例endpoint
45                .option("access_key", ak)       // AK
46                .option("secret_key", sk)       // SK
47                .option("metric_name", metric)  // 对应的metric
48                .option("field_names", "value") // schema中属于field的列名,用逗号分割,如"field1, field2",表示有两个field分别为field1,field2
49                .option("tag_names", "city")    // 指定schema中的tag名,用逗号分割,"city,latitude",表示有两个tag分别为city,latitude
50                .option("split_number", "10")   // 设置split的个数,split数据时会尽量与split number接近。
51                .load();
52        dataset.registerTempTable(metric);
53        sqlContext.sql(sql).rdd().saveAsTextFile(output);
54    }
55 
56}

依赖:

Plain Text
1<dependency>
2    <groupId>org.apache.spark</groupId>
3    <artifactId>spark-sql_2.10</artifactId>
4    <version>2.1.2</version>
5</dependency>

1.3 创建bmr spark集群

在使用BMR时,强烈建议您先阅读BMR文档.

选择BMR1.1.0版本,并选择spark 2.1.0。

1.4 创建作业

作业配置如下:

Plain Text
1应用程序位置:bos://<tsdb-spark-sql-sample>.jar
2 
3Spark-submit:--class com.baidu.cloud.bmr.spark.TsdbSparkSql --jars bos://<to>/spark-tsdb-connector-all.jar
4 
5应用程序参数:<databaseName>.<databaseId>.tsdb.iot.gz.baidubce.com <AK> <SK> <metric> "select count(1) from <metric>" "bos://<to>/output/data"

需要注意的是:

  • 应用程序配置其实是与2.1中作业程序相关的,请根据自己的作业程序来配置;
  • Spark-submit中记得需要最后的“--jars”参数不能省略,需要指定为1.1中tsdb的connector。

1.5 场景示例

1.5.1 计算风速

风速数据由传感器定时上传到tsdb中,数据包含两个field分别为x和y,表示x轴和y轴方向的风速,如下由两个垂直方向的风速来计算出总的风速。

Plain Text
1Main class:
2
3package com.baidu.cloud.bmr.spark;
4 
5import static org.apache.spark.sql.types.DataTypes.DoubleType;
6import static org.apache.spark.sql.types.DataTypes.LongType;
7 
8import org.apache.spark.SparkConf;
9import org.apache.spark.api.java.JavaSparkContext;
10import org.apache.spark.sql.Dataset;
11import org.apache.spark.sql.Row;
12import org.apache.spark.sql.SQLContext;
13import org.apache.spark.sql.types.Metadata;
14import org.apache.spark.sql.types.StructField;
15import org.apache.spark.sql.types.StructType;
16 
17public class WindSpeed {
18 
19    public static void main(String[] args) {
20        String endpoint = "<endpoint>";
21        String ak = "<AK>";
22        String sk = "<SK>";
23        String metric = "WindSpeed";
24        String output = "bos://<to>/output/data";
25 
26        SparkConf conf = new SparkConf().setAppName("TsdbSparkSqlMoreOptions");
27        JavaSparkContext sc = new JavaSparkContext(conf);
28        SQLContext sqlContext = new SQLContext(sc);
29        StructType schema = new StructType(new StructField[] {
30                new StructField("time", LongType, false, Metadata.empty()),  // 设置time列,为long
31                new StructField("x", DoubleType, false, Metadata.empty()),   // 设置x列,为double
32                new StructField("y", DoubleType, false, Metadata.empty())    // 设置y列,为double
33        });
34        Dataset<Row> dataset = sqlContext.read()
35                .format("tsdb")                 // 设置为tsdb源
36                .schema(schema)                 // 设置自定义schema
37                .option("endpoint", endpoint)   // tsdb实例endpoint
38                .option("access_key", ak)       // AK
39                .option("secret_key", sk)       // SK
40                .option("metric_name", metric)  // 对应的metric
41                .option("field_names", "x,y")   // schema中属于field的列名
42                .load();
43        dataset.registerTempTable(metric);
44        sqlContext.sql("select time, sqrt(pow(x, 2) + pow(y, 2)) as speed from WindSpeed")
45                .rdd()
46                .saveAsTextFile(output);
47    }
48 
49}

依赖:

Plain Text
1<dependency>
2    <groupId>org.apache.spark</groupId>
3    <artifactId>spark-sql_2.10</artifactId>
4    <version>2.1.2</version>
5</dependency>

原始数据:

metric:WindSpeed

time field : x field : y
1512086400000 3.0 4.0
1512086410000 1.0 2.0
1512086420000 2.0 3.0

结果:

结果输出到output指定的bos文件夹中,样例如下

Plain Text
1[1512086400000,5.000]
2[1512086410000,2.236]
3[1512086420000,3.606]

1.5.2 计算车辆在时间上的使用情况

车辆在行驶过程中会定时(每10秒)将数据上传到tsdb中,数据中包含车速speed。需要统计三种时长:

(1)停止时长:一段时间内这台车子有上报数据,但是上报的车速显示是0,可能是车子在等红灯。

(2)运行时长:一段时间内这台车子有上报数据,且上报的车速显示大于0,这台车子正在行驶中。

(3)离线时长:一段时间内这台车子没有上报数据的时长,这台车子已经停下并熄火了。

Plain Text
1Main class:
2
3package com.baidu.cloud.bmr.spark;
4 
5import static org.apache.spark.sql.types.DataTypes.LongType;
6import static org.apache.spark.sql.types.DataTypes.StringType;
7 
8import org.apache.spark.SparkConf;
9import org.apache.spark.api.java.JavaSparkContext;
10import org.apache.spark.sql.Dataset;
11import org.apache.spark.sql.Row;
12import org.apache.spark.sql.SQLContext;
13import org.apache.spark.sql.types.Metadata;
14import org.apache.spark.sql.types.StructField;
15import org.apache.spark.sql.types.StructType;
16 
17public class VehicleSpeed {
18 
19    public static void main(String[] args) {
20        String endpoint = "<endpoint>";
21        String ak = "<AK>";
22        String sk = "<SK>";
23        String metric = "vehicle";
24        String output = "bos://<to>/output/data";
25 
26        SparkConf conf = new SparkConf().setAppName("TsdbSparkSqlMoreOptions");
27        JavaSparkContext sc = new JavaSparkContext(conf);
28        SQLContext sqlContext = new SQLContext(sc);
29        StructType schema = new StructType(new StructField[] {
30                new StructField("time", LongType, false, Metadata.empty()),     // 设置time列,为long
31                new StructField("speed", LongType, false, Metadata.empty()),    // 设置speed列,为long
32                new StructField("carId", StringType, false, Metadata.empty())   // 设置carId列,为string
33        });
34        Dataset<Row> dataset = sqlContext.read()
35                .format("tsdb")                 // 设置为tsdb源
36                .schema(schema)                 // 设置自定义schema
37                .option("endpoint", endpoint)   // tsdb实例endpoint
38                .option("access_key", ak)       // AK
39                .option("secret_key", sk)       // SK
40                .option("metric_name", metric)  // 对应的metric
41                .option("field_names", "speed") // schema中属于field的列名
42                .option("tag_names", "cardId")  // 指定tag名
43                .load();
44        dataset.registerTempTable(metric);
45        sqlContext.sql("select floor((time - 1512057600000) / 86400000) + 1 as day, count(*) * 10 as stop_seconds"
46                + " from vehicle where carId='123' and time >= 1512057600000 and time < 1514736000000 and speed = 0"
47                + " group by floor((time - 1512057600000) / 86400000)")
48                .rdd()
49                .saveAsTextFile(output + "/stopSeconds");
50        sqlContext.sql("select floor((time - 1512057600000) / 86400000) + 1 as day, count(*) * 10 as run_seconds"
51                + " from vehicle where carId='123' and time >= 1512057600000 and time < 1514736000000 and speed > 0"
52                + " group by floor((time - 1512057600000) / 86400000)")
53                .rdd()
54                .saveAsTextFile(output + "/runSeconds");
55        sqlContext.sql("select floor((time - 1512057600000) / 86400000) + 1 as day, 2678400 - count(*) * 10 as"
56                + " offline_seconds from vehicle where carId='123' and time >= 1512057600000 and time < 1514736000000"
57                + " group by floor((time - 1512057600000) / 86400000)")
58                .rdd()
59                .saveAsTextFile(output + "/offlineSeconds");
60    }
61 
62}

依赖:

Plain Text
1<dependency>
2    <groupId>org.apache.spark</groupId>
3    <artifactId>spark-sql_2.10</artifactId>
4    <version>2.1.2</version>
5</dependency>

原始数据

metric : vehicle

time field : speed tag
1512057600000 40 carId=123
1512057610000 60 carId=123
1512057620000 50 carId=123
... ... ... carId=123
1514721600000 10 carId=123

结果

结果输出到output指定的bos文件夹中,样例如下:

Plain Text
1[1,3612]
2[2,3401]
3...
4[31,3013]
5
6[1,17976]
7[2,17968]
8...
9[31,17377]
10
11[1,64812]
12[2,65031]
13...
14[31,66010]

上一篇
支持MySQL协议
下一篇
快速入门