Everything posted by IThub
-
Комментарий в коде: “не трогать” — и ты сразу хочешь трогать
Встретил в коде: // не трогать, всё сломается Конечно же, первое желание — проверить. Сломалось. Автор комментария: — я же говорил.
-
Починил баг. Появилось 3 новых. Это фича?
Исправил одну проблему. Появились три новые. Судя по динамике, если продолжать — можно создать новую вселенную. Кто-нибудь доводил это до конца?
-
Синьор — это тот, кто умеет гуглить… или уже нет?
Раньше говорили: «хороший разработчик — тот, кто умеет гуглить». Теперь: «тот, кто умеет правильно задать вопрос ИИ». Следующий этап: синьор — это тот, кто понимает, где ИИ врёт.
-
Почему код работает… пока я смотрю на экран?
Замечал странную закономерность: пока смотришь на код — всё работает. Стоит отойти за кофе — падает. Есть гипотеза, что баги боятся зрительного контакта. Кто-нибудь пробовал фиксить прод просто пристальным взглядом?
-
Node.js на бэкенде — удобно или опасно?
Один язык везде — звучит идеально. Но event loop, блокировки, память… Подходит ли Node для серьёзных систем? Или это инструмент «для стартапов»?
-
Почему фронтенд стал сложнее бэкенда?
Раньше HTML+CSS+JS. Теперь сборщики, фреймворки, стейт-менеджмент. Фронт превратился в отдельную инженерную дисциплину. Это оправдано или перегруз?
-
ORM — ускорение разработки или замедление системы?
ORM экономит время. Но потом начинаются странные SQL-запросы и проблемы с производительностью. Стоит ли сразу писать «чистый SQL»? Или ORM — это нормальный компромисс?
-
Монолит vs микросервисы — честно, кто реально выигрывает?
Все хотят микросервисы. Но у маленьких команд это превращается в хаос. Сеть, деплой, логирование — всё усложняется. Может, старый добрый монолит недооценён?
-
Docker решает проблемы — или просто добавляет новый слой боли?
Контейнеры — стандарт индустрии. Но теперь вместо «у меня не работает» → «у меня не работает в контейнере». Сложность только растёт. Docker — необходимость или оверинжиниринг?
-
Async/await — спасение или источник скрытых багов?
Асинхронность упростили, но стало ли реально проще? Ошибки «проглатываются», порядок выполнения неочевиден. Особенно в Python и JS. Стоит ли возвращаться к более явным моделям?
-
JavaScript захватил мир — но ценой здравого смысла?
Фронт, бэк, мобильные приложения — всё на JS. Но сколько боли: npm, зависимости, ломающееся окружение. Почему язык, который задумывался как скриптовый, стал основой всего? Это эволюция или архитектурная ошибка?
-
Python уже не тот? Когда «просто скрипт» превращается в монстра
Всегда любил Python за простоту. Но в последние годы ощущение, что он стал слишком «тяжёлым». Типизация, async, куча фреймворков — и вот уже проект сложнее, чем на Java. В какой момент Python перестаёт быть удобным инструментом? Или проблема в нас, а не в языке?
-
Силовая электроника: тиристоры, IGBT и управление мощными нагрузками
Силовая электроника: между схемотехникой и энергетикойСиловая электроника — область, где электроника управляет реальной мощностью: сотнями ампер, тысячами вольт, мегаваттами. Это регуляторы яркости, частотные преобразователи, зарядные станции для электромобилей, солнечные инверторы, промышленные нагреватели. Ключевое отличие от малосигнальной электроники: КПД критически важен. 99% КПД в источнике 100 кВт означает 1 кВт тепла на радиаторах — и это допустимо. 90% КПД — уже 10 кВт тепловых потерь, требующих серьёзного охлаждения. Тиристор (SCR): управляемый диодТиристор (Silicon Controlled Rectifier, SCR) — четырёхслойный PNPN прибор. Включается коротким импульсом на управляющий электрод (Gate), выключается только при уменьшении тока ниже тока удержания (обычно — переходом через ноль в сети AC). Анод (A) ─── P ─── N ─── P ─── N ─── Катод (K) │ Управляющий электрод (G) Характеристики: Включается: короткий импульс IG > IGT (обычно 10–100 мА) Выключается: ток анода < IH (ток удержания), обычно при переходе AC через ноль Прямое напряжение в открытом состоянии: 1–3В (значительные потери!) Применение: однофазные и трёхфазные выпрямители, регуляторы мощности Фазовое управление тиристоромОсновной метод управления мощностью с тиристором в сети AC: Угол включения α=0°: Полная мощность (100%) Угол включения α=90°: Половинная мощность (~50%) Угол включения α=150°: Малая мощность (~6%) Угол включения α=180°: Минимальная мощность (0%) Средняя мощность ≈ P_max × (1 + cos α) / 2 Схема фазового управления на Arduino/STM32: // Детектор перехода через ноль (Zero-Crossing Detector) // Подключён к INT0 (PD2) через оптопару // При каждом переходе через ноль — прерывание volatile bool zero_cross = false; volatile uint8_t power_pct = 50; // 0-100% // Прерывание от детектора нуля ISR(INT0_vect) // AVR / адаптируй под STM32/ESP32 { zero_cross = true; } void setup() { // Выход на оптотиристор/оптосимистор (через ограничительный резистор ~300Ом) pinMode(9, OUTPUT); digitalWrite(9, LOW); // Прерывание INT0 по фронту (или обоим — зависит от схемы) attachInterrupt(0, zero_cross_ISR, RISING); // Таймер 1: генерирует прерывание через N микросекунд после нуля // Для сети 50 Гц: полупериод = 10 000 мкс // Угол 90° = 5 000 мкс задержка } void zero_cross_ISR() { zero_cross = true; } void loop() { if (zero_cross) { zero_cross = false; // Рассчитываем задержку включения // power_pct = 100 → delay = 0 мкс (включить немедленно) // power_pct = 50 → delay = 5000 мкс (угол 90°) // power_pct = 0 → delay = 10000 мкс (не включать) uint32_t delay_us = (100 - power_pct) * 100; // 0-10000 мкс if (power_pct > 0 && power_pct < 100) { delayMicroseconds(delay_us); // Короткий импульс управления (100-200 мкс достаточно) digitalWrite(9, HIGH); delayMicroseconds(100); digitalWrite(9, LOW); } else if (power_pct >= 100) { digitalWrite(9, HIGH); // Постоянно включено } else { digitalWrite(9, LOW); // Постоянно выключено } } } Проблемы фазового управления: Генерирует гармоники в сети (помехи) Вызывает мигание освещения Создаёт радиопомехи (EMI) Решение для нагревателей: управление по полным полупериодам (Burst Firing) Симистор (TRIAC): для двунаправленного управленияСимистор = два тиристора включённых встречно-параллельно. Проводит ток в ОБОИХ направлениях — идеален для управления нагрузкой переменного тока без выпрямления. MT2 │ ── P ─┤ ── N ─┤── Gate (G) ── P ─┤ ── N ─┤ │ MT1 Управление по нулю (Zero-Crossing Control / Burst Firing)Вместо фазового управления — включаем нагреватель на N полных периодов из M: Мощность 33%: ██░░██░░██░░██░░ (1 из 3 периодов включён) Мощность 50%: ████░░░░████░░░░ (2 из 4 периодов) Мощность 75%: ██████░░██████░░ (3 из 4 периодов) Мощность 100%: ████████████████ (все периоды) Преимущества: нет гармоник, нет EMI, нет щелчков в контакторах. Недостатки: медленнее регулирование (минимальный шаг — полпериода = 10 мс). Оптимально для: промышленные нагреватели, печи сопротивления, ИК-нагреватели. // Burst Firing контроллер class BurstController { private: uint8_t window_size; // Размер окна в полупериодах (например, 20 = 200мс) uint8_t on_count; // Сколько периодов включено uint8_t current_period; // Счётчик текущего периода public: BurstController(uint8_t window = 20) : window_size(window), on_count(0), current_period(0) {} void setPower(float power_pct) { on_count = (uint8_t)(power_pct / 100.0f * window_size + 0.5f); on_count = constrain(on_count, 0, window_size); } // Вызвать при каждом переходе через ноль bool onZeroCross() { bool turn_on = (current_period < on_count); current_period = (current_period + 1) % window_size; return turn_on; } }; BurstController burster(20); // Окно 20 полупериодов = 200 мс void zero_cross_handler() { bool should_be_on = burster.onZeroCross(); digitalWrite(TRIAC_PIN, should_be_on ? HIGH : LOW); } Твёрдотельное реле (SSR): простое решениеSSR (Solid-State Relay) — готовый модуль с тиристором/симистором и оптической развязкой внутри. Управление: 3–32В DC сигнал (совместимо с Arduino/ПЛК), нагрузка: до 40А/480В AC. Выбор SSR: Параметр Рекомендация Ток нагрузки Выбирать с запасом ×2 (40А SSR для 20А нагрузки) Тип управления DC Control (3-32V) для ПЛК, AC Control (90-280V) для PID-регулятора с выходом AC Тип коммутации Zero-Cross для нагревателей, Random Fire для двигателей и трансформаторов Напряжение нагрузки 24-380В AC (проверьте соответствие!) Охлаждение ОБЯЗАТЕЛЬНО радиатор! 0.5°C/Вт для алюминиевого радиатора Тепловой расчёт SSR: def calculate_ssr_heatsink(load_current_a: float, ambient_temp_c: float = 40.0, max_case_temp_c: float = 80.0) -> dict: """ Расчёт требуемого теплового сопротивления радиатора для SSR. SSR: прямое напряжение ~1.2В (Fotek, Crydom) """ # Тепловыделение SSR vf = 1.2 # В, прямое падение на симисторе p_loss = vf * load_current_a # Вт # Тепловое сопротивление корпус-радиатор (junction-to-case): ~0.5°C/Вт rth_jc = 0.5 # °C/W (из datasheet) # Максимальная температура p-n перехода обычно 125°C t_junction_max = 125.0 # Требуемое тепловое сопротивление радиатор-воздух # T_ambient + P × (Rth_jc + Rth_heatsink) = T_case_max rth_heatsink = (max_case_temp_c - ambient_temp_c) / p_loss - rth_jc # Размер алюминиевого радиатора (грубая оценка): # R_th ≈ 50 / (площадь_см²) для вертикального расположения heatsink_area_cm2 = 50.0 / rth_heatsink if rth_heatsink > 0 else float('inf') return { 'load_current_a': load_current_a, 'power_loss_w': round(p_loss, 1), 'rth_heatsink': round(rth_heatsink, 2), 'heatsink_area_cm2': round(heatsink_area_cm2, 0), 'safe_operation': rth_heatsink > 0, } # Пример: SSR 25А нагрузки при T_окр=40°C result = calculate_ssr_heatsink(25.0) print(f"Потери: {result['power_loss_w']} Вт") print(f"Требуется радиатор: {result['heatsink_area_cm2']} см²") # Результат: 30 Вт потерь, нужен радиатор ~150 см² IGBT: для высоких частот и больших мощностейIGBT (Insulated Gate Bipolar Transistor) — гибрид MOSFET и биполярного транзистора. Управляется напряжением (как MOSFET), но имеет низкое напряжение насыщения при больших токах (как биполярный). Где используется: Частотные преобразователи (инвертор моста) ИБП и стабилизаторы Сварочные аппараты Индукционные нагреватели Зарядные станции для электромобилей Ключевые характеристики при выборе IGBT:Vce_max — максимальное напряжение коллектор-эмиттер (выбирать ×2 от напряжения шины!) Ic_max — максимальный ток (с учётом теплового сопротивления!) Vce(sat) — напряжение насыщения (потери в открытом состоянии) Eoff/Eon — энергия переключения (потери на коммутацию, растут с частотой!) toff — время выключения (ограничивает максимальную частоту ШИМ) Расчёт потерь IGBT: def calculate_igbt_losses(vce_sat: float, ic_rms: float, e_on_j: float, e_off_j: float, fsw_hz: float, duty: float) -> dict: """ Расчёт потерь IGBT. vce_sat: напряжение насыщения, В (из datasheet при Ic и Tj) ic_rms: действующий ток коллектора, А e_on_j: энергия включения, Дж (из datasheet) e_off_j: энергия выключения, Дж fsw_hz: частота коммутации, Гц duty: скважность ШИМ (0..1) """ # Потери проводимости (conduction losses) # P_cond = Vce_sat × Ic_avg ic_avg = ic_rms * duty # Упрощение для прямоугольного тока p_cond = vce_sat * ic_avg # Потери переключения (switching losses) # P_sw = (E_on + E_off) × fsw p_sw = (e_on_j + e_off_j) * fsw_hz # Суммарные потери p_total = p_cond + p_sw return { 'conduction_w': round(p_cond, 2), 'switching_w': round(p_sw, 2), 'total_w': round(p_total, 2), 'efficiency_pct': round((1 - p_total / (vce_sat * ic_rms + p_total)) * 100, 1), } # Пример: IGBT 1200В/50А в инверторе 400В # Частотник 11 кВт: Idc ≈ 30А, fsw=8кГц, d=0.8 losses = calculate_igbt_losses( vce_sat = 2.0, # В при 125°C ic_rms = 30.0, # А e_on_j = 0.8e-3, # 0.8 мДж e_off_j = 1.2e-3, # 1.2 мДж fsw_hz = 8000, # 8 кГц duty = 0.8 ) print(f"Потери проводимости: {losses['conduction_w']} Вт") print(f"Потери переключения: {losses['switching_w']} Вт") print(f"Итого: {losses['total_w']} Вт") # При 6 IGBT в трёхфазном мосте: ×6 = итоговые потери инвертора Снабберные цепи: защита от перенапряженийПри выключении IGBT/тиристора индуктивная нагрузка создаёт выброс напряжения: V_spike = L × dI/dt. Без защиты — мгновенная смерть транзистора. RC-снаббер (для симистора):Нагрузка │ [Симистор] │ ───────── ← RC снаббер параллельно симистору │ │ [R ~47 Ом] [C ~47 нФ, 630В] │ │ ───────── │ GND Расчёт RC-снаббера: def calculate_rc_snubber(load_inductance_h: float, switch_current_a: float, supply_voltage_v: float) -> dict: """ Расчёт RC-снаббера для тиристора/симистора. Критерий: выброс напряжения ≤ 2 × Vsupply """ import math # Пиковое напряжение без снаббера # V_peak ≈ V_supply + I × sqrt(L/C_parasitic) # С снаббером ограничиваем до 2×V_supply v_max = 2 * supply_voltage_v # Ёмкость снаббера (минимальная для ограничения выброса) # C ≥ L × I² / (V_max - V_supply)² delta_v = v_max - supply_voltage_v c_min = load_inductance_h * switch_current_a**2 / delta_v**2 c_snubber = c_min * 2 # Запас × 2, нормализуем до стандартного ряда E12 # Стандартный ряд конденсаторов (нФ) e12 = [10, 12, 15, 18, 22, 27, 33, 39, 47, 56, 68, 82] c_nf = c_snubber * 1e9 c_standard_nf = min(e12, key=lambda x: abs(x - c_nf)) # Если нет в ряду — берём следующий больший for val in sorted(e12): if val >= c_nf: c_standard_nf = val break c_actual = c_standard_nf * 1e-9 # Сопротивление снаббера # R ≈ sqrt(L/C) для критического затухания r_critical = math.sqrt(load_inductance_h / c_actual) r_snubber = r_critical # Или немного больше для надёжности # Мощность резистора # P = 0.5 × C × V² × f (для каждого переключения) # Для AC 50Гц: f = 100 (2 перехода через ноль) p_resistor = 0.5 * c_actual * supply_voltage_v**2 * 100 return { 'C_nF': c_standard_nf, 'C_voltage': f"{int(v_max * 1.5 / 100) * 100}В", # Округляем до стандарта 'R_ohm': round(r_snubber, 0), 'R_watts': round(p_resistor * 2, 1), # Запас × 2 } # Пример: управление нагревателем 5 кВт через симистор # Нагреватель = почти чисто активная нагрузка, но есть монтажная индуктивность ~10 мкГн result = calculate_rc_snubber(10e-6, 22.7, 220) print(f"Снаббер: R={result['R_ohm']} Ом/{result['R_watts']} Вт, " f"C={result['C_nF']} нФ/{result['C_voltage']}") Трёхфазное управление нагревателямиДля трёхфазных печей и нагревателей используют 3 SSR (по одному на каждую фазу) или специализированные трёхфазные тиристорные модули: L1 ──[SSR_A]──┐ L2 ──[SSR_B]──┼── Нагреватели (треугольник или звезда) L3 ──[SSR_C]──┘ ↑ ↑ ↑ Сигналы управления с ПЛК/контроллера Управление: все три SSR получают одинаковый сигнал (одновременно вкл/выкл) → симметричная нагрузка, нет перекоса фаз Или: поочерёдное включение фаз (phase rotation) → снижает пиковый ток пуска Алгоритм ПИД для температуры печи с трёхфазным нагревателем: class FurnaceController: """Контроллер температуры трёхфазной печи""" def __init__(self, kp=5.0, ki=0.1, kd=2.0, max_power=100.0): self.pid = PIDController(kp, ki, kd, ts=1.0, out_min=0, out_max=max_power) self.setpoint = 0.0 # Защиты self.max_temp = 1200.0 # °C максимум печи self.fault = False def control_cycle(self, temp_actual: float) -> dict: """Один цикл управления (вызывать каждую секунду)""" # Защита по превышению температуры if temp_actual > self.max_temp: self.fault = True if self.fault: return {'ssr_output': 0.0, 'fault': True, 'temp': temp_actual} # ПИД self.pid.set_setpoint(self.setpoint) power_pct = self.pid.compute(temp_actual) # Мощность → количество периодов включения (Burst Fire) # Окно 10 полупериодов = 100 мс on_periods = int(power_pct / 100 * 10 + 0.5) return { 'ssr_output': power_pct, 'on_periods': on_periods, # из 10 'setpoint': self.setpoint, 'temp_actual': temp_actual, 'error': self.setpoint - temp_actual, 'fault': False, } Тепловой расчёт: как не сжечь компонентыТепловая цепь аналогична электрической: Температура ↔ Напряжение Мощность потерь ↔ Ток Тепловое сопротивление Rth ↔ Электрическое сопротивление T_junction = T_ambient + P_loss × (Rth_j-c + Rth_c-hs + Rth_hs-a) Где: Rth_j-c — тепловое сопротивление кристалл→корпус (из datasheet) Rth_c-hs — корпус→радиатор (зависит от термопасты, ~0.1–0.5 °C/Вт) Rth_hs-a — радиатор→воздух (зависит от размера и обдува) def thermal_check(p_loss_w: float, t_ambient_c: float, rth_jc: float, rth_chs: float, rth_hsa: float, t_junction_max: float = 125.0) -> dict: """ Проверка теплового режима силового прибора. """ rth_total = rth_jc + rth_chs + rth_hsa t_junction = t_ambient_c + p_loss_w * rth_total t_case = t_ambient_c + p_loss_w * rth_hsa # Температура корпуса margin = t_junction_max - t_junction safe = margin > 10.0 # Запас минимум 10°C return { 'T_junction_c': round(t_junction, 1), 'T_case_c': round(t_case, 1), 'margin_c': round(margin, 1), 'safe': safe, 'warning': not safe, } # Проверка SSR 30А: result = thermal_check( p_loss_w = 1.2 * 25, # 1.2В × 25А = 30 Вт t_ambient_c = 40.0, rth_jc = 0.5, # из datasheet Fotek SSR-40DA rth_chs = 0.2, # хорошая термопаста rth_hsa = 1.5, # алюминиевый радиатор 150 см² ) print(f"Температура перехода: {result['T_junction_c']}°C") print(f"Запас: {result['margin_c']}°C — {'OK' if result['safe'] else 'ОПАСНО!'}") Типичные ошибки в силовой электроникеНет снаббера на индуктивной нагрузке → выброс напряжения → смерть тиристора SSR без радиатора → перегрев за несколько минут при токе > 5А Управляющий сигнал без оптической развязки → 220В на Arduino/ПЛК = конец Не проверена полярность тиристора → не переключается или горит сразу Фазовое управление на трансформаторную нагрузку → насыщение сердечника IGBT с Vce_max = Vsupply → первый же выброс убивает → минимум ×2 запас ЗаключениеСиловая электроника — область, где цена ошибки высока: сгоревший IGBT, пожар, травма. Всегда работайте с полной изоляцией от сети, используйте изолирующие трансформаторы при разработке, не экономьте на снабберах и радиаторах. Для начала: освойте управление нагревателем через SSR и ПИД-регулятор — это самая распространённая и безопасная задача. Потом — изучите теорию тиристорного управления. После этого — IGBT в H-мостах для двигателей. И только с хорошей теоретической базой — трёхфазные инверторы. Измерительный осциллограф с изолированными щупами и клещи-амперметр — ваши обязательные инструменты в этой области. Без возможности видеть что происходит на осциллографе — работать в силовой электронике вслепую.
-
Python для инженеров: автоматизация, обработка данных и промышленные приложения
Python — второй язык каждого инженераMatlab стоит дорого. LabVIEW — ещё дороже. Excel мощный, но у него есть потолок. Python — бесплатный, открытый, с огромной экосистемой библиотек для инженерных задач. И с каждым годом он глубже проникает в промышленность. Инженер-электронщик использует Python для: анализа данных с измерительных приборов, автоматизации рутинных расчётов, создания отчётов, обработки сигналов с АЦП, управления лабораторным оборудованием (VISA/PyVISA), прототипирования алгоритмов перед переносом на микроконтроллер. Специалист АСУ ТП — для: работы с Modbus/OPC UA, парсинга логов ПЛК, автоматического тестирования, интеграции различных систем. NumPy: числа быстроNumPy — фундамент научного Python. Массивы NumPy в 10–100 раз быстрее списков Python для математических операций. import numpy as np import time # ===== БАЗОВЫЕ ОПЕРАЦИИ ===== # Создание массивов t = np.linspace(0, 10, 1000) # 1000 точек от 0 до 10 f = np.arange(0, 50, 0.1) # От 0 до 50 с шагом 0.1 zeros = np.zeros((3, 4)) # Матрица 3×4 из нулей eye = np.eye(3) # Единичная матрица 3×3 # Синтетический сигнал (для теста) freq_signal = 50.0 # Гц freq_noise = 200.0 # Гц (помеха) sample_rate = 1000.0 # Гц t = np.arange(0, 1, 1/sample_rate) # 1 секунда данных signal_clean = 2.0 * np.sin(2 * np.pi * freq_signal * t) noise = 0.5 * np.sin(2 * np.pi * freq_noise * t) noise += 0.2 * np.random.randn(len(t)) # Белый шум signal_noisy = signal_clean + noise # ===== СКОРОСТЬ ===== def python_rms(data: list) -> float: return (sum(x**2 for x in data) / len(data)) ** 0.5 def numpy_rms(data: np.ndarray) -> float: return np.sqrt(np.mean(data**2)) # Сравнение скорости: data_list = list(signal_noisy) data_arr = np.array(data_list) t0 = time.time(); python_rms(data_list); t_py = time.time() - t0 t0 = time.time(); numpy_rms(data_arr); t_np = time.time() - t0 print(f"Python: {t_py*1000:.2f} мс, NumPy: {t_np*1000:.3f} мс, " f"Ускорение: {t_py/t_np:.0f}x") # ===== ИНЖЕНЕРНЫЕ РАСЧЁТЫ ===== def calculate_power_factor(voltage: np.ndarray, current: np.ndarray, sample_rate: float) -> dict: """ Расчёт коэффициента мощности из осциллограмм тока и напряжения. """ # RMS значения V_rms = np.sqrt(np.mean(voltage**2)) I_rms = np.sqrt(np.mean(current**2)) # Активная мощность (среднее произведение) P = np.mean(voltage * current) # Полная мощность S = V_rms * I_rms # Коэффициент мощности pf = P / S if S > 0 else 0 # Реактивная мощность Q = np.sqrt(max(0, S**2 - P**2)) return { 'V_rms': round(V_rms, 2), 'I_rms': round(I_rms, 3), 'P_kw': round(P / 1000, 2), 'Q_kvar': round(Q / 1000, 2), 'S_kva': round(S / 1000, 2), 'PF': round(abs(pf), 3), } # Пример использования: # Генерируем тестовые сигналы 220В 50Гц, ток 10А с φ=30° t = np.linspace(0, 0.04, 400) # 2 периода V = 220 * np.sqrt(2) * np.sin(2 * np.pi * 50 * t) I = 10 * np.sqrt(2) * np.sin(2 * np.pi * 50 * t - np.radians(30)) power = calculate_power_factor(V, I, 10000) print(f"P={power['P_kw']} кВт, Q={power['Q_kvar']} квар, cos(φ)={power['PF']}") # Ожидаем: cos(30°) ≈ 0.866 Scipy: анализ сигналовfrom scipy import signal, fft import numpy as np # ===== FFT: СПЕКТРАЛЬНЫЙ АНАЛИЗ ===== def analyze_spectrum(data: np.ndarray, sample_rate: float) -> dict: """ Анализ спектра сигнала через FFT. Используется для диагностики вибраций, качества электроэнергии. """ n = len(data) # Оконная функция (Hanning) для уменьшения спектральных утечек window = np.hanning(n) data_windowed = data * window # FFT spectrum = np.abs(fft.rfft(data_windowed)) freqs = fft.rfftfreq(n, 1.0/sample_rate) # Нормировка (учёт оконной функции) spectrum = spectrum / (n / 2) # THD (Total Harmonic Distortion) — для качества сетевого напряжения # Находим основную частоту (50 Гц) fundamental_idx = np.argmin(np.abs(freqs - 50.0)) fundamental_amp = spectrum[fundamental_idx] # Гармоники 2-я...7-я harmonic_power = sum( spectrum[np.argmin(np.abs(freqs - 50.0 * n))]**2 for n in range(2, 8) ) thd = np.sqrt(harmonic_power) / fundamental_amp * 100 # % # Топ-5 пиков спектра peak_indices = np.argsort(spectrum)[-10:][::-1] top_peaks = [(round(freqs[i], 1), round(spectrum[i], 4)) for i in peak_indices] return { 'freqs': freqs, 'spectrum': spectrum, 'thd_pct': round(thd, 2), 'top_peaks': top_peaks[:5], 'rms': round(np.sqrt(np.mean(data**2)), 4), } # ===== ФИЛЬТРАЦИЯ СИГНАЛОВ ===== def design_lowpass_filter(cutoff_hz: float, sample_rate: float, order: int = 4) -> tuple: """ Проектирование фильтра нижних частот Баттерворта. Используется для сглаживания зашумлённых данных датчиков. """ nyquist = sample_rate / 2 normalized_cutoff = cutoff_hz / nyquist b, a = signal.butter(order, normalized_cutoff, btype='low', analog=False) return b, a def apply_filter(data: np.ndarray, b: np.ndarray, a: np.ndarray, zero_phase: bool = True) -> np.ndarray: """ Применение фильтра к сигналу. zero_phase=True: filtfilt (нет фазового сдвига, требует данных полностью) zero_phase=False: lfilter (реального времени, есть фазовый сдвиг) """ if zero_phase: return signal.filtfilt(b, a, data) # Двупроходной (офлайн-обработка) else: return signal.lfilter(b, a, data) # Однопроходной (онлайн-обработка) # Пример: фильтрация зашумлённого датчика температуры sample_rate = 100.0 # 100 Гц t = np.arange(0, 10, 1/sample_rate) # Реальная температура (медленно меняется) true_temp = 75.0 + 5.0 * np.sin(2 * np.pi * 0.1 * t) # 0.1 Гц # С шумом (50Гц помеха от сети + белый шум) noisy_temp = true_temp + 2.0 * np.sin(2 * np.pi * 50 * t) + \ 0.5 * np.random.randn(len(t)) # Фильтр НЧ с частотой среза 1 Гц (убираем всё выше 1 Гц) b, a = design_lowpass_filter(cutoff_hz=1.0, sample_rate=sample_rate) filtered_temp = apply_filter(noisy_temp, b, a) print(f"Шум до фильтрации: {np.std(noisy_temp - true_temp):.3f}°C") print(f"Шум после фильтра: {np.std(filtered_temp - true_temp):.3f}°C") # ===== КОРРЕЛЯЦИЯ И ОБНАРУЖЕНИЕ СИГНАЛА ===== def find_pattern_in_signal(signal_data: np.ndarray, pattern: np.ndarray) -> list[int]: """ Поиск паттерна в сигнале через кросс-корреляцию. Применение: нахождение пакетов в потоке данных, обнаружение событий. """ correlation = np.correlate(signal_data, pattern, mode='valid') threshold = 0.8 * np.max(np.abs(correlation)) peaks, _ = signal.find_peaks(correlation, height=threshold, distance=len(pattern)) return list(peaks) Pandas: анализ промышленных данныхimport pandas as pd import numpy as np from datetime import datetime, timedelta # ===== ЗАГРУЗКА И ОЧИСТКА ДАННЫХ ===== def load_plc_log(filepath: str) -> pd.DataFrame: """ Загрузка и нормализация лога ПЛК. Типичный формат: CSV с временной меткой и значениями тегов. """ df = pd.read_csv(filepath, parse_dates=['timestamp'], index_col='timestamp') # Нормализация имён колонок df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('.', '_') # Приведение типов numeric_cols = ['temperature', 'pressure', 'current', 'flow'] for col in numeric_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') bool_cols = ['running', 'fault', 'alarm'] for col in bool_cols: if col in df.columns: df[col] = df[col].astype(bool, errors='ignore') # Удаление дубликатов df = df[~df.index.duplicated(keep='first')] # Сортировка по времени df = df.sort_index() # Интерполяция пропущенных значений (не более 5 пропусков подряд) df[numeric_cols] = df[numeric_cols].interpolate( method='time', limit=5, limit_direction='forward' ) return df def analyze_production_data(df: pd.DataFrame) -> dict: """ Анализ производственных данных: KPI, простои, отклонения. """ results = {} # ===== ДОСТУПНОСТЬ ОБОРУДОВАНИЯ ===== if 'running' in df.columns: total_time = (df.index[-1] - df.index[0]).total_seconds() / 3600 # часы running_time = df['running'].mean() * total_time results['availability'] = { 'total_hours': round(total_time, 1), 'running_hours': round(running_time, 1), 'availability_pct': round(df['running'].mean() * 100, 1), } # ===== АНАЛИЗ ПРОСТОЕВ ===== if 'running' in df.columns: # Нахождение периодов простоя running_changes = df['running'].astype(int).diff() stop_times = df.index[running_changes == -1] # Моменты остановки start_times = df.index[running_changes == 1] # Моменты пуска downtimes = [] for stop in stop_times: # Найти следующий пуск после остановки next_start = start_times[start_times > stop] if len(next_start) > 0: duration = (next_start[0] - stop).total_seconds() / 60 # минуты downtimes.append({'stop': stop, 'start': next_start[0], 'duration_min': round(duration, 1)}) if downtimes: dt_df = pd.DataFrame(downtimes) results['downtimes'] = { 'count': len(dt_df), 'total_min': round(dt_df['duration_min'].sum(), 1), 'avg_min': round(dt_df['duration_min'].mean(), 1), 'max_min': round(dt_df['duration_min'].max(), 1), 'longest_stop': dt_df.loc[dt_df['duration_min'].idxmax(), 'stop'].isoformat(), } # ===== СТАТИСТИКА ПАРАМЕТРОВ ===== numeric_cols = df.select_dtypes(include=np.number).columns.tolist() if numeric_cols: stats = df[numeric_cols].describe() results['parameters'] = stats.to_dict() # ===== ОБНАРУЖЕНИЕ ВЫБРОСОВ (метод IQR) ===== outliers = {} for col in numeric_cols: Q1 = df[col].quantile(0.25) Q3 = df[col].quantile(0.75) IQR = Q3 - Q1 mask = (df[col] < Q1 - 1.5 * IQR) | (df[col] > Q3 + 1.5 * IQR) outlier_count = mask.sum() if outlier_count > 0: outliers[col] = { 'count': int(outlier_count), 'pct': round(outlier_count / len(df) * 100, 2), 'examples': df[mask][col].head(3).tolist(), } results['outliers'] = outliers return results # ===== ГЕНЕРАЦИЯ ОТЧЁТОВ ===== def generate_daily_report(df: pd.DataFrame, date: str = None) -> pd.DataFrame: """Сводная таблица по часам за день""" if date: df = df[df.index.date == pd.Timestamp(date).date()] # Агрегация по часам hourly = df.resample('1h').agg({ 'temperature': ['mean', 'min', 'max'], 'current': ['mean', 'max'], 'pressure': ['mean', 'min', 'max'], 'running': 'mean', # Доступность за час 'fault': 'any', # Были ли аварии }).round(2) # Плоские имена колонок hourly.columns = ['_'.join(col) for col in hourly.columns] hourly['availability_pct'] = (hourly['running_mean'] * 100).round(1) hourly['had_fault'] = hourly['fault_any'] return hourly # ===== EXCEL ОТЧЁТ ===== def export_to_excel(df: pd.DataFrame, hourly: pd.DataFrame, kpi: dict, filepath: str): """Красивый Excel-отчёт с несколькими листами""" with pd.ExcelWriter(filepath, engine='xlsxwriter') as writer: workbook = writer.book # Форматы header_fmt = workbook.add_format({ 'bold': True, 'bg_color': '#2C3E50', 'font_color': 'white', 'border': 1 }) number_fmt = workbook.add_format({'num_format': '0.0#', 'border': 1}) pct_fmt = workbook.add_format({'num_format': '0.0%', 'border': 1}) bad_fmt = workbook.add_format({'bg_color': '#FFB3B3', 'border': 1}) # ===== Лист 1: KPI ===== ws_kpi = workbook.add_worksheet('KPI') ws_kpi.write('A1', 'Показатель', header_fmt) ws_kpi.write('B1', 'Значение', header_fmt) avail = kpi.get('availability', {}) row = 1 for key, val in avail.items(): ws_kpi.write(row, 0, key) ws_kpi.write(row, 1, val) row += 1 ws_kpi.set_column('A:A', 25) ws_kpi.set_column('B:B', 15) # ===== Лист 2: Почасовой отчёт ===== hourly.to_excel(writer, sheet_name='Почасовой отчёт', startrow=1) ws = writer.sheets['Почасовой отчёт'] # Условное форматирование: красим аварийные часы ws.conditional_format('A2:Z1000', { 'type': 'formula', 'criteria': '=$G2=TRUE', # Если был fault 'format': bad_fmt }) # ===== Лист 3: Сырые данные (последние 1000 строк) ===== df.tail(1000).to_excel(writer, sheet_name='Данные') print(f"Отчёт сохранён: {filepath}") FastAPI: REST API для промышленных данных# pip install fastapi uvicorn from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel from typing import Optional from datetime import datetime import uvicorn app = FastAPI( title="Industrial Data API", description="API для доступа к данным производственного оборудования", version="1.0.0" ) # Модели данных class TelemetryPoint(BaseModel): device: str temperature: float current: float pressure: float running: bool timestamp: datetime class DeviceCommand(BaseModel): device: str command: str # "start", "stop", "set_setpoint" value: Optional[float] = None operator: str # Имитация БД (в реальности — запросы к InfluxDB/TimescaleDB) telemetry_db = [] @app.get("/api/v1/devices", summary="Список устройств") async def get_devices(): return { "devices": [ {"id": "pump1", "name": "Насос 1", "location": "Линия 1", "online": True}, {"id": "pump2", "name": "Насос 2", "location": "Линия 1", "online": True}, {"id": "valve1","name": "Клапан 1","location": "Линия 2","online": False}, ] } @app.get("/api/v1/telemetry/{device_id}", summary="Телеметрия устройства") async def get_telemetry( device_id: str, hours: int = Query(default=1, ge=1, le=720, description="Глубина истории"), resample: str = Query(default="1min", description="Гранулярность: 1s, 1min, 5min, 1h") ): # Проверка устройства valid_devices = ["pump1", "pump2", "valve1"] if device_id not in valid_devices: raise HTTPException(status_code=404, detail=f"Устройство '{device_id}' не найдено") # Запрос к historian (имитация) # В реальности: query_influxdb(device_id, hours, resample) return { "device": device_id, "from": (datetime.now().replace(hour=0, minute=0)).isoformat(), "to": datetime.now().isoformat(), "resample": resample, "points": [ {"time": datetime.now().isoformat(), "temperature": 85.3, "current": 15.2, "pressure": 5.8, "running": True} ] } @app.post("/api/v1/commands", summary="Отправить команду устройству", status_code=202) async def send_command(cmd: DeviceCommand): # Валидация команды valid_commands = ["start", "stop", "set_setpoint"] if cmd.command not in valid_commands: raise HTTPException(status_code=400, detail=f"Неизвестная команда: {cmd.command}") if cmd.command == "set_setpoint" and cmd.value is None: raise HTTPException(status_code=400, detail="set_setpoint требует параметр value") # Аудит-лог (обязательно для промышленных систем!) print(f"[AUDIT] {datetime.now()} | Operator: {cmd.operator} | " f"Device: {cmd.device} | Command: {cmd.command} | Value: {cmd.value}") # Отправить команду (через очередь, Modbus, OPC UA...) # command_queue.put(cmd) return {"status": "accepted", "command_id": "cmd_123456"} @app.get("/api/v1/health", summary="Healthcheck") async def health(): return {"status": "ok", "timestamp": datetime.now().isoformat()} # Запуск: uvicorn main:app --host 0.0.0.0 --port 8080 --reload asyncio: асинхронный опрос оборудованияimport asyncio import aiohttp import json from datetime import datetime async def poll_device_modbus(device_id: str, host: str, interval: float = 1.0): """Асинхронный опрос устройства через Modbus TCP""" from pymodbus.client import AsyncModbusTcpClient async with AsyncModbusTcpClient(host=host, port=502) as client: print(f"Подключён к {device_id} ({host})") while True: start = asyncio.get_event_loop().time() try: result = await client.read_input_registers(address=0, count=4, slave=1) if not result.isError(): data = { 'device': device_id, 'timestamp': datetime.now().isoformat(), 'temperature': result.registers[0] / 10.0, 'current': result.registers[1] / 10.0, 'pressure': result.registers[2] / 100.0, 'running': bool(result.registers[3] & 1), } # Публикуем данные (в очередь, БД, MQTT...) print(f"{device_id}: T={data['temperature']}°C") else: print(f"{device_id}: Ошибка Modbus") except Exception as e: print(f"{device_id}: {e}") await asyncio.sleep(5) # Пауза перед повтором continue # Точный интервал опроса elapsed = asyncio.get_event_loop().time() - start await asyncio.sleep(max(0, interval - elapsed)) async def main(): """Параллельный опрос нескольких устройств""" devices = [ ("pump1", "192.168.1.10"), ("pump2", "192.168.1.11"), ("valve1", "192.168.1.12"), ] # Запускаем все опросы параллельно tasks = [poll_device_modbus(dev_id, host, interval=1.0) for dev_id, host in devices] await asyncio.gather(*tasks) # Все работают одновременно! asyncio.run(main()) Полезные однострочники для инженераimport subprocess, json, struct, serial from pathlib import Path # Быстрый Modbus опрос из командной строки: # python -c "from pymodbus.client import ModbusTcpClient; c=ModbusTcpClient('192.168.1.10'); c.connect(); print(c.read_input_registers(0,4,slave=1).registers)" # Конвертация hex-дампа в float: def hex_to_float(hex_str: str) -> float: return struct.unpack('>f', bytes.fromhex(hex_str.replace(' ','')))[0] print(hex_to_float("42 48 00 00")) # → 50.0 # Поиск COM-портов: import serial.tools.list_ports for p in serial.tools.list_ports.comports(): print(f"{p.device}: {p.description}") # Быстрый парсинг CSV с временными метками: df = pd.read_csv('data.csv', parse_dates=['time'], index_col='time') print(df.resample('5min').mean()) # Сохранение данных в Parquet (быстрее CSV в 10-50 раз): df.to_parquet('data.parquet', compression='snappy') df2 = pd.read_parquet('data.parquet') ЗаключениеPython — это не замена C для микроконтроллеров и не замена SQL для баз данных. Это клей, который соединяет всё: читает данные из любого источника, анализирует, визуализирует, отправляет куда надо. Для инженера ключевые библиотеки: NumPy (быстрые вычисления), Pandas (анализ данных), SciPy (сигналы и системы), Matplotlib/Plotly (визуализация), pymodbus (Modbus), pyserial (UART), asyncua (OPC UA), FastAPI (REST API). Вложите неделю в изучение NumPy и Pandas — окупится сотнями часов сэкономленного времени на анализе данных, отчётах и автоматизации рутины.
-
FreeRTOS: операционная система реального времени для встраиваемых систем
Зачем RTOS на микроконтроллереПростой проект — один while(1) цикл. Всё хорошо: считали датчик, обновили дисплей, проверили кнопку. Но что если: Нужно принять UART-пакет точно за 10 мс, иначе потеряем байты Одновременно управлять тремя независимыми ПИД-контурами Обрабатывать CAN-сообщения с задержкой не более 5 мс И параллельно вести логирование на SD-карту Суперцикл (while(1)) ломается: длинная операция блокирует всё остальное. Прерывания помогают, но сложная логика в прерываниях — путь к хаосу. FreeRTOS решает это элегантно: каждая задача — отдельный "поток" со своим стеком и приоритетом. Планировщик переключает их так быстро (обычно каждые 1 мс), что кажется будто они работают одновременно. Задача с высоким приоритетом всегда получает процессор раньше. Ключевые концепции FreeRTOSTask (Задача)// Прототип задачи — бесконечный цикл! void vTaskFunction(void *pvParameters) { // Инициализация задачи int *param = (int *)pvParameters; for (;;) // Никогда не выходит! { // Работа задачи... // Уступить процессор (обязательно в каждом цикле!) vTaskDelay(pdMS_TO_TICKS(100)); // Пауза 100 мс } vTaskDelete(NULL); // Никогда не достигается, но хорошая практика } // Создание задачи: TaskHandle_t xTaskHandle = NULL; xTaskCreate( vTaskFunction, // Функция задачи "TaskName", // Имя (для отладки) configMINIMAL_STACK_SIZE * 4, // Размер стека в словах NULL, // Параметр (pvParameters) tskIDLE_PRIORITY + 2, // Приоритет (выше = важнее) &xTaskHandle // Хендл задачи ); ПриоритетыconfigMAX_PRIORITIES = 7 (типично) Приоритет 6: КРИТИЧЕСКИЙ (ISR-уровень, прерывания) Приоритет 5: Коммуникации реального времени (CAN, UART) Приоритет 4: Управление (ПИД-контроллеры) Приоритет 3: Мониторинг, аварийная логика Приоритет 2: UI, дисплей, кнопки Приоритет 1: Логирование, некритичные задачи Приоритет 0: Idle task (только когда все остальные ждут) Архитектура многозадачного приложенияРеальный пример: контроллер насосной станции на STM32F4. // ===== ЗАГОЛОВКИ ===== #include "FreeRTOS.h" #include "task.h" #include "queue.h" #include "semphr.h" #include "timers.h" // ===== ГЛОБАЛЬНЫЕ ОБЪЕКТЫ FreeRTOS ===== QueueHandle_t xSensorQueue; // Данные датчиков QueueHandle_t xCommandQueue; // Команды управления QueueHandle_t xLogQueue; // Сообщения лога SemaphoreHandle_t xI2CMutex; // Защита I2C шины SemaphoreHandle_t xUARTMutex; // Защита UART (printf) TimerHandle_t xHeartbeatTimer; // Мигание LED watchdog // ===== СТРУКТУРЫ ДАННЫХ ===== typedef struct { float temperature; float pressure; float flow; uint32_t timestamp_ms; uint8_t quality; // 0=BAD, 1=UNCERTAIN, 2=GOOD } SensorData_t; typedef enum { CMD_START, CMD_STOP, CMD_SET_SETPOINT, CMD_RESET_FAULT, } CommandType_t; typedef struct { CommandType_t type; float value; uint8_t source; // 0=HMI, 1=Modbus, 2=Auto } Command_t; typedef struct { char message[80]; uint8_t level; // 0=DEBUG, 1=INFO, 2=WARN, 3=ERROR uint32_t timestamp_ms; } LogMessage_t; // ===== ВСПОМОГАТЕЛЬНЫЙ МАКРОС ДЛЯ PRINTF ===== // Потокобезопасный printf через мьютекс #define LOG(level, fmt, ...) do { \ LogMessage_t msg; \ msg.level = (level); \ msg.timestamp_ms = xTaskGetTickCount(); \ snprintf(msg.message, sizeof(msg.message), fmt, ##__VA_ARGS__); \ xQueueSend(xLogQueue, &msg, 0); \ } while(0) #define LOG_INFO(fmt,...) LOG(1, fmt, ##__VA_ARGS__) #define LOG_WARN(fmt,...) LOG(2, "[WARN] " fmt, ##__VA_ARGS__) #define LOG_ERROR(fmt,...) LOG(3, "[ERR!] " fmt, ##__VA_ARGS__) // ===== ЗАДАЧА 1: ЧТЕНИЕ ДАТЧИКОВ (Приоритет 4) ===== static void vSensorTask(void *pvParam) { SensorData_t data; TickType_t xLastWakeTime = xTaskGetTickCount(); const TickType_t xPeriod = pdMS_TO_TICKS(100); // 10 Гц LOG_INFO("Sensor task started"); for (;;) { // Ждём ровно 100 мс от последнего пробуждения // vTaskDelayUntil гарантирует точный период! vTaskDelayUntil(&xLastWakeTime, xPeriod); data.timestamp_ms = xTaskGetTickCount(); // Захватываем I2C шину if (xSemaphoreTake(xI2CMutex, pdMS_TO_TICKS(50)) == pdTRUE) { data.temperature = BMP280_ReadTemperature(); data.pressure = BMP280_ReadPressure(); data.quality = 2; // GOOD xSemaphoreGive(xI2CMutex); } else { // I2C занята дольше 50 мс — что-то пошло не так data.quality = 0; // BAD LOG_WARN("I2C timeout in sensor task"); } // Читаем расходомер через 4-20мА data.flow = ADC_ReadFlow(); // Отправляем данные в очередь (не блокируем — если полная, пропускаем) if (xQueueSend(xSensorQueue, &data, 0) != pdTRUE) { LOG_WARN("Sensor queue full!"); } } } // ===== ЗАДАЧА 2: ПИД-УПРАВЛЕНИЕ (Приоритет 5) ===== static void vControlTask(void *pvParam) { SensorData_t sensorData; Command_t command; float setpoint = 5.0f; // Уставка давления, бар float output = 0.0f; bool running = false; bool fault = false; // ПИД параметры float kp = 2.0f, ki = 0.5f, kd = 0.1f; float integral = 0.0f, prevError = 0.0f; const float TS = 0.1f; // Совпадает с периодом датчиков LOG_INFO("Control task started"); for (;;) { // Ждём новые данные датчиков (блокирующий ждём до 200 мс) if (xQueueReceive(xSensorQueue, &sensorData, pdMS_TO_TICKS(200)) == pdTRUE) { // Проверяем входящие команды (неблокирующий) while (xQueueReceive(xCommandQueue, &command, 0) == pdTRUE) { switch (command.type) { case CMD_START: running = true; fault = false; integral = 0; LOG_INFO("Pump STARTED by source %d", command.source); break; case CMD_STOP: running = false; output = 0; LOG_INFO("Pump STOPPED by source %d", command.source); break; case CMD_SET_SETPOINT: setpoint = command.value; LOG_INFO("Setpoint changed to %.1f bar", setpoint); break; case CMD_RESET_FAULT: fault = false; LOG_INFO("Fault reset"); break; } } // Защиты if (sensorData.quality == 0) { running = false; fault = true; output = 0; LOG_ERROR("Sensor fault! Emergency stop."); } if (sensorData.pressure > 12.0f) { running = false; fault = true; output = 0; LOG_ERROR("High pressure! %.2f bar > 12.0 bar", sensorData.pressure); } if (sensorData.temperature > 90.0f) { running = false; fault = true; output = 0; LOG_ERROR("Motor overtemp! %.1f°C > 90°C", sensorData.temperature); } // ПИД вычисление if (running && !fault) { float error = setpoint - sensorData.pressure; integral += ki * error * TS; integral = fmaxf(-50.0f, fminf(50.0f, integral)); // Anti-windup float derivative = -(sensorData.pressure - prevError) / TS; prevError = sensorData.pressure; output = kp * error + integral + kd * derivative; output = fmaxf(0.0f, fminf(100.0f, output)); } else { output = 0.0f; integral = 0.0f; } // Применяем управляющий сигнал VFD_SetFrequency(output * 0.5f); // 0-100% → 0-50 Гц } else { // Таймаут ожидания данных датчика — авария LOG_ERROR("Sensor data timeout!"); running = false; output = 0; VFD_SetFrequency(0); } } } // ===== ЗАДАЧА 3: MODBUS SLAVE (Приоритет 3) ===== static void vModbusTask(void *pvParam) { uint8_t rxBuf[64]; uint8_t rxLen = 0; LOG_INFO("Modbus task started"); for (;;) { // Ждём байт из UART (через семафор от прерывания) if (UART_WaitForData(rxBuf, &rxLen, pdMS_TO_TICKS(100))) { // Обрабатываем Modbus запрос if (Modbus_ProcessRequest(rxBuf, rxLen)) { // Если команда — отправляем в очередь управления Command_t cmd; if (Modbus_ExtractCommand(&cmd)) { xQueueSend(xCommandQueue, &cmd, pdMS_TO_TICKS(10)); } } } } } // ===== ЗАДАЧА 4: ЛОГИРОВАНИЕ (Приоритет 1, самый низкий) ===== static void vLoggingTask(void *pvParam) { LogMessage_t msg; const char *levelNames[] = {"DBG", "INF", "WRN", "ERR"}; for (;;) { // Ждём сообщение из очереди if (xQueueReceive(xLogQueue, &msg, portMAX_DELAY) == pdTRUE) { // Пишем в UART (захватываем мьютекс) if (xSemaphoreTake(xUARTMutex, pdMS_TO_TICKS(50)) == pdTRUE) { printf("[%6lu][%s] %s\r\n", (unsigned long)msg.timestamp_ms, levelNames[msg.level % 4], msg.message); xSemaphoreGive(xUARTMutex); } // Пишем на SD-карту (низкий приоритет = не мешаем критичным задачам) // SD_AppendLog(&msg); } } } // ===== ТАЙМЕР HEARTBEAT ===== static void vHeartbeatCallback(TimerHandle_t xTimer) { // Мигаем светодиодом — система жива HAL_GPIO_TogglePin(GPIOC, GPIO_PIN_13); } // ===== ИНИЦИАЛИЗАЦИЯ ===== void App_Init(void) { // Создаём очереди xSensorQueue = xQueueCreate(5, sizeof(SensorData_t)); xCommandQueue = xQueueCreate(10, sizeof(Command_t)); xLogQueue = xQueueCreate(20, sizeof(LogMessage_t)); // Создаём мьютексы xI2CMutex = xSemaphoreCreateMutex(); xUARTMutex = xSemaphoreCreateMutex(); // Создаём задачи xTaskCreate(vSensorTask, "Sensors", 512, NULL, 4, NULL); xTaskCreate(vControlTask, "Control", 1024, NULL, 5, NULL); xTaskCreate(vModbusTask, "Modbus", 512, NULL, 3, NULL); xTaskCreate(vLoggingTask, "Logging", 256, NULL, 1, NULL); // Создаём таймер heartbeat (500 мс) xHeartbeatTimer = xTimerCreate("Heartbeat", pdMS_TO_TICKS(500), pdTRUE, NULL, vHeartbeatCallback); xTimerStart(xHeartbeatTimer, 0); // Запуск планировщика vTaskStartScheduler(); // Никогда не должно дойти сюда! for (;;); } Очереди: безопасная передача данных между задачамиОчередь — это основной механизм коммуникации в FreeRTOS. Thread-safe, FIFO, блокирующий. // Создание очереди на 10 элементов типа uint32_t QueueHandle_t xQueue = xQueueCreate(10, sizeof(uint32_t)); // Отправка (из задачи) uint32_t value = 42; xQueueSend(xQueue, &value, pdMS_TO_TICKS(100)); // Ждём 100мс если полная // Отправка с высоким приоритетом (в начало очереди) xQueueSendToFront(xQueue, &value, 0); // Отправка из прерывания (другая функция!) BaseType_t xHigherPriorityTaskWoken = pdFALSE; xQueueSendFromISR(xQueue, &value, &xHigherPriorityTaskWoken); portYIELD_FROM_ISR(xHigherPriorityTaskWoken); // Уступить если нужно // Приём (блокирует до данных или таймаута) uint32_t received; if (xQueueReceive(xQueue, &received, portMAX_DELAY) == pdTRUE) { // Данные получены } // "Подсмотреть" без извлечения xQueuePeek(xQueue, &received, 0); // Мониторинг UBaseType_t count = uxQueueMessagesWaiting(xQueue); // Сколько элементов UBaseType_t space = uxQueueSpacesAvailable(xQueue); // Сколько свободно Семафоры и мьютексы: защита разделяемых ресурсов// ===== МЬЮТЕКС (Mutual Exclusion) ===== // Для защиты ресурсов (I2C, SPI, UART, глобальные переменные) SemaphoreHandle_t xMutex = xSemaphoreCreateMutex(); // Правильный паттерн: void safe_i2c_read(uint8_t addr, uint8_t reg, uint8_t *buf, uint8_t len) { if (xSemaphoreTake(xMutex, pdMS_TO_TICKS(100)) == pdTRUE) { HAL_I2C_Mem_Read(&hi2c1, addr, reg, 1, buf, len, 100); xSemaphoreGive(xMutex); } else { // Таймаут — логируем, возвращаем ошибку } } // ===== ДВОИЧНЫЙ СЕМАФОР (уведомление о событии) ===== // Задача ждёт событие от ISR SemaphoreHandle_t xDataReadySemaphore = xSemaphoreCreateBinary(); // В прерывании (данные готовы): void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) { BaseType_t xWoken = pdFALSE; xSemaphoreGiveFromISR(xDataReadySemaphore, &xWoken); portYIELD_FROM_ISR(xWoken); } // В задаче (ждём данные): void vProcessingTask(void *pvParam) { for (;;) { // Эффективное ожидание — задача не потребляет CPU! xSemaphoreTake(xDataReadySemaphore, portMAX_DELAY); // Данные готовы — обрабатываем process_uart_data(); } } // ===== СЧЁТНЫЙ СЕМАФОР (ограничение конкурентного доступа) ===== // Пример: максимум 3 одновременных подключения SemaphoreHandle_t xConnectionSlots = xSemaphoreCreateCounting(3, 3); void handle_new_connection() { if (xSemaphoreTake(xConnectionSlots, pdMS_TO_TICKS(5000)) == pdTRUE) { // Слот получен serve_client(); xSemaphoreGive(xConnectionSlots); // Освободить слот } else { // Нет свободных слотов send_busy_response(); } } Программные таймеры// Таймер однократный (one-shot) vs периодический TimerHandle_t xOneShotTimer; TimerHandle_t xPeriodicTimer; void vTimerCallback(TimerHandle_t xTimer) { // pvTimerGetTimerID позволяет использовать один callback для многих таймеров uint32_t timerID = (uint32_t)pvTimerGetTimerID(xTimer); switch (timerID) { case 1: // Однократный таймер — отключить нагреватель через 30 сек Heater_Off(); break; case 2: // Периодический — опрос watchdog External_WDT_Kick(); break; } } void setup_timers(void) { // One-shot таймер (не перезапускается автоматически) xOneShotTimer = xTimerCreate( "Heater", pdMS_TO_TICKS(30000), // 30 секунд pdFALSE, // pdFALSE = one-shot (void *)1, // ID таймера vTimerCallback ); // Периодический таймер xPeriodicTimer = xTimerCreate( "WDT", pdMS_TO_TICKS(500), // 500 мс pdTRUE, // pdTRUE = периодический (void *)2, vTimerCallback ); xTimerStart(xPeriodicTimer, 0); // Запустить один-шот когда нужно: // xTimerStart(xOneShotTimer, 0); // Сбросить периодический (перезапустить отсчёт): // xTimerReset(xPeriodicTimer, 0); // Изменить период на лету: // xTimerChangePeriod(xPeriodicTimer, pdMS_TO_TICKS(1000), 0); } Управление памятью и отладка// Мониторинг стека задачи (важно для нахождения переполнений!) void vCheckStackTask(void *pvParam) { for (;;) { vTaskDelay(pdMS_TO_TICKS(5000)); // Минимальный остаток стека (в словах) с начала работы UBaseType_t hwm = uxTaskGetStackHighWaterMark(NULL); if (hwm < 50) { // Меньше 50 слов — опасно! printf("WARNING: Task '%s' stack low! HWM=%lu words\r\n", pcTaskGetName(NULL), (unsigned long)hwm); } } } // Вывод информации о всех задачах (для отладки) void vPrintTaskStats(void) { char buffer[512]; vTaskList(buffer); // Требует configUSE_TRACE_FACILITY=1 printf("Task Name\t\tState\tPrio\tStack\tNum\r\n%s", buffer); // Загрузка CPU по задачам (требует configGENERATE_RUN_TIME_STATS=1) vTaskGetRunTimeStats(buffer); printf("\r\nTask\t\t\tTime\t\t%%\r\n%s", buffer); } // Обработчик нехватки памяти void vApplicationMallocFailedHook(void) { taskDISABLE_INTERRUPTS(); printf("FATAL: malloc failed! Heap exhausted.\r\n"); for (;;); } // Переполнение стека задачи void vApplicationStackOverflowHook(TaskHandle_t xTask, char *pcTaskName) { taskDISABLE_INTERRUPTS(); printf("FATAL: Stack overflow in task '%s'!\r\n", pcTaskName); for (;;); } FreeRTOS на ESP32ESP-IDF (официальный SDK ESP32) использует FreeRTOS как основу. На двухядерном ESP32 задачи можно привязывать к конкретному ядру: // ESP32-специфичное создание задачи с указанием ядра xTaskCreatePinnedToCore( vWiFiTask, // Функция "WiFi", // Имя 8192, // Стек (в байтах для ESP32!) NULL, // Параметры 5, // Приоритет &xWiFiHandle, // Хендл 0 // Ядро: 0 = Protocol CPU, 1 = Application CPU ); // WiFi и Bluetooth — всегда на ядре 0 (Protocol CPU) // Ваш код — лучше на ядре 1 (Application CPU) // Это разделяет сетевой стек и бизнес-логику // Встроенный мониторинг задач ESP-IDF: void print_esp_task_info(void) { printf("Free heap: %u bytes\r\n", esp_get_free_heap_size()); printf("Min free heap: %u bytes\r\n", esp_get_minimum_free_heap_size()); } Типичные ошибки FreeRTOS1. Вызов обычных функций из ISR // НЕПРАВИЛЬНО — заблокирует прерывание! void HAL_GPIO_EXTI_Callback(uint16_t pin) { xQueueSend(xQueue, &data, portMAX_DELAY); // ОШИБКА: блокирует ISR! } // ПРАВИЛЬНО — FromISR версии функций: void HAL_GPIO_EXTI_Callback(uint16_t pin) { BaseType_t xWoken = pdFALSE; xQueueSendFromISR(xQueue, &data, &xWoken); portYIELD_FROM_ISR(xWoken); } 2. Бесконечная задача без yield // НЕПРАВИЛЬНО — монополизирует процессор! void vBadTask(void *p) { for (;;) { do_something(); // Нет vTaskDelay или блокирующего ожидания! } } // ПРАВИЛЬНО: void vGoodTask(void *p) { for (;;) { do_something(); vTaskDelay(pdMS_TO_TICKS(10)); // Уступаем хотя бы 10 мс } } 3. Доступ к глобальным данным без защиты // НЕПРАВИЛЬНО — race condition! float g_temperature = 0; void vSensor(void *p) { g_temperature = read_sensor(); } void vControl(void *p) { if (g_temperature > 80) alarm(); } // ПРАВИЛЬНО — через очередь или мьютекс ЗаключениеFreeRTOS превращает микроконтроллер из последовательного автомата в полноценную многозадачную систему. Это не усложнение ради усложнения — это решение реальных проблем: независимость задач, чёткие интерфейсы через очереди, защита ресурсов через мьютексы. Начните с малого: замените суперцикл двумя задачами — одна читает датчик, другая управляет выходом, общаются через очередь. Это уже даст почувствовать преимущества. FreeRTOS поддерживается на STM32, ESP32, Arduino (с ограничениями), Raspberry Pi Pico и десятках других платформ. Документация на freertos.org — отличная, с примерами и объяснениями.
-
Базы данных временных рядов: InfluxDB, TimescaleDB и промышленный historian
Зачем специализированная БД для временных рядовТехнологические данные — это всегда временной ряд: температура каждую секунду, давление каждые 100 мс, состояние оборудования каждые 10 мс. PostgreSQL или MySQL могут хранить такие данные. Но при миллионах записей в день начинаются проблемы. Почему реляционные БД плохо справляются: Индексы B-Tree неэффективны для временных запросов ("за последний час") Запись строк в таблицу с индексами — медленно при высоком темпе GROUP BY time_interval требует дорогих вычислений Партиционирование по времени нужно настраивать вручную Хранение тысяч тегов → тысячи колонок или плохая схема Что умеют Time-Series TSDB: Оптимизированная запись: 100 000+ точек/сек на скромном железе Встроенное сжатие (delta-delta, XOR float compression) Автоматические retention policies (TTL данных) Downsampling: автоматически агрегируем "горячие" данные в "холодные" Встроенные временны́е функции: moving average, rate, derivative InfluxDB 2.x: промышленный стандарт IoTОсновные концепцииMeasurement — аналог таблицы: measurement: "telemetry" Tags — индексированные метаданные (строки): tags: device="conveyor1", location="line1", area="factory" Fields — неиндексированные данные (числа, строки, bool): fields: temperature=87.3, current=15.5, running=true Timestamp — время с нано-точностью. Точка данных (Point): measurement,tags fields timestamp telemetry,device=conveyor1,location=line1 temperature=87.3,current=15.5 1710000000000000000 Почему Tags vs Fields важноTags: ИНДЕКСИРОВАНЫ → используйте для группировки/фильтрации device, location, sensor_type, unit_id Fields: НЕ индексированы → используйте для числовых данных temperature, pressure, current, voltage ОШИБКА: положить temperature в Tag — поиск по значению работает, но карданальность огромная → индекс разрастётся → InfluxDB замедлится. ОШИБКА: положить device_id в Field — нельзя эффективно фильтровать по устройству. Python клиент InfluxDB 2.x:from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS, WriteOptions from datetime import datetime, timezone import time INFLUX_URL = "http://localhost:8086" INFLUX_TOKEN = "your-api-token-here" INFLUX_ORG = "factory" INFLUX_BUCKET = "process_data" # Клиент с батчевой записью client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) write_api = client.write_api(write_options=WriteOptions( batch_size = 1000, # Накапливаем до 1000 точек flush_interval = 5_000, # Или сбрасываем каждые 5 секунд jitter_interval = 500, # ±500мс для сглаживания нагрузки retry_interval = 5_000, # Retry при ошибке через 5с max_retry_time = 180_000, # Максимум 3 минуты retry )) query_api = client.query_api() # ===== ЗАПИСЬ ===== def write_single_point(device: str, location: str, temperature: float, current: float, running: bool): """Запись одной точки""" point = ( Point("telemetry") .tag("device", device) .tag("location", location) .field("temperature", temperature) .field("current", current) .field("running", int(running)) # bool → int (InfluxDB лучше хранит) .time(datetime.now(timezone.utc)) ) write_api.write(bucket=INFLUX_BUCKET, record=point) def write_batch(measurements: list[dict]): """ Эффективная пакетная запись. measurements: [{'device': 'pump1', 'temp': 25.3, 'current': 12.1}, ...] """ points = [] for m in measurements: p = ( Point("telemetry") .tag("device", m['device']) .tag("location", m.get('location', 'unknown')) .field("temperature", float(m.get('temp', 0))) .field("current", float(m.get('current', 0))) .field("pressure", float(m.get('pressure', 0))) ) points.append(p) write_api.write(bucket=INFLUX_BUCKET, record=points) # Запись в нативном line protocol (максимальная производительность): def write_line_protocol(lines: list[str]): """ Прямая запись в line protocol — самый быстрый способ. Формат: measurement[,tag=value...] field=value[,field=value...] [timestamp] """ write_api.write(bucket=INFLUX_BUCKET, record='\n'.join(lines), write_precision=WritePrecision.NANOSECONDS) # Пример: lines = [ "telemetry,device=pump1,location=line1 temperature=87.3,current=15.5 1710000000000000000", "telemetry,device=pump2,location=line1 temperature=72.1,current=8.2 1710000000000000000", "telemetry,device=valve1,location=line2 position=75.0 1710000000000000000", ] write_line_protocol(lines) # ===== ЗАПРОСЫ (Flux) ===== def query_last_hour(device: str) -> list[dict]: """Последний час данных устройства""" flux = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "telemetry") |> filter(fn: (r) => r.device == "{device}") |> filter(fn: (r) => r._field == "temperature" or r._field == "current") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> sort(columns: ["_time"]) ''' tables = query_api.query(flux) results = [] for table in tables: for record in table.records: results.append({ 'time': record.get_time().isoformat(), 'temperature': record.values.get('temperature'), 'current': record.values.get('current'), }) return results def query_aggregated_stats(device: str, window: str = "5m", range_start: str = "-24h") -> list[dict]: """ Агрегированная статистика по временным окнам. window: "1m", "5m", "1h", "1d" """ flux = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: {range_start}) |> filter(fn: (r) => r._measurement == "telemetry" and r.device == "{device}") |> filter(fn: (r) => r._field == "temperature") |> aggregateWindow( every: {window}, fn: (tables=<-, column) => tables |> reduce( identity: {{mean: 0.0, min: 99999.0, max: -99999.0, count: 0}}, fn: (r, accumulator) => ({{ mean: accumulator.mean + r._value, min: if r._value < accumulator.min then r._value else accumulator.min, max: if r._value > accumulator.max then r._value else accumulator.max, count: accumulator.count + 1, }}) ), createEmpty: false ) ''' # Для простого avg/min/max лучше использовать встроенные функции: flux_simple = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: {range_start}) |> filter(fn: (r) => r._measurement == "telemetry" and r.device == "{device}" and r._field == "temperature") |> aggregateWindow(every: {window}, fn: mean, createEmpty: false) |> yield(name: "mean") ''' tables = query_api.query(flux_simple) return [{'time': r.get_time().isoformat(), 'mean_temp': r.get_value()} for table in tables for r in table.records] def query_anomalies(threshold_high: float = 85.0, range_start: str = "-7d") -> list[dict]: """Поиск аномалий — превышений порога""" flux = f''' from(bucket: "{INFLUX_BUCKET}") |> range(start: {range_start}) |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "temperature") |> filter(fn: (r) => r._value > {threshold_high}) |> group(columns: ["device"]) |> sort(columns: ["_time"], desc: true) ''' tables = query_api.query(flux) return [{ 'device': r.values.get('device'), 'time': r.get_time().isoformat(), 'value': r.get_value(), 'excess': round(r.get_value() - threshold_high, 2), } for table in tables for r in table.records] def query_device_availability(range_start: str = "-30d") -> list[dict]: """Доступность (availability) по устройствам за период""" flux = f''' import "math" total = from(bucket: "{INFLUX_BUCKET}") |> range(start: {range_start}) |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "running") |> group(columns: ["device"]) |> count() |> rename(columns: {{_value: "total_count"}}) running = from(bucket: "{INFLUX_BUCKET}") |> range(start: {range_start}) |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "running") |> filter(fn: (r) => r._value == 1) |> group(columns: ["device"]) |> count() |> rename(columns: {{_value: "running_count"}}) join(tables: {{total, running}}, on: ["device"]) |> map(fn: (r) => ({{ r with availability_pct: math.round(x: r.running_count / r.total_count * 1000.0) / 10.0 }})) ''' tables = query_api.query(flux) return [{'device': r.values.get('device'), 'availability': r.values.get('availability_pct')} for table in tables for r in table.records] Retention Policies и DownsamplingХранить сырые данные с секундным разрешением 10 лет — безумно дорого. Правильная стратегия: "Горячие" данные: 1 секунда, 30 дней → быстрый SSD "Тёплые" данные: 1 минута, 1 год → обычный SSD "Холодные" данные: 1 час, 10 лет → HDD/объектное хранилище Конфигурация в InfluxDB 2.x:# Создание bucket с retention 30 дней (сырые данные) influx bucket create \ --name process_data_raw \ --retention 30d \ --org factory # Bucket для агрегированных данных (бессрочно) influx bucket create \ --name process_data_aggregated \ --retention 0 \ --org factory Задача downsampling (Flux):def setup_downsampling_task(): """ Создаём задачу InfluxDB для автоматического downsampling. Каждые 5 минут агрегируем сырые данные в минутные. """ flux_task = ''' option task = { name: "Downsampling: raw→1min", every: 5m, // Запускать каждые 5 минут offset: 1m, // Смещение (ждём пока данные придут) } // Читаем сырые данные за последние 5 минут data = from(bucket: "process_data_raw") |> range(start: -task.every) |> filter(fn: (r) => r._measurement == "telemetry") // Агрегируем каждую числовую метрику data |> filter(fn: (r) => r._field == "temperature" or r._field == "current" or r._field == "pressure") |> aggregateWindow(every: 1m, fn: mean, createEmpty: false) |> set(key: "_measurement", value: "telemetry_1m") |> to(bucket: "process_data_aggregated") // Для бинарных данных (running) — используем last data |> filter(fn: (r) => r._field == "running") |> aggregateWindow(every: 1m, fn: last, createEmpty: false) |> set(key: "_measurement", value: "telemetry_1m") |> to(bucket: "process_data_aggregated") ''' # Создание задачи через API tasks_api = client.tasks_api() task = tasks_api.create_task_every( name="Downsampling: raw→1min", flux=flux_task, every="5m", organization=INFLUX_ORG ) print(f"Задача создана: {task.id}") TimescaleDB: PostgreSQL для временных рядовTimescaleDB — расширение PostgreSQL. Если вы уже используете PostgreSQL и знаете SQL — это лучший выбор. Вы получаете TSDB-оптимизации при сохранении полного SQL. -- Установка расширения CREATE EXTENSION IF NOT EXISTS timescaledb; -- Обычная таблица PostgreSQL CREATE TABLE telemetry ( time TIMESTAMPTZ NOT NULL, device TEXT NOT NULL, location TEXT NOT NULL, temperature FLOAT, current FLOAT, pressure FLOAT, running BOOLEAN, quality TEXT DEFAULT 'GOOD' ); -- Превращаем в hypertable (TimescaleDB магия!) SELECT create_hypertable('telemetry', 'time', chunk_time_interval => INTERVAL '1 day' -- Партиция = 1 день ); -- Индекс на часто используемые теги CREATE INDEX ON telemetry (device, time DESC); CREATE INDEX ON telemetry (location, time DESC); -- Compression (сжатие старых данных) ALTER TABLE telemetry SET ( timescaledb.compress, timescaledb.compress_segmentby = 'device', timescaledb.compress_orderby = 'time DESC' ); -- Автоматическое сжатие данных старше 7 дней SELECT add_compression_policy('telemetry', INTERVAL '7 days'); -- Автоматическое удаление старых данных (30 дней) SELECT add_retention_policy('telemetry', INTERVAL '30 days'); Запросы (обычный SQL!):-- Последний час данных с устройства SELECT time, temperature, current, running FROM telemetry WHERE device = 'pump1' AND time > NOW() - INTERVAL '1 hour' ORDER BY time DESC; -- Среднее по 5-минутным окнам SELECT time_bucket('5 minutes', time) AS bucket, device, ROUND(AVG(temperature)::numeric, 2) AS avg_temp, ROUND(MIN(temperature)::numeric, 2) AS min_temp, ROUND(MAX(temperature)::numeric, 2) AS max_temp, COUNT(*) AS samples FROM telemetry WHERE device = 'pump1' AND time > NOW() - INTERVAL '24 hours' GROUP BY bucket, device ORDER BY bucket DESC; -- Обнаружение аномалий (значение > avg + 2*stddev) WITH stats AS ( SELECT device, AVG(temperature) AS avg_temp, STDDEV(temperature) AS std_temp FROM telemetry WHERE time > NOW() - INTERVAL '7 days' GROUP BY device ) SELECT t.time, t.device, t.temperature, s.avg_temp, s.std_temp, (t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0) AS z_score FROM telemetry t JOIN stats s ON t.device = s.device WHERE t.time > NOW() - INTERVAL '24 hours' AND ABS((t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0)) > 2.0 ORDER BY ABS((t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0)) DESC LIMIT 50; -- Доступность оборудования за месяц SELECT device, COUNT(*) FILTER (WHERE running = true) AS running_count, COUNT(*) AS total_count, ROUND( COUNT(*) FILTER (WHERE running = true)::numeric / COUNT(*) * 100, 1 ) AS availability_pct, SUM(CASE WHEN running THEN 1 ELSE 0 END) * EXTRACT(EPOCH FROM INTERVAL '1 second') / 3600.0 AS running_hours FROM telemetry WHERE time > NOW() - INTERVAL '30 days' GROUP BY device ORDER BY availability_pct DESC; Непрерывные агрегации (Continuous Aggregates):-- Создаём материализованное представление с автообновлением CREATE MATERIALIZED VIEW telemetry_5min WITH (timescaledb.continuous) AS SELECT time_bucket('5 minutes', time) AS bucket, device, location, AVG(temperature) AS avg_temp, MIN(temperature) AS min_temp, MAX(temperature) AS max_temp, AVG(current) AS avg_current, MAX(current) AS max_current, BOOL_OR(running) AS any_running, COUNT(*) AS sample_count FROM telemetry GROUP BY bucket, device, location WITH NO DATA; -- Автоматическое обновление каждые 5 минут SELECT add_continuous_aggregate_policy('telemetry_5min', start_offset => INTERVAL '15 minutes', end_offset => INTERVAL '5 minutes', schedule_interval => INTERVAL '5 minutes' ); -- Запрос к агрегированным данным (мгновенно!) SELECT * FROM telemetry_5min WHERE device = 'pump1' AND bucket > NOW() - INTERVAL '24 hours' ORDER BY bucket DESC; Python + SQLAlchemy + TimescaleDB:from sqlalchemy import create_engine, text from sqlalchemy.orm import Session import pandas as pd from datetime import datetime, timedelta, timezone DATABASE_URL = "postgresql://user:password@localhost:5432/factory_db" engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20) class TelemetryRepository: def write_batch(self, records: list[dict]) -> int: """Пакетная запись телеметрии""" if not records: return 0 with engine.begin() as conn: result = conn.execute( text(""" INSERT INTO telemetry (time, device, location, temperature, current, pressure, running) VALUES (:time, :device, :location, :temperature, :current, :pressure, :running) ON CONFLICT DO NOTHING """), records ) return result.rowcount def get_latest(self, device: str, fields: list[str] = None) -> dict | None: """Последнее значение устройства""" field_list = ', '.join(fields or ['temperature', 'current', 'pressure', 'running']) with engine.connect() as conn: row = conn.execute( text(f""" SELECT time, {field_list} FROM telemetry WHERE device = :device ORDER BY time DESC LIMIT 1 """), {'device': device} ).fetchone() return dict(row._mapping) if row else None def get_as_dataframe(self, device: str, hours: int = 24) -> pd.DataFrame: """Загрузка данных в Pandas DataFrame для анализа""" query = text(""" SELECT time, temperature, current, pressure, running FROM telemetry WHERE device = :device AND time > :since ORDER BY time """) with engine.connect() as conn: df = pd.read_sql( query, conn, params={'device': device, 'since': datetime.now(timezone.utc) - timedelta(hours=hours)}, parse_dates=['time'], index_col='time' ) return df def detect_anomalies_zscore(self, device: str, field: str = 'temperature', threshold: float = 2.5) -> pd.DataFrame: """Обнаружение аномалий методом z-score""" df = self.get_as_dataframe(device, hours=24) if df.empty or field not in df.columns: return pd.DataFrame() mean = df[field].mean() std = df[field].std() if std == 0: return pd.DataFrame() df['z_score'] = (df[field] - mean) / std anomalies = df[df['z_score'].abs() > threshold].copy() anomalies['is_high'] = anomalies['z_score'] > 0 return anomalies[['z_score', field, 'is_high']] def get_equipment_report(self, days: int = 30) -> pd.DataFrame: """Отчёт по оборудованию за период""" query = text(""" SELECT device, COUNT(*) as total_records, COUNT(*) FILTER (WHERE running) as running_records, ROUND((COUNT(*) FILTER (WHERE running)::numeric / COUNT(*) * 100)::numeric, 1) as availability_pct, ROUND(AVG(temperature)::numeric, 1) as avg_temp, ROUND(MAX(temperature)::numeric, 1) as max_temp, ROUND(AVG(current)::numeric, 2) as avg_current FROM telemetry WHERE time > NOW() - MAKE_INTERVAL(days => :days) GROUP BY device ORDER BY device """) with engine.connect() as conn: return pd.read_sql(query, conn, params={'days': days}) Выбор TSDB: сравнительная таблицаКритерий InfluxDB 2.x TimescaleDB Prometheus Основа Собственный движок PostgreSQL Собственный Язык запросов Flux (мощный, непривычный) SQL PromQL Производительность записи ★★★★★ ★★★★ ★★★ SQL-совместимость ❌ ✅ (полная) ❌ Сжатие ★★★★★ ★★★★ ★★★ Масштабирование InfluxDB Enterprise TimescaleDB Thanos/Cortex Лицензия BSL (OSS ограничен) Apache 2 Apache 2 Интеграция с Grafana ★★★★★ ★★★★★ ★★★★★ Лучше для IoT, большой объём тегов Существующий PostgreSQL-стек DevOps мониторинг ЗаключениеВыбор TSDB зависит от контекста. InfluxDB — лучший выбор для чистых IoT/телеметрия проектов: максимальная производительность, мощный Flux для временны́х вычислений, отличная экосистема. TimescaleDB — если уже есть PostgreSQL инфраструктура, нужны JOINs с другими данными или разработчики лучше знают SQL. Ключевые принципы для production: всегда настраивайте retention policies (данные должны автоматически удаляться), используйте downsampling для долгосрочного хранения агрегатов, настройте сжатие (экономия 90%+ дискового пространства), мониторьте производительность самой TSDB. Deadband-фильтрация на уровне edge-узла (не писать если значение не изменилось существенно) снижает нагрузку на БД в 5–50 раз для медленно меняющихся процессов. Это первое что нужно сделать перед любой оптимизацией TSDB.
-
Git и современные практики разработки: от хаоса к порядку
Git: это не просто "сохранить файл"Git изобрёл Линус Торвальдс в 2005 году за две недели — потому что существующие системы контроля версий его раздражали. Результат стал стандартом де-факто для всей современной разработки. Но большинство разработчиков используют только 10% возможностей Git: git add, git commit, git push. И потом удивляются, почему в команде хаос, история проекта нечитаема, а деплой — это страшный ритуал. Правильное использование Git — это не набор команд, это культура разработки. Сегодня разберём, как устроена эта культура в реальных командах. Анатомия правильного коммитаКоммит — это единица изменений. Плохой коммит: "исправил баги и добавил фичи". Хороший коммит: одно логическое изменение, понятное описание. Conventional Commits — стандарт сообщений<type>(<scope>): <description> [optional body] [optional footer(s)] Типы: feat — новая функциональность fix — исправление бага docs — только документация style — форматирование, точки с запятой (нет изменений логики) refactor — рефакторинг (нет новой функциональности, нет фикса) perf — оптимизация производительности test — добавление тестов chore — обслуживание: обновление зависимостей, конфигурации CI ci — изменения CI/CD конфигурации revert — откат предыдущего коммита Примеры: # Плохо: git commit -m "fix" git commit -m "wip" git commit -m "changes" git commit -m "поправил немного" # Хорошо: git commit -m "fix(auth): исправлена утечка токена при logout" git commit -m "feat(modbus): добавлена поддержка FC15 (write multiple coils)" git commit -m "perf(historian): оптимизирован batch-insert, +340% throughput" git commit -m "docs(api): добавлены примеры для /api/v1/devices endpoint" # С телом для сложных изменений: git commit -m "fix(plc): исправлено переполнение счётчика при rollover Счётчик типа UINT использовался для значений >65535. Изменён на DINT (32-бит). Затронутые устройства: все узлы с FC03. Closes #247" Почему это важно?Автоматический CHANGELOG — инструменты как conventional-changelog генерируют его автоматически Семантическое версионирование — feat → minor, fix → patch, feat! или BREAKING CHANGE → major Читаемая история — через год понятно что и зачем было сделано Быстрый поиск — git log --grep="fix(modbus)" найдёт все фиксы Modbus Стратегии ветвленияGit Flow — классика для релизного циклаmain ────────────────────────────────────── (production-ready, теги версий) \ / release/1.2.0 ─────────────────────────── (только bagfixes перед релизом) \ / develop ──────────────────────────────── (интеграция фич) \ \ / feat/A feat/B feat/C Ветки: main — всегда стабильный, деплоится в прод, только через merge из release/* develop — основная ветка разработки, всегда должна собираться feature/* — новые фичи, создаются из develop, мержатся в develop release/* — подготовка релиза (версия, changelog), только bugfix hotfix/* — срочные фиксы прод, мержатся в main И develop # Создать фичу: git checkout develop git checkout -b feature/modbus-fc15-support # Завершить фичу: git checkout develop git merge --no-ff feature/modbus-fc15-support # --no-ff сохраняет историю git branch -d feature/modbus-fc15-support # Подготовить релиз: git checkout develop git checkout -b release/1.2.0 # Обновить версию, CHANGELOG... git commit -m "chore(release): version 1.2.0" git checkout main git merge --no-ff release/1.2.0 git tag -a v1.2.0 -m "Release 1.2.0" git checkout develop git merge --no-ff release/1.2.0 Trunk-Based Development — для быстрых командВсе разработчики работают в одной ветке (main), фичи прячутся за feature-флагами. Деплой несколько раз в день. Подходит для опытных команд с хорошим покрытием тестами. # Только короткоживущие ветки (1-2 дня максимум) git checkout -b task/PLC-247-fix-counter-overflow # ... работа ... git push origin task/PLC-247-fix-counter-overflow # Pull Request → Review → Merge в main # Деплой автоматически GitHub Flow — для непрерывного деплояУпрощённый Git Flow без develop: main = то, что в проде Feature branches — от main, в main через PR Деплой = merge в main Code Review: как делать правильноCode review — не поиск ошибок, это обмен знаниями и повышение качества. Хороший review делает команду сильнее. Для автора PR:## Описание Добавлена поддержка записи нескольких coils (FC15) в Modbus slave. ## Мотивация Клиент запросил управление 16 выходными реле через один Modbus-запрос вместо 16 отдельных FC05. Уменьшает нагрузку на шину в 16 раз. ## Изменения - `ModbusSlave::handle_fc15()` — новый обработчик функционального кода 15 - Обновлён маппинг coils на GPIO пины - Добавлены unit-тесты: 8 тест-кейсов ## Тестирование - [x] Unit-тесты: все зелёные - [x] Интеграционный тест с реальным Modbus-мастером (Python pymodbus) - [x] Проверен на железе: Raspberry Pi + MCP2551 ## Breaking Changes Нет. FC05 продолжает работать. ## Связанные Issues Closes #247 Для ревьюера:Проверяйте: Логику — правильно ли реализовано то, что задумано? Граничные случаи — что при пустых данных? При переполнении? При сетевой ошибке? Безопасность — нет ли SQL-инъекций, XSS, незащищённых данных? Производительность — нет ли N+1 запросов, бесконечных циклов? Тесты — покрывают ли они описанную функциональность? Документацию — понятно ли из кода и комментариев что происходит? Не проверяйте: Стиль форматирования (для этого есть линтеры и форматтеры) Личные предпочтения (если оба подхода корректны) Тон комментариев: ❌ Это неправильно, так делать нельзя ✅ Здесь возможно переполнение при dlc > 8, как насчёт проверки? ❌ Почему ты использовал цикл вместо map()? ✅ Можно ли тут использовать list comprehension для читаемости? ❌ Нет, переделай. ✅ Мне кажется, паттерн Strategy тут подошёл бы лучше — как думаешь? GitHub Actions: автоматизация всего# .github/workflows/ci.yml name: CI/CD Pipeline on: push: branches: [ main, develop ] pull_request: branches: [ main, develop ] jobs: # ===== ЛИНТИНГ И СТАТИЧЕСКИЙ АНАЛИЗ ===== lint: name: Lint & Static Analysis runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.11' cache: 'pip' - name: Install dependencies run: | pip install flake8 pylint mypy black isort pip install -r requirements.txt - name: Check formatting (black) run: black --check --diff . - name: Check imports (isort) run: isort --check-only --diff . - name: Lint (flake8) run: flake8 . --max-line-length=100 --exclude=.venv,migrations - name: Type check (mypy) run: mypy src/ --strict --ignore-missing-imports # ===== ТЕСТИРОВАНИЕ ===== test: name: Unit & Integration Tests runs-on: ubuntu-latest needs: lint services: # Поднимаем сервисы для интеграционных тестов redis: image: redis:7 ports: ['6379:6379'] mosquitto: image: eclipse-mosquitto:2 ports: ['1883:1883'] strategy: matrix: python-version: ['3.10', '3.11', '3.12'] # Тестируем на всех версиях steps: - uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v4 with: python-version: ${{ matrix.python-version }} cache: 'pip' - name: Install dependencies run: pip install -r requirements.txt -r requirements-dev.txt - name: Run tests with coverage run: | pytest tests/ \ --cov=src \ --cov-report=xml \ --cov-report=term-missing \ --cov-fail-under=80 \ -v env: REDIS_URL: redis://localhost:6379 MQTT_HOST: localhost - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 with: file: ./coverage.xml # ===== СБОРКА DOCKER ОБРАЗА ===== build: name: Build & Push Docker Image runs-on: ubuntu-latest needs: test if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/') steps: - uses: actions/checkout@v4 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 - name: Login to Registry uses: docker/login-action@v3 with: registry: ghcr.io username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: Extract metadata id: meta uses: docker/metadata-action@v5 with: images: ghcr.io/${{ github.repository }} tags: | type=ref,event=branch type=semver,pattern={{version}} type=semver,pattern={{major}}.{{minor}} type=sha,prefix=sha- - name: Build and push uses: docker/build-push-action@v5 with: context: . platforms: linux/amd64,linux/arm64 # Для x86 серверов И Raspberry Pi push: true tags: ${{ steps.meta.outputs.tags }} cache-from: type=gha cache-to: type=gha,mode=max # ===== ДЕПЛОЙ ===== deploy-staging: name: Deploy to Staging runs-on: ubuntu-latest needs: build if: github.ref == 'refs/heads/develop' environment: staging steps: - name: Deploy via SSH uses: appleboy/ssh-action@v1 with: host: ${{ secrets.STAGING_HOST }} username: deploy key: ${{ secrets.STAGING_SSH_KEY }} script: | cd /opt/gateway docker compose pull docker compose up -d --remove-orphans docker compose ps deploy-production: name: Deploy to Production runs-on: ubuntu-latest needs: build if: startsWith(github.ref, 'refs/tags/v') environment: production # Требует одобрения в GitHub steps: - name: Deploy to production uses: appleboy/ssh-action@v1 with: host: ${{ secrets.PROD_HOST }} username: deploy key: ${{ secrets.PROD_SSH_KEY }} script: | cd /opt/gateway export IMAGE_TAG=${{ github.ref_name }} docker compose pull docker compose up -d --remove-orphans # Smoke test sleep 10 curl -f http://localhost:8080/health || (docker compose logs && exit 1) Git Hooks: автоматизация на уровне репозитория# .git/hooks/pre-commit (запускается перед каждым коммитом) #!/bin/bash set -e echo "🔍 Pre-commit проверки..." # Форматирование Python if command -v black &> /dev/null; then black --check . --quiet if [ $? -ne 0 ]; then echo "❌ Форматирование не соответствует black. Запустите: black ." exit 1 fi fi # Быстрые тесты (только изменённые файлы) CHANGED_PY=$(git diff --cached --name-only --diff-filter=ACM | grep '\.py$') if [ -n "$CHANGED_PY" ]; then pytest tests/unit/ -x -q --tb=short fi echo "✅ Все проверки прошли" Лучше использовать pre-commit framework:# .pre-commit-config.yaml repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v4.5.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer - id: check-yaml - id: check-json - id: check-merge-conflict - id: detect-private-key # Не допускаем секреты в коде! - repo: https://github.com/psf/black rev: 23.10.1 hooks: - id: black - repo: https://github.com/pycqa/isort rev: 5.12.0 hooks: - id: isort - repo: https://github.com/commitizen-tools/commitizen rev: v3.12.0 hooks: - id: commitizen # Проверяет формат сообщения коммита # Установка: pip install pre-commit pre-commit install # Устанавливает хуки в .git/hooks/ pre-commit run --all-files # Запуск вручную Семантическое версионирование и автоматический релизSemantic Versioning: MAJOR.MINOR.PATCHPATCH (1.2.3 → 1.2.4): багфиксы, обратно совместимые изменения MINOR (1.2.4 → 1.3.0): новая функциональность, обратно совместимая MAJOR (1.3.0 → 2.0.0): несовместимые изменения API Автоматический выпуск с commitizen:# pyproject.toml: [tool.commitizen] name = "cz_conventional_commits" tag_format = "v$version" version_scheme = "semver" version_files = [ "src/__init__.py:__version__", "pyproject.toml:version" ] update_changelog_on_bump = true # Команды: cz bump # Автоматически определяет тип bumpa из коммитов cz bump --major # Принудительно major cz changelog # Генерирует CHANGELOG.md Автоматический CHANGELOG.md из conventional commits: ## v1.3.0 (2024-03-15) ### Features - **modbus**: добавлена поддержка FC15 (write multiple coils) (#247) - **historian**: реализован deadband-алгоритм сжатия, экономия 78% места ### Bug Fixes - **uart**: исправлена потеря байт при высоких нагрузках (#251) - **pid**: устранено интегральное насыщение при длительной работе ### Performance - **batch-write**: оптимизирован bulk insert в InfluxDB, +340% throughput Практические советы.gitignore — не игнорируйте важное# Python __pycache__/ *.pyc *.pyo .venv/ .env venv/ # IDE .idea/ .vscode/ *.swp *.swo # Сборка dist/ build/ *.egg-info/ # Тесты .coverage htmlcov/ .pytest_cache/ # Секреты (НИКОГДА не коммитить!) *.key *.pem .env.local config.secret.yaml # OS .DS_Store Thumbs.db Работа с секретами — никогда в репозиторий!# Плохо: секреты в коде MQTT_PASSWORD = "supersecret123" # Хорошо: из переменных окружения MQTT_PASSWORD = os.getenv("MQTT_PASSWORD") # В .env локально, в CI — secrets # Для локальной разработки: .env файл (в .gitignore!) # pip install python-dotenv from dotenv import load_dotenv load_dotenv() # Для проверки что секрет не утёк: git log --all --full-history -- "*.env" # Поиск в истории git grep "supersecret" # Поиск в текущем состоянии Интерактивный rebase для чистой истории# Перед мержем PR привести историю в порядок git rebase -i origin/main # Редактор покажет: # pick a1b2c3 WIP fix something # pick d4e5f6 another fix # pick g7h8i9 добавил логирование # Меняем на: # reword a1b2c3 fix(modbus): исправлен CRC при DLC=0 # squash d4e5f6 # Объединить с предыдущим # pick g7h8i9 feat(logging): добавлено структурированное логирование # Результат: чистая, осмысленная история ЗаключениеGit — это не инструмент, это язык коммуникации в команде. Правильные коммиты рассказывают историю проекта. Грамотное ветвление изолирует работу. CI/CD устраняет ручной труд и человеческие ошибки при деплое. Начните с малого: установите .pre-commit-config.yaml с black и detect-private-key. Перейдите на conventional commits. Добавьте один GitHub Actions workflow с тестами. Каждый из этих шагов принесёт немедленную пользу. Инвестиция в культуру работы с кодом возвращается многократно: меньше времени на дебаггинг, меньше страха перед деплоем, больше времени на реальную разработку.
-
Linux для встраиваемых систем: от Raspberry Pi до промышленного шлюза
Когда Linux вместо микроконтроллераВыбор между голым МК (Arduino/STM32) и Linux-системой (Raspberry Pi/промышленный ПК) — одно из ключевых архитектурных решений. Linux выигрывает когда нужно: Сложные сетевые протоколы (TCP/IP стек, TLS, MQTT, OPC UA) Работа с файлами: логирование, конфигурация, обновления ПО Высокоуровневые вычисления: Python/NumPy, ML-инференс Несколько параллельных задач с разной логикой Веб-интерфейс или REST API Большой объём RAM/Flash (база данных, historian) Микроконтроллер выигрывает когда нужно: Детерминированное реальное время (< 1 мс) Мгновенный старт (Linux загружается 10–30 секунд) Минимальное энергопотребление Дешёвое серийное производство Золотое правило: Linux для "мозга" и коммуникаций, микроконтроллер для "мышц" и реального времени. Оба — в одной системе, связанные UART или SPI. Raspberry Pi в промышленности: что учестьRaspberry Pi не проектировался для промышленного применения, но активно используется. Ключевые ограничения и решения: Ограничение 1: SD-карта умирает SD-карты не рассчитаны на постоянную запись. В промышленном применении — выход из строя за 3–12 месяцев. Решения: # 1. Read-only файловая система (overlayfs) # В /boot/cmdline.txt добавить: overlayroot=tmpfs # Данные писать только на специальный раздел с journaling # 2. Переместить tmpfs для логов в RAM # /etc/fstab: tmpfs /tmp tmpfs defaults,noatime,size=100m 0 0 tmpfs /var/log tmpfs defaults,noatime,size=50m 0 0 tmpfs /var/tmp tmpfs defaults,noatime,size=20m 0 0 # 3. Использовать SSD через USB3 или eMMC-модуль (CM4) Ограничение 2: Нет RTC (часов реального времени) При потере питания время сбивается. # Установить модуль DS3231 через I2C # /boot/config.txt: dtoverlay=i2c-rtc,ds3231 # Синхронизация при загрузке: sudo hwclock --hctosys # Hardware clock → System clock Ограничение 3: Нет аппаратного watchdog "из коробки" # Включить встроенный watchdog BCM2835 # /boot/config.txt: dtparam=watchdog=on # /etc/systemd/system.conf: RuntimeWatchdogSec=10 # Сброс если systemd не пингует 10 секунд RebootWatchdogSec=60 # Проверка: ls /dev/watchdog # Должен существовать GPIO: управление пинами из userspacesysfs (устаревший, но всё ещё работает):# Экспортируем GPIO 17 echo "17" > /sys/class/gpio/export # Устанавливаем направление echo "out" > /sys/class/gpio/gpio17/direction # Устанавливаем значение echo "1" > /sys/class/gpio/gpio17/value # Читаем состояние входа cat /sys/class/gpio/gpio18/value libgpiod (современный стандарт):# Установка sudo apt install gpiod libgpiod-dev # Командная строка gpioget gpiochip0 17 # Прочитать GPIO 17 gpioset gpiochip0 17=1 # Установить в HIGH gpioset gpiochip0 17=0 18=1 # Установить несколько gpioinfo gpiochip0 # Информация о всех пинах # Python + gpiod # pip install gpiod import gpiod import time # Открываем чип chip = gpiod.Chip('gpiochip0') # Настраиваем пины led_line = chip.get_line(17) button_line = chip.get_line(18) led_config = gpiod.LineRequest() led_config.consumer = "myapp" led_config.request_type = gpiod.LineRequest.DIRECTION_OUTPUT led_line.request(led_config) btn_config = gpiod.LineRequest() btn_config.consumer = "myapp" btn_config.request_type = gpiod.LineRequest.EVENT_BOTH_EDGES # Прерывания! button_line.request(btn_config) try: while True: # Ожидание события с таймаутом 100 мс event_happened = button_line.event_wait(nsec=100_000_000) if event_happened: event = button_line.event_read() if event.type == gpiod.LineEvent.RISING_EDGE: print("Кнопка нажата") led_line.set_value(1) else: print("Кнопка отпущена") led_line.set_value(0) finally: led_line.release() button_line.release() SPI и I2C из userspaceI2C (smbus2):# pip install smbus2 import smbus2 import time class BME280_Linux: """Работа с датчиком BME280 через Linux I2C""" ADDR = 0x76 REG_ID = 0xD0 REG_CTRL = 0xF4 REG_DATA = 0xF7 def __init__(self, bus_num: int = 1): self.bus = smbus2.SMBus(bus_num) def read_reg(self, reg: int) -> int: return self.bus.read_byte_data(self.ADDR, reg) def write_reg(self, reg: int, value: int): self.bus.write_byte_data(self.ADDR, reg, value) def read_burst(self, reg: int, length: int) -> bytes: return bytes(self.bus.read_i2c_block_data(self.ADDR, reg, length)) def init(self): chip_id = self.read_reg(self.REG_ID) if chip_id != 0x60: raise RuntimeError(f"BME280 не найден, ID={chip_id:#x}") # Нормальный режим, oversampling ×4 self.write_reg(0xF4, 0x97) # ctrl_meas self.write_reg(0xF5, 0xA0) # config: IIR filter 16 time.sleep(0.1) def read(self) -> dict: data = self.read_burst(self.REG_DATA, 6) raw_press = (data[0] << 12) | (data[1] << 4) | (data[2] >> 4) raw_temp = (data[3] << 12) | (data[4] << 4) | (data[5] >> 4) # Упрощённое преобразование (без компенсации) # В реальности нужно читать калибровочные коэффициенты! temp = raw_temp / 5120.0 pressure = raw_press / 25600.0 / 100.0 # гПа return {'temperature': round(temp, 1), 'pressure': round(pressure, 1)} # Использование: sensor = BME280_Linux(bus_num=1) # /dev/i2c-1 sensor.init() while True: data = sensor.read() print(f"T={data['temperature']}°C, P={data['pressure']}гПа") time.sleep(1) SPI (spidev):import spidev import time class MCP3208_ADC: """12-битный АЦП MCP3208 через SPI""" CHANNELS = 8 def __init__(self, bus: int = 0, device: int = 0, speed_hz: int = 1_000_000): self.spi = spidev.SpiDev() self.spi.open(bus, device) # /dev/spidev0.0 self.spi.max_speed_hz = speed_hz self.spi.mode = 0 def read_channel(self, channel: int) -> int: """Чтение канала 0-7, возвращает 0-4095""" if not 0 <= channel < self.CHANNELS: raise ValueError(f"Канал {channel} вне диапазона 0-7") # MCP3208: 3 байта транзакции # Байт 1: старт-бит + single/diff + D2 # Байт 2: D1, D0, X, X, X, X, X, X # Байт 3: X, X, X, X, X, X, X, X cmd = [0x06 | (channel >> 2), (channel & 0x03) << 6, 0x00] response = self.spi.xfer2(cmd) # Из ответа: байт 1 биты 1-0 + байт 2 все 8 бит = 12 бит result = ((response[1] & 0x0F) << 8) | response[2] return result def read_voltage(self, channel: int, vref: float = 3.3) -> float: """Чтение в вольтах""" raw = self.read_channel(channel) return raw * vref / 4095.0 def read_all(self, vref: float = 3.3) -> list: """Чтение всех 8 каналов""" return [self.read_voltage(ch, vref) for ch in range(self.CHANNELS)] def close(self): self.spi.close() # Использование: adc = MCP3208_ADC() while True: voltages = adc.read_all() for ch, v in enumerate(voltages): print(f"CH{ch}: {v:.3f}В", end=" ") print() time.sleep(0.5) UART и последовательный портimport serial import serial.tools.list_ports # Найти все доступные порты def list_serial_ports(): ports = serial.tools.list_ports.comports() for port in ports: print(f"{port.device}: {port.description} ({port.hwid})") # На Raspberry Pi: # /dev/ttyAMA0 или /dev/serial0 — встроенный UART (GPIO 14/15) # /dev/ttyUSB0 — USB-UART адаптер # /dev/ttyACM0 — USB CDC (Arduino) # Важно для RPi: отключить console на /dev/serial0 # sudo raspi-config → Interface Options → Serial Port # "Would you like a login shell to be accessible over serial?" → No # "Would you like the serial port hardware to be enabled?" → Yes class SerialDevice: """Надёжная работа с последовательным портом""" def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1.0): self.port_name = port self.baudrate = baudrate self.timeout = timeout self.ser = None def connect(self): try: self.ser = serial.Serial( port = self.port_name, baudrate = self.baudrate, bytesize = serial.EIGHTBITS, parity = serial.PARITY_NONE, stopbits = serial.STOPBITS_ONE, timeout = self.timeout ) print(f"Подключено: {self.port_name}") return True except serial.SerialException as e: print(f"Ошибка подключения: {e}") return False def send(self, data: bytes) -> bool: try: self.ser.write(data) return True except serial.SerialException: return False def send_line(self, text: str) -> bool: return self.send((text + '\n').encode('utf-8')) def read_line(self) -> str | None: try: line = self.ser.readline() if line: return line.decode('utf-8', errors='replace').strip() return None except serial.SerialException: return None def close(self): if self.ser and self.ser.is_open: self.ser.close() systemd: управление сервисами приложенияПравильное промышленное приложение на Linux должно запускаться как системный сервис — автостарт при загрузке, перезапуск при сбое, логирование. Создание systemd unit:# /etc/systemd/system/industrial-gateway.service [Unit] Description=Industrial IoT Gateway After=network.target Wants=network-online.target After=network-online.target # Зависимость от другого сервиса (например, MQTT-брокера) # Requires=mosquitto.service # After=mosquitto.service [Service] Type=simple User=pi Group=pi WorkingDirectory=/opt/gateway # Переменные окружения из файла EnvironmentFile=/etc/gateway/config.env # Команда запуска ExecStart=/opt/gateway/venv/bin/python /opt/gateway/main.py # Перезапуск при сбое Restart=on-failure RestartSec=10s StartLimitIntervalSec=60s StartLimitBurst=3 # Максимум 3 попытки за 60 секунд # Watchdog интеграция с systemd # Приложение должно вызывать sd_notify WATCHDOG=1 каждые N секунд WatchdogSec=30s # Логирование StandardOutput=journal StandardError=journal SyslogIdentifier=gateway # Безопасность (опционально, но рекомендуется) NoNewPrivileges=true PrivateTmp=true [Install] WantedBy=multi-user.target # Установка и управление: sudo systemctl daemon-reload sudo systemctl enable industrial-gateway sudo systemctl start industrial-gateway sudo systemctl status industrial-gateway # Логи: journalctl -u industrial-gateway -f # В реальном времени journalctl -u industrial-gateway --since today journalctl -u industrial-gateway -n 100 # Последние 100 строк Watchdog из Python (sd-notify):# pip install sdnotify import sdnotify import time import threading notifier = sdnotify.SystemdNotifier() def watchdog_thread(): """Пингуем systemd watchdog каждые 10 секунд""" while True: notifier.notify("WATCHDOG=1") time.sleep(10) def main(): # Сообщаем systemd что мы готовы notifier.notify("READY=1") notifier.notify("STATUS=Инициализация...") # Запускаем watchdog в фоне wdg = threading.Thread(target=watchdog_thread, daemon=True) wdg.start() try: # Основной цикл приложения while True: notifier.notify("STATUS=Работает нормально") # ... бизнес-логика time.sleep(1) except Exception as e: notifier.notify(f"STATUS=ОШИБКА: {e}") raise Сетевая конфигурация для промышленного шлюза# /etc/dhcpcd.conf — статический IP для промышленной сети interface eth0 static ip_address=192.168.1.200/24 static routers=192.168.1.1 static domain_name_servers=192.168.1.1 8.8.8.8 # Или через NetworkManager (современный способ): sudo nmcli con add type ethernet ifname eth0 con-name industrial \ ipv4.method manual ipv4.addresses 192.168.1.200/24 \ ipv4.gateway 192.168.1.1 ipv4.dns "192.168.1.1 8.8.8.8" sudo nmcli con up industrial Bonding (резервирование сети):# Два сетевых интерфейса — один основной, второй резервный # /etc/network/interfaces: auto bond0 iface bond0 inet static address 192.168.1.200 netmask 255.255.255.0 gateway 192.168.1.1 bond-slaves eth0 eth1 bond-mode active-backup # При отказе eth0 — переключаемся на eth1 bond-miimon 100 # Проверка связи каждые 100 мс bond-primary eth0 Buildroot: минимальный Linux-образДля серийного производства не нужен полный Raspberry Pi OS с Python IDE и LibreOffice. Нужен минимальный образ с только нужными компонентами. Buildroot — система сборки кастомных Linux-образов: # Клонируем Buildroot git clone https://git.buildroot.net/buildroot cd buildroot # Начинаем с дефолтной конфигурации для Raspberry Pi make raspberrypi4_64_defconfig # Настраиваем через menuconfig make menuconfig # Target packages → Networking applications → mosquitto (MQTT-брокер) # Target packages → Libraries → python3 # Target packages → Libraries → python-paho-mqtt # System configuration → Root password # Собираем (первый раз ~1-2 часа) make -j4 # Результат: output/images/sdcard.img # Записываем на SD: sudo dd if=output/images/sdcard.img of=/dev/sdX bs=4M status=progress Преимущества кастомного образа: Размер: 50–200 МБ вместо 4–8 ГБ Быстрый старт: 5–8 секунд вместо 30+ Безопасность: минимальная поверхность атаки Reproducible builds: одинаковый образ на всех устройствах Python-приложение как надёжный шлюз#!/opt/gateway/venv/bin/python3 """ Промышленный IoT-шлюз: Modbus RTU → MQTT """ import logging import signal import sys import time import json import threading from pathlib import Path from typing import Optional import sdnotify import paho.mqtt.client as mqtt from pymodbus.client import ModbusSerialClient # Настройка логирования (в journald через stderr) logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', stream=sys.stderr ) log = logging.getLogger('gateway') class IndustrialGateway: def __init__(self): self.running = False self.notifier = sdnotify.SystemdNotifier() self.mqtt_client: Optional[mqtt.Client] = None self.modbus_client: Optional[ModbusSerialClient] = None # Конфигурация из переменных окружения import os self.mqtt_host = os.getenv('MQTT_HOST', 'localhost') self.mqtt_port = int(os.getenv('MQTT_PORT', '1883')) self.modbus_port = os.getenv('MODBUS_PORT', '/dev/serial0') self.modbus_baud = int(os.getenv('MODBUS_BAUD', '9600')) self.poll_interval = float(os.getenv('POLL_INTERVAL', '1.0')) def setup(self): """Инициализация подключений""" log.info("Инициализация шлюза...") # MQTT self.mqtt_client = mqtt.Client(client_id="industrial-gateway") self.mqtt_client.on_connect = self._on_mqtt_connect self.mqtt_client.on_disconnect = self._on_mqtt_disconnect self.mqtt_client.will_set("gateway/status", '{"online": false}', retain=True) self.mqtt_client.connect_async(self.mqtt_host, self.mqtt_port) self.mqtt_client.loop_start() # Modbus self.modbus_client = ModbusSerialClient( port=self.modbus_port, baudrate=self.modbus_baud, timeout=1.0 ) if not self.modbus_client.connect(): log.error(f"Не удалось подключиться к Modbus: {self.modbus_port}") def _on_mqtt_connect(self, client, userdata, flags, rc): if rc == 0: log.info(f"MQTT подключён: {self.mqtt_host}") client.publish("gateway/status", '{"online": true}', retain=True) else: log.error(f"MQTT ошибка подключения: {rc}") def _on_mqtt_disconnect(self, client, userdata, rc): log.warning(f"MQTT отключён (rc={rc}), переподключение...") def poll_device(self, device_addr: int, reg_start: int, reg_count: int) -> Optional[list]: """Опрос устройства Modbus""" try: result = self.modbus_client.read_input_registers( address=reg_start, count=reg_count, slave=device_addr ) if result.isError(): log.warning(f"Modbus ошибка: устройство {device_addr}") return None return result.registers except Exception as e: log.error(f"Исключение при опросе {device_addr}: {e}") return None def publish(self, topic: str, data: dict): """Публикация в MQTT""" try: payload = json.dumps(data, ensure_ascii=False) self.mqtt_client.publish(topic, payload) except Exception as e: log.error(f"Ошибка публикации: {e}") def run(self): """Главный цикл""" self.running = True # Сигналы завершения signal.signal(signal.SIGTERM, self._shutdown) signal.signal(signal.SIGINT, self._shutdown) self.setup() # Сообщаем systemd о готовности self.notifier.notify("READY=1") log.info("Шлюз запущен и готов к работе") poll_count = 0 while self.running: loop_start = time.monotonic() # Опрашиваем частотник (адрес 1, регистры 0-5) vfd_data = self.poll_device(1, 0, 6) if vfd_data: self.publish("factory/line1/vfd1/telemetry", { 'status_word': vfd_data[0], 'freq_hz': vfd_data[1] / 100.0, 'current_a': vfd_data[2] / 10.0, 'voltage_v': vfd_data[3], 'power_kw': vfd_data[4] / 10.0, 'fault_code': vfd_data[5], 'running': bool(vfd_data[0] & 0x0001), 'fault': bool(vfd_data[0] & 0x0008), }) # Опрашиваем датчик давления (адрес 5, регистры 0-1) pressure_data = self.poll_device(5, 0, 2) if pressure_data: import struct raw = struct.pack('>HH', pressure_data[0], pressure_data[1]) pressure = struct.unpack('>f', raw)[0] self.publish("factory/line1/pressure1/telemetry", { 'pressure_bar': round(pressure, 2) }) poll_count += 1 # Watchdog if poll_count % 5 == 0: self.notifier.notify("WATCHDOG=1") self.notifier.notify(f"STATUS=Опросов: {poll_count}") # Точный таймер интервала опроса elapsed = time.monotonic() - loop_start sleep_time = self.poll_interval - elapsed if sleep_time > 0: time.sleep(sleep_time) elif sleep_time < -0.1: log.warning(f"Опрос занял {elapsed:.3f}с (лимит {self.poll_interval}с)") def _shutdown(self, signum, frame): log.info(f"Получен сигнал {signum}, завершаем работу...") self.running = False if self.mqtt_client: self.mqtt_client.publish("gateway/status", '{"online": false}', retain=True) self.mqtt_client.loop_stop() if self.modbus_client: self.modbus_client.close() sys.exit(0) if __name__ == "__main__": gateway = IndustrialGateway() gateway.run() Безопасность Linux-устройства в сети# Минимальный hardening для промышленного устройства # 1. Обновление системы (автоматически) sudo apt install unattended-upgrades sudo dpkg-reconfigure unattended-upgrades # 2. UFW файрвол sudo apt install ufw sudo ufw default deny incoming sudo ufw default allow outgoing sudo ufw allow ssh sudo ufw allow 1883 # MQTT (только если нужен снаружи) sudo ufw enable # 3. Отключить неиспользуемые сервисы sudo systemctl disable bluetooth sudo systemctl disable avahi-daemon # 4. Изменить стандартный пароль! passwd pi # ОБЯЗАТЕЛЬНО # 5. SSH ключи вместо паролей # Скопировать ключ: ssh-copy-id pi@192.168.1.200 # /etc/ssh/sshd_config: # PasswordAuthentication no # PermitRootLogin no # 6. Логирование попыток взлома sudo apt install fail2ban ЗаключениеLinux на встраиваемых системах — мощнейший инструмент. Raspberry Pi, как платформа, имеет ограничения для промышленного применения, но при правильной настройке (read-only FS, watchdog, systemd-сервисы, резервирование) служит надёжно годами. Для новых промышленных проектов рассмотрите специализированные платформы: Raspberry Pi CM4 (eMMC, нет SD-карты), BeagleBone Black (PRU для реального времени), Toradex Colibri/Apalis (промышленный температурный диапазон, многолетняя поддержка), IEI, Advantech (сертифицированные промышленные платформы). Главный принцип для промышленного Linux: устройство должно работать без участия человека годами. Watchdog, автоперезапуск сервисов, защита файловой системы от записи, автоматические обновления безопасности — это не опции, это базовые требования.
-
CAN Bus: протокол, который держит автомобили и заводы
Почему CAN Bus? История одного протокола1983 год. Инженеры Bosch смотрят на жгут проводки в Mercedes-Benz W126 и понимают, что так продолжаться не может. 1000+ метров провода, сотни коннекторов — система ненадёжна, дорога и тяжела. Им нужна шина данных, по которой все блоки управления могут общаться. В 1986 году появляется CAN (Controller Area Network). В 1991 году Mercedes внедряет CAN в S-класс. Сегодня нет ни одного автомобиля без CAN. И не только автомобиля: промышленные роботы, строительная техника, медицинское оборудование, поезда, самолёты. Секреты успеха: Надёжность — дифференциальный сигнал, устойчив к помехам Детерминизм — приоритетная схема без коллизий Простота — только 2 провода (CANH и CANL) Скорость — до 1 Мбит/с (Classic CAN), до 8 Мбит/с (CAN FD) Физический уровень: как это работаетCAN использует дифференциальную пару: два провода CANH и CANL. Информация кодируется разностью напряжений, а не абсолютным значением. Рецессивный бит (логическая 1): CANH ≈ 2.5В, CANL ≈ 2.5В → разность ≈ 0В Доминантный бит (логический 0): CANH ≈ 3.5В, CANL ≈ 1.5В → разность ≈ 2В Дифференциальный сигнал нечувствителен к синфазным помехам — если на оба провода наводится шум, разность остаётся неизменной. Именно поэтому CAN работает в двигательном отсеке автомобиля рядом с высоковольтной проводкой зажигания. ТопологияСтрого линейная шина с терминаторами 120 Ом на концах: [Узел A]──────[Узел B]──────[Узел C]──────[Узел D] 120Ом 120Ом Максимальная длина зависит от скорости: Скорость Макс. длина 1 Мбит/с 25 м 500 Кбит/с 100 м 250 Кбит/с 250 м 125 Кбит/с 500 м 10 Кбит/с 5000 м Структура CAN-фрейма (Standard 11-bit)SOF│ Identifier (11 бит) │RTR│IDE│r0│ DLC (4) │ Data (0-8 байт) │ CRC │ACK│ EOF 1 11 1 1 1 4 0-64 15 2 7 SOF (Start of Frame): 1 доминантный бит — все узлы синхронизируются. Identifier (ID, 11 бит): Идентифицирует тип сообщения, а не адрес отправителя/получателя. Одновременно определяет приоритет — чем меньше ID, тем выше приоритет. ID=0 — наивысший приоритет. RTR (Remote Transmission Request): Запрос данных от другого узла (редко используется). DLC (Data Length Code): Количество байт данных, 0–8. Data: Полезные данные, 0–8 байт. CRC (15 бит): Контрольная сумма для обнаружения ошибок. ACK: Все узлы, успешно принявшие фрейм, устанавливают доминантный бит в поле ACK. Отправитель проверяет — если ACK не получен, повторяет передачу. Extended Frame (29-bit ID)Для приложений где 2048 идентификаторов мало (J1939, CANopen): SOF│ ID_A (11) │SRR│IDE=1│ ID_B (18 бит) │RTR│... 29-битный ID даёт 536 870 912 возможных идентификаторов. Битовый арбитраж: без коллизийСамая элегантная часть CAN. Когда два узла начинают передачу одновременно — нет коллизии, как в Ethernet. Побеждает тот, у кого ID меньше (приоритетнее). Механизм: каждый передающий узел одновременно читает шину. Пока он видит то, что передаёт — продолжает. Как только видит расхождение (отправил рецессивный 1, а на шине доминантный 0 — значит другой узел передаёт доминантный бит с более высоким приоритетом) — немедленно прекращает передачу и переходит в режим приёма. Узел A: 0 0 0 1 0 ... (ID = 0b00010...) Узел B: 0 0 0 0 1 ... (ID = 0b00001...) Бит 4: Узел A передаёт рецессивный (1) Узел B передаёт доминантный (0) Шина показывает доминантный (0) → Узел A видит расхождение и ОСТАНАВЛИВАЕТСЯ → Узел B продолжает передачу Узел B выиграл арбитраж! Нет потерянных данных, нет задержек. Обработка ошибок и состояния узлаCAN имеет развитую систему самодиагностики. Каждый узел ведёт два счётчика: TEC (Transmit Error Counter) REC (Receive Error Counter) Состояния узла: Error Active (TEC<128, REC<128) — Нормальная работа ↓ TEC или REC ≥ 128 Error Passive (TEC≥128 или REC≥128) — Узел работает, но: - Не посылает Active Error Flags - Ждёт 8 рецессивных бит между передачами ↓ TEC ≥ 256 Bus Off — Узел ОТКЛЮЧЁН от шины (требует программного сброса или 128×11 рецессивных бит) Это важно: неисправный узел не "разваливает" шину, а сначала становится пассивным, затем отключается — остальные продолжают работать. STM32: встроенный CAN-контроллерSTM32F103 имеет встроенный bxCAN (Basic Extended CAN). Пины: PA11/PA12 или PB8/PB9. #include "stm32f1xx_hal.h" CAN_HandleTypeDef hcan; // ===== ИНИЦИАЛИЗАЦИЯ CAN 500 Кбит/с ===== void CAN_Init_500kbps(void) { hcan.Instance = CAN1; // Тайминг для 500 Кбит/с при тактовой 36 МГц // Bit time = Prescaler × (1 + BS1 + BS2) // 36 МГц / 4 / (1+7+2) = 900 Кбит/с ... нет, подберём: // 36 МГц / 9 / (1+3+2) = 500 Кбит/с ← правильно hcan.Init.Prescaler = 9; hcan.Init.Mode = CAN_MODE_NORMAL; // CAN_MODE_LOOPBACK для теста! hcan.Init.SyncJumpWidth = CAN_SJW_1TQ; hcan.Init.TimeSeg1 = CAN_BS1_3TQ; // BS1 = 3 TQ hcan.Init.TimeSeg2 = CAN_BS2_2TQ; // BS2 = 2 TQ hcan.Init.TimeTriggeredMode = DISABLE; hcan.Init.AutoBusOff = ENABLE; // Автоматический выход из Bus-Off hcan.Init.AutoWakeUp = DISABLE; hcan.Init.AutoRetransmission = ENABLE; // Автоповтор при ошибках hcan.Init.ReceiveFifoLocked = DISABLE; hcan.Init.TransmitFifoPriority= DISABLE; HAL_CAN_Init(&hcan); // ===== ФИЛЬТР ПРИЁМА ===== CAN_FilterTypeDef filter = {0}; // Принимать ВСЕ сообщения (маска 0 — все биты любые) filter.FilterBank = 0; filter.FilterMode = CAN_FILTERMODE_IDMASK; filter.FilterScale = CAN_FILTERSCALE_32BIT; filter.FilterIdHigh = 0x0000; filter.FilterIdLow = 0x0000; filter.FilterMaskIdHigh = 0x0000; // Маска 0 = принимать всё filter.FilterMaskIdLow = 0x0000; filter.FilterFIFOAssignment = CAN_RX_FIFO0; filter.FilterActivation = ENABLE; HAL_CAN_ConfigFilter(&hcan, &filter); // Активировать прерывания приёма HAL_CAN_ActivateNotification(&hcan, CAN_IT_RX_FIFO0_MSG_PENDING); HAL_CAN_Start(&hcan); } // ===== ОТПРАВКА СООБЩЕНИЯ ===== HAL_StatusTypeDef CAN_SendMessage(uint32_t id, uint8_t *data, uint8_t len) { CAN_TxHeaderTypeDef txHeader; uint32_t txMailbox; txHeader.StdId = id; // 11-битный ID txHeader.ExtId = 0; txHeader.IDE = CAN_ID_STD; txHeader.RTR = CAN_RTR_DATA; txHeader.DLC = len; txHeader.TransmitGlobalTime = DISABLE; return HAL_CAN_AddTxMessage(&hcan, &txHeader, data, &txMailbox); } // ===== ПРИЁМ ЧЕРЕЗ ПРЕРЫВАНИЕ ===== void HAL_CAN_RxFifo0MsgPendingCallback(CAN_HandleTypeDef *hcan_ptr) { CAN_RxHeaderTypeDef rxHeader; uint8_t rxData[8]; if (HAL_CAN_GetRxMessage(hcan_ptr, CAN_RX_FIFO0, &rxHeader, rxData) == HAL_OK) { uint32_t id = (rxHeader.IDE == CAN_ID_STD) ? rxHeader.StdId : rxHeader.ExtId; // Обработка по ID switch (id) { case 0x100: // Состояние узла 1 process_node1_status(rxData, rxHeader.DLC); break; case 0x200: // Измерения температуры process_temperature_data(rxData, rxHeader.DLC); break; default: // Неизвестное сообщение break; } } } // Пример обработки температурных данных // Договорённость: 2 байта = температура × 10 (signed) void process_temperature_data(uint8_t *data, uint8_t len) { if (len < 2) return; int16_t raw = (int16_t)((data[0] << 8) | data[1]); float temperature = raw / 10.0f; if (temperature > 80.0f) { // Высокая температура — принять меры activate_cooling(); } } // ===== ПРИМЕР: ПЕРИОДИЧЕСКАЯ ОТПРАВКА ДАННЫХ УЗЛА ===== void CAN_SendNodeStatus(void) { uint8_t data[8]; // Байт 0: статусные биты data[0] = 0x00; if (motor_running) data[0] |= 0x01; if (fault_active) data[0] |= 0x02; if (io_ready) data[0] |= 0x04; // Байты 1-2: скорость двигателя (об/мин × 10, unsigned) uint16_t speed_raw = (uint16_t)(motor_speed_rpm * 10.0f); data[1] = speed_raw >> 8; data[2] = speed_raw & 0xFF; // Байты 3-4: ток (А × 100, signed) int16_t current_raw = (int16_t)(motor_current_a * 100.0f); data[3] = current_raw >> 8; data[4] = current_raw & 0xFF; // Байты 5-6: температура (°C × 10, signed) int16_t temp_raw = (int16_t)(temperature_c * 10.0f); data[5] = temp_raw >> 8; data[6] = temp_raw & 0xFF; // Байт 7: номер пакета (для обнаружения потерь) static uint8_t packet_num = 0; data[7] = packet_num++; CAN_SendMessage(0x100, data, 8); } Arduino + MCP2515: добавляем CANArduino не имеет встроенного CAN. Используем MCP2515 — внешний CAN-контроллер с SPI. #include <SPI.h> #include <mcp2515.h> // Библиотека arduino-mcp2515 MCP2515 mcp2515(10); // CS pin struct can_frame rxMsg, txMsg; void setup() { Serial.begin(115200); SPI.begin(); mcp2515.reset(); mcp2515.setBitrate(CAN_500KBPS, MCP_8MHZ); // 500 Кбит/с, кварц 8 МГц mcp2515.setNormalMode(); // или setLoopbackMode() для теста без шины Serial.println("CAN Bus Ready"); } // Отправка измерений температуры void sendTemperature(float temp_celsius) { txMsg.can_id = 0x200; // ID нашего сообщения txMsg.can_dlc = 2; // 2 байта данных // Упаковываем: температура × 10, int16 int16_t raw = (int16_t)(temp_celsius * 10.0f); txMsg.data[0] = raw >> 8; txMsg.data[1] = raw & 0xFF; mcp2515.sendMessage(&txMsg); } void loop() { // Приём сообщений if (mcp2515.readMessage(&rxMsg) == MCP2515::ERROR_OK) { Serial.print("ID: 0x"); Serial.print(rxMsg.can_id, HEX); Serial.print(", DLC: "); Serial.print(rxMsg.can_dlc); Serial.print(", Data: "); for (int i = 0; i < rxMsg.can_dlc; i++) { Serial.print("0x"); Serial.print(rxMsg.data[i], HEX); Serial.print(" "); } Serial.println(); // Обработка сообщения статуса узла if (rxMsg.can_id == 0x100 && rxMsg.can_dlc >= 3) { bool running = rxMsg.data[0] & 0x01; bool fault = rxMsg.data[0] & 0x02; uint16_t speed_raw = (rxMsg.data[1] << 8) | rxMsg.data[2]; float speed = speed_raw / 10.0f; Serial.print("Узел 1: "); Serial.print(running ? "Работает" : "Стоит"); if (fault) Serial.print(" [АВАРИЯ]"); Serial.print(", Скорость: "); Serial.print(speed); Serial.println(" об/мин"); } } // Отправка своих данных каждые 100 мс static uint32_t lastSend = 0; if (millis() - lastSend >= 100) { lastSend = millis(); float temp = 25.0f + analogRead(A0) * 0.05f; sendTemperature(temp); } } Linux SocketCAN: мощный инструментарийНа Linux (Raspberry Pi, промышленные PC) CAN работает через SocketCAN — часть ядра с 2009 года. # Настройка CAN интерфейса # Если есть USB-CAN адаптер (Peak PCAN, Kvaser, SeedStudio) sudo ip link set can0 up type can bitrate 500000 # Или через модуль ядра для MCP2515 на Raspberry Pi # /boot/config.txt: # dtoverlay=mcp2515-can0,oscillator=8000000,interrupt=25 # Просмотр трафика candump can0 # Отправка сообщения: ID=0x100, 3 байта cansend can0 100#010203 # Фильтрация - только ID 0x100-0x1FF candump can0 100~1FF # Статистика ошибок ip -details -statistics link show can0 # Python + python-can # pip install python-can import can import struct import time # Создаём интерфейс bus = can.interface.Bus(channel='can0', bustype='socketcan') # Отправка def send_temperature(temp: float): raw = struct.pack('>h', int(temp * 10)) # big-endian signed short msg = can.Message(arbitration_id=0x200, data=raw, is_extended_id=False) bus.send(msg) # Приём с фильтрацией по ID bus.set_filters([ {"can_id": 0x100, "can_mask": 0x7FF, "extended": False}, # Только 0x100 {"can_id": 0x200, "can_mask": 0x7FF, "extended": False}, # И 0x200 ]) print("Ожидаем CAN-сообщения...") for msg in bus: if msg.arbitration_id == 0x100: status = msg.data[0] speed = struct.unpack('>H', bytes(msg.data[1:3]))[0] / 10.0 current = struct.unpack('>h', bytes(msg.data[3:5]))[0] / 100.0 temp = struct.unpack('>h', bytes(msg.data[5:7]))[0] / 10.0 print(f"Узел 100: {'Работает' if status & 1 else 'Стоит'}, " f"n={speed} об/мин, I={current}А, T={temp}°C") Протоколы высшего уровняJ1939 — для грузовой техники и дизелейJ1939 — стандарт SAE для коммуникации в коммерческом транспорте и тяжёлой технике. Работает поверх CAN с 29-битными ID. Структура J1939 ID (29 бит): Приоритет (3б) │ Reserved (1б) │ Data Page (1б) │ PGN (8б) │ Source Address (8б) Популярные PGN (Parameter Group Number): PGN 0xF004 (EEC1) — данные двигателя: обороты, момент, нагрузка PGN 0xFEF1 (CCVS) — скорость, круиз-контроль PGN 0xFEE5 (HOURS) — моточасы PGN 0xFEE6 (TIME) — время и дата CANopen — для промышленной автоматизацииCANopen — стандарт для промышленного оборудования: частотники, серводрайвы, I/O модули. Определяет: Словарь объектов (Object Dictionary) — структурированное хранилище всех параметров устройства PDO (Process Data Object) — быстрая передача данных реального времени (заменяет аналог 4-20мА) SDO (Service Data Object) — медленная конфигурация и чтение параметров NMT (Network Management) — управление состоянием узлов Heartbeat / Node Guarding — контроль жизнеспособности узлов Пример: частотник с CANopen. Через SDO читаем/пишем параметры (коэффициент разгона, макс. частота). Через PDO каждые 10 мс обмениваемся уставкой и текущими значениями. Диагностика сети CANПризнаки проблем:Симптом Вероятная причина Bus Off у одного узла Неисправный узел, помехи на кабеле Много ошибок CRC Неправильный биттайминг, плохая линия Узел не видит свои сообщения в ACK Он один на шине, некому подтверждать Периодические потери сообщений Нет терминаторов или два на одном конце Все узлы в Bus Off Неисправная нагрузка на шине Инструменты:PEAK PCAN-USB (~€80) + PCAN-View (бесплатно) — лучший бюджетный вариант Kvaser Leaf Light — популярен с CANalyzer Vector CANalyzer — профессиональный инструмент для автомобильных применений Linux candump / cansniffer — бесплатно, для socketcan-интерфейсов Wireshark с плагином SocketCAN — анализ на Linux CAN FD: следующее поколениеCAN FD (Flexible Data-rate) — развитие стандарта 2012 года. Нет обратной совместимости на физическом уровне, но концептуально тот же подход. Отличия от Classic CAN: До 64 байт данных в одном фрейме (против 8) До 8 Мбит/с в поле данных (при сохранении 1 Мбит/с для арбитража) Обязательная CRC 21-бит для надёжности при высоких скоростях CAN FD активно внедряется в новых автомобилях (все автомобили с AUTOSAR) и промышленной автоматизации. STM32G4, STM32H7 имеют встроенный FDCAN-контроллер. Практический чеклист для CAN-системы□ Терминаторы 120 Ом на ОБОИХ концах шины (и только там) □ Экранированная витая пара (для помехонагруженных сред) □ Максимальная длина ответвлений (stub) < 0.3 м □ Все узлы имеют ОДИНАКОВУЮ скорость и биттайминг □ Адреса узлов (если используются) уникальны □ Заземление: один общий провод + заземление экрана в одной точке □ Проверить напряжение CANH/CANL (рецесс.: оба ~2.5В, домин.: разность ~2В) □ Подключить анализатор и убедиться в отсутствии ошибок □ Задокументировать все ID и значения данных ЗаключениеCAN Bus — это элегантное инженерное решение, выдержавшее испытание десятилетиями. Детерминизм без коллизий, встроенная обработка ошибок, устойчивость к помехам — всё это делает CAN первым выбором для распределённых систем управления с жёсткими требованиями к надёжности. Для старта: MCP2515 + Arduino даёт минимальный стенд за $5. SocketCAN на Raspberry Pi — бесплатный анализатор. PCAN-USB — профессиональный инструмент за разумные деньги. Знание CAN открывает двери в automotive-разработку, промышленную автоматизацию и встраиваемые системы. Это инвестиция, которая окупается.
-
Промышленный IoT: MQTT, телеметрия и мониторинг оборудования
IIoT: не просто модное словоПромышленный IoT (IIoT — Industrial Internet of Things) — это не умный чайник и не фитнес-браслет. Это системы, которые собирают данные с реального производственного оборудования, анализируют их и помогают принимать решения. Реальный кейс: завод по производству подшипников. Раньше плановое ТО каждые 3 месяца — меняли подшипники в редукторах "по графику". После внедрения IIoT (вибродатчики на каждом редукторе + MQTT + аналитика): 30% редукторов работали нормально и менялись зря, 5% уже имели износ и могли выйти из строя раньше графика. Экономия на расходниках — 28%, аварийных остановок из-за поломки — минус 4 в год. Это и есть предиктивное обслуживание. И начинается оно с правильного сбора данных. Архитектура IIoT-системыУровень 0 — Полевые устройства: Датчики (температура, вибрация, давление, ток) Исполнительные механизмы Уровень 1 — Агрегаторы / Граничные узлы (Edge): ESP32, Raspberry Pi, промышленные шлюзы Протоколы: Modbus, 1-Wire, I2C, SPI, 4-20мА Уровень 2 — Брокер сообщений: MQTT Broker (Mosquitto, EMQX, HiveMQ) Нормализация данных, маршрутизация Уровень 3 — Обработка и хранение: Node-RED / Python — логика, алармы InfluxDB / TimescaleDB — временные ряды PostgreSQL — конфигурация, справочники Уровень 4 — Визуализация и аналитика: Grafana — дашборды, алерты Jupyter Notebook — анализ данных ML модели — предиктивная аналитика MQTT: почему именно онMQTT (Message Queuing Telemetry Transport) — лёгкий протокол публикации/подписки, разработанный IBM в 1999 году для телеметрии нефтепроводов через спутник. Идеален для IoT: Лёгкий: заголовок всего 2 байта, работает при 2G-соединении Асинхронный: устройства не опрашиваются, а сами публикуют данные QoS (Quality of Service): три уровня надёжности доставки Retain: брокер хранит последнее значение, новые подписчики сразу его получают Last Will: автоматическое сообщение при потере связи с устройством Уровни QoS:QoS 0 (At most once): Отправил и забыл. Быстро, но сообщение может потеряться. Для частых нечувствительных данных (телеметрия каждую секунду). QoS 1 (At least once): Гарантированная доставка, но возможны дубликаты. Для алармов и важных событий. QoS 2 (Exactly once): Ровно один раз. Медленнее, для критичных команд управления. Структура топиков (best practices):factory/ ← Завод ├── line1/ ← Производственная линия 1 │ ├── conveyor1/ ← Конвейер 1 │ │ ├── telemetry ← Данные датчиков (JSON, часто) │ │ ├── status ← Состояние (работает/стоит) │ │ ├── alarms ← Аварии │ │ └── commands ← Команды управления │ └── robot1/ │ └── telemetry └── utilities/ ├── compressor1/ │ └── telemetry └── hvac/ └── telemetry Примеры топиков: factory/line1/conveyor1/telemetry factory/line1/robot1/status factory/+/+/alarms ← Подписка на все аварии всей линии 1 factory/# ← Подписка на ВСЁ (осторожно!) Установка и настройка Mosquitto# Ubuntu/Debian sudo apt update sudo apt install mosquitto mosquitto-clients # Конфигурация /etc/mosquitto/mosquitto.conf: listener 1883 allow_anonymous false password_file /etc/mosquitto/passwd # TLS (обязательно для производства!): listener 8883 cafile /etc/ssl/certs/ca-certificates.crt certfile /etc/mosquitto/certs/server.crt keyfile /etc/mosquitto/certs/server.key require_certificate false # WebSocket для Node-RED и браузерных клиентов: listener 9001 protocol websockets # Логирование: log_dest file /var/log/mosquitto/mosquitto.log log_type all # Создание пользователя: sudo mosquitto_passwd -c /etc/mosquitto/passwd username sudo systemctl enable mosquitto sudo systemctl start mosquitto # Тест: mosquitto_sub -h localhost -u user -P pass -t "factory/#" -v & mosquitto_pub -h localhost -u user -P pass -t "factory/test" -m "hello" ESP32: узел сбора данныхESP32 — идеальный Edge-узел: WiFi/BT, 240 МГц, 520 КБ RAM, куча периферии, цена $3–5. #include <WiFi.h> #include <PubSubClient.h> #include <ArduinoJson.h> #include <Wire.h> #include "Adafruit_BME280.h" // ===== КОНФИГУРАЦИЯ ===== const char* WIFI_SSID = "Factory_WiFi"; const char* WIFI_PASSWORD = "secretpass"; const char* MQTT_SERVER = "192.168.1.100"; const int MQTT_PORT = 1883; const char* MQTT_USER = "esp32_node1"; const char* MQTT_PASS = "nodepass"; const char* DEVICE_ID = "conveyor1"; // Топики const char* TOPIC_TELEMETRY = "factory/line1/conveyor1/telemetry"; const char* TOPIC_STATUS = "factory/line1/conveyor1/status"; const char* TOPIC_ALARMS = "factory/line1/conveyor1/alarms"; const char* TOPIC_COMMANDS = "factory/line1/conveyor1/commands"; const char* TOPIC_WILL = "factory/line1/conveyor1/status"; WiFiClient espClient; PubSubClient mqtt(espClient); Adafruit_BME280 bme; // Состояние bool motorRunning = false; float setpoint = 50.0f; uint32_t lastPublish = 0; uint32_t uptime_sec = 0; // ===== ПОДКЛЮЧЕНИЕ ===== void connectWiFi() { WiFi.begin(WIFI_SSID, WIFI_PASSWORD); Serial.print("WiFi..."); while (WiFi.status() != WL_CONNECTED) { delay(500); Serial.print("."); } Serial.println(" OK"); Serial.println(WiFi.localIP()); } void connectMQTT() { while (!mqtt.connected()) { Serial.print("MQTT..."); // Last Will & Testament — сообщение при потере связи const char* willMsg = "{\"online\":false}"; if (mqtt.connect(DEVICE_ID, MQTT_USER, MQTT_PASS, TOPIC_WILL, 1, true, willMsg)) { Serial.println(" OK"); // Сообщение о подключении mqtt.publish(TOPIC_STATUS, "{\"online\":true}", true); // Подписываемся на команды mqtt.subscribe(TOPIC_COMMANDS, 1); // QoS 1 } else { Serial.printf(" Ошибка: %d\n", mqtt.state()); delay(5000); } } } // ===== ОБРАБОТКА КОМАНД ===== void mqttCallback(char* topic, byte* payload, unsigned int length) { // Парсим JSON команду StaticJsonDocument<256> doc; DeserializationError error = deserializeJson(doc, payload, length); if (error) { Serial.println("JSON error"); return; } String topicStr = String(topic); if (topicStr == TOPIC_COMMANDS) { // Команда пуск/стоп if (doc.containsKey("run")) { motorRunning = doc["run"].as<bool>(); Serial.printf("Команда: %s\n", motorRunning ? "ПУСК" : "СТОП"); } // Изменение уставки if (doc.containsKey("setpoint")) { setpoint = doc["setpoint"].as<float>(); Serial.printf("Уставка: %.1f\n", setpoint); } // Сброс аварии if (doc["reset_alarm"].as<bool>()) { Serial.println("Сброс аварии"); } } } // ===== ПУБЛИКАЦИЯ ДАННЫХ ===== void publishTelemetry() { // Читаем датчики float temperature = bme.readTemperature(); float humidity = bme.readHumidity(); float pressure = bme.readPressure() / 100.0f; int current_raw = analogRead(34); // 4-20мА через ACS712 float current_a = current_raw * 25.0f / 4095.0f; // 0-25А // Формируем JSON StaticJsonDocument<512> doc; doc["device_id"] = DEVICE_ID; doc["timestamp"] = millis() / 1000; doc["uptime"] = uptime_sec; doc["running"] = motorRunning; doc["setpoint"] = setpoint; JsonObject sensors = doc.createNestedObject("sensors"); sensors["temperature"] = round(temperature * 10) / 10.0; sensors["humidity"] = round(humidity * 10) / 10.0; sensors["pressure"] = round(pressure * 10) / 10.0; sensors["current"] = round(current_a * 100) / 100.0; // Диагностика устройства JsonObject diag = doc.createNestedObject("diagnostics"); diag["wifi_rssi"] = WiFi.RSSI(); diag["free_heap"] = ESP.getFreeHeap(); diag["cpu_freq"] = ESP.getCpuFreqMHz(); // Сериализация и публикация char payload[512]; serializeJson(doc, payload); mqtt.publish(TOPIC_TELEMETRY, payload, false); // QoS 0, не retain // Проверка алармов if (temperature > 80.0f) { StaticJsonDocument<128> alarm; alarm["type"] = "high_temperature"; alarm["value"] = temperature; alarm["limit"] = 80.0f; alarm["message"] = "Превышена температура двигателя!"; char alarmPayload[128]; serializeJson(alarm, alarmPayload); mqtt.publish(TOPIC_ALARMS, alarmPayload, true); // retain = true } } // ===== SETUP / LOOP ===== void setup() { Serial.begin(115200); Wire.begin(21, 22); if (!bme.begin(0x76)) { Serial.println("BME280 не найден!"); } connectWiFi(); mqtt.setServer(MQTT_SERVER, MQTT_PORT); mqtt.setCallback(mqttCallback); mqtt.setBufferSize(1024); // Увеличиваем буфер для больших сообщений connectMQTT(); } void loop() { // Переподключение при потере связи if (!mqtt.connected()) { if (WiFi.status() != WL_CONNECTED) { connectWiFi(); } connectMQTT(); } mqtt.loop(); // Публикация каждые 5 секунд if (millis() - lastPublish >= 5000) { lastPublish = millis(); uptime_sec += 5; publishTelemetry(); } } Python: обработка и алармыimport paho.mqtt.client as mqtt import json import time from datetime import datetime from influxdb_client import InfluxDBClient, Point from influxdb_client.client.write_api import SYNCHRONOUS # ===== КОНФИГУРАЦИЯ ===== MQTT_BROKER = "192.168.1.100" MQTT_PORT = 1883 MQTT_USER = "backend" MQTT_PASS = "backendpass" INFLUX_URL = "http://localhost:8086" INFLUX_TOKEN = "your-influx-token" INFLUX_ORG = "factory" INFLUX_BUCKET = "telemetry" # ===== ИНИЦИАЛИЗАЦИЯ ===== influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) write_api = influx_client.write_api(write_options=SYNCHRONOUS) # Состояние алармов (дедупликация) active_alarms = {} def on_message(client, userdata, msg): topic = msg.topic try: data = json.loads(msg.payload.decode()) except json.JSONDecodeError: print(f"Ошибка JSON в топике {topic}") return # Маршрутизация по топику if "/telemetry" in topic: handle_telemetry(topic, data) elif "/alarms" in topic: handle_alarm(topic, data) elif "/status" in topic: handle_status(topic, data) def handle_telemetry(topic: str, data: dict): """Запись телеметрии в InfluxDB""" # Извлекаем путь устройства из топика # factory/line1/conveyor1/telemetry → ['factory', 'line1', 'conveyor1', 'telemetry'] parts = topic.split('/') if len(parts) < 4: return location = parts[1] # line1 device = parts[2] # conveyor1 sensors = data.get('sensors', {}) # Формируем точку данных для InfluxDB point = ( Point("telemetry") .tag("location", location) .tag("device", device) .tag("device_id", data.get('device_id', device)) .field("temperature", float(sensors.get('temperature', 0))) .field("humidity", float(sensors.get('humidity', 0))) .field("pressure", float(sensors.get('pressure', 0))) .field("current", float(sensors.get('current', 0))) .field("running", int(data.get('running', False))) .field("wifi_rssi", int(data.get('diagnostics', {}).get('wifi_rssi', 0))) ) try: write_api.write(bucket=INFLUX_BUCKET, record=point) except Exception as e: print(f"InfluxDB ошибка: {e}") # Проверка пороговых значений temp = sensors.get('temperature', 0) if temp > 85.0: send_alert(device, "critical", f"Критическая температура: {temp}°C") elif temp > 75.0: send_alert(device, "warning", f"Высокая температура: {temp}°C") def handle_alarm(topic: str, data: dict): """Обработка аларм-сообщений от устройства""" parts = topic.split('/') device = parts[2] if len(parts) >= 3 else "unknown" alarm_type = data.get('type', 'unknown') alarm_key = f"{device}_{alarm_type}" # Дедупликация: не спамим одинаковые алармы now = time.time() if alarm_key in active_alarms: if now - active_alarms[alarm_key] < 300: # 5 минут return active_alarms[alarm_key] = now print(f"🚨 АВАРИЯ [{device}]: {data.get('message', alarm_type)}") # Здесь можно добавить отправку в Telegram, email, SMS send_notification( f"⚠️ Авария на {device}\n" f"Тип: {alarm_type}\n" f"Значение: {data.get('value', 'N/A')}\n" f"Время: {datetime.now().strftime('%H:%M:%S')}" ) def handle_status(topic: str, data: dict): """Отслеживание онлайн/офлайн устройств""" parts = topic.split('/') device = parts[2] if len(parts) >= 3 else "unknown" online = data.get('online', False) print(f"{'🟢' if online else '🔴'} {device}: {'онлайн' if online else 'офлайн'}") if not online: send_alert(device, "critical", f"Устройство {device} потеряло связь!") def send_alert(device: str, level: str, message: str): """Отправка алерта (пример — в лог)""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {level.upper()} [{device}]: {message}") def send_notification(text: str): """Здесь интеграция с Telegram Bot API""" # import requests # requests.post(f"https://api.telegram.org/bot{TOKEN}/sendMessage", # json={"chat_id": CHAT_ID, "text": text}) print(f"NOTIFICATION: {text}") # ===== ЗАПУСК ===== client = mqtt.Client(client_id="backend_processor") client.username_pw_set(MQTT_USER, MQTT_PASS) client.on_message = on_message client.connect(MQTT_BROKER, MQTT_PORT, 60) client.subscribe("factory/#", qos=1) # Подписка на всё print("Backend запущен, ожидаем данные...") client.loop_forever() InfluxDB + Grafana: красивые дашбордыУстановка через Docker Compose:# docker-compose.yml version: '3.8' services: mosquitto: image: eclipse-mosquitto:2 ports: - "1883:1883" - "9001:9001" volumes: - ./mosquitto/config:/mosquitto/config - mosquitto_data:/mosquitto/data influxdb: image: influxdb:2.7 ports: - "8086:8086" environment: DOCKER_INFLUXDB_INIT_MODE: setup DOCKER_INFLUXDB_INIT_USERNAME: admin DOCKER_INFLUXDB_INIT_PASSWORD: secretpassword DOCKER_INFLUXDB_INIT_ORG: factory DOCKER_INFLUXDB_INIT_BUCKET: telemetry DOCKER_INFLUXDB_INIT_RETENTION: 30d # Хранение 30 дней volumes: - influxdb_data:/var/lib/influxdb2 grafana: image: grafana/grafana:latest ports: - "3000:3000" environment: GF_SECURITY_ADMIN_PASSWORD: grafanapass GF_PLUGINS_ALLOW_LOADING_UNSIGNED_PLUGINS: "yesoreyeram-infinity-datasource" volumes: - grafana_data:/var/lib/grafana depends_on: - influxdb node-red: image: nodered/node-red:latest ports: - "1880:1880" volumes: - nodered_data:/data volumes: mosquitto_data: influxdb_data: grafana_data: nodered_data: Flux-запрос в Grafana (температура за последний час):from(bucket: "telemetry") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "telemetry") |> filter(fn: (r) => r._field == "temperature") |> filter(fn: (r) => r.device == "conveyor1") |> aggregateWindow(every: 1m, fn: mean, createEmpty: false) |> yield(name: "mean_temperature") Предиктивная аналитика: пример с вибрациейimport numpy as np from scipy.fft import fft, fftfreq from scipy.stats import zscore def analyze_vibration(samples: list, sample_rate: int = 1000) -> dict: """ Анализ вибрации для обнаружения износа подшипников. samples: список отсчётов акселерометра sample_rate: частота дискретизации, Гц """ data = np.array(samples, dtype=float) n = len(data) # Временные характеристики rms = np.sqrt(np.mean(data**2)) # Эффективное значение (RMS) peak = np.max(np.abs(data)) # Пиковое значение crest = peak / rms if rms > 0 else 0 # Пик-фактор (norma: 1.4–2.5, износ: >4) kurtosis = float(np.mean((data - np.mean(data))**4) / (np.std(data)**4 + 1e-10)) # Спектральный анализ (FFT) spectrum = np.abs(fft(data))[:n//2] freqs = fftfreq(n, 1.0 / sample_rate)[:n//2] # Поиск доминирующих частот top_indices = np.argsort(spectrum)[-5:][::-1] dominant_freqs = [(float(freqs[i]), float(spectrum[i])) for i in top_indices] # Оценка состояния # Crest Factor: <2.5 — норма, 2.5–4 — внимание, >4 — износ # Kurtosis: <3 — норма (гауссов шум), >6 — дефект (удары) if crest > 4.0 or kurtosis > 6.0: status = "FAULT" recommendation = "Замените подшипник в течение 48 часов" elif crest > 2.5 or kurtosis > 4.5: status = "WARNING" recommendation = "Запланируйте замену при следующем ТО" else: status = "OK" recommendation = "Оборудование в норме" return { 'rms': round(rms, 4), 'peak': round(peak, 4), 'crest_factor': round(crest, 2), 'kurtosis': round(kurtosis, 2), 'dominant_freqs': dominant_freqs[:3], 'status': status, 'recommendation': recommendation, } # Пример использования с MQTT: def on_vibration_data(client, userdata, msg): data = json.loads(msg.payload) samples = data['samples'] device = data['device_id'] analysis = analyze_vibration(samples, sample_rate=data.get('sample_rate', 1000)) # Публикуем результат анализа result_topic = f"factory/analytics/{device}/bearing_health" client.publish(result_topic, json.dumps(analysis), retain=True) if analysis['status'] != 'OK': print(f"⚠️ {device}: {analysis['recommendation']}") # Отправить уведомление... Безопасность IIoTЭто не опционально. Промышленные системы с интернет-подключением — лакомая цель для атак. Минимальный стандарт: TLS везде — Mosquitto с сертификатами, никакого plaintext Аутентификация — уникальный логин/пароль для каждого устройства, или X.509 сертификаты Авторизация по ACL — устройство conveyor1 пишет только в factory/line1/conveyor1/#, не может читать чужие команды Сегментация сети — IoT-устройства в отдельном VLAN, без прямого доступа в интернет OTA-обновления — возможность удалённого обновления прошивки при обнаружении уязвимостей Мониторинг аномалий — необычное количество сообщений, соединения с нестандартных IP # Mosquitto ACL файл /etc/mosquitto/acl: # Устройство conveyor1 — пишет только в свои топики user esp32_conveyor1 topic write factory/line1/conveyor1/+ topic read factory/line1/conveyor1/commands # Backend — читает всё, пишет команды user backend topic readwrite factory/# ЗаключениеIIoT с MQTT — это доступная и проверенная технология, которую можно внедрить даже на небольшом предприятии с минимальными затратами. ESP32 + Mosquitto + InfluxDB + Grafana — весь стек работает на одном Raspberry Pi 4 или бюджетном сервере. Главные принципы: данные должны быть точными (правильная калибровка датчиков), надёжными (QoS, переподключение, buffering), безопасными (TLS, ACL) и полезными (не просто собирать, а анализировать и действовать). Начните с малого: один датчик температуры, один MQTT-брокер, один Grafana-дашборд. После первого успешного графика желание расширять систему появится само.
-
ПИД-регулятор: теория, настройка и реальные примеры
Что такое ПИД и почему он вездеПИД-регулятор (Пропорционально-Интегрально-Дифференциальный) — самый распространённый алгоритм автоматического управления в промышленности. По различным оценкам, более 90% всех промышленных регуляторов используют ПИД или его вариации. Термостат в вашей духовке. Круиз-контроль в автомобиле. Система стабилизации квадрокоптера. Регулятор давления в насосной станции. Система автопилота самолёта. Везде — ПИД. Почему? Потому что он: Прост в понимании — три компоненты, три параметра Работает для подавляющего большинства объектов управления Хорошо изучен — 80 лет теории и практики Легко реализуется — 10–20 строк кода Математика: просто о сложномПусть у нас есть: SP (Setpoint) — желаемое значение (уставка) PV (Process Variable) — измеренное значение e(t) = SP – PV — ошибка регулирования u(t) — управляющий сигнал (выход регулятора) Формула ПИД: u(t) = Kp × e(t) + Ki × ∫e(t)dt + Kd × de(t)/dt │ │ │ Пропорциональная Интегральная Дифференциальная составляющая составляющая составляющая Разберём каждую составляющую на примере: нагреватель, цель — держать 80°C. P — Пропорциональная составляющаяУправляющий сигнал пропорционален текущей ошибке. Температура 60°C, уставка 80°C, ошибка 20°C → включить нагрев на 20×Kp%. Проблема чистого P-регулятора: статическая ошибка (offset). При стабильном нагреве нагрев должен компенсировать теплопотери, значит ошибка никогда не станет нулём — иначе нагрев выключится и температура упадёт. Система "зависает" с постоянной небольшой ошибкой. I — Интегральная составляющаяСуммирует ошибку со временем. Даже маленькая постоянная ошибка, накапливаясь, создаёт всё больший управляющий сигнал — и в конечном счёте устраняет статическую ошибку. Проблема: интегральное насыщение (windup). Если выход ограничен (0–100%), а ошибка долго накапливается (например, нагрев не справляется при открытом окне), интеграл "разгоняется" до огромного значения. Потом, когда ошибка уменьшается, нужно много времени чтобы интеграл "разрядился" — система перерегулирует. D — Дифференциальная составляющаяРеагирует на скорость изменения ошибки. Если ошибка быстро уменьшается (температура быстро растёт к уставке) — D-составляющая "притормаживает" для предотвращения перерегулирования. Проблема: шум. Производная усиливает высокочастотный шум измерений. Любой дёрг датчика превращается в огромный кратковременный скачок управляющего сигнала. Поэтому D всегда применяется с фильтром. Дискретная реализация (для цифровых систем)В реальных системах регулятор выполняется с дискретным шагом Ts (время выборки). Непрерывная формула преобразуется в разностное уравнение: P = Kp × e[k] I = I[k-1] + Ki × e[k] × Ts D = Kd × (e[k] - e[k-1]) / Ts u[k] = P + I + D Но правильная реализация сложнее. Полная профессиональная реализация: class PIDController: """ Промышленный ПИД-регулятор с: - Anti-windup (ограничение интеграла) - Производная по измерению (не по ошибке) - Фильтр производной - Ограничение выхода - Bump-less transfer (безударное переключение авто/ручной) """ def __init__(self, kp: float, ki: float, kd: float, ts: float, # Период дискретизации, секунды out_min: float = 0.0, out_max: float = 100.0, filter_coeff: float = 0.1): # Коэффициент фильтра D self.kp = kp self.ki = ki self.kd = kd self.ts = ts self.out_min = out_min self.out_max = out_max self.filter_coeff = filter_coeff # Nf: чем меньше, тем сильнее фильтрация # Состояние self.setpoint = 0.0 self.integral = 0.0 self.prev_measurement = None self.filtered_deriv = 0.0 self.output = 0.0 # Режим self.auto_mode = True def set_setpoint(self, sp: float): self.setpoint = sp def set_manual(self, output: float): """Ручной режим — задать выход напрямую""" self.auto_mode = False self.output = max(self.out_min, min(self.out_max, output)) # При переходе обратно в авто — интеграл синхронизируется def set_auto(self): """Переход в автоматический режим (bump-less)""" if not self.auto_mode: # Инициализируем интеграл текущим выходом # чтобы не было скачка при переключении self.integral = self.output self.auto_mode = True def compute(self, measurement: float) -> float: """ Вычислить управляющий сигнал. measurement — текущее значение регулируемой величины. Вызывать строго с периодом ts! """ if not self.auto_mode: return self.output if self.prev_measurement is None: self.prev_measurement = measurement error = self.setpoint - measurement # === Пропорциональная составляющая === p_term = self.kp * error # === Интегральная составляющая с anti-windup === # Предварительно вычисляем интеграл integral_new = self.integral + self.ki * error * self.ts # === Дифференциальная составляющая === # Производная по ИЗМЕРЕНИЮ (не по ошибке) — избегаем derivative kick # при изменении уставки raw_deriv = -(measurement - self.prev_measurement) / self.ts # Фильтр производной (экспоненциальный) self.filtered_deriv = (self.filter_coeff * raw_deriv + (1 - self.filter_coeff) * self.filtered_deriv) d_term = self.kd * self.filtered_deriv # === Предварительный выход === output_raw = p_term + integral_new + d_term # === Ограничение выхода === self.output = max(self.out_min, min(self.out_max, output_raw)) # === Anti-windup: обновляем интеграл только если не насыщены === # Или используем "back-calculation anti-windup" if self.output == output_raw: # Нет насыщения — обновляем интеграл нормально self.integral = integral_new else: # Насыщение — не накапливаем интеграл дальше # Back-calculation: уменьшаем интеграл на величину насыщения saturation = output_raw - self.output self.integral = integral_new - saturation self.prev_measurement = measurement return self.output def get_components(self) -> dict: """Для отладки и мониторинга""" error = self.setpoint - (self.prev_measurement or 0) return { 'setpoint': self.setpoint, 'error': error, 'p_term': self.kp * error, 'i_term': self.integral, 'd_term': self.kd * self.filtered_deriv, 'output': self.output, } Методы настройки коэффициентовМетод 1: Инженерная настройка (руками)Алгоритм по шагам: Ki = 0, Kd = 0. Постепенно увеличиваем Kp до появления устойчивых колебаний (нарастающих) — это Kc. Уменьшаем Kp до 0.5 × Kc. Медленно увеличиваем Ki. Он должен устранить остаточную ошибку. Если появились колебания — Ki велик. Если нужно улучшить реакцию и уменьшить перерегулирование — добавляем Kd. Часто тепловые объекты обходятся без D. Метод 2: Циглера-Николса (классика)Установить Ki = 0, Kd = 0 Увеличивать Kp до возникновения незатухающих колебаний Записать: Ku — критический коэффициент, Tu — период колебаний Тип Kp Ki Kd Только P 0.5 × Ku — — PI 0.45 × Ku 0.54 × Ku / Tu — ПИД 0.6 × Ku 1.2 × Ku / Tu 0.075 × Ku × Tu Результат: Обычно агрессивный, дающий ~25% перерегулирования. Хорош как стартовая точка. Метод 3: Ступенчатый отклик (Step Response)Более безопасный метод — не нужно доводить до автоколебаний: Перевести систему в устойчивое состояние Скачком изменить управляющий сигнал на 10–20% Записать переходный процесс import numpy as np import matplotlib.pyplot as plt from scipy.optimize import curve_fit def fit_first_order_plus_delay(t, data, step_size): """ Аппроксимация объекта управления моделью первого порядка с запаздыванием: G(s) = K × exp(-L×s) / (T×s + 1) """ # Нормализация данных y_norm = (data - data[0]) / step_size # Начальные параметры: K, T, L K_init = y_norm[-1] # Статический коэффициент усиления T_init = t[np.argmin(np.abs(y_norm - 0.63 * K_init))] L_init = t[np.argmin(np.abs(y_norm - 0.05 * K_init))] def model(t, K, T, L): result = np.zeros_like(t) for i, ti in enumerate(t): if ti > L: result[i] = K * (1 - np.exp(-(ti - L) / T)) return result try: popt, _ = curve_fit(model, t, y_norm, p0=[K_init, T_init, max(L_init, 0.001)], bounds=([0, 0, 0], [np.inf, np.inf, np.inf])) K, T, L = popt return K, T, L except RuntimeError: return K_init, T_init, max(L_init, 0.001) def imc_tuning(K, T, L, lambda_factor=1.0): """ IMC (Internal Model Control) настройка — лучший метод для объектов с запаздыванием. lambda_factor: чем больше — тем медленнее, но стабильнее (рекомендуется L...3L) """ lambda_c = lambda_factor * max(L, 0.1 * T) Kp = (2 * T + L) / (2 * K * (lambda_c + L)) Ti = T + L / 2 # Постоянная интегрирования Td = T * L / (2 * T + L) # Постоянная дифференцирования Ki = Kp / Ti Kd = Kp * Td return { 'Kp': round(Kp, 4), 'Ki': round(Ki, 4), 'Kd': round(Kd, 4), 'Ti': round(Ti, 4), 'Td': round(Td, 4), } # Пример использования: # K=2.0 (усиление объекта), T=30с (постоянная времени), L=5с (запаздывание) params = imc_tuning(K=2.0, T=30.0, L=5.0, lambda_factor=1.0) print(f"Kp={params['Kp']}, Ki={params['Ki']}, Kd={params['Kd']}") Симуляция и проверка настройкиПеред применением в реальной системе — всегда симулируйте: import numpy as np import matplotlib.pyplot as plt def simulate_pid(kp, ki, kd, setpoint, sim_time=200, ts=0.1, K=2.0, T=30.0, L=5.0, noise_std=0.1): """ Симуляция ПИД с объектом первого порядка + запаздывание. Возвращает массивы времени, выходного значения, управляющего сигнала. """ steps = int(sim_time / ts) delay_steps = int(L / ts) t = np.zeros(steps) pv = np.zeros(steps) u = np.zeros(steps) sp = np.zeros(steps) pid = PIDController(kp, ki, kd, ts, out_min=0.0, out_max=100.0) # Ступенчатое изменение уставки sp[:steps//4] = 0 sp[steps//4:] = setpoint # Буфер запаздывания u_delayed = np.zeros(delay_steps + 1) for i in range(1, steps): t[i] = i * ts sp_now = sp[i] pid.set_setpoint(sp_now) # Добавляем шум измерения noise = np.random.normal(0, noise_std) # Управляющий сигнал u[i] = pid.compute(pv[i-1] + noise) # Обновляем буфер запаздывания u_delayed = np.roll(u_delayed, 1) u_delayed[0] = u[i] # Объект управления: первый порядок с запаздыванием # dy/dt = (K × u_delayed - y) / T pv[i] = pv[i-1] + ts * (K * u_delayed[-1] - pv[i-1]) / T return t, pv, u, sp # Сравнение разных настроек fig, axes = plt.subplots(2, 1, figsize=(12, 8)) configs = [ {'kp': 0.5, 'ki': 0.01, 'kd': 2.0, 'label': 'IMC (lambda=1)', 'color': 'blue'}, {'kp': 1.5, 'ki': 0.05, 'kd': 5.0, 'label': 'Агрессивный', 'color': 'red'}, {'kp': 0.2, 'ki': 0.005, 'kd': 0.5, 'label': 'Консервативный', 'color': 'green'}, ] for cfg in configs: t, pv, u, sp = simulate_pid( kp=cfg['kp'], ki=cfg['ki'], kd=cfg['kd'], setpoint=60.0 ) axes[0].plot(t, pv, label=cfg['label'], color=cfg['color']) axes[1].plot(t, u, color=cfg['color'], alpha=0.7) axes[0].plot(t, sp, 'k--', label='Уставка', linewidth=2) axes[0].set_ylabel('Температура, °C') axes[0].set_title('Сравнение настроек ПИД-регулятора') axes[0].legend() axes[0].grid(True) axes[1].set_xlabel('Время, с') axes[1].set_ylabel('Управляющий сигнал, %') axes[1].grid(True) plt.tight_layout() plt.savefig('pid_comparison.png', dpi=150) Специальные случаи и решенияКаскадный ПИД (Cascade Control)Используется когда одного ПИД недостаточно. Классический пример: температурный реактор с нагревателем и теплоносителем. Внешний ПИД (медленный): SP температуры → [ПИД_темп] → SP расхода теплоносителя Внутренний ПИД (быстрый): SP расхода → [ПИД_расход] → Управление клапаном Внутренний контур в 5–10 раз быстрее внешнего. Преимущество: внутренний контур компенсирует возмущения по теплоносителю до того, как они повлияют на температуру. class CascadePID: """Каскадный регулятор с двумя ПИД""" def __init__(self, outer_pid: PIDController, inner_pid: PIDController): self.outer = outer_pid self.inner = inner_pid def compute(self, outer_measurement: float, inner_measurement: float) -> float: """ outer_measurement — медленная переменная (температура) inner_measurement — быстрая переменная (расход) """ # Внешний контур вырабатывает уставку для внутреннего inner_setpoint = self.outer.compute(outer_measurement) # Ограничиваем уставку внутреннего контура inner_setpoint = max(0.0, min(100.0, inner_setpoint)) self.inner.set_setpoint(inner_setpoint) # Внутренний контур управляет исполнительным устройством return self.inner.compute(inner_measurement) ПИД с ограничением скорости изменения выхода (Rate Limiting)Некоторые исполнительные устройства не любят резких скачков (например, регулирующие клапана — быстрое перемещение вызывает гидроудары): def rate_limited_output(new_output: float, prev_output: float, max_rate: float, ts: float) -> float: """ Ограничивает скорость изменения выходного сигнала. max_rate — максимальное изменение в секунду (например, 10%/сек) """ max_change = max_rate * ts change = new_output - prev_output change = max(-max_change, min(max_change, change)) return prev_output + change ПИД для нестационарных объектов (Gain Scheduling)Если параметры объекта меняются в зависимости от рабочей точки (например, нагрев при низкой температуре работает иначе чем при высокой): class GainSchedulingPID: """ПИД с переключением коэффициентов в зависимости от уставки""" def __init__(self): # Коэффициенты для разных диапазонов температур self.schedules = [ # (max_sp, kp, ki, kd) (50.0, 2.0, 0.05, 1.0), # Низкая температура (100.0, 1.5, 0.03, 0.8), # Средняя (200.0, 1.0, 0.02, 0.5), # Высокая (объект другой!) ] self.pid = PIDController(kp=2.0, ki=0.05, kd=1.0, ts=1.0) def update_gains(self, setpoint: float): for max_sp, kp, ki, kd in self.schedules: if setpoint <= max_sp: self.pid.kp = kp self.pid.ki = ki self.pid.kd = kd break def compute(self, setpoint: float, measurement: float) -> float: self.update_gains(setpoint) self.pid.set_setpoint(setpoint) return self.pid.compute(measurement) Реализация на C для встраиваемых систем#include <stdint.h> #include <float.h> typedef struct { float kp; float ki; float kd; float ts; // Период дискретизации, секунды float out_min; float out_max; float filter_coeff; // Коэффициент фильтра производной (0.01..0.2) // Состояние float setpoint; float integral; float prev_measurement; float filtered_deriv; float output; uint8_t first_run; } PIDState; void PID_Init(PIDState *pid, float kp, float ki, float kd, float ts, float out_min, float out_max) { pid->kp = kp; pid->ki = ki; pid->kd = kd; pid->ts = ts; pid->out_min = out_min; pid->out_max = out_max; pid->filter_coeff = 0.1f; pid->setpoint = 0.0f; pid->integral = 0.0f; pid->prev_measurement = 0.0f; pid->filtered_deriv = 0.0f; pid->output = 0.0f; pid->first_run = 1; } void PID_SetSetpoint(PIDState *pid, float sp) { pid->setpoint = sp; } void PID_Reset(PIDState *pid) { pid->integral = 0.0f; pid->filtered_deriv = 0.0f; pid->first_run = 1; } float PID_Compute(PIDState *pid, float measurement) { float error, p_term, d_term, output_raw; float raw_deriv, integral_new; if (pid->first_run) { pid->prev_measurement = measurement; pid->first_run = 0; } error = pid->setpoint - measurement; // Пропорциональная p_term = pid->kp * error; // Интегральная (предварительный расчёт) integral_new = pid->integral + pid->ki * error * pid->ts; // Дифференциальная по измерению raw_deriv = -(measurement - pid->prev_measurement) / pid->ts; // Фильтр производной (IIR первого порядка) pid->filtered_deriv = pid->filter_coeff * raw_deriv + (1.0f - pid->filter_coeff) * pid->filtered_deriv; d_term = pid->kd * pid->filtered_deriv; // Суммируем output_raw = p_term + integral_new + d_term; // Ограничение выхода if (output_raw > pid->out_max) pid->output = pid->out_max; else if (output_raw < pid->out_min) pid->output = pid->out_min; else pid->output = output_raw; // Anti-windup: обновляем интеграл только если нет насыщения if (pid->output == output_raw) { pid->integral = integral_new; } else { // Back-calculation float saturation = output_raw - pid->output; pid->integral = integral_new - saturation; } pid->prev_measurement = measurement; return pid->output; } // Использование на STM32/Arduino (в прерывании таймера): PIDState temp_pid; void PID_ISR(void) // Вызывается каждые 100 мс { float measured_temp = read_temperature_sensor(); float heater_power = PID_Compute(&temp_pid, measured_temp); // Перевести в ШИМ 0-255 uint8_t pwm = (uint8_t)(heater_power * 255.0f / 100.0f); set_heater_pwm(pwm); } Диагностика плохой работы ПИДСимптом: Медленный выход на уставку, большая статическая ошибкаМало Kp и/или Ki Проверьте нет ли механических ограничений (клапан не открывается полностью) Проверьте масштабирование — правильно ли в % интерпретируется выход? Симптом: Сильные устойчивые колебанияKp слишком велик Ki слишком велик (самая частая причина!) Период дискретизации слишком большой относительно динамики объекта Симптом: Большое перерегулирование при изменении уставкиВелико Kp, увеличьте Kd Используйте "setpoint weighting": P-составляющую считайте как Kp × (w × SP – PV), где w < 1 Симптом: Шумный выход, дёргает исполнительный механизмШум датчика, увеличьте filter_coeff для фильтрации D Уменьшите Kd Добавьте мёртвую зону: не реагировать если |error| < deadband ЗаключениеПИД-регулятор — это не просто формула, это искусство. Одни и те же Kp, Ki, Kd дадут отличные результаты на одном объекте и катастрофу на другом. Понимание физики объекта управления важнее знания формул. Практический совет: начинайте с IMC-метода настройки — он даёт предсказуемые результаты и физический смысл у параметра lambda (желаемая скорость реакции). Используйте симуляцию перед внедрением. Добавляйте anti-windup всегда — без него реальная система будет вести себя непредсказуемо после насыщения. И помните: 80% задач управления решается PI-регулятором (без D). D добавляйте только когда PI действительно недостаточно и когда шум измерений не является проблемой.
-
Arduino в реальных проектах: от мигания светодиодом до промышленного прототипа
Честный разговор об ArduinoArduino получил репутацию "игрушки для начинающих". Это несправедливо. Arduino — это доступная платформа с огромной экосистемой, которая используется в реальных устройствах, промышленных прототипах и мелкосерийном производстве. Да, у неё есть ограничения: 8-битный AVR, 16 МГц, 2 КБ RAM. Но для огромного класса задач этого более чем достаточно. Умный контроллер теплицы, анализатор вибраций, узел сбора данных, промышленный таймер — Arduino справится. Проблема не в платформе. Проблема в том, как большинство людей пишут код для Arduino. Сегодня покажу как писать Arduino-код правильно — как настоящие инженеры. Главная ошибка: delay()delay() — это яд для Arduino. Пока выполняется delay(1000), микроконтроллер ничего не делает. Не обрабатывает кнопки. Не читает датчики. Не обновляет выходы. Он просто спит. В реальном устройстве с несколькими задачами это неприемлемо. Правильный подход: millis() и конечный автомат// ПЛОХО — как делают 90% новичков: void loop() { digitalWrite(LED1, HIGH); delay(500); digitalWrite(LED1, LOW); delay(500); // Пока мигает LED1 — ничего другого не происходит! } // ХОРОШО — неблокирующий код: class BlinkTask { private: uint8_t pin; uint32_t interval; uint32_t lastToggle; bool state; public: BlinkTask(uint8_t _pin, uint32_t _interval) : pin(_pin), interval(_interval), lastToggle(0), state(false) {} void begin() { pinMode(pin, OUTPUT); } void update() { uint32_t now = millis(); if (now - lastToggle >= interval) { lastToggle = now; state = !state; digitalWrite(pin, state); } } }; BlinkTask led1(13, 500); // LED на пине 13, период 500 мс BlinkTask led2(12, 200); // LED на пине 12, период 200 мс void setup() { led1.begin(); led2.begin(); } void loop() { led1.update(); // Каждый вызов занимает микросекунды led2.update(); // Оба LED мигают независимо! // Здесь можно добавить ещё десятки задач } Антидребезг кнопок: правильная реализацияФизическая кнопка при нажатии/отпускании "дребезжит" — за несколько миллисекунд создаёт 10–50 ложных переключений. Простой delay(50) тут не поможет — он заблокирует программу. class Button { private: uint8_t pin; uint8_t currentState; uint8_t lastRawState; uint32_t lastChangeTime; uint32_t debounceTime; // Callbacks void (*onPress)(); void (*onRelease)(); void (*onLongPress)(); uint32_t pressTime; uint32_t longPressTime; bool longPressTriggered; public: Button(uint8_t _pin, uint32_t _debounce = 50, uint32_t _longPress = 1000) : pin(_pin), currentState(HIGH), lastRawState(HIGH), lastChangeTime(0), debounceTime(_debounce), onPress(nullptr), onRelease(nullptr), onLongPress(nullptr), pressTime(0), longPressTime(_longPress), longPressTriggered(false) {} void begin() { pinMode(pin, INPUT_PULLUP); // Внутренняя подтяжка, нажатие → LOW } void setOnPress(void (*cb)()) { onPress = cb; } void setOnRelease(void (*cb)()) { onRelease = cb; } void setOnLongPress(void (*cb)()){ onLongPress = cb; } bool isPressed() { return currentState == LOW; } void update() { uint32_t now = millis(); uint8_t rawState = digitalRead(pin); // Антидребезг: состояние должно держаться debounceTime мс if (rawState != lastRawState) { lastChangeTime = now; lastRawState = rawState; } if (now - lastChangeTime >= debounceTime) { if (rawState != currentState) { currentState = rawState; if (currentState == LOW) { // Нажата pressTime = now; longPressTriggered = false; if (onPress) onPress(); } else { // Отпущена if (onRelease) onRelease(); } } } // Проверка длинного нажатия if (currentState == LOW && !longPressTriggered) { if (now - pressTime >= longPressTime) { longPressTriggered = true; if (onLongPress) onLongPress(); } } } }; // Использование: Button btn1(2); Button btn2(3); int counter = 0; void onBtn1Press() { Serial.println("Кнопка 1 нажата"); counter++; } void onBtn1LongPress() { Serial.println("Длинное нажатие! Сброс"); counter = 0; } void onBtn2Press() { Serial.println("Кнопка 2 нажата"); counter--; } void setup() { Serial.begin(115200); btn1.begin(); btn2.begin(); btn1.setOnPress(onBtn1Press); btn1.setOnLongPress(onBtn1LongPress); btn2.setOnPress(onBtn2Press); } void loop() { btn1.update(); btn2.update(); // ... другие задачи } Конечный автомат (State Machine): управление процессомКонечный автомат — правильный способ описывать сложное поведение без вложенных if-else и флагов. Пример: автоматическая стиральная машина (упрощённо): enum WasherState { STATE_IDLE, STATE_FILL_WATER, STATE_WASH, STATE_DRAIN, STATE_SPIN, STATE_COMPLETE, STATE_ERROR }; enum WasherEvent { EVENT_START, EVENT_WATER_FULL, EVENT_WASH_DONE, EVENT_DRAIN_DONE, EVENT_SPIN_DONE, EVENT_ERROR, EVENT_RESET }; class WashingMachine { private: WasherState state; uint32_t stateEnterTime; uint32_t washDuration; uint32_t spinDuration; // Пины static const uint8_t PIN_VALVE = 4; // Клапан залива воды static const uint8_t PIN_PUMP = 5; // Насос откачки static const uint8_t PIN_MOTOR = 6; // Двигатель барабана static const uint8_t PIN_LED_RUN = 7; // Индикатор работы static const uint8_t PIN_SENSOR_FULL = 8; // Датчик полного бака void enterState(WasherState newState) { state = newState; stateEnterTime = millis(); // Действия при входе в состояние switch (state) { case STATE_IDLE: allOff(); Serial.println("Ожидание..."); break; case STATE_FILL_WATER: digitalWrite(PIN_VALVE, HIGH); Serial.println("Заполнение водой..."); break; case STATE_WASH: digitalWrite(PIN_VALVE, LOW); digitalWrite(PIN_MOTOR, HIGH); Serial.println("Стирка..."); break; case STATE_DRAIN: digitalWrite(PIN_MOTOR, LOW); digitalWrite(PIN_PUMP, HIGH); Serial.println("Слив воды..."); break; case STATE_SPIN: digitalWrite(PIN_PUMP, LOW); // Быстрое вращение для отжима analogWrite(PIN_MOTOR, 200); // 78% мощности Serial.println("Отжим..."); break; case STATE_COMPLETE: allOff(); Serial.println("Стирка завершена!"); break; case STATE_ERROR: allOff(); Serial.println("АВАРИЯ!"); break; } } void allOff() { digitalWrite(PIN_VALVE, LOW); digitalWrite(PIN_PUMP, LOW); digitalWrite(PIN_MOTOR, LOW); } public: WashingMachine(uint32_t _washMin = 30, uint32_t _spinMin = 5) : state(STATE_IDLE), washDuration(_washMin * 60000UL), spinDuration(_spinMin * 60000UL) {} void begin() { pinMode(PIN_VALVE, OUTPUT); pinMode(PIN_PUMP, OUTPUT); pinMode(PIN_MOTOR, OUTPUT); pinMode(PIN_LED_RUN, OUTPUT); pinMode(PIN_SENSOR_FULL, INPUT_PULLUP); allOff(); } void sendEvent(WasherEvent event) { switch (state) { case STATE_IDLE: if (event == EVENT_START) enterState(STATE_FILL_WATER); break; case STATE_FILL_WATER: if (event == EVENT_WATER_FULL) enterState(STATE_WASH); if (event == EVENT_ERROR) enterState(STATE_ERROR); break; case STATE_WASH: if (event == EVENT_WASH_DONE) enterState(STATE_DRAIN); if (event == EVENT_ERROR) enterState(STATE_ERROR); break; case STATE_DRAIN: if (event == EVENT_DRAIN_DONE) enterState(STATE_SPIN); if (event == EVENT_ERROR) enterState(STATE_ERROR); break; case STATE_SPIN: if (event == EVENT_SPIN_DONE) enterState(STATE_COMPLETE); break; case STATE_ERROR: if (event == EVENT_RESET) enterState(STATE_IDLE); break; default: break; } } // Вызывать в loop() каждый цикл void update() { uint32_t elapsed = millis() - stateEnterTime; // Индикатор работы bool running = (state != STATE_IDLE && state != STATE_COMPLETE && state != STATE_ERROR); digitalWrite(PIN_LED_RUN, running ? (millis() % 500 < 250) : LOW); switch (state) { case STATE_FILL_WATER: // Проверяем датчик уровня if (digitalRead(PIN_SENSOR_FULL) == LOW) { sendEvent(EVENT_WATER_FULL); } // Таймаут заполнения 10 минут if (elapsed > 600000UL) { sendEvent(EVENT_ERROR); } break; case STATE_WASH: if (elapsed >= washDuration) { sendEvent(EVENT_WASH_DONE); } break; case STATE_DRAIN: // 3 минуты на слив if (elapsed >= 180000UL) { sendEvent(EVENT_DRAIN_DONE); } break; case STATE_SPIN: if (elapsed >= spinDuration) { sendEvent(EVENT_SPIN_DONE); } break; default: break; } } WasherState getState() { return state; } }; WashingMachine washer(30, 5); // 30 мин стирка, 5 мин отжим Button startBtn(2); void setup() { Serial.begin(115200); washer.begin(); startBtn.begin(); startBtn.setOnPress([]() { washer.sendEvent(EVENT_START); }); } void loop() { washer.update(); startBtn.update(); } ПИД-регулятор: управление температуройclass PIDController { private: float kp, ki, kd; float setpoint; float integral; float prevError; float integralLimit; float outputMin, outputMax; uint32_t lastTime; public: PIDController(float _kp, float _ki, float _kd, float _outMin = 0.0f, float _outMax = 100.0f) : kp(_kp), ki(_ki), kd(_kd), setpoint(0), integral(0), prevError(0), integralLimit(50.0f), outputMin(_outMin), outputMax(_outMax), lastTime(0) {} void setSetpoint(float sp) { setpoint = sp; } void reset() { integral = 0; prevError = 0; } float compute(float measured) { uint32_t now = millis(); float dt = (now - lastTime) / 1000.0f; // Секунды lastTime = now; if (dt <= 0 || dt > 1.0f) dt = 0.1f; // Защита от аномальных dt float error = setpoint - measured; // Интегральная составляющая с ограничением (anti-windup) integral += error * dt; integral = constrain(integral, -integralLimit, integralLimit); // Производная (фильтруем шум — берём производную измерения, не ошибки) float derivative = -(measured - prevError) / dt; // -d(PV)/dt prevError = measured; float output = kp * error + ki * integral + kd * derivative; return constrain(output, outputMin, outputMax); } }; // Применение: ПИД-термостат с твёрдотельным реле (SSR) const uint8_t PIN_HEATER = 9; // ШИМ-выход на SSR const uint8_t PIN_TEMP_SCL = A4; // I2C — датчик температуры const uint8_t PIN_TEMP_SDA = A5; PIDController tempPID(2.0f, 0.5f, 1.0f, 0.0f, 255.0f); float readTemperature() { // Здесь чтение датчика DS18B20 или термопары MAX6675 // Возвращаем демо-значение: return 25.0f + random(-10, 10) / 10.0f; } void setup() { Serial.begin(115200); pinMode(PIN_HEATER, OUTPUT); tempPID.setSetpoint(75.0f); // Уставка 75°C } void loop() { static uint32_t lastPID = 0; if (millis() - lastPID >= 500) { // ПИД каждые 500 мс lastPID = millis(); float temp = readTemperature(); float output = tempPID.compute(temp); analogWrite(PIN_HEATER, (uint8_t)output); Serial.print("T="); Serial.print(temp, 1); Serial.print("°C SP=75°C OUT="); Serial.print((int)output * 100 / 255); Serial.println("%"); } } Modbus RTU Slave на Arduino#include <SoftwareSerial.h> // RS-485 через MAX485 SoftwareSerial rs485(10, 11); // RX, TX const uint8_t DE_RE_PIN = 4; // Driver Enable / Receiver Enable // Данные устройства (10 Holding Registers) uint16_t holdingRegs[10] = {0}; uint16_t inputRegs[10] = {0}; const uint8_t DEVICE_ADDRESS = 1; uint16_t crc16(uint8_t *buf, uint16_t len) { uint16_t crc = 0xFFFF; for (uint16_t i = 0; i < len; i++) { crc ^= buf[i]; for (uint8_t j = 0; j < 8; j++) { crc = (crc & 1) ? ((crc >> 1) ^ 0xA001) : (crc >> 1); } } return crc; } void rs485Send(uint8_t *data, uint8_t len) { digitalWrite(DE_RE_PIN, HIGH); // Режим передачи delayMicroseconds(100); rs485.write(data, len); rs485.flush(); delayMicroseconds(100); digitalWrite(DE_RE_PIN, LOW); // Режим приёма } void processModbus(uint8_t *req, uint8_t len) { if (req[0] != DEVICE_ADDRESS) return; uint16_t rxCRC = (req[len-1] << 8) | req[len-2]; if (crc16(req, len-2) != rxCRC) return; uint8_t resp[64]; uint8_t rlen = 0; uint8_t fc = req[1]; uint16_t addr = (req[2] << 8) | req[3]; uint16_t count = (req[4] << 8) | req[5]; resp[rlen++] = DEVICE_ADDRESS; resp[rlen++] = fc; if (fc == 0x03 && addr + count <= 10) { // Read Holding Registers resp[rlen++] = count * 2; for (uint16_t i = 0; i < count; i++) { resp[rlen++] = holdingRegs[addr + i] >> 8; resp[rlen++] = holdingRegs[addr + i] & 0xFF; } } else if (fc == 0x04 && addr + count <= 10) { // Read Input Registers resp[rlen++] = count * 2; for (uint16_t i = 0; i < count; i++) { resp[rlen++] = inputRegs[addr + i] >> 8; resp[rlen++] = inputRegs[addr + i] & 0xFF; } } else if (fc == 0x06 && addr < 10) { // Write Single Register holdingRegs[addr] = (req[4] << 8) | req[5]; memcpy(resp + 2, req + 2, 4); rlen += 4; } else { resp[1] |= 0x80; resp[rlen++] = (fc == 0x03 || fc == 0x04 || fc == 0x06) ? 0x02 : 0x01; } uint16_t c = crc16(resp, rlen); resp[rlen++] = c & 0xFF; resp[rlen++] = c >> 8; rs485Send(resp, rlen); } void setup() { Serial.begin(115200); rs485.begin(9600); pinMode(DE_RE_PIN, OUTPUT); digitalWrite(DE_RE_PIN, LOW); // Режим приёма по умолчанию } void loop() { // Обновляем Input Registers реальными данными inputRegs[0] = analogRead(A0); // Аналоговый вход 0-1023 inputRegs[1] = (uint16_t)(millis() / 1000); // Uptime в секундах // Принимаем Modbus-запросы static uint8_t rxBuf[64]; static uint8_t rxLen = 0; static uint32_t lastByte = 0; while (rs485.available()) { if (rxLen < sizeof(rxBuf)) { rxBuf[rxLen++] = rs485.read(); } lastByte = millis(); } // Конец фрейма — пауза 3.5 символа (при 9600 бод ~4 мс) if (rxLen > 0 && millis() - lastByte > 4) { processModbus(rxBuf, rxLen); rxLen = 0; } } Работа с несколькими датчиками I2C#include <Wire.h> #include <Adafruit_BMP280.h> #include <Adafruit_SHT31.h> #include <RTC_DS3231.h> Adafruit_BMP280 bmp; Adafruit_SHT31 sht; RTC_DS3231 rtc; struct SensorData { float temperature_bmp; float pressure_hpa; float temperature_sht; float humidity; uint32_t timestamp; bool bmp_ok; bool sht_ok; }; SensorData readAllSensors() { SensorData data = {0}; data.timestamp = rtc.now().unixtime(); // BMP280: давление и температура if (bmp.begin(0x76)) { data.temperature_bmp = bmp.readTemperature(); data.pressure_hpa = bmp.readPressure() / 100.0f; data.bmp_ok = true; } // SHT31: температура и влажность if (sht.begin(0x44)) { data.temperature_sht = sht.readTemperature(); data.humidity = sht.readHumidity(); data.sht_ok = (!isnan(data.temperature_sht)); } return data; } void printSensorData(const SensorData& d) { Serial.print("T1="); Serial.print(d.temperature_bmp, 1); Serial.print("°C "); Serial.print("P="); Serial.print(d.pressure_hpa, 1); Serial.print("гПа "); Serial.print("T2="); Serial.print(d.temperature_sht, 1); Serial.print("°C "); Serial.print("H="); Serial.print(d.humidity, 1); Serial.println("%"); } void setup() { Serial.begin(115200); Wire.begin(); Wire.setClock(400000); // Fast mode 400 кГц if (!bmp.begin(0x76)) Serial.println("BMP280 не найден!"); if (!sht.begin(0x44)) Serial.println("SHT31 не найден!"); if (!rtc.begin()) Serial.println("DS3231 не найден!"); } void loop() { static uint32_t lastRead = 0; if (millis() - lastRead >= 5000) { // Каждые 5 секунд lastRead = millis(); SensorData data = readAllSensors(); printSensorData(data); } } Запись данных на SD-карту#include <SPI.h> #include <SD.h> const uint8_t SD_CS_PIN = 10; bool sdAvailable = false; void sdInit() { sdAvailable = SD.begin(SD_CS_PIN); if (!sdAvailable) { Serial.println("SD-карта не найдена!"); } else { Serial.println("SD-карта готова."); } } void logData(const String& filename, float temp, float humidity, uint32_t timestamp) { if (!sdAvailable) return; File file = SD.open(filename, FILE_WRITE); if (!file) { Serial.println("Ошибка открытия файла!"); return; } // CSV-формат file.print(timestamp); file.print(','); file.print(temp, 2); file.print(','); file.println(humidity, 2); file.close(); } // Создание нового файла каждый день String getDailyFilename(uint32_t unixtime) { // Простой расчёт дня uint32_t day = unixtime / 86400; return "LOG_" + String(day) + ".CSV"; } Советы по надёжности Arduino-проектов1. Сторожевой таймер (Watchdog)#include <avr/wdt.h> void setup() { wdt_enable(WDTO_2S); // Сброс если loop() не выполняется 2 секунды } void loop() { wdt_reset(); // Сброс таймера — "я ещё живой" // ... ваш код } 2. EEPROM для сохранения настроек#include <EEPROM.h> struct Settings { float setpoint; uint16_t interval; uint8_t mode; uint16_t checksum; }; void saveSettings(const Settings& s) { Settings toSave = s; // Простая контрольная сумма toSave.checksum = (uint16_t)(s.setpoint * 100) + s.interval + s.mode; EEPROM.put(0, toSave); } bool loadSettings(Settings& s) { EEPROM.get(0, s); uint16_t expected = (uint16_t)(s.setpoint * 100) + s.interval + s.mode; return (s.checksum == expected); // Данные корректны? } 3. Ограничение частоты Serial// НЕ ПИШИТЕ В Serial КАЖДЫЙ ЦИКЛ LOOP! // При 115200 бод запись 100 байт занимает ~8мс void printPeriodic(float value) { static uint32_t lastPrint = 0; if (millis() - lastPrint >= 200) { // Максимум 5 раз в секунду lastPrint = millis(); Serial.println(value); } } ЗаключениеArduino — это не учебная игрушка, это платформа. Разница между "поделкой" и "устройством" — не в железе, а в качестве кода. Ключевые принципы: никогда не используйте delay() в итоговом устройстве, структурируйте код как набор независимых задач через millis(), используйте конечные автоматы для сложной логики, добавляйте watchdog timer и защиты, документируйте код. С этими принципами Arduino становится полноценным инструментом для создания надёжных устройств — от умного дома до промышленных узлов сбора данных.
-
STM32 с нуля: практическое руководство для тех, кто вырос из Arduino
Почему STM32, а не продолжать на ArduinoArduino — отличный старт. Но в какой-то момент вы упираетесь в потолок: скорость 16 МГц не хватает, Flash/RAM заканчивается, нужны возможности которых у AVR нет — несколько UART, USB Device, Ethernet MAC, криптоускоритель, DSP-инструкции. STM32 — это семейство 32-битных микроконтроллеров от STMicroelectronics на базе ядер ARM Cortex-M. Характеристики даже бюджетного STM32F103C8T6 ("Blue Pill"): Параметр Arduino Uno (ATmega328P) STM32F103C8T6 Ядро AVR 8-бит ARM Cortex-M3 32-бит Тактовая частота 16 МГц 72 МГц Flash 32 КБ 64 КБ RAM 2 КБ 20 КБ GPIO 23 37 ADC 6 × 10-бит 10 × 12-бит Таймеры 3 7 SPI / I2C / UART 1 / 1 / 1 2 / 2 / 3 USB Нет Full-Speed USB 2.0 Цена ~$3–5 (оригинал) ~$0.8–2 И это самый простой STM32. Линейки F4, F7, H7 — ещё на порядок мощнее. Семейства STM32: как не запутатьсяSTM32 делится на несколько линеек по ядру и позиционированию: STM32F0/F1/F3 — Базовые (Cortex-M0/M3/M4) F103: самый популярный, "Blue Pill", 72 МГц — идеален для старта F303: F3 с матфлоатом и операционными усилителями внутри F030/F042: ультрадешёвые, от $0.3, для массового производства STM32F4 — Производительные (Cortex-M4F с FPU) F401/F411: 84–100 МГц, USB, хороший баланс F407/F429: 168 МГц, Ethernet MAC, FMC для внешней SDRAM, камеры Популярны для DSP-задач, аудио, обработки изображений STM32F7/H7 — Высокопроизводительные (Cortex-M7) H743: 480 МГц, двойная точность float, L1-кэш, умереть не встать Используются в промышленных системах реального времени STM32L — Низкое энергопотребление (Low Power) L051/L071: ток в sleep < 1 мкА, для батарейных устройств STM32G/U — Новые серии (2019–2022) G431/G474: отличные для силовой электроники (Timer1 с мёртвым временем) U5: Cortex-M33 с TrustZone, IoT-безопасность Рекомендации для старта: STM32F103C8T6 (Blue Pill) или STM32G031 для новых проектов. Настройка среды разработкиSTM32CubeIDE + STM32CubeMXSTM32CubeIDE — официальная бесплатная среда от ST. Включает: Eclipse-based IDE Компилятор GCC ARM OpenOCD для программирования/отладки Встроенный STM32CubeMX для генерации кода инициализации Установка: Скачать STM32CubeIDE с сайта st.com (требует регистрации, бесплатно) Установить, выбрать пакеты для нужных семейств Подключить программатор ST-Link V2 ($3–5 на AliExpress) Первое подключение (Blue Pill → ST-Link V2):ST-Link V2 Blue Pill SWDIO → PA13 SWCLK → PA14 GND → GND 3.3V → 3V3 Важно: На оригинальных Blue Pill загрузчик прошит неправильно. Для работы с ST-Link через SWD это не проблема — программируем напрямую в flash. HAL vs LL: что выбратьSTMicroelectronics предоставляет два уровня библиотек: HAL (Hardware Abstraction Layer): Высокоуровневый, максимально переносимый код Автоматически генерируется CubeMX Проще в использовании, больше overhead Рекомендуется для большинства проектов LL (Low Layer): Тонкие обёртки над регистрами, почти без overhead Максимальная производительность и предсказуемость Нужно хорошее знание периферии Для критичного по времени кода Смешанный подход (лучший для опытных): HAL для инициализации (CubeMX генерирует) LL для критичных по времени операций в прерываниях GPIO: мигаем светодиодом правильноCubeMX генерирует такой код инициализации GPIO: // Автосгенерированный код CubeMX static void MX_GPIO_Init(void) { GPIO_InitTypeDef GPIO_InitStruct = {0}; __HAL_RCC_GPIOC_CLK_ENABLE(); // Включаем тактирование порта C // Настройка PC13 как выход (встроенный LED на Blue Pill) GPIO_InitStruct.Pin = GPIO_PIN_13; GPIO_InitStruct.Mode = GPIO_MODE_OUTPUT_PP; // Push-Pull выход GPIO_InitStruct.Pull = GPIO_NOPULL; GPIO_InitStruct.Speed = GPIO_SPEED_FREQ_LOW; // 2 МГц, достаточно для LED HAL_GPIO_Init(GPIOC, &GPIO_InitStruct); // Входной сигнал на PA0 (кнопка) GPIO_InitStruct.Pin = GPIO_PIN_0; GPIO_InitStruct.Mode = GPIO_MODE_INPUT; GPIO_InitStruct.Pull = GPIO_PULLUP; // Внутренний pull-up HAL_GPIO_Init(GPIOA, &GPIO_InitStruct); } Управление GPIO в программе: // Включить / выключить / переключить HAL_GPIO_WritePin(GPIOC, GPIO_PIN_13, GPIO_PIN_RESET); // LED on (active low) HAL_GPIO_WritePin(GPIOC, GPIO_PIN_13, GPIO_PIN_SET); // LED off HAL_GPIO_TogglePin(GPIOC, GPIO_PIN_13); // Toggle // Читать состояние входа GPIO_PinState btn = HAL_GPIO_ReadPin(GPIOA, GPIO_PIN_0); if (btn == GPIO_PIN_RESET) { // Кнопка нажата (pull-up, нажатие — к GND) HAL_GPIO_WritePin(GPIOC, GPIO_PIN_13, GPIO_PIN_RESET); } GPIO режимы:GPIO_MODE_OUTPUT_PP — Push-Pull выход (стандартный) GPIO_MODE_OUTPUT_OD — Open-Drain (для I2C, совместимость 5В) GPIO_MODE_INPUT — Вход GPIO_MODE_IT_RISING/FALLING/RISING_FALLING — Вход с прерыванием GPIO_MODE_AF_PP — Альтернативная функция (UART, SPI, Timer...) GPIO_MODE_ANALOG — Аналоговый режим (для ADC/DAC) UART: последовательная связь// Инициализация UART1 на PA9(TX)/PA10(RX), 115200 бод static void MX_USART1_UART_Init(void) { huart1.Instance = USART1; huart1.Init.BaudRate = 115200; huart1.Init.WordLength = UART_WORDLENGTH_8B; huart1.Init.StopBits = UART_STOPBITS_1; huart1.Init.Parity = UART_PARITY_NONE; huart1.Init.Mode = UART_MODE_TX_RX; huart1.Init.HwFlowCtl = UART_HWCONTROL_NONE; huart1.Init.OverSampling = UART_OVERSAMPLING_16; HAL_UART_Init(&huart1); } // Отправка данных (блокирующий режим) char msg[] = "Hello STM32!\r\n"; HAL_UART_Transmit(&huart1, (uint8_t*)msg, strlen(msg), 100); // timeout 100ms // Приём (с таймаутом) uint8_t rx_buf[64]; uint16_t bytes_received; HAL_StatusTypeDef status = HAL_UART_Receive(&huart1, rx_buf, 64, 1000); // printf через UART (настройка retarget) // В файле syscalls.c добавить: int __io_putchar(int ch) { HAL_UART_Transmit(&huart1, (uint8_t*)&ch, 1, 100); return ch; } // После этого можно использовать printf! printf("Температура: %.2f°C\r\n", temperature); Приём через прерывания (правильный подход):// Буфер приёма и флаги #define RX_BUFFER_SIZE 256 static uint8_t rx_buffer[RX_BUFFER_SIZE]; static uint8_t rx_byte; // Один байт для приёма по прерыванию static uint16_t rx_index = 0; static volatile uint8_t line_ready = 0; // Запускаем прием одного байта в прерывании // (вызвать после инициализации и после каждого приёма) void UART_StartReceive(void) { HAL_UART_Receive_IT(&huart1, &rx_byte, 1); } // Callback — вызывается автоматически при приёме байта void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) { if (huart->Instance == USART1) { if (rx_byte == '\n' || rx_index >= RX_BUFFER_SIZE - 1) { rx_buffer[rx_index] = '\0'; rx_index = 0; line_ready = 1; // Сигнализируем что строка готова } else if (rx_byte != '\r') { rx_buffer[rx_index++] = rx_byte; } // Запускаем следующий приём HAL_UART_Receive_IT(&huart1, &rx_byte, 1); } } // В главном цикле: if (line_ready) { line_ready = 0; printf("Получено: %s\r\n", rx_buffer); process_command((char*)rx_buffer); } Таймеры: ШИМ и точное времяТаймеры STM32 — мощнейшая периферия. Используются для ШИМ, измерения частоты, генерации прерываний, управления сервоприводами. ШИМ (PWM) для управления яркостью/скоростью:// TIM3, Channel 1, PA6, частота 1 кГц // Настройка через CubeMX, затем в коде: // Запуск ШИМ HAL_TIM_PWM_Start(&htim3, TIM_CHANNEL_1); // Изменение скважности (0–999 для ARR=999) // 500 = 50% скважности __HAL_TIM_SET_COMPARE(&htim3, TIM_CHANNEL_1, 500); // Плавное изменение яркости LED for (int i = 0; i <= 999; i++) { __HAL_TIM_SET_COMPARE(&htim3, TIM_CHANNEL_1, i); HAL_Delay(1); } for (int i = 999; i >= 0; i--) { __HAL_TIM_SET_COMPARE(&htim3, TIM_CHANNEL_1, i); HAL_Delay(1); } Управление сервоприводом (50 Гц, 1–2 мс):// TIM2, 50 Гц (период 20 мс = 20000 тиков при предделителе 72-1) // ARR = 19999, PSC = 71 → 1 тик = 1 мкс #define SERVO_MIN_US 1000 // 1 мс = левый предел #define SERVO_MAX_US 2000 // 2 мс = правый предел #define SERVO_MID_US 1500 // 1.5 мс = центр void servo_set_angle(int angle_degrees) // 0–180 градусов { // Линейное масштабирование угла → ширина импульса uint32_t pulse = SERVO_MIN_US + (uint32_t)(angle_degrees * (SERVO_MAX_US - SERVO_MIN_US) / 180); __HAL_TIM_SET_COMPARE(&htim2, TIM_CHANNEL_1, pulse); } // Использование: servo_set_angle(0); // Левый предел HAL_Delay(1000); servo_set_angle(90); // Центр HAL_Delay(1000); servo_set_angle(180); // Правый предел Прерывание по таймеру (точный период):// Прерывание каждые 1 мс от TIM6 (базовый таймер) // Запуск: HAL_TIM_Base_Start_IT(&htim6); // Callback: void HAL_TIM_PeriodElapsedCallback(TIM_HandleTypeDef *htim) { if (htim->Instance == TIM6) { system_tick_ms++; // Собственный миллисекундный счётчик // Задачи каждые 10 мс if (system_tick_ms % 10 == 0) { adc_trigger_conversion(); } // Задачи каждые 1000 мс if (system_tick_ms % 1000 == 0) { led_heartbeat_toggle(); } } } АЦП: чтение аналоговых сигналов// Одиночное преобразование (блокирующий режим) uint32_t adc_read_single(void) { HAL_ADC_Start(&hadc1); HAL_ADC_PollForConversion(&hadc1, 10); // Ждём не более 10 мс uint32_t value = HAL_ADC_GetValue(&hadc1); HAL_ADC_Stop(&hadc1); return value; // 0–4095 для 12-бит АЦП } // Перевод в напряжение (опорное 3.3В): float adc_to_voltage(uint32_t raw) { return raw * 3.3f / 4095.0f; } // Перевод в температуру для NTC-термистора (10кОм, B=3950): float ntc_to_celsius(uint32_t raw_adc) { float voltage = adc_to_voltage(raw_adc); float resistance = 10000.0f * voltage / (3.3f - voltage); // Делитель с 10кОм // Уравнение Стейнхарта-Харта (упрощённое) float steinhart; steinhart = resistance / 10000.0f; // R/Rnom steinhart = logf(steinhart); // ln(R/Rnom) steinhart /= 3950.0f; // / B steinhart += 1.0f / (25.0f + 273.15f); // + 1/T0 steinhart = 1.0f / steinhart; // Инверсия return steinhart - 273.15f; // Кельвин → Цельсий } АЦП с DMA (несколько каналов, без участия CPU):// Настройка: ADC + DMA, Continuous mode, 4 канала (PA0-PA3) #define ADC_CHANNELS 4 static uint16_t adc_dma_buffer[ADC_CHANNELS]; // Запуск: HAL_ADC_Start_DMA(&hadc1, (uint32_t*)adc_dma_buffer, ADC_CHANNELS); // DMA автоматически обновляет буфер! // В основном цикле просто читаем: float temp = ntc_to_celsius(adc_dma_buffer[0]); float pressure = adc_dma_buffer[1] * 3.3f / 4095.0f; // Callback при завершении преобразований всех каналов: void HAL_ADC_ConvCpltCallback(ADC_HandleTypeDef* hadc) { // Все 4 канала обновлены adc_data_ready = 1; } I2C: подключение датчиков// Пример: датчик давления/температуры BMP280 #define BMP280_ADDR 0x76 << 1 // 7-бит адрес, сдвиг влево для HAL // Читать регистр uint8_t BMP280_ReadReg(uint8_t reg) { uint8_t value; HAL_I2C_Mem_Read(&hi2c1, BMP280_ADDR, // Адрес устройства reg, // Адрес регистра I2C_MEMADD_SIZE_8BIT, &value, // Буфер 1, // Количество байт 100); // Таймаут return value; } // Записать регистр void BMP280_WriteReg(uint8_t reg, uint8_t value) { HAL_I2C_Mem_Write(&hi2c1, BMP280_ADDR, reg, I2C_MEMADD_SIZE_8BIT, &value, 1, 100); } // Читать несколько байт подряд void BMP280_ReadBurst(uint8_t reg, uint8_t *buf, uint8_t len) { HAL_I2C_Mem_Read(&hi2c1, BMP280_ADDR, reg, I2C_MEMADD_SIZE_8BIT, buf, len, 100); } // Инициализация BMP280 void BMP280_Init(void) { // Проверка ID (должен быть 0x60) uint8_t id = BMP280_ReadReg(0xD0); if (id != 0x60) { printf("BMP280 не найден! ID=0x%02X\r\n", id); return; } // Нормальный режим, oversampling ×4 для давления и температуры BMP280_WriteReg(0xF4, 0x97); // ctrl_meas: осsp×4, osst×4, normal mode BMP280_WriteReg(0xF5, 0xA0); // config: t_sb=1000мс, filter=16 } // Чтение данных (упрощённо, без компенсации) typedef struct { float temperature; float pressure; } BMP280_Data; BMP280_Data BMP280_ReadData(void) { uint8_t raw[6]; BMP280_ReadBurst(0xF7, raw, 6); // press_msb, press_lsb, press_xlsb, temp×3 int32_t raw_press = ((int32_t)raw[0] << 12) | ((int32_t)raw[1] << 4) | (raw[2] >> 4); int32_t raw_temp = ((int32_t)raw[3] << 12) | ((int32_t)raw[4] << 4) | (raw[5] >> 4); // Реальный код должен использовать калибровочные коэффициенты из регистров! // Это упрощение для иллюстрации BMP280_Data data; data.temperature = raw_temp / 5120.0f; // Очень грубо! data.pressure = raw_press / 25600.0f; // Очень грубо! return data; } SPI: быстрая связь с периферией// SPI — быстрее I2C, до 45 МГц на STM32F4 // Пример: дисплей ST7735 (128×160 пикселей) // CS-пин вручную (NSS в software-режиме) #define LCD_CS_LOW() HAL_GPIO_WritePin(GPIOA, GPIO_PIN_4, GPIO_PIN_RESET) #define LCD_CS_HIGH() HAL_GPIO_WritePin(GPIOA, GPIO_PIN_4, GPIO_PIN_SET) #define LCD_DC_LOW() HAL_GPIO_WritePin(GPIOA, GPIO_PIN_3, GPIO_PIN_RESET) // Command #define LCD_DC_HIGH() HAL_GPIO_WritePin(GPIOA, GPIO_PIN_3, GPIO_PIN_SET) // Data void LCD_SendCommand(uint8_t cmd) { LCD_DC_LOW(); LCD_CS_LOW(); HAL_SPI_Transmit(&hspi1, &cmd, 1, 100); LCD_CS_HIGH(); } void LCD_SendData(uint8_t *data, uint16_t len) { LCD_DC_HIGH(); LCD_CS_LOW(); HAL_SPI_Transmit(&hspi1, data, len, 1000); LCD_CS_HIGH(); } // Быстрая передача через DMA (не блокирует CPU) void LCD_SendDataDMA(uint8_t *data, uint16_t len) { LCD_DC_HIGH(); LCD_CS_LOW(); HAL_SPI_Transmit_DMA(&hspi1, data, len); // CS поднимется в callback после завершения DMA } void HAL_SPI_TxCpltCallback(SPI_HandleTypeDef *hspi) { if (hspi->Instance == SPI1) { LCD_CS_HIGH(); dma_complete = 1; } } Структура хорошего проекта на STM32project/ ├── Core/ │ ├── Inc/ │ │ ├── main.h │ │ └── stm32f1xx_hal_conf.h │ └── Src/ │ ├── main.c — Только инициализация + main loop │ ├── stm32f1xx_it.c — Обработчики прерываний │ └── syscalls.c — printf retarget ├── Drivers/ │ ├── CMSIS/ — ARM заголовки, системный файл │ └── STM32F1xx_HAL_Driver/ — HAL библиотека (не трогать) ├── App/ — ВАШ КОД здесь! │ ├── sensors/ │ │ ├── bmp280.c / .h │ │ └── ntc.c / .h │ ├── control/ │ │ ├── pid.c / .h │ │ └── state_machine.c / .h │ ├── comm/ │ │ ├── modbus_slave.c / .h │ │ └── protocol.c / .h │ └── app.c — Главная логика приложения └── CMakeLists.txt / .ioc — Конфигурация проекта Принцип: main.c содержит только вызов App_Init() и App_Run(). Вся логика — в директории App/. Типичные ошибки и как их избежать1. Забыли включить тактирование периферии // Без этого HAL_GPIO_Init ничего не сделает! __HAL_RCC_GPIOA_CLK_ENABLE(); __HAL_RCC_USART1_CLK_ENABLE(); __HAL_RCC_SPI1_CLK_ENABLE(); // CubeMX делает это автоматически — используйте его для инициализации 2. Неправильные Alternate Functions STM32F1 имеет фиксированный маппинг пинов. STM32F4+ — гибкий (GPIO_AF1_..., GPIO_AF7_...). Смотрите datasheet, раздел "Alternate function mapping". 3. Блокирующие задержки в прерываниях // НЕЛЬЗЯ! HAL_Delay использует SysTick-прерывание низшего приоритета void HAL_UART_RxCpltCallback(UART_HandleTypeDef *huart) { HAL_Delay(100); // Зависнет если прерывание выше приоритета SysTick! } 4. Запись в HAL-переменную напрямую Не изменяйте поля структур huart1, htim2 вручную в рантайме — используйте API функции. 5. Stack Overflow STM32F103 имеет только 20 КБ RAM. Большие массивы на стеке — прямой путь к Hard Fault. Объявляйте большие буферы как глобальные или статические. Отладка: когда всё идёт не такHard Fault Handler — ваш лучший другvoid HardFault_Handler(void) { // Получаем регистры состояния __asm volatile("TST lr, #4\n" "ITE EQ\n" "MRSEQ r0, MSP\n" "MRSNE r0, PSP\n" "B HardFault_HandlerC"); } void HardFault_HandlerC(uint32_t *stack_frame) { printf("=== HARD FAULT ===\r\n"); printf("PC = 0x%08lX\r\n", stack_frame[6]); // Адрес проблемной инструкции printf("LR = 0x%08lX\r\n", stack_frame[5]); printf("CFSR= 0x%08lX\r\n", SCB->CFSR); // Причина fault while(1); } SWO (Serial Wire Output) — printf без UARTЕсли UART занят, используйте SWO для отладочного вывода. В STM32CubeIDE включается за 5 кликов, вывод идёт в консоль IDE без дополнительного кабеля. ЗаключениеSTM32 — это настоящий профессиональный инструмент. Первые шаги сложнее чем с Arduino: нужно настроить тактирование, разобраться с HAL, понять концепцию прерываний и DMA. Но это окупается многократно: производительность, периферия, потребление, цена в серийном производстве. Рекомендуемый путь: STM32F103 Blue Pill + ST-Link V2 + STM32CubeIDE. Первый проект — мигание LED через прерывание таймера. Второй — чтение кнопки с антидребезгом. Третий — датчик по I2C с выводом в UART. К четвёртому проекту вы уже будете чувствовать себя уверенно. Документация ST: datasheet, Reference Manual, Programming Manual — читайте их. Они написаны хорошо и содержат ответы на все вопросы.
-
Протокол Modbus: полное практическое руководство
Почему Modbus жив и актуален спустя 45 летModbus был разработан компанией Modicon в 1979 году для связи ПЛК по последовательной шине. Прошло почти полвека — а протокол по-прежнему является самым распространённым в промышленной автоматизации. По различным оценкам, более 30 миллионов устройств в мире используют Modbus. Секрет долголетия прост: протокол исключительно прост в понимании и реализации. Он работает по схеме "мастер-слейв", имеет открытую спецификацию, не требует лицензирования, поддерживается абсолютно всеми промышленными устройствами. Сегодня существуют три основные реализации: Modbus RTU — по последовательной шине (RS-232, RS-485) Modbus ASCII — текстовое представление, устарело Modbus TCP — поверх TCP/IP Ethernet Архитектура: мастер и слейвыModbus — строго мастер-слейв (Master-Slave) протокол: Мастер (Master): Инициирует все запросы. Только один мастер в сети. Обычно это ПЛК, SCADA-сервер, промышленный компьютер. Слейв (Slave): Отвечает на запросы мастера. В сети RS-485 может быть до 247 слейвов с адресами 1–247. Адрес 0 — широковещательный (слейвы не отвечают). Важно: Слейв никогда не инициирует передачу! Он только отвечает. Мастер Слейв 1 Слейв 2 Слейв 3 | | | | |── Request → Addr=1 ─────>| | | |<── Response ────────────| | | | | | | |── Request → Addr=2 ──────────────>| | |<── Response ──────────────────────| | Типы данных (регистры)Modbus оперирует четырьмя типами данных: 1. Coils (Coil Status) — Дискретные выходыРазмер: 1 бит Доступ: Чтение и запись мастером Функциональные коды: FC01 (читать), FC05 (записать один), FC15 (записать несколько) Адреса: 00001–09999 (в Modbus-нотации), 0x0000–0xFFFF (в PDU) Применение: Состояния реле, клапанов, двигателей 2. Discrete Inputs (Input Status) — Дискретные входыРазмер: 1 бит Доступ: Только чтение мастером (данные поступают от физических входов) Функциональные коды: FC02 (читать) Адреса: 10001–19999 Применение: Состояния кнопок, датчиков, концевиков 3. Holding Registers — Регистры храненияРазмер: 16 бит (2 байта), беззнаковое целое 0–65535 Доступ: Чтение и запись мастером Функциональные коды: FC03 (читать), FC06 (записать один), FC16 (записать несколько) Адреса: 40001–49999 Применение: Уставки, параметры настройки, команды управления 4. Input Registers — Входные регистрыРазмер: 16 бит Доступ: Только чтение мастером Функциональные коды: FC04 (читать) Адреса: 30001–39999 Применение: Измеренные значения датчиков, счётчики Хранение чисел с плавающей точкой16 бит для float недостаточно. Для передачи float используют два последовательных регистра (32 бит = IEEE 754): Регистр 40001 (HIGH word): первые 16 бит float Регистр 40002 (LOW word): вторые 16 бит float Значение 3.14159: IEEE 754: 0x40490FDB HIGH: 0x4049 LOW: 0x0FDB Важная ловушка: порядок байт и слов (endianness) различается у разных производителей! Четыре варианта: Big-Endian, Little-Endian, Big-Endian Byte Swap, Little-Endian Byte Swap. Смотрите документацию устройства. Modbus RTU: структура фреймаRTU (Remote Terminal Unit) — бинарный формат, максимально компактный. ┌─────────┬──────────────┬──────┬────────────────────┬───────┐ │ Address │ Function Code│ Data │ Data │ CRC │ │ 1 байт │ 1 байт │ ... │ ... │ 2 байт│ └─────────┴──────────────┴──────┴────────────────────┴───────┘ Пример запроса FC03 (читать Holding Registers): Запрос: "Слейв №1, дай мне 2 регистра начиная с адреса 0x0064 (100)" 01 03 00 64 00 02 85 D5 01 — Адрес слейва 03 — Код функции (читать Holding Registers) 00 64 — Начальный адрес (0x0064 = 100) 00 02 — Количество регистров (2) 85 D5 — CRC16 (контрольная сумма) Ответ слейва: 01 03 04 01 F4 00 0A 2B 11 01 — Адрес слейва 03 — Код функции 04 — Количество байт данных (2 регистра × 2 байта = 4) 01 F4 — Значение регистра 100 (0x01F4 = 500) 00 0A — Значение регистра 101 (0x000A = 10) 2B 11 — CRC16 Ответ при ошибке (Exception Response): 01 83 02 C0 F1 01 — Адрес слейва 83 — Код функции + 0x80 (признак ошибки) 02 — Код исключения (02 = Illegal Data Address) C0 F1 — CRC16 Коды исключений:Код Название Описание 01 Illegal Function Устройство не поддерживает данный FC 02 Illegal Data Address Запрошенный адрес не существует 03 Illegal Data Value Недопустимое значение данных 04 Server Device Failure Внутренняя ошибка устройства 06 Server Device Busy Устройство занято, повторите позже Расчёт CRC16CRC16 (Cyclic Redundancy Check) — контрольная сумма для обнаружения ошибок передачи. Алгоритм несложный, но важный: uint16_t ModbusCRC16(uint8_t *buffer, uint16_t length) { uint16_t crc = 0xFFFF; for (uint16_t i = 0; i < length; i++) { crc ^= (uint16_t)buffer[i]; for (uint8_t j = 0; j < 8; j++) { if (crc & 0x0001) { crc >>= 1; crc ^= 0xA001; // Полином Modbus } else { crc >>= 1; } } } return crc; // Младший байт первый в фрейме! } // Использование: uint8_t frame[] = {0x01, 0x03, 0x00, 0x64, 0x00, 0x02}; uint16_t crc = ModbusCRC16(frame, 6); // Добавить в конец фрейма: (crc & 0xFF), (crc >> 8) Важно: В Modbus RTU CRC передаётся младшим байтом вперёд (Little-Endian)! RS-485: физический уровеньModbus RTU работает поверх RS-485 — дифференциальной последовательной шины. Параметры сети:Длина: до 1200 м (при скорости 9600 бод), до 100 м (при 115200 бод) Устройств: до 32 без репитеров, до 247 с репитерами Скорости: 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200 бод Линия: витая пара, лучше экранированная (STP) Терминаторы: 120 Ом на каждом конце шины Топология — только шина!Мастер ──────┬──────────┬──────────┬──────── Терминатор 120Ом 120Ом ───┘ Слейв1 Слейв2 Слейв3 НЕЛЬЗЯ делать "звезду" — отражения сигнала разрушат связь. Типичные проблемы RS-485:Проблема 1: Нет терминирующих резисторов Симптомы: связь работает на малой скорости, не работает на высокой. Или работает с одним устройством, не работает с несколькими. Решение: 120 Ом строго на двух концах шины — и только там. Проблема 2: Земля не подключена RS-485 — дифференциальный сигнал (A-B), не требует общего провода теоретически. Практически — без общей земли при большой разнице потенциалов (грозозащита, разные здания) трансиверы сгорают. Третий провод "GND" обязателен. Проблема 3: Смешаны A и B Сигнальные линии перепутаны местами. Ошибка типичная при ручном монтаже. Симптом: нет ответа вообще или постоянные ошибки CRC. Проблема 4: Нет pull-up/pull-down резисторов на линии Когда все устройства молчат (пауза между транзакциями), линия "висит в воздухе". Нужны подтягивающие резисторы: A через ~560 Ом на +5В, B через ~560 Ом на GND. Многие USB-RS485 адаптеры имеют их встроенными. Modbus TCP: Ethernet-версияModbus TCP — это Modbus RTU без адреса устройства и без CRC, завёрнутый в TCP/IP пакет. Структура Modbus TCP фрейма:┌─────────────┬──────────────────────────────────────────────┐ │ MBAP Header (7 байт) │ PDU (Protocol Data Unit) │ ├──────┬───────┬──────────┬───────┬──────────────────────────┤ │TrID │ Proto │ Length │ UnitID│ FC │ Data │ │2 байт│ 2 байт│ 2 байта │ 1 байт│1 байт│ N байт │ └──────┴───────┴──────────┴───────┴──────┴──────────────────┘ TrID — Transaction Identifier (любое число, повторяется в ответе) Proto — 0x0000 (всегда) Length — длина оставшейся части (Unit ID + FC + Data) UnitID — адрес устройства (для RTU-TCP шлюзов) Порт: 502 (стандартный, зарезервирован IANA) TCP обеспечивает надёжную доставку — CRC не нужен. Но помните: TCP не обеспечивает реальное время. Задержки могут варьироваться от единиц миллисекунд до нескольких секунд при перегрузке сети. Практика: Modbus на PythonБиблиотека pymodbus (полноценная реализация)# Установка: pip install pymodbus from pymodbus.client import ModbusTcpClient, ModbusSerialClient import struct import time # ========== MODBUS TCP ========== def read_vfd_status_tcp(host: str, port: int = 502, unit_id: int = 1) -> dict: """ Читаем параметры частотника по Modbus TCP. Пример для ABB ACS550. """ client = ModbusTcpClient(host=host, port=port, timeout=3) if not client.connect(): raise ConnectionError(f"Не удалось подключиться к {host}:{port}") try: # Читаем Input Registers 30001-30006 (адрес 0-5) result = client.read_input_registers( address=0, # Начальный адрес (0 = регистр 30001) count=6, # Количество регистров slave=unit_id ) if result.isError(): raise Exception(f"Ошибка Modbus: {result}") regs = result.registers return { 'status_word': regs[0], # Слово состояния 'speed_rpm': regs[1], # Скорость об/мин 'frequency_hz': regs[2] / 100.0, # Частота (0.01 Гц) 'current_a': regs[3] / 10.0, # Ток (0.1 А) 'voltage_v': regs[4], # Напряжение 'power_kw': regs[5] / 10.0, # Мощность (0.1 кВт) 'running': bool(regs[0] & 0x0001), 'fault': bool(regs[0] & 0x0008), } finally: client.close() def write_vfd_command_tcp(host: str, freq_hz: float, run: bool, unit_id: int = 1): """Управление частотником через Modbus TCP""" client = ModbusTcpClient(host=host, port=502, timeout=3) if not client.connect(): raise ConnectionError(f"Не удалось подключиться") try: # Задаём частоту (Holding Register 40002, адрес 1) freq_raw = int(freq_hz * 100) # 50.00 Гц → 5000 client.write_register(address=1, value=freq_raw, slave=unit_id) # Команда пуск/стоп (Holding Register 40001, адрес 0) control_word = 0x0002 if run else 0x0001 # 2=Run, 1=Stop client.write_register(address=0, value=control_word, slave=unit_id) print(f"VFD: {'Пуск' if run else 'Стоп'}, частота {freq_hz} Гц") finally: client.close() # ========== MODBUS RTU ========== def create_rtu_client(port: str, baudrate: int = 9600) -> ModbusSerialClient: """Создаём RTU клиент для RS-485""" return ModbusSerialClient( port=port, # '/dev/ttyUSB0' или 'COM3' baudrate=baudrate, bytesize=8, parity='N', # N=None, E=Even, O=Odd stopbits=1, timeout=1.0 ) def scan_modbus_rtu_network(port: str, baudrate: int = 9600) -> list: """ Сканирование Modbus RTU сети — ищем все активные устройства. Возвращает список адресов ответивших устройств. """ client = create_rtu_client(port, baudrate) client.connect() found_devices = [] print(f"Сканирование Modbus RTU на {port}, {baudrate} бод...") for address in range(1, 248): try: # Пробуем прочитать 1 регистр — если устройство есть, оно ответит result = client.read_holding_registers(0, 1, slave=address) if not result.isError(): found_devices.append(address) print(f" ✅ Найдено устройство: адрес {address}") except Exception: pass # Таймаут — устройства нет time.sleep(0.05) # Пауза между запросами client.close() print(f"Найдено устройств: {len(found_devices)}") return found_devices # ========== РАБОТА С FLOAT ========== def registers_to_float(high_reg: int, low_reg: int, byte_order: str = 'big') -> float: """ Конвертация двух Modbus-регистров в float IEEE 754. byte_order: 'big' (ABCD), 'little' (DCBA), 'big_swap' (BADC), 'little_swap' (CDAB) """ if byte_order == 'big': raw = struct.pack('>HH', high_reg, low_reg) elif byte_order == 'little': raw = struct.pack('<HH', low_reg, high_reg) elif byte_order == 'big_swap': raw = struct.pack('>HH', ((high_reg & 0xFF) << 8) | (high_reg >> 8), ((low_reg & 0xFF) << 8) | (low_reg >> 8)) else: raw = struct.pack('>HH', high_reg, low_reg) return struct.unpack('>f', raw)[0] def float_to_registers(value: float, byte_order: str = 'big') -> tuple: """Конвертация float в два Modbus-регистра""" raw = struct.pack('>f', value) high_reg, low_reg = struct.unpack('>HH', raw) if byte_order == 'big': return high_reg, low_reg elif byte_order == 'little': return low_reg, high_reg return high_reg, low_reg # ========== ПРИМЕР OPROS SCADA ========== class ModbusDataCollector: """ Циклический опрос Modbus-устройств для SCADA/мониторинга. """ def __init__(self, host: str): self.client = ModbusTcpClient(host=host, port=502, timeout=5) self.data = {} def poll_all_devices(self) -> dict: """Опросить все устройства и вернуть данные""" if not self.client.connect(): return {'error': 'connection_failed'} try: results = {} # Насос 1 (Unit ID = 1) pump1 = self.client.read_input_registers(0, 4, slave=1) if not pump1.isError(): r = pump1.registers results['pump1'] = { 'running': bool(r[0] & 1), 'fault': bool(r[0] & 8), 'freq_hz': r[1] / 100.0, 'current_a': r[2] / 10.0, 'power_kw': r[3] / 10.0, } # Датчик давления (Unit ID = 5, счётчик давления) pressure = self.client.read_input_registers(0, 2, slave=5) if not pressure.isError(): r = pressure.registers results['pressure_bar'] = registers_to_float(r[0], r[1]) # Расходомер (Unit ID = 6) flow = self.client.read_input_registers(0, 4, slave=6) if not flow.isError(): r = flow.registers results['flow'] = { 'instant_m3h': registers_to_float(r[0], r[1]), 'total_m3': registers_to_float(r[2], r[3]), } return results finally: self.client.close() # ========== ЗАПУСК ========== if __name__ == "__main__": # Пример использования collector = ModbusDataCollector('192.168.1.100') while True: data = collector.poll_all_devices() print(f"Давление: {data.get('pressure_bar', 0):.2f} бар") print(f"Насос 1: {'Работает' if data.get('pump1', {}).get('running') else 'Стоит'}, " f"{data.get('pump1', {}).get('freq_hz', 0):.1f} Гц") time.sleep(1) Реализация Modbus Slave на микроконтроллере (C)Иногда нужно сделать собственное устройство с Modbus-интерфейсом. Вот минимальная реализация для STM32/Arduino: #include <stdint.h> #include <string.h> #define MODBUS_ADDRESS 1 // Адрес нашего устройства #define HOLDING_REG_COUNT 20 // Количество Holding Registers #define INPUT_REG_COUNT 10 // Количество Input Registers // Данные регистров static uint16_t holding_regs[HOLDING_REG_COUNT] = {0}; static uint16_t input_regs[INPUT_REG_COUNT] = {0}; // Расчёт CRC16 static uint16_t crc16(uint8_t *buf, uint16_t len) { uint16_t crc = 0xFFFF; for (uint16_t i = 0; i < len; i++) { crc ^= buf[i]; for (uint8_t j = 0; j < 8; j++) { if (crc & 1) { crc = (crc >> 1) ^ 0xA001; } else { crc >>= 1; } } } return crc; } // Отправка ответа (реализуйте под вашу платформу) extern void uart_send(uint8_t *data, uint16_t len); // Обработка Modbus запроса void modbus_process_request(uint8_t *request, uint16_t req_len) { uint8_t response[256]; uint16_t resp_len = 0; // Проверяем адрес if (request[0] != MODBUS_ADDRESS) return; // Проверяем CRC uint16_t received_crc = (request[req_len-1] << 8) | request[req_len-2]; uint16_t calculated_crc = crc16(request, req_len - 2); if (received_crc != calculated_crc) return; // CRC ошибка — игнорируем uint8_t fc = request[1]; uint16_t addr = (request[2] << 8) | request[3]; uint16_t count = (request[4] << 8) | request[5]; response[resp_len++] = MODBUS_ADDRESS; response[resp_len++] = fc; switch (fc) { case 0x03: // Читать Holding Registers { if (addr + count > HOLDING_REG_COUNT) { // Исключение: неверный адрес response[1] = fc | 0x80; response[resp_len++] = 0x02; break; } response[resp_len++] = count * 2; // Количество байт for (uint16_t i = 0; i < count; i++) { response[resp_len++] = holding_regs[addr + i] >> 8; response[resp_len++] = holding_regs[addr + i] & 0xFF; } break; } case 0x04: // Читать Input Registers { if (addr + count > INPUT_REG_COUNT) { response[1] = fc | 0x80; response[resp_len++] = 0x02; break; } response[resp_len++] = count * 2; for (uint16_t i = 0; i < count; i++) { response[resp_len++] = input_regs[addr + i] >> 8; response[resp_len++] = input_regs[addr + i] & 0xFF; } break; } case 0x06: // Записать один Holding Register { if (addr >= HOLDING_REG_COUNT) { response[1] = fc | 0x80; response[resp_len++] = 0x02; break; } holding_regs[addr] = (request[4] << 8) | request[5]; // Эхо запроса как ответ memcpy(response + 2, request + 2, 4); resp_len += 4; break; } case 0x10: // Записать несколько Holding Registers { if (addr + count > HOLDING_REG_COUNT) { response[1] = fc | 0x80; response[resp_len++] = 0x02; break; } uint8_t byte_count = request[6]; for (uint16_t i = 0; i < count; i++) { holding_regs[addr + i] = (request[7 + i*2] << 8) | request[8 + i*2]; } // Ответ: адрес и количество записанных регистров response[resp_len++] = request[2]; response[resp_len++] = request[3]; response[resp_len++] = request[4]; response[resp_len++] = request[5]; break; } default: { // Неизвестная функция response[1] = fc | 0x80; response[resp_len++] = 0x01; // Illegal Function break; } } // Добавляем CRC uint16_t resp_crc = crc16(response, resp_len); response[resp_len++] = resp_crc & 0xFF; response[resp_len++] = resp_crc >> 8; // Отправляем ответ uart_send(response, resp_len); } // Обновление Input Registers из реальных данных void modbus_update_inputs(uint16_t temp_x10, uint16_t pressure_x100, uint16_t status_bits) { input_regs[0] = temp_x10; // Температура × 10 (250 = 25.0°C) input_regs[1] = pressure_x100; // Давление × 100 (1013 = 10.13 бар) input_regs[2] = status_bits; // Биты состояния } Диагностика сети Modbus: практические инструментыWireshark для Modbus TCPWireshark понимает Modbus TCP "из коробки". Фильтр для захвата: modbus.func_code — фильтр по функциональному коду tcp.port == 502 — весь Modbus TCP трафик modbus.exception_code — только ошибки ModRSsim2 / Diagslave — эмуляторы слейваНезаменимы при разработке — тестируете мастер без реального оборудования. Свой анализатор на Python:import socket import struct def modbus_tcp_sniffer(host: str, port: int = 502): """Простой анализатор Modbus TCP запросов""" FC_NAMES = { 1: 'Read Coils', 2: 'Read Discrete Inputs', 3: 'Read Holding Registers', 4: 'Read Input Registers', 5: 'Write Single Coil', 6: 'Write Single Register', 15: 'Write Multiple Coils', 16: 'Write Multiple Registers', } sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) while True: # Читаем MBAP-заголовок (7 байт) header = sock.recv(7) if len(header) < 7: break transaction_id = struct.unpack('>H', header[0:2])[0] protocol_id = struct.unpack('>H', header[2:4])[0] length = struct.unpack('>H', header[4:6])[0] unit_id = header[6] # Читаем PDU pdu = sock.recv(length - 1) fc = pdu[0] fc_name = FC_NAMES.get(fc, f'Unknown FC {fc}') if fc < 0x80: # Запрос или нормальный ответ if len(pdu) >= 5: addr = struct.unpack('>H', pdu[1:3])[0] count = struct.unpack('>H', pdu[3:5])[0] print(f"[{transaction_id}] Unit={unit_id} | {fc_name} | " f"Addr={addr} Count={count}") else: # Ошибка exc_code = pdu[1] print(f"[{transaction_id}] EXCEPTION: FC={fc & 0x7F} Code={exc_code}") Типичные проблемы и решения"Устройство не отвечает"Алгоритм диагностики: Проверьте физику: контакты, полярность A/B, терминаторы Проверьте параметры порта: baudrate, parity, stopbits — должны совпадать с устройством Проверьте адрес — он правильно задан в устройстве? Не все устройства имеют адрес "1" по умолчанию Используйте осциллограф или логический анализатор — видны ли данные на линии? Попробуйте другой кабель "Иногда не отвечает, CRC-ошибки"Слишком длинная линия без репитера Отсутствуют или не там стоят терминаторы Несколько устройств с одинаковым адресом Помехи от силового оборудования — проложите кабель отдельно "Данные неверные"Неправильный порядок байт (endianness) для float Смещение адреса: в документации адреса часто указываются с 1 (40001), а в запросе нужно с 0 (0x0000) Неправильный масштабный коэффициент (×10, ×100, ×0.01...) ЗаключениеModbus — это фундамент промышленной коммуникации. Несмотря на почтенный возраст, он остаётся стандартом де-факто и будет таковым ещё долгие годы. Понимание структуры фрейма, типов данных и физического уровня RS-485 — это базовый навык любого инженера автоматики. Для новых проектов, где нет ограничений совместимости, стоит рассматривать OPC UA или MQTT как более современные альтернативы. Но если перед вами стоит задача интегрировать любое промышленное оборудование — с вероятностью 90% оно имеет Modbus, и знание этого протокола решит задачу быстро и надёжно.
-
Программирование ПЛК: от нуля до первой рабочей программы
Что такое ПЛК и почему это не просто "мощный Arduino"ПЛК (Программируемый Логический Контроллер) — это специализированный промышленный компьютер, разработанный для управления технологическими процессами в реальном времени. Главное отличие от обычного компьютера или Arduino — детерминизм: гарантированное время реакции на входные сигналы, независимо от загрузки процессора. Когда на производстве нужно, чтобы насос включился строго через 50 мс после срабатывания датчика — это ПЛК. Когда допустима задержка в секунду — можно обойтись более дешёвыми решениями. Когда вопрос в надёжности и работе в условиях вибраций, пыли, температур от -40 до +70°C — снова ПЛК. Другие ключевые отличия: Цикличность: программа выполняется повторяющимися циклами (обычно 1–100 мс). Каждый цикл: считать все входы → выполнить программу → записать все выходы Гальваническая развязка входов/выходов: промышленные сигналы (24В DC, 220В AC) изолированы от внутренней логики Горячая замена модулей: во многих системах можно менять I/O-модули без остановки контроллера Встроенная диагностика: ПЛК сам следит за собственным здоровьем Архитектура ПЛК: как это устроеноЦентральный процессорный модуль (CPU)Выполняет программу, управляет обменом данных, содержит основную память. В S7-1200 CPU также имеет встроенные входы/выходы, Ethernet порт и возможность расширения. Модули ввода-вывода (I/O Modules)Дискретные входы (DI): Воспринимают сигналы "есть напряжение / нет напряжения". Обычно 24В DC или 220В AC. Примеры источников: кнопки, концевые выключатели, датчики приближения (индуктивные, ёмкостные), фотодатчики, реле. Дискретные выходы (DO): Управляют исполнительными устройствами. Бывают транзисторные (24В DC, быстрые) и релейные (любое напряжение до 250В AC, медленные, но универсальные). Аналоговые входы (AI): Принимают непрерывные сигналы: 4–20мА, 0–10В, термопары (тип K, J, T...), термосопротивления (Pt100, Pt1000). Преобразуют в число (обычно 0–27648 для диапазона 0–100%). Аналоговые выходы (AO): Выдают аналоговые сигналы для управления частотниками, регулирующими клапанами, позиционерами. Память ПЛК (на примере Siemens)Область Обозначение Описание Входы I, %IX Образ входов, обновляется каждый цикл Выходы Q, %QX Образ выходов, пишется в физику после цикла Меркеры M, %MX Внутренние биты/байты/слова — "рабочая память" Таймеры T (S7-classic) / IEC-timer Отсчёт времени Счётчики C (S7-classic) / IEC-counter Счёт импульсов Data Blocks DB Блоки данных: рецепты, уставки, история Стандарт МЭК 61131-3: пять языков программированияМеждународный стандарт определяет пять языков для ПЛК. Хороший инженер знает минимум два-три. 1. Ladder Diagram (LD) — Релейно-контактная схемаИсторически первый язык — имитация схем из физических реле. Читается слева направо, как электрическая цепь. Левая шина — "фаза", правая — "ноль". Ток "течёт" если путь замкнут. I0.0 I0.1 Q0.0 Пуск Стоп (НЗ) Выход насос ──┤ ├────┤/├──────────( )── Q0.0 (самоподхват) ──┤ ├── Эта схема — классика: кнопка ПУСК (I0.0) запускает насос (Q0.0), контакт самоподхвата удерживает его включённым, кнопка СТОП (I0.1, нормально-закрытый) его отключает. Преимущества LD: Понятен электрикам без IT-образования, визуально отображает логику цепей, легко отлаживать онлайн (подсвечиваются активные цепи). Недостатки: Громоздкий для сложных вычислений и работы с данными. 2. Function Block Diagram (FBD) — Диаграмма функциональных блоковПрограмма строится из готовых блоков (AND, OR, NOT, таймеры, счётчики, ПИД-регуляторы), соединённых сигнальными линиями. Хорошо подходит для управления потоками сигналов. I0.0 ──┐ ├──[AND]──── Q0.0 I0.1 ──┘ 3. Structured Text (ST) — Структурированный текстЯзык высокого уровня, похожий на Pascal/Ada. Самый мощный для математических вычислений, работы с массивами, строками. (* Программа управления насосом с ПИД *) IF Start AND NOT Stop THEN Running := TRUE; END_IF; IF Stop THEN Running := FALSE; END_IF; (* Расчёт ПИД *) IF Running THEN Error := Setpoint - Feedback; Integral := Integral + Error * CycleTime; Integral := LIMIT(-100.0, Integral, 100.0); (* Ограничение интеграла *) Output := Kp * Error + Ki * Integral + Kd * (Error - PrevError) / CycleTime; Output := LIMIT(0.0, Output, 100.0); PrevError := Error; ELSE Output := 0.0; Integral := 0.0; END_IF; 4. Instruction List (IL) — Список инструкцийАссемблер для ПЛК. Устаревший язык, в новом стандарте МЭК 61131-3 третьей редакции официально deprecated. Знать необязательно. 5. Sequential Function Chart (SFC) — Диаграмма последовательных функцийПохоже на блок-схему или граф состояний. Идеален для описания технологических последовательностей: шаг 1 → условие перехода → шаг 2 → условие → шаг 3... Реальный проект: управление насосной станциейРазберём полноценный пример — система управления двумя насосами с чередованием и защитами. Техническое задание:2 насоса, работают поочерёдно для равномерного износа Автоматическое включение второго при отказе первого Защита от сухого хода (датчик уровня) Защита от давления (реле давления) Ручной/автоматический режим Счётчик моточасов каждого насоса Распределение входов/выходов:ВХОДЫ: I0.0 — Кнопка ПУСК (автоматический режим) I0.1 — Кнопка СТОП I0.2 — Переключатель Авт/Ручной I0.3 — Датчик уровня (нижний предел — сухой ход) I0.4 — Датчик уровня (верхний предел — бак полон) I0.5 — Реле давления насос 1 (авария — нет давления) I0.6 — Реле давления насос 2 I0.7 — Тепловое реле насос 1 (перегрев) I1.0 — Тепловое реле насос 2 I1.1 — Ручное управление насос 1 (в ручном режиме) I1.2 — Ручное управление насос 2 ВЫХОДЫ: Q0.0 — Контактор насос 1 Q0.1 — Контактор насос 2 Q0.2 — Лампа "Работа авто" Q0.3 — Лампа "Авария" Q0.4 — Сирена (авария критическая) Программа на Structured Text (ST):PROGRAM PumpStation (* ===== ПЕРЕМЕННЫЕ ===== *) VAR // Входы btnStart AT %I0.0 : BOOL; btnStop AT %I0.1 : BOOL; swAutoManual AT %I0.2 : BOOL; // TRUE = авто snsLevelLow AT %I0.3 : BOOL; // TRUE = уровень низкий (авария) snsLevelHigh AT %I0.4 : BOOL; // TRUE = бак полон relPressure1 AT %I0.5 : BOOL; // FALSE = нет давления (авария) relPressure2 AT %I0.6 : BOOL; thmRelay1 AT %I0.7 : BOOL; // TRUE = перегрев (авария) thmRelay2 AT %I1.0 : BOOL; btnPump1Manual AT %I1.1 : BOOL; btnPump2Manual AT %I1.2 : BOOL; // Выходы outPump1 AT %Q0.0 : BOOL; outPump2 AT %Q0.1 : BOOL; lampAutoRun AT %Q0.2 : BOOL; lampFault AT %Q0.3 : BOOL; siren AT %Q0.4 : BOOL; // Внутренние переменные SystemRun : BOOL := FALSE; ActivePump : INT := 1; // Какой насос сейчас основной Fault : BOOL := FALSE; FaultCode : INT := 0; Pump1Fault : BOOL := FALSE; Pump2Fault : BOOL := FALSE; // Таймеры TimerPumpStart : TON; // Задержка пуска насоса TimerRotation : TON; // Таймер чередования (8 часов) TimerAlarmDelay : TON; // Задержка подтверждения аварии // Счётчики моточасов Hours_Pump1 : DINT := 0; Hours_Pump2 : DINT := 0; TimerH_Pump1 : TON; TimerH_Pump2 : TON; END_VAR (* ===== ЛОГИКА АВАРИЙ ===== *) // Авария сухого хода — критическая, немедленная остановка IF snsLevelLow THEN Fault := TRUE; FaultCode := 1; // Сухой ход SystemRun := FALSE; END_IF; // Задержанные аварии давления (3 секунды для исключения ложных срабатываний) TimerAlarmDelay(IN := (outPump1 AND NOT relPressure1) OR (outPump2 AND NOT relPressure2), PT := T#3S); IF TimerAlarmDelay.Q THEN IF outPump1 AND NOT relPressure1 THEN Pump1Fault := TRUE; FaultCode := 2; // Нет давления насос 1 END_IF; IF outPump2 AND NOT relPressure2 THEN Pump2Fault := TRUE; FaultCode := 3; // Нет давления насос 2 END_IF; END_IF; // Тепловая защита IF thmRelay1 THEN Pump1Fault := TRUE; FaultCode := 4; // Перегрев насос 1 END_IF; IF thmRelay2 THEN Pump2Fault := TRUE; FaultCode := 5; // Перегрев насос 2 END_IF; // Оба насоса в аварии IF Pump1Fault AND Pump2Fault THEN Fault := TRUE; SystemRun := FALSE; END_IF; (* ===== КОМАНДЫ ПУСК/СТОП ===== *) IF btnStart AND NOT Fault THEN SystemRun := TRUE; END_IF; IF btnStop OR snsLevelHigh THEN // Стоп или бак полон SystemRun := FALSE; END_IF; // Квитирование аварии (нажать СТОП для сброса) IF btnStop THEN Fault := FALSE; FaultCode := 0; Pump1Fault := FALSE; Pump2Fault := FALSE; END_IF; (* ===== АВТОМАТИЧЕСКИЙ РЕЖИМ ===== *) IF swAutoManual AND SystemRun THEN // Чередование каждые 8 часов TimerRotation(IN := SystemRun, PT := T#28800S); // 8 часов = 28800 секунд IF TimerRotation.Q THEN IF ActivePump = 1 THEN ActivePump := 2; ELSE ActivePump := 1; END_IF; TimerRotation(IN := FALSE); // Сброс таймера TimerRotation(IN := TRUE); END_IF; // При аварии основного насоса — переключаемся на резервный IF ActivePump = 1 AND Pump1Fault AND NOT Pump2Fault THEN ActivePump := 2; ELSIF ActivePump = 2 AND Pump2Fault AND NOT Pump1Fault THEN ActivePump := 1; END_IF; // Задержка пуска 1 секунда (защита от дребезга) TimerPumpStart(IN := SystemRun, PT := T#1S); outPump1 := TimerPumpStart.Q AND (ActivePump = 1) AND NOT Pump1Fault; outPump2 := TimerPumpStart.Q AND (ActivePump = 2) AND NOT Pump2Fault; (* ===== РУЧНОЙ РЕЖИМ ===== *) ELSIF NOT swAutoManual THEN outPump1 := btnPump1Manual AND NOT Pump1Fault; outPump2 := btnPump2Manual AND NOT Pump2Fault; ELSE outPump1 := FALSE; outPump2 := FALSE; END_IF; (* ===== СЧЁТЧИКИ МОТОЧАСОВ ===== *) TimerH_Pump1(IN := outPump1, PT := T#1S); IF TimerH_Pump1.Q THEN Hours_Pump1 := Hours_Pump1 + 1; TimerH_Pump1(IN := FALSE); TimerH_Pump1(IN := TRUE); END_IF; TimerH_Pump2(IN := outPump2, PT := T#1S); IF TimerH_Pump2.Q THEN Hours_Pump2 := Hours_Pump2 + 1; END_IF; (* ===== ИНДИКАЦИЯ ===== *) lampAutoRun := SystemRun AND swAutoManual; lampFault := Fault OR Pump1Fault OR Pump2Fault; siren := Fault; // Сирена только при критической аварии END_PROGRAM Таймеры и счётчики: подробноТаймеры и счётчики — основа любой программы ПЛК. В стандарте МЭК 61131-3 они реализованы как функциональные блоки. Типы таймеровTON (Timer On Delay) — таймер с задержкой включения: TimerFan(IN := MotorRun, PT := T#5S); // Q становится TRUE через 5 секунд после включения MotorRun FanStart := TimerFan.Q; TOF (Timer Off Delay) — таймер с задержкой выключения: TimerFan(IN := MotorRun, PT := T#30S); // Q остаётся TRUE ещё 30 секунд после выключения MotorRun // Используется для дополнительного охлаждения после остановки FanRun := TimerFan.Q; TP (Timer Pulse) — таймер импульса: TimerBuzzer(IN := AlarmNew, PT := T#2S); // При фронте AlarmNew генерирует импульс 2 секунды // Независимо от того, сколько ещё держится AlarmNew Buzzer := TimerBuzzer.Q; СчётчикиCTU (Count Up) — счётчик вперёд: CounterBottles(CU := BottleSensor, R := ResetButton, PV := 100); // Считает бутылки, при достижении 100 — Q=TRUE BatchComplete := CounterBottles.Q; CurrentCount := CounterBottles.CV; // Текущее значение CTD (Count Down) — счётчик назад: CounterProducts(CD := ProductSensor, LD := LoadButton, PV := OrderQty); // PV загружается при LD=TRUE OrderComplete := CounterProducts.Q; // Q=TRUE когда CV=0 Работа с аналоговыми сигналамиМасштабирование аналогового входаАналоговый модуль S7-1200 возвращает значение 0–27648 для диапазона 0–100% входного сигнала (4–20мА или 0–10В). Для получения реального значения нужно масштабирование: FUNCTION_BLOCK ScaleAnalog VAR_INPUT RawValue : INT; // Сырое значение от АЦП (0..27648) RawMin : INT; // Мин значение АЦП (обычно 0 или 5530 для 4мА) RawMax : INT; // Макс значение АЦП (обычно 27648) PhysMin : REAL; // Физический минимум (например, 0.0 бар) PhysMax : REAL; // Физический максимум (например, 16.0 бар) END_VAR VAR_OUTPUT PhysValue : REAL; // Результат в физических единицах Broken : BOOL; // Обрыв линии (значение ниже 4мА) END_VAR // Проверка обрыва линии (для 4-20мА: ниже ~5% = обрыв) Broken := (RawValue < (RawMin - 1000)); IF NOT Broken THEN // Линейное масштабирование PhysValue := PhysMin + (REAL(RawValue - RawMin) / REAL(RawMax - RawMin)) * (PhysMax - PhysMin); // Ограничение выхода PhysValue := MAX(PhysMin, MIN(PhysMax, PhysValue)); ELSE PhysValue := PhysMin; // При обрыве — безопасное значение END_IF; END_FUNCTION_BLOCK Использование: // Датчик давления 4-20мА, диапазон 0-16 бар PressureSensor( RawValue := %IW64, // Адрес аналогового входа RawMin := 5530, // 4мА соответствует 5530 RawMax := 27648, // 20мА соответствует 27648 PhysMin := 0.0, PhysMax := 16.0 ); Pressure := PressureSensor.PhysValue; // Давление в барах PressureAlarm := PressureSensor.Broken; // Авария обрыва линии IF Pressure > 12.0 THEN HighPressureAlarm := TRUE; END_IF; Организационные блоки и структура программыПрофессиональная программа ПЛК разделена на функциональные блоки: Организация в TIA Portal (Siemens):OB1 (Main) — Главный цикл ├── FC10: ReadInputs — Чтение и нормализация входов ├── FC20: SafetyLogic — Приоритетные защиты (всегда первыми!) ├── FB30: PumpControl [DB30] — Управление насосами (с памятью) ├── FB40: PIDControl [DB40] — ПИД-регулятор ├── FC50: WriteOutputs — Запись выходов └── FC60: Diagnostics — Диагностика и коммуникации OB35 (Cyclic Interrupt, 10ms) — Быстрые задачи └── FB35: FastCounter — Высокоскоростной счётчик OB82 (I/O Error) — Обработка ошибок I/O-модулей OB121 (Programming Error) — Обработка ошибок программы Принцип: защиты и аварии — всегда в начале главного цикла. Они должны отработать независимо от состояния остальной программы. Отладка и диагностика программыОнлайн-мониторингВ TIA Portal при подключении к ПЛК все блоки отображают реальные значения переменных. В Ladder Diagram активные цепи подсвечиваются зелёным — мгновенно видно что работает. Форсирование переменных (Force)Можно принудительно установить значение входа или переменной для тестирования. Внимание: принудительные значения перекрывают реальные физические сигналы. Не забудьте снять форсирование перед вводом в эксплуатацию! ТрассировкаЗапись значений переменных в реальном времени с временной меткой. Незаменима для поиска редко возникающих ошибок — включаете запись и ждёте появления проблемы. Типичные ошибки начинающих:1. Использование выходных катушек несколько раз в Ladder НЕПРАВИЛЬНО: Цепь 1: I0.0 → Q0.0 (катушка) Цепь 2: I0.1 → Q0.0 (катушка снова!) Вторая катушка перезаписывает первую. Результат Q0.0 определяется только второй цепью. ПРАВИЛЬНО: Цепь 1: I0.0 ──┐ ├── Q0.0 Цепь 2: I0.1 ──┘ 2. Гонка состояний при SET/RESET // ОПАСНО: порядок операций важен IF Condition1 THEN SET(Coil); END_IF; IF Condition2 THEN RESET(Coil); END_IF; // Если оба условия TRUE — RESET побеждает (последний) // Убедитесь, что это желаемое поведение! 3. Деление на ноль // ВСЕГДА проверяйте делитель IF Denominator <> 0.0 THEN Result := Numerator / Denominator; ELSE Result := 0.0; // Безопасное значение END_IF; Коммуникация ПЛК с внешним миромModbus TCP (через Ethernet)Большинство современных ПЛК поддерживают Modbus TCP "из коробки". Это самый распространённый протокол для связи с SCADA, HMI и частотниками. В S7-1200 для Modbus TCP используются системные функциональные блоки: MB_CLIENT — ПЛК как Modbus-мастер (опрашивает устройства) MB_SERVER — ПЛК как Modbus-сервер (отвечает на запросы SCADA) OPC UAСовременный открытый протокол для промышленной коммуникации. Поддерживает семантику данных, безопасность, публикацию/подписку. Все серьёзные ПЛК последних поколений имеют встроенный OPC UA сервер. PROFINET/EtherCATПромышленные реальном-временные сети. PROFINET — стандарт Siemens и Profibus International. EtherCAT — от Beckhoff, исключительно быстрый (цикл 250 мкс). Используются для связи с распределёнными I/O-модулями, сервоприводами, vision-системами. Советы по надёжности программыВсегда инициализируйте переменные — не полагайтесь на "нулевое" начальное значение Используйте watchdog-таймер — если программа "зависла", таймер переводит выходы в безопасное состояние Документируйте каждую переменную — через полгода вы забудете что значит переменная b47 Разделяйте задачи по функциональным блокам — один блок = одна задача Тестируйте на симуляторе перед загрузкой в реальный ПЛК Делайте резервные копии программы перед каждым изменением — версионирование спасало многих Стандартизируйте именование: btnStart — кнопка, snsLevel — датчик, outPump — выход, tmrDelay — таймер ЗаключениеПрограммирование ПЛК — это отдельная инженерная дисциплина на стыке электротехники, автоматики и программирования. Ключевой принцип: программа управляет реальным оборудованием, и любая ошибка может привести к аварии или травме. Поэтому надёжность, защиты и понятность кода здесь важнее красоты архитектуры. Начните с Ladder Diagram — он прозрачен и хорошо отлаживается онлайн. Освойте Structured Text для сложных вычислений. Используйте SFC для технологических последовательностей. И всегда: сначала безопасность, потом функциональность. Хорошая программа ПЛК должна безопасно остановить оборудование при любой нештатной ситуации — потере связи, пропадании питания, выходе из строя датчика. Проектируйте с расчётом на отказ.