Jump to content
View in the app

A better way to browse. Learn more.

T.M.I IThub

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.

Зачем специализированная БД для временных рядов

Технологические данные — это всегда временной ряд: температура каждую секунду, давление каждые 100 мс, состояние оборудования каждые 10 мс. PostgreSQL или MySQL могут хранить такие данные. Но при миллионах записей в день начинаются проблемы.

Почему реляционные БД плохо справляются:

  • Индексы B-Tree неэффективны для временных запросов ("за последний час")

  • Запись строк в таблицу с индексами — медленно при высоком темпе

  • GROUP BY time_interval требует дорогих вычислений

  • Партиционирование по времени нужно настраивать вручную

  • Хранение тысяч тегов → тысячи колонок или плохая схема

Что умеют Time-Series TSDB:

  • Оптимизированная запись: 100 000+ точек/сек на скромном железе

  • Встроенное сжатие (delta-delta, XOR float compression)

  • Автоматические retention policies (TTL данных)

  • Downsampling: автоматически агрегируем "горячие" данные в "холодные"

  • Встроенные временны́е функции: moving average, rate, derivative


InfluxDB 2.x: промышленный стандарт IoT

Основные концепции

Measurement — аналог таблицы:

measurement: "telemetry"

Tags — индексированные метаданные (строки):

tags: device="conveyor1", location="line1", area="factory"

Fields — неиндексированные данные (числа, строки, bool):

fields: temperature=87.3, current=15.5, running=true

Timestamp — время с нано-точностью.

Точка данных (Point):

measurement,tags fields timestamp
telemetry,device=conveyor1,location=line1 temperature=87.3,current=15.5 1710000000000000000

Почему Tags vs Fields важно

Tags:    ИНДЕКСИРОВАНЫ → используйте для группировки/фильтрации
         device, location, sensor_type, unit_id

Fields:  НЕ индексированы → используйте для числовых данных
         temperature, pressure, current, voltage
         
ОШИБКА: положить temperature в Tag — поиск по значению работает,
        но карданальность огромная → индекс разрастётся → InfluxDB замедлится.
ОШИБКА: положить device_id в Field — нельзя эффективно фильтровать по устройству.

Python клиент InfluxDB 2.x:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, WriteOptions
from datetime import datetime, timezone
import time

INFLUX_URL    = "http://localhost:8086"
INFLUX_TOKEN  = "your-api-token-here"
INFLUX_ORG    = "factory"
INFLUX_BUCKET = "process_data"

# Клиент с батчевой записью
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)

write_api = client.write_api(write_options=WriteOptions(
    batch_size        = 1000,    # Накапливаем до 1000 точек
    flush_interval    = 5_000,   # Или сбрасываем каждые 5 секунд
    jitter_interval   = 500,     # ±500мс для сглаживания нагрузки
    retry_interval    = 5_000,   # Retry при ошибке через 5с
    max_retry_time    = 180_000, # Максимум 3 минуты retry
))

query_api = client.query_api()

# ===== ЗАПИСЬ =====

def write_single_point(device: str, location: str,
                        temperature: float, current: float, running: bool):
    """Запись одной точки"""
    point = (
        Point("telemetry")
        .tag("device",   device)
        .tag("location", location)
        .field("temperature", temperature)
        .field("current",     current)
        .field("running",     int(running))  # bool → int (InfluxDB лучше хранит)
        .time(datetime.now(timezone.utc))
    )
    write_api.write(bucket=INFLUX_BUCKET, record=point)


def write_batch(measurements: list[dict]):
    """
    Эффективная пакетная запись.
    measurements: [{'device': 'pump1', 'temp': 25.3, 'current': 12.1}, ...]
    """
    points = []
    for m in measurements:
        p = (
            Point("telemetry")
            .tag("device",   m['device'])
            .tag("location", m.get('location', 'unknown'))
            .field("temperature", float(m.get('temp',    0)))
            .field("current",     float(m.get('current', 0)))
            .field("pressure",    float(m.get('pressure', 0)))
        )
        points.append(p)
    
    write_api.write(bucket=INFLUX_BUCKET, record=points)


# Запись в нативном line protocol (максимальная производительность):
def write_line_protocol(lines: list[str]):
    """
    Прямая запись в line protocol — самый быстрый способ.
    Формат: measurement[,tag=value...] field=value[,field=value...] [timestamp]
    """
    write_api.write(bucket=INFLUX_BUCKET, record='\n'.join(lines),
                    write_precision=WritePrecision.NANOSECONDS)

# Пример:
lines = [
    "telemetry,device=pump1,location=line1 temperature=87.3,current=15.5 1710000000000000000",
    "telemetry,device=pump2,location=line1 temperature=72.1,current=8.2  1710000000000000000",
    "telemetry,device=valve1,location=line2 position=75.0               1710000000000000000",
]
write_line_protocol(lines)


# ===== ЗАПРОСЫ (Flux) =====

def query_last_hour(device: str) -> list[dict]:
    """Последний час данных устройства"""
    flux = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "telemetry")
        |> filter(fn: (r) => r.device == "{device}")
        |> filter(fn: (r) => r._field == "temperature" or r._field == "current")
        |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        |> sort(columns: ["_time"])
    '''
    
    tables = query_api.query(flux)
    results = []
    for table in tables:
        for record in table.records:
            results.append({
                'time':        record.get_time().isoformat(),
                'temperature': record.values.get('temperature'),
                'current':     record.values.get('current'),
            })
    return results


def query_aggregated_stats(device: str, window: str = "5m",
                            range_start: str = "-24h") -> list[dict]:
    """
    Агрегированная статистика по временным окнам.
    window: "1m", "5m", "1h", "1d"
    """
    flux = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: {range_start})
        |> filter(fn: (r) => r._measurement == "telemetry" and r.device == "{device}")
        |> filter(fn: (r) => r._field == "temperature")
        |> aggregateWindow(
            every: {window},
            fn: (tables=<-, column) => tables |> reduce(
                identity: {{mean: 0.0, min: 99999.0, max: -99999.0, count: 0}},
                fn: (r, accumulator) => ({{
                    mean:  accumulator.mean + r._value,
                    min:   if r._value < accumulator.min  then r._value else accumulator.min,
                    max:   if r._value > accumulator.max  then r._value else accumulator.max,
                    count: accumulator.count + 1,
                }})
            ),
            createEmpty: false
        )
    '''
    # Для простого avg/min/max лучше использовать встроенные функции:
    flux_simple = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: {range_start})
        |> filter(fn: (r) => r._measurement == "telemetry" 
                          and r.device == "{device}"
                          and r._field == "temperature")
        |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
        |> yield(name: "mean")
    '''
    
    tables = query_api.query(flux_simple)
    return [{'time': r.get_time().isoformat(), 'mean_temp': r.get_value()}
            for table in tables for r in table.records]


def query_anomalies(threshold_high: float = 85.0,
                     range_start: str = "-7d") -> list[dict]:
    """Поиск аномалий — превышений порога"""
    flux = f'''
    from(bucket: "{INFLUX_BUCKET}")
        |> range(start: {range_start})
        |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "temperature")
        |> filter(fn: (r) => r._value > {threshold_high})
        |> group(columns: ["device"])
        |> sort(columns: ["_time"], desc: true)
    '''
    
    tables = query_api.query(flux)
    return [{
        'device': r.values.get('device'),
        'time':   r.get_time().isoformat(),
        'value':  r.get_value(),
        'excess': round(r.get_value() - threshold_high, 2),
    } for table in tables for r in table.records]


def query_device_availability(range_start: str = "-30d") -> list[dict]:
    """Доступность (availability) по устройствам за период"""
    flux = f'''
    import "math"
    
    total = from(bucket: "{INFLUX_BUCKET}")
        |> range(start: {range_start})
        |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "running")
        |> group(columns: ["device"])
        |> count()
        |> rename(columns: {{_value: "total_count"}})
    
    running = from(bucket: "{INFLUX_BUCKET}")
        |> range(start: {range_start})
        |> filter(fn: (r) => r._measurement == "telemetry" and r._field == "running")
        |> filter(fn: (r) => r._value == 1)
        |> group(columns: ["device"])
        |> count()
        |> rename(columns: {{_value: "running_count"}})
    
    join(tables: {{total, running}}, on: ["device"])
        |> map(fn: (r) => ({{ r with availability_pct: 
            math.round(x: r.running_count / r.total_count * 1000.0) / 10.0
        }}))
    '''
    
    tables = query_api.query(flux)
    return [{'device': r.values.get('device'),
             'availability': r.values.get('availability_pct')}
            for table in tables for r in table.records]

Retention Policies и Downsampling

Хранить сырые данные с секундным разрешением 10 лет — безумно дорого. Правильная стратегия:

"Горячие" данные:  1 секунда, 30 дней    → быстрый SSD
"Тёплые" данные:   1 минута,  1 год      → обычный SSD
"Холодные" данные: 1 час,     10 лет     → HDD/объектное хранилище

Конфигурация в InfluxDB 2.x:

# Создание bucket с retention 30 дней (сырые данные)
influx bucket create \
    --name process_data_raw \
    --retention 30d \
    --org factory

# Bucket для агрегированных данных (бессрочно)
influx bucket create \
    --name process_data_aggregated \
    --retention 0 \
    --org factory

Задача downsampling (Flux):

def setup_downsampling_task():
    """
    Создаём задачу InfluxDB для автоматического downsampling.
    Каждые 5 минут агрегируем сырые данные в минутные.
    """
    
    flux_task = '''
    option task = {
        name: "Downsampling: raw→1min",
        every: 5m,          // Запускать каждые 5 минут
        offset: 1m,         // Смещение (ждём пока данные придут)
    }
    
    // Читаем сырые данные за последние 5 минут
    data = from(bucket: "process_data_raw")
        |> range(start: -task.every)
        |> filter(fn: (r) => r._measurement == "telemetry")
    
    // Агрегируем каждую числовую метрику
    data
        |> filter(fn: (r) => r._field == "temperature" or 
                              r._field == "current"     or
                              r._field == "pressure")
        |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
        |> set(key: "_measurement", value: "telemetry_1m")
        |> to(bucket: "process_data_aggregated")
    
    // Для бинарных данных (running) — используем last
    data
        |> filter(fn: (r) => r._field == "running")
        |> aggregateWindow(every: 1m, fn: last, createEmpty: false)
        |> set(key: "_measurement", value: "telemetry_1m")
        |> to(bucket: "process_data_aggregated")
    '''
    
    # Создание задачи через API
    tasks_api = client.tasks_api()
    task = tasks_api.create_task_every(
        name="Downsampling: raw→1min",
        flux=flux_task,
        every="5m",
        organization=INFLUX_ORG
    )
    print(f"Задача создана: {task.id}")

TimescaleDB: PostgreSQL для временных рядов

TimescaleDB — расширение PostgreSQL. Если вы уже используете PostgreSQL и знаете SQL — это лучший выбор. Вы получаете TSDB-оптимизации при сохранении полного SQL.

-- Установка расширения
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Обычная таблица PostgreSQL
CREATE TABLE telemetry (
    time        TIMESTAMPTZ NOT NULL,
    device      TEXT        NOT NULL,
    location    TEXT        NOT NULL,
    temperature FLOAT,
    current     FLOAT,
    pressure    FLOAT,
    running     BOOLEAN,
    quality     TEXT DEFAULT 'GOOD'
);

-- Превращаем в hypertable (TimescaleDB магия!)
SELECT create_hypertable('telemetry', 'time',
    chunk_time_interval => INTERVAL '1 day'  -- Партиция = 1 день
);

-- Индекс на часто используемые теги
CREATE INDEX ON telemetry (device, time DESC);
CREATE INDEX ON telemetry (location, time DESC);

-- Compression (сжатие старых данных)
ALTER TABLE telemetry SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'device',
    timescaledb.compress_orderby = 'time DESC'
);

-- Автоматическое сжатие данных старше 7 дней
SELECT add_compression_policy('telemetry', INTERVAL '7 days');

-- Автоматическое удаление старых данных (30 дней)
SELECT add_retention_policy('telemetry', INTERVAL '30 days');

Запросы (обычный SQL!):

-- Последний час данных с устройства
SELECT time, temperature, current, running
FROM telemetry
WHERE device = 'pump1'
  AND time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;

-- Среднее по 5-минутным окнам
SELECT 
    time_bucket('5 minutes', time) AS bucket,
    device,
    ROUND(AVG(temperature)::numeric, 2) AS avg_temp,
    ROUND(MIN(temperature)::numeric, 2) AS min_temp,
    ROUND(MAX(temperature)::numeric, 2) AS max_temp,
    COUNT(*) AS samples
FROM telemetry
WHERE device = 'pump1'
  AND time > NOW() - INTERVAL '24 hours'
GROUP BY bucket, device
ORDER BY bucket DESC;

-- Обнаружение аномалий (значение > avg + 2*stddev)
WITH stats AS (
    SELECT 
        device,
        AVG(temperature) AS avg_temp,
        STDDEV(temperature) AS std_temp
    FROM telemetry
    WHERE time > NOW() - INTERVAL '7 days'
    GROUP BY device
)
SELECT 
    t.time, t.device, t.temperature,
    s.avg_temp, s.std_temp,
    (t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0) AS z_score
FROM telemetry t
JOIN stats s ON t.device = s.device
WHERE t.time > NOW() - INTERVAL '24 hours'
  AND ABS((t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0)) > 2.0
ORDER BY ABS((t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0)) DESC
LIMIT 50;

-- Доступность оборудования за месяц
SELECT 
    device,
    COUNT(*) FILTER (WHERE running = true)  AS running_count,
    COUNT(*) AS total_count,
    ROUND(
        COUNT(*) FILTER (WHERE running = true)::numeric / COUNT(*) * 100, 1
    ) AS availability_pct,
    SUM(CASE WHEN running THEN 1 ELSE 0 END) * 
        EXTRACT(EPOCH FROM INTERVAL '1 second') / 3600.0 AS running_hours
FROM telemetry
WHERE time > NOW() - INTERVAL '30 days'
GROUP BY device
ORDER BY availability_pct DESC;

Непрерывные агрегации (Continuous Aggregates):

-- Создаём материализованное представление с автообновлением
CREATE MATERIALIZED VIEW telemetry_5min
WITH (timescaledb.continuous) AS
SELECT 
    time_bucket('5 minutes', time) AS bucket,
    device,
    location,
    AVG(temperature)  AS avg_temp,
    MIN(temperature)  AS min_temp,
    MAX(temperature)  AS max_temp,
    AVG(current)      AS avg_current,
    MAX(current)      AS max_current,
    BOOL_OR(running)  AS any_running,
    COUNT(*)          AS sample_count
FROM telemetry
GROUP BY bucket, device, location
WITH NO DATA;

-- Автоматическое обновление каждые 5 минут
SELECT add_continuous_aggregate_policy('telemetry_5min',
    start_offset => INTERVAL '15 minutes',
    end_offset   => INTERVAL '5 minutes',
    schedule_interval => INTERVAL '5 minutes'
);

-- Запрос к агрегированным данным (мгновенно!)
SELECT * FROM telemetry_5min
WHERE device = 'pump1'
  AND bucket > NOW() - INTERVAL '24 hours'
ORDER BY bucket DESC;

Python + SQLAlchemy + TimescaleDB:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
import pandas as pd
from datetime import datetime, timedelta, timezone

DATABASE_URL = "postgresql://user:password@localhost:5432/factory_db"
engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20)

class TelemetryRepository:
    
    def write_batch(self, records: list[dict]) -> int:
        """Пакетная запись телеметрии"""
        if not records:
            return 0
        
        with engine.begin() as conn:
            result = conn.execute(
                text("""
                    INSERT INTO telemetry (time, device, location,
                                          temperature, current, pressure, running)
                    VALUES (:time, :device, :location,
                            :temperature, :current, :pressure, :running)
                    ON CONFLICT DO NOTHING
                """),
                records
            )
            return result.rowcount
    
    def get_latest(self, device: str, fields: list[str] = None) -> dict | None:
        """Последнее значение устройства"""
        field_list = ', '.join(fields or ['temperature', 'current', 'pressure', 'running'])
        
        with engine.connect() as conn:
            row = conn.execute(
                text(f"""
                    SELECT time, {field_list}
                    FROM telemetry
                    WHERE device = :device
                    ORDER BY time DESC
                    LIMIT 1
                """),
                {'device': device}
            ).fetchone()
            
            return dict(row._mapping) if row else None
    
    def get_as_dataframe(self, device: str, hours: int = 24) -> pd.DataFrame:
        """Загрузка данных в Pandas DataFrame для анализа"""
        query = text("""
            SELECT time, temperature, current, pressure, running
            FROM telemetry
            WHERE device = :device
              AND time > :since
            ORDER BY time
        """)
        
        with engine.connect() as conn:
            df = pd.read_sql(
                query,
                conn,
                params={'device': device,
                        'since': datetime.now(timezone.utc) - timedelta(hours=hours)},
                parse_dates=['time'],
                index_col='time'
            )
        
        return df
    
    def detect_anomalies_zscore(self, device: str, 
                                  field: str = 'temperature',
                                  threshold: float = 2.5) -> pd.DataFrame:
        """Обнаружение аномалий методом z-score"""
        df = self.get_as_dataframe(device, hours=24)
        
        if df.empty or field not in df.columns:
            return pd.DataFrame()
        
        mean = df[field].mean()
        std  = df[field].std()
        
        if std == 0:
            return pd.DataFrame()
        
        df['z_score'] = (df[field] - mean) / std
        anomalies = df[df['z_score'].abs() > threshold].copy()
        anomalies['is_high'] = anomalies['z_score'] > 0
        
        return anomalies[['z_score', field, 'is_high']]
    
    def get_equipment_report(self, days: int = 30) -> pd.DataFrame:
        """Отчёт по оборудованию за период"""
        query = text("""
            SELECT 
                device,
                COUNT(*) as total_records,
                COUNT(*) FILTER (WHERE running) as running_records,
                ROUND((COUNT(*) FILTER (WHERE running)::numeric / COUNT(*) * 100)::numeric, 1) as availability_pct,
                ROUND(AVG(temperature)::numeric, 1) as avg_temp,
                ROUND(MAX(temperature)::numeric, 1) as max_temp,
                ROUND(AVG(current)::numeric, 2) as avg_current
            FROM telemetry
            WHERE time > NOW() - MAKE_INTERVAL(days => :days)
            GROUP BY device
            ORDER BY device
        """)
        
        with engine.connect() as conn:
            return pd.read_sql(query, conn, params={'days': days})

Выбор TSDB: сравнительная таблица

Критерий

InfluxDB 2.x

TimescaleDB

Prometheus

Основа

Собственный движок

PostgreSQL

Собственный

Язык запросов

Flux (мощный, непривычный)

SQL

PromQL

Производительность записи

★★★★★

★★★★

★★★

SQL-совместимость

(полная)

Сжатие

★★★★★

★★★★

★★★

Масштабирование

InfluxDB Enterprise

TimescaleDB

Thanos/Cortex

Лицензия

BSL (OSS ограничен)

Apache 2

Apache 2

Интеграция с Grafana

★★★★★

★★★★★

★★★★★

Лучше для

IoT, большой объём тегов

Существующий PostgreSQL-стек

DevOps мониторинг


Заключение

Выбор TSDB зависит от контекста. InfluxDB — лучший выбор для чистых IoT/телеметрия проектов: максимальная производительность, мощный Flux для временны́х вычислений, отличная экосистема. TimescaleDB — если уже есть PostgreSQL инфраструктура, нужны JOINs с другими данными или разработчики лучше знают SQL.

Ключевые принципы для production: всегда настраивайте retention policies (данные должны автоматически удаляться), используйте downsampling для долгосрочного хранения агрегатов, настройте сжатие (экономия 90%+ дискового пространства), мониторьте производительность самой TSDB.

Deadband-фильтрация на уровне edge-узла (не писать если значение не изменилось существенно) снижает нагрузку на БД в 5–50 раз для медленно меняющихся процессов. Это первое что нужно сделать перед любой оптимизацией TSDB.

User Feedback

Create an account or sign in to leave a review

There are no reviews to display.

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.