On this page
  • Introduction to Druid
  • Cluster preparation
  • Druid template
  • Node types
  • Usage overview (batch indexing)
  • Using the Kafka Indexing Service
  • Using Druid across clusters
  • Simplifying Druid operations with Hive
  • Example 1: querying
  • Example 2: importing historical data
  • Example 3: ingesting real-time data from Kafka

Druid

Updated: 2025-08-21

Introduction to Druid

Druid is a high-performance real-time analytics system, open-sourced by Metamarkets in 2012 and designed specifically for OLAP workloads. Druid follows the Lambda architecture: it supports both batch and real-time data ingestion and provides high-performance queries over the ingested data.

Cluster preparation

Druid template

Log in to the Baidu Cloud console, choose "Products -> Baidu MapReduce BMR", and click "Create Cluster" to open the cluster creation page. When purchasing the cluster, simply select the Druid template. Druid's metadata store can be either a local MySQL instance or an RDS for MySQL cloud database, as shown below:

(screenshot)

Node types


The BMR Master node runs Druid's Overlord, Coordinator, Broker, and Router processes, which handle task submission, datasource inspection, and data queries.


To make node scaling easier, BMR Core nodes run Druid's Historical and MiddleManager processes, while Task nodes run the Broker and MiddleManager. The number of Core and Task nodes can be changed as needed.

Druid has high memory requirements; nodes with at least 4 cores and 16 GB of RAM are recommended. Within a Druid cluster, avoid running MapReduce jobs for other open-source components.

Usage overview (batch indexing)

Druid uses the following ports:

Druid process Port
Broker 8590
Overlord 8591
Coordinator 8592
Router 8593
Historical 8594
MiddleManager 8595

Druid is accessed over HTTP, and every request returns a response. If a curl command appears to return nothing, add -v to inspect the full response.
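
Every Druid process also exposes a /status endpoint, which is a convenient way to verify that the processes are up before submitting any tasks. A minimal check, assuming you are on the Master node and the default ports listed above:

    Plain Text
    # a successful response includes the Druid version and process information
    curl -v http://localhost:8591/status   # Overlord
    curl -v http://localhost:8590/status   # Broker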

  1. SSH into the cluster you just created

    ssh root@[master node public IP]
    Log in with the password you set when creating the cluster.

  2. Switch to the hdfs user and create the quickstart directory on HDFS

    su - hdfs
    hdfs dfs -mkdir /user/druid/quickstart

  3. Copy the sample data file wikiticker-2015-09-12-sampled.json.gz to HDFS

    hdfs dfs -copyFromLocal /opt/bmr/druid/quickstart/wikiticker-2015-09-12-sampled.json.gz /user/druid/quickstart

  4. Submit the batch indexing task to the Overlord

    curl -v -X POST -H 'Content-Type: application/json' -d @/opt/bmr/druid/quickstart/wikiticker-index.json http://xxx-master-instance-xxx-1(hostname):8591/druid/indexer/v1/task
    On success it returns {"task":"index_hadoop_wikiticker_yyyy-MM-ddThh:mm:ss.xxZ"}

    If you are using a high-availability cluster, only one Overlord is active, so the effective hostname may be xxx-master-instance-xxx-2.

  5. Check the task status in the Web UI

    Run ssh -ND [unused local port] root@[public IP] and configure your browser to use the resulting SOCKS proxy (for example, Chrome with SwitchySharp, i.e. accessing the cluster through an SSH tunnel), or use OpenVPN. You can then open the Overlord console at the Master node's internal IP on port 8591, as shown below:

    (screenshot)

  6. Once the indexing task has finished successfully, check the datasource

Using the Web UI: with the VPN or SSH tunnel configured, open the Master node's internal IP on the Coordinator port (8592) to view the result.

Using HTTP:

curl -v -X GET http://xxx-master-instance-xxx-1:8592/druid/coordinator/v1/datasources

On success it returns ["wikiticker"].

If you are using a high-availability cluster, only one Coordinator is active, so the effective hostname may be xxx-master-instance-xxx-2.

  7. Query the data

    Once the Overlord Web UI shows the task as successful, or the datasource list contains wikiticker, you can query the data through the Broker:
    curl -v -X 'POST' -H 'Content-Type:application/json' -d @/opt/bmr/druid/quickstart/wikiticker-top-pages.json http://[master/task hostname or IP]:8590/druid/v2?pretty

Tip: if you need to use your own Druid spec JSON, edit it under the home directory (~) and keep a local backup. See the Apache Druid documentation for how to write specs; a minimal query spec is sketched below.
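
For reference, a query spec can be quite small. The sketch below sends a native topN query inline; it is an illustration only and is not necessarily identical to the bundled wikiticker-top-pages.json:

    Plain Text
    # a minimal native topN query against the wikiticker datasource
    curl -X POST -H 'Content-Type: application/json' http://[master/task hostname or IP]:8590/druid/v2?pretty -d '{
      "queryType": "topN",
      "dataSource": "wikiticker",
      "intervals": ["2015-09-12/2015-09-13"],
      "granularity": "all",
      "dimension": "page",
      "metric": "edits",
      "threshold": 10,
      "aggregations": [{ "type": "count", "name": "edits" }]
    }'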

Using the Kafka Indexing Service

Druid's Kafka Indexing Service ingests real-time data; see the Druid documentation for details. Druid hosted on BMR can consume from Baidu Message Service (BMS), from Kafka inside the BMR cluster, or from a Kafka cluster deployed in the same VPC. This example uses BMS; the steps for Kafka clusters deployed in other ways are similar.

  1. Create a topic named kafka_demo in Baidu Message Service (BMS), then download the BMS certificate bundle, which is used later when creating the Kafka producer and consumer (see the BMS documentation)
  2. In the home directory (/home/hdfs), upload the downloaded BMS certificate bundle, unpack it there, and then scp it to the other nodes

    Plain Text
    unzip -d kafka-key kafka-key.zip
    scp -rp kafka-key root@[node IP or hostname]:~/
  3. In the home directory (/home/hdfs), write the Druid spec file kafka_demo.json. The BMS topic name must include the prefix generated when the topic was created. Check client.properties from the certificate bundle and fill in the ssl.keystore.password value in kafka_demo.json accordingly.

    JSON
    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "wiki_kafka_demo",
        "parser": {
          "type": "string",
          "parseSpec": {
            "format": "json",
            "timestampSpec": {
              "column": "time",
              "format": "auto"
            },
            "dimensionsSpec": {
              "dimensions": [
                "channel",
                "cityName",
                "comment",
                "countryIsoCode",
                "countryName",
                "isAnonymous",
                "isMinor",
                "isNew",
                "isRobot",
                "isUnpatrolled",
                "metroCode",
                "namespace",
                "page",
                "regionIsoCode",
                "regionName",
                "user",
                { "name": "added", "type": "long" },
                { "name": "deleted", "type": "long" },
                { "name": "delta", "type": "long" }
              ]
            }
          }
        },
        "metricsSpec" : [],
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": "NONE",
          "rollup": false
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "reportParseExceptions": false
      },
      "ioConfig": {
        "topic": "[accountID]__kafka_demo",
        "replicas": 2,
        "taskDuration": "PT10M",
        "completionTimeout": "PT20M",
        "consumerProperties": {
          "bootstrap.servers": "[kafka_address]:[kafka_port]",
          "security.protocol": "SSL",
          "ssl.truststore.password": "kafka",
          "ssl.truststore.location": "/home/hdfs/kafka-key/client.truststore.jks",
          "ssl.keystore.location": "/home/hdfs/kafka-key/client.keystore.jks",
          "ssl.keystore.password": "******"
        }
      }
    }

    If your Kafka cluster does not have SSL enabled (for example, Kafka inside a BMR cluster), only bootstrap.servers needs to be set under consumerProperties.

  4. Submit the Kafka supervisor task to the Overlord

    Plain Text
    curl -XPOST -H'Content-Type: application/json' -d @kafka_demo.json http://[overlord_ip]:8591/druid/indexer/v1/supervisor

    On success it returns:

    {"id":"wiki_kafka_demo"}. The result is also visible in the Overlord Web UI:

    (screenshot)

  5. On your local machine, unpack the BMS certificate bundle and use client.properties to start a Kafka producer and send messages

    Plain Text
    sh bin/kafka-console-producer.sh --producer.config client.properties --topic <accountID>__kafka_demo --sync --broker-list [kafka_address]:[kafka_port]

    {"time":"2015-09-12T23:58:31.643Z","channel":"#en.wikipedia","cityName":null,"comment":"Notification: tagging for deletion of [[File:Axintele team.gif]]. ([[WP:TW|TW]])","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"User talk","page":"User talk:AlexGeorge33","regionIsoCode":null,"regionName":null,"user":"Sir Sputnik","delta":1921,"added":1921,"deleted":0}
    {"time":"2015-09-12T23:58:33.743Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codiaeum finisterrae","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":18,"added":18,"deleted":0}
    {"time":"2015-09-12T23:58:35.732Z","channel":"#en.wikipedia","cityName":null,"comment":"Put info into infobox, succession box, split sections a bit","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Jamel Holley","regionIsoCode":null,"regionName":null,"user":"Mr. Matté","delta":3427,"added":3427,"deleted":0}
    {"time":"2015-09-12T23:58:38.531Z","channel":"#uz.wikipedia","cityName":null,"comment":"clean up, replaced: ozbekcha → oʻzbekcha, olchami → oʻlchami (3) using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":true,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Chambors","regionIsoCode":null,"regionName":null,"user":"Ximik1991Bot","delta":8,"added":8,"deleted":0}
    {"time":"2015-09-12T23:58:41.619Z","channel":"#it.wikipedia","cityName":null,"comment":"/* Gruppo Discovery Italia */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":true,"metroCode":null,"namespace":"Main","page":"Deejay TV","regionIsoCode":null,"regionName":null,"user":"Ciosl","delta":4,"added":4,"deleted":0}
    {"time":"2015-09-12T23:58:43.304Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Plot */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Paper Moon (film)","regionIsoCode":null,"regionName":null,"user":"Siddharth Mehrotra","delta":0,"added":0,"deleted":0}
    {"time":"2015-09-12T23:58:46.732Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Jeez */ delete (also fixed Si Trew's syntax error, as I think he meant to make a link there)","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipedia","page":"Wikipedia:Redirects for discussion/Log/2015 September 12","regionIsoCode":null,"regionName":null,"user":"JaykeBird","delta":293,"added":293,"deleted":0}
    {"time":"2015-09-12T23:58:48.729Z","channel":"#fr.wikipedia","cityName":null,"comment":"MàJ","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipédia","page":"Wikipédia:Bons articles/Nouveau","regionIsoCode":null,"regionName":null,"user":"Gemini1980","delta":76,"added":76,"deleted":0}
    {"time":"2015-09-12T23:58:51.785Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Amata yezonis","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":10,"added":10,"deleted":0}
    {"time":"2015-09-12T23:58:53.898Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codia triverticillata","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":36,"added":36,"deleted":0}
  6. Check the real-time datasource in the Coordinator Web UI (you can also query the Overlord's supervisor API directly, as sketched below)
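
Besides the Web UIs, the Overlord exposes a supervisor API that can be used to confirm the supervisor is running and to inspect its state; a minimal check using the supervisor id returned above:

    Plain Text
    # list running supervisors; the output should contain "wiki_kafka_demo"
    curl http://[overlord_ip]:8591/druid/indexer/v1/supervisor
    # detailed state of this supervisor, including the status of its indexing tasks
    curl http://[overlord_ip]:8591/druid/indexer/v1/supervisor/wiki_kafka_demo/status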

Using Druid across clusters

Druid can be used across clusters together with other Hadoop components such as Hadoop, Hive, and Spark. Create the clusters in the same VPC on the cluster creation page; curl or any other HTTP client can then send cross-cluster requests to the Druid components by specifying their IP addresses and ports.

Currently, hosts in another cluster within the same VPC can only be reached by IP address.
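
For example, from a node of another cluster in the same VPC, a query can be sent straight to the Druid Broker over its internal IP. A minimal sketch: 192.168.32.10 stands for a hypothetical internal address of the Broker, and my-top-pages.json is the query spec sketched earlier:

    Plain Text
    # run on a node of the other cluster; replace the IP with the Druid Broker's internal address
    curl -X POST -H 'Content-Type: application/json' -d @my-top-pages.json http://192.168.32.10:8590/druid/v2?pretty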

Simplifying Druid operations with Hive

In Hive, Druid nodes are likewise reached by specifying their IP addresses and ports.

This section provides a runnable example; for more details, see the Apache Hive documentation: https://cwiki.apache.org/confluence/display/Hive/Druid+Integration

Example 1: querying

Querying Druid through Hive is recommended: it simplifies common Druid queries, so you do not have to write complex JSON files for simple lookups.

  1. Create a Hive cluster and log in to it

    ssh root@[Hive cluster public IP]
    Log in with the password you set when creating the Hive cluster.

  2. Point Hive at the Druid cluster's Broker IP address and port

    hive> SET hive.druid.broker.address.default=x.x.x.x:8590;

  3. Create an external table (make sure the wikiticker datasource from the earlier example exists)

    hive> CREATE EXTERNAL TABLE druid_hive_demo_01 STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler' TBLPROPERTIES ("druid.datasource" = "wikiticker");

  4. Inspect the table metadata

    hive> DESCRIBE FORMATTED druid_hive_demo_01;
    Dimension columns are of type string, metric columns are bigint, and the timestamp column __time is of type timestamp with local time zone.

  5. Query the data

    hive> SELECT `__time`, count, page, `user`, added FROM druid_hive_demo_01 LIMIT 10;

    Note: Hive cannot scan a Druid table in full, so the query above fails without LIMIT (an aggregation query that avoids this is sketched after this example).

    __time is the timestamp column of the Druid table (datasource) and must be escaped with backticks in Hive. The result looks like this:

    Plain Text
    2015-09-12 08:46:58.771 Asia/Shanghai	1	36	Talk:Oswald Tilghman	GELongstreet
    2015-09-12 08:47:00.496 Asia/Shanghai	1	17	Rallicula	PereBot
    2015-09-12 08:47:05.474 Asia/Shanghai	1	0	Peremptory norm	60.225.66.142
    2015-09-12 08:47:08.77 Asia/Shanghai	1	18	Apamea abruzzorum	Cheers!-bot
    2015-09-12 08:47:11.862 Asia/Shanghai	1	18	Atractus flammigerus	ThitxongkhoiAWB
    2015-09-12 08:47:13.987 Asia/Shanghai	1	18	Agama mossambica	ThitxongkhoiAWB
    2015-09-12 08:47:17.009 Asia/Shanghai	1	0	Campanya dels Balcans (1914-1918)	Jaumellecha
    2015-09-12 08:47:19.591 Asia/Shanghai	1	345	Talk:Dani Ploeger	New Media Theorist
    2015-09-12 08:47:21.578 Asia/Shanghai	1	121	User:WP 1.0 bot/Tables/Project/Pubs	WP 1.0 bot
    2015-09-12 08:47:25.821 Asia/Shanghai	1	18	Agama persimilis	ThitxongkhoiAWB
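
Aggregation queries are also convenient here: the Druid storage handler can push a GROUP BY over the external table down to Druid as an aggregation query, so no full scan is needed. A hedged sketch, run from a shell on the Hive cluster (the broker address placeholder matches the SET command above):

    Plain Text
    # Hive translates the aggregation into a Druid query via the storage handler
    hive -e '
    SET hive.druid.broker.address.default=[druid_broker_ip]:8590;
    SELECT page, SUM(added) AS total_added
    FROM druid_hive_demo_01
    GROUP BY page
    ORDER BY total_added DESC
    LIMIT 10;'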

Example 2: importing historical data

You can take an existing Hive table and load its HDFS data into Druid by creating an external table, without writing a complex Druid ingestion spec. Note that the table must have a timestamp column named __time of type timestamp with local time zone.

  1. Go to the home directory (~), download the sample data file, and create the sample Hive table

    SQL
    wget http://files.grouplens.org/datasets/movielens/ml-100k.zip
    unzip ml-100k.zip

    hive>
    CREATE TABLE u_data (
        userid INT,
        movieid INT,
        rating INT,
        unixtime STRING)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    STORED AS TEXTFILE;

    LOAD DATA LOCAL INPATH 'ml-100k/u.data'
    OVERWRITE INTO TABLE u_data;
  2. Set the Hive properties that link Hive to Druid

    SQL
    -- Get the Druid cluster's metadata store settings: ssh root@public_ip 'cat /etc/druid/conf/_common/common.runtime.properties | grep "druid.metadata.storage.connector"'
    -- To reach the Druid cluster's MySQL from another cluster, use its IP address; if you use the hostname, update the Hive cluster's hosts file first.
    -- Enable the following if dropping the external table should also purge the Druid data: SET external.table.purge=true;
    SET hive.druid.metadata.uri=jdbc:mysql://[druid_mysql_ip]:3306/druid?characterEncoding=UTF-8;
    SET hive.druid.metadata.username=[druid_user];
    SET hive.druid.metadata.password=[druid_passwd];
    SET hive.druid.broker.address.default=[druid_broker_ip]:8590;
    SET hive.druid.overlord.address.default=[druid_overlord_ip]:8591;
    SET hive.druid.coordinator.address.default=[druid_coordinator_ip]:8592;
    SET hive.druid.storage.storageDirectory=hdfs://[hive_hdfs_ip]:8020/user/druid/warehouses;
  3. Method 1: create the table first, then load the data

    SQL
    CREATE EXTERNAL TABLE druid_hive_demo_02
    (`__time` TIMESTAMP WITH LOCAL TIME ZONE, userid STRING, moveid STRING, rating INT)
    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
    TBLPROPERTIES (
        "druid.segment.granularity" = "MONTH",
        "druid.query.granularity" = "DAY"
    );

    INSERT INTO druid_hive_demo_02
    SELECT
        cast (from_unixtime(cast(unixtime as int)) as timestamp with local time zone)  as `__time`,
        cast(userid as STRING) userid,
        cast(movieid as STRING) moveid,
        cast(rating as INT) rating
    FROM u_data;

    Method 2: create the Druid table and load the data with a CTAS statement

    SQL
    -- first enable CTAS for Hive external tables
    SET hive.ctas.external.tables=true;

    CREATE EXTERNAL TABLE druid_hive_demo_02
    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
    TBLPROPERTIES (
        "druid.segment.granularity" = "MONTH",
        "druid.query.granularity" = "DAY")
    AS
    SELECT
        cast (from_unixtime(cast(unixtime as int)) as timestamp with local time zone)  as `__time`,
        cast(userid as STRING) userid,
        cast(movieid as STRING) moveid,
        cast(rating as INT) rating
    FROM u_data;

    If the CTAS statement fails, the table it created still exists; you can either load data into it with an INSERT statement or drop it and rerun the CTAS.

  4. Query the result (an optional Druid-side check follows this example)

    SQL
    hive> SELECT * FROM druid_hive_demo_02 LIMIT 10;
    OK
    1997-09-19 08:00:00.0 Asia/Shanghai	259	1074	3
    1997-09-19 08:00:00.0 Asia/Shanghai	259	108	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	117	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	173	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	176	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	185	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	200	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	210	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	255	4
    1997-09-19 08:00:00.0 Asia/Shanghai	259	286	4
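
As an optional check on the Druid side, you can confirm that the segments were published by listing the datasources on the Coordinator, the same endpoint used in the batch-indexing walkthrough:

    Plain Text
    # the newly created datasource should appear in the list once its segments are loaded
    curl http://[druid_coordinator_ip]:8592/druid/coordinator/v1/datasources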

Example 3: ingesting real-time data from Kafka

  1. Create a topic named kafka_demo in Baidu Message Service (BMS), download the certificate bundle, and scp it to every node of the Druid cluster (see the BMS documentation).

scp -rp kafka-key root@[Druid cluster node IP or hostname]:~/

  2. Create a Hive table and link it to the BMS Kafka topic via table properties

    SQL
    -- Get the Druid cluster's metadata store settings: ssh hdfs@public_ip 'cat /etc/druid/conf/_common/common.runtime.properties | grep "druid.metadata.storage.connector"'
    -- To reach the Druid cluster's MySQL from another cluster, use its IP address; if you use the hostname, update the Hive cluster's hosts file first.
    SET hive.druid.metadata.uri=jdbc:mysql://[druid_mysql_ip]:3306/druid?characterEncoding=UTF-8;
    SET hive.druid.metadata.username=[druid_user];
    SET hive.druid.metadata.password=[druid_passwd];
    SET hive.druid.broker.address.default=[druid_broker_ip]:8590;
    SET hive.druid.overlord.address.default=[druid_overlord_ip]:8591;
    SET hive.druid.coordinator.address.default=[druid_coordinator_ip]:8592;
    SET hive.druid.storage.storageDirectory=hdfs://[hive_hdfs_ip]:8020/user/druid/warehouses;
    -- Kafka supervisor settings use the prefix druid.kafka.ingestion
    CREATE EXTERNAL TABLE druid_kafka_demo
    (
        `__time`          TIMESTAMP WITH LOCAL TIME ZONE, 
        `channel`         STRING,
        `cityName`        STRING,
        `comment`         STRING,
        `countryIsoCode`  STRING,
        `isAnonymous`     STRING,
        `isMinor`         STRING,
        `isNew`           STRING,
        `isRobot`         STRING,
        `isUnpatrolled`   STRING,
        `metroCode`       STRING,
        `namespace`       STRING,
        `page`            STRING,
        `regionIsoCode`   STRING,
        `regionName`      STRING,
        `added`           INT,
        `user`            STRING,
        `deleted`         INT,
        `delta`           INT
    )
    STORED BY 'org.apache.hadoop.hive.druid.DruidStorageHandler'
    TBLPROPERTIES (
        "kafka.bootstrap.servers" = "[kafka_address]:[kafka_port]",
        "kafka.security.protocol" = "SSL",
        "kafka.ssl.truststore.password" = "kafka",
        "kafka.ssl.truststore.location" =  "/home/hdfs/kafka-key/client.truststore.jks",
        "kafka.ssl.keystore.location" =  "/home/hdfs/kafka-key/client.keystore.jks",
        "kafka.ssl.keystore.password" =  "******",
        "kafka.topic" = "[account_id]__kafka_demo",
        "druid.kafka.ingestion.useEarliestOffset" = "true",
        "druid.kafka.ingestion.maxRowsInMemory" = "5",
        "druid.kafka.ingestion.startDelay" = "PT1S",
        "druid.kafka.ingestion.period" = "PT1S",
        "druid.kafka.ingestion.taskDuration" = "PT10M",
        "druid.kafka.ingestion.completionTimeout" = "PT20M",
        "druid.kafka.ingestion.consumer.retries" = "2"
    );

    If your Kafka cluster does not have SSL enabled (for example, Kafka inside a BMR cluster), the kafka.ssl.* properties can be removed.

  3. Start a Kafka producer and send data

    Plain Text
    sh bin/kafka-console-producer.sh --producer.config client.properties --topic [account_id]__kafka_demo --sync --broker-list [kafka_address]:[kafka_port]

    {"__time":"2015-09-12T23:58:31.643Z","channel":"#en.wikipedia","cityName":null,"comment":"Notification: tagging for deletion of [[File:Axintele team.gif]]. ([[WP:TW|TW]])","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"User talk","page":"User talk:AlexGeorge33","regionIsoCode":null,"regionName":null,"user":"Sir Sputnik","delta":1921,"added":1921,"deleted":0}
    {"__time":"2015-09-12T23:58:33.743Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codiaeum finisterrae","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":18,"added":18,"deleted":0}
    {"__time":"2015-09-12T23:58:35.732Z","channel":"#en.wikipedia","cityName":null,"comment":"Put info into infobox, succession box, split sections a bit","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Jamel Holley","regionIsoCode":null,"regionName":null,"user":"Mr. Matté","delta":3427,"added":3427,"deleted":0}
    {"__time":"2015-09-12T23:58:38.531Z","channel":"#uz.wikipedia","cityName":null,"comment":"clean up, replaced: ozbekcha → oʻzbekcha, olchami → oʻlchami (3) using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":true,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Chambors","regionIsoCode":null,"regionName":null,"user":"Ximik1991Bot","delta":8,"added":8,"deleted":0}
    {"__time":"2015-09-12T23:58:41.619Z","channel":"#it.wikipedia","cityName":null,"comment":"/* Gruppo Discovery Italia */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":true,"metroCode":null,"namespace":"Main","page":"Deejay TV","regionIsoCode":null,"regionName":null,"user":"Ciosl","delta":4,"added":4,"deleted":0}
    {"__time":"2015-09-12T23:58:43.304Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Plot */","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Paper Moon (film)","regionIsoCode":null,"regionName":null,"user":"Siddharth Mehrotra","delta":0,"added":0,"deleted":0}
    {"__time":"2015-09-12T23:58:46.732Z","channel":"#en.wikipedia","cityName":null,"comment":"/* Jeez */ delete (also fixed Si Trew's syntax error, as I think he meant to make a link there)","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipedia","page":"Wikipedia:Redirects for discussion/Log/2015 September 12","regionIsoCode":null,"regionName":null,"user":"JaykeBird","delta":293,"added":293,"deleted":0}
    {"__time":"2015-09-12T23:58:48.729Z","channel":"#fr.wikipedia","cityName":null,"comment":"MàJ","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":false,"isUnpatrolled":false,"metroCode":null,"namespace":"Wikipédia","page":"Wikipédia:Bons articles/Nouveau","regionIsoCode":null,"regionName":null,"user":"Gemini1980","delta":76,"added":76,"deleted":0}
    {"__time":"2015-09-12T23:58:51.785Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Amata yezonis","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":10,"added":10,"deleted":0}
    {"__time":"2015-09-12T23:58:53.898Z","channel":"#vi.wikipedia","cityName":null,"comment":"clean up using [[Project:AWB|AWB]]","countryIsoCode":null,"countryName":null,"isAnonymous":false,"isMinor":false,"isNew":false,"isRobot":true,"isUnpatrolled":false,"metroCode":null,"namespace":"Main","page":"Codia triverticillata","regionIsoCode":null,"regionName":null,"user":"ThitxongkhoiAWB","delta":36,"added":36,"deleted":0}
  4. Ingestion from Kafka can be started and stopped with DDL statements

    SQL
    -- Hive submits the real-time ingestion task to Druid only while druid.kafka.ingestion is START
    ALTER TABLE druid_kafka_demo SET TBLPROPERTIES('druid.kafka.ingestion' = 'START');
    ALTER TABLE druid_kafka_demo SET TBLPROPERTIES('druid.kafka.ingestion' = 'STOP');
    ALTER TABLE druid_kafka_demo SET TBLPROPERTIES('druid.kafka.ingestion' = 'RESET');
  5. Once the first segment has been published and the datasource shows up in the Coordinator, you can query the data with Hive as in Example 1 (a quick supervisor check is sketched below)
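
To verify that the real-time ingestion task was actually submitted to Druid, you can also list the supervisors on the Overlord, just as in the Kafka Indexing Service section; after the START statement a supervisor for this table should appear:

    Plain Text
    curl http://[druid_overlord_ip]:8591/druid/indexer/v1/supervisor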
