Python — второй язык каждого инженера
Matlab стоит дорого. LabVIEW — ещё дороже. Excel мощный, но у него есть потолок. Python — бесплатный, открытый, с огромной экосистемой библиотек для инженерных задач. И с каждым годом он глубже проникает в промышленность.
Инженер-электронщик использует Python для: анализа данных с измерительных приборов, автоматизации рутинных расчётов, создания отчётов, обработки сигналов с АЦП, управления лабораторным оборудованием (VISA/PyVISA), прототипирования алгоритмов перед переносом на микроконтроллер.
Специалист АСУ ТП — для: работы с Modbus/OPC UA, парсинга логов ПЛК, автоматического тестирования, интеграции различных систем.
NumPy: числа быстро
NumPy — фундамент научного Python. Массивы NumPy в 10–100 раз быстрее списков Python для математических операций.
import numpy as np
import time
# ===== БАЗОВЫЕ ОПЕРАЦИИ =====
# Создание массивов
t = np.linspace(0, 10, 1000) # 1000 точек от 0 до 10
f = np.arange(0, 50, 0.1) # От 0 до 50 с шагом 0.1
zeros = np.zeros((3, 4)) # Матрица 3×4 из нулей
eye = np.eye(3) # Единичная матрица 3×3
# Синтетический сигнал (для теста)
freq_signal = 50.0 # Гц
freq_noise = 200.0 # Гц (помеха)
sample_rate = 1000.0 # Гц
t = np.arange(0, 1, 1/sample_rate) # 1 секунда данных
signal_clean = 2.0 * np.sin(2 * np.pi * freq_signal * t)
noise = 0.5 * np.sin(2 * np.pi * freq_noise * t)
noise += 0.2 * np.random.randn(len(t)) # Белый шум
signal_noisy = signal_clean + noise
# ===== СКОРОСТЬ =====
def python_rms(data: list) -> float:
return (sum(x**2 for x in data) / len(data)) ** 0.5
def numpy_rms(data: np.ndarray) -> float:
return np.sqrt(np.mean(data**2))
# Сравнение скорости:
data_list = list(signal_noisy)
data_arr = np.array(data_list)
t0 = time.time(); python_rms(data_list); t_py = time.time() - t0
t0 = time.time(); numpy_rms(data_arr); t_np = time.time() - t0
print(f"Python: {t_py*1000:.2f} мс, NumPy: {t_np*1000:.3f} мс, "
f"Ускорение: {t_py/t_np:.0f}x")
# ===== ИНЖЕНЕРНЫЕ РАСЧЁТЫ =====
def calculate_power_factor(voltage: np.ndarray, current: np.ndarray,
sample_rate: float) -> dict:
"""
Расчёт коэффициента мощности из осциллограмм тока и напряжения.
"""
# RMS значения
V_rms = np.sqrt(np.mean(voltage**2))
I_rms = np.sqrt(np.mean(current**2))
# Активная мощность (среднее произведение)
P = np.mean(voltage * current)
# Полная мощность
S = V_rms * I_rms
# Коэффициент мощности
pf = P / S if S > 0 else 0
# Реактивная мощность
Q = np.sqrt(max(0, S**2 - P**2))
return {
'V_rms': round(V_rms, 2),
'I_rms': round(I_rms, 3),
'P_kw': round(P / 1000, 2),
'Q_kvar': round(Q / 1000, 2),
'S_kva': round(S / 1000, 2),
'PF': round(abs(pf), 3),
}
# Пример использования:
# Генерируем тестовые сигналы 220В 50Гц, ток 10А с φ=30°
t = np.linspace(0, 0.04, 400) # 2 периода
V = 220 * np.sqrt(2) * np.sin(2 * np.pi * 50 * t)
I = 10 * np.sqrt(2) * np.sin(2 * np.pi * 50 * t - np.radians(30))
power = calculate_power_factor(V, I, 10000)
print(f"P={power['P_kw']} кВт, Q={power['Q_kvar']} квар, cos(φ)={power['PF']}")
# Ожидаем: cos(30°) ≈ 0.866
Scipy: анализ сигналов
from scipy import signal, fft
import numpy as np
# ===== FFT: СПЕКТРАЛЬНЫЙ АНАЛИЗ =====
def analyze_spectrum(data: np.ndarray, sample_rate: float) -> dict:
"""
Анализ спектра сигнала через FFT.
Используется для диагностики вибраций, качества электроэнергии.
"""
n = len(data)
# Оконная функция (Hanning) для уменьшения спектральных утечек
window = np.hanning(n)
data_windowed = data * window
# FFT
spectrum = np.abs(fft.rfft(data_windowed))
freqs = fft.rfftfreq(n, 1.0/sample_rate)
# Нормировка (учёт оконной функции)
spectrum = spectrum / (n / 2)
# THD (Total Harmonic Distortion) — для качества сетевого напряжения
# Находим основную частоту (50 Гц)
fundamental_idx = np.argmin(np.abs(freqs - 50.0))
fundamental_amp = spectrum[fundamental_idx]
# Гармоники 2-я...7-я
harmonic_power = sum(
spectrum[np.argmin(np.abs(freqs - 50.0 * n))]**2
for n in range(2, 8)
)
thd = np.sqrt(harmonic_power) / fundamental_amp * 100 # %
# Топ-5 пиков спектра
peak_indices = np.argsort(spectrum)[-10:][::-1]
top_peaks = [(round(freqs[i], 1), round(spectrum[i], 4)) for i in peak_indices]
return {
'freqs': freqs,
'spectrum': spectrum,
'thd_pct': round(thd, 2),
'top_peaks': top_peaks[:5],
'rms': round(np.sqrt(np.mean(data**2)), 4),
}
# ===== ФИЛЬТРАЦИЯ СИГНАЛОВ =====
def design_lowpass_filter(cutoff_hz: float, sample_rate: float,
order: int = 4) -> tuple:
"""
Проектирование фильтра нижних частот Баттерворта.
Используется для сглаживания зашумлённых данных датчиков.
"""
nyquist = sample_rate / 2
normalized_cutoff = cutoff_hz / nyquist
b, a = signal.butter(order, normalized_cutoff, btype='low', analog=False)
return b, a
def apply_filter(data: np.ndarray, b: np.ndarray, a: np.ndarray,
zero_phase: bool = True) -> np.ndarray:
"""
Применение фильтра к сигналу.
zero_phase=True: filtfilt (нет фазового сдвига, требует данных полностью)
zero_phase=False: lfilter (реального времени, есть фазовый сдвиг)
"""
if zero_phase:
return signal.filtfilt(b, a, data) # Двупроходной (офлайн-обработка)
else:
return signal.lfilter(b, a, data) # Однопроходной (онлайн-обработка)
# Пример: фильтрация зашумлённого датчика температуры
sample_rate = 100.0 # 100 Гц
t = np.arange(0, 10, 1/sample_rate)
# Реальная температура (медленно меняется)
true_temp = 75.0 + 5.0 * np.sin(2 * np.pi * 0.1 * t) # 0.1 Гц
# С шумом (50Гц помеха от сети + белый шум)
noisy_temp = true_temp + 2.0 * np.sin(2 * np.pi * 50 * t) + \
0.5 * np.random.randn(len(t))
# Фильтр НЧ с частотой среза 1 Гц (убираем всё выше 1 Гц)
b, a = design_lowpass_filter(cutoff_hz=1.0, sample_rate=sample_rate)
filtered_temp = apply_filter(noisy_temp, b, a)
print(f"Шум до фильтрации: {np.std(noisy_temp - true_temp):.3f}°C")
print(f"Шум после фильтра: {np.std(filtered_temp - true_temp):.3f}°C")
# ===== КОРРЕЛЯЦИЯ И ОБНАРУЖЕНИЕ СИГНАЛА =====
def find_pattern_in_signal(signal_data: np.ndarray,
pattern: np.ndarray) -> list[int]:
"""
Поиск паттерна в сигнале через кросс-корреляцию.
Применение: нахождение пакетов в потоке данных, обнаружение событий.
"""
correlation = np.correlate(signal_data, pattern, mode='valid')
threshold = 0.8 * np.max(np.abs(correlation))
peaks, _ = signal.find_peaks(correlation, height=threshold, distance=len(pattern))
return list(peaks)
Pandas: анализ промышленных данных
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# ===== ЗАГРУЗКА И ОЧИСТКА ДАННЫХ =====
def load_plc_log(filepath: str) -> pd.DataFrame:
"""
Загрузка и нормализация лога ПЛК.
Типичный формат: CSV с временной меткой и значениями тегов.
"""
df = pd.read_csv(filepath,
parse_dates=['timestamp'],
index_col='timestamp')
# Нормализация имён колонок
df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('.', '_')
# Приведение типов
numeric_cols = ['temperature', 'pressure', 'current', 'flow']
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')
bool_cols = ['running', 'fault', 'alarm']
for col in bool_cols:
if col in df.columns:
df[col] = df[col].astype(bool, errors='ignore')
# Удаление дубликатов
df = df[~df.index.duplicated(keep='first')]
# Сортировка по времени
df = df.sort_index()
# Интерполяция пропущенных значений (не более 5 пропусков подряд)
df[numeric_cols] = df[numeric_cols].interpolate(
method='time', limit=5, limit_direction='forward'
)
return df
def analyze_production_data(df: pd.DataFrame) -> dict:
"""
Анализ производственных данных: KPI, простои, отклонения.
"""
results = {}
# ===== ДОСТУПНОСТЬ ОБОРУДОВАНИЯ =====
if 'running' in df.columns:
total_time = (df.index[-1] - df.index[0]).total_seconds() / 3600 # часы
running_time = df['running'].mean() * total_time
results['availability'] = {
'total_hours': round(total_time, 1),
'running_hours': round(running_time, 1),
'availability_pct': round(df['running'].mean() * 100, 1),
}
# ===== АНАЛИЗ ПРОСТОЕВ =====
if 'running' in df.columns:
# Нахождение периодов простоя
running_changes = df['running'].astype(int).diff()
stop_times = df.index[running_changes == -1] # Моменты остановки
start_times = df.index[running_changes == 1] # Моменты пуска
downtimes = []
for stop in stop_times:
# Найти следующий пуск после остановки
next_start = start_times[start_times > stop]
if len(next_start) > 0:
duration = (next_start[0] - stop).total_seconds() / 60 # минуты
downtimes.append({'stop': stop, 'start': next_start[0],
'duration_min': round(duration, 1)})
if downtimes:
dt_df = pd.DataFrame(downtimes)
results['downtimes'] = {
'count': len(dt_df),
'total_min': round(dt_df['duration_min'].sum(), 1),
'avg_min': round(dt_df['duration_min'].mean(), 1),
'max_min': round(dt_df['duration_min'].max(), 1),
'longest_stop': dt_df.loc[dt_df['duration_min'].idxmax(), 'stop'].isoformat(),
}
# ===== СТАТИСТИКА ПАРАМЕТРОВ =====
numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
if numeric_cols:
stats = df[numeric_cols].describe()
results['parameters'] = stats.to_dict()
# ===== ОБНАРУЖЕНИЕ ВЫБРОСОВ (метод IQR) =====
outliers = {}
for col in numeric_cols:
Q1 = df[col].quantile(0.25)
Q3 = df[col].quantile(0.75)
IQR = Q3 - Q1
mask = (df[col] < Q1 - 1.5 * IQR) | (df[col] > Q3 + 1.5 * IQR)
outlier_count = mask.sum()
if outlier_count > 0:
outliers[col] = {
'count': int(outlier_count),
'pct': round(outlier_count / len(df) * 100, 2),
'examples': df[mask][col].head(3).tolist(),
}
results['outliers'] = outliers
return results
# ===== ГЕНЕРАЦИЯ ОТЧЁТОВ =====
def generate_daily_report(df: pd.DataFrame, date: str = None) -> pd.DataFrame:
"""Сводная таблица по часам за день"""
if date:
df = df[df.index.date == pd.Timestamp(date).date()]
# Агрегация по часам
hourly = df.resample('1h').agg({
'temperature': ['mean', 'min', 'max'],
'current': ['mean', 'max'],
'pressure': ['mean', 'min', 'max'],
'running': 'mean', # Доступность за час
'fault': 'any', # Были ли аварии
}).round(2)
# Плоские имена колонок
hourly.columns = ['_'.join(col) for col in hourly.columns]
hourly['availability_pct'] = (hourly['running_mean'] * 100).round(1)
hourly['had_fault'] = hourly['fault_any']
return hourly
# ===== EXCEL ОТЧЁТ =====
def export_to_excel(df: pd.DataFrame, hourly: pd.DataFrame,
kpi: dict, filepath: str):
"""Красивый Excel-отчёт с несколькими листами"""
with pd.ExcelWriter(filepath, engine='xlsxwriter') as writer:
workbook = writer.book
# Форматы
header_fmt = workbook.add_format({
'bold': True, 'bg_color': '#2C3E50', 'font_color': 'white',
'border': 1
})
number_fmt = workbook.add_format({'num_format': '0.0#', 'border': 1})
pct_fmt = workbook.add_format({'num_format': '0.0%', 'border': 1})
bad_fmt = workbook.add_format({'bg_color': '#FFB3B3', 'border': 1})
# ===== Лист 1: KPI =====
ws_kpi = workbook.add_worksheet('KPI')
ws_kpi.write('A1', 'Показатель', header_fmt)
ws_kpi.write('B1', 'Значение', header_fmt)
avail = kpi.get('availability', {})
row = 1
for key, val in avail.items():
ws_kpi.write(row, 0, key)
ws_kpi.write(row, 1, val)
row += 1
ws_kpi.set_column('A:A', 25)
ws_kpi.set_column('B:B', 15)
# ===== Лист 2: Почасовой отчёт =====
hourly.to_excel(writer, sheet_name='Почасовой отчёт', startrow=1)
ws = writer.sheets['Почасовой отчёт']
# Условное форматирование: красим аварийные часы
ws.conditional_format('A2:Z1000', {
'type': 'formula',
'criteria': '=$G2=TRUE', # Если был fault
'format': bad_fmt
})
# ===== Лист 3: Сырые данные (последние 1000 строк) =====
df.tail(1000).to_excel(writer, sheet_name='Данные')
print(f"Отчёт сохранён: {filepath}")
FastAPI: REST API для промышленных данных
# pip install fastapi uvicorn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
import uvicorn
app = FastAPI(
title="Industrial Data API",
description="API для доступа к данным производственного оборудования",
version="1.0.0"
)
# Модели данных
class TelemetryPoint(BaseModel):
device: str
temperature: float
current: float
pressure: float
running: bool
timestamp: datetime
class DeviceCommand(BaseModel):
device: str
command: str # "start", "stop", "set_setpoint"
value: Optional[float] = None
operator: str
# Имитация БД (в реальности — запросы к InfluxDB/TimescaleDB)
telemetry_db = []
@app.get("/api/v1/devices", summary="Список устройств")
async def get_devices():
return {
"devices": [
{"id": "pump1", "name": "Насос 1", "location": "Линия 1", "online": True},
{"id": "pump2", "name": "Насос 2", "location": "Линия 1", "online": True},
{"id": "valve1","name": "Клапан 1","location": "Линия 2","online": False},
]
}
@app.get("/api/v1/telemetry/{device_id}", summary="Телеметрия устройства")
async def get_telemetry(
device_id: str,
hours: int = Query(default=1, ge=1, le=720, description="Глубина истории"),
resample: str = Query(default="1min", description="Гранулярность: 1s, 1min, 5min, 1h")
):
# Проверка устройства
valid_devices = ["pump1", "pump2", "valve1"]
if device_id not in valid_devices:
raise HTTPException(status_code=404, detail=f"Устройство '{device_id}' не найдено")
# Запрос к historian (имитация)
# В реальности: query_influxdb(device_id, hours, resample)
return {
"device": device_id,
"from": (datetime.now().replace(hour=0, minute=0)).isoformat(),
"to": datetime.now().isoformat(),
"resample": resample,
"points": [
{"time": datetime.now().isoformat(), "temperature": 85.3,
"current": 15.2, "pressure": 5.8, "running": True}
]
}
@app.post("/api/v1/commands", summary="Отправить команду устройству",
status_code=202)
async def send_command(cmd: DeviceCommand):
# Валидация команды
valid_commands = ["start", "stop", "set_setpoint"]
if cmd.command not in valid_commands:
raise HTTPException(status_code=400,
detail=f"Неизвестная команда: {cmd.command}")
if cmd.command == "set_setpoint" and cmd.value is None:
raise HTTPException(status_code=400,
detail="set_setpoint требует параметр value")
# Аудит-лог (обязательно для промышленных систем!)
print(f"[AUDIT] {datetime.now()} | Operator: {cmd.operator} | "
f"Device: {cmd.device} | Command: {cmd.command} | Value: {cmd.value}")
# Отправить команду (через очередь, Modbus, OPC UA...)
# command_queue.put(cmd)
return {"status": "accepted", "command_id": "cmd_123456"}
@app.get("/api/v1/health", summary="Healthcheck")
async def health():
return {"status": "ok", "timestamp": datetime.now().isoformat()}
# Запуск: uvicorn main:app --host 0.0.0.0 --port 8080 --reload
asyncio: асинхронный опрос оборудования
import asyncio
import aiohttp
import json
from datetime import datetime
async def poll_device_modbus(device_id: str, host: str, interval: float = 1.0):
"""Асинхронный опрос устройства через Modbus TCP"""
from pymodbus.client import AsyncModbusTcpClient
async with AsyncModbusTcpClient(host=host, port=502) as client:
print(f"Подключён к {device_id} ({host})")
while True:
start = asyncio.get_event_loop().time()
try:
result = await client.read_input_registers(address=0, count=4, slave=1)
if not result.isError():
data = {
'device': device_id,
'timestamp': datetime.now().isoformat(),
'temperature': result.registers[0] / 10.0,
'current': result.registers[1] / 10.0,
'pressure': result.registers[2] / 100.0,
'running': bool(result.registers[3] & 1),
}
# Публикуем данные (в очередь, БД, MQTT...)
print(f"{device_id}: T={data['temperature']}°C")
else:
print(f"{device_id}: Ошибка Modbus")
except Exception as e:
print(f"{device_id}: {e}")
await asyncio.sleep(5) # Пауза перед повтором
continue
# Точный интервал опроса
elapsed = asyncio.get_event_loop().time() - start
await asyncio.sleep(max(0, interval - elapsed))
async def main():
"""Параллельный опрос нескольких устройств"""
devices = [
("pump1", "192.168.1.10"),
("pump2", "192.168.1.11"),
("valve1", "192.168.1.12"),
]
# Запускаем все опросы параллельно
tasks = [poll_device_modbus(dev_id, host, interval=1.0)
for dev_id, host in devices]
await asyncio.gather(*tasks) # Все работают одновременно!
asyncio.run(main())
Полезные однострочники для инженера
import subprocess, json, struct, serial
from pathlib import Path
# Быстрый Modbus опрос из командной строки:
# python -c "from pymodbus.client import ModbusTcpClient; c=ModbusTcpClient('192.168.1.10'); c.connect(); print(c.read_input_registers(0,4,slave=1).registers)"
# Конвертация hex-дампа в float:
def hex_to_float(hex_str: str) -> float:
return struct.unpack('>f', bytes.fromhex(hex_str.replace(' ','')))[0]
print(hex_to_float("42 48 00 00")) # → 50.0
# Поиск COM-портов:
import serial.tools.list_ports
for p in serial.tools.list_ports.comports():
print(f"{p.device}: {p.description}")
# Быстрый парсинг CSV с временными метками:
df = pd.read_csv('data.csv', parse_dates=['time'], index_col='time')
print(df.resample('5min').mean())
# Сохранение данных в Parquet (быстрее CSV в 10-50 раз):
df.to_parquet('data.parquet', compression='snappy')
df2 = pd.read_parquet('data.parquet')
Заключение
Python — это не замена C для микроконтроллеров и не замена SQL для баз данных. Это клей, который соединяет всё: читает данные из любого источника, анализирует, визуализирует, отправляет куда надо.
Для инженера ключевые библиотеки: NumPy (быстрые вычисления), Pandas (анализ данных), SciPy (сигналы и системы), Matplotlib/Plotly (визуализация), pymodbus (Modbus), pyserial (UART), asyncua (OPC UA), FastAPI (REST API).
Вложите неделю в изучение NumPy и Pandas — окупится сотнями часов сэкономленного времени на анализе данных, отчётах и автоматизации рутины.
Create an account or sign in to leave a review
There are no reviews to display.