API 日志调用统计

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
所有文档
menu
没有找到结果,请重新输入

百度流式计算 BSC

  • 产品定价
  • 功能发布记录
  • 产品描述
    • 产品优势
    • 应用场景
    • 产品功能
    • 核心概念
    • 概述
  • 快速入门
    • 开通服务
    • 开发作业
  • 典型实践
    • CDN 日志提取中转(ETL)
    • API 日志调用统计
    • CDN 接口日志聚合统计
    • 物联网设备实时报警统计(流表Join)
    • 物设备报警情况实时统计
    • 物联网设备实时监控预警
  • 操作指南
    • 多用户访问控制
    • 作业运维
    • 扩展内置函数
    • 编辑作业
    • 模板管理
    • 新增作业
    • 资源管理
  • API参考
    • 接口概述
    • 通用说明
    • 公共头
    • 模板相关接口
    • 模型定义
    • 服务域名
    • 请求返回格式
    • 作业实例相关接口
    • 作业相关接口
    • 资源相关接口
  • 常见问题
    • 一般问题
  • 自定义JAR作业
    • Flink自定义JAR作业
    • Spark自定义JAR作业
  • SQL
    • DML语句
    • SET 语句
    • 内置函数
    • 标识符和关键字
    • 窗口函数
    • 概述
    • DDL 语句
      • KAFKA
      • Formats
      • ES
      • Overview
      • PALO
      • TSDB
      • BKAFKA
      • RDS
      • BOS
      • MQTT
  • 文档中心
  • arrow
  • 百度流式计算BSC
  • arrow
  • 典型实践
  • arrow
  • API 日志调用统计
本页目录
  • 概览
  • 需求场景
  • 方案概述
  • 配置步骤
  • 定义 KAFKA Source 表
  • 定义 TSDB Sink 表
  • 编写数据统计DML语句
  • 相关产品

API 日志调用统计

更新时间:2025-08-21

概览

用户拥有多台服务器,托管了一些 API 调用服务,现在想统计 API 的调用情况,形成图表。

需求场景

所有机器的 API 调用日志通过 自定义日志采集程序 进行日志采集后推送到 百度消息服务(KAFKA)中作为流式计算 source , 在我们 BSC 中创建 FLINK_STREAM/SQL 类型的作业用于 API 日志的聚合统计,并实时将聚合结果写到 时序时空数据库(TSDB)当中,用户可以通过 TSDB 的可视化面板或者利用 数据可视化工具(如 Sugar BI)等调用 TSDB 的数据 API 完成数据展示。

方案概述

服务器 → 自定义日志采集程序 → KAFKA → BSC → TSDB → Sugar BI

配置步骤

一个完整的 Flink SQL 作业由 source 表、sink 表和 DML 语句构成。

定义 KAFKA Source 表

FLINK
1CREATE TABLE source_kafka_table (
2    `timestamp` BIGINT,
3    `status` INTEGER,
4    `contentLength` BIGINT,
5    `latency` BIGINT,
6    `groupUuid` STRING,
7    `apiUuid` STRING
8) WITH (
9    'connector.type' = 'KAFKA',
10    'format.encode' = 'JSON',
11    'connector.topic' = 'xxxxxxxxx__bsc-source',
12    'connector.properties.bootstrap.servers' = 'kafka.bd.baidubce.com:9071',
13    'connector.properties.ssl.filename' = 'kafka-key_bd.zip',
14    'connector.properties.group.id' = 'test_group',
15    'connector.read.startup.mode' = 'latest',
16    'watermark.field' = 'timestamp',
17    'watermark.threshold' = '1 minutes'
18);

定义 TSDB Sink 表

FLINK
1CREATE TABLE sink_tsdb_table (
2    `datapoints` ARRAY < ROW(
3        `timestamp` BIGINT,
4        `metric` STRING,
5        `value` BIGINT,
6        `tags` MAP < STRING,
7        STRING >
8    ) >
9) WITH (
10    'connector.type' = 'TSDB',
11    'format.encode' = 'JSON',
12    'connector.emit' = 'BATCH',
13    'connector.url' = 'http://xxxxxxx.tsdb-ej9v6mg6q8z9.tsdb.iot.bj.baidubce.com',
14    'connector.write.max-message-num-per-batch' = '2000'
15);

编写数据统计DML语句

统计每分钟按照 apiUuid、groupUuid、status 进行聚合的结果,每个 Query 产生3个 TSDB datapoints,并实时写入到 TSDB 中。这里通过嵌套子查询的方式来使SQL结构更加清晰。选取 timestamp 字段作为 Eventtime 的watermark,延迟设置为1分钟。聚合时采用滚动窗口,窗口大小为1分钟。

FLINK
1INSERT INTO
2   sink_tsdb_table
3SELECT
4   ARRAY [
5       ROW(`timestamp`, `count_name` , `count`, `common_tags`),
6       ROW(`timestamp`, `traffic_name`, `traffic`, `common_tags`),
7       ROW(`timestamp`, `latency_name`, `latency`, `common_tags`)
8   ]
9FROM
10   (
11       SELECT
12           `timestamp`,
13           'count' AS `count_name`,
14           `count`,
15           'traffic' AS `traffic_name`,
16           `traffic`,
17           'latency' AS `latency_name`,
18           `latency`,
19           MAP ['apiUuid', `apiUuid`, 'groupUuid', `groupUuid`, 'status', `status`] AS `common_tags`
20       FROM
21           (
22               SELECT
23                   TO_BIGINT(TUMBLE_START(`timestamp`, INTERVAL '1' MINUTE)) AS `timestamp`,
24                   COUNT(1) AS `count`,
25                   SUM(contentLength) AS `traffic`,
26                   SUM(latency) AS `latency`,
27                   `apiUuid` AS `apiUuid`,
28                   `groupUuid` AS `groupUuid`,
29                   `status` AS `status`
30               FROM
31                   (
32                       SELECT
33                           `timestamp`,
34                           `contentLength`,
35                           `latency`,
36                           `apiUuid`,
37                           `groupUuid`,
38                           CASE
39                               WHEN status >= 200
40                               AND status < 300 THEN '2xx'
41                               WHEN status >= 300
42                               AND status < 200 THEN '3xx'
43                               WHEN status >= 400
44                               AND status < 500 THEN '4xx'
45                               WHEN status >= 500
46                               AND status < 600 THEN '5xx'
47                               ELSE 'oth'
48                           END AS `status`
49                       FROM
50                           source_kafka_table
51                   ) AS taba
52               GROUP BY
53                   TUMBLE(`timestamp`, INTERVAL '1' MINUTE),
54                   `apiUuid`,
55                   `groupUuid`,
56                   `status`
57           ) AS tabb
58   ) AS tabc

相关产品

消息服务 for Kafka、时序时空数据库TSDB、数据可视化Sugar BI

上一篇
CDN 日志提取中转(ETL)
下一篇
CDN 接口日志聚合统计