Зачем специализированная БД для временных рядов
Технологические данные — это всегда временной ряд: температура каждую секунду, давление каждые 100 мс, состояние оборудования каждые 10 мс. PostgreSQL или MySQL могут хранить такие данные. Но при миллионах записей в день начинаются проблемы.
Почему реляционные БД плохо справляются:
Индексы B-Tree неэффективны для временных запросов ("за последний час")
Запись строк в таблицу с индексами — медленно при высоком темпе
GROUP BY time_interval требует дорогих вычислений
Партиционирование по времени нужно настраивать вручную
Хранение тысяч тегов → тысячи колонок или плохая схема
Что умеют Time-Series TSDB:
Оптимизированная запись: 100 000+ точек/сек на скромном железе
Встроенное сжатие (delta-delta, XOR float compression)
Автоматические retention policies (TTL данных)
Downsampling: автоматически агрегируем "горячие" данные в "холодные"
Встроенные временны́е функции: moving average, rate, derivative
InfluxDB 2.x: промышленный стандарт IoT
Основные концепции
Measurement — аналог таблицы:
measurement: "telemetry"
Tags — индексированные метаданные (строки):
tags: device="conveyor1", location="line1", area="factory"
Fields — неиндексированные данные (числа, строки, bool):
fields: temperature=87.3, current=15.5, running=true
Timestamp — время с нано-точностью.
Точка данных (Point):
measurement,tags fields timestamp
telemetry,device=conveyor1,location=line1 temperature=87.3,current=15.5 1710000000000000000
Почему Tags vs Fields важно
Tags: ИНДЕКСИРОВАНЫ → используйте для группировки/фильтрации
device, location, sensor_type, unit_id
Fields: НЕ индексированы → используйте для числовых данных
temperature, pressure, current, voltage
ОШИБКА: положить temperature в Tag — поиск по значению работает,
но карданальность огромная → индекс разрастётся → InfluxDB замедлится.
ОШИБКА: положить device_id в Field — нельзя эффективно фильтровать по устройству.
Python клиент InfluxDB 2.x:
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS, WriteOptions
from datetime import datetime, timezone
import time
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "your-api-token-here"
INFLUX_ORG = "factory"
INFLUX_BUCKET = "process_data"
# Клиент с батчевой записью
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client.write_api(write_options=WriteOptions(
batch_size = 1000, # Накапливаем до 1000 точек
flush_interval = 5_000, # Или сбрасываем каждые 5 секунд
jitter_interval = 500, # ±500мс для сглаживания нагрузки
retry_interval = 5_000, # Retry при ошибке через 5с
max_retry_time = 180_000, # Максимум 3 минуты retry
))
query_api = client.query_api()
# ===== ЗАПИСЬ =====
def write_single_point(device: str, location: str,
temperature: float, current: float, running: bool):
"""Запись одной точки"""
point = (
Point("telemetry")
.tag("device", device)
.tag("location", location)
.field("temperature", temperature)
.field("current", current)
.field("running", int(running)) # bool → int (InfluxDB лучше хранит)
.time(datetime.now(timezone.utc))
)
write_api.write(bucket=INFLUX_BUCKET, record=point)
def write_batch(measurements: list[dict]):
"""
Эффективная пакетная запись.
measurements: [{'device': 'pump1', 'temp': 25.3, 'current': 12.1}, ...]
"""
points = []
for m in measurements:
p = (
Point("telemetry")
.tag("device", m['device'])
.tag("location", m.get('location', 'unknown'))
.field("temperature", float(m.get('temp', 0)))
.field("current", float(m.get('current', 0)))
.field("pressure", float(m.get('pressure', 0)))
)
points.append(p)
write_api.write(bucket=INFLUX_BUCKET, record=points)
# Запись в нативном line protocol (максимальная производительность):
def write_line_protocol(lines: list[str]):
"""
Прямая запись в line protocol — самый быстрый способ.
Формат: measurement[,tag=value...] field=value[,field=value...] [timestamp]
"""
write_api.write(bucket=INFLUX_BUCKET, record='\n'.join(lines),
write_precision=WritePrecision.NANOSECONDS)
# Пример:
lines = [
"telemetry,device=pump1,location=line1 temperature=87.3,current=15.5 1710000000000000000",
"telemetry,device=pump2,location=line1 temperature=72.1,current=8.2 1710000000000000000",
"telemetry,device=valve1,location=line2 position=75.0 1710000000000000000",
]
write_line_protocol(lines)
# ===== ЗАПРОСЫ (Flux) =====
def query_last_hour(device: str) -> list[dict]:
"""Последний час данных устройства"""
flux = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "telemetry")
|> filter(fn: (r) => r.device == "{device}")
|> filter(fn: (r) => r._field == "temperature" or r._field == "current")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> sort(columns: ["_time"])
'''
tables = query_api.query(flux)
results = []
for table in tables:
for record in table.records:
results.append({
'time': record.get_time().isoformat(),
'temperature': record.values.get('temperature'),
'current': record.values.get('current'),
})
return results
def query_aggregated_stats(device: str, window: str = "5m",
range_start: str = "-24h") -> list[dict]:
"""
Агрегированная статистика по временным окнам.
window: "1m", "5m", "1h", "1d"
"""
flux = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: {range_start})
|> filter(fn: (r) => r._measurement == "telemetry" and r.device == "{device}")
|> filter(fn: (r) => r._field == "temperature")
|> aggregateWindow(
every: {window},
fn: (tables=<-, column) => tables |> reduce(
identity: {{mean: 0.0, min: 99999.0, max: -99999.0, count: 0}},
fn: (r, accumulator) => ({{
mean: accumulator.mean + r._value,
min: if r._value < accumulator.min then r._value else accumulator.min,
max: if r._value > accumulator.max then r._value else accumulator.max,
count: accumulator.count + 1,
}})
),
createEmpty: false
)
'''
# Для простого avg/min/max лучше использовать встроенные функции:
flux_simple = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: {range_start})
|> filter(fn: (r) => r._measurement == "telemetry"
and r.device == "{device}"
and r._field == "temperature")
|> aggregateWindow(every: {window}, fn: mean, createEmpty: false)
|> yield(name: "mean")
'''
tables = query_api.query(flux_simple)
return [{'time': r.get_time().isoformat(), 'mean_temp': r.get_value()}
for table in tables for r in table.records]
def query_anomalies(threshold_high: float = 85.0,
range_start: str = "-7d") -> list[dict]:
"""Поиск аномалий — превышений порога"""
flux = f'''
from(bucket: "{INFLUX_BUCKET}")
|> range(start: {range_start})
|> filter(fn: (r) => r._measurement == "telemetry" and r._field == "temperature")
|> filter(fn: (r) => r._value > {threshold_high})
|> group(columns: ["device"])
|> sort(columns: ["_time"], desc: true)
'''
tables = query_api.query(flux)
return [{
'device': r.values.get('device'),
'time': r.get_time().isoformat(),
'value': r.get_value(),
'excess': round(r.get_value() - threshold_high, 2),
} for table in tables for r in table.records]
def query_device_availability(range_start: str = "-30d") -> list[dict]:
"""Доступность (availability) по устройствам за период"""
flux = f'''
import "math"
total = from(bucket: "{INFLUX_BUCKET}")
|> range(start: {range_start})
|> filter(fn: (r) => r._measurement == "telemetry" and r._field == "running")
|> group(columns: ["device"])
|> count()
|> rename(columns: {{_value: "total_count"}})
running = from(bucket: "{INFLUX_BUCKET}")
|> range(start: {range_start})
|> filter(fn: (r) => r._measurement == "telemetry" and r._field == "running")
|> filter(fn: (r) => r._value == 1)
|> group(columns: ["device"])
|> count()
|> rename(columns: {{_value: "running_count"}})
join(tables: {{total, running}}, on: ["device"])
|> map(fn: (r) => ({{ r with availability_pct:
math.round(x: r.running_count / r.total_count * 1000.0) / 10.0
}}))
'''
tables = query_api.query(flux)
return [{'device': r.values.get('device'),
'availability': r.values.get('availability_pct')}
for table in tables for r in table.records]
Retention Policies и Downsampling
Хранить сырые данные с секундным разрешением 10 лет — безумно дорого. Правильная стратегия:
"Горячие" данные: 1 секунда, 30 дней → быстрый SSD
"Тёплые" данные: 1 минута, 1 год → обычный SSD
"Холодные" данные: 1 час, 10 лет → HDD/объектное хранилище
Конфигурация в InfluxDB 2.x:
# Создание bucket с retention 30 дней (сырые данные)
influx bucket create \
--name process_data_raw \
--retention 30d \
--org factory
# Bucket для агрегированных данных (бессрочно)
influx bucket create \
--name process_data_aggregated \
--retention 0 \
--org factory
Задача downsampling (Flux):
def setup_downsampling_task():
"""
Создаём задачу InfluxDB для автоматического downsampling.
Каждые 5 минут агрегируем сырые данные в минутные.
"""
flux_task = '''
option task = {
name: "Downsampling: raw→1min",
every: 5m, // Запускать каждые 5 минут
offset: 1m, // Смещение (ждём пока данные придут)
}
// Читаем сырые данные за последние 5 минут
data = from(bucket: "process_data_raw")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "telemetry")
// Агрегируем каждую числовую метрику
data
|> filter(fn: (r) => r._field == "temperature" or
r._field == "current" or
r._field == "pressure")
|> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
|> set(key: "_measurement", value: "telemetry_1m")
|> to(bucket: "process_data_aggregated")
// Для бинарных данных (running) — используем last
data
|> filter(fn: (r) => r._field == "running")
|> aggregateWindow(every: 1m, fn: last, createEmpty: false)
|> set(key: "_measurement", value: "telemetry_1m")
|> to(bucket: "process_data_aggregated")
'''
# Создание задачи через API
tasks_api = client.tasks_api()
task = tasks_api.create_task_every(
name="Downsampling: raw→1min",
flux=flux_task,
every="5m",
organization=INFLUX_ORG
)
print(f"Задача создана: {task.id}")
TimescaleDB: PostgreSQL для временных рядов
TimescaleDB — расширение PostgreSQL. Если вы уже используете PostgreSQL и знаете SQL — это лучший выбор. Вы получаете TSDB-оптимизации при сохранении полного SQL.
-- Установка расширения
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Обычная таблица PostgreSQL
CREATE TABLE telemetry (
time TIMESTAMPTZ NOT NULL,
device TEXT NOT NULL,
location TEXT NOT NULL,
temperature FLOAT,
current FLOAT,
pressure FLOAT,
running BOOLEAN,
quality TEXT DEFAULT 'GOOD'
);
-- Превращаем в hypertable (TimescaleDB магия!)
SELECT create_hypertable('telemetry', 'time',
chunk_time_interval => INTERVAL '1 day' -- Партиция = 1 день
);
-- Индекс на часто используемые теги
CREATE INDEX ON telemetry (device, time DESC);
CREATE INDEX ON telemetry (location, time DESC);
-- Compression (сжатие старых данных)
ALTER TABLE telemetry SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device',
timescaledb.compress_orderby = 'time DESC'
);
-- Автоматическое сжатие данных старше 7 дней
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
-- Автоматическое удаление старых данных (30 дней)
SELECT add_retention_policy('telemetry', INTERVAL '30 days');
Запросы (обычный SQL!):
-- Последний час данных с устройства
SELECT time, temperature, current, running
FROM telemetry
WHERE device = 'pump1'
AND time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;
-- Среднее по 5-минутным окнам
SELECT
time_bucket('5 minutes', time) AS bucket,
device,
ROUND(AVG(temperature)::numeric, 2) AS avg_temp,
ROUND(MIN(temperature)::numeric, 2) AS min_temp,
ROUND(MAX(temperature)::numeric, 2) AS max_temp,
COUNT(*) AS samples
FROM telemetry
WHERE device = 'pump1'
AND time > NOW() - INTERVAL '24 hours'
GROUP BY bucket, device
ORDER BY bucket DESC;
-- Обнаружение аномалий (значение > avg + 2*stddev)
WITH stats AS (
SELECT
device,
AVG(temperature) AS avg_temp,
STDDEV(temperature) AS std_temp
FROM telemetry
WHERE time > NOW() - INTERVAL '7 days'
GROUP BY device
)
SELECT
t.time, t.device, t.temperature,
s.avg_temp, s.std_temp,
(t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0) AS z_score
FROM telemetry t
JOIN stats s ON t.device = s.device
WHERE t.time > NOW() - INTERVAL '24 hours'
AND ABS((t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0)) > 2.0
ORDER BY ABS((t.temperature - s.avg_temp) / NULLIF(s.std_temp, 0)) DESC
LIMIT 50;
-- Доступность оборудования за месяц
SELECT
device,
COUNT(*) FILTER (WHERE running = true) AS running_count,
COUNT(*) AS total_count,
ROUND(
COUNT(*) FILTER (WHERE running = true)::numeric / COUNT(*) * 100, 1
) AS availability_pct,
SUM(CASE WHEN running THEN 1 ELSE 0 END) *
EXTRACT(EPOCH FROM INTERVAL '1 second') / 3600.0 AS running_hours
FROM telemetry
WHERE time > NOW() - INTERVAL '30 days'
GROUP BY device
ORDER BY availability_pct DESC;
Непрерывные агрегации (Continuous Aggregates):
-- Создаём материализованное представление с автообновлением
CREATE MATERIALIZED VIEW telemetry_5min
WITH (timescaledb.continuous) AS
SELECT
time_bucket('5 minutes', time) AS bucket,
device,
location,
AVG(temperature) AS avg_temp,
MIN(temperature) AS min_temp,
MAX(temperature) AS max_temp,
AVG(current) AS avg_current,
MAX(current) AS max_current,
BOOL_OR(running) AS any_running,
COUNT(*) AS sample_count
FROM telemetry
GROUP BY bucket, device, location
WITH NO DATA;
-- Автоматическое обновление каждые 5 минут
SELECT add_continuous_aggregate_policy('telemetry_5min',
start_offset => INTERVAL '15 minutes',
end_offset => INTERVAL '5 minutes',
schedule_interval => INTERVAL '5 minutes'
);
-- Запрос к агрегированным данным (мгновенно!)
SELECT * FROM telemetry_5min
WHERE device = 'pump1'
AND bucket > NOW() - INTERVAL '24 hours'
ORDER BY bucket DESC;
Python + SQLAlchemy + TimescaleDB:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
import pandas as pd
from datetime import datetime, timedelta, timezone
DATABASE_URL = "postgresql://user:password@localhost:5432/factory_db"
engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20)
class TelemetryRepository:
def write_batch(self, records: list[dict]) -> int:
"""Пакетная запись телеметрии"""
if not records:
return 0
with engine.begin() as conn:
result = conn.execute(
text("""
INSERT INTO telemetry (time, device, location,
temperature, current, pressure, running)
VALUES (:time, :device, :location,
:temperature, :current, :pressure, :running)
ON CONFLICT DO NOTHING
"""),
records
)
return result.rowcount
def get_latest(self, device: str, fields: list[str] = None) -> dict | None:
"""Последнее значение устройства"""
field_list = ', '.join(fields or ['temperature', 'current', 'pressure', 'running'])
with engine.connect() as conn:
row = conn.execute(
text(f"""
SELECT time, {field_list}
FROM telemetry
WHERE device = :device
ORDER BY time DESC
LIMIT 1
"""),
{'device': device}
).fetchone()
return dict(row._mapping) if row else None
def get_as_dataframe(self, device: str, hours: int = 24) -> pd.DataFrame:
"""Загрузка данных в Pandas DataFrame для анализа"""
query = text("""
SELECT time, temperature, current, pressure, running
FROM telemetry
WHERE device = :device
AND time > :since
ORDER BY time
""")
with engine.connect() as conn:
df = pd.read_sql(
query,
conn,
params={'device': device,
'since': datetime.now(timezone.utc) - timedelta(hours=hours)},
parse_dates=['time'],
index_col='time'
)
return df
def detect_anomalies_zscore(self, device: str,
field: str = 'temperature',
threshold: float = 2.5) -> pd.DataFrame:
"""Обнаружение аномалий методом z-score"""
df = self.get_as_dataframe(device, hours=24)
if df.empty or field not in df.columns:
return pd.DataFrame()
mean = df[field].mean()
std = df[field].std()
if std == 0:
return pd.DataFrame()
df['z_score'] = (df[field] - mean) / std
anomalies = df[df['z_score'].abs() > threshold].copy()
anomalies['is_high'] = anomalies['z_score'] > 0
return anomalies[['z_score', field, 'is_high']]
def get_equipment_report(self, days: int = 30) -> pd.DataFrame:
"""Отчёт по оборудованию за период"""
query = text("""
SELECT
device,
COUNT(*) as total_records,
COUNT(*) FILTER (WHERE running) as running_records,
ROUND((COUNT(*) FILTER (WHERE running)::numeric / COUNT(*) * 100)::numeric, 1) as availability_pct,
ROUND(AVG(temperature)::numeric, 1) as avg_temp,
ROUND(MAX(temperature)::numeric, 1) as max_temp,
ROUND(AVG(current)::numeric, 2) as avg_current
FROM telemetry
WHERE time > NOW() - MAKE_INTERVAL(days => :days)
GROUP BY device
ORDER BY device
""")
with engine.connect() as conn:
return pd.read_sql(query, conn, params={'days': days})
Выбор TSDB: сравнительная таблица
Критерий | InfluxDB 2.x | TimescaleDB | Prometheus |
|---|---|---|---|
Основа | Собственный движок | PostgreSQL | Собственный |
Язык запросов | Flux (мощный, непривычный) | SQL | PromQL |
Производительность записи | ★★★★★ | ★★★★ | ★★★ |
SQL-совместимость | ❌ | ✅ (полная) | ❌ |
Сжатие | ★★★★★ | ★★★★ | ★★★ |
Масштабирование | InfluxDB Enterprise | TimescaleDB | Thanos/Cortex |
Лицензия | BSL (OSS ограничен) | Apache 2 | Apache 2 |
Интеграция с Grafana | ★★★★★ | ★★★★★ | ★★★★★ |
Лучше для | IoT, большой объём тегов | Существующий PostgreSQL-стек | DevOps мониторинг |
Заключение
Выбор TSDB зависит от контекста. InfluxDB — лучший выбор для чистых IoT/телеметрия проектов: максимальная производительность, мощный Flux для временны́х вычислений, отличная экосистема. TimescaleDB — если уже есть PostgreSQL инфраструктура, нужны JOINs с другими данными или разработчики лучше знают SQL.
Ключевые принципы для production: всегда настраивайте retention policies (данные должны автоматически удаляться), используйте downsampling для долгосрочного хранения агрегатов, настройте сжатие (экономия 90%+ дискового пространства), мониторьте производительность самой TSDB.
Deadband-фильтрация на уровне edge-узла (не писать если значение не изменилось существенно) снижает нагрузку на БД в 5–50 раз для медленно меняющихся процессов. Это первое что нужно сделать перед любой оптимизацией TSDB.
Create an account or sign in to leave a review
There are no reviews to display.