Jump to content
View in the app

A better way to browse. Learn more.

T.M.I IThub

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.

Python — второй язык каждого инженера

Matlab стоит дорого. LabVIEW — ещё дороже. Excel мощный, но у него есть потолок. Python — бесплатный, открытый, с огромной экосистемой библиотек для инженерных задач. И с каждым годом он глубже проникает в промышленность.

Инженер-электронщик использует Python для: анализа данных с измерительных приборов, автоматизации рутинных расчётов, создания отчётов, обработки сигналов с АЦП, управления лабораторным оборудованием (VISA/PyVISA), прототипирования алгоритмов перед переносом на микроконтроллер.

Специалист АСУ ТП — для: работы с Modbus/OPC UA, парсинга логов ПЛК, автоматического тестирования, интеграции различных систем.


NumPy: числа быстро

NumPy — фундамент научного Python. Массивы NumPy в 10–100 раз быстрее списков Python для математических операций.

import numpy as np
import time

# ===== БАЗОВЫЕ ОПЕРАЦИИ =====

# Создание массивов
t = np.linspace(0, 10, 1000)    # 1000 точек от 0 до 10
f = np.arange(0, 50, 0.1)       # От 0 до 50 с шагом 0.1
zeros = np.zeros((3, 4))         # Матрица 3×4 из нулей
eye = np.eye(3)                  # Единичная матрица 3×3

# Синтетический сигнал (для теста)
freq_signal = 50.0   # Гц
freq_noise  = 200.0  # Гц (помеха)
sample_rate = 1000.0 # Гц

t = np.arange(0, 1, 1/sample_rate)  # 1 секунда данных

signal_clean = 2.0 * np.sin(2 * np.pi * freq_signal * t)
noise        = 0.5 * np.sin(2 * np.pi * freq_noise * t)
noise       += 0.2 * np.random.randn(len(t))  # Белый шум

signal_noisy = signal_clean + noise

# ===== СКОРОСТЬ =====
def python_rms(data: list) -> float:
    return (sum(x**2 for x in data) / len(data)) ** 0.5

def numpy_rms(data: np.ndarray) -> float:
    return np.sqrt(np.mean(data**2))

# Сравнение скорости:
data_list = list(signal_noisy)
data_arr  = np.array(data_list)

t0 = time.time(); python_rms(data_list); t_py = time.time() - t0
t0 = time.time(); numpy_rms(data_arr);  t_np = time.time() - t0

print(f"Python: {t_py*1000:.2f} мс, NumPy: {t_np*1000:.3f} мс, "
      f"Ускорение: {t_py/t_np:.0f}x")

# ===== ИНЖЕНЕРНЫЕ РАСЧЁТЫ =====

def calculate_power_factor(voltage: np.ndarray, current: np.ndarray,
                             sample_rate: float) -> dict:
    """
    Расчёт коэффициента мощности из осциллограмм тока и напряжения.
    """
    # RMS значения
    V_rms = np.sqrt(np.mean(voltage**2))
    I_rms = np.sqrt(np.mean(current**2))
    
    # Активная мощность (среднее произведение)
    P = np.mean(voltage * current)
    
    # Полная мощность
    S = V_rms * I_rms
    
    # Коэффициент мощности
    pf = P / S if S > 0 else 0
    
    # Реактивная мощность
    Q = np.sqrt(max(0, S**2 - P**2))
    
    return {
        'V_rms':  round(V_rms, 2),
        'I_rms':  round(I_rms, 3),
        'P_kw':   round(P / 1000, 2),
        'Q_kvar': round(Q / 1000, 2),
        'S_kva':  round(S / 1000, 2),
        'PF':     round(abs(pf), 3),
    }

# Пример использования:
# Генерируем тестовые сигналы 220В 50Гц, ток 10А с φ=30°
t = np.linspace(0, 0.04, 400)  # 2 периода
V = 220 * np.sqrt(2) * np.sin(2 * np.pi * 50 * t)
I = 10  * np.sqrt(2) * np.sin(2 * np.pi * 50 * t - np.radians(30))

power = calculate_power_factor(V, I, 10000)
print(f"P={power['P_kw']} кВт, Q={power['Q_kvar']} квар, cos(φ)={power['PF']}")
# Ожидаем: cos(30°) ≈ 0.866

Scipy: анализ сигналов

from scipy import signal, fft
import numpy as np

# ===== FFT: СПЕКТРАЛЬНЫЙ АНАЛИЗ =====

def analyze_spectrum(data: np.ndarray, sample_rate: float) -> dict:
    """
    Анализ спектра сигнала через FFT.
    Используется для диагностики вибраций, качества электроэнергии.
    """
    n = len(data)
    
    # Оконная функция (Hanning) для уменьшения спектральных утечек
    window = np.hanning(n)
    data_windowed = data * window
    
    # FFT
    spectrum = np.abs(fft.rfft(data_windowed))
    freqs    = fft.rfftfreq(n, 1.0/sample_rate)
    
    # Нормировка (учёт оконной функции)
    spectrum = spectrum / (n / 2)
    
    # THD (Total Harmonic Distortion) — для качества сетевого напряжения
    # Находим основную частоту (50 Гц)
    fundamental_idx = np.argmin(np.abs(freqs - 50.0))
    fundamental_amp = spectrum[fundamental_idx]
    
    # Гармоники 2-я...7-я
    harmonic_power = sum(
        spectrum[np.argmin(np.abs(freqs - 50.0 * n))]**2
        for n in range(2, 8)
    )
    
    thd = np.sqrt(harmonic_power) / fundamental_amp * 100  # %
    
    # Топ-5 пиков спектра
    peak_indices = np.argsort(spectrum)[-10:][::-1]
    top_peaks = [(round(freqs[i], 1), round(spectrum[i], 4)) for i in peak_indices]
    
    return {
        'freqs':    freqs,
        'spectrum': spectrum,
        'thd_pct':  round(thd, 2),
        'top_peaks': top_peaks[:5],
        'rms':       round(np.sqrt(np.mean(data**2)), 4),
    }


# ===== ФИЛЬТРАЦИЯ СИГНАЛОВ =====

def design_lowpass_filter(cutoff_hz: float, sample_rate: float,
                            order: int = 4) -> tuple:
    """
    Проектирование фильтра нижних частот Баттерворта.
    Используется для сглаживания зашумлённых данных датчиков.
    """
    nyquist = sample_rate / 2
    normalized_cutoff = cutoff_hz / nyquist
    
    b, a = signal.butter(order, normalized_cutoff, btype='low', analog=False)
    return b, a


def apply_filter(data: np.ndarray, b: np.ndarray, a: np.ndarray,
                  zero_phase: bool = True) -> np.ndarray:
    """
    Применение фильтра к сигналу.
    zero_phase=True: filtfilt (нет фазового сдвига, требует данных полностью)
    zero_phase=False: lfilter (реального времени, есть фазовый сдвиг)
    """
    if zero_phase:
        return signal.filtfilt(b, a, data)  # Двупроходной (офлайн-обработка)
    else:
        return signal.lfilter(b, a, data)   # Однопроходной (онлайн-обработка)


# Пример: фильтрация зашумлённого датчика температуры
sample_rate = 100.0  # 100 Гц
t = np.arange(0, 10, 1/sample_rate)

# Реальная температура (медленно меняется)
true_temp = 75.0 + 5.0 * np.sin(2 * np.pi * 0.1 * t)  # 0.1 Гц

# С шумом (50Гц помеха от сети + белый шум)
noisy_temp = true_temp + 2.0 * np.sin(2 * np.pi * 50 * t) + \
             0.5 * np.random.randn(len(t))

# Фильтр НЧ с частотой среза 1 Гц (убираем всё выше 1 Гц)
b, a = design_lowpass_filter(cutoff_hz=1.0, sample_rate=sample_rate)
filtered_temp = apply_filter(noisy_temp, b, a)

print(f"Шум до фильтрации:  {np.std(noisy_temp - true_temp):.3f}°C")
print(f"Шум после фильтра: {np.std(filtered_temp - true_temp):.3f}°C")


# ===== КОРРЕЛЯЦИЯ И ОБНАРУЖЕНИЕ СИГНАЛА =====

def find_pattern_in_signal(signal_data: np.ndarray, 
                             pattern: np.ndarray) -> list[int]:
    """
    Поиск паттерна в сигнале через кросс-корреляцию.
    Применение: нахождение пакетов в потоке данных, обнаружение событий.
    """
    correlation = np.correlate(signal_data, pattern, mode='valid')
    threshold = 0.8 * np.max(np.abs(correlation))
    
    peaks, _ = signal.find_peaks(correlation, height=threshold, distance=len(pattern))
    return list(peaks)

Pandas: анализ промышленных данных

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# ===== ЗАГРУЗКА И ОЧИСТКА ДАННЫХ =====

def load_plc_log(filepath: str) -> pd.DataFrame:
    """
    Загрузка и нормализация лога ПЛК.
    Типичный формат: CSV с временной меткой и значениями тегов.
    """
    df = pd.read_csv(filepath, 
                      parse_dates=['timestamp'],
                      index_col='timestamp')
    
    # Нормализация имён колонок
    df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('.', '_')
    
    # Приведение типов
    numeric_cols = ['temperature', 'pressure', 'current', 'flow']
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')
    
    bool_cols = ['running', 'fault', 'alarm']
    for col in bool_cols:
        if col in df.columns:
            df[col] = df[col].astype(bool, errors='ignore')
    
    # Удаление дубликатов
    df = df[~df.index.duplicated(keep='first')]
    
    # Сортировка по времени
    df = df.sort_index()
    
    # Интерполяция пропущенных значений (не более 5 пропусков подряд)
    df[numeric_cols] = df[numeric_cols].interpolate(
        method='time', limit=5, limit_direction='forward'
    )
    
    return df


def analyze_production_data(df: pd.DataFrame) -> dict:
    """
    Анализ производственных данных: KPI, простои, отклонения.
    """
    results = {}
    
    # ===== ДОСТУПНОСТЬ ОБОРУДОВАНИЯ =====
    if 'running' in df.columns:
        total_time   = (df.index[-1] - df.index[0]).total_seconds() / 3600  # часы
        running_time = df['running'].mean() * total_time
        
        results['availability'] = {
            'total_hours':   round(total_time, 1),
            'running_hours': round(running_time, 1),
            'availability_pct': round(df['running'].mean() * 100, 1),
        }
    
    # ===== АНАЛИЗ ПРОСТОЕВ =====
    if 'running' in df.columns:
        # Нахождение периодов простоя
        running_changes = df['running'].astype(int).diff()
        stop_times  = df.index[running_changes == -1]  # Моменты остановки
        start_times = df.index[running_changes == 1]   # Моменты пуска
        
        downtimes = []
        for stop in stop_times:
            # Найти следующий пуск после остановки
            next_start = start_times[start_times > stop]
            if len(next_start) > 0:
                duration = (next_start[0] - stop).total_seconds() / 60  # минуты
                downtimes.append({'stop': stop, 'start': next_start[0],
                                   'duration_min': round(duration, 1)})
        
        if downtimes:
            dt_df = pd.DataFrame(downtimes)
            results['downtimes'] = {
                'count':        len(dt_df),
                'total_min':    round(dt_df['duration_min'].sum(), 1),
                'avg_min':      round(dt_df['duration_min'].mean(), 1),
                'max_min':      round(dt_df['duration_min'].max(), 1),
                'longest_stop': dt_df.loc[dt_df['duration_min'].idxmax(), 'stop'].isoformat(),
            }
    
    # ===== СТАТИСТИКА ПАРАМЕТРОВ =====
    numeric_cols = df.select_dtypes(include=np.number).columns.tolist()
    if numeric_cols:
        stats = df[numeric_cols].describe()
        results['parameters'] = stats.to_dict()
    
    # ===== ОБНАРУЖЕНИЕ ВЫБРОСОВ (метод IQR) =====
    outliers = {}
    for col in numeric_cols:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        
        mask = (df[col] < Q1 - 1.5 * IQR) | (df[col] > Q3 + 1.5 * IQR)
        outlier_count = mask.sum()
        
        if outlier_count > 0:
            outliers[col] = {
                'count': int(outlier_count),
                'pct':   round(outlier_count / len(df) * 100, 2),
                'examples': df[mask][col].head(3).tolist(),
            }
    
    results['outliers'] = outliers
    
    return results


# ===== ГЕНЕРАЦИЯ ОТЧЁТОВ =====

def generate_daily_report(df: pd.DataFrame, date: str = None) -> pd.DataFrame:
    """Сводная таблица по часам за день"""
    if date:
        df = df[df.index.date == pd.Timestamp(date).date()]
    
    # Агрегация по часам
    hourly = df.resample('1h').agg({
        'temperature': ['mean', 'min', 'max'],
        'current':     ['mean', 'max'],
        'pressure':    ['mean', 'min', 'max'],
        'running':     'mean',  # Доступность за час
        'fault':       'any',   # Были ли аварии
    }).round(2)
    
    # Плоские имена колонок
    hourly.columns = ['_'.join(col) for col in hourly.columns]
    hourly['availability_pct'] = (hourly['running_mean'] * 100).round(1)
    hourly['had_fault'] = hourly['fault_any']
    
    return hourly


# ===== EXCEL ОТЧЁТ =====

def export_to_excel(df: pd.DataFrame, hourly: pd.DataFrame,
                     kpi: dict, filepath: str):
    """Красивый Excel-отчёт с несколькими листами"""
    
    with pd.ExcelWriter(filepath, engine='xlsxwriter') as writer:
        workbook = writer.book
        
        # Форматы
        header_fmt = workbook.add_format({
            'bold': True, 'bg_color': '#2C3E50', 'font_color': 'white',
            'border': 1
        })
        number_fmt = workbook.add_format({'num_format': '0.0#', 'border': 1})
        pct_fmt    = workbook.add_format({'num_format': '0.0%', 'border': 1})
        bad_fmt    = workbook.add_format({'bg_color': '#FFB3B3', 'border': 1})
        
        # ===== Лист 1: KPI =====
        ws_kpi = workbook.add_worksheet('KPI')
        ws_kpi.write('A1', 'Показатель', header_fmt)
        ws_kpi.write('B1', 'Значение',   header_fmt)
        
        avail = kpi.get('availability', {})
        row = 1
        for key, val in avail.items():
            ws_kpi.write(row, 0, key)
            ws_kpi.write(row, 1, val)
            row += 1
        
        ws_kpi.set_column('A:A', 25)
        ws_kpi.set_column('B:B', 15)
        
        # ===== Лист 2: Почасовой отчёт =====
        hourly.to_excel(writer, sheet_name='Почасовой отчёт', startrow=1)
        ws = writer.sheets['Почасовой отчёт']
        
        # Условное форматирование: красим аварийные часы
        ws.conditional_format('A2:Z1000', {
            'type': 'formula',
            'criteria': '=$G2=TRUE',  # Если был fault
            'format': bad_fmt
        })
        
        # ===== Лист 3: Сырые данные (последние 1000 строк) =====
        df.tail(1000).to_excel(writer, sheet_name='Данные')
        
    print(f"Отчёт сохранён: {filepath}")

FastAPI: REST API для промышленных данных

# pip install fastapi uvicorn
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
import uvicorn

app = FastAPI(
    title="Industrial Data API",
    description="API для доступа к данным производственного оборудования",
    version="1.0.0"
)

# Модели данных
class TelemetryPoint(BaseModel):
    device:      str
    temperature: float
    current:     float
    pressure:    float
    running:     bool
    timestamp:   datetime

class DeviceCommand(BaseModel):
    device:   str
    command:  str    # "start", "stop", "set_setpoint"
    value:    Optional[float] = None
    operator: str

# Имитация БД (в реальности — запросы к InfluxDB/TimescaleDB)
telemetry_db = []

@app.get("/api/v1/devices", summary="Список устройств")
async def get_devices():
    return {
        "devices": [
            {"id": "pump1", "name": "Насос 1", "location": "Линия 1", "online": True},
            {"id": "pump2", "name": "Насос 2", "location": "Линия 1", "online": True},
            {"id": "valve1","name": "Клапан 1","location": "Линия 2","online": False},
        ]
    }

@app.get("/api/v1/telemetry/{device_id}", summary="Телеметрия устройства")
async def get_telemetry(
    device_id: str,
    hours:     int   = Query(default=1,  ge=1, le=720, description="Глубина истории"),
    resample:  str   = Query(default="1min", description="Гранулярность: 1s, 1min, 5min, 1h")
):
    # Проверка устройства
    valid_devices = ["pump1", "pump2", "valve1"]
    if device_id not in valid_devices:
        raise HTTPException(status_code=404, detail=f"Устройство '{device_id}' не найдено")
    
    # Запрос к historian (имитация)
    # В реальности: query_influxdb(device_id, hours, resample)
    return {
        "device":    device_id,
        "from":      (datetime.now().replace(hour=0, minute=0)).isoformat(),
        "to":        datetime.now().isoformat(),
        "resample":  resample,
        "points":    [
            {"time": datetime.now().isoformat(), "temperature": 85.3,
             "current": 15.2, "pressure": 5.8, "running": True}
        ]
    }

@app.post("/api/v1/commands", summary="Отправить команду устройству",
          status_code=202)
async def send_command(cmd: DeviceCommand):
    # Валидация команды
    valid_commands = ["start", "stop", "set_setpoint"]
    if cmd.command not in valid_commands:
        raise HTTPException(status_code=400,
                           detail=f"Неизвестная команда: {cmd.command}")
    
    if cmd.command == "set_setpoint" and cmd.value is None:
        raise HTTPException(status_code=400,
                           detail="set_setpoint требует параметр value")
    
    # Аудит-лог (обязательно для промышленных систем!)
    print(f"[AUDIT] {datetime.now()} | Operator: {cmd.operator} | "
          f"Device: {cmd.device} | Command: {cmd.command} | Value: {cmd.value}")
    
    # Отправить команду (через очередь, Modbus, OPC UA...)
    # command_queue.put(cmd)
    
    return {"status": "accepted", "command_id": "cmd_123456"}

@app.get("/api/v1/health", summary="Healthcheck")
async def health():
    return {"status": "ok", "timestamp": datetime.now().isoformat()}

# Запуск: uvicorn main:app --host 0.0.0.0 --port 8080 --reload

asyncio: асинхронный опрос оборудования

import asyncio
import aiohttp
import json
from datetime import datetime

async def poll_device_modbus(device_id: str, host: str, interval: float = 1.0):
    """Асинхронный опрос устройства через Modbus TCP"""
    from pymodbus.client import AsyncModbusTcpClient
    
    async with AsyncModbusTcpClient(host=host, port=502) as client:
        print(f"Подключён к {device_id} ({host})")
        
        while True:
            start = asyncio.get_event_loop().time()
            
            try:
                result = await client.read_input_registers(address=0, count=4, slave=1)
                
                if not result.isError():
                    data = {
                        'device':      device_id,
                        'timestamp':   datetime.now().isoformat(),
                        'temperature': result.registers[0] / 10.0,
                        'current':     result.registers[1] / 10.0,
                        'pressure':    result.registers[2] / 100.0,
                        'running':     bool(result.registers[3] & 1),
                    }
                    # Публикуем данные (в очередь, БД, MQTT...)
                    print(f"{device_id}: T={data['temperature']}°C")
                else:
                    print(f"{device_id}: Ошибка Modbus")
            
            except Exception as e:
                print(f"{device_id}: {e}")
                await asyncio.sleep(5)  # Пауза перед повтором
                continue
            
            # Точный интервал опроса
            elapsed = asyncio.get_event_loop().time() - start
            await asyncio.sleep(max(0, interval - elapsed))


async def main():
    """Параллельный опрос нескольких устройств"""
    
    devices = [
        ("pump1",  "192.168.1.10"),
        ("pump2",  "192.168.1.11"),
        ("valve1", "192.168.1.12"),
    ]
    
    # Запускаем все опросы параллельно
    tasks = [poll_device_modbus(dev_id, host, interval=1.0)
             for dev_id, host in devices]
    
    await asyncio.gather(*tasks)  # Все работают одновременно!

asyncio.run(main())

Полезные однострочники для инженера

import subprocess, json, struct, serial
from pathlib import Path

# Быстрый Modbus опрос из командной строки:
# python -c "from pymodbus.client import ModbusTcpClient; c=ModbusTcpClient('192.168.1.10'); c.connect(); print(c.read_input_registers(0,4,slave=1).registers)"

# Конвертация hex-дампа в float:
def hex_to_float(hex_str: str) -> float:
    return struct.unpack('>f', bytes.fromhex(hex_str.replace(' ','')))[0]

print(hex_to_float("42 48 00 00"))  # → 50.0

# Поиск COM-портов:
import serial.tools.list_ports
for p in serial.tools.list_ports.comports():
    print(f"{p.device}: {p.description}")

# Быстрый парсинг CSV с временными метками:
df = pd.read_csv('data.csv', parse_dates=['time'], index_col='time')
print(df.resample('5min').mean())

# Сохранение данных в Parquet (быстрее CSV в 10-50 раз):
df.to_parquet('data.parquet', compression='snappy')
df2 = pd.read_parquet('data.parquet')

Заключение

Python — это не замена C для микроконтроллеров и не замена SQL для баз данных. Это клей, который соединяет всё: читает данные из любого источника, анализирует, визуализирует, отправляет куда надо.

Для инженера ключевые библиотеки: NumPy (быстрые вычисления), Pandas (анализ данных), SciPy (сигналы и системы), Matplotlib/Plotly (визуализация), pymodbus (Modbus), pyserial (UART), asyncua (OPC UA), FastAPI (REST API).

Вложите неделю в изучение NumPy и Pandas — окупится сотнями часов сэкономленного времени на анализе данных, отчётах и автоматизации рутины.

User Feedback

Create an account or sign in to leave a review

There are no reviews to display.

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.