
Commit 71b72b9

feat(subagent): add planner mode for task delegation (#753)
* feat(subagent): add output schema support with Pydantic validation

  - Add optional output_schema parameter to subagent() function
  - Validate subagent outputs against provided Pydantic schemas
  - Modify return prompt to include schema when provided
  - Add comprehensive tests for schema validation logic
  - Enables structured outputs for planner pattern (Issue #39)

  This is foundation work for implementing the Manus-style planner pattern,
  allowing subagents to return structured, validated outputs instead of
  generic JSON responses.

  Co-authored-by: Bob <bob@superuserlabs.org>

* feat(subagent): add planner mode for task delegation

  - Add mode parameter (executor/planner) to subagent()
  - Add subtasks parameter for planner mode
  - Implement _run_planner() to spawn multiple executor subagents
  - Each subtask gets its own executor with optional output schema
  - Fix closure issue with loop variables in thread function

  This implements Phase 1 of the planner pattern from Issue #39, enabling
  efficient multi-step task delegation as seen in the Manus agent.

  Co-authored-by: Bob <bob@superuserlabs.org>

* test(subagent): fix type annotations and remove unused variable

  - Add SubtaskDef import for proper typing
  - Type-annotate subtasks lists in tests
  - Remove unused initial_count variable

  Co-authored-by: Bob <bob@superuserlabs.org>

* docs(subagent): add planner mode examples

  Add examples demonstrating both executor and planner modes:

  - Executor mode: single-task delegation (existing)
  - Planner mode: multi-task delegation with subtasks

  Co-authored-by: Bob <bob@superuserlabs.org>

* refactor(subagent): remove Pydantic dependency per maintainer feedback

  - Remove Pydantic-based output schema validation
  - Simplify to prompt-based approach (model + good prompts)
  - Keep planner mode functionality (core value)
  - Delete schema validation tests
  - Update remaining tests to not use Pydantic

  Addresses feedback from PRs #753 and #751 that gptme should remain
  provider-independent and rely on model capability + prompts rather than
  strict schema enforcement.

  Co-authored-by: Bob <bob@superuserlabs.org>

* refactor(subagent): use logger instead of print statements

  - Replace print with logger.error for error messages (lines 65, 68)
  - Replace print with logger.info for informational messages (line 218)
  - Update docstring to clarify async execution and immediate None return

  Addresses automated review feedback from Ellipsis on PR #753.

* fix(subagent): add blank line between import groups

  Fixes lint error I001 (unsorted imports) by adding a blank line between
  first-party imports (gptme) and relative imports (..prompts).

  Co-authored-by: Bob <bob@superuserlabs.org>

* docs(subagent): fix Sphinx documentation warnings

  - Use string literal for SubtaskDef type annotation
  - Reformat Returns section to avoid class reference misinterpretation

  Fixes build failure in PR #753.

  Co-authored-by: Bob <bob@superuserlabs.org>

* fix(subagent): remove string literal from SubtaskDef type hint for Sphinx

  Sphinx autodoc was failing to resolve the string reference
  list["SubtaskDef"]. Since SubtaskDef is defined before its use, no
  forward reference is needed.

  Fixes build failure in PR #753.

* docs(subagent): add SubtaskDef to nitpick_ignore list

  Fixes Sphinx build error where the SubtaskDef reference could not be
  resolved. TypedDict classes need to be added to nitpick_ignore to
  prevent warnings.

  Co-authored-by: Bob <bob@superuserlabs.org>

* feat(subagent): add parallel/sequential execution modes and complete tool integration

  Addresses Erik's feedback on PR #753:

  - Add execution_mode parameter (parallel/sequential) to planner mode
  - Replace JSON return format with complete tool usage
  - Update status() method to detect complete tool calls
  - Add tests for both execution modes

  Benefits:

  - Sequential mode waits for each subtask before starting the next
  - Parallel mode (default) runs all subtasks concurrently
  - Complete tool provides natural task completion with full log access
  - Cleaner API aligned with gptme's tool-based architecture

  Co-authored-by: Bob <bob@superuserlabs.org>

* feat(subagent): add Phase 3 context sharing modes

  Implement three context sharing modes for the subagent tool:

  - full: complete context (agent identity, tools, workspace) - default
  - instructions-only: minimal context with just the user prompt
  - selective: choose specific context components

  Features:

  - New context_mode parameter (full/instructions-only/selective)
  - New context_include parameter for selective mode
  - Support for component selection: agent, tools, workspace
  - Updated examples and documentation
  - 9 new test cases for context modes

  Context modes enable token-efficient task delegation:

  - instructions-only: for simple, well-defined tasks
  - selective: for tasks needing specific context
  - full: for complex tasks requiring all context

  Addresses Phase 3 in the enhance-subagent-planner-pattern task.

  Co-authored-by: Bob <bob@superuserlabs.org>

* fix(subagent): move context_mode validation to main thread

  Move context_mode='selective' validation from run_subagent() (background
  thread) to subagent() (main thread) so pytest.raises() can catch it
  properly. Also add a type assertion for context_include to satisfy mypy
  after validation.

  Fixes test_context_mode_selective_requires_context_include.

  Co-authored-by: Bob <bob@superuserlabs.org>

* fix(subagent): propagate context_mode to planner executors

  Fixes the critical issue identified in Greptile review where planner
  mode ignored the context_mode parameter and always used full context.

  Changes:

  - Add context_mode and context_include parameters to _run_planner()
  - Pass these parameters from subagent() when calling _run_planner()
  - Replace hardcoded get_prompt() with context-aware message building
  - Use the same context-building logic as executor mode

  Executors spawned by the planner now correctly respect:

  - 'instructions-only': minimal context with complete tool only
  - 'selective': custom context components (agent, tools, workspace)
  - 'full': complete context (default, backward compatible)

  Addresses: #753 (Greptile confidence score 2/5)
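The "fix closure issue with loop variables in thread function" item refers to a classic Python pitfall: closures capture variables by reference, so every thread would otherwise see the final loop iteration's values. A minimal standalone sketch of the default-argument binding fix (illustrative names only, not gptme's actual API):

```python
import threading

results: dict[str, str] = {}
lock = threading.Lock()

def spawn_executors(subtasks: list[dict[str, str]]) -> list[threading.Thread]:
    threads = []
    for subtask in subtasks:
        prompt = f"Subtask: {subtask['description']}"

        # Default arguments are evaluated at function definition time,
        # so each thread gets its own copy of task_id/task_prompt
        # instead of sharing the loop variables.
        def run_executor(task_id=subtask["id"], task_prompt=prompt):
            with lock:
                results[task_id] = task_prompt

        t = threading.Thread(target=run_executor, daemon=True)
        t.start()
        threads.append(t)
    return threads

subtasks = [
    {"id": "implement", "description": "write the code"},
    {"id": "test", "description": "write the tests"},
]
for t in spawn_executors(subtasks):
    t.join()
print(sorted(results))  # → ['implement', 'test']
```

Without the `task_id=subtask["id"]` defaults, both threads could record their result under whichever id the loop variable held when they happened to run.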
1 parent 630c0cc commit 71b72b9

File tree

3 files changed (+555 −46 lines)


docs/conf.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -143,6 +143,7 @@ def setup(app):
     ("py:class", "ToolFormat"),
     ("py:class", "ConfirmFunc"),
     ("py:class", "Path"),
+    ("py:class", "gptme.tools.subagent.SubtaskDef"),
 ]
 
 # -- Options for HTML output -------------------------------------------------
```
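For context on this change: `nitpick_ignore` is Sphinx's list of `(domain:role, target)` pairs whose unresolved-reference warnings should be suppressed in nitpicky mode. The new entry follows the pattern of its neighbors; roughly:

```python
# docs/conf.py (fragment) - suppress "reference target not found"
# warnings for types Sphinx autodoc cannot resolve, such as
# TypedDict subclasses like SubtaskDef.
nitpick_ignore = [
    ("py:class", "ToolFormat"),
    ("py:class", "ConfirmFunc"),
    ("py:class", "Path"),
    ("py:class", "gptme.tools.subagent.SubtaskDef"),
]
```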

gptme/tools/subagent.py

Lines changed: 297 additions & 25 deletions
```diff
@@ -4,19 +4,26 @@
 Lets gptme break down a task into smaller parts, and delegate them to subagents.
 """
 
-import json
 import logging
 import random
 import string
 import threading
 from dataclasses import asdict, dataclass
 from pathlib import Path
-from typing import TYPE_CHECKING, Literal
+from typing import TYPE_CHECKING, Literal, TypedDict
 
 from ..message import Message
 from . import get_tools
 from .base import ToolSpec, ToolUse
 
+
+class SubtaskDef(TypedDict):
+    """Definition of a subtask for planner mode."""
+
+    id: str
+    description: str
+
+
 if TYPE_CHECKING:
     # noreorder
     from ..logmanager import LogManager  # fmt: skip
```
```diff
@@ -50,27 +57,215 @@ def get_log(self) -> "LogManager":
     def status(self) -> ReturnType:
         if self.thread.is_alive():
             return ReturnType("running")
-        # check if the last message contains the return JSON
-        msg = self.get_log().log[-1].content.strip()
-        json_response = _extract_json(msg)
-        if not json_response:
-            print(f"FAILED to find JSON in message: {msg}")
-            return ReturnType("failure")
-        elif not json_response.strip().startswith("{"):
-            print(f"FAILED to parse JSON: {json_response}")
-            return ReturnType("failure")
-        else:
-            return ReturnType(**json.loads(json_response))  # type: ignore
-
-
-def _extract_json(s: str) -> str:
-    first_brace = s.find("{")
-    last_brace = s.rfind("}")
-    return s[first_brace : last_brace + 1]
-
-
-def subagent(agent_id: str, prompt: str):
-    """Runs a subagent and returns the resulting JSON output."""
+
+        # Check if executor used the complete tool
+        log = self.get_log().log
+        if not log:
+            return ReturnType("failure", "No messages in log")
+
+        last_msg = log[-1]
+
+        # Check for complete tool call in last message
+        tool_uses = list(ToolUse.iter_from_content(last_msg.content))
+        complete_tool = next((tu for tu in tool_uses if tu.tool == "complete"), None)
+
+        if complete_tool:
+            # Extract content from complete tool
+            result = complete_tool.content or "Task completed"
+            return ReturnType(
+                "success",
+                result + f"\n\nFull log: {self.logdir}",
+            )
+
+        # Check if session ended with system completion message
+        if last_msg.role == "system" and "Task complete" in last_msg.content:
+            return ReturnType(
+                "success",
+                f"Task completed successfully. Full log: {self.logdir}",
+            )
+
+        # Task didn't complete properly
+        return ReturnType(
+            "failure",
+            f"Task did not complete properly. Check log: {self.logdir}",
+        )
+
+
+def _run_planner(
+    agent_id: str,
+    prompt: str,
+    subtasks: list[SubtaskDef],
+    execution_mode: Literal["parallel", "sequential"] = "parallel",
+    context_mode: Literal["full", "instructions-only", "selective"] = "full",
+    context_include: list[str] | None = None,
+) -> None:
+    """Run a planner that delegates work to multiple executor subagents.
+
+    Args:
+        agent_id: Identifier for the planner
+        prompt: Context prompt shared with all executors
+        subtasks: List of subtask definitions to execute
+        execution_mode: "parallel" (all at once) or "sequential" (one by one)
+        context_mode: Controls what context is shared with executors (see subagent() docs)
+        context_include: For selective mode, list of context components to include
+    """
+    from gptme import chat
+    from gptme.cli import get_logdir
+
+    from ..prompts import get_prompt
+
+    logger.info(
+        f"Starting planner {agent_id} with {len(subtasks)} subtasks "
+        f"in {execution_mode} mode"
+    )
+
+    def random_string(n):
+        s = string.ascii_lowercase + string.digits
+        return "".join(random.choice(s) for _ in range(n))
+
+    threads = []
+    for subtask in subtasks:
+        executor_id = f"{agent_id}-{subtask['id']}"
+        executor_prompt = f"Context: {prompt}\n\nSubtask: {subtask['description']}"
+        name = f"subagent-{executor_id}"
+        logdir = get_logdir(name + "-" + random_string(4))
+
+        def run_executor(prompt=executor_prompt, log_dir=logdir):
+            prompt_msgs = [Message("user", prompt)]
+            workspace = Path.cwd()
+
+            # Build initial messages based on context_mode
+            if context_mode == "instructions-only":
+                # Minimal system context - just basic instruction
+                initial_msgs = [
+                    Message(
+                        "system",
+                        "You are a helpful AI assistant. Complete the task described by the user. Use the `complete` tool when finished with a summary of your work.",
+                    )
+                ]
+                # Add complete tool for instructions-only mode
+                from ..prompts import prompt_tools
+
+                initial_msgs.extend(
+                    list(
+                        prompt_tools(
+                            tools=[t for t in get_tools() if t.name == "complete"],
+                            tool_format="markdown",
+                        )
+                    )
+                )
+            elif context_mode == "selective":
+                # Selective context - build from specified components
+                from ..prompts import prompt_gptme, prompt_tools
+
+                initial_msgs = []
+
+                # Add components based on context_include
+                if context_include and "agent" in context_include:
+                    initial_msgs.extend(
+                        list(prompt_gptme(False, None, agent_name=None))
+                    )
+                if context_include and "tools" in context_include:
+                    initial_msgs.extend(
+                        list(prompt_tools(tools=get_tools(), tool_format="markdown"))
+                    )
+                # workspace handled by passing workspace parameter to chat() if included
+            else:  # "full" mode (default)
+                # Full context
+                initial_msgs = get_prompt(
+                    get_tools(), interactive=False, workspace=workspace
+                )
+
+            complete_prompt = (
+                "When you have finished the task, use the `complete` tool:\n"
+                "```complete\n"
+                "Brief summary of what was accomplished.\n"
+                "```\n\n"
+                "This signals task completion. The full conversation log will be "
+                "available to the planner for review."
+            )
+            prompt_msgs.append(Message("user", complete_prompt))
+            chat(
+                prompt_msgs,
+                initial_msgs,
+                logdir=log_dir,
+                workspace=workspace,
+                model=None,
+                stream=False,
+                no_confirm=True,
+                interactive=False,
+                show_hidden=False,
+            )
+
+        t = threading.Thread(target=run_executor, daemon=True)
+        t.start()
+        threads.append(t)
+        _subagents.append(Subagent(executor_id, executor_prompt, t, logdir))
+
+        # Sequential mode: wait for each task to complete before starting next
+        if execution_mode == "sequential":
+            logger.info(f"Waiting for {executor_id} to complete (sequential mode)")
+            t.join()
+            logger.info(f"Executor {executor_id} completed")
+
+    # Parallel mode: all threads already started
+    if execution_mode == "parallel":
+        logger.info(f"Planner {agent_id} spawned {len(subtasks)} executor subagents")
+    else:
+        logger.info(
+            f"Planner {agent_id} completed {len(subtasks)} subtasks sequentially"
+        )
+
+
+def subagent(
+    agent_id: str,
+    prompt: str,
+    mode: Literal["executor", "planner"] = "executor",
+    subtasks: list[SubtaskDef] | None = None,
+    execution_mode: Literal["parallel", "sequential"] = "parallel",
+    context_mode: Literal["full", "instructions-only", "selective"] = "full",
+    context_include: list[str] | None = None,
+):
+    """Starts an asynchronous subagent. Returns None immediately; output is retrieved later via subagent_wait().
+
+    Args:
+        agent_id: Unique identifier for the subagent
+        prompt: Task prompt for the subagent (used as context for planner mode)
+        mode: "executor" for single task, "planner" for delegating to multiple executors
+        subtasks: List of subtask definitions for planner mode (required when mode="planner")
+        execution_mode: "parallel" (default) runs all subtasks concurrently,
+            "sequential" runs subtasks one after another.
+            Only applies to planner mode.
+        context_mode: Controls what context is shared with the subagent:
+            - "full" (default): Share complete context (agent identity, tools, workspace)
+            - "instructions-only": Minimal context, only the user prompt
+            - "selective": Share only specified context components (requires context_include)
+        context_include: For selective mode, list of context components to include:
+            - "agent": Agent identity and capabilities
+            - "tools": Tool descriptions and usage
+            - "workspace": Workspace files and structure
+
+    Returns:
+        None: Starts asynchronous execution. Use subagent_wait() to retrieve output.
+        In executor mode, starts a single task execution.
+        In planner mode, starts execution of all subtasks using the specified execution_mode.
+
+    Executors use the `complete` tool to signal completion with a summary.
+    The full conversation log is available at the logdir path.
+    """
+    if mode == "planner":
+        if not subtasks:
+            raise ValueError("Planner mode requires subtasks parameter")
+        return _run_planner(
+            agent_id, prompt, subtasks, execution_mode, context_mode, context_include
+        )
+
+    # Validate context_mode parameters
+    if context_mode == "selective" and not context_include:
+        raise ValueError(
+            "context_include parameter required when context_mode='selective'"
+        )
+
     # noreorder
     from gptme import chat  # fmt: skip
     from gptme.cli import get_logdir  # fmt: skip
```
```diff
@@ -87,7 +282,49 @@ def random_string(n):
     def run_subagent():
         prompt_msgs = [Message("user", prompt)]
         workspace = Path.cwd()
-        initial_msgs = get_prompt(get_tools(), interactive=False, workspace=workspace)
+
+        # Build initial messages based on context_mode
+        if context_mode == "instructions-only":
+            # Minimal system context - just basic instruction
+            initial_msgs = [
+                Message(
+                    "system",
+                    "You are a helpful AI assistant. Complete the task described by the user. Use the `complete` tool when finished with a summary of your work.",
+                )
+            ]
+            # Add complete tool for instructions-only mode
+            from ..prompts import prompt_tools
+
+            initial_msgs.extend(
+                list(
+                    prompt_tools(
+                        tools=[t for t in get_tools() if t.name == "complete"],
+                        tool_format="markdown",
+                    )
+                )
+            )
+        elif context_mode == "selective":
+            # Selective context - build from specified components
+            from ..prompts import prompt_gptme, prompt_tools
+
+            initial_msgs = []
+
+            # Type narrowing: context_include validated as not None earlier
+            assert context_include is not None
+
+            # Add components based on context_include
+            if "agent" in context_include:
+                initial_msgs.extend(list(prompt_gptme(False, None, agent_name=None)))
+            if "tools" in context_include:
+                initial_msgs.extend(
+                    list(prompt_tools(tools=get_tools(), tool_format="markdown"))
+                )
+            # workspace handled by passing workspace parameter to chat() if included
+        else:  # "full" mode (default)
+            # Current behavior - full context
+            initial_msgs = get_prompt(
+                get_tools(), interactive=False, workspace=workspace
+            )
 
         # add the return prompt
         return_prompt = """Thank you for doing the task, please reply with a JSON codeblock on the format:
```
````diff
@@ -100,6 +337,8 @@ def run_subagent():
 ```"""
         prompt_msgs.append(Message("user", return_prompt))
 
+        # Note: workspace parameter is always passed to chat() (required parameter)
+        # Workspace context in messages is controlled by initial_msgs
         chat(
             prompt_msgs,
             initial_msgs,
````
```diff
@@ -139,21 +378,54 @@ def subagent_wait(agent_id: str) -> dict:
     if subagent is None:
         raise ValueError(f"Subagent with ID {agent_id} not found.")
 
-    print("Waiting for the subagent to finish...")
+    logger.info("Waiting for the subagent to finish...")
     subagent.thread.join(timeout=60)
     status = subagent.status()
     return asdict(status)
 
 
 def examples(tool_format):
     return f"""
+### Executor Mode (single task)
 User: compute fib 13 using a subagent
 Assistant: Starting a subagent to compute the 13th Fibonacci number.
 {ToolUse("ipython", [], 'subagent("fib-13", "compute the 13th Fibonacci number")').to_output(tool_format)}
 System: Subagent started successfully.
 Assistant: Now we need to wait for the subagent to finish the task.
 {ToolUse("ipython", [], 'subagent_wait("fib-13")').to_output(tool_format)}
 System: {{"status": "success", "result": "The 13th Fibonacci number is 233"}}.
+
+### Planner Mode (multi-task delegation)
+User: implement feature X with tests
+Assistant: I'll use planner mode to delegate implementation and testing to separate subagents.
+{ToolUse("ipython", [], '''subtasks = [
+    {{"id": "implement", "description": "Write implementation for feature X"}},
+    {{"id": "test", "description": "Write comprehensive tests"}},
+]
+subagent("feature-planner", "Feature X adds new functionality", mode="planner", subtasks=subtasks)''').to_output(tool_format)}
+System: Planner spawned 2 executor subagents.
+Assistant: Now I'll wait for both subtasks to complete.
+{ToolUse("ipython", [], 'subagent_wait("feature-planner-implement")').to_output(tool_format)}
+System: {{"status": "success", "result": "Implementation complete in feature_x.py"}}.
+{ToolUse("ipython", [], 'subagent_wait("feature-planner-test")').to_output(tool_format)}
+System: {{"status": "success", "result": "Tests complete in test_feature_x.py, all passing"}}.
+
+### Context Modes
+
+#### Full Context (default)
+User: analyze this codebase
+Assistant: I'll use full context mode for comprehensive analysis.
+{ToolUse("ipython", [], 'subagent("analyze", "Analyze code quality and suggest improvements", context_mode="full")').to_output(tool_format)}
+
+#### Instructions-Only Mode (minimal context)
+User: compute the sum of 1 to 100
+Assistant: For a simple computation, I'll use instructions-only mode with minimal context.
+{ToolUse("ipython", [], 'subagent("sum", "Compute sum of integers from 1 to 100", context_mode="instructions-only")').to_output(tool_format)}
+
+#### Selective Context (choose specific components)
+User: write tests using pytest
+Assistant: I'll use selective mode to share only tool descriptions, not workspace files.
+{ToolUse("ipython", [], 'subagent("tests", "Write pytest tests for the calculate function", context_mode="selective", context_include=["tools"])').to_output(tool_format)}
 """.strip()
 
 
```
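A usage note on the planner examples: each executor's id is derived as `f"{agent_id}-{subtask['id']}"`, and that derived id is what gets passed to `subagent_wait()`. A small sketch of this naming contract (the `executor_ids` helper is hypothetical, shown only for illustration; `SubtaskDef` matches the TypedDict added in the diff):

```python
from typing import TypedDict

class SubtaskDef(TypedDict):
    id: str
    description: str

def executor_ids(agent_id: str, subtasks: list[SubtaskDef]) -> list[str]:
    # Hypothetical helper: reproduces the planner's executor naming so a
    # caller knows which ids to pass to subagent_wait().
    if not subtasks:
        raise ValueError("Planner mode requires subtasks parameter")
    return [f"{agent_id}-{s['id']}" for s in subtasks]

subtasks: list[SubtaskDef] = [
    {"id": "implement", "description": "Write implementation for feature X"},
    {"id": "test", "description": "Write comprehensive tests"},
]
print(executor_ids("feature-planner", subtasks))
# → ['feature-planner-implement', 'feature-planner-test']
```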