RFC: one metrics pipeline — sample→patch→persist behind an engine, all reads behind a standalone reader #50

@caezium

Problem

The snapshot pipeline — sample → patch → validate → persist → query — is smeared across seven files, and its one schema is co-owned by four of them:

  • Sampler spawns mo status --json on a utility queue, hands the JSON to LocalMetrics.patched() (which mutates the parsed dictionary to fill Apple-Silicon gaps from IOKit/SMC), decodes to validate, then inserts the string into SQLite — and also caches lastSnapshot in memory, which the UI reads across threads with no synchronization.
  • SnapshotStore re-decodes that JSON per row on every chart redraw, silently skipping malformed rows — so when mo's schema drifts, charts just go blank with no visible cause.
  • IOMonitor is a second, parallel sampler: a @MainActor singleton reading net/disk at 1 Hz into an in-memory ring that never reaches the DB (QueryServer and MCP can't see it), duplicating the IOBlockStorageDriver walker that already exists in LocalMetrics.
  • Retention is split three ways: Store (config), Maintenance (hourly timing), DB (delete mechanics).
  • Consumers choose between three inconsistent read paths (sampler.lastSnapshot, SnapshotStore.range, IOMonitor.shared), and the per-process aggregation loop (peaks/averages/CPU-time) is re-implemented in HistoryLoader, MCP.callTopProcesses, MCP.callProcessUsage, and Explain.
  • The --mcp process opens the same DB file; today nothing distinguishes its read-only role, so a reader could run the destructive corruption-recovery ladder against the writer's live WAL.

The patching step is the riskiest untested seam in the app: a silent schema drift means rows decode-fail, SnapshotStore skips them, and the dashboard quietly empties.

Proposed Interface

Two entry points with a hard topology split — an engine that only the GUI process constructs, and a standalone-openable reader shared by both processes. (DB.swift survives unchanged as the hidden SQLite engine; MoleStatus stays the schema vocabulary; on-disk format (prefix, ts, json) is unchanged so old/new binaries interoperate.)

@MainActor
public final class Metrics {                      // GUI process only — owns the whole write side
    public init(configuration: Configuration = .standard) throws
    public func start()                           // sampler + 1 Hz live feed + retention janitor
    public func stop()
    public func setLiveCadence(_ enabled: Bool)   // foreground cadence survives (min(liveInterval, configured))
    @discardableResult public func sampleNow() -> Result<Snapshot, SampleFailure>   // deterministic tick
    @discardableResult public func runMaintenanceNow() -> MaintenanceReport

    public let reader: MetricsReader              // the query surface (below)
    public let live: LiveFeed                     // @MainActor ObservableObject — absorbs IOMonitor
    public var health: PipelineHealth { get }     // freshness, consecutive failures, drift counters

    public struct Configuration {                 // every impure edge, injected; .standard wires production
        public var statusSource: @Sendable () throws -> String      // default: engine capture of `mo status --json` (RFC #48)
        public var native: any NativeMetricsProviding               // ONE IOKit/SMC provider (dedupes the walker)
        public var databaseURL: URL
        public var policy: @Sendable () -> Policy                   // re-read each tick; Settings apply within a cycle
        public var clock: @Sendable () -> Date
    }
    public struct Policy: Equatable, Sendable {   // ALL cadence + retention knobs in one value
        public var sampleInterval: TimeInterval
        public var liveSampleInterval: TimeInterval
        public var retention: TimeInterval
        public var autoVacuum: Bool
        public var maintenanceInterval: TimeInterval
        public static var standard: Policy { get }                  // the one Store→Policy mapping
    }
}

@MainActor
public final class LiveFeed: ObservableObject {   // replaces IOMonitor.shared AND cross-thread lastSnapshot
    @Published public private(set) var snapshot: MoleStatus?        // latest decoded, main-actor published
    @Published public private(set) var rates: Rates                 // 1 Hz net/disk
    public private(set) var ring: [RateSample]                      // ≤1 h, never persisted
}

public struct MetricsReader: Sendable {           // both processes; sync; thread-safe (WAL + FULLMUTEX)
    /// Standalone handle for `burrow --mcp`: same file, NO destructive
    /// recovery ladder (a reader must never quarantine the writer's live WAL).
    public static func openDefault() throws -> MetricsReader
    public static func open(at url: URL) throws -> MetricsReader

    public func latest() -> Snapshot?             // falls back through up to 5 rows on drift — HUD never blanks silently
    public func snapshots(in window: Window, maxPoints: Int = 720) -> SnapshotSlice   // skipped rows COUNTED
    public func series(of metrics: [Metric], in window: Window, maxPoints: Int = 720) -> SeriesBundle  // ONE decode pass
    public func processUsage(in window: Window, rankedBy: ProcessRank, limit: Int = 10) -> ProcessUsageReport
    public func latestRaw() -> RawRow?            // frozen wire surfaces embed stored JSON verbatim
    public func rawRows(prefix: String, in window: Window, sampledTo maxPoints: Int?) -> [RawRow]
    public func info() -> Info                    // prefixes, staleness, cumulative decode-skip counter
}

public enum Metric: String, CaseIterable, Sendable {   // THE projection table, once
    case cpuUsage, cpuLoad1, memoryUsedPercent, gpuUsage,
         diskRead, diskWrite, networkRx, networkTx,
         thermalCPU, thermalGPU, thermalBattery, fanSpeed, batteryPercent, healthScore
    public func value(in status: MoleStatus) -> Double?   // nil = no honest value (gpu<0, thermal 0s, fanCount gating)
}

public struct Snapshot: Sendable { public let ts: Int; public let status: MoleStatus; public let rawJSON: String; public var age: TimeInterval { get } }
public struct SnapshotSlice: Sendable { public let snapshots: [Snapshot]; public let droppedRows: Int; public let firstSkip: DriftReport? }
public struct DriftReport: Sendable {             // coding-path-precise: "missing key 'usage' at path 'cpu'"
    public enum Kind: Sendable { case missingKey(key: String, path: String), typeMismatch(expected: String, path: String), dataCorrupted(path: String, detail: String), notJSON }   // Sendable so DriftReport's conformance holds
    public let kind: Kind; public let snippet: String; public let at: Date
}
public protocol NativeMetricsProviding: AnyObject, Sendable {   // stateless counters out; engine owns delta math
    func diskByteCounters() -> (read: UInt64, write: UInt64)?
    func netByteCounters() -> (rx: UInt64, tx: UInt64)?
    func gpuUtilization() -> Double?
    func fans() -> (count: Int, maxRPM: Int)?
    func temperatures() -> (cpu: Double?, gpu: Double?)
}
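
The skip-and-count read path behind snapshots(in:) and the bounded fallback in latest() could look roughly like the sketch below. RowSketch, CPUStatusSketch and SliceSketch are hypothetical stand-ins for the RFC's RawRow, MoleStatus and SnapshotSlice (their real fields are not shown in this issue), and the one-key schema is invented purely for illustration.

```swift
import Foundation

// Hypothetical stand-ins; the real RawRow / MoleStatus / SnapshotSlice differ.
struct RowSketch { let ts: Int; let json: String }
struct CPUStatusSketch: Decodable {
    struct CPU: Decodable { let usage: Double }
    let cpu: CPU
}
struct SliceSketch {
    var decoded: [(ts: Int, status: CPUStatusSketch)] = []
    var droppedRows = 0          // malformed rows are counted, not hidden
    var firstSkip: String?       // first failure kept for diagnostics
}

/// Decodes every row once; a row that fails to decode is skipped and counted.
func decodeRows(_ rows: [RowSketch]) -> SliceSketch {
    let decoder = JSONDecoder()
    var slice = SliceSketch()
    for row in rows {
        do {
            let status = try decoder.decode(CPUStatusSketch.self, from: Data(row.json.utf8))
            slice.decoded.append((ts: row.ts, status: status))
        } catch {
            slice.droppedRows += 1
            if slice.firstSkip == nil { slice.firstSkip = "ts \(row.ts): \(error)" }
        }
    }
    return slice
}

/// Walks newest-first rows and returns the first one that decodes,
/// giving up after `maxFallback` attempts (the RFC text says "up to 5").
func latestDecodable(_ rowsNewestFirst: [RowSketch], maxFallback: Int = 5) -> CPUStatusSketch? {
    let decoder = JSONDecoder()
    for row in rowsNewestFirst.prefix(maxFallback) {
        if let status = try? decoder.decode(CPUStatusSketch.self, from: Data(row.json.utf8)) {
            return status
        }
    }
    return nil
}
```

The point of returning the count alongside the data is that a caller can render "N rows skipped" next to a sparse chart instead of showing an unexplained gap.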

Representative usage:

// AppDelegate (replaces DB + Sampler + IOMonitor + Maintenance juggling):
let metrics = try Metrics(); metrics.start()
queryServer = QueryServer(reader: metrics.reader, port: port)

// HistoryView, 24 h — one decode pass, splice + drift included:
let bundle = await metrics.series(of: charts, in: .last(hours: 24), maxPoints: 720)

// MCP process (no engine, no timers, non-destructive open):
let reader = try MetricsReader.openDefault()
let report = reader.processUsage(in: .last(minutes: 60), rankedBy: .peakCPU, limit: 10)

// QueryServer /metrics — wire format byte-identical (raw rows embedded verbatim).

IOMonitor verdict (all three design panels converged): unify the 1 Hz sampler into the engine's live feed, never persist it (86 k rows/day would wreck retention), don't splice inside the reader (cross-process answers must not differ) — engine-level series(of:) does the splice for the GUI, with an opt-out. The asymmetry (the --mcp process has no ring) is documented, not papered over.
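
For scale, a 1 Hz feed produces 86,400 samples per day, which is where the "86 k rows/day" figure comes from. A minimal sketch of the engine-side delta math and the bounded in-memory ring follows, assuming the provider hands back cumulative byte counters as in NativeMetricsProviding. RateTracker and the fields of RateSample are assumptions for illustration, not the RFC's actual types.

```swift
import Foundation

/// One point in the in-memory ring (field names are illustrative).
struct RateSample: Equatable {
    let at: Date
    let bytesPerSecond: Double
}

/// Hypothetical helper: turns cumulative counters into rates and keeps
/// only the samples that fall inside `horizon` (3600 s for a one-hour ring).
struct RateTracker {
    private var last: (counter: UInt64, at: Date)?
    private(set) var ring: [RateSample] = []
    let horizon: TimeInterval

    init(horizon: TimeInterval) { self.horizon = horizon }

    /// Returns the new rate, or nil on the first tick and after a counter reset.
    @discardableResult
    mutating func ingest(counter: UInt64, at now: Date) -> Double? {
        // Always remember the latest reading, even when no rate is produced.
        defer { last = (counter: counter, at: now) }
        guard let previous = last else { return nil }
        let elapsed = now.timeIntervalSince(previous.at)
        // A counter that went backwards is treated as a reset and skipped,
        // so no negative or wrapped rate is ever published.
        guard counter >= previous.counter, elapsed > 0 else { return nil }
        let rate = Double(counter - previous.counter) / elapsed
        ring.append(RateSample(at: now, bytesPerSecond: rate))
        // Drop samples older than the horizon so the ring stays bounded.
        let cutoff = now.addingTimeInterval(-horizon)
        ring.removeAll { $0.at < cutoff }
        return rate
    }
}
```

Because the tracker only sees counters and timestamps, it can be driven by a fake provider and a stepped clock without touching IOKit.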

What it hides: subprocess mechanics; the hole-filling patch policy ("only fill 0/−1/absent, never synthesize objects, skip counter resets") plus all IOKit/SMC traffic; decode-before-insert validation with path-precise drift reporting; all of SQLite (WAL, recovery, stride sampling, prune/vacuum); retention config+timing+mechanics as one janitor; cadence policy incl. foreground floor; the cross-thread snapshot race (now a main-actor @Published); the aggregation semantics implemented once instead of four times.
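
The hole-filling rule quoted above ("only fill 0/−1/absent, never synthesize objects") can be sketched on a parsed dictionary as follows. This is an illustration under assumptions: isHole, fillHole and the key names disk_io/read_rate/write_rate are hypothetical, and the real LocalMetrics.patched() may be structured quite differently.

```swift
import Foundation

/// Reads a JSON-ish value as a Double when it is numeric.
func numericValue(_ value: Any) -> Double? {
    if let d = value as? Double { return d }
    if let i = value as? Int { return Double(i) }
    return nil
}

/// A "hole" is an absent value, 0, or -1; anything else is a real reading.
func isHole(_ value: Any?) -> Bool {
    guard let value = value else { return true }            // absent key
    guard let number = numericValue(value) else { return false }
    return number == 0 || number == -1
}

/// Fills `key` inside the object stored at `objectKey`, but only when
///  - a native reading is available,
///  - the parent object already exists (objects are never synthesized), and
///  - the current value is a hole.
func fillHole(in root: inout [String: Any], objectKey: String, key: String, with native: Double?) {
    guard let native = native,
          var object = root[objectKey] as? [String: Any],
          isHole(object[key]) else { return }
    object[key] = native
    root[objectKey] = object
}
```

Keeping the rule in one small function makes the "never overwrite a real value" guarantee easy to assert in a boundary test.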

Dependency Strategy

Local-substitutable. Tests run the real pipeline against a temp-file SQLite DB with fakes injected via Configuration — no mocking framework:

| Seam | Production | Test substitute |
| --- | --- | --- |
| statusSource | engine capture of mo status --json (8 s) | closure returning canned/drifted JSON fixtures |
| native | IOKit + SMC provider | FakeNative with scripted cumulative counters (engine owns delta math, so fakes are tables) |
| databaseURL | Application Support path | temp file per test (real SQLite, as DBTests already does) |
| policy | .standard (reads Store) | fixed literal (no UserDefaults) |
| clock | Date.init | stepped clock |
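
Two of these substitutes, the stepped clock and a table-driven native fake, might look like the sketch below. SteppedClock and FakeDiskNative are hypothetical names; only the closure shape `@Sendable () -> Date` and the `diskByteCounters()` signature come from the interface above, and the fake covers just that one method rather than the whole protocol.

```swift
import Foundation

/// A clock that only moves when the test advances it.
final class SteppedClock: @unchecked Sendable {
    private let lock = NSLock()
    private var current: Date

    init(start: Date) { current = start }

    var now: Date {
        lock.lock(); defer { lock.unlock() }
        return current
    }

    func advance(by seconds: TimeInterval) {
        lock.lock(); defer { lock.unlock() }
        current = current.addingTimeInterval(seconds)
    }
}

/// Hands out scripted cumulative disk counters, one table row per call,
/// then nil once the script is exhausted.
final class FakeDiskNative: @unchecked Sendable {
    private let lock = NSLock()
    private var script: [(read: UInt64, write: UInt64)]

    init(disk: [(read: UInt64, write: UInt64)]) { script = disk }

    func diskByteCounters() -> (read: UInt64, write: UInt64)? {
        lock.lock(); defer { lock.unlock() }
        return script.isEmpty ? nil : script.removeFirst()
    }
}
```

In a test the clock would be injected as a closure, for example `{ clock.now }`, so advancing it between sampleNow() calls makes the rate math deterministic.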

Testing Strategy

New boundary tests:

  • Patch end-to-end with zero IOKit: scripted Apple-Silicon fixture (disk_io: 0/0, gpu[0].usage: -1) + fake counters → sampleNow() → reader.latest() shows patched values; second tick after advancing counters asserts rate math and reset-skips.
  • Write-side drift: source emits JSON missing a required key → .rejected(DriftReport), nothing inserted, health counters advance.
  • Read-side drift: insert a malformed row directly → snapshots(in:).droppedRows == 1 with the coding path; info().decodeSkippedTotal accumulates; latest() falls back to the previous good row.
  • Retention: policy with short horizon → runMaintenanceNow() prunes and reports; auto-vacuum gating.
  • Foreground cadence: setLiveCadence(true) takes an immediate sample and tightens the interval.
  • Reader in a second handle (simulating the MCP process) sees the writer's rows; read-only open never mutates the file.
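
The drift tests above depend on turning a decode failure into a coding-path-precise report. One way that mapping could be written with Foundation's DecodingError is sketched here. DriftKind mirrors the cases of DriftReport.Kind, while classify, pathString and the DriftDemoStatus schema are assumptions made for this example.

```swift
import Foundation

enum DriftKind: Equatable {
    case missingKey(key: String, path: String)
    case typeMismatch(expected: String, path: String)
    case dataCorrupted(path: String, detail: String)
    case notJSON
}

/// Renders a coding path such as [gpu, 0, usage] as "gpu.[0].usage".
func pathString(_ path: [CodingKey]) -> String {
    path.map { key -> String in
        if let index = key.intValue { return "[\(index)]" }
        return key.stringValue
    }.joined(separator: ".")
}

/// Maps any error thrown by JSONDecoder onto a drift kind.
func classify(_ error: Error) -> DriftKind {
    guard let decoding = error as? DecodingError else { return .notJSON }
    switch decoding {
    case .keyNotFound(let key, let context):
        return .missingKey(key: key.stringValue, path: pathString(context.codingPath))
    case .typeMismatch(let type, let context), .valueNotFound(let type, let context):
        return .typeMismatch(expected: String(describing: type), path: pathString(context.codingPath))
    case .dataCorrupted(let context):
        // JSONDecoder reports unparseable input as dataCorrupted at the root.
        if context.codingPath.isEmpty { return .notJSON }
        return .dataCorrupted(path: pathString(context.codingPath), detail: context.debugDescription)
    @unknown default:
        return .notJSON
    }
}

// Tiny illustrative schema used to provoke the errors.
struct DriftDemoStatus: Decodable {
    struct CPU: Decodable { let usage: Double }
    let cpu: CPU
}

/// Returns nil when the payload decodes, otherwise the drift kind.
func driftKind(ofJSON json: String) -> DriftKind? {
    do {
        _ = try JSONDecoder().decode(DriftDemoStatus.self, from: Data(json.utf8))
        return nil
    } catch {
        return classify(error)
    }
}
```

The same classifier can serve both edges: the write side rejects on any non-nil result, and the read side counts it and moves on.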

Old tests to delete/migrate: SnapshotStoreTests (subsumed by reader tests), the Store-coupling halves of MaintenanceTests and StoreTests (retention moves behind Policy). DBTests survive as hidden-engine tests. MCPTests' top-processes/process-usage assertions retarget reader.processUsage.

Test environment needs: temp-dir SQLite files only.

Implementation Recommendations

  • The engine owns the full snapshot lifecycle (spawn, patch, validate, persist, publish, prune) and the 1 Hz live feed; the reader owns every query and aggregation. Nothing else in the app touches DB, prefix strings, JSONDecoder for snapshots, or IOKit.
  • Make drift loud at both edges: reject-and-count on write (a malformed payload never reaches disk), skip-and-count on read, with coding-path detail. Surface the counters through health, /info, and burrow_info so a blank chart always has a visible cause.
  • Keep the wire formats frozen: /snapshot, /metrics, and MCP passthrough embed stored JSON verbatim (no parse→re-encode). The raw-row methods exist for exactly that; don't "clean them up".
  • Type the topology: only the engine type can sample; the reader is all the MCP process can construct. Read-only opens must skip the destructive recovery ladder.
  • Centralize the "0/−1 means missing" projection rules in Metric.value(in:) — they are currently re-implemented in three view models and drift independently.
  • Migration is mechanical but wide (~8 files swap (db, sampler) pairs for metrics/reader); land the module first, then move consumers one surface per commit. Depends on the execution engine RFC (#48, "deepen mo execution into one engine (capture / stream / PTY / elevated behind ports)") only for its default statusSource; an interim MoleCLI.run closure works.
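
As an illustration of the centralized projection rules, the sketch below encodes the three "no honest value" cases named in the interface comment (gpu<0, thermal 0s, fanCount gating) in a single switch. GaugeStatusSketch and MetricSketch are hypothetical, reduced stand-ins for MoleStatus and Metric; the real field names and the full case list are not reproduced here.

```swift
import Foundation

// Reduced stand-in for MoleStatus; field names are assumptions.
struct GaugeStatusSketch {
    var gpuUsage: Double     // -1 when the CLI could not read it
    var cpuTemp: Double      // 0 when no sensor value was available
    var fanCount: Int        // 0 on fanless machines
    var fanRPM: Double
}

enum MetricSketch: String, CaseIterable {
    case gpuUsage, thermalCPU, fanSpeed

    /// Returns nil whenever the stored number is a placeholder rather
    /// than a measurement, so charts draw a gap instead of a false zero.
    func value(in status: GaugeStatusSketch) -> Double? {
        switch self {
        case .gpuUsage:
            return status.gpuUsage < 0 ? nil : status.gpuUsage
        case .thermalCPU:
            return status.cpuTemp == 0 ? nil : status.cpuTemp
        case .fanSpeed:
            return status.fanCount > 0 ? status.fanRPM : nil
        }
    }
}
```

With the rules in one place, a view model only iterates over the cases it wants and never re-checks sentinel values itself.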
