@inteli.city/node-red-contrib-db-collection 1.1.3

Unified Node-RED database collection: async PostgreSQL query, async Oracle query, table copy, and same-engine table clone nodes

npm install @inteli.city/node-red-contrib-db-collection

node-red-contrib-db-collection

A set of Node-RED nodes for working with PostgreSQL and Oracle databases — run queries, copy table rows between databases, and clone tables or whole schemas. Oracle support uses oracledb in Thin mode (no Instant Client needed).




Nodes

  • async.pg: Run a PostgreSQL query for each incoming message
  • async.oracle: Run an Oracle query for each incoming message
  • copy.db: Copy rows from one existing table to another
  • clone.db: Clone a table or whole schema (structure, optionally with data)
  • import.pg: Import external data (flow message, local CSV, or object-storage bucket) into a PostgreSQL table
  • config.pg: PostgreSQL connection (host, port, database, user, password, SSL)
  • config.oracle: Oracle connection (connect string, user, password, optional wallet)
  • config.s3: Bucket / object-storage connection — S3-compatible (AWS S3, MinIO, …) or Google Cloud Storage

config.pg and config.oracle are shared connection nodes referenced by the query, copy, clone, and import nodes above; config.s3 is used only by import.pg.


Supported databases

  • PostgreSQL
  • Oracle (12c+ for OFFSET/FETCH pagination)

A schema means different things in the two engines, and several nodes treat them differently:

  • In PostgreSQL a schema is a namespace inside a database. It is optional in many places — when blank, public is used.
  • In Oracle a schema is a user (the owner). It must be specified explicitly; there is no default.

This is why some features (like clone.db's "Create if missing") are PostgreSQL-only.


async.pg

Runs one PostgreSQL statement for each incoming message with an explicit FIFO queue and a configurable concurrency limit.

Providing the SQL

The SQL is configured on the node. It can be static text, or a Nunjucks template — the template is rendered against the incoming msg object into a SQL string, and that string is what gets executed.

SELECT *
FROM {{ payload.tableName }}
WHERE region = '{{ payload.region }}'

Anything reachable from msg is available to the template.

Render only trusted values. Nunjucks generates SQL text — {{ ... }} substitutions are concatenated into the statement before the driver sees it, not bound as separate parameters. Validate or constrain inputs upstream; do not interpolate untrusted user input directly into a SQL template.
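
For example, a Function node upstream of async.pg can pin the table name to a whitelist and escape string values before the template ever renders (the whitelist and field names here are illustrative, not part of the package):

// Function node upstream of async.pg: constrain template inputs.
// ALLOWED_TABLES and the payload field names are illustrative.
const ALLOWED_TABLES = new Set(["orders", "customers", "events"]);

if (!ALLOWED_TABLES.has(msg.payload.tableName)) {
    node.error("table not allowed: " + msg.payload.tableName, msg);
    return null; // drop the message; a Catch node can report it
}

// Escape single quotes so the quoted literal in the template stays intact.
msg.payload.region = String(msg.payload.region).replace(/'/g, "''");
return msg;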

Output

msg.payload is an array of row objects (column names as keys). For non-SELECT statements it is an empty array.

Concurrency and queue

The node has a Queue setting that bounds both the maximum number of simultaneous queries and the size of the connection pool — there is no hidden extra capacity. Excess messages wait FIFO and are dispatched as slots free up. The node status shows queued (executing/max) while busy.

Cancel

The cancel button on the editor stops all in-flight queries and drops all queued messages. Sending msg.stop = true does the same from a flow. The node stays active and processes new messages after cancellation.
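
From a flow, cancellation is a one-line Function node (or an Inject node with msg.stop set):

// Function node: cancel all in-flight and queued queries on the wired async.pg node.
msg.stop = true;
return msg;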

Errors

Execution errors are reported via Node-RED's standard node.error(err, msg) — the original message is attached so a Catch node can act on it. The node continues processing subsequent messages. Cancelled queries are not reported as errors.


async.oracle

Runs one Oracle statement for each incoming message with the same FIFO queue, concurrency model, and dynamic-SQL behavior as async.pg.

Providing the SQL

Static SQL or a Nunjucks template, rendered against the incoming msg:

SELECT *
FROM {{ payload.tableName }}
WHERE region = '{{ payload.region }}'

The same safety note applies: templates produce SQL text. Render only trusted or pre-validated values.

Output

  • msg.payload — array of row objects (column names as keys), empty for DML/DDL.
  • msg.rowsAffected — number of rows affected for INSERT/UPDATE/DELETE; undefined for SELECT.

Engine specifics

Identifiers are emitted upper-cased and unquoted (Oracle's default name folding), and pagination uses OFFSET … FETCH NEXT … ROWS ONLY, which requires Oracle 12c or later.
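
A sketch of driving that pagination from a flow: a Function node emits one message per page, and the node's template interpolates the offset and limit (the template text, field names, and page size are illustrative):

// Function node: one message per page for a template such as
//   SELECT * FROM {{ payload.tableName }}
//   ORDER BY ID
//   OFFSET {{ payload.offset }} ROWS FETCH NEXT {{ payload.limit }} ROWS ONLY
const pageSize = 500; // illustrative
const pages = 4;      // fetch the first four pages
const msgs = [];
for (let i = 0; i < pages; i++) {
    msgs.push({ payload: { tableName: "ORDERS", offset: i * pageSize, limit: pageSize } });
}
return [msgs]; // Node-RED sends each array element as its own message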

Queue, cancel, errors

Same as async.pg: bounded concurrency tied to the Oracle pool size, FIFO queue, cancel button / msg.stop, and node.error(err, msg) reporting.


copy.db

Copies rows from a source table to a destination table. Both tables must already exist — copy.db does not create tables.

Modes

  • Append — Insert rows into the destination. Existing destination rows are kept. Inserts that collide with existing primary keys will fail at execution time.
  • Overwrite — Truncate the destination, then insert all source rows. Destructive: destination rows are removed before any new rows are written. If the copy fails after truncation, the destination may be left empty or partial.
  • Incremental — Insert only the source rows whose primary key is not already present in the destination. Existing rows are not updated. Both source and destination must have a primary key, and the primary key columns must match on both sides.

Column matching

Columns are matched by name (case-insensitive). Manual column mapping is not supported.

  • Source-only columns (in source, not in destination): handled by the Extra cols policy — either Fail or Ignore extra source columns (the matched columns are still copied).
  • Destination-only columns (in destination, not in source): receive the destination's default value or NULL on insert. Columns marked NOT NULL without a default will cause the insert to fail.

One job at a time

copy.db runs at most one copy job per node instance. Additional incoming messages are queued FIFO and processed one after another.

Status

The node status shows the current phase, the source/destination it is working on, progress when available, and a +N queued suffix when other jobs are waiting:

copying data: users → users_backup · 12,400 rows · +2 queued

Phases include idle, queued, validating, copying data, done, failed, cancelled.

Preview

The editor has a Preview button (between the Copy and Execution sections). Clicking it validates the current — possibly unsaved — configuration against the live source and destination catalogs and reports:

  • Whether the copy is ready to run, or has blocking errors
  • Source and destination engine, schema, and table
  • The selected mode
  • Column-match summary (matched / source-only / destination-only counts and lists)
  • Mode-specific actions and warnings (e.g. Overwrite is destructive)
  • Blocking errors with a short reason

Preview never copies, inserts, truncates, deletes, updates rows, or enqueues a job. It only reads catalog tables. Preview is advisory — the runtime re-validates everything before executing, so if the database changes between Preview and Run, the runtime checks are authoritative.

Cancel

The cancel button (or msg.stop = true) stops the active copy and clears the queue. Cancellation may leave the destination in a partial state, especially in Overwrite mode.


clone.db

Clones table or whole-schema structure from a source to a destination, optionally followed by a row copy. Source and destination must use the same engine (no cross-engine clone).

Scope

  • Table — One source table → one destination table. Names can differ.
  • Schema — Every supported source-schema table → one destination schema. Table names are not remapped.

Clone mode

  • Structure only — Create the destination table(s) and the supported per-table objects (see below). No rows copied.
  • Structure and data — Create structure first, then copy rows in batches, then re-align identity sequences. For schema-scope, all tables are created before any data is copied.

What is cloned

  • Columns
  • Primary key
  • Unique constraints
  • Check constraints
  • Indexes
  • Identity sequences (re-aligned after a data copy)

What is NOT cloned (v1)

  • Views
  • Triggers
  • Foreign keys
  • Grants and privileges
  • Stored routines (functions, procedures, packages)

PostgreSQL vs Oracle

  • Schema default: PostgreSQL uses public when the source schema is left blank (table scope only). Oracle requires an explicit owner — there is no default.
  • Identifier casing: PostgreSQL identifiers are quoted as written (embedded " is doubled). Oracle identifiers are upper-cased and unquoted.
  • Source snapshot during a data copy: PostgreSQL opens a REPEATABLE READ READ ONLY transaction (snapshot is guaranteed; the copy fails hard if the snapshot can't be taken). Oracle attempts SET TRANSACTION READ ONLY; if the database refuses, the copy continues with sourceConsistency: "not_guaranteed".

Create if missing

If the destination schema does not exist:

  • PostgreSQL: the Create if missing option will issue CREATE SCHEMA before any other DDL.
  • Oracle: not supported — schemas are users, and creating a user requires DBA privileges. The destination user/schema must exist already; otherwise the clone fails clearly.

The option is hidden in the editor when the destination engine is Oracle, and is rejected by the runtime even if it leaks in via a stored config or msg override.

Preview

The editor has a Preview button. Clicking it validates the current configuration and shows what the clone would do — the planned steps, warnings, blocking errors, and (for schema scope) the list of tables that would be cloned. Preview never executes any DDL or data copy. Like copy.db, Preview is advisory and the runtime re-validates everything before executing.

One job at a time

clone.db runs at most one clone job per node instance. Additional incoming messages are queued FIFO. The status pill shows the current phase and target, plus a +N queued suffix when other jobs are waiting:

copying data: public → backup · users 12,400 rows · 5/12 · +1 queued

Cancel

The cancel button (or msg.stop = true) stops the active clone and clears the queue. The clone is fail-fast: destinations must not already contain the target tables, and unsupported source objects abort the clone with a clear error.


import.pg

Imports external data into an existing PostgreSQL table. PostgreSQL only — there is no Oracle counterpart.

Sources

Three source types, selectable on the node:

  • Flow message — rows are read from a configured message field (default msg.payload) as an Array of plain Objects. Each object is one row; object keys are column names. Nested or non-object payloads are rejected.
  • Local CSV file — an absolute path to a CSV file with a header row. UTF-8 (default) or Latin-1 encoding. Quoted fields, embedded commas, embedded newlines, and "" escapes are supported. The path on the node is optional: if left blank, the path is taken from msg.filename at runtime. msg.filename may be a single absolute path (string) or an array of absolute paths, in which case all files are imported as one atomic transaction into the same destination table — either every file commits or nothing does (see Multi-file imports below). Each file must have a header row; columns are matched per file. All paths must be absolute and must point to existing files.
  • Object storage bucket — a CSV object in any S3-compatible bucket (AWS S3, MinIO, Oracle Object Storage) or Google Cloud Storage. The connection lives in a reusable config.s3 node which switches between providers internally: S3-compatible uses endpoint + access/secret key + region + SSL/path-style, while GCS uses a service-account JSON. On the import.pg node you only pick the connection and specify the per-import bucket, object key, and encoding. Bucket and Object key are both optional on the node — when blank, they fall back to msg.bucket and msg.objectName at runtime (matching node-red-contrib-bucket-collection's field names so the same upstream nodes can drive both packages without renaming). Each field falls back independently: you can hard-code the bucket and pass the object key from msg, or vice versa. The object body is streamed end-to-end into PostgreSQL — the file is never fully buffered in memory and is never written to disk. Credentials are stored as Node-RED credentials (encrypted at rest via Node-RED's credential store) and are never sent in HTTP responses or log output.

Other source formats (JSON, glob patterns) are not supported in v1; multiple CSV files are supported via a msg.filename array (see Multi-file imports below).
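
A minimal upstream Function node for the Flow message source, with the object-storage fallbacks shown alongside (the row data and bucket/key values are illustrative; msg.payload, msg.bucket, and msg.objectName are the documented default field names):

// Function node: shape a message for import.pg.

// Flow message source: an array of flat objects, keys = destination column names.
msg.payload = [
    { id: 1, name: "Ada",  region: "south" },
    { id: 2, name: "Alan", region: "north" }
];

// Object storage source: leave Bucket / Object key blank on the node and
// supply them per message instead.
// msg.bucket = "exports";
// msg.objectName = "2024/users.csv";

return msg;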

Destination

  • A config.pg connection, plus a schema (optional, defaults to public) and table.
  • The destination table must already exist — import.pg does not create tables.

Column matching

  • Columns are matched by name (case-insensitive). No manual mapping.
  • Source columns missing in the destination are governed by an Extra cols policy: either Fail or Ignore extra source columns.
  • Destination columns missing in the source must be nullable or have a default; otherwise the import fails up front with a clear error.

Mode

  • Append (default) — insert-only. Existing destination rows are not modified. Inserts that collide with existing primary keys will fail at execution time.
  • Upsert (Flow message source only) — insert new rows and update existing rows on conflict, in a single PostgreSQL transaction. See Upsert (msg source only) below. Overwrite and incremental are not supported.

Upsert (msg source only)

Available only when the source is "Flow message" — the editor hides the Mode field for CSV and S3 sources, and the runtime + preview reject mode: "upsert" for those.

  • Conflict columns — comma-separated list of destination column names. If left blank, auto-detected from the destination's primary key. If the destination has no primary key, the import fails up front asking for an explicit list.
  • Constraint check — the conflict column set must match an existing primary key or UNIQUE constraint on the destination. Validated at preview and runtime so PostgreSQL doesn't raise "no unique or exclusion constraint matching the ON CONFLICT specification" mid-INSERT.
  • Update set (v1) — every matched source column except the conflict columns is overwritten on conflict via EXCLUDED.col. There is no UI for selecting which columns get updated; if the only matched columns are the conflict columns the import errors out (use Append mode instead).
  • Payload validation — msg.payload (or the configured field) must be an array of plain objects. Each object must contain every conflict column. Nested arrays/objects in values are rejected (v1 supports flat values only; Date and Buffer are exempt).
  • Duplicate keys inside one payload — allowed. PostgreSQL resolves them: the last row wins per conflict key. The node does no client-side deduplication.
  • Transactional — one BEGIN → batched INSERT ... ON CONFLICT (cols) DO UPDATE SET ... RETURNING (xmax = 0) AS inserted → COMMIT. Any error or cancellation triggers ROLLBACK. Insert vs update counts come from the xmax returning trick — xmax = 0 means a fresh insert.
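
A minimal upsert payload, assuming id is the conflict column (as in the sample output below):

// Function node: rows for an upsert keyed on "id".
// Every object must carry the conflict column; values must be flat
// (Date and Buffer excepted).
msg.payload = [
    { id: 1, name: "Ada",    score: 10 }, // updates row 1 if it exists
    { id: 7, name: "Grace",  score: 42 }, // inserts if id 7 is new
    { id: 1, name: "Ada L.", score: 11 }  // duplicate key: last row wins
];
return msg;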

Output payload for upsert:

{
  "mode": "upsert",
  "committed": true,
  "rolledBack": false,
  "totalRowsProcessed": 1200,
  "totalRowsInserted": 940,
  "totalRowsUpdated": 260,
  "conflictCols": ["id"],
  "source": "msg.payload",
  "destination": "public.users",
  "rowsImported": 1200
}

On rollback the same shape is set on the failed msg.payload (Catch-node territory) with committed: false, rolledBack: true, totalRowsInserted: 0, totalRowsUpdated: 0, and failedPhase: "validation" | "execution". Append mode keeps its existing { rowsImported, source, destination } payload — Upsert opts in to the new shape.
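
A downstream Function node can branch on this shape — a sketch, wired after import.pg for the success path or after a Catch node for the rollback path:

// Function node: log the upsert outcome.
const p = msg.payload || {};
if (p.committed) {
    node.log(`upsert ok: ${p.totalRowsInserted} inserted, ${p.totalRowsUpdated} updated`);
} else if (p.rolledBack) {
    node.warn(`upsert rolled back during ${p.failedPhase}`);
}
return msg;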

Multi-file imports (CSV only)

When msg.filename is an array (a single CSV path goes through the same pipeline), the import runs as a single logical operation:

Preflight (before any SQL writes). For every file:

  • Path is non-empty, absolute, and the file exists.
  • Header is read once and validated: not empty, no empty column names, no duplicate column names (case-insensitive).
  • Per-file column matching against the destination is run with the configured Extra cols policy and the destination NOT NULL / no-default rule. A failure on any file rejects the whole job before BEGIN.
  • Optional Require same row count across files (CSV section, off by default): when on and msg.filename is an array, every file is scanned end-to-end during preflight and the data-row counts are compared. Any mismatch is reported with per-file counts and the whole job is rejected. Off by default because it requires reading each file once before the import starts.

Transactional execution. Once preflight passes, the import opens a single PostgreSQL transaction:

  • BEGIN, then each file is streamed and inserted in order.
  • All files succeed → COMMIT. Any error (or msg.stop cancellation) → ROLLBACK. The destination never sees a partial multi-file import.
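
Driving a multi-file import is just a matter of setting msg.filename upstream; the paths below are illustrative and must exist on the Node-RED host:

// Function node: two-file atomic import. Leave the path on the import.pg
// node blank so msg.filename is used; both files commit together or not at all.
msg.filename = [
    "/data/exports/users_2024_01.csv",
    "/data/exports/users_2024_02.csv"
];
return msg;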

Output payload. Multi-file imports return a structured summary:

{
  "committed": true,
  "rolledBack": false,
  "totalRowsImported": 12345,
  "files": [
    { "path": "/abs/a.csv", "rowsImported": 4000, "status": "success" },
    { "path": "/abs/b.csv", "rowsImported": 8345, "status": "success" }
  ],
  "source": "2 files (a.csv, b.csv)",
  "destination": "public.users",
  "rowsImported": 12345
}

On rollback the same shape is returned (on msg.payload of the failed message — Catch-node territory) with committed: false, rolledBack: true, totalRowsImported: 0, failedFile, and failedPhase: "preflight" | "import". Files validated but never imported are marked "skipped"; the file that triggered the failure is "failed".
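
A Function node behind a Catch node can triage the rollback (a sketch against the payload shape above):

// Function node after a Catch node: report a failed multi-file import.
const p = msg.payload || {};
if (p.rolledBack) {
    node.warn(`import rolled back in ${p.failedPhase}: ${p.failedFile}`);
    const skipped = (p.files || []).filter(f => f.status === "skipped").length;
    node.warn(`${skipped} file(s) validated but never imported`);
}
return msg;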

Preview

The editor has a Preview button. It validates the source description and inspects the destination catalog, then reports:

  • Source description (for CSV: the file path, encoding, and header column count — or a deferred-validation note when the path is left blank and will come from msg.filename; for S3/MinIO: bucket/key, encoding, header column count, and object size; for msg: a deferred-validation note).
  • Matched / source-only / destination-only column counts (for CSV and S3/MinIO — msg defers column comparison to runtime).
  • Blocking errors and warnings.
  • Action summary ("insert rows into …").

Preview never inserts rows, modifies the database, or enqueues a job. It opens a short-lived connection for catalog reads only. For object-storage sources (S3-compatible or GCS) it reads only enough bytes from the object stream to extract the header row, then destroys the stream — the full object is never downloaded during preview. Credentials are read server-side from the deployed config.s3 connection and are never round-tripped through the editor.

Execution and concurrency

  • Messages queue FIFO with bounded concurrency. The Queue field on the node (maxConcurrency, default 1) controls how many imports run in parallel; surplus messages wait their turn. This mirrors async.pg's queue surface — set Queue > 1 to allow concurrent imports against the same destination. Each concurrent import gets its own pooled connection and (for CSV) its own transaction, so one rollback never affects another.
  • For msg and S3 sources, output msg.payload is { rowsImported, source, destination }. For CSV sources (single or multi-file) the payload includes committed / rolledBack / totalRowsImported / files[] plus failedFile / failedPhase on rollback (see Multi-file imports). rowsImported is preserved as a top-level alias.

Status

Format mirrors async.pg: {waiting} ({executing}/{maxConcurrency}) — e.g. 2 (1/1) means one import is running and two are waiting on a Queue=1 node. Blue ring while executing, grey dot when idle. Throttled to one update per second. Cancellation via the stop button or msg.stop = true interrupts every active import on this node (CSV imports ROLLBACK; msg / S3 sources stop after the current batch) and drains the queue.

Errors

Fail-fast on: source not found, CSV invalid or unreadable, bucket/credentials/object problems, destination table missing, no matching columns, source columns missing in destination (when policy is fail), or destination NOT-NULL columns without defaults that the source does not provide. Common provider error codes (NoSuchBucket, NoSuchKey, AccessDenied, InvalidAccessKeyId, SignatureDoesNotMatch) are translated to clear messages — GCS errors are normalized into the same set so the messaging is uniform across providers.


Safety and behavior

  • Preview is advisory. The runtime re-validates everything before executing. If the database changes between Preview and Run, the runtime is authoritative.
  • Destructive operations are explicit. copy.db's Overwrite truncates the destination before copying. clone.db rejects pre-existing destination tables — it does not silently overwrite them.
  • One job at a time. Both copy.db and clone.db process one job per node instance and queue additional messages FIFO. The queue count is shown in the status as +N queued.
  • Cancellation may leave partial state. Stopping a copy or clone mid-run does not roll back rows already inserted or DDL already issued. Overwrite in particular can leave the destination empty if the copy was interrupted after truncation.
  • Oracle schema creation is not automated. Oracle schemas are users — create the destination user/schema yourself before running a clone.

Installation

npm install @inteli.city/node-red-contrib-db-collection

Or install via Node-RED's Palette Manager: Manage palette → Install and search for @inteli.city/node-red-contrib-db-collection.


Development

Run the test suite:

npm test

The harness is dependency-free and exits non-zero on the first failure.


For repository structure and implementation details, see CLAUDE.md.
