Когда Linux вместо микроконтроллера
Выбор между голым МК (Arduino/STM32) и Linux-системой (Raspberry Pi/промышленный ПК) — одно из ключевых архитектурных решений.
Linux выигрывает когда нужно:
Сложные сетевые протоколы (TCP/IP стек, TLS, MQTT, OPC UA)
Работа с файлами: логирование, конфигурация, обновления ПО
Высокоуровневые вычисления: Python/NumPy, ML-инференс
Несколько параллельных задач с разной логикой
Веб-интерфейс или REST API
Большой объём RAM/Flash (база данных, historian)
Микроконтроллер выигрывает когда нужно:
Детерминированное реальное время (< 1 мс)
Мгновенный старт (Linux загружается 10–30 секунд)
Минимальное энергопотребление
Дешёвое серийное производство
Золотое правило: Linux для "мозга" и коммуникаций, микроконтроллер для "мышц" и реального времени. Оба — в одной системе, связанные UART или SPI.
Raspberry Pi в промышленности: что учесть
Raspberry Pi не проектировался для промышленного применения, но активно используется. Ключевые ограничения и решения:
Ограничение 1: SD-карта умирает SD-карты не рассчитаны на постоянную запись. В промышленном применении — выход из строя за 3–12 месяцев.
Решения:
# 1. Read-only файловая система (overlayfs)
# В /boot/cmdline.txt добавить: overlayroot=tmpfs
# Данные писать только на специальный раздел с journaling
# 2. Переместить tmpfs для логов в RAM
# /etc/fstab:
tmpfs /tmp tmpfs defaults,noatime,size=100m 0 0
tmpfs /var/log tmpfs defaults,noatime,size=50m 0 0
tmpfs /var/tmp tmpfs defaults,noatime,size=20m 0 0
# 3. Использовать SSD через USB3 или eMMC-модуль (CM4)
Ограничение 2: Нет RTC (часов реального времени) При потере питания время сбивается.
# Установить модуль DS3231 через I2C
# /boot/config.txt:
dtoverlay=i2c-rtc,ds3231
# Синхронизация при загрузке:
sudo hwclock --hctosys # Hardware clock → System clock
Ограничение 3: Нет аппаратного watchdog "из коробки"
# Включить встроенный watchdog BCM2835
# /boot/config.txt:
dtparam=watchdog=on
# /etc/systemd/system.conf:
RuntimeWatchdogSec=10 # Сброс если systemd не пингует 10 секунд
RebootWatchdogSec=60
# Проверка:
ls /dev/watchdog # Должен существовать
GPIO: управление пинами из userspace
sysfs (устаревший, но всё ещё работает):
# Экспортируем GPIO 17
echo "17" > /sys/class/gpio/export
# Устанавливаем направление
echo "out" > /sys/class/gpio/gpio17/direction
# Устанавливаем значение
echo "1" > /sys/class/gpio/gpio17/value
# Читаем состояние входа
cat /sys/class/gpio/gpio18/value
libgpiod (современный стандарт):
# Установка
sudo apt install gpiod libgpiod-dev
# Командная строка
gpioget gpiochip0 17 # Прочитать GPIO 17
gpioset gpiochip0 17=1 # Установить в HIGH
gpioset gpiochip0 17=0 18=1 # Установить несколько
gpioinfo gpiochip0 # Информация о всех пинах
# Python + gpiod
# pip install gpiod
import gpiod
import time
# Открываем чип
chip = gpiod.Chip('gpiochip0')
# Настраиваем пины
led_line = chip.get_line(17)
button_line = chip.get_line(18)
led_config = gpiod.LineRequest()
led_config.consumer = "myapp"
led_config.request_type = gpiod.LineRequest.DIRECTION_OUTPUT
led_line.request(led_config)
btn_config = gpiod.LineRequest()
btn_config.consumer = "myapp"
btn_config.request_type = gpiod.LineRequest.EVENT_BOTH_EDGES # Прерывания!
button_line.request(btn_config)
try:
while True:
# Ожидание события с таймаутом 100 мс
event_happened = button_line.event_wait(nsec=100_000_000)
if event_happened:
event = button_line.event_read()
if event.type == gpiod.LineEvent.RISING_EDGE:
print("Кнопка нажата")
led_line.set_value(1)
else:
print("Кнопка отпущена")
led_line.set_value(0)
finally:
led_line.release()
button_line.release()
SPI и I2C из userspace
I2C (smbus2):
# pip install smbus2
import smbus2
import time
class BME280_Linux:
"""Работа с датчиком BME280 через Linux I2C"""
ADDR = 0x76
REG_ID = 0xD0
REG_CTRL = 0xF4
REG_DATA = 0xF7
def __init__(self, bus_num: int = 1):
self.bus = smbus2.SMBus(bus_num)
def read_reg(self, reg: int) -> int:
return self.bus.read_byte_data(self.ADDR, reg)
def write_reg(self, reg: int, value: int):
self.bus.write_byte_data(self.ADDR, reg, value)
def read_burst(self, reg: int, length: int) -> bytes:
return bytes(self.bus.read_i2c_block_data(self.ADDR, reg, length))
def init(self):
chip_id = self.read_reg(self.REG_ID)
if chip_id != 0x60:
raise RuntimeError(f"BME280 не найден, ID={chip_id:#x}")
# Нормальный режим, oversampling ×4
self.write_reg(0xF4, 0x97) # ctrl_meas
self.write_reg(0xF5, 0xA0) # config: IIR filter 16
time.sleep(0.1)
def read(self) -> dict:
data = self.read_burst(self.REG_DATA, 6)
raw_press = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
raw_temp = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
# Упрощённое преобразование (без компенсации)
# В реальности нужно читать калибровочные коэффициенты!
temp = raw_temp / 5120.0
pressure = raw_press / 25600.0 / 100.0 # гПа
return {'temperature': round(temp, 1), 'pressure': round(pressure, 1)}
# Использование:
sensor = BME280_Linux(bus_num=1) # /dev/i2c-1
sensor.init()
while True:
data = sensor.read()
print(f"T={data['temperature']}°C, P={data['pressure']}гПа")
time.sleep(1)
SPI (spidev):
import spidev
import time
class MCP3208_ADC:
"""12-битный АЦП MCP3208 через SPI"""
CHANNELS = 8
def __init__(self, bus: int = 0, device: int = 0, speed_hz: int = 1_000_000):
self.spi = spidev.SpiDev()
self.spi.open(bus, device) # /dev/spidev0.0
self.spi.max_speed_hz = speed_hz
self.spi.mode = 0
def read_channel(self, channel: int) -> int:
"""Чтение канала 0-7, возвращает 0-4095"""
if not 0 <= channel < self.CHANNELS:
raise ValueError(f"Канал {channel} вне диапазона 0-7")
# MCP3208: 3 байта транзакции
# Байт 1: старт-бит + single/diff + D2
# Байт 2: D1, D0, X, X, X, X, X, X
# Байт 3: X, X, X, X, X, X, X, X
cmd = [0x06 | (channel >> 2), (channel & 0x03) << 6, 0x00]
response = self.spi.xfer2(cmd)
# Из ответа: байт 1 биты 1-0 + байт 2 все 8 бит = 12 бит
result = ((response[1] & 0x0F) << 8) | response[2]
return result
def read_voltage(self, channel: int, vref: float = 3.3) -> float:
"""Чтение в вольтах"""
raw = self.read_channel(channel)
return raw * vref / 4095.0
def read_all(self, vref: float = 3.3) -> list:
"""Чтение всех 8 каналов"""
return [self.read_voltage(ch, vref) for ch in range(self.CHANNELS)]
def close(self):
self.spi.close()
# Использование:
adc = MCP3208_ADC()
while True:
voltages = adc.read_all()
for ch, v in enumerate(voltages):
print(f"CH{ch}: {v:.3f}В", end=" ")
print()
time.sleep(0.5)
UART и последовательный порт
import serial
import serial.tools.list_ports
# Найти все доступные порты
def list_serial_ports():
ports = serial.tools.list_ports.comports()
for port in ports:
print(f"{port.device}: {port.description} ({port.hwid})")
# На Raspberry Pi:
# /dev/ttyAMA0 или /dev/serial0 — встроенный UART (GPIO 14/15)
# /dev/ttyUSB0 — USB-UART адаптер
# /dev/ttyACM0 — USB CDC (Arduino)
# Важно для RPi: отключить console на /dev/serial0
# sudo raspi-config → Interface Options → Serial Port
# "Would you like a login shell to be accessible over serial?" → No
# "Would you like the serial port hardware to be enabled?" → Yes
class SerialDevice:
"""Надёжная работа с последовательным портом"""
def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
self.port_name = port
self.baudrate = baudrate
self.timeout = timeout
self.ser = None
def connect(self):
try:
self.ser = serial.Serial(
port = self.port_name,
baudrate = self.baudrate,
bytesize = serial.EIGHTBITS,
parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE,
timeout = self.timeout
)
print(f"Подключено: {self.port_name}")
return True
except serial.SerialException as e:
print(f"Ошибка подключения: {e}")
return False
def send(self, data: bytes) -> bool:
try:
self.ser.write(data)
return True
except serial.SerialException:
return False
def send_line(self, text: str) -> bool:
return self.send((text + '\n').encode('utf-8'))
def read_line(self) -> str | None:
try:
line = self.ser.readline()
if line:
return line.decode('utf-8', errors='replace').strip()
return None
except serial.SerialException:
return None
def close(self):
if self.ser and self.ser.is_open:
self.ser.close()
systemd: управление сервисами приложения
Правильное промышленное приложение на Linux должно запускаться как системный сервис — автостарт при загрузке, перезапуск при сбое, логирование.
Создание systemd unit:
# /etc/systemd/system/industrial-gateway.service
[Unit]
Description=Industrial IoT Gateway
After=network.target
Wants=network-online.target
After=network-online.target
# Зависимость от другого сервиса (например, MQTT-брокера)
# Requires=mosquitto.service
# After=mosquitto.service
[Service]
Type=simple
User=pi
Group=pi
WorkingDirectory=/opt/gateway
# Переменные окружения из файла
EnvironmentFile=/etc/gateway/config.env
# Команда запуска
ExecStart=/opt/gateway/venv/bin/python /opt/gateway/main.py
# Перезапуск при сбое
Restart=on-failure
RestartSec=10s
StartLimitIntervalSec=60s
StartLimitBurst=3 # Максимум 3 попытки за 60 секунд
# Watchdog интеграция с systemd
# Приложение должно вызывать sd_notify WATCHDOG=1 каждые N секунд
WatchdogSec=30s
# Логирование
StandardOutput=journal
StandardError=journal
SyslogIdentifier=gateway
# Безопасность (опционально, но рекомендуется)
NoNewPrivileges=true
PrivateTmp=true
[Install]
WantedBy=multi-user.target
# Установка и управление:
sudo systemctl daemon-reload
sudo systemctl enable industrial-gateway
sudo systemctl start industrial-gateway
sudo systemctl status industrial-gateway
# Логи:
journalctl -u industrial-gateway -f # В реальном времени
journalctl -u industrial-gateway --since today
journalctl -u industrial-gateway -n 100 # Последние 100 строк
Watchdog из Python (sd-notify):
# pip install sdnotify
import sdnotify
import time
import threading
notifier = sdnotify.SystemdNotifier()
def watchdog_thread():
"""Пингуем systemd watchdog каждые 10 секунд"""
while True:
notifier.notify("WATCHDOG=1")
time.sleep(10)
def main():
# Сообщаем systemd что мы готовы
notifier.notify("READY=1")
notifier.notify("STATUS=Инициализация...")
# Запускаем watchdog в фоне
wdg = threading.Thread(target=watchdog_thread, daemon=True)
wdg.start()
try:
# Основной цикл приложения
while True:
notifier.notify("STATUS=Работает нормально")
# ... бизнес-логика
time.sleep(1)
except Exception as e:
notifier.notify(f"STATUS=ОШИБКА: {e}")
raise
Сетевая конфигурация для промышленного шлюза
# /etc/dhcpcd.conf — статический IP для промышленной сети
interface eth0
static ip_address=192.168.1.200/24
static routers=192.168.1.1
static domain_name_servers=192.168.1.1 8.8.8.8
# Или через NetworkManager (современный способ):
sudo nmcli con add type ethernet ifname eth0 con-name industrial \
ipv4.method manual ipv4.addresses 192.168.1.200/24 \
ipv4.gateway 192.168.1.1 ipv4.dns "192.168.1.1 8.8.8.8"
sudo nmcli con up industrial
Bonding (резервирование сети):
# Два сетевых интерфейса — один основной, второй резервный
# /etc/network/interfaces:
auto bond0
iface bond0 inet static
address 192.168.1.200
netmask 255.255.255.0
gateway 192.168.1.1
bond-slaves eth0 eth1
bond-mode active-backup # При отказе eth0 — переключаемся на eth1
bond-miimon 100 # Проверка связи каждые 100 мс
bond-primary eth0
Buildroot: минимальный Linux-образ
Для серийного производства не нужен полный Raspberry Pi OS с Python IDE и LibreOffice. Нужен минимальный образ с только нужными компонентами.
Buildroot — система сборки кастомных Linux-образов:
# Клонируем Buildroot
git clone https://git.buildroot.net/buildroot
cd buildroot
# Начинаем с дефолтной конфигурации для Raspberry Pi
make raspberrypi4_64_defconfig
# Настраиваем через menuconfig
make menuconfig
# Target packages → Networking applications → mosquitto (MQTT-брокер)
# Target packages → Libraries → python3
# Target packages → Libraries → python-paho-mqtt
# System configuration → Root password
# Собираем (первый раз ~1-2 часа)
make -j4
# Результат: output/images/sdcard.img
# Записываем на SD:
sudo dd if=output/images/sdcard.img of=/dev/sdX bs=4M status=progress
Преимущества кастомного образа:
Размер: 50–200 МБ вместо 4–8 ГБ
Быстрый старт: 5–8 секунд вместо 30+
Безопасность: минимальная поверхность атаки
Reproducible builds: одинаковый образ на всех устройствах
Python-приложение как надёжный шлюз
#!/opt/gateway/venv/bin/python3
"""
Промышленный IoT-шлюз: Modbus RTU → MQTT
"""
import logging
import signal
import sys
import time
import json
import threading
from pathlib import Path
from typing import Optional
import sdnotify
import paho.mqtt.client as mqtt
from pymodbus.client import ModbusSerialClient
# Настройка логирования (в journald через stderr)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
stream=sys.stderr
)
log = logging.getLogger('gateway')
class IndustrialGateway:
def __init__(self):
self.running = False
self.notifier = sdnotify.SystemdNotifier()
self.mqtt_client: Optional[mqtt.Client] = None
self.modbus_client: Optional[ModbusSerialClient] = None
# Конфигурация из переменных окружения
import os
self.mqtt_host = os.getenv('MQTT_HOST', 'localhost')
self.mqtt_port = int(os.getenv('MQTT_PORT', '1883'))
self.modbus_port = os.getenv('MODBUS_PORT', '/dev/serial0')
self.modbus_baud = int(os.getenv('MODBUS_BAUD', '9600'))
self.poll_interval = float(os.getenv('POLL_INTERVAL', '1.0'))
def setup(self):
"""Инициализация подключений"""
log.info("Инициализация шлюза...")
# MQTT
self.mqtt_client = mqtt.Client(client_id="industrial-gateway")
self.mqtt_client.on_connect = self._on_mqtt_connect
self.mqtt_client.on_disconnect = self._on_mqtt_disconnect
self.mqtt_client.will_set("gateway/status",
'{"online": false}', retain=True)
self.mqtt_client.connect_async(self.mqtt_host, self.mqtt_port)
self.mqtt_client.loop_start()
# Modbus
self.modbus_client = ModbusSerialClient(
port=self.modbus_port,
baudrate=self.modbus_baud,
timeout=1.0
)
if not self.modbus_client.connect():
log.error(f"Не удалось подключиться к Modbus: {self.modbus_port}")
def _on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
log.info(f"MQTT подключён: {self.mqtt_host}")
client.publish("gateway/status", '{"online": true}', retain=True)
else:
log.error(f"MQTT ошибка подключения: {rc}")
def _on_mqtt_disconnect(self, client, userdata, rc):
log.warning(f"MQTT отключён (rc={rc}), переподключение...")
def poll_device(self, device_addr: int, reg_start: int,
reg_count: int) -> Optional[list]:
"""Опрос устройства Modbus"""
try:
result = self.modbus_client.read_input_registers(
address=reg_start, count=reg_count, slave=device_addr
)
if result.isError():
log.warning(f"Modbus ошибка: устройство {device_addr}")
return None
return result.registers
except Exception as e:
log.error(f"Исключение при опросе {device_addr}: {e}")
return None
def publish(self, topic: str, data: dict):
"""Публикация в MQTT"""
try:
payload = json.dumps(data, ensure_ascii=False)
self.mqtt_client.publish(topic, payload)
except Exception as e:
log.error(f"Ошибка публикации: {e}")
def run(self):
"""Главный цикл"""
self.running = True
# Сигналы завершения
signal.signal(signal.SIGTERM, self._shutdown)
signal.signal(signal.SIGINT, self._shutdown)
self.setup()
# Сообщаем systemd о готовности
self.notifier.notify("READY=1")
log.info("Шлюз запущен и готов к работе")
poll_count = 0
while self.running:
loop_start = time.monotonic()
# Опрашиваем частотник (адрес 1, регистры 0-5)
vfd_data = self.poll_device(1, 0, 6)
if vfd_data:
self.publish("factory/line1/vfd1/telemetry", {
'status_word': vfd_data[0],
'freq_hz': vfd_data[1] / 100.0,
'current_a': vfd_data[2] / 10.0,
'voltage_v': vfd_data[3],
'power_kw': vfd_data[4] / 10.0,
'fault_code': vfd_data[5],
'running': bool(vfd_data[0] & 0x0001),
'fault': bool(vfd_data[0] & 0x0008),
})
# Опрашиваем датчик давления (адрес 5, регистры 0-1)
pressure_data = self.poll_device(5, 0, 2)
if pressure_data:
import struct
raw = struct.pack('>HH', pressure_data[0], pressure_data[1])
pressure = struct.unpack('>f', raw)[0]
self.publish("factory/line1/pressure1/telemetry", {
'pressure_bar': round(pressure, 2)
})
poll_count += 1
# Watchdog
if poll_count % 5 == 0:
self.notifier.notify("WATCHDOG=1")
self.notifier.notify(f"STATUS=Опросов: {poll_count}")
# Точный таймер интервала опроса
elapsed = time.monotonic() - loop_start
sleep_time = self.poll_interval - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
elif sleep_time < -0.1:
log.warning(f"Опрос занял {elapsed:.3f}с (лимит {self.poll_interval}с)")
def _shutdown(self, signum, frame):
log.info(f"Получен сигнал {signum}, завершаем работу...")
self.running = False
if self.mqtt_client:
self.mqtt_client.publish("gateway/status",
'{"online": false}', retain=True)
self.mqtt_client.loop_stop()
if self.modbus_client:
self.modbus_client.close()
sys.exit(0)
if __name__ == "__main__":
gateway = IndustrialGateway()
gateway.run()
Безопасность Linux-устройства в сети
# Минимальный hardening для промышленного устройства
# 1. Обновление системы (автоматически)
sudo apt install unattended-upgrades
sudo dpkg-reconfigure unattended-upgrades
# 2. UFW файрвол
sudo apt install ufw
sudo ufw default deny incoming
sudo ufw default allow outgoing
sudo ufw allow ssh
sudo ufw allow 1883 # MQTT (только если нужен снаружи)
sudo ufw enable
# 3. Отключить неиспользуемые сервисы
sudo systemctl disable bluetooth
sudo systemctl disable avahi-daemon
# 4. Изменить стандартный пароль!
passwd pi # ОБЯЗАТЕЛЬНО
# 5. SSH ключи вместо паролей
# Скопировать ключ: ssh-copy-id pi@192.168.1.200
# /etc/ssh/sshd_config:
# PasswordAuthentication no
# PermitRootLogin no
# 6. Логирование попыток взлома
sudo apt install fail2ban
Заключение
Linux на встраиваемых системах — мощнейший инструмент. Raspberry Pi, как платформа, имеет ограничения для промышленного применения, но при правильной настройке (read-only FS, watchdog, systemd-сервисы, резервирование) служит надёжно годами.
Для новых промышленных проектов рассмотрите специализированные платформы: Raspberry Pi CM4 (eMMC, нет SD-карты), BeagleBone Black (PRU для реального времени), Toradex Colibri/Apalis (промышленный температурный диапазон, многолетняя поддержка), IEI, Advantech (сертифицированные промышленные платформы).
Главный принцип для промышленного Linux: устройство должно работать без участия человека годами. Watchdog, автоперезапуск сервисов, защита файловой системы от записи, автоматические обновления безопасности — это не опции, это базовые требования.
Create an account or sign in to leave a review
There are no reviews to display.