Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ describe('BasicLlmRequestProcessor', () => {
model: 'test-basic-processor-model',
});
const runConfig: RunConfig = {
responseModalities: ['AUDIO' as unknown as Modality],
responseModalities: [Modality.AUDIO],
speechConfig: {voiceConfig: {prebuiltVoiceConfig: {voiceName: 'Puck'}}},
outputAudioTranscription: {},
inputAudioTranscription: {},
Expand Down
10 changes: 10 additions & 0 deletions dev/src/cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ const A2A_OPTION = new Option(
'--a2a [boolean]',
'Optional. Whether to enable A2A for web/api server. Default: false',
).default(false);
const RELOAD_AGENTS_OPTION = new Option(
'--reload_agents [boolean]',
'Optional. Watch agent files for changes and automatically reload them. Default: false. To see any changes to your agent file, you need to initiate a new agent run.',
).default(false);
const AGENT_FILE_MODULE_TYPE = new Option('--file_type <string>', 'Optional. ');
AGENT_FILE_MODULE_TYPE.argChoices = [FileModuleType.CJS, FileModuleType.ESM];

Expand Down Expand Up @@ -206,6 +210,7 @@ export function createProgram(): Command {
.addOption(BUNDLE_AGENT_FILE)
.addOption(AGENT_FILE_MODULE_TYPE)
.addOption(A2A_OPTION)
.addOption(RELOAD_AGENTS_OPTION)
.action(async (agentsDir: string, options: Record<string, string>) => {
const logLevel = getLogLevelFromOptions(options);
setAdkCoreLogLevel(logLevel);
Expand All @@ -223,6 +228,7 @@ export function createProgram(): Command {
otelToCloud: options['otel_to_cloud'] ? true : false,
agentFileLoadOptions: getAgentFileOptions(options),
a2a: getBoolean(options['a2a']),
reloadAgents: getBoolean(options['reload_agents']),
});

await server.start();
Expand All @@ -248,6 +254,7 @@ export function createProgram(): Command {
.addOption(BUNDLE_AGENT_FILE)
.addOption(AGENT_FILE_MODULE_TYPE)
.addOption(A2A_OPTION)
.addOption(RELOAD_AGENTS_OPTION)
.action(async (agentsDir: string, options: Record<string, string>) => {
const logLevel = getLogLevelFromOptions(options);
setAdkCoreLogLevel(logLevel);
Expand All @@ -265,6 +272,7 @@ export function createProgram(): Command {
otelToCloud: options['otel_to_cloud'] ? true : false,
agentFileLoadOptions: getAgentFileOptions(options),
a2a: getBoolean(options['a2a']),
reloadAgents: getBoolean(options['reload_agents']),
});
await server.start();
} catch (error) {
Expand Down Expand Up @@ -340,6 +348,7 @@ export function createProgram(): Command {
.addOption(COMPILE_AGENT_FILE)
.addOption(BUNDLE_AGENT_FILE)
.addOption(AGENT_FILE_MODULE_TYPE)
.addOption(RELOAD_AGENTS_OPTION)
.action(async (agentPath: string, options: Record<string, string>) => {
setAdkCoreLogLevel(getLogLevelFromOptions(options));

Expand All @@ -354,6 +363,7 @@ export function createProgram(): Command {
artifactService: getArtifactServiceFromOptions(options),
otelToCloud: options['otel_to_cloud'] ? true : false,
agentFileLoadOptions: getAgentFileOptions(options),
reloadAgents: getBoolean(options['reload_agents']),
});
} catch (error) {
logger.error('Error running agent:', (error as Error).message);
Expand Down
139 changes: 96 additions & 43 deletions dev/src/cli/cli_run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
Runner,
Session,
} from '@google/adk';
import * as fs from 'node:fs';
import * as path from 'node:path';
import * as readline from 'node:readline';

Expand Down Expand Up @@ -101,18 +102,32 @@ interface RunInteractivelyOptions {
artifactService: BaseArtifactService;
sessionService: BaseSessionService;
memoryService?: BaseMemoryService;
onAgentFileReloaded?: (subscribe: (newAgent: BaseAgent) => void) => void;
}
async function runInteractively(
options: RunInteractivelyOptions,
): Promise<void> {
const runner = new Runner({
appName: options.rootAgent.name,
agent: options.rootAgent,
let currentAgent = options.rootAgent;
let runner = new Runner({
appName: currentAgent.name,
agent: currentAgent,
artifactService: options.artifactService,
sessionService: options.sessionService,
memoryService: options.memoryService,
});

options.onAgentFileReloaded?.((newAgent: BaseAgent) => {
currentAgent = newAgent;
runner = new Runner({
appName: newAgent.name,
agent: newAgent,
artifactService: options.artifactService,
sessionService: options.sessionService,
memoryService: options.memoryService,
});
console.log(`Agent reloaded. New runner created with existing session.`);
});

while (true) {
const query = await getUserInput('[user]: ');

Expand Down Expand Up @@ -155,6 +170,7 @@ export interface RunAgentOptions {
memoryService?: BaseMemoryService;
otelToCloud?: boolean;
agentFileLoadOptions?: AgentFileOptions;
reloadAgents?: boolean;
}
export async function runAgent(options: RunAgentOptions): Promise<void> {
try {
Expand All @@ -175,50 +191,87 @@ export async function runAgent(options: RunAgentOptions): Promise<void> {
userId,
});

if (options.inputFile) {
session =
(await runFromInputFile({
appName: rootAgent.name,
userId,
agent: rootAgent,
artifactService,
sessionService,
memoryService,
filePath: options.inputFile,
})) || session;
} else if (options.savedSessionFile) {
const loadedSession = await loadFileData<Session>(
options.savedSessionFile,
);
if (loadedSession) {
for (const event of loadedSession.events) {
await sessionService.appendEvent({session, event});
const content = event.content;
if (content && content.parts?.length) {
const text = content.parts.map((part) => part.text || '').join('');
if (text) {
console.log(`[${event.author}]: ${text}`);
const reloadSubscribers: Array<(agent: BaseAgent) => void> = [];
let watcher: fs.FSWatcher | undefined;

if (options.reloadAgents) {
const agentFilePath = path.join(dirname, options.agentPath);
watcher = fs.watch(agentFilePath, async () => {
try {
await using reloadedFile = new AgentFile(
agentFilePath,
options.agentFileLoadOptions,
);
const newAgent = await reloadedFile.load();
for (const subscriber of reloadSubscribers) {
subscriber(newAgent);
}
} catch (err) {
console.warn('Failed to reload agent:', (err as Error).message);
}
});
}

const onAgentFileReloaded = (subscribe: (agent: BaseAgent) => void) => {
reloadSubscribers.push(subscribe);
};

try {
if (options.inputFile) {
session =
(await runFromInputFile({
appName: rootAgent.name,
userId,
agent: rootAgent,
artifactService,
sessionService,
memoryService,
filePath: options.inputFile,
})) || session;
} else if (options.savedSessionFile) {
const loadedSession = await loadFileData<Session>(
options.savedSessionFile,
);
if (loadedSession) {
for (const event of loadedSession.events) {
await sessionService.appendEvent({session, event});
const content = event.content;
if (content && content.parts?.length) {
const text = content.parts
.map((part) => part.text || '')
.join('');
if (text) {
console.log(`[${event.author}]: ${text}`);
}
}
}
}
}

await runInteractively({
rootAgent,
artifactService,
sessionService,
memoryService,
session,
});
} else {
console.log(`Running agent ${rootAgent.name}, type exit to exit.`);
await runInteractively({
rootAgent,
artifactService,
sessionService,
memoryService,
session,
});
await runInteractively({
rootAgent,
artifactService,
sessionService,
memoryService,
session,
onAgentFileReloaded: options.reloadAgents
? onAgentFileReloaded
: undefined,
});
} else {
console.log(`Running agent ${rootAgent.name}, type exit to exit.`);
await runInteractively({
rootAgent,
artifactService,
sessionService,
memoryService,
session,
onAgentFileReloaded: options.reloadAgents
? onAgentFileReloaded
: undefined,
});
}
} finally {
watcher?.close();
}

if (options.saveSession) {
Expand Down
7 changes: 6 additions & 1 deletion dev/src/server/adk_api_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ interface ServerOptions {
logger?: Logger;
logLevel?: LogLevel;
a2a?: boolean;
reloadAgents?: boolean;
registerProcessors?: (tracerProvider: TracerProvider) => void;
}

Expand Down Expand Up @@ -99,7 +100,11 @@ export class AdkApiServer {
options.artifactService ?? new InMemoryArtifactService();
this.agentLoader =
options.agentLoader ??
new AgentLoader(options.agentsDir, options.agentFileLoadOptions);
new AgentLoader(
options.agentsDir,
options.agentFileLoadOptions,
options.reloadAgents ?? false,
);
this.serveDebugUI = options.serveDebugUI ?? false;
this.allowOrigins = options.allowOrigins;
this.otelToCloud = options.otelToCloud ?? false;
Expand Down
58 changes: 58 additions & 0 deletions dev/src/utils/agent_loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import esbuild from 'esbuild';
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
import {shimPlugin} from 'esbuild-shim-plugin';
import * as fs from 'node:fs';
import * as fsPromises from 'node:fs/promises';
import * as path from 'node:path';
import {pathToFileURL} from 'node:url';
Expand All @@ -22,6 +23,9 @@ import {
removeFolder,
tryToFindFileRecursively,
} from './file_utils.js';
import {AdkLogger} from './logger.js';

const logger = new AdkLogger({label: 'AgentLoader', colorize: {all: true}});

/**
* Supported file extensions for JavaScript and TypeScript.
Expand Down Expand Up @@ -287,10 +291,12 @@ export class AgentFile {
export class AgentLoader {
private agentsAlreadyPreloaded = false;
private readonly preloadedAgents: Record<string, AgentFile> = {};
private watcher?: fs.FSWatcher;

constructor(
private readonly agentsDirPath: string = process.cwd(),
private readonly options = DEFAULT_AGENT_FILE_OPTIONS,
private readonly watchForChanges = false,
) {
// Do cleanups on exit
const exitHandler = async ({
Expand All @@ -316,6 +322,51 @@ export class AgentLoader {
process.on('uncaughtException', () => exitHandler({exit: true}));
}

/**
* Starts watching the agents directory for file changes. When a change is
* detected all cached agents are invalidated so they are reloaded on the
* next request.
*/
private startWatching(): void {
if (this.watcher) {
return;
}

try {
this.watcher = fs.watch(
this.agentsDirPath,
{recursive: true},
(_event, filename) => {
if (filename && isJsFile(path.extname(filename))) {
logger.info(`Detected change in ${filename}, reloading agents...`);
this.invalidateAll();
}
},
);

this.watcher.on('error', (err) => {
logger.warn('File watcher error:', err.message);
});
} catch (err) {
logger.warn('Could not start file watcher:', (err as Error).message);
}
}

/**
* Disposes all cached agents and marks them for reload on the next request.
*/
private invalidateAll(): void {
for (const agentFile of Object.values(this.preloadedAgents)) {
agentFile.dispose().catch(() => {});
}

for (const key of Object.keys(this.preloadedAgents)) {
delete this.preloadedAgents[key];
}

this.agentsAlreadyPreloaded = false;
}

async listAgents(): Promise<string[]> {
await this.preloadAgents();

Expand All @@ -329,6 +380,8 @@ export class AgentLoader {
}

async disposeAll(): Promise<void> {
this.watcher?.close();
this.watcher = undefined;
await Promise.all(
Object.values(this.preloadedAgents).map((f) => f.dispose()),
);
Expand Down Expand Up @@ -356,6 +409,11 @@ export class AgentLoader {
);

this.agentsAlreadyPreloaded = true;

if (this.watchForChanges && !this.watcher) {
this.startWatching();
}

return;
}

Expand Down
Loading