Skip to content

dataxwalker/chorus-sdk

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

◎ CHORUS

A shared memory protocol for collaborative AI agents

npm version license TypeScript built for Claude

Agents that never speak to each other can still think together.


The Idea

Most multi-agent systems route messages between agents explicitly. CHORUS takes a different approach — a shared memory pool that any agent can read from and write to.

Before each step, an agent reads the most relevant insights from the pool. After responding, it extracts and deposits its own discovery. The result is emergent collaboration: agents build on a growing collective knowledge base without ever communicating directly.

Agent A ──read──▶ [ Memory Pool ] ◀──read── Agent B
        ◀─write─                   ─write──▶

Insights are ranked by popularity — the most frequently referenced entries surface first. Each agent sees its own entries first, then entries from others, with duplicates removed automatically.


Install

npm install @dataxwalker/chorus-sdk

Requires Node.js ≥ 18 and an Anthropic API key.

Store your key in a .env file at the project root (never commit it):

# .env
ANTHROPIC_API_KEY=sk-ant-...

Load it at startup with Node's built-in env support (Node ≥ 20.6) or a package like dotenv:

# Node ≥ 20.6 — no extra dependency needed
node --env-file=.env your-script.js
# or with dotenv
npm install dotenv
// your-script.ts
import 'dotenv/config';

Quick Start

import Chorus from '@dataxwalker/chorus-sdk';

const chorus = new Chorus({ apiKey: process.env.ANTHROPIC_API_KEY! });

// Agent A learns something from the user
const a = await chorus.step(
    'agent-A',
    "I'm building a fitness app in React Native",
);
console.log(a.text);
// a.newInsight → "User is building a fitness app in React Native"

// Agent B automatically knows what A learned — shared memory pool
const b = await chorus.step(
    'agent-B',
    'What tech stack should I use for the backend?',
);
console.log(b.text); // Response informed by A's insight
console.log(b.insightsUsed); // ["User is building a fitness app in React Native"]

// Streaming support
const stream = await chorus.stepStream('agent-A', 'How do I track workouts?');
for await (const chunk of stream) process.stdout.write(chunk);
const result = await stream.finalResult();

Each call to step() automatically:

  1. Reads the top insights from the pool
  2. Injects them into the system prompt as a numbered list
  3. Calls the Claude API
  4. AI determines which injected insights are relevant to the current query
  5. Extracts any new INSIGHT: from the response and saves it to the pool

API Reference

new Chorus(config)

Option Type Default Description
apiKey string Your Anthropic API key
instanceId string auto-generated Identifier for this session/tenant
storageAdapter IStorageAdapter In-memory Custom storage backend
defaultModel string claude-3-5-sonnet-20240620 Claude model to use

chorus.step(agentId, prompt, options?)

The core method. Runs a full Read → Think → Write cycle.

Option Type Default Description
model string defaultModel from config Claude model to use for this step — overrides the instance default
system string 'You are a helpful AI assistant.' System prompt prefix for this agent
topK number 5 How many insights to pull from the pool

You can set a global default model once in the constructor and override it per-step when a specific agent needs a different model:

const chorus = new Chorus({
    apiKey: process.env.ANTHROPIC_API_KEY!,
    defaultModel: 'claude-haiku-4-5', // used by all agents unless overridden
});

const analyst = await chorus.step('analyst', 'Summarize the findings.');
const writer = await chorus.step('writer', 'Write the final report.', {
    model: 'claude-opus-4-5', // override for this step only
});
const result = await chorus.step('analyst', 'Summarize the latest findings.', {
    system: 'You are a financial analyst.',
    model: 'claude-opus-4-5',
    topK: 3,
});

console.log(result.text); // full agent response (clean, no markers)
console.log(result.insightsUsed); // only insights the AI found relevant to this query
console.log(result.newInsight); // new insight this agent extracted and saved, if any

Returns StepResult:

interface StepResult {
    agentId: string;
    text: string; // clean response text (USED header and INSIGHT marker stripped)
    insightsUsed: string[]; // insights the AI determined were relevant to this specific query
    newInsight?: string; // new insight written to the pool, if any
    metadata: {
        timestamp: number;
        instanceId: string;
    };
}

Note: insightsUsed contains only the insights the AI marked as relevant via the USED: header — not everything that was injected into the prompt. If the AI didn't follow the USED format, the SDK gracefully falls back to returning all injected insights.


chorus.stepStream(agentId, prompt, options?)

Streaming variant of step(). Returns a ChorusStream — iterate it to receive text chunks as they arrive, then call finalResult() once the stream ends to get the full result with extracted insights.

const stream = await chorus.stepStream(
    'writer',
    'Write a summary of renewable energy trends.',
);

// All insights injected into the prompt — available immediately
console.log(stream.insightsUsed);

// Stream text to the client chunk by chunk (USED header & INSIGHT marker auto-stripped)
for await (const chunk of stream) {
    process.stdout.write(chunk); // or send via SSE / WebSocket
}

// After the first chunk arrives, AI-determined relevant insights are populated
console.log(stream.relevantInsights);

// Get the full result (insight already written to pool)
const result = await stream.finalResult();
console.log(result.newInsight); // what was written back, if any

stepStream() accepts the same options as step() (model, system, topK).

ChorusStream shape:

class ChorusStream {
    readonly insightsUsed: string[]; // all insights injected into the prompt (available immediately)
    readonly relevantInsights: string[]; // insights AI marked as relevant (populated after first chunk)

    [Symbol.asyncIterator](): AsyncIterator<string>; // text delta chunks (clean, no markers)
    finalResult(): Promise<StepResult>; // full result; safe to call before or after iteration
}

The iterator handles all marker parsing automatically:

  • Buffers and strips the USED: ... / --- header from the start
  • Detects \nINSIGHT: ... near the end and stops yielding before it
  • Only emits clean response text to the consumer

chorus.read(agentId, topK?)

Pulls the most relevant insights from the pool. Own entries come first, then entries from other agents. Duplicates (case-insensitive) are skipped. Records are ranked by readCount — the most referenced insights surface first within each group.

const insights = await chorus.read('analyst', 5);
// → ['My own earlier insight...', 'Solar costs dropped 40% in 2025.', ...]
Parameter Type Default Description
agentId string The requesting agent
topK number 5 Maximum number of insights to return

Each returned entry has its readCount incremented, so frequently-read insights naturally rise to the top over time.


chorus.write(agentId, insight)

Manually deposits an insight into the pool. Useful when you want to seed the pool with known facts before running agents, or when you process insights outside of step().

Entries shorter than 5 characters and case-insensitive duplicates of existing entries are silently ignored.

await chorus.write(
    'researcher',
    'Solar adoption is accelerating fastest in Southeast Asia.',
);

chorus.clear()

Resets the entire memory pool. All recorded insights are discarded. Useful for starting a new task within the same server session without creating a new Chorus instance.

await chorus.clear();

Memory Helpers

// Get all entries — useful for real-time dashboards and visualizations
const pool = await chorus.getAllInsights(); // → InsightRecord[]

// Get entries written by a specific agent
const mine = await chorus.getInsightsByAgent('researcher'); // → InsightRecord[]

InsightRecord shape:

interface InsightRecord {
    agentId: string;
    insight: string;
    timestamp: number;
    readCount: number; // incremented each time the entry is surfaced via read()
}

chorus.parallel(tasks)

Runs multiple agents in parallel. All agents read the pool before any of them writes — they share the same "generation" of knowledge. After all complete, every agent's insight lands in the pool simultaneously.

const results = await chorus.parallel([
    { agentId: 'researcher', prompt: 'Find key facts about fusion energy.' },
    {
        agentId: 'analyst',
        prompt: 'Find key facts about fusion energy.',
        options: { system: 'You are a data analyst.' },
    },
    {
        agentId: 'skeptic',
        prompt: 'Find key facts about fusion energy.',
        options: { model: 'claude-opus-4-5' },
    },
]);

for (const result of results) {
    console.log(result.agentId, result.text);
}

ParallelTask shape:

interface ParallelTask {
    agentId: string;
    prompt: string;
    options?: StepOptions;
}
Round 1 (parallel):   Agent A ──read──▶ [ Pool ] ◀──read── Agent B
                             ──write─▶          ◀──write──

Round 2 (sequential): Agent C reads insights from both A and B

Ideal for breadth-first exploration — diverse independent perspectives on the same question before a synthesizing agent consolidates them.


Custom Storage

The default storage is in-memory and scoped to a single process. For persistence or multi-instance deployments, implement the IStorageAdapter interface:

import Chorus, {
    IStorageAdapter,
    InsightRecord,
} from '@dataxwalker/chorus-sdk';

class RedisAdapter implements IStorageAdapter {
    async get(): Promise<InsightRecord[]> {
        /* read from Redis */
    }
    async add(record: InsightRecord): Promise<void> {
        /* write to Redis */
    }
    async reset(): Promise<void> {
        /* flush */
    }
}

const chorus = new Chorus({
    apiKey: process.env.ANTHROPIC_API_KEY!,
    storageAdapter: new RedisAdapter(),
});

IStorageAdapter:

interface IStorageAdapter {
    get(): Promise<InsightRecord[]>;
    add(record: InsightRecord): Promise<void>;
    reset(): Promise<void>;
}

Multi-Tenant Pattern

Create one isolated Chorus instance per user session. Instances share no memory.

const sessions = new Map<string, Chorus>();

function getSession(userId: string): Chorus {
    if (!sessions.has(userId)) {
        sessions.set(
            userId,
            new Chorus({
                apiKey: process.env.ANTHROPIC_API_KEY!,
                instanceId: userId,
            }),
        );
    }
    return sessions.get(userId)!;
}

How It Works Under the Hood

CHORUS uses a lightweight marker protocol — no fine-tuning, no structured output mode, just prompt conventions.

1. Memory Injection

Before each step(), the SDK reads the pool and injects insights as a numbered list in the system prompt:

[SHARED MEMORY POOL]
Insights from previous interactions:
[0] User is building a fitness app in React Native
[1] User prefers PostgreSQL over MongoDB
[2] The app needs offline sync support

2. Relevance Detection (USED header)

When insights are present, the AI is instructed to output a USED: header before its response, listing which entries were actually relevant:

USED: 0, 2
---
Based on your React Native fitness app with offline sync needs, I'd recommend...

The SDK parses this header to populate insightsUsed / relevantInsights with only the entries the AI considered relevant. The USED: line and --- delimiter are stripped — only clean text reaches the consumer.

If the pool is empty, or the AI doesn't follow the format, the SDK gracefully falls back to returning all injected insights.

3. Insight Extraction (INSIGHT marker)

When the AI discovers something new worth remembering, it appends an INSIGHT: line to the end of its response:

...end of response.

INSIGHT: User wants the app to support Apple Watch integration

The SDK parses the last INSIGHT: occurrence, strips it from the response text, and saves it to the pool via write(). Duplicates (case-insensitive) are automatically rejected.

4. Streaming Behavior

In stepStream(), the iterator buffers incoming chunks until the --- delimiter is found, parses the USED: line, and only then starts yielding clean text. It also watches for the \nINSIGHT: marker near the end and stops yielding once detected. The consumer receives only the response prose — never the protocol markers.


Roadmap

  • Redis & PostgreSQL adapter packages
  • Semantic insight similarity pruning
  • Support for additional LLM providers

License

MIT

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors