目录
场景:设备监控
解决方案架构
先决条件:
设置基础设施组件
设置本地服务
MQTT代理
Grafana
Kafka 连接
创建MQTT源连接器实例
部署设备数据处理器应用程序
启动模拟设备数据生成器
享受Grafana仪表板!
那么,您想运行一些RedisTimeSeries命令吗?
删除资源
其他注意事项
优化RedisTimeSeries
长期数据保留怎么样?
可扩展性
结论
RedisTimeSeries是一个Redis模块,它为Redis带来了原生的时间序列数据结构。时间序列解决方案早先建立在排序集(或Redis流)之上,可以从RedisTimeSeries功能中受益,例如高容量插入、低延迟读取、灵活的查询语言、下采样等等!
一般来说,时间序列数据(相对)简单。话虽如此,我们还需要考虑其他特征:
- 数据速度:例如,考虑每秒来自数千台设备的数百个指标
- 容量(大数据):考虑数月(甚至数年)的数据积累
因此,RedisTimeSeries等数据库只是整体解决方案的一部分。您还需要考虑如何收集(摄取)、处理所有数据并将其发送到RedisTimeSeries。你真正需要的是一个可扩展的数据管道,它可以作为一个缓冲区来解耦生产者和消费者。
这就是Apache Kafka的用武之地!除了核心代理之外,它还拥有丰富的组件生态系统,包括Kafka Connect(这是本博文中介绍的解决方案架构的一部分)、多语言客户端库、Kafka Streams、Mirror Maker等。
这篇博文提供了一个实际示例,说明如何将RedisTimeSeries与Apache Kafka结合使用来分析时间序列数据。
该代码可在此GitHub存储库中找到https://github.com/abhirockzz/redis-timeseries-kafka
让我们首先探索用例。请注意,为了博客文章的目的,它保持简单,然后在后续部分中进一步解释。
场景:设备监控假设有很多位置,每个位置都有多个设备,您的任务是监控设备指标——现在我们将考虑温度和压力。这些指标将存储在RedisTimeSeries中(当然!)并使用以下键命名约定— ::。例如,位置5中设备1的温度将表示为 temp:5:1。每个时间序列数据点还将具有以下标签(键值对)— metric, location, device。这是为了允许灵活的查询,正如您将在接下来的部分中看到的那样。
以下是几个示例,可让您了解如何使用该TS.ADD命令添加数据点:
#位置3中设备2的温度以及标签:
TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2
#位置3中设备2的压力:
TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2
以下是该解决方案的高层次外观:
让我们分解一下:
源(本地)组件
- MQTT代理(mosquitto):MQTT是物联网用例的事实上的协议。我们将使用的场景是物联网和时间序列的组合——稍后会详细介绍。
- Kafka Connect:MQTT源连接器用于将数据从MQTT代理传输到Kafka集群。
Azure服务
- 适用于Redis企业层的Azure缓存:企业层基于Redis Enterprise,这是Redis Labs的Redis商业变体。除了RedisTimeSeries,企业层还支持RediSearch和RedisBloom。客户无需担心企业层的许可证获取。Azure Redis缓存将促进这一过程,其中,客户可以通过Azure市场优惠获得并支付此软件的许可。
- Confluent Cloud on Azure:一个完全托管的产品,提供Apache Kafka即服务,这要归功于从Azure到Confluent Cloud的集成供应层。它减轻了跨平台管理的负担,并为在Azure基础架构上使用Confluent Cloud提供了统一的体验,从而使您可以轻松地将Confluent Cloud与Azure应用程序集成。
- Azure Spring Cloud:借助Azure Spring Cloud,可以更轻松地将Spring Boot微服务部署到Azure。Azure Spring Cloud减轻了基础结构问题,提供了配置管理、服务发现、CI/CD集成、蓝绿部署等。该服务完成了所有繁重的工作,因此开发人员可以专注于他们的代码。
请注意,有些服务是在本地托管的,只是为了简单起见。在生产级部署中,您也希望在Azure中运行它们。例如,您可以在Azure Kubernetes服务中操作Kafka Connect集群和MQTT连接器。
总而言之,这是端到端流程:
- 脚本生成发送到本地MQTT代理的模拟设备数据。
- 此数据由MQTT Kafka Connect源连接器获取,并发送到Azure中运行的Confluent Cloud Kafka集群中的主题。
- 它由托管在Azure Spring Cloud中的Spring Boot应用程序进一步处理,然后将其保存到Azure Cache for Redis实例。
是时候从实用的东西开始了!在此之前,请确保您具备以下条件。
先决条件:- Azure帐户——您可以在此处免费获取
- 安装Azure CLI
- JDK 11用于例如OpenJDK
- Maven和Git的最新版本
按照文档预配RedisTimeSeries模块附带的Azure Redis缓存(企业层)。
在Azure Marketplace上配置Confluent Cloud集群。还要创建一个Kafka主题(使用名称mqtt.device-stats)并创建凭证(API密钥和秘密),稍后您将使用这些凭证安全地连接到您的集群。
您可以使用Azure门户或使用Azure CLI预配Azure Spring Cloud实例:
az spring-cloud create -n -g -l
在继续之前,请确保克隆GitHub存储库:
git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka
组件包括:
- Mosquitto MQTT代理
- 使用MQTT源连接器的Kafka Connect
- Grafana用于跟踪仪表板中的时间序列数据
我在Mac上本地安装并启动了mosquitto代理。
brew install mosquitto
brew services start mosquitto
您可以按照与您的操作系统相对应的步骤进行操作,也可以随意使用此Docker镜像。
Grafana我在Mac上本地安装并启动了Grafana。
brew install grafana
brew services start grafana
你可以对你的操作系统做同样的事情,也可以随意使用这个Docker镜像。
docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana
您应该能够在刚刚克隆的存储库中找到connect-distributed.properties文件。替换bootstrap.servers、sasl.jaas.config等属性的值。
首先,在本地下载并解压Apache Kafka。
启动本地Kafka Connect集群:
export KAFKA_INSTALL_DIR=
$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties
要手动安装MQTT源连接器:
- 从此链接下载连接器/插件ZIP文件,然后,
- 将其解压缩到Connect worker的plugin.path配置属性中列出的目录之一
如果您在本地使用Confluent Platform,只需使用Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest
创建MQTT源连接器实例确保检查mqtt-source-config.json文件。确保输入正确的主题名称kafka.topic并且mqtt.topics保持不变。
curl -X POST -H 'Content-Type: application/json'
http://localhost:8083/connectors -d @mqtt-source-config.json
# wait for a minute before checking the connector status
curl http://localhost:8083/connectors/mqtt-source/status
在您刚刚克隆的GitHub存储库中,在consumer/src/resources文件夹中查找application.yaml文件并替换以下值:
- Azure Redis缓存主机、端口和主访问密钥
- Confluent Cloud on Azure API密钥和秘密
构建应用程序JAR文件:
cd consumer
export JAVA_HOME=
mvn clean package
创建一个Azure Spring Cloud应用程序并将JAR文件部署到它:
az spring-cloud app create -n device-data-processor -s -g --runtime-version Java_11
az spring-cloud app deploy -n device-data-processor -s -g --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar
您可以使用刚刚克隆的GitHub存储库中的脚本:
./gen-timeseries-data.sh
注意——它所做的只是使用mosquitto_pub CLI命令发送数据。
数据被发送到device-stats MQTT topic(这不是Kafka主题)。您可以使用CLI订阅者仔细检查:
mosquitto_sub -h localhost -t device-stats
检查Confluent Cloud门户中的Kafka主题。您还应该检查Azure Spring Cloud中设备数据处理器应用的日志:
az spring-cloud app logs -f -n device-data-processor -s -g
浏览到Grafana UI localhost:3000。
Grafana的Redis数据源插件适用于任何Redis数据库,包括Azure Redis缓存。按照此博客文章中的说明配置数据源。
在您克隆的GitHub存储库中的grafana_dashboards文件夹中导入仪表板(如果您需要有关如何导入仪表板的帮助,请参阅Grafana文档)。
例如,这里有一个仪表板,显示了location 1的device 5 (使用TS.MRANGE)的平均压力(超过30秒)。
这是另一个仪表板,显示了多个设备的最高温度(超过15秒)location 3(再次感谢TS.MRANGE)。
启动redis-cli并连接到Azure Cache for Redis实例:
redis-cli -h -p 10000 -a --tls
从简单的查询开始:
# pressure in device 5 for location 1
TS.GET pressure:1:5
# temperature in device 5 for location 4
TS.GET temp:4:5
按位置过滤并获取所有设备的温度和压力:
TS.MGET WITHLABELS FILTER location=3
提取特定时间范围内一个或多个位置的所有设备的温度和压力:
TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)
– +指的是从开始到最新时间戳的所有内容,但您可以更具体。
MRANGE正是我们所需要的!我们还可以按某个位置的特定设备进行过滤,然后按温度或压力进一步深入分析:
TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp
所有这些都可以与聚合相结合。
# all the temp data points are not useful. how about an average (or max) instead of every temp data points?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp
也可以创建一个规则来进行此聚合并将其存储在不同的时间序列中。
完成后,不要忘记删除资源以避免不必要的成本。
删除资源- 按照文档中的步骤删除Confluent Cloud集群——您只需要删除Confluent组织即可。
- 同样,您也应该删除Azure Cache for Redis实例。
在您的本地机器上:
- 停止Kafka Connect集群
- 停止蚊子经纪人(例如酿造服务停止mosquito)
- 停止Grafana服务(例如brew services stop grafana)
我们探索了使用Redis和Kafka摄取、处理和查询时间序列数据的数据管道。当您考虑下一步并转向生产级解决方案时,您应该考虑更多的事情。
其他注意事项- 保留策略:考虑这一点,因为默认情况下您的时间序列数据点不会被修剪或删除。
- 下采样和聚合规则:您不想永远存储数据,对吗?确保配置适当的规则来解决这个问题(例如TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000 )。
- 重复数据政策:您希望如何处理重复样本?确保默认策略(BLOCK)确实是您所需要的。如果没有,请考虑其他选项。
这不是一个详尽的清单。其他配置选项请参考RedisTimeSeries文档
长期数据保留怎么样?数据是宝贵的,包括时间序列!您可能想要进一步处理它(例如运行机器学习以提取洞察力、预测性维护等)。为此,您需要将这些数据保留更长的时间,并且为了经济高效且经济高效,您需要使用可扩展的对象存储服务,例如Azure Data Lake Storage Gen2 (ADLS Gen2) .
有一个连接器!您可以通过使用完全托管的Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud来处理和存储ADLS中的数据,然后使用Azure Synapse Analytics或Azure Databricks运行机器学习来增强现有数据管道。
可扩展性您的时间序列数据量只能向上移动!解决方案的可扩展性至关重要:
- 核心基础设施:托管服务允许团队专注于解决方案,而不是设置和维护基础设施,尤其是在涉及复杂的分布式系统(如数据库和流媒体平台,如Redis和Kafka)时。
- Kafka Connect:就数据管道而言,由于Kafka Connect平台本质上是无状态且水平可扩展的,因此您掌握得很好。在如何构建和调整Kafka Connect工作集群的大小方面,您有很多选择。
- 自定义应用程序:与本解决方案一样,我们构建了一个自定义应用程序来处理Kafka主题中的数据。幸运的是,同样的可伸缩性特征也适用于它们。在水平扩展方面,它仅受您拥有的Kafka主题分区数量的限制。
集成:不仅仅是Grafana!RedisTimeSeries还与Prometheus和Telegraf集成。但是,在撰写本文时还没有Kafka连接器——这将是一个很棒的附加组件!
结论当然,您可以将Redis用于(几乎)所有事情,包括时间序列工作负载!请务必考虑从时间序列数据源到Redis及其他范围的数据管道和集成的端到端架构。
https://www.codeproject.com/Articles/5309620/Processing-Time-Series-Data-with-Redis-and-Apache