基于EMQX+Flask+InfluxDB+Grafana打造多协议物联网云平台:MQTT/HTTP设备接入与数据可视化流程(附代码示例)

摘要: 本文深入浅出地介绍了物联网、云平台、MQTT、HTTP、数据可视化等核心概念,并结合 EMQX、Flask、InfluxDB、Grafana 等主流工具,手把手教你搭建一个支持多协议的物联网云平台。文章结构清晰,图文并茂,代码翔实易懂,旨在帮助读者快速掌握物联网云平台搭建的核心技术。

关键词: 物联网,云平台,MQTT,HTTP,数据可视化,EMQX,Flask,InfluxDB,Grafana


一、物联网基础知识

1.1 物联网概述

物联网(IoT,Internet of Things)是指通过各种信息传感器、射频识别技术、全球定位系统等,实时采集任何需要监控、连接、互动的物体或过程,实现物与物、物与人的泛在连接,进而实现对物品和过程的智能化感知、识别和管理。

1.2 物联网架构

物联网系统架构通常分为三层:

  • 感知层: 负责采集数据,包括各种传感器、RFID 标签、GPS 模块等。
  • 网络层: 负责数据传输,包括各种网络协议、通信技术等,例如 WiFi、蓝牙、Zigbee、NB-IoT 等。
  • 应用层: 负责数据处理和应用呈现,例如数据分析、远程控制、智能决策等。

二、云平台与数据可视化

2.1 云平台

云平台是指基于互联网的相关服务的增加、使用和交付模式,通常涉及通过互联网来提供动态易扩展且经常是虚拟化的资源。 云平台可以为物联网应用提供强大的计算、存储和网络资源,降低物联网应用开发和部署的成本。

2.2 数据可视化

数据可视化是指将数据以图形、图表等可视化形式展示出来,帮助用户更直观地理解数据,洞察数据背后的规律和趋势。

三、常用协议与工具介绍

3.1 MQTT 协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一种轻量级的消息发布/订阅协议,专为低带宽、低功耗设备和网络而设计。MQTT 广泛应用于物联网领域,特别适用于资源受限的设备和不可靠的网络环境。

3.2 HTTP 协议

HTTP(Hypertext Transfer Protocol,超文本传输协议)是一种应用层协议,用于在 Web 浏览器和 Web 服务器之间传输信息。 HTTP 协议简单易用,被广泛应用于各种网络应用,包括物联网领域。

3.3 EMQX

EMQX 是一款开源、高性能、可扩展的 MQTT 消息服务器,支持百万级并发连接和消息吞吐。

3.4 Flask

Flask 是一个轻量级的 Web 应用框架,使用 Python 编写,易于学习和使用,适合快速搭建 Web 应用和 API 接口。

3.5 InfluxDB

InfluxDB 是一款开源的时序数据库,专为存储和查询时间序列数据而设计,适用于存储物联网传感器数据、监控数据等。

3.6 Grafana

Grafana 是一款开源的数据可视化工具,可以连接多种数据源,创建美观、功能强大的仪表盘,实时展示数据。

四、多协议物联网云平台搭建

本项目将搭建一个支持 MQTT 和 HTTP 协议的物联网云平台,实现以下功能:

  • 多协议支持: 同时支持 MQTT 和 HTTP 协议的设备接入。
  • 数据采集与存储: 实时采集来自不同协议设备的数据,并将其存储到 InfluxDB 数据库。
  • 数据可视化: 使用 Grafana 对采集到的数据进行可视化展示。
4.1 系统架构

 

4.2 代码实现

1. HTTP 服务器搭建 (Flask)

# 导入 Flask 库
from flask import Flask, request, jsonify

# 创建 Flask 应用
app = Flask(__name__)

# 定义 HTTP 接口,接收 POST 请求
@app.route('/data', methods=['POST'])
def receive_data():
    # 获取请求数据
    data = request.get_json()

    # 数据处理逻辑,例如数据校验、格式转换等
    # ...

    # 将数据写入 InfluxDB (示例)
    from influxdb import InfluxDBClient
    client = InfluxDBClient('localhost', 8086, 'user', 'password', 'iot_data')
    json_body = [
        {
            "measurement": "sensor_data",
            "tags": {
                "sensor_id": data.get("sensor_id")
            },
            "fields": {
                "temperature": data.get("temperature"),
                "humidity": data.get("humidity")
            }
        }
    ]
    client.write_points(json_body)

    # 返回响应
    return jsonify({'message': 'Data received successfully!'}), 200

# 启动 Flask 应用
if __name__ == '__main__':
    app.run(debug=True)

代码说明:

  • 使用 Flask 框架创建 HTTP 服务器,并定义 /data 接口接收 POST 请求。
  • 使用 request.get_json() 获取 HTTP 请求中的 JSON 数据。
  • 进行数据处理,例如数据校验、格式转换等。
  • 使用 InfluxDBClient 连接 InfluxDB 数据库,并将数据写入数据库。
  • 返回 JSON 格式的响应,告知客户端数据接收成功。

2. MQTT 消息处理 (Python)

# 导入必要的库
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
import json

# MQTT Broker 配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensor/data"

# InfluxDB 配置
INFLUXDB_HOST = "localhost"
INFLUXDB_PORT = 8086
INFLUXDB_USER = "user"
INFLUXDB_PASSWORD = "password"
INFLUXDB_DATABASE = "iot_data"

# 创建 InfluxDB 客户端
influxdb_client = InfluxDBClient(
    host=INFLUXDB_HOST,
    port=INFLUXDB_PORT,
    username=INFLUXDB_USER,
    password=INFLUXDB_PASSWORD,
    database=INFLUXDB_DATABASE
)

# 连接到 MQTT Broker
def on_connect(client, userdata, flags, rc):
    print("Connected to MQTT Broker with result code " + str(rc))
    client.subscribe(MQTT_TOPIC)

# 接收 MQTT 消息
def on_message(client, userdata, msg):
    # 解析数据
    data = json.loads(msg.payload.decode())
    
    # 构建 InfluxDB 数据点
    influxdb_data = [
        {
            "measurement": "sensor_data",
            "tags": {
                "sensor_id": data.get("sensor_id"),
            },
            "fields": {
                "temperature": data.get("temperature"),
                "humidity": data.get("humidity"),
            }
        }
    ]

    # 写入 InfluxDB
    influxdb_client.write_points(influxdb_data)
    print("Data written to InfluxDB: " + str(influxdb_data))

# 创建 MQTT 客户端
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)

# 启动 MQTT 客户端
mqtt_client.loop_start()

# 保持程序运行
while True:
    pass

代码说明:

  • 使用 paho.mqtt.client 连接到 MQTT Broker,并订阅指定主题。
  • 当收到 MQTT 消息时,使用 json.loads() 解析消息内容。
  • 将解析后的数据构建成 InfluxDB 数据点格式。
  • 使用 influxdb_client.write_points() 将数据写入 InfluxDB 数据库。

3. 数据可视化 (Grafana)

  • 安装 Grafana 并配置数据源连接到 InfluxDB 数据库。
  • 创建仪表盘并在仪表盘上添加图表,例如折线图、柱状图等。
  • 配置图表的数据源为 InfluxDB,并编写查询语句从 InfluxDB 中获取数据。
  • 根据需要配置图表的样式、标题、坐标轴等属性,使数据展示更加直观易懂。

示例 Grafana 查询语句:

SELECT "temperature", "humidity" FROM "sensor_data" WHERE time > now() - 1h

该查询语句将从 sensor_data measurement 中查询最近 1 小时的温度和湿度数据。

本项目通过结合 Flask、EMQX、InfluxDB 和 Grafana 等工具,搭建了一个支持 MQTT 和 HTTP 协议的物联网云平台,并实现了数据的采集、存储和可视化展示。该平台可以灵活扩展,支持更多类型的设备和协议接入,并根据实际需求进行功能定制。

注意:

  • 以上代码仅供参考,实际应用中需要根据具体需求进行修改。
  • 请确保已安装所有依赖库,例如 paho-mqttinfluxdbflask 等。
  • 在实际部署中,需要考虑数据的安全性、系统的稳定性等因素。

想要具体的代码以及思路可以私信我!!!

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-11 15:14:03       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-11 15:14:03       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-11 15:14:03       58 阅读
  4. Python语言-面向对象

    2024-07-11 15:14:03       69 阅读

热门阅读

  1. springboot 抽出多个接口中都有相同的代码的方法

    2024-07-11 15:14:03       22 阅读
  2. OpenJudge | 最高的分数

    2024-07-11 15:14:03       20 阅读
  3. springmvc 如何对接接口

    2024-07-11 15:14:03       22 阅读
  4. VUE2用elementUI实现父组件中校验子组件中的表单

    2024-07-11 15:14:03       22 阅读
  5. 解释一下DecorView和Window之间的交互。

    2024-07-11 15:14:03       24 阅读
  6. 【AI原理解析】-目标检测概述

    2024-07-11 15:14:03       19 阅读
  7. 24/07/10数据结构(4.1209)单链表OJ

    2024-07-11 15:14:03       22 阅读