feat: Add capability for loaders to natively store and expose target state #9719

@edgarrmondragon

Summary

Add a new capability for loader plugins to natively store state in the target system and expose it to Meltano. This would enable Meltano to retrieve state from loaders at the beginning of a run (similar to state backends) and allow loaders to continuously update state during the sync.

Background

Currently, state management in Meltano pipelines relies on state backends: the BookmarkWriter class (see src/meltano/core/plugin/singer/target.py) captures the STATE messages emitted by targets and persists them to the configured backend, outside the target system. This proposal would instead let loaders store state natively in the target system itself and expose a capability through which Meltano can retrieve that state.

Proposed Solution

Loaders would expose a new capability (tentatively named store-target, target-state, or similar) that indicates they can:

  1. Retrieve state: At the beginning of a run, dump the current state from the target system
  2. Store state: During the sync, continuously update state in the target system as state messages are received

Meltano would:

  1. Detect this capability in the loader plugin
  2. At the beginning of a run, query the loader for existing state (similar to state backend retrieval)
  3. Pass this state to the tap for incremental extraction
  4. During the sync, the loader receives and stores state messages directly in the target system
  5. On the next run, retrieve the updated state from the target
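The Meltano-side steps above can be outlined in Python. This is a hypothetical sketch, not Meltano code: resolve_incoming_state, build_tap_command, and the two callables are illustrative names, and falling back to the state backend for loaders without the capability is only one possible answer to the backward-compatibility question listed under Open Questions.

```python
from typing import Any, Callable, Dict, List, Optional

# A Singer-style state document, e.g. {"bookmarks": {"users": {...}}}.
State = Dict[str, Any]


def resolve_incoming_state(
    state_id: str,
    loader_has_capability: bool,
    query_loader_state: Callable[[str], Optional[State]],
    read_backend_state: Callable[[str], Optional[State]],
) -> State:
    """Steps 1-2: choose where the state for this run comes from.

    Hypothetical policy: loaders with the capability are the source of truth;
    all other loaders fall back to the configured state backend.
    """
    if loader_has_capability:
        state = query_loader_state(state_id)
    else:
        state = read_backend_state(state_id)
    # No stored state (first run): an empty state makes the tap do a full sync.
    return state or {}


def build_tap_command(tap: str, config_path: str, state_path: str) -> List[str]:
    """Step 3: Singer taps conventionally read incoming state from --state <file>."""
    return [tap, "--config", config_path, "--state", state_path]
```

Steps 4 and 5 need nothing extra from Meltano in this sketch: the loader writes state as it goes, and the next run calls resolve_incoming_state again.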

Benefits

  • Simplified architecture: State lives where the data lives (in the target system)
  • Improved reliability: State can be committed atomically with the data it describes (e.g., in the same database transaction)
  • Better accuracy: State reflects what was actually loaded into the target
  • Reduced dependencies: Less reliance on separate state backend systems
  • Natural disaster recovery: State survives as long as the target data survives

Architecture

Current Flow

sequenceDiagram
    participant Meltano
    participant StateBackend
    participant Tap
    participant Target
    
    Meltano->>StateBackend: Read previous state
    StateBackend->>Meltano: Return state
    Meltano->>Tap: Start with state
    Tap->>Target: Stream records + STATE messages
    Target->>Meltano: Echo STATE messages (stdout)
    Meltano->>StateBackend: Save STATE continuously
    Note over StateBackend: BookmarkWriter persists<br/>state during sync
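For comparison, the capture side of the current flow amounts to reading the lines a target writes to stdout and saving each one as it arrives. The sketch below is a simplified illustration of that pattern, not the actual BookmarkWriter implementation; capture_state_from_target and the save_state callback (standing in for the configured state backend) are hypothetical.

```python
import json
from typing import Any, Callable, Dict, Iterable


def capture_state_from_target(
    stdout_lines: Iterable[str],
    save_state: Callable[[Dict[str, Any]], None],
) -> int:
    """Persist every state line a target writes to stdout; return how many were saved."""
    saved = 0
    for line in stdout_lines:
        line = line.strip()
        if not line:
            continue
        try:
            state = json.loads(line)
        except json.JSONDecodeError:
            continue  # not a state line (e.g. stray output); skip it
        if not isinstance(state, dict):
            continue  # valid JSON but not a state document
        save_state(state)  # "Save STATE continuously": one backend write per line
        saved += 1
    return saved
```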

Proposed Flow with store-target Capability

sequenceDiagram
    participant Meltano
    participant Loader
    participant TargetSystem
    participant Tap
    
    Note over Meltano: Beginning of run
    Meltano->>Loader: Query state (via capability)
    Loader->>TargetSystem: Retrieve stored state
    TargetSystem->>Loader: Return state
    Loader->>Meltano: Dump state
    Meltano->>Tap: Start with state
    
    Note over Meltano: During sync
    Tap->>Loader: Stream records + STATE messages
    Loader->>TargetSystem: Store records + STATE continuously
    Note over TargetSystem: State persisted<br/>with data
    
    Note over Meltano: Next run
    Meltano->>Loader: Query state again
    Note over Loader: Cycle repeats

State Storage Architecture

graph TB
    subgraph "Run N"
        A[Meltano] -->|1. Query state| B[Loader Plugin]
        B -->|2. Retrieve| C[Target System State Table]
        C -->|3. Return state| B
        B -->|4. Dump state| A
        A -->|5. Pass state| D[Tap]
        D -->|6. Records + STATE| B
        B -->|7. Store continuously| C
    end
    
    subgraph "Run N+1"
        A2[Meltano] -->|1. Query state| B2[Loader Plugin]
        B2 -->|2. Retrieve updated| C
        C -->|3. Return new state| B2
    end
    
    C -.->|State persists| C

State Update Pattern During Sync

sequenceDiagram
    participant Tap
    participant Loader
    participant TargetDB
    
    Note over Tap,TargetDB: Continuous state updates during sync
    
    Tap->>Loader: RECORD (stream1, id=1)
    Tap->>Loader: RECORD (stream1, id=2)
    Tap->>Loader: STATE (stream1: bookmark=ts2)
    Loader->>TargetDB: UPDATE _meltano_state<br/>SET state = {stream1: ts2}
    
    Tap->>Loader: RECORD (stream2, id=5)
    Tap->>Loader: STATE (stream2: bookmark=ts5)
    Loader->>TargetDB: UPDATE _meltano_state<br/>SET state = {stream1: ts2, stream2: ts5}
    
    Note over TargetDB: State updated atomically<br/>with data commits
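The stream-by-stream merge drawn in the diagram can be expressed as a small pure function. This is a sketch assuming Singer-style state with a top-level bookmarks object keyed by stream; merge_state is a hypothetical helper. Many taps already emit the complete state in every STATE message, in which case the merge simply yields the latest state.

```python
import copy
from typing import Any, Dict

State = Dict[str, Any]


def merge_state(current: State, incoming: State) -> State:
    """Fold one STATE value into the stored state without mutating either input.

    Bookmarks for streams in `incoming` replace the stored ones; bookmarks for
    streams not mentioned are kept. Other top-level keys are taken from `incoming`.
    """
    merged = copy.deepcopy(current)
    for key, value in incoming.items():
        if key != "bookmarks":
            merged[key] = copy.deepcopy(value)
    bookmarks = merged.setdefault("bookmarks", {})
    for stream, bookmark in incoming.get("bookmarks", {}).items():
        bookmarks[stream] = copy.deepcopy(bookmark)
    return merged
```

Applying it to the two STATE messages in the diagram leaves both stream1 and stream2 bookmarks in the stored state.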

Implementation Considerations

Capability Declaration

Loaders would declare the capability in their plugin definition:

capabilities:
  - store-target  # or target-state, native-state, etc.
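On the Meltano side, detecting the capability could be a simple membership check on the plugin definition. This sketch assumes the definition has already been loaded from YAML into a dict; because the name is still undecided, the hypothetical supports_native_state helper accepts any of the candidate names listed under Open Questions.

```python
from typing import Any, Dict, Iterable

# Hypothetical: candidate capability names until one is chosen.
CANDIDATE_NAMES = ("store-target", "target-state", "native-state", "state-storage")


def supports_native_state(
    plugin_definition: Dict[str, Any],
    names: Iterable[str] = CANDIDATE_NAMES,
) -> bool:
    """Return True if the loader's plugin definition declares the capability."""
    declared = set(plugin_definition.get("capabilities") or [])
    return any(name in declared for name in names)
```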

State Retrieval Interface

At the beginning of a run, Meltano would query the loader for state. One potential approach is a new loader flag, passed through meltano invoke:

meltano invoke target-postgres --dump-state <state ID>
# Output: {"bookmarks": {"users": {"updated_at": "2024-01-01T00:00:00Z"}}}
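Meltano could consume that output by running the loader and parsing its stdout as JSON. A minimal sketch, assuming the proposed --dump-state flag (which no loader implements today) and that empty output means no state has been stored yet; dump_loader_state is a hypothetical helper, and the __main__ block fakes a loader with an inline Python script.

```python
import json
import subprocess
import sys
from typing import Any, Dict, List


def dump_loader_state(command: List[str], state_id: str, timeout: float = 60.0) -> Dict[str, Any]:
    """Ask a loader for its stored state via the proposed --dump-state flag (hypothetical)."""
    result = subprocess.run(
        [*command, "--dump-state", state_id],
        capture_output=True,
        text=True,
        timeout=timeout,
        check=True,  # non-zero exit: the loader could not read its state, so fail loudly
    )
    output = result.stdout.strip()
    # Empty output is read as "no state stored yet" (first run -> full sync).
    return json.loads(output) if output else {}


if __name__ == "__main__":
    # Stand-in "loader": an inline script that prints a fixed state document.
    fake_loader = [
        sys.executable,
        "-c",
        'import json; print(json.dumps({"bookmarks": {"users": {"updated_at": "2024-01-01T00:00:00Z"}}}))',
    ]
    print(dump_loader_state(fake_loader, "dev:tap-example-to-target-postgres"))
```

Treating a non-zero exit as an error, rather than as empty state, avoids silently triggering a full re-sync when the target is merely unreachable.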

State Storage in Target System

The actual storage mechanism would be loader-specific:

Database loaders: Dedicated state table

CREATE TABLE _meltano_state (
    id VARCHAR PRIMARY KEY DEFAULT 'default',  -- state ID
    state JSONB NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- default applies on INSERT only; set explicitly on UPDATE
);
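Reading and writing such a table reduces to an upsert keyed by state ID plus a lookup that returns empty state on the first run. The sketch below uses Python's built-in sqlite3 as a stand-in for the target database, so JSONB becomes TEXT; the INSERT ... ON CONFLICT ... DO UPDATE upsert has the same shape in PostgreSQL (SQLite 3.24 or newer is assumed). The table layout and the write_state and read_state helpers are illustrative.

```python
import json
import sqlite3
from typing import Any, Dict

SCHEMA = """
CREATE TABLE IF NOT EXISTS _meltano_state (
    id TEXT PRIMARY KEY DEFAULT 'default',
    state TEXT NOT NULL,  -- JSON document (JSONB in PostgreSQL)
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""


def write_state(conn: sqlite3.Connection, state_id: str, state: Dict[str, Any]) -> None:
    """Insert the state row for state_id, or overwrite it if it already exists."""
    conn.execute(
        """
        INSERT INTO _meltano_state (id, state, updated_at)
        VALUES (?, ?, CURRENT_TIMESTAMP)
        ON CONFLICT (id) DO UPDATE
            SET state = excluded.state, updated_at = CURRENT_TIMESTAMP
        """,
        (state_id, json.dumps(state)),
    )


def read_state(conn: sqlite3.Connection, state_id: str) -> Dict[str, Any]:
    """Return the stored state, or an empty dict on the first run."""
    row = conn.execute("SELECT state FROM _meltano_state WHERE id = ?", (state_id,)).fetchone()
    return json.loads(row[0]) if row else {}
```

Keying rows by state ID rather than relying on the 'default' value is one way to let several pipelines share a target, which bears on the multi-tap question below.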

File-based loaders: Metadata file

/data/output.parquet
/data/.meltano_state.json
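For file-based loaders, the main risk is a crash leaving a half-written state file. A common remedy, sketched below, is to write to a temporary file in the same directory and atomically rename it over the old one with os.replace. The helper names are hypothetical, and the sketch assumes the loader has already flushed the corresponding data file before calling write_state_file: a file system cannot commit two files in one transaction, so writing data first means a crash leaves the state behind the data (the tap re-extracts a little) rather than ahead of it (data silently skipped).

```python
import json
import os
import tempfile
from typing import Any, Dict


def write_state_file(path: str, state: Dict[str, Any]) -> None:
    """Replace the state file atomically so a reader never sees a partial write."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must be on the same file system as `path` for os.replace to be atomic.
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".state-", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as tmp:
            json.dump(state, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # ensure the bytes reach disk before the rename
        os.replace(tmp_path, path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def read_state_file(path: str) -> Dict[str, Any]:
    """Return the stored state, or an empty dict if no state file exists yet."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {}
```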

Cloud storage loaders: Object metadata or dedicated state object

s3://bucket/data/table.parquet
s3://bucket/data/.meltano/state.json

State Update During Sync

Similar to current BookmarkWriter behavior, but handled by the loader:

  • Loader receives STATE messages on stdin (Singer protocol)
  • Loader persists state to target system continuously
  • State updates are committed atomically with the corresponding data (e.g., in the same transaction)
  • No need to echo STATE to stdout for Meltano to capture
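A loader that persists state itself can tie each STATE message to the records received before it by committing both in one transaction. A minimal sketch, assuming a SQL target (here sqlite3 from the standard library), a hypothetical records table standing in for the real stream tables, and that each STATE value is the complete state and can be stored as-is; load_messages is an illustrative name. Records that arrive after the last STATE message are rolled back; since the stored state did not advance, the tap would send them again on the next run.

```python
import json
import sqlite3
from typing import Iterable


def load_messages(conn: sqlite3.Connection, lines: Iterable[str], state_id: str = "default") -> None:
    """Load RECORD messages and store each STATE in the same transaction as its records.

    Nothing is echoed to stdout: the state lives only in the target.
    """
    conn.execute("CREATE TABLE IF NOT EXISTS records (stream TEXT, data TEXT)")
    conn.execute("CREATE TABLE IF NOT EXISTS _meltano_state (id TEXT PRIMARY KEY, state TEXT NOT NULL)")
    conn.commit()
    try:
        for line in lines:
            message = json.loads(line)
            if message["type"] == "RECORD":
                # sqlite3 opens a transaction implicitly before the first INSERT.
                conn.execute(
                    "INSERT INTO records (stream, data) VALUES (?, ?)",
                    (message["stream"], json.dumps(message["record"])),
                )
            elif message["type"] == "STATE":
                conn.execute(
                    "INSERT OR REPLACE INTO _meltano_state (id, state) VALUES (?, ?)",
                    (state_id, json.dumps(message["value"])),
                )
                conn.commit()  # pending records and the new state become durable together
            # SCHEMA and other message types are ignored in this sketch.
    finally:
        # Discard records not covered by a STATE message (or pending when an error occurred).
        conn.rollback()
```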

Open Questions

  1. Backward compatibility:

    • How do we handle loaders without this capability?
    • Should we support both state backends and native storage simultaneously?
  2. Multi-tap scenarios:

    • If multiple taps load to the same target, how do we namespace state?
    • Per-tap state tables? State key prefixes?
  3. State initialization:

    • What happens on first run when no state exists?
    • Should loaders return empty state or error?
  4. State migration:

    • How do we migrate from state backends to native storage?
    • Should there be a migration tool/command?
  5. Protocol standardization:

    • Should this be a Singer spec extension?
    • Or Meltano-specific convention?
  6. Naming: What should this capability be called?

    • store-target
    • target-state
    • native-state
    • state-storage

Comparison with Current Approach

| Aspect | Current (StateBackend) | Proposed (Native Storage) |
|---|---|---|
| State location | System DB / external | Target system |
| State retrieval | StateBackend query | Loader query |
| State persistence | BookmarkWriter | Loader native |
| Disaster recovery | Separate backup | Included with target backup |
| Multi-environment | Shared state backend | State in each target |
| Complexity | Meltano manages | Loader manages |

Related Work

  • Current implementation: src/meltano/core/plugin/singer/target.py (BookmarkWriter)
  • Singer spec state messages
  • Meltano state backend implementations
  • Target plugin capabilities system

Next Steps

  1. Gather community feedback on the approach
  2. Finalize capability naming and state retrieval protocol
  3. Create a proof-of-concept implementation with a popular loader (e.g., target-postgres)
  4. Update Meltano SDK documentation for loader developers
  5. Implement capability detection and state querying in Meltano core
  6. Consider migration path from state backends

Note: This is a proposal for discussion. Feedback, alternative approaches, and use case insights are welcome!

Metadata


Assignees

No one assigned

    Labels

    State Management (State backends, the 'meltano state' command and other state management topics), needs refinement

    Type

    No fields configured for Feat.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
