node-red-contrib-questdb 0.6.1

Node-RED nodes for writing high-performance time-series data to QuestDB using the InfluxDB Line Protocol (ILP). Supports IoT, industrial monitoring, smart buildings, fleet telematics, healthcare, agriculture, and more.

npm install node-red-contrib-questdb

Features

  • High-performance writes using QuestDB's native ILP protocol
  • Connection pooling with automatic reconnection
  • Multiple protocols: HTTP, HTTPS, TCP, TCPS
  • Authentication: Basic auth and Bearer token support
  • TLS/SSL: Full TLS support with certificate verification options
  • Flexible data mapping: Map message fields to QuestDB columns
  • Type support: Symbols, floats, integers, longs, booleans, strings, timestamps, arrays, and decimals
  • Auto-flush: Configurable automatic flushing by row count or time interval
  • Examples included: Ready-to-use flow examples

Installation

Via Node-RED Palette Manager

  1. Open Node-RED
  2. Go to Menu > Manage palette > Install
  3. Search for node-red-contrib-questdb
  4. Click Install

Via npm

cd ~/.node-red
npm install node-red-contrib-questdb

Then restart Node-RED.

Nodes

QuestDB Write

Writes data to QuestDB using the ILP protocol.

Configuration

Connection Settings:

  • Protocol: HTTP (default), HTTPS, TCP, or TCPS
  • Host: QuestDB server hostname or IP
  • Port: defaults to 9000 (HTTP/HTTPS) or 9009 (TCP/TCPS)

Security Settings:

  • Enable Auth: Toggle authentication
  • Auth Type: Username/Password or Bearer Token
  • TLS Verify: Verify server certificate (for HTTPS/TCPS)

Advanced Settings:

  • Auto Flush: Enable automatic flushing
  • Flush Rows: Number of rows before auto-flush (default: 75000)
  • Flush Interval: Time interval for auto-flush in ms (default: 1000)
  • Request Timeout: HTTP request timeout in ms
  • Buffer Size: Initial and maximum buffer sizes
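
The two auto-flush triggers can be pictured with a small buffer. The following is a minimal sketch under stated assumptions, not the node's implementation: `RowBuffer`, `onFlush`, and the injected `now` clock are hypothetical names, and the time check here only runs when a row is added, whereas a real client may also flush from a timer.

```javascript
// Hypothetical sketch of auto-flush by row count or elapsed time.
// RowBuffer and its options are illustrative names, not the node's API.
class RowBuffer {
    constructor({ flushRows = 75000, flushIntervalMs = 1000, onFlush, now = Date.now }) {
        this.flushRows = flushRows;
        this.flushIntervalMs = flushIntervalMs;
        this.onFlush = onFlush;
        this.now = now;
        this.rows = [];
        this.lastFlush = now();
    }

    // Add a row, then flush if either threshold has been reached.
    add(row) {
        this.rows.push(row);
        const tooMany = this.rows.length >= this.flushRows;
        const tooOld = this.now() - this.lastFlush >= this.flushIntervalMs;
        if (tooMany || tooOld) this.flush();
    }

    // Hand the pending rows to the callback and reset the buffer.
    flush() {
        if (this.rows.length > 0) this.onFlush(this.rows);
        this.rows = [];
        this.lastFlush = this.now();
    }
}

// Usage with a fake clock so the behaviour is deterministic.
let clock = 0;
const flushed = [];
const buffer = new RowBuffer({
    flushRows: 3,
    flushIntervalMs: 1000,
    onFlush: (rows) => flushed.push(rows.length),
    now: () => clock
});
buffer.add("a"); buffer.add("b"); buffer.add("c"); // row-count trigger
clock = 1500;
buffer.add("d");                                    // time trigger
```

In this run the first flush carries three rows and the second carries one, so `flushed` ends up as `[3, 1]`.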

Input Message Format

msg.topic = "table_name";
msg.payload = {
    symbols: {
        tag_name: "sensor1",      // Symbol columns (dictionary-encoded strings)
        location: "warehouse"
    },
    columns: {
        temperature: 23.5,        // Auto-detected as float
        humidity: 65,             // Auto-detected as float
        status: "active",         // String column
        alert: true               // Boolean column
    },
    timestamp: Date.now()         // Optional: milliseconds or Date object
};
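
To make the relationship between this message format and ILP concrete, the sketch below renders such a message as a single ILP line. It is illustrative only: `toIlpLine`, `escapeName`, and `formatField` are hypothetical helpers, the node's real serialization is internal and may differ, and the sketch assumes a millisecond numeric timestamp and plain (not explicitly typed) column values.

```javascript
// Illustrative only: renders a message in the format above as one ILP line.
// toIlpLine is a hypothetical helper, not part of the package.
function escapeName(s) {
    // Commas, equals signs and spaces are escaped in ILP names and symbol values.
    return String(s).replace(/([,= ])/g, "\\$1");
}

function formatField(v) {
    if (typeof v === "boolean") return v ? "t" : "f";
    if (typeof v === "number") return String(v);              // sent as a float
    return '"' + String(v).replace(/(["\\])/g, "\\$1") + '"'; // quoted string
}

function toIlpLine(msg) {
    const { symbols = {}, columns = {}, timestamp } = msg.payload;
    const tags = Object.entries(symbols)
        .map(([k, v]) => "," + escapeName(k) + "=" + escapeName(v))
        .join("");
    const fields = Object.entries(columns)
        .map(([k, v]) => escapeName(k) + "=" + formatField(v))
        .join(",");
    // ILP timestamps default to nanoseconds; BigInt avoids precision loss.
    const ts = timestamp === undefined ? "" : " " + (BigInt(timestamp) * 1000000n).toString();
    return escapeName(msg.topic) + tags + " " + fields + ts;
}

const line = toIlpLine({
    topic: "sensors",
    payload: {
        symbols: { tag_name: "sensor1", location: "warehouse" },
        columns: { temperature: 23.5, humidity: 65, status: "active", alert: true },
        timestamp: 1699999999000
    }
});
console.log(line);
// sensors,tag_name=sensor1,location=warehouse temperature=23.5,humidity=65,status="active",alert=t 1699999999000000000
```

Note that `humidity=65` carries no `i` suffix, which is why ILP treats it as a float, matching the auto-detection comment above.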

Explicit Type Specification

For precise control over column types:

msg.payload = {
    symbols: { device: "sensor1" },
    columns: {
        value: { value: 123456789, type: "long" },
        price: { value: "123.456789", type: "decimal" },
        readings: { value: [1.1, 2.2, 3.3], type: "array", elementType: "double" }
    },
    timestamp: Date.now()
};

Supported Types:

  • int / integer - 32-bit signed integer
  • long - 64-bit signed integer
  • float - 32-bit floating point
  • double - 64-bit floating point
  • decimal - Arbitrary precision decimal
  • string - Text value
  • boolean - true/false
  • timestamp - Date/time value
  • array - Array with auto-detected element type
  • array_double - Array of doubles
  • array_long - Array of longs
  • array_string - Array of strings
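
When integer values come from arbitrary sources, a small helper can pick between `int` and `long` from the ranges listed above. This is a sketch, assuming a hypothetical `typedInteger` helper that is not part of the package; it also rejects values JavaScript cannot represent exactly as integers.

```javascript
// Hypothetical helper: wraps an integer in the explicit-type form,
// choosing "int" when it fits in 32 bits and "long" otherwise.
const INT32_MIN = -2147483648;
const INT32_MAX = 2147483647;

function typedInteger(value) {
    if (!Number.isSafeInteger(value)) {
        throw new RangeError("Not a safe integer: " + value);
    }
    const type = value >= INT32_MIN && value <= INT32_MAX ? "int" : "long";
    return { value, type };
}

const small = typedInteger(42);          // { value: 42, type: "int" }
const large = typedInteger(9876543210);  // { value: 9876543210, type: "long" }
```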

QuestDB Mapper

Maps incoming message fields to QuestDB ILP structure. Useful for transforming data from various sources.

Configuration

  • Table Name: Target table (or use msg.topic)
  • Timestamp Field: Path to timestamp field in message
  • Symbol Mappings: Map source fields to QuestDB symbols
  • Column Mappings: Map source fields to columns with type conversion

Example

Input message:

{
    topic: "sensors",
    payload: {
        device: "sensor1",
        temp: 23.5,
        readings: [1.1, 2.2, 3.3],
        ts: 1699999999000
    }
}

With mappings:

  • Symbol: payload.device → device_id
  • Column: payload.temp → temperature (double)
  • Column: payload.readings → values (array_double)
  • Timestamp: payload.ts

Output:

{
    topic: "sensors",
    payload: {
        symbols: { device_id: "sensor1" },
        columns: {
            temperature: { value: 23.5, type: "double" },
            values: { value: [1.1, 2.2, 3.3], type: "array", elementType: "double" }
        },
        timestamp: 1699999999000
    }
}
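
The transformation in this example can be sketched in a few lines. The code below is an illustration under assumptions, not the Mapper node's source: `applyMappings`, `getPath`, `typedValue`, and the shape of the `config` object are all hypothetical.

```javascript
// Illustrative sketch of the mapping step; applyMappings and the shape of
// the `config` object are assumptions, not the Mapper node's real options.
function getPath(obj, path) {
    // Resolve a dotted path such as "payload.temp" against an object.
    return path.split(".").reduce((o, key) => (o == null ? undefined : o[key]), obj);
}

function typedValue(value, type) {
    // "array_double" style types become { type: "array", elementType: "double" }.
    if (type.startsWith("array_")) {
        return { value, type: "array", elementType: type.slice("array_".length) };
    }
    return { value, type };
}

function applyMappings(msg, config) {
    const symbols = {};
    const columns = {};
    for (const m of config.symbols) symbols[m.target] = String(getPath(msg, m.source));
    for (const m of config.columns) columns[m.target] = typedValue(getPath(msg, m.source), m.type);
    return {
        topic: config.table || msg.topic,
        payload: { symbols, columns, timestamp: getPath(msg, config.timestampField) }
    };
}

const output = applyMappings(
    { topic: "sensors", payload: { device: "sensor1", temp: 23.5, readings: [1.1, 2.2, 3.3], ts: 1699999999000 } },
    {
        symbols: [{ source: "payload.device", target: "device_id" }],
        columns: [
            { source: "payload.temp", target: "temperature", type: "double" },
            { source: "payload.readings", target: "values", type: "array_double" }
        ],
        timestampField: "payload.ts"
    }
);
```

With the input and mappings from the example, `output` has the same structure as the output message shown above.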

Examples

The package includes ready-to-use examples. After installation:

  1. Open Node-RED
  2. Go to Menu > Import
  3. Select Examples > node-red-contrib-questdb

Comprehensive Use Case Examples

1. Industrial IoT - Manufacturing Line Monitoring

Monitor production line equipment with high-frequency sensor data:

// Function node: Production Line Sensor Data
msg.topic = "production_line";
msg.payload = {
    symbols: {
        factory_id: "PLANT_001",
        line_id: "LINE_A3",
        machine_id: "CNC_MILL_007",
        shift: "day"
    },
    columns: {
        // Machine status
        spindle_speed_rpm: { value: 12500, type: "int" },
        feed_rate_mmpm: 450.5,
        tool_wear_percent: 23.7,

        // Vibration analysis (100 samples at 10kHz)
        vibration_x: {
            value: Array.from({length: 100}, () => (Math.random() - 0.5) * 2),
            type: "array",
            elementType: "double"
        },
        vibration_y: {
            value: Array.from({length: 100}, () => (Math.random() - 0.5) * 2),
            type: "array",
            elementType: "double"
        },
        vibration_z: {
            value: Array.from({length: 100}, () => (Math.random() - 0.5) * 2),
            type: "array",
            elementType: "double"
        },

        // Thermal monitoring
        motor_temp_c: 67.3,
        coolant_temp_c: 22.1,
        ambient_temp_c: 24.5,

        // Power consumption
        power_kw: 15.7,
        current_amps: { value: 42.3, type: "double" },

        // Production metrics
        parts_produced: { value: 1247, type: "long" },
        cycle_time_ms: { value: 45230, type: "long" },
        oee_percent: 87.3,

        // Quality metrics
        defect_count: { value: 3, type: "int" },
        in_tolerance: true
    },
    timestamp: Date.now()
};
return msg;

Querying in QuestDB:

-- Real-time machine status
SELECT * FROM production_line
WHERE machine_id = 'CNC_MILL_007'
ORDER BY timestamp DESC LIMIT 1;

-- Hourly OEE trend
SELECT
    timestamp,
    avg(oee_percent) as avg_oee,
    sum(parts_produced) as total_parts,
    sum(defect_count) as total_defects
FROM production_line
WHERE factory_id = 'PLANT_001'
SAMPLE BY 1h;

-- Vibration anomaly detection
SELECT timestamp, machine_id,
    array_avg(vibration_x) as avg_vib_x,
    array_max(vibration_x) as max_vib_x
FROM production_line
WHERE array_max(vibration_x) > 1.5
ORDER BY timestamp DESC;

2. Smart Building - HVAC and Energy Management

Monitor building climate control and energy consumption:

// Function node: Building Management System Data
msg.topic = "building_hvac";
msg.payload = {
    symbols: {
        building_id: "HQ_TOWER_1",
        floor: "F15",
        zone: "ZONE_A",
        hvac_unit: "AHU_15A"
    },
    columns: {
        // Temperature readings
        supply_air_temp_c: 14.2,
        return_air_temp_c: 23.8,
        zone_temp_c: 22.1,
        setpoint_temp_c: 22.0,
        outdoor_temp_c: 31.5,

        // Humidity
        zone_humidity_percent: 45.3,
        setpoint_humidity_percent: 45.0,

        // Airflow
        supply_airflow_cfm: { value: 2500, type: "int" },
        damper_position_percent: 75.0,
        filter_pressure_drop_pa: 125.3,

        // Energy
        cooling_load_kw: 45.7,
        heating_load_kw: 0.0,
        fan_power_kw: 3.2,

        // Equipment status
        compressor_running: true,
        economizer_active: false,
        maintenance_due: false,

        // Occupancy (from sensors)
        occupancy_count: { value: 47, type: "int" },
        co2_ppm: { value: 650, type: "int" },

        // Comfort index (calculated)
        pmv_index: 0.2,  // Predicted Mean Vote (-3 to +3)
        ppd_percent: 6.1  // Predicted Percentage Dissatisfied
    },
    timestamp: Date.now()
};
return msg;

Energy Dashboard Queries:

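-- Daily energy consumption by floor
-- Note: summing kW readings yields kWh only if each row covers one hour;
-- otherwise scale the result by the sampling interval in hours.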
SELECT
    floor,
    sum(cooling_load_kw + heating_load_kw + fan_power_kw) as total_energy_kwh
FROM building_hvac
WHERE building_id = 'HQ_TOWER_1'
    AND timestamp > dateadd('d', -1, now())
SAMPLE BY 1d;

-- Comfort tracking
SELECT
    timestamp,
    zone,
    zone_temp_c,
    zone_humidity_percent,
    pmv_index,
    occupancy_count
FROM building_hvac
WHERE building_id = 'HQ_TOWER_1'
SAMPLE BY 15m;

3. Financial Trading - Market Data Capture

Capture real-time market data and trading signals:

// Function node: Market Data Feed
const ticker = msg.payload.ticker || "AAPL";
const bid = 185.23 + (Math.random() - 0.5) * 0.1;
const ask = bid + 0.01;

msg.topic = "market_data";
msg.payload = {
    symbols: {
        exchange: "NASDAQ",
        ticker: ticker,
        asset_class: "equity"
    },
    columns: {
        // Price data (use decimal for financial precision)
        bid_price: { value: bid.toFixed(4), type: "decimal" },
        ask_price: { value: ask.toFixed(4), type: "decimal" },
        mid_price: { value: ((bid + ask) / 2).toFixed(4), type: "decimal" },
        spread_bps: { value: ((ask - bid) / bid * 10000).toFixed(2), type: "decimal" },

        // Volume
        bid_size: { value: Math.floor(Math.random() * 1000) * 100, type: "long" },
        ask_size: { value: Math.floor(Math.random() * 1000) * 100, type: "long" },

        // Trade data
        last_price: { value: (bid + Math.random() * (ask - bid)).toFixed(4), type: "decimal" },
        last_size: { value: Math.floor(Math.random() * 500) * 100, type: "long" },
        volume_today: { value: Math.floor(Math.random() * 10000000), type: "long" },
        vwap: { value: (185.15 + Math.random() * 0.2).toFixed(4), type: "decimal" },

        // Market microstructure
        trade_count: { value: Math.floor(Math.random() * 100), type: "int" },
        quote_count: { value: Math.floor(Math.random() * 500), type: "int" },

        // Greeks (for options)
        implied_vol: 0.235,

        // Flags
        halted: false,
        auction_mode: false
    },
    timestamp: Date.now()
};
return msg;

Trading Analytics:

-- VWAP calculation
SELECT
    ticker,
    sum(last_price * last_size) / sum(last_size) as vwap,
    sum(volume_today) as total_volume
FROM market_data
WHERE timestamp > dateadd('h', -1, now())
GROUP BY ticker;

-- Spread analysis
SELECT
    timestamp,
    ticker,
    spread_bps,
    bid_size,
    ask_size
FROM market_data
WHERE ticker = 'AAPL'
SAMPLE BY 1s;

4. Weather Station Network

Collect meteorological data from distributed weather stations:

// Function node: Weather Station Data
msg.topic = "weather_stations";
msg.payload = {
    symbols: {
        station_id: "WS_" + String(Math.floor(Math.random() * 100)).padStart(3, "0"),
        region: "pacific_northwest",
        elevation_class: "lowland",
        station_type: "automated"
    },
    columns: {
        // Temperature
        air_temp_c: 18.3 + (Math.random() - 0.5) * 5,
        feels_like_c: 16.8 + (Math.random() - 0.5) * 5,
        dew_point_c: 12.1 + (Math.random() - 0.5) * 3,
        ground_temp_c: 15.2 + (Math.random() - 0.5) * 2,

        // Humidity & Pressure
        relative_humidity_percent: 65 + Math.random() * 20,
        pressure_hpa: { value: (1013.25 + (Math.random() - 0.5) * 20).toFixed(2), type: "decimal" },
        pressure_trend: { value: (Math.random() - 0.5) * 2, type: "double" },

        // Wind
        wind_speed_ms: Math.random() * 10,
        wind_gust_ms: Math.random() * 15,
        wind_direction_deg: { value: Math.floor(Math.random() * 360), type: "int" },

        // Precipitation
        rain_mm: Math.random() * 2,
        rain_rate_mmh: Math.random() * 10,
        rain_24h_mm: { value: (Math.random() * 20).toFixed(1), type: "decimal" },

        // Solar
        solar_radiation_wm2: Math.random() * 800,
        uv_index: { value: Math.floor(Math.random() * 11), type: "int" },

        // Visibility
        visibility_km: 10 + Math.random() * 40,
        cloud_base_m: { value: Math.floor(1000 + Math.random() * 2000), type: "int" },

        // Air Quality
        pm25_ugm3: Math.random() * 50,
        pm10_ugm3: Math.random() * 100,
        aqi: { value: Math.floor(Math.random() * 150), type: "int" },

        // Battery & Status
        battery_voltage: 12.4 + Math.random() * 0.4,
        solar_charging: true,
        sensor_status: "OK"
    },
    timestamp: Date.now()
};
return msg;

Weather Analysis Queries:

-- Regional temperature map
SELECT
    station_id,
    air_temp_c,
    relative_humidity_percent,
    wind_speed_ms
FROM weather_stations
WHERE region = 'pacific_northwest'
LATEST ON timestamp PARTITION BY station_id;

-- Precipitation totals by region
SELECT
    region,
    sum(rain_mm) as total_rain_mm,
    max(rain_rate_mmh) as max_rain_rate
FROM weather_stations
WHERE timestamp > dateadd('h', -24, now())
SAMPLE BY 1h;

5. Fleet Tracking and Telematics

Monitor vehicle fleet with GPS and diagnostics:

// Function node: Vehicle Telematics
const vehicleId = "VH_" + String(Math.floor(Math.random() * 50) + 1).padStart(3, "0");
const baseLat = 37.7749 + (Math.random() - 0.5) * 0.1;
const baseLon = -122.4194 + (Math.random() - 0.5) * 0.1;

msg.topic = "fleet_telematics";
msg.payload = {
    symbols: {
        vehicle_id: vehicleId,
        fleet_id: "DELIVERY_WEST",
        vehicle_type: "van",
        driver_id: "DRV_" + String(Math.floor(Math.random() * 20) + 1).padStart(3, "0")
    },
    columns: {
        // GPS Position (high precision)
        latitude: { value: baseLat.toFixed(7), type: "decimal" },
        longitude: { value: baseLon.toFixed(7), type: "decimal" },
        altitude_m: { value: Math.floor(Math.random() * 100), type: "int" },
        gps_accuracy_m: 3.5,
        heading_deg: { value: Math.floor(Math.random() * 360), type: "int" },

        // Speed & Motion
        speed_kmh: Math.floor(Math.random() * 80),
        acceleration_ms2: (Math.random() - 0.5) * 3,
        odometer_km: { value: 45678 + Math.floor(Math.random() * 100), type: "long" },

        // Engine Diagnostics (OBD-II)
        engine_rpm: { value: Math.floor(1000 + Math.random() * 3000), type: "int" },
        engine_load_percent: Math.random() * 100,
        coolant_temp_c: 85 + Math.random() * 15,
        intake_temp_c: 25 + Math.random() * 20,
        throttle_position_percent: Math.random() * 100,

        // Fuel
        fuel_level_percent: 45 + Math.random() * 30,
        fuel_rate_lph: 8 + Math.random() * 12,
        fuel_economy_kpl: { value: (12 + Math.random() * 4).toFixed(2), type: "decimal" },

        // Battery & Electrical
        battery_voltage: 13.8 + Math.random() * 0.5,
        alternator_voltage: 14.2 + Math.random() * 0.3,

        // DTC Codes (Diagnostic Trouble Codes)
        dtc_count: { value: Math.floor(Math.random() * 3), type: "int" },
        check_engine_light: Math.random() > 0.95,

        // Trip Data
        trip_distance_km: { value: (Math.random() * 50).toFixed(2), type: "decimal" },
        idle_time_sec: { value: Math.floor(Math.random() * 300), type: "int" },

        // Safety Events
        harsh_braking: Math.random() > 0.9,
        harsh_acceleration: Math.random() > 0.92,
        harsh_cornering: Math.random() > 0.95,

        // Cargo (for delivery vehicles)
        cargo_door_open: false,
        cargo_temp_c: 4.2 + Math.random() * 2,
        deliveries_completed: { value: Math.floor(Math.random() * 15), type: "int" }
    },
    timestamp: Date.now()
};
return msg;

Fleet Management Queries:

-- Current fleet positions
SELECT
    vehicle_id,
    driver_id,
    latitude,
    longitude,
    speed_kmh,
    fuel_level_percent
FROM fleet_telematics
WHERE fleet_id = 'DELIVERY_WEST'
LATEST ON timestamp PARTITION BY vehicle_id;

-- Driver safety scores
SELECT
    driver_id,
    count(*) as total_events,
    sum(case when harsh_braking then 1 else 0 end) as harsh_brakes,
    sum(case when harsh_acceleration then 1 else 0 end) as harsh_accel,
    sum(case when harsh_cornering then 1 else 0 end) as harsh_corners
FROM fleet_telematics
WHERE timestamp > dateadd('d', -7, now())
GROUP BY driver_id
ORDER BY total_events DESC;

-- Fuel efficiency by vehicle
SELECT
    vehicle_id,
    avg(fuel_economy_kpl) as avg_fuel_economy,
    sum(trip_distance_km) as total_distance
FROM fleet_telematics
WHERE timestamp > dateadd('d', -30, now())
GROUP BY vehicle_id;

6. Healthcare - Patient Monitoring

Collect vital signs from patient monitoring devices:

// Function node: Patient Vitals
msg.topic = "patient_vitals";
msg.payload = {
    symbols: {
        patient_id: "PT_" + String(Math.floor(Math.random() * 100) + 1).padStart(4, "0"),
        ward: "ICU_A",
        bed: "BED_" + String(Math.floor(Math.random() * 20) + 1).padStart(2, "0"),
        device_type: "bedside_monitor"
    },
    columns: {
        // Heart
        heart_rate_bpm: { value: Math.floor(60 + Math.random() * 40), type: "int" },
        heart_rate_variability_ms: Math.floor(20 + Math.random() * 30),

        // ECG Waveform (250Hz sampling, 1 second)
        ecg_waveform: {
            value: Array.from({length: 250}, () => Math.sin(Math.random() * Math.PI * 2) * 0.5 + Math.random() * 0.1),
            type: "array",
            elementType: "double"
        },

        // Blood Pressure
        systolic_mmhg: { value: Math.floor(110 + Math.random() * 30), type: "int" },
        diastolic_mmhg: { value: Math.floor(70 + Math.random() * 20), type: "int" },
        map_mmhg: { value: Math.floor(80 + Math.random() * 20), type: "int" },  // Mean Arterial Pressure

        // Respiratory
        respiratory_rate: { value: Math.floor(12 + Math.random() * 8), type: "int" },
        spo2_percent: { value: (95 + Math.random() * 4).toFixed(1), type: "decimal" },
        etco2_mmhg: { value: Math.floor(35 + Math.random() * 10), type: "int" },

        // Respiratory waveform
        pleth_waveform: {
            value: Array.from({length: 100}, (_, i) => Math.sin(i * 0.1) * 50 + 100),
            type: "array",
            elementType: "double"
        },

        // Temperature
        body_temp_c: { value: (36.5 + Math.random() * 1.5).toFixed(1), type: "decimal" },
        skin_temp_c: { value: (32 + Math.random() * 2).toFixed(1), type: "decimal" },

        // Consciousness
        gcs_score: { value: Math.floor(13 + Math.random() * 3), type: "int" },  // Glasgow Coma Scale

        // Pain (if conscious)
        pain_score: { value: Math.floor(Math.random() * 5), type: "int" },

        // IV Fluids
        iv_rate_mlh: { value: Math.floor(50 + Math.random() * 100), type: "int" },
        iv_total_ml: { value: Math.floor(500 + Math.random() * 1500), type: "int" },

        // Alarms
        alarm_active: Math.random() > 0.9,
        alarm_priority: Math.random() > 0.95 ? "high" : "low"
    },
    timestamp: Date.now()
};
return msg;

Clinical Queries:

-- Current patient status
SELECT
    patient_id,
    ward,
    bed,
    heart_rate_bpm,
    systolic_mmhg,
    diastolic_mmhg,
    spo2_percent,
    body_temp_c,
    alarm_active
FROM patient_vitals
WHERE ward = 'ICU_A'
LATEST ON timestamp PARTITION BY patient_id;

-- Vital trends for specific patient
SELECT
    timestamp,
    heart_rate_bpm,
    systolic_mmhg,
    spo2_percent,
    respiratory_rate
FROM patient_vitals
WHERE patient_id = 'PT_0042'
ORDER BY timestamp DESC
LIMIT 100;

7. Agriculture - Smart Farming

Monitor crop conditions and irrigation systems:

// Function node: Agricultural Sensors
msg.topic = "farm_sensors";
msg.payload = {
    symbols: {
        farm_id: "FARM_VALLEY_01",
        field_id: "FIELD_A3",
        crop_type: "corn",
        sensor_node: "NODE_" + String(Math.floor(Math.random() * 50) + 1).padStart(3, "0")
    },
    columns: {
        // Soil Sensors (multiple depths)
        soil_moisture_10cm: Math.random() * 100,
        soil_moisture_30cm: Math.random() * 100,
        soil_moisture_60cm: Math.random() * 100,
        soil_temp_10cm: 18 + Math.random() * 8,
        soil_temp_30cm: 16 + Math.random() * 4,

        // Soil Chemistry
        soil_ph: { value: (6.0 + Math.random() * 1.5).toFixed(2), type: "decimal" },
        soil_ec_dsm: { value: (1.0 + Math.random() * 2.0).toFixed(2), type: "decimal" },  // Electrical Conductivity
        nitrogen_ppm: { value: Math.floor(20 + Math.random() * 40), type: "int" },
        phosphorus_ppm: { value: Math.floor(10 + Math.random() * 30), type: "int" },
        potassium_ppm: { value: Math.floor(100 + Math.random() * 100), type: "int" },

        // Weather at field level
        air_temp_c: 22 + (Math.random() - 0.5) * 10,
        humidity_percent: 50 + Math.random() * 30,
        wind_speed_ms: Math.random() * 8,
        solar_radiation_wm2: Math.random() * 1000,
        leaf_wetness_percent: Math.random() * 100,

        // Crop Health (from imaging)
        ndvi: { value: (0.3 + Math.random() * 0.5).toFixed(3), type: "decimal" },  // Vegetation Index
        chlorophyll_index: { value: (40 + Math.random() * 20).toFixed(1), type: "decimal" },
        canopy_temp_c: 20 + Math.random() * 10,

        // Growth Stage
        plant_height_cm: { value: Math.floor(50 + Math.random() * 150), type: "int" },
        growth_stage: "V12",  // Vegetative stage 12
        days_since_planting: { value: 65, type: "int" },

        // Irrigation
        irrigation_active: Math.random() > 0.7,
        water_applied_mm: { value: (Math.random() * 5).toFixed(1), type: "decimal" },
        irrigation_efficiency: 0.85 + Math.random() * 0.1,

        // Pest Pressure
        pest_trap_count: { value: Math.floor(Math.random() * 20), type: "int" },
        disease_risk_index: Math.random() * 100,

        // Battery
        battery_percent: 70 + Math.random() * 30,
        solar_panel_watts: Math.random() * 5
    },
    timestamp: Date.now()
};
return msg;

Farm Analytics:

-- Field irrigation needs
SELECT * FROM (
    SELECT
        field_id,
        avg(soil_moisture_30cm) as avg_moisture,
        avg(ndvi) as avg_ndvi,
        max(canopy_temp_c) as max_canopy_temp
    FROM farm_sensors
    WHERE farm_id = 'FARM_VALLEY_01'
        AND timestamp > dateadd('h', -24, now())
    GROUP BY field_id
)
WHERE avg_moisture < 40;

-- Growth tracking
SELECT
    timestamp,
    avg(plant_height_cm) as avg_height,
    avg(ndvi) as avg_ndvi,
    avg(chlorophyll_index) as avg_chlorophyll
FROM farm_sensors
WHERE field_id = 'FIELD_A3'
SAMPLE BY 1d;

8. Energy Grid - Smart Meter Data

Collect data from smart meters and grid equipment:

// Function node: Smart Meter Reading
msg.topic = "smart_meters";
msg.payload = {
    symbols: {
        meter_id: "MTR_" + String(Math.floor(Math.random() * 10000) + 1).padStart(6, "0"),
        utility: "PACIFIC_POWER",
        meter_type: "residential",
        tariff_plan: "time_of_use",
        feeder: "FDR_" + String(Math.floor(Math.random() * 50) + 1).padStart(3, "0")
    },
    columns: {
        // Energy Consumption
        active_energy_kwh: { value: (15234.567 + Math.random() * 10).toFixed(3), type: "decimal" },
        reactive_energy_kvarh: { value: (1234.567 + Math.random() * 5).toFixed(3), type: "decimal" },

        // Power
        active_power_kw: { value: (Math.random() * 15).toFixed(3), type: "decimal" },
        reactive_power_kvar: { value: (Math.random() * 3).toFixed(3), type: "decimal" },
        apparent_power_kva: { value: (Math.random() * 16).toFixed(3), type: "decimal" },
        power_factor: { value: (0.85 + Math.random() * 0.15).toFixed(3), type: "decimal" },

        // Voltage (per phase)
        voltage_l1: { value: (230 + (Math.random() - 0.5) * 10).toFixed(1), type: "decimal" },
        voltage_l2: { value: (230 + (Math.random() - 0.5) * 10).toFixed(1), type: "decimal" },
        voltage_l3: { value: (230 + (Math.random() - 0.5) * 10).toFixed(1), type: "decimal" },

        // Current (per phase)
        current_l1: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
        current_l2: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
        current_l3: { value: (Math.random() * 30).toFixed(2), type: "decimal" },

        // Frequency
        frequency_hz: { value: (50 + (Math.random() - 0.5) * 0.1).toFixed(3), type: "decimal" },

        // Harmonics (THD - Total Harmonic Distortion)
        thd_voltage_percent: Math.random() * 5,
        thd_current_percent: Math.random() * 15,

        // Power Quality Events
        sag_count: { value: Math.floor(Math.random() * 3), type: "int" },
        swell_count: { value: Math.floor(Math.random() * 2), type: "int" },
        outage_duration_sec: { value: 0, type: "int" },

        // Tamper Detection
        cover_opened: false,
        magnetic_tamper: false,
        reverse_energy_flow: Math.random() > 0.95,

        // Communication
        signal_strength_dbm: { value: Math.floor(-90 + Math.random() * 40), type: "int" },
        last_comm_success: true
    },
    timestamp: Date.now()
};
return msg;

9. Network Infrastructure - Server Monitoring

Monitor servers and network equipment:

// Function node: Server Metrics
const hostname = "srv-" + ["web", "db", "app", "cache"][Math.floor(Math.random() * 4)]
                 + "-" + String(Math.floor(Math.random() * 10) + 1).padStart(2, "0");

msg.topic = "server_metrics";
msg.payload = {
    symbols: {
        hostname: hostname,
        datacenter: "DC_WEST_1",
        rack: "RACK_" + String(Math.floor(Math.random() * 20) + 1).padStart(2, "0"),
        environment: "production",
        service: hostname.split("-")[1]
    },
    columns: {
        // CPU
        cpu_usage_percent: Math.random() * 100,
        cpu_user_percent: Math.random() * 60,
        cpu_system_percent: Math.random() * 30,
        cpu_iowait_percent: Math.random() * 20,
        cpu_cores_used: { value: Math.floor(Math.random() * 32), type: "int" },
        load_1m: Math.random() * 10,
        load_5m: Math.random() * 8,
        load_15m: Math.random() * 6,

        // Memory
        memory_total_gb: { value: 128, type: "int" },
        memory_used_gb: { value: (Math.random() * 100).toFixed(2), type: "decimal" },
        memory_cached_gb: { value: (Math.random() * 30).toFixed(2), type: "decimal" },
        memory_buffers_gb: { value: (Math.random() * 10).toFixed(2), type: "decimal" },
        swap_used_mb: { value: Math.floor(Math.random() * 1000), type: "int" },

        // Disk
        disk_read_mbps: Math.random() * 500,
        disk_write_mbps: Math.random() * 300,
        disk_iops: { value: Math.floor(Math.random() * 10000), type: "int" },
        disk_latency_ms: Math.random() * 10,
        disk_used_percent: 30 + Math.random() * 50,

        // Network
        network_rx_mbps: Math.random() * 1000,
        network_tx_mbps: Math.random() * 800,
        network_packets_rx: { value: Math.floor(Math.random() * 100000), type: "long" },
        network_packets_tx: { value: Math.floor(Math.random() * 80000), type: "long" },
        network_errors: { value: Math.floor(Math.random() * 10), type: "int" },
        tcp_connections: { value: Math.floor(Math.random() * 5000), type: "int" },

        // Process
        process_count: { value: Math.floor(200 + Math.random() * 300), type: "int" },
        thread_count: { value: Math.floor(500 + Math.random() * 1000), type: "int" },
        zombie_count: { value: Math.floor(Math.random() * 3), type: "int" },

        // File Descriptors
        fd_used: { value: Math.floor(Math.random() * 10000), type: "int" },
        fd_max: { value: 65535, type: "int" },

        // Temperature
        cpu_temp_c: { value: Math.floor(45 + Math.random() * 30), type: "int" },

        // Uptime
        uptime_seconds: { value: Math.floor(Math.random() * 10000000), type: "long" }
    },
    timestamp: Date.now()
};
return msg;

Infrastructure Queries:

-- Resource utilization dashboard
SELECT
    hostname,
    service,
    cpu_usage_percent,
    memory_used_gb * 100 / memory_total_gb as memory_percent,
    disk_used_percent,
    network_rx_mbps + network_tx_mbps as network_total_mbps
FROM server_metrics
WHERE environment = 'production'
ORDER BY timestamp DESC
LIMIT 100;

-- Service health aggregation
SELECT
    service,
    count(DISTINCT hostname) as host_count,
    avg(cpu_usage_percent) as avg_cpu,
    avg(memory_used_gb) as avg_memory_gb,
    sum(network_errors) as total_errors
FROM server_metrics
WHERE timestamp > dateadd('m', -5, now())
GROUP BY service;

10. E-commerce - Real-time Analytics

Track user behavior and transactions:

// Function node: E-commerce Events
const eventTypes = ["page_view", "add_to_cart", "purchase", "search", "wishlist"];
const eventType = eventTypes[Math.floor(Math.random() * eventTypes.length)];

msg.topic = "ecommerce_events";
msg.payload = {
    symbols: {
        event_type: eventType,
        session_id: "SES_" + Math.random().toString(36).substring(2, 15),
        user_id: Math.random() > 0.3 ? "USR_" + String(Math.floor(Math.random() * 10000)).padStart(6, "0") : "anonymous",
        device_type: ["desktop", "mobile", "tablet"][Math.floor(Math.random() * 3)],
        country: ["US", "UK", "DE", "FR", "JP"][Math.floor(Math.random() * 5)],
        channel: ["organic", "paid", "email", "social", "direct"][Math.floor(Math.random() * 5)]
    },
    columns: {
        // Page/Product Info
        page_url: "/products/category/item-" + Math.floor(Math.random() * 1000),
        product_id: "PROD_" + String(Math.floor(Math.random() * 5000)).padStart(5, "0"),
        category: ["electronics", "clothing", "home", "sports"][Math.floor(Math.random() * 4)],

        // Pricing (decimal for accuracy)
        product_price: { value: (Math.random() * 500 + 10).toFixed(2), type: "decimal" },
        discount_percent: { value: Math.floor(Math.random() * 30), type: "int" },
        cart_value: { value: (Math.random() * 1000).toFixed(2), type: "decimal" },

        // Quantities
        quantity: { value: Math.floor(Math.random() * 5) + 1, type: "int" },
        cart_items: { value: Math.floor(Math.random() * 10), type: "int" },

        // Transaction (for purchase events)
        order_id: eventType === "purchase" ? "ORD_" + Date.now() : null,
        payment_method: eventType === "purchase" ? ["card", "paypal", "apple_pay"][Math.floor(Math.random() * 3)] : null,

        // User Behavior
        time_on_page_sec: { value: Math.floor(Math.random() * 300), type: "int" },
        scroll_depth_percent: Math.floor(Math.random() * 100),
        click_count: { value: Math.floor(Math.random() * 20), type: "int" },

        // Search (for search events)
        search_query: eventType === "search" ? ["shoes", "laptop", "jacket", "headphones"][Math.floor(Math.random() * 4)] : null,
        search_results_count: eventType === "search" ? { value: Math.floor(Math.random() * 500), type: "int" } : null,

        // Technical
        page_load_ms: { value: Math.floor(500 + Math.random() * 2000), type: "int" },
        browser: ["Chrome", "Firefox", "Safari", "Edge"][Math.floor(Math.random() * 4)],

        // Attribution
        referrer: ["google", "facebook", "direct", "email"][Math.floor(Math.random() * 4)],
        campaign_id: Math.random() > 0.5 ? "CAMP_" + Math.floor(Math.random() * 100) : null
    },
    timestamp: Date.now()
};
return msg;

E-commerce Analytics:

-- Real-time conversion funnel
SELECT
    event_type,
    count(*) as event_count,
    count(DISTINCT session_id) as unique_sessions
FROM ecommerce_events
WHERE timestamp > dateadd('h', -1, now())
GROUP BY event_type;

-- Revenue by channel
SELECT
    channel,
    country,
    sum(cart_value) as total_revenue,
    count(*) as purchase_count
FROM ecommerce_events
WHERE event_type = 'purchase'
    AND timestamp > dateadd('d', -1, now())
GROUP BY channel, country
ORDER BY total_revenue DESC;

Data Type Reference

JavaScript Value    Auto-Detection    Explicit Type
42                  float             { value: 42, type: "int" }
42.5                float             { value: 42.5, type: "double" }
9876543210          float             { value: 9876543210, type: "long" }
"123.45"            string            { value: "123.45", type: "decimal" }
"hello"             string            { value: "hello", type: "string" }
true                boolean           { value: true, type: "boolean" }
[1.1, 2.2]          array (double)    { value: [...], type: "array", elementType: "double" }
Date.now()          timestamp         { value: Date.now(), type: "timestamp" }

Best Practices

Symbol Selection

Use symbols (tags) for columns that will be frequently filtered:

  • Device/sensor identifiers
  • Geographic regions
  • Service names
  • Environment (prod/dev/staging)

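Symbols are stored as dictionary-encoded values, which keeps filtering and grouping fast. QuestDB can also index a symbol column, but indexing is optional and not enabled by default.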

Timestamp Handling

  • Use Date.now() for millisecond precision
  • Omit timestamp to use QuestDB server time
  • ISO strings are auto-converted: "2024-01-15T10:30:00Z"
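
If the timestamp reaches a function node in mixed formats, it can be normalized before it is placed on the message. The helper below is a sketch, assuming a hypothetical `toEpochMillis` function that is not part of the package; it returns `undefined` for a missing timestamp so that server time is used.

```javascript
// Hypothetical helper: converts the accepted timestamp inputs to epoch milliseconds.
function toEpochMillis(ts) {
    if (ts === undefined || ts === null) return undefined; // let the server assign time
    if (ts instanceof Date) return ts.getTime();
    if (typeof ts === "number") return ts;
    if (typeof ts === "string") {
        const parsed = Date.parse(ts);
        if (Number.isNaN(parsed)) throw new TypeError("Unparseable timestamp: " + ts);
        return parsed;
    }
    throw new TypeError("Unsupported timestamp type: " + typeof ts);
}

const fromIso = toEpochMillis("2024-01-15T10:30:00Z"); // 1705314600000
const fromDate = toEpochMillis(new Date(1705314600000)); // 1705314600000
```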

Array Columns

Ideal for:

  • High-frequency sensor samples
  • FFT/spectral data
  • Batch readings from single collection window

Decimal Type

Use for financial data requiring exact precision:

price: { value: "123.456789", type: "decimal" }
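
The reason for passing the value as a string is that JavaScript numbers are binary floating point. The short sketch below shows the rounding problem and a hypothetical `decimalColumn` helper (not part of the package) that keeps the decimal text untouched.

```javascript
// Binary floating point cannot represent most decimal fractions exactly.
const floatSum = 0.1 + 0.2;                 // 0.30000000000000004
const isExact = floatSum === 0.3;           // false

// Hypothetical helper: wrap a decimal string without passing through a float.
function decimalColumn(text) {
    if (!/^-?\d+(\.\d+)?$/.test(text)) {
        throw new TypeError("Not a plain decimal string: " + text);
    }
    return { value: text, type: "decimal" };
}

const price = decimalColumn("123.456789"); // { value: "123.456789", type: "decimal" }
```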

Performance Tips

  1. Batch related data into single rows when possible
  2. Use appropriate flush intervals for your write pattern
  3. Leverage symbols for frequently queried dimensions
  4. Use arrays for high-frequency sampled data

QuestDB Setup

Using Docker

docker run -p 9000:9000 -p 9009:9009 questdb/questdb

Connection String Format

The node uses QuestDB's connection string format internally:

http::addr=localhost:9000;auto_flush_rows=75000;auto_flush_interval=1000;
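
For reference, a string of this shape can be assembled from a handful of settings. The sketch below assumes a hypothetical `buildConnectionString` helper; the option names on `cfg` are illustrative and are not the config node's actual field names.

```javascript
// Hypothetical helper that assembles the connection string shown above.
// The option names on `cfg` are illustrative, not the config node's fields.
function buildConnectionString(cfg) {
    const {
        protocol = "http",
        host = "localhost",
        port = 9000,
        flushRows = 75000,
        flushIntervalMs = 1000
    } = cfg;
    const params = {
        addr: host + ":" + port,
        auto_flush_rows: flushRows,
        auto_flush_interval: flushIntervalMs
    };
    // Each key=value pair is terminated by a semicolon.
    const body = Object.entries(params).map(([k, v]) => k + "=" + v + ";").join("");
    return protocol + "::" + body;
}

const conf = buildConnectionString({ host: "localhost", port: 9000 });
// http::addr=localhost:9000;auto_flush_rows=75000;auto_flush_interval=1000;
```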

Compatibility

  • Node-RED: >= 2.0.0
  • Node.js: >= 14.0.0
  • QuestDB: >= 6.0 (recommended: latest)

Troubleshooting

Connection Issues

  1. Verify QuestDB is running: curl http://localhost:9000
  2. Check firewall settings for ports 9000/9009
  3. For HTTPS/TCPS, ensure certificates are properly configured
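
The first check can also be scripted from Node.js. The sketch below probes QuestDB's REST `/exec` endpoint with a trivial query over plain HTTP; `buildProbeUrl` and `probeQuestDb` are hypothetical helper names, and the host and port defaults are assumptions matching the Docker setup above.

```javascript
const http = require("http");

// Build a URL for QuestDB's REST /exec endpoint with a trivial query.
function buildProbeUrl(host, port) {
    const url = new URL("/exec", "http://" + host + ":" + port);
    url.searchParams.set("query", "SELECT 1");
    return url.toString();
}

// Resolve with the HTTP status code, or reject on a network error.
function probeQuestDb(host = "localhost", port = 9000) {
    return new Promise((resolve, reject) => {
        http.get(buildProbeUrl(host, port), (res) => {
            res.resume(); // discard the response body
            resolve(res.statusCode);
        }).on("error", reject);
    });
}

// Example (not run here): probeQuestDb().then(console.log, console.error);
```

A rejected promise (for example a refused connection) points at the server or firewall rather than at the Node-RED flow.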

Data Not Appearing

  1. Check the node status indicator (green = connected)
  2. Verify table creation in QuestDB console
  3. Enable debug output to see write confirmations

Performance Tips

  1. Use symbols for frequently queried columns (they are dictionary-encoded and can optionally be indexed)
  2. Batch writes when possible using arrays
  3. Adjust auto-flush settings based on your write patterns

License

MIT

Author

Holger Amort

Node Info

Version: 0.6.1
Updated 5 days ago
License: MIT
Downloads

147 in the last week

Nodes

  • questdb-config
  • questdb
  • questdb-mapper
  • questdb-type-router

Keywords

  • node-red
  • questdb
  • time-series
  • database
  • ilp
  • influx-line-protocol
  • iot
  • industrial
  • monitoring
  • telemetry
  • metrics
  • sensors
  • analytics
  • mqtt
  • modbus
  • scada
  • manufacturing
  • smart-building
  • hvac
  • energy
  • agriculture
  • fleet-tracking
  • telematics
  • healthcare
  • vitals
  • weather
  • smart-meter
  • observability
  • data-ingestion
