IIoT: не просто модное слово
Промышленный IoT (IIoT — Industrial Internet of Things) — это не умный чайник и не фитнес-браслет. Это системы, которые собирают данные с реального производственного оборудования, анализируют их и помогают принимать решения.
Реальный кейс: завод по производству подшипников. Раньше плановое ТО каждые 3 месяца — меняли подшипники в редукторах "по графику". После внедрения IIoT (вибродатчики на каждом редукторе + MQTT + аналитика): 30% редукторов работали нормально и менялись зря, 5% уже имели износ и могли выйти из строя раньше графика. Экономия на расходниках — 28%, аварийных остановок из-за поломки — минус 4 в год.
Это и есть предиктивное обслуживание. И начинается оно с правильного сбора данных.
Архитектура IIoT-системы
Уровень 0 — Полевые устройства:
Датчики (температура, вибрация, давление, ток)
Исполнительные механизмы
Уровень 1 — Агрегаторы / Граничные узлы (Edge):
ESP32, Raspberry Pi, промышленные шлюзы
Протоколы: Modbus, 1-Wire, I2C, SPI, 4-20мА
Уровень 2 — Брокер сообщений:
MQTT Broker (Mosquitto, EMQX, HiveMQ)
Нормализация данных, маршрутизация
Уровень 3 — Обработка и хранение:
Node-RED / Python — логика, алармы
InfluxDB / TimescaleDB — временные ряды
PostgreSQL — конфигурация, справочники
Уровень 4 — Визуализация и аналитика:
Grafana — дашборды, алерты
Jupyter Notebook — анализ данных
ML модели — предиктивная аналитика
MQTT: почему именно он
MQTT (Message Queuing Telemetry Transport) — лёгкий протокол публикации/подписки, разработанный IBM в 1999 году для телеметрии нефтепроводов через спутник. Идеален для IoT:
Лёгкий: заголовок всего 2 байта, работает при 2G-соединении
Асинхронный: устройства не опрашиваются, а сами публикуют данные
QoS (Quality of Service): три уровня надёжности доставки
Retain: брокер хранит последнее значение, новые подписчики сразу его получают
Last Will: автоматическое сообщение при потере связи с устройством
Уровни QoS:
QoS 0 (At most once): Отправил и забыл. Быстро, но сообщение может потеряться. Для частых нечувствительных данных (телеметрия каждую секунду).
QoS 1 (At least once): Гарантированная доставка, но возможны дубликаты. Для алармов и важных событий.
QoS 2 (Exactly once): Ровно один раз. Медленнее, для критичных команд управления.
Структура топиков (best practices):
factory/ ← Завод
├── line1/ ← Производственная линия 1
│ ├── conveyor1/ ← Конвейер 1
│ │ ├── telemetry ← Данные датчиков (JSON, часто)
│ │ ├── status ← Состояние (работает/стоит)
│ │ ├── alarms ← Аварии
│ │ └── commands ← Команды управления
│ └── robot1/
│ └── telemetry
└── utilities/
├── compressor1/
│ └── telemetry
└── hvac/
└── telemetry
Примеры топиков:
factory/line1/conveyor1/telemetry
factory/line1/robot1/status
factory/+/+/alarms ← Подписка на все аварии всей линии 1
factory/# ← Подписка на ВСЁ (осторожно!)
Установка и настройка Mosquitto
# Ubuntu/Debian
sudo apt update
sudo apt install mosquitto mosquitto-clients
# Конфигурация /etc/mosquitto/mosquitto.conf:
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd
# TLS (обязательно для производства!):
listener 8883
cafile /etc/ssl/certs/ca-certificates.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate false
# WebSocket для Node-RED и браузерных клиентов:
listener 9001
protocol websockets
# Логирование:
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
# Создание пользователя:
sudo mosquitto_passwd -c /etc/mosquitto/passwd username
sudo systemctl enable mosquitto
sudo systemctl start mosquitto
# Тест:
mosquitto_sub -h localhost -u user -P pass -t "factory/#" -v &
mosquitto_pub -h localhost -u user -P pass -t "factory/test" -m "hello"
ESP32: узел сбора данных
ESP32 — идеальный Edge-узел: WiFi/BT, 240 МГц, 520 КБ RAM, куча периферии, цена $3–5.
#include <WiFi.h>
#include <PubSubClient.h>
#include <ArduinoJson.h>
#include <Wire.h>
#include "Adafruit_BME280.h"
// ===== КОНФИГУРАЦИЯ =====
const char* WIFI_SSID = "Factory_WiFi";
const char* WIFI_PASSWORD = "secretpass";
const char* MQTT_SERVER = "192.168.1.100";
const int MQTT_PORT = 1883;
const char* MQTT_USER = "esp32_node1";
const char* MQTT_PASS = "nodepass";
const char* DEVICE_ID = "conveyor1";
// Топики
const char* TOPIC_TELEMETRY = "factory/line1/conveyor1/telemetry";
const char* TOPIC_STATUS = "factory/line1/conveyor1/status";
const char* TOPIC_ALARMS = "factory/line1/conveyor1/alarms";
const char* TOPIC_COMMANDS = "factory/line1/conveyor1/commands";
const char* TOPIC_WILL = "factory/line1/conveyor1/status";
WiFiClient espClient;
PubSubClient mqtt(espClient);
Adafruit_BME280 bme;
// Состояние
bool motorRunning = false;
float setpoint = 50.0f;
uint32_t lastPublish = 0;
uint32_t uptime_sec = 0;
// ===== ПОДКЛЮЧЕНИЕ =====
void connectWiFi() {
WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
Serial.print("WiFi...");
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println(" OK");
Serial.println(WiFi.localIP());
}
void connectMQTT() {
while (!mqtt.connected()) {
Serial.print("MQTT...");
// Last Will & Testament — сообщение при потере связи
const char* willMsg = "{\"online\":false}";
if (mqtt.connect(DEVICE_ID, MQTT_USER, MQTT_PASS,
TOPIC_WILL, 1, true, willMsg)) {
Serial.println(" OK");
// Сообщение о подключении
mqtt.publish(TOPIC_STATUS, "{\"online\":true}", true);
// Подписываемся на команды
mqtt.subscribe(TOPIC_COMMANDS, 1); // QoS 1
} else {
Serial.printf(" Ошибка: %d\n", mqtt.state());
delay(5000);
}
}
}
// ===== ОБРАБОТКА КОМАНД =====
void mqttCallback(char* topic, byte* payload, unsigned int length) {
// Парсим JSON команду
StaticJsonDocument<256> doc;
DeserializationError error = deserializeJson(doc, payload, length);
if (error) {
Serial.println("JSON error");
return;
}
String topicStr = String(topic);
if (topicStr == TOPIC_COMMANDS) {
// Команда пуск/стоп
if (doc.containsKey("run")) {
motorRunning = doc["run"].as<bool>();
Serial.printf("Команда: %s\n", motorRunning ? "ПУСК" : "СТОП");
}
// Изменение уставки
if (doc.containsKey("setpoint")) {
setpoint = doc["setpoint"].as<float>();
Serial.printf("Уставка: %.1f\n", setpoint);
}
// Сброс аварии
if (doc["reset_alarm"].as<bool>()) {
Serial.println("Сброс аварии");
}
}
}
// ===== ПУБЛИКАЦИЯ ДАННЫХ =====
void publishTelemetry() {
// Читаем датчики
float temperature = bme.readTemperature();
float humidity = bme.readHumidity();
float pressure = bme.readPressure() / 100.0f;
int current_raw = analogRead(34); // 4-20мА через ACS712
float current_a = current_raw * 25.0f / 4095.0f; // 0-25А
// Формируем JSON
StaticJsonDocument<512> doc;
doc["device_id"] = DEVICE_ID;
doc["timestamp"] = millis() / 1000;
doc["uptime"] = uptime_sec;
doc["running"] = motorRunning;
doc["setpoint"] = setpoint;
JsonObject sensors = doc.createNestedObject("sensors");
sensors["temperature"] = round(temperature * 10) / 10.0;
sensors["humidity"] = round(humidity * 10) / 10.0;
sensors["pressure"] = round(pressure * 10) / 10.0;
sensors["current"] = round(current_a * 100) / 100.0;
// Диагностика устройства
JsonObject diag = doc.createNestedObject("diagnostics");
diag["wifi_rssi"] = WiFi.RSSI();
diag["free_heap"] = ESP.getFreeHeap();
diag["cpu_freq"] = ESP.getCpuFreqMHz();
// Сериализация и публикация
char payload[512];
serializeJson(doc, payload);
mqtt.publish(TOPIC_TELEMETRY, payload, false); // QoS 0, не retain
// Проверка алармов
if (temperature > 80.0f) {
StaticJsonDocument<128> alarm;
alarm["type"] = "high_temperature";
alarm["value"] = temperature;
alarm["limit"] = 80.0f;
alarm["message"] = "Превышена температура двигателя!";
char alarmPayload[128];
serializeJson(alarm, alarmPayload);
mqtt.publish(TOPIC_ALARMS, alarmPayload, true); // retain = true
}
}
// ===== SETUP / LOOP =====
void setup() {
Serial.begin(115200);
Wire.begin(21, 22);
if (!bme.begin(0x76)) {
Serial.println("BME280 не найден!");
}
connectWiFi();
mqtt.setServer(MQTT_SERVER, MQTT_PORT);
mqtt.setCallback(mqttCallback);
mqtt.setBufferSize(1024); // Увеличиваем буфер для больших сообщений
connectMQTT();
}
void loop() {
// Переподключение при потере связи
if (!mqtt.connected()) {
if (WiFi.status() != WL_CONNECTED) {
connectWiFi();
}
connectMQTT();
}
mqtt.loop();
// Публикация каждые 5 секунд
if (millis() - lastPublish >= 5000) {
lastPublish = millis();
uptime_sec += 5;
publishTelemetry();
}
}
Python: обработка и алармы
import paho.mqtt.client as mqtt
import json
import time
from datetime import datetime
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# ===== КОНФИГУРАЦИЯ =====
MQTT_BROKER = "192.168.1.100"
MQTT_PORT = 1883
MQTT_USER = "backend"
MQTT_PASS = "backendpass"
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "your-influx-token"
INFLUX_ORG = "factory"
INFLUX_BUCKET = "telemetry"
# ===== ИНИЦИАЛИЗАЦИЯ =====
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
# Состояние алармов (дедупликация)
active_alarms = {}
def on_message(client, userdata, msg):
topic = msg.topic
try:
data = json.loads(msg.payload.decode())
except json.JSONDecodeError:
print(f"Ошибка JSON в топике {topic}")
return
# Маршрутизация по топику
if "/telemetry" in topic:
handle_telemetry(topic, data)
elif "/alarms" in topic:
handle_alarm(topic, data)
elif "/status" in topic:
handle_status(topic, data)
def handle_telemetry(topic: str, data: dict):
"""Запись телеметрии в InfluxDB"""
# Извлекаем путь устройства из топика
# factory/line1/conveyor1/telemetry → ['factory', 'line1', 'conveyor1', 'telemetry']
parts = topic.split('/')
if len(parts) < 4:
return
location = parts[1] # line1
device = parts[2] # conveyor1
sensors = data.get('sensors', {})
# Формируем точку данных для InfluxDB
point = (
Point("telemetry")
.tag("location", location)
.tag("device", device)
.tag("device_id", data.get('device_id', device))
.field("temperature", float(sensors.get('temperature', 0)))
.field("humidity", float(sensors.get('humidity', 0)))
.field("pressure", float(sensors.get('pressure', 0)))
.field("current", float(sensors.get('current', 0)))
.field("running", int(data.get('running', False)))
.field("wifi_rssi", int(data.get('diagnostics', {}).get('wifi_rssi', 0)))
)
try:
write_api.write(bucket=INFLUX_BUCKET, record=point)
except Exception as e:
print(f"InfluxDB ошибка: {e}")
# Проверка пороговых значений
temp = sensors.get('temperature', 0)
if temp > 85.0:
send_alert(device, "critical", f"Критическая температура: {temp}°C")
elif temp > 75.0:
send_alert(device, "warning", f"Высокая температура: {temp}°C")
def handle_alarm(topic: str, data: dict):
"""Обработка аларм-сообщений от устройства"""
parts = topic.split('/')
device = parts[2] if len(parts) >= 3 else "unknown"
alarm_type = data.get('type', 'unknown')
alarm_key = f"{device}_{alarm_type}"
# Дедупликация: не спамим одинаковые алармы
now = time.time()
if alarm_key in active_alarms:
if now - active_alarms[alarm_key] < 300: # 5 минут
return
active_alarms[alarm_key] = now
print(f"🚨
АВАРИЯ [{device}]: {data.get('message', alarm_type)}") # Здесь можно добавить отправку в Telegram, email, SMS send_notification( f"⚠️ Авария на {device}\n" f"Тип: {alarm_type}\n" f"Значение: {data.get('value', 'N/A')}\n" f"Время: {datetime.now().strftime('%H:%M:%S')}" ) def handle_status(topic: str, data: dict): """Отслеживание онлайн/офлайн устройств""" parts = topic.split('/') device = parts[2] if len(parts) >= 3 else "unknown" online = data.get('online', False) print(f"{'🟢' if online else '🔴'} {device}: {'онлайн' if online else 'офлайн'}") if not online: send_alert(device, "critical", f"Устройство {device} потеряло связь!") def send_alert(device: str, level: str, message: str): """Отправка алерта (пример — в лог)""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {level.upper()} [{device}]: {message}") def send_notification(text: str): """Здесь интеграция с Telegram Bot API""" # import requests # requests.post(f"https://api.telegram.org/bot{TOKEN}/sendMessage", # json={"chat_id": CHAT_ID, "text": text}) print(f"NOTIFICATION: {text}") # ===== ЗАПУСК ===== client = mqtt.Client(client_id="backend_processor") client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT, 60) client.subscribe("factory/#", qos=1) # Подписка на всё print("Backend запущен, ожидаем данные...") client.loop_forever()
InfluxDB + Grafana: красивые дашборды
Установка через Docker Compose:
# docker-compose.yml
version: '3.8'
services:
mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./mosquitto/config:/mosquitto/config
- mosquitto_data:/mosquitto/data
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: secretpassword
DOCKER_INFLUXDB_INIT_ORG: factory
DOCKER_INFLUXDB_INIT_BUCKET: telemetry
DOCKER_INFLUXDB_INIT_RETENTION: 30d # Хранение 30 дней
volumes:
- influxdb_data:/var/lib/influxdb2
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
GF_SECURITY_ADMIN_PASSWORD: grafanapass
GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: "yesoreyeram-infinity-datasource"
volumes:
- grafana_data:/var/lib/grafana
depends_on:
- influxdb
node-red:
image: nodered/node-red:latest
ports:
- "1880:1880"
volumes:
- nodered_data:/data
volumes:
mosquitto_data:
influxdb_data:
grafana_data:
nodered_data:
Flux-запрос в Grafana (температура за последний час):
from(bucket: "telemetry")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "telemetry")
|> filter(fn: (r) => r._field == "temperature")
|> filter(fn: (r) => r.device == "conveyor1")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
|> yield(name: "mean_temperature")
Предиктивная аналитика: пример с вибрацией
import numpy as np
from scipy.fft import fft, fftfreq
from scipy.stats import zscore
def analyze_vibration(samples: list, sample_rate: int = 1000) -> dict:
"""
Анализ вибрации для обнаружения износа подшипников.
samples: список отсчётов акселерометра
sample_rate: частота дискретизации, Гц
"""
data = np.array(samples, dtype=float)
n = len(data)
# Временные характеристики
rms = np.sqrt(np.mean(data**2)) # Эффективное значение (RMS)
peak = np.max(np.abs(data)) # Пиковое значение
crest = peak / rms if rms > 0 else 0 # Пик-фактор (norma: 1.4–2.5, износ: >4)
kurtosis = float(np.mean((data - np.mean(data))**4) / (np.std(data)**4 + 1e-10))
# Спектральный анализ (FFT)
spectrum = np.abs(fft(data))[:n//2]
freqs = fftfreq(n, 1.0 / sample_rate)[:n//2]
# Поиск доминирующих частот
top_indices = np.argsort(spectrum)[-5:][::-1]
dominant_freqs = [(float(freqs[i]), float(spectrum[i])) for i in top_indices]
# Оценка состояния
# Crest Factor: <2.5 — норма, 2.5–4 — внимание, >4 — износ
# Kurtosis: <3 — норма (гауссов шум), >6 — дефект (удары)
if crest > 4.0 or kurtosis > 6.0:
status = "FAULT"
recommendation = "Замените подшипник в течение 48 часов"
elif crest > 2.5 or kurtosis > 4.5:
status = "WARNING"
recommendation = "Запланируйте замену при следующем ТО"
else:
status = "OK"
recommendation = "Оборудование в норме"
return {
'rms': round(rms, 4),
'peak': round(peak, 4),
'crest_factor': round(crest, 2),
'kurtosis': round(kurtosis, 2),
'dominant_freqs': dominant_freqs[:3],
'status': status,
'recommendation': recommendation,
}
# Пример использования с MQTT:
def on_vibration_data(client, userdata, msg):
data = json.loads(msg.payload)
samples = data['samples']
device = data['device_id']
analysis = analyze_vibration(samples, sample_rate=data.get('sample_rate', 1000))
# Публикуем результат анализа
result_topic = f"factory/analytics/{device}/bearing_health"
client.publish(result_topic, json.dumps(analysis), retain=True)
if analysis['status'] != 'OK':
print(f"⚠️
{device}: {analysis['recommendation']}") # Отправить уведомление...
Безопасность IIoT
Это не опционально. Промышленные системы с интернет-подключением — лакомая цель для атак.
Минимальный стандарт:
TLS везде — Mosquitto с сертификатами, никакого plaintext
Аутентификация — уникальный логин/пароль для каждого устройства, или X.509 сертификаты
Авторизация по ACL — устройство
conveyor1пишет только вfactory/line1/conveyor1/#, не может читать чужие командыСегментация сети — IoT-устройства в отдельном VLAN, без прямого доступа в интернет
OTA-обновления — возможность удалённого обновления прошивки при обнаружении уязвимостей
Мониторинг аномалий — необычное количество сообщений, соединения с нестандартных IP
# Mosquitto ACL файл /etc/mosquitto/acl:
# Устройство conveyor1 — пишет только в свои топики
user esp32_conveyor1
topic write factory/line1/conveyor1/+
topic read factory/line1/conveyor1/commands
# Backend — читает всё, пишет команды
user backend
topic readwrite factory/#
Заключение
IIoT с MQTT — это доступная и проверенная технология, которую можно внедрить даже на небольшом предприятии с минимальными затратами. ESP32 + Mosquitto + InfluxDB + Grafana — весь стек работает на одном Raspberry Pi 4 или бюджетном сервере.
Главные принципы: данные должны быть точными (правильная калибровка датчиков), надёжными (QoS, переподключение, buffering), безопасными (TLS, ACL) и полезными (не просто собирать, а анализировать и действовать).
Начните с малого: один датчик температуры, один MQTT-брокер, один Grafana-дашборд. После первого успешного графика желание расширять систему появится само.
Create an account or sign in to leave a review
There are no reviews to display.