18 changes: 18 additions & 0 deletions .env.example
@@ -1,5 +1,10 @@
# ── Provider — any OpenAI-compatible API ─────────────────────────────────────
#
# Format: "openai" (default) or "anthropic". When anthropic, requests/responses
# pass through untouched — no translation. Use for vLLM, OpenCode Go, OpenRouter
# Anthropic endpoints, or any provider that speaks Anthropic's native Messages API.
# PROVIDER_API_FORMAT=anthropic
#
# Examples:
# NVIDIA NIM (free tier): https://integrate.api.nvidia.com/v1
# DeepSeek: https://api.deepseek.com/v1
@@ -18,10 +23,23 @@ PROVIDER_API_KEY=your-api-key-here
# OpenRouter: meta-llama/llama-3.3-70b-instruct
PROVIDER_MODEL=meta/llama-3.3-70b-instruct

# Optional: route Claude Code's cheap background requests (titles, file-path
# extraction, quota probes) to a cheaper/faster model. Claude Code tags these
# with a haiku-class model name. Leave empty to use PROVIDER_MODEL for everything.
# Example: PROVIDER_MODEL_BACKGROUND=llama-3.1-8b-instant
PROVIDER_MODEL_BACKGROUND=

PROVIDER_MAX_TOKENS=32768
PROVIDER_TEMPERATURE=1.0
PROVIDER_TOP_P=1.0

# ── Retries ──────────────────────────────────────────────────────────────────
# Retry transient provider failures (429 rate limits, 5xx). Honors Retry-After.
# Free tiers (Groq, NIM) throttle often; a single 429 otherwise kills the turn.
# Set PROVIDER_MAX_RETRIES=0 to disable.
PROVIDER_MAX_RETRIES=3
PROVIDER_RETRY_BASE_DELAY=0.5

# ── Server ───────────────────────────────────────────────────────────────────
HOST=127.0.0.1
PORT=8082
198 changes: 198 additions & 0 deletions docs/passthrough-plan.md
@@ -0,0 +1,198 @@
# Backdoor Passthrough Mode — Anthropic-Native Provider Support

## Status: Design (not yet implemented)

---

## Problem

Backdoor currently supports only OpenAI-compatible providers. Every request/response goes through a bidirectional translation layer (`translate.py`):

```
Claude Code → [Anthropic] → translate.py → [OpenAI] → Provider (DeepSeek, Groq, etc.)
Claude Code ← [Anthropic] ← translate.py ← [OpenAI] ← Provider
```

But some providers expose **Anthropic-compatible APIs** directly (vLLM, OpenCode Go with Qwen 3.7 Max, OpenRouter Anthropic endpoints, self-hosted LiteLLM in Anthropic mode). For these, the translation layer is unnecessary overhead and a source of bugs — wrong `stop_reason` mappings, dropped `cache_control`, broken tool call serialization.

**Without this feature**, Backdoor can't talk to Anthropic-native providers at all — it always rewrites requests to OpenAI format and sends them to `/chat/completions`.
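
To make the `stop_reason` failure mode concrete, here is a minimal sketch of the kind of mapping a translation layer has to maintain. It is not the code in `translate.py`: the `FINISH_TO_STOP` table and the `to_stop_reason` helper are hypothetical names used only for illustration, and the fallback choice is an assumption.

```python
# Hypothetical mapping from OpenAI `finish_reason` to Anthropic `stop_reason`.
# Illustrative only; the real translate.py may differ.
FINISH_TO_STOP = {
    "stop": "end_turn",        # OpenAI also reports "stop" when a stop sequence hits
    "length": "max_tokens",
    "tool_calls": "tool_use",
}


def to_stop_reason(finish_reason: str) -> str:
    """Translate a finish reason, falling back to "end_turn" (assumed default)."""
    return FINISH_TO_STOP.get(finish_reason, "end_turn")


print(to_stop_reason("tool_calls"))      # tool_use
print(to_stop_reason("content_filter"))  # end_turn (information is lost here)
```

The mapping is lossy in both directions: OpenAI's `stop` cannot distinguish Anthropic's `end_turn` from `stop_sequence`, and `content_filter` has no direct counterpart. Passthrough avoids the table entirely.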

## Solution

Add a **passthrough mode** that detects when the provider speaks Anthropic natively and relays requests/responses unchanged.

```
Claude Code → [Anthropic] → Passthrough → [Anthropic] → Provider (Qwen 3.7 via OpenCode Go)
Claude Code ← [Anthropic] ← Passthrough ← [Anthropic] ← Provider
```

The extra Backdoor features (retry, housekeeping mocking, background model routing, Telegram, health check) still apply.

## Scope

### In scope
- Passthrough for Anthropic-native providers (request body relayed as-is, response streamed back untouched)
- Config flag to select mode (`PROVIDER_API_FORMAT=anthropic|openai`)
- Retry logic applies to both modes
- Housekeeping interceptors still short-circuit before any provider call
- Background model routing (`PROVIDER_MODEL_BACKGROUND`) still applies

### Out of scope
- Auto-detection of provider format (explicit config only, v1)
- Response validation/transformation for "almost-compatible" providers (passthrough means passthrough)
- Non-streaming passthrough (Claude Code always streams)

## Design

### 1. Config

One new field in `.env` and `Settings`:

```
# .env
PROVIDER_API_FORMAT=anthropic # "openai" (default) or "anthropic"
```

```python
# config.py
class Settings:
    provider_api_format: str = "openai"  # "openai" | "anthropic"
```
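
Because the field is a plain string, a typo such as `antropic` would silently select the wrong path unless it is checked. A minimal sketch of one way to validate it at load time follows; `FormatSetting` and `load_format` are hypothetical names, and the real `Settings` class may already validate this differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

ALLOWED_FORMATS = {"openai", "anthropic"}


@dataclass(frozen=True)
class FormatSetting:
    """Hypothetical stand-in for the relevant slice of Settings."""
    provider_api_format: str = "openai"


def load_format(env: Optional[Mapping[str, str]] = None) -> FormatSetting:
    """Read PROVIDER_API_FORMAT, default to "openai", and reject unknown values."""
    env = os.environ if env is None else env
    raw = env.get("PROVIDER_API_FORMAT", "").strip().lower() or "openai"
    if raw not in ALLOWED_FORMATS:
        raise ValueError(
            f"PROVIDER_API_FORMAT must be one of {sorted(ALLOWED_FORMATS)}, got {raw!r}"
        )
    return FormatSetting(provider_api_format=raw)
```

Failing at startup keeps a misconfigured proxy from quietly translating requests for a provider that expects the native format.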

### 2. Client — new passthrough endpoint methods

`client.py` gets two new methods that talk to the provider's Anthropic endpoint instead of OpenAI:

```python
# client.py
class ProviderClient:

    async def complete_anthropic(self, payload: dict) -> dict:
        """Send raw Anthropic request, return raw Anthropic response (non-streaming)."""
        # Path is relative to PROVIDER_BASE_URL, which is expected to end in /v1 already.
        resp = await self._client.post("/messages", json=payload)
        # same retry wrapper as complete()
        ...

    async def stream_anthropic(self, payload: dict) -> AsyncIterator[str]:
        """Stream raw Anthropic SSE events back untouched."""
        async with self._client.stream("POST", "/messages", json=payload) as resp:
            async for line in resp.aiter_lines():
                yield line + "\n"  # pass through raw SSE lines
```

The existing `complete()` and `stream()` (OpenAI path) remain unchanged.

### 3. Routes — branch on format

`routes.py` branches early in `create_message`:

```python
@router.post("/v1/messages")
async def create_message(req, settings):
    # ── Housekeeping interceptors still fire first ──
    fast = _check_optimizations(req, settings)
    if fast:
        return fast

    # ── Resolve model (background routing still applies) ──
    # In passthrough mode we swap req.model in-flight so the
    # provider sees the resolved model name
    if settings.provider_api_format == "anthropic":
        resolved_model = _resolve_model(req.model, settings) if settings.provider_model_background else settings.provider_model
        if resolved_model != req.model:
            req.model = resolved_model

    client = get_provider_client()
    msg_id = f"msg_{uuid.uuid4().hex}"

    # ── Passthrough path ──
    if settings.provider_api_format == "anthropic":
        return StreamingResponse(
            _stream_passthrough(client, req.model_dump(), msg_id, settings),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
        )

    # ── Existing OpenAI translation path (unchanged) ──
    payload = build_nim_payload(req, settings)
    ...
```

### 4. Streaming helper

```python
async def _stream_passthrough(
    client: ProviderClient,
    body: dict,
    msg_id: str,
    settings: Settings,
) -> AsyncIterator[str]:
    """Relay raw Anthropic SSE stream from provider to Claude Code."""
    provider = body.get("model", settings.provider_model)
    logger.info("→ %s [passthrough] stream", provider)

    try:
        async for line in client.stream_anthropic(body):
            # Relay each line as-is; the stream is never modified.
            yield line
        logger.info("← %s [passthrough] done", provider)
    except ProviderError as e:
        logger.error("Provider stream error %s: %s", e.status_code, e.message)
        # Surface the failure to Claude Code as an Anthropic-style error event.
        error = {"type": "error", "error": {"type": "api_error", "message": e.message}}
        yield f"event: error\ndata: {json.dumps(error)}\n\n"
```
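
The helper relies on `line + "\n"` being enough to rebuild the SSE framing. A minimal sketch of why that holds, assuming the provider uses `\n` line endings and the client's line iterator yields lines without their trailing newline (modelled here with `str.splitlines()`); `relay_lines` is a hypothetical stand-in for the relay loop:

```python
def relay_lines(lines):
    """Re-append the newline that line splitting strips from each SSE line."""
    for line in lines:
        yield line + "\n"


# Two SSE events; a blank line terminates each event.
raw = (
    'event: message_start\ndata: {"type": "message_start"}\n\n'
    'event: ping\ndata: {"type": "ping"}\n\n'
)
lines = raw.splitlines()  # the blank separator lines survive as ""

# Relaying every line, including the empty ones, reproduces the original bytes.
assert "".join(relay_lines(lines)) == raw
```

The empty lines matter: dropping them (for example by filtering on `if line:`) would merge events together and break parsing on the Claude Code side.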

### 5. Background model routing

When `PROVIDER_API_FORMAT=anthropic` AND `PROVIDER_MODEL_BACKGROUND` is set, we need to swap the model name in the request body before forwarding. Claude Code sends e.g. `"model": "claude-haiku-4-5"` for background tasks — the provider doesn't know that name. We replace it with the configured background model before relaying.

This happens in `create_message` (step 3 above), not in `_stream_passthrough`. The streaming helper just forwards whatever body it receives.
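
The swap itself can be sketched as follows. This is an illustration under stated assumptions, not the project's `_resolve_model`: the `"haiku"` substring check is an assumed heuristic for "haiku-class", and `RoutingSettings` and `resolve_model` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class RoutingSettings:
    """Hypothetical subset of Settings used for model routing."""
    provider_model: str
    provider_model_background: str = ""


def resolve_model(requested: str, settings: RoutingSettings) -> str:
    """Map the Claude model name sent by Claude Code to a provider model name."""
    is_background = "haiku" in requested.lower()  # assumed heuristic
    if is_background and settings.provider_model_background:
        return settings.provider_model_background
    # Everything else, or any request when no background model is set,
    # goes to the main provider model.
    return settings.provider_model
```

With an empty `provider_model_background`, every request resolves to `provider_model`, which matches the "leave empty to use PROVIDER_MODEL for everything" behavior described in `.env.example`.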

## Edge Cases & Risks

| Risk | Mitigation |
|---|---|
| Provider's Anthropic API is subtly non-compliant (different SSE format, missing fields) | Passthrough means we don't touch the response. If Claude Code can't parse it, it fails — same as if `ANTHROPIC_BASE_URL` pointed there directly. This is expected. |
| Claude Code sends Anthropic API features the provider doesn't support (thinking blocks, citations, computer use) | Passthrough doesn't filter these. Provider returns an error. Claude Code handles it. |
| `PROVIDER_MODEL_BACKGROUND` swaps the model name for all haiku-class requests including ones that need Anthropic-specific model names | The background model name in config is the user's responsibility to set correctly for their provider. |
| `complete()` (non-streaming) passthrough needed | Claude Code primarily uses streaming. Non-streaming support added only if observed in practice. |

## Files Changed

| File | Change |
|---|---|
| `src/proxy/config.py` | Add `provider_api_format` field |
| `src/proxy/client.py` | Add `complete_anthropic()`, `stream_anthropic()` methods |
| `src/proxy/routes.py` | Branch on format, add `_stream_passthrough()` helper |
| `.env.example` | Document `PROVIDER_API_FORMAT` |
| `tests/test_passthrough.py` | New tests |

## Estimated Size

~80 lines of new code, ~40 lines of tests. No changes to existing translation code. No breaking changes — defaults to `openai` mode, preserving all current behavior.

## Example: Qwen 3.7 Max via OpenCode Go

```bash
# .env
PROVIDER_API_FORMAT=anthropic
PROVIDER_BASE_URL=https://opencode-go.example.com/v1
PROVIDER_API_KEY=sk-xxx
PROVIDER_MODEL=qwen-3.7-max
PROVIDER_MODEL_BACKGROUND=qwen-3.7-lite # optional
PROVIDER_MAX_RETRIES=3
```

Flow:
1. Claude Code sends `POST /v1/messages` with `"model": "claude-opus-4-5"`
2. Backdoor intercepts housekeeping requests (quota probes, title gen) — never hits provider
3. For real requests, swaps model to `qwen-3.7-max` (or `qwen-3.7-lite` for haiku-class background)
4. Forwards raw body to OpenCode Go's `/v1/messages`
5. Streams SSE response back to Claude Code untouched
6. Retries on 429/5xx
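
Step 6 can be made concrete with the numbers from the example config. A minimal sketch, assuming the exponential schedule used in this PR's `client.py` (`base_delay * 2 ** attempt`) and the `PROVIDER_RETRY_BASE_DELAY=0.5` default from `.env.example`; `backoff_schedule` is a hypothetical helper, and a `Retry-After` header from the provider would override these values:

```python
def backoff_schedule(max_retries: int, base_delay: float) -> list:
    """Delays in seconds slept before each retry when no Retry-After is sent."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]


delays = backoff_schedule(max_retries=3, base_delay=0.5)
print(delays)       # [0.5, 1.0, 2.0]
print(sum(delays))  # 3.5
```

So with the defaults, a request is attempted up to four times and spends at most 3.5 seconds sleeping between attempts before the error reaches Claude Code.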

## Future Enhancements (v2)

- **Auto-detection**: probe the provider's root endpoint to see if it exposes `/v1/messages` vs `/chat/completions`
- **Hybrid mode**: use Anthropic-native for main model, OpenAI-native for background model (different providers)
- **Response validation in passthrough mode**: optionally validate that the provider's Anthropic response is well-formed before forwarding
1 change: 1 addition & 0 deletions pyproject.toml
@@ -23,6 +23,7 @@ dev = [

[tool.pytest.ini_options]
asyncio_mode = "auto"
pythonpath = ["."]

[tool.hatch.build.targets.wheel]
packages = ["src/proxy"]
126 changes: 110 additions & 16 deletions src/proxy/client.py
@@ -1,7 +1,10 @@
 """Async OpenAI-compatible provider client."""

+import asyncio
 import json
 import logging
+from email.utils import parsedate_to_datetime
+from datetime import datetime, timezone
 from typing import AsyncIterator, Any

 import httpx
@@ -12,6 +15,27 @@

 TIMEOUT = httpx.Timeout(connect=10.0, read=120.0, write=30.0, pool=5.0)

+# Provider statuses worth retrying. 429 is rate limiting (common on free tiers
+# like Groq and NIM); 500/502/503/504 are transient upstream errors. 4xx other
+# than 429 mean the request is wrong and a retry would just fail again.
+RETRYABLE_STATUS = {429, 500, 502, 503, 504}
+
+
+def _retry_after_seconds(headers: httpx.Headers, fallback: float) -> float:
+    """Return how long to wait, honoring a Retry-After header when present."""
+    ra = headers.get("retry-after")
+    if not ra:
+        return fallback
+    try:
+        return float(ra)  # delta-seconds form
+    except ValueError:
+        pass
+    try:  # HTTP-date form
+        when = parsedate_to_datetime(ra)
+        return max(0.0, (when - datetime.now(timezone.utc)).total_seconds())
+    except (TypeError, ValueError):
+        return fallback
+

 class ProviderError(Exception):
     def __init__(self, status_code: int, message: str):
@@ -29,30 +53,100 @@ def __init__(self, settings: Settings):
             timeout=TIMEOUT,
         )

+    async def _backoff(self, attempt: int, headers: httpx.Headers | None) -> None:
+        delay = self._settings.provider_retry_base_delay * (2 ** attempt)
+        if headers is not None:
+            delay = _retry_after_seconds(headers, delay)
+        await asyncio.sleep(delay)
+
     async def complete(self, payload: dict[str, Any]) -> dict[str, Any]:
         payload = {**payload, "stream": False}
-        resp = await self._client.post("/chat/completions", json=payload)
-        if resp.status_code != 200:
+        max_attempts = self._settings.provider_max_retries + 1
+        last_error: ProviderError | None = None
+        for attempt in range(max_attempts):
+            try:
+                resp = await self._client.post("/chat/completions", json=payload)
+            except httpx.TransportError as e:
+                last_error = ProviderError(503, f"transport error: {e}")
+                if attempt < max_attempts - 1:
+                    await self._backoff(attempt, None)
+                    continue
+                raise last_error
+            if resp.status_code == 200:
+                return resp.json()
+            if resp.status_code in RETRYABLE_STATUS and attempt < max_attempts - 1:
+                logger.warning("provider %s, retrying (attempt %d)", resp.status_code, attempt + 1)
+                await self._backoff(attempt, resp.headers)
+                continue
             raise ProviderError(resp.status_code, resp.text)
-        return resp.json()
+        raise last_error or ProviderError(503, "exhausted retries")

     async def stream(self, payload: dict[str, Any]) -> AsyncIterator[dict[str, Any]]:
         payload = {**payload, "stream": True, "stream_options": {"include_usage": True}}
-        async with self._client.stream("POST", "/chat/completions", json=payload) as resp:
-            if resp.status_code != 200:
-                body = await resp.aread()
-                raise ProviderError(resp.status_code, body.decode())
-            async for line in resp.aiter_lines():
-                line = line.strip()
-                if not line or not line.startswith("data:"):
+        max_attempts = self._settings.provider_max_retries + 1
+        # Only the connection/status phase is retryable. Once chunks start
+        # flowing, retrying would re-yield message_start and corrupt the
+        # stream, so a mid-stream failure surfaces to the caller instead.
+        for attempt in range(max_attempts):
+            started = False
+            try:
+                async with self._client.stream("POST", "/chat/completions", json=payload) as resp:
+                    if resp.status_code != 200:
+                        body = await resp.aread()
+                        if resp.status_code in RETRYABLE_STATUS and attempt < max_attempts - 1:
+                            logger.warning("provider %s on stream, retrying (attempt %d)", resp.status_code, attempt + 1)
+                            await self._backoff(attempt, resp.headers)
+                            continue
+                        raise ProviderError(resp.status_code, body.decode())
+                    async for line in resp.aiter_lines():
+                        line = line.strip()
+                        if not line or not line.startswith("data:"):
+                            continue
+                        data = line[5:].strip()
+                        if data == "[DONE]":
+                            return
+                        try:
+                            chunk = json.loads(data)
+                        except json.JSONDecodeError:
+                            logger.warning("Unparseable SSE chunk: %s", data)
+                            continue
+                        started = True
+                        yield chunk
+                    return
+            except httpx.TransportError as e:
+                if not started and attempt < max_attempts - 1:
+                    logger.warning("transport error on stream, retrying (attempt %d): %s", attempt + 1, e)
+                    await self._backoff(attempt, None)
                     continue
-                data = line[5:].strip()
-                if data == "[DONE]":
+                raise ProviderError(503, f"transport error: {e}")
+
+    # ── Anthropic-native passthrough ──────────────────────────────────────────
+
+    async def stream_anthropic(self, payload: dict[str, Any]) -> AsyncIterator[str]:
+        """Stream raw Anthropic SSE events back untouched (passthrough mode)."""
+        payload = {**payload, "stream": True}
+        max_attempts = self._settings.provider_max_retries + 1
+        for attempt in range(max_attempts):
+            started = False
+            try:
+                async with self._client.stream("POST", "/messages", json=payload) as resp:
+                    if resp.status_code != 200:
+                        body = await resp.aread()
+                        if resp.status_code in RETRYABLE_STATUS and attempt < max_attempts - 1:
+                            logger.warning("provider %s on anthropic stream, retrying (attempt %d)", resp.status_code, attempt + 1)
+                            await self._backoff(attempt, resp.headers)
+                            continue
+                        raise ProviderError(resp.status_code, body.decode())
+                    async for line in resp.aiter_lines():
+                        started = True
+                        yield line + "\n"
                     return
-                try:
-                    yield json.loads(data)
-                except json.JSONDecodeError:
-                    logger.warning("Unparseable SSE chunk: %s", data)
+            except httpx.TransportError as e:
+                if not started and attempt < max_attempts - 1:
+                    logger.warning("transport error on anthropic stream, retrying (attempt %d): %s", attempt + 1, e)
+                    await self._backoff(attempt, None)
+                    continue
+                raise ProviderError(503, f"transport error: {e}")

     async def aclose(self):
         await self._client.aclose()