Jump to content
View in the app

A better way to browse. Learn more.

T.M.I IThub

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.

Когда Linux вместо микроконтроллера

Выбор между голым МК (Arduino/STM32) и Linux-системой (Raspberry Pi/промышленный ПК) — одно из ключевых архитектурных решений.

Linux выигрывает когда нужно:

  • Сложные сетевые протоколы (TCP/IP стек, TLS, MQTT, OPC UA)

  • Работа с файлами: логирование, конфигурация, обновления ПО

  • Высокоуровневые вычисления: Python/NumPy, ML-инференс

  • Несколько параллельных задач с разной логикой

  • Веб-интерфейс или REST API

  • Большой объём RAM/Flash (база данных, historian)

Микроконтроллер выигрывает когда нужно:

  • Детерминированное реальное время (< 1 мс)

  • Мгновенный старт (Linux загружается 10–30 секунд)

  • Минимальное энергопотребление

  • Дешёвое серийное производство

Золотое правило: Linux для "мозга" и коммуникаций, микроконтроллер для "мышц" и реального времени. Оба — в одной системе, связанные UART или SPI.


Raspberry Pi в промышленности: что учесть

Raspberry Pi не проектировался для промышленного применения, но активно используется. Ключевые ограничения и решения:

Ограничение 1: SD-карта умирает SD-карты не рассчитаны на постоянную запись. В промышленном применении — выход из строя за 3–12 месяцев.

Решения:

# 1. Read-only файловая система (overlayfs)
# В /boot/cmdline.txt добавить: overlayroot=tmpfs
# Данные писать только на специальный раздел с journaling

# 2. Переместить tmpfs для логов в RAM
# /etc/fstab:
tmpfs /tmp      tmpfs defaults,noatime,size=100m 0 0
tmpfs /var/log  tmpfs defaults,noatime,size=50m  0 0
tmpfs /var/tmp  tmpfs defaults,noatime,size=20m  0 0

# 3. Использовать SSD через USB3 или eMMC-модуль (CM4)

Ограничение 2: Нет RTC (часов реального времени) При потере питания время сбивается.

# Установить модуль DS3231 через I2C
# /boot/config.txt:
dtoverlay=i2c-rtc,ds3231

# Синхронизация при загрузке:
sudo hwclock --hctosys  # Hardware clock → System clock

Ограничение 3: Нет аппаратного watchdog "из коробки"

# Включить встроенный watchdog BCM2835
# /boot/config.txt:
dtparam=watchdog=on

# /etc/systemd/system.conf:
RuntimeWatchdogSec=10   # Сброс если systemd не пингует 10 секунд
RebootWatchdogSec=60

# Проверка:
ls /dev/watchdog  # Должен существовать

GPIO: управление пинами из userspace

sysfs (устаревший, но всё ещё работает):

# Экспортируем GPIO 17
echo "17" > /sys/class/gpio/export

# Устанавливаем направление
echo "out" > /sys/class/gpio/gpio17/direction

# Устанавливаем значение
echo "1" > /sys/class/gpio/gpio17/value

# Читаем состояние входа
cat /sys/class/gpio/gpio18/value

libgpiod (современный стандарт):

# Установка
sudo apt install gpiod libgpiod-dev

# Командная строка
gpioget gpiochip0 17         # Прочитать GPIO 17
gpioset gpiochip0 17=1       # Установить в HIGH
gpioset gpiochip0 17=0 18=1  # Установить несколько
gpioinfo gpiochip0           # Информация о всех пинах
# Python + gpiod
# pip install gpiod
import gpiod
import time

# Открываем чип
chip = gpiod.Chip('gpiochip0')

# Настраиваем пины
led_line    = chip.get_line(17)
button_line = chip.get_line(18)

led_config = gpiod.LineRequest()
led_config.consumer = "myapp"
led_config.request_type = gpiod.LineRequest.DIRECTION_OUTPUT
led_line.request(led_config)

btn_config = gpiod.LineRequest()
btn_config.consumer = "myapp"
btn_config.request_type = gpiod.LineRequest.EVENT_BOTH_EDGES  # Прерывания!
button_line.request(btn_config)

try:
    while True:
        # Ожидание события с таймаутом 100 мс
        event_happened = button_line.event_wait(nsec=100_000_000)
        
        if event_happened:
            event = button_line.event_read()
            if event.type == gpiod.LineEvent.RISING_EDGE:
                print("Кнопка нажата")
                led_line.set_value(1)
            else:
                print("Кнопка отпущена")
                led_line.set_value(0)
finally:
    led_line.release()
    button_line.release()

SPI и I2C из userspace

I2C (smbus2):

# pip install smbus2
import smbus2
import time

class BME280_Linux:
    """Работа с датчиком BME280 через Linux I2C"""
    
    ADDR = 0x76
    REG_ID     = 0xD0
    REG_CTRL   = 0xF4
    REG_DATA   = 0xF7
    
    def __init__(self, bus_num: int = 1):
        self.bus = smbus2.SMBus(bus_num)
    
    def read_reg(self, reg: int) -> int:
        return self.bus.read_byte_data(self.ADDR, reg)
    
    def write_reg(self, reg: int, value: int):
        self.bus.write_byte_data(self.ADDR, reg, value)
    
    def read_burst(self, reg: int, length: int) -> bytes:
        return bytes(self.bus.read_i2c_block_data(self.ADDR, reg, length))
    
    def init(self):
        chip_id = self.read_reg(self.REG_ID)
        if chip_id != 0x60:
            raise RuntimeError(f"BME280 не найден, ID={chip_id:#x}")
        
        # Нормальный режим, oversampling ×4
        self.write_reg(0xF4, 0x97)  # ctrl_meas
        self.write_reg(0xF5, 0xA0)  # config: IIR filter 16
        time.sleep(0.1)
    
    def read(self) -> dict:
        data = self.read_burst(self.REG_DATA, 6)
        
        raw_press = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4)
        raw_temp  = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4)
        
        # Упрощённое преобразование (без компенсации)
        # В реальности нужно читать калибровочные коэффициенты!
        temp     = raw_temp / 5120.0
        pressure = raw_press / 25600.0 / 100.0  # гПа
        
        return {'temperature': round(temp, 1), 'pressure': round(pressure, 1)}

# Использование:
sensor = BME280_Linux(bus_num=1)  # /dev/i2c-1
sensor.init()

while True:
    data = sensor.read()
    print(f"T={data['temperature']}°C, P={data['pressure']}гПа")
    time.sleep(1)

SPI (spidev):

import spidev
import time

class MCP3208_ADC:
    """12-битный АЦП MCP3208 через SPI"""
    
    CHANNELS = 8
    
    def __init__(self, bus: int = 0, device: int = 0, speed_hz: int = 1_000_000):
        self.spi = spidev.SpiDev()
        self.spi.open(bus, device)  # /dev/spidev0.0
        self.spi.max_speed_hz = speed_hz
        self.spi.mode = 0
    
    def read_channel(self, channel: int) -> int:
        """Чтение канала 0-7, возвращает 0-4095"""
        if not 0 <= channel < self.CHANNELS:
            raise ValueError(f"Канал {channel} вне диапазона 0-7")
        
        # MCP3208: 3 байта транзакции
        # Байт 1: старт-бит + single/diff + D2
        # Байт 2: D1, D0, X, X, X, X, X, X
        # Байт 3: X, X, X, X, X, X, X, X
        cmd = [0x06 | (channel >> 2), (channel & 0x03) << 6, 0x00]
        
        response = self.spi.xfer2(cmd)
        
        # Из ответа: байт 1 биты 1-0 + байт 2 все 8 бит = 12 бит
        result = ((response[1] & 0x0F) << 8) | response[2]
        return result
    
    def read_voltage(self, channel: int, vref: float = 3.3) -> float:
        """Чтение в вольтах"""
        raw = self.read_channel(channel)
        return raw * vref / 4095.0
    
    def read_all(self, vref: float = 3.3) -> list:
        """Чтение всех 8 каналов"""
        return [self.read_voltage(ch, vref) for ch in range(self.CHANNELS)]
    
    def close(self):
        self.spi.close()

# Использование:
adc = MCP3208_ADC()

while True:
    voltages = adc.read_all()
    for ch, v in enumerate(voltages):
        print(f"CH{ch}: {v:.3f}В", end="  ")
    print()
    time.sleep(0.5)

UART и последовательный порт

import serial
import serial.tools.list_ports

# Найти все доступные порты
def list_serial_ports():
    ports = serial.tools.list_ports.comports()
    for port in ports:
        print(f"{port.device}: {port.description} ({port.hwid})")

# На Raspberry Pi:
# /dev/ttyAMA0 или /dev/serial0 — встроенный UART (GPIO 14/15)
# /dev/ttyUSB0 — USB-UART адаптер
# /dev/ttyACM0 — USB CDC (Arduino)

# Важно для RPi: отключить console на /dev/serial0
# sudo raspi-config → Interface Options → Serial Port
# "Would you like a login shell to be accessible over serial?" → No
# "Would you like the serial port hardware to be enabled?" → Yes

class SerialDevice:
    """Надёжная работа с последовательным портом"""
    
    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0):
        self.port_name = port
        self.baudrate  = baudrate
        self.timeout   = timeout
        self.ser       = None
    
    def connect(self):
        try:
            self.ser = serial.Serial(
                port     = self.port_name,
                baudrate = self.baudrate,
                bytesize = serial.EIGHTBITS,
                parity   = serial.PARITY_NONE,
                stopbits = serial.STOPBITS_ONE,
                timeout  = self.timeout
            )
            print(f"Подключено: {self.port_name}")
            return True
        except serial.SerialException as e:
            print(f"Ошибка подключения: {e}")
            return False
    
    def send(self, data: bytes) -> bool:
        try:
            self.ser.write(data)
            return True
        except serial.SerialException:
            return False
    
    def send_line(self, text: str) -> bool:
        return self.send((text + '\n').encode('utf-8'))
    
    def read_line(self) -> str | None:
        try:
            line = self.ser.readline()
            if line:
                return line.decode('utf-8', errors='replace').strip()
            return None
        except serial.SerialException:
            return None
    
    def close(self):
        if self.ser and self.ser.is_open:
            self.ser.close()

systemd: управление сервисами приложения

Правильное промышленное приложение на Linux должно запускаться как системный сервис — автостарт при загрузке, перезапуск при сбое, логирование.

Создание systemd unit:

# /etc/systemd/system/industrial-gateway.service
[Unit]
Description=Industrial IoT Gateway
After=network.target
Wants=network-online.target
After=network-online.target

# Зависимость от другого сервиса (например, MQTT-брокера)
# Requires=mosquitto.service
# After=mosquitto.service

[Service]
Type=simple
User=pi
Group=pi
WorkingDirectory=/opt/gateway

# Переменные окружения из файла
EnvironmentFile=/etc/gateway/config.env

# Команда запуска
ExecStart=/opt/gateway/venv/bin/python /opt/gateway/main.py

# Перезапуск при сбое
Restart=on-failure
RestartSec=10s
StartLimitIntervalSec=60s
StartLimitBurst=3       # Максимум 3 попытки за 60 секунд

# Watchdog интеграция с systemd
# Приложение должно вызывать sd_notify WATCHDOG=1 каждые N секунд
WatchdogSec=30s

# Логирование
StandardOutput=journal
StandardError=journal
SyslogIdentifier=gateway

# Безопасность (опционально, но рекомендуется)
NoNewPrivileges=true
PrivateTmp=true

[Install]
WantedBy=multi-user.target
# Установка и управление:
sudo systemctl daemon-reload
sudo systemctl enable industrial-gateway
sudo systemctl start industrial-gateway
sudo systemctl status industrial-gateway

# Логи:
journalctl -u industrial-gateway -f          # В реальном времени
journalctl -u industrial-gateway --since today
journalctl -u industrial-gateway -n 100      # Последние 100 строк

Watchdog из Python (sd-notify):

# pip install sdnotify
import sdnotify
import time
import threading

notifier = sdnotify.SystemdNotifier()

def watchdog_thread():
    """Пингуем systemd watchdog каждые 10 секунд"""
    while True:
        notifier.notify("WATCHDOG=1")
        time.sleep(10)

def main():
    # Сообщаем systemd что мы готовы
    notifier.notify("READY=1")
    notifier.notify("STATUS=Инициализация...")
    
    # Запускаем watchdog в фоне
    wdg = threading.Thread(target=watchdog_thread, daemon=True)
    wdg.start()
    
    try:
        # Основной цикл приложения
        while True:
            notifier.notify("STATUS=Работает нормально")
            # ... бизнес-логика
            time.sleep(1)
    except Exception as e:
        notifier.notify(f"STATUS=ОШИБКА: {e}")
        raise

Сетевая конфигурация для промышленного шлюза

# /etc/dhcpcd.conf — статический IP для промышленной сети
interface eth0
static ip_address=192.168.1.200/24
static routers=192.168.1.1
static domain_name_servers=192.168.1.1 8.8.8.8

# Или через NetworkManager (современный способ):
sudo nmcli con add type ethernet ifname eth0 con-name industrial \
    ipv4.method manual ipv4.addresses 192.168.1.200/24 \
    ipv4.gateway 192.168.1.1 ipv4.dns "192.168.1.1 8.8.8.8"
sudo nmcli con up industrial

Bonding (резервирование сети):

# Два сетевых интерфейса — один основной, второй резервный
# /etc/network/interfaces:
auto bond0
iface bond0 inet static
    address 192.168.1.200
    netmask 255.255.255.0
    gateway 192.168.1.1
    bond-slaves eth0 eth1
    bond-mode active-backup    # При отказе eth0 — переключаемся на eth1
    bond-miimon 100            # Проверка связи каждые 100 мс
    bond-primary eth0

Buildroot: минимальный Linux-образ

Для серийного производства не нужен полный Raspberry Pi OS с Python IDE и LibreOffice. Нужен минимальный образ с только нужными компонентами.

Buildroot — система сборки кастомных Linux-образов:

# Клонируем Buildroot
git clone https://git.buildroot.net/buildroot
cd buildroot

# Начинаем с дефолтной конфигурации для Raspberry Pi
make raspberrypi4_64_defconfig

# Настраиваем через menuconfig
make menuconfig
# Target packages → Networking applications → mosquitto (MQTT-брокер)
# Target packages → Libraries → python3
# Target packages → Libraries → python-paho-mqtt
# System configuration → Root password

# Собираем (первый раз ~1-2 часа)
make -j4

# Результат: output/images/sdcard.img
# Записываем на SD:
sudo dd if=output/images/sdcard.img of=/dev/sdX bs=4M status=progress

Преимущества кастомного образа:

  • Размер: 50–200 МБ вместо 4–8 ГБ

  • Быстрый старт: 5–8 секунд вместо 30+

  • Безопасность: минимальная поверхность атаки

  • Reproducible builds: одинаковый образ на всех устройствах


Python-приложение как надёжный шлюз

#!/opt/gateway/venv/bin/python3
"""
Промышленный IoT-шлюз: Modbus RTU → MQTT
"""

import logging
import signal
import sys
import time
import json
import threading
from pathlib import Path
from typing import Optional

import sdnotify
import paho.mqtt.client as mqtt
from pymodbus.client import ModbusSerialClient

# Настройка логирования (в journald через stderr)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    stream=sys.stderr
)
log = logging.getLogger('gateway')


class IndustrialGateway:
    
    def __init__(self):
        self.running    = False
        self.notifier   = sdnotify.SystemdNotifier()
        self.mqtt_client: Optional[mqtt.Client] = None
        self.modbus_client: Optional[ModbusSerialClient] = None
        
        # Конфигурация из переменных окружения
        import os
        self.mqtt_host    = os.getenv('MQTT_HOST', 'localhost')
        self.mqtt_port    = int(os.getenv('MQTT_PORT', '1883'))
        self.modbus_port  = os.getenv('MODBUS_PORT', '/dev/serial0')
        self.modbus_baud  = int(os.getenv('MODBUS_BAUD', '9600'))
        self.poll_interval = float(os.getenv('POLL_INTERVAL', '1.0'))
    
    def setup(self):
        """Инициализация подключений"""
        log.info("Инициализация шлюза...")
        
        # MQTT
        self.mqtt_client = mqtt.Client(client_id="industrial-gateway")
        self.mqtt_client.on_connect    = self._on_mqtt_connect
        self.mqtt_client.on_disconnect = self._on_mqtt_disconnect
        self.mqtt_client.will_set("gateway/status", 
                                   '{"online": false}', retain=True)
        
        self.mqtt_client.connect_async(self.mqtt_host, self.mqtt_port)
        self.mqtt_client.loop_start()
        
        # Modbus
        self.modbus_client = ModbusSerialClient(
            port=self.modbus_port,
            baudrate=self.modbus_baud,
            timeout=1.0
        )
        
        if not self.modbus_client.connect():
            log.error(f"Не удалось подключиться к Modbus: {self.modbus_port}")
    
    def _on_mqtt_connect(self, client, userdata, flags, rc):
        if rc == 0:
            log.info(f"MQTT подключён: {self.mqtt_host}")
            client.publish("gateway/status", '{"online": true}', retain=True)
        else:
            log.error(f"MQTT ошибка подключения: {rc}")
    
    def _on_mqtt_disconnect(self, client, userdata, rc):
        log.warning(f"MQTT отключён (rc={rc}), переподключение...")
    
    def poll_device(self, device_addr: int, reg_start: int,
                    reg_count: int) -> Optional[list]:
        """Опрос устройства Modbus"""
        try:
            result = self.modbus_client.read_input_registers(
                address=reg_start, count=reg_count, slave=device_addr
            )
            if result.isError():
                log.warning(f"Modbus ошибка: устройство {device_addr}")
                return None
            return result.registers
        except Exception as e:
            log.error(f"Исключение при опросе {device_addr}: {e}")
            return None
    
    def publish(self, topic: str, data: dict):
        """Публикация в MQTT"""
        try:
            payload = json.dumps(data, ensure_ascii=False)
            self.mqtt_client.publish(topic, payload)
        except Exception as e:
            log.error(f"Ошибка публикации: {e}")
    
    def run(self):
        """Главный цикл"""
        self.running = True
        
        # Сигналы завершения
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT,  self._shutdown)
        
        self.setup()
        
        # Сообщаем systemd о готовности
        self.notifier.notify("READY=1")
        log.info("Шлюз запущен и готов к работе")
        
        poll_count = 0
        
        while self.running:
            loop_start = time.monotonic()
            
            # Опрашиваем частотник (адрес 1, регистры 0-5)
            vfd_data = self.poll_device(1, 0, 6)
            if vfd_data:
                self.publish("factory/line1/vfd1/telemetry", {
                    'status_word': vfd_data[0],
                    'freq_hz':     vfd_data[1] / 100.0,
                    'current_a':   vfd_data[2] / 10.0,
                    'voltage_v':   vfd_data[3],
                    'power_kw':    vfd_data[4] / 10.0,
                    'fault_code':  vfd_data[5],
                    'running':     bool(vfd_data[0] & 0x0001),
                    'fault':       bool(vfd_data[0] & 0x0008),
                })
            
            # Опрашиваем датчик давления (адрес 5, регистры 0-1)
            pressure_data = self.poll_device(5, 0, 2)
            if pressure_data:
                import struct
                raw = struct.pack('>HH', pressure_data[0], pressure_data[1])
                pressure = struct.unpack('>f', raw)[0]
                self.publish("factory/line1/pressure1/telemetry", {
                    'pressure_bar': round(pressure, 2)
                })
            
            poll_count += 1
            
            # Watchdog
            if poll_count % 5 == 0:
                self.notifier.notify("WATCHDOG=1")
                self.notifier.notify(f"STATUS=Опросов: {poll_count}")
            
            # Точный таймер интервала опроса
            elapsed = time.monotonic() - loop_start
            sleep_time = self.poll_interval - elapsed
            if sleep_time > 0:
                time.sleep(sleep_time)
            elif sleep_time < -0.1:
                log.warning(f"Опрос занял {elapsed:.3f}с (лимит {self.poll_interval}с)")
    
    def _shutdown(self, signum, frame):
        log.info(f"Получен сигнал {signum}, завершаем работу...")
        self.running = False
        
        if self.mqtt_client:
            self.mqtt_client.publish("gateway/status", 
                                     '{"online": false}', retain=True)
            self.mqtt_client.loop_stop()
        
        if self.modbus_client:
            self.modbus_client.close()
        
        sys.exit(0)


if __name__ == "__main__":
    gateway = IndustrialGateway()
    gateway.run()

Безопасность Linux-устройства в сети

# Минимальный hardening для промышленного устройства

# 1. Обновление системы (автоматически)
sudo apt install unattended-upgrades
sudo dpkg-reconfigure unattended-upgrades

# 2. UFW файрвол
sudo apt install ufw
sudo ufw default deny incoming
sudo ufw default allow outgoing
sudo ufw allow ssh
sudo ufw allow 1883   # MQTT (только если нужен снаружи)
sudo ufw enable

# 3. Отключить неиспользуемые сервисы
sudo systemctl disable bluetooth
sudo systemctl disable avahi-daemon

# 4. Изменить стандартный пароль!
passwd pi  # ОБЯЗАТЕЛЬНО

# 5. SSH ключи вместо паролей
# Скопировать ключ: ssh-copy-id pi@192.168.1.200
# /etc/ssh/sshd_config:
# PasswordAuthentication no
# PermitRootLogin no

# 6. Логирование попыток взлома
sudo apt install fail2ban

Заключение

Linux на встраиваемых системах — мощнейший инструмент. Raspberry Pi, как платформа, имеет ограничения для промышленного применения, но при правильной настройке (read-only FS, watchdog, systemd-сервисы, резервирование) служит надёжно годами.

Для новых промышленных проектов рассмотрите специализированные платформы: Raspberry Pi CM4 (eMMC, нет SD-карты), BeagleBone Black (PRU для реального времени), Toradex Colibri/Apalis (промышленный температурный диапазон, многолетняя поддержка), IEI, Advantech (сертифицированные промышленные платформы).

Главный принцип для промышленного Linux: устройство должно работать без участия человека годами. Watchdog, автоперезапуск сервисов, защита файловой системы от записи, автоматические обновления безопасности — это не опции, это базовые требования.

User Feedback

Create an account or sign in to leave a review

There are no reviews to display.

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.