server.tsx
import "server-only";
import { ReactNode, isValidElement } from "react";
import { AIProvider } from "./client";
import { createStreamableUI, createStreamableValue } from "ai/rsc";
import { Runnable } from "@langchain/core/runnables";
import { CompiledStateGraph } from "@langchain/langgraph";
import { StreamEvent } from "@langchain/core/tracers/log_stream";
import { AIMessage } from "@/ai/message";

export const dynamic = "force-dynamic";
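
// Tools and graph nodes stream React elements to the client by emitting custom
// events with this name; `streamRunnableUI` below appends or updates the UI
// stream whenever such an event arrives. A hedged sketch of the producing side
// (assuming `dispatchCustomEvent` from `@langchain/core/callbacks/dispatch`
// and the payload shape expected by the handler below):
//
//   await dispatchCustomEvent(
//     CUSTOM_UI_YIELD_NAME,
//     { output: { value: <MyComponent />, type: "append" } },
//     config,
//   );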
export const CUSTOM_UI_YIELD_NAME = "__yield_ui__";

/**
 * Executes the `streamEvents` method on a runnable (or compiled LangGraph)
 * and converts the resulting async generator into an RSC-friendly stream.
 *
 * @param runnable Runnable or compiled state graph to stream events from
 * @param inputs Inputs passed to `streamEvents`
 * @returns The streamable UI value to send to the client, along with a
 *   promise that resolves with the output of the last stream event
 */
export function streamRunnableUI<RunInput, RunOutput>(
  runnable:
    | Runnable<RunInput, RunOutput>
    | CompiledStateGraph<RunInput, Partial<RunInput>>,
  inputs: RunInput,
) {
  const ui = createStreamableUI();
  const [lastEvent, resolve] = withResolvers<string>();

  (async () => {
    let lastEventValue: StreamEvent | null = null;

    const callbacks: Record<
      string,
      ReturnType<typeof createStreamableUI | typeof createStreamableValue>
    > = {};

    for await (const streamEvent of (
      runnable as Runnable<RunInput, RunOutput>
    ).streamEvents(inputs, {
      version: "v2",
    })) {
      if (
        streamEvent.name === CUSTOM_UI_YIELD_NAME &&
        isValidElement(streamEvent.data.output.value)
      ) {
        if (streamEvent.data.output.type === "append") {
          ui.append(streamEvent.data.output.value);
        } else if (streamEvent.data.output.type === "update") {
          ui.update(streamEvent.data.output.value);
        }
      }

      if (streamEvent.event === "on_chat_model_stream") {
        const chunk = streamEvent.data.chunk;
        if ("text" in chunk && typeof chunk.text === "string") {
          if (!callbacks[streamEvent.run_id]) {
            // createStreamableValue / useStreamableValue is preferred here,
            // as the streamed chunks are surfaced in the UI immediately
            // rather than being batched by React via createStreamableUI
            const textStream = createStreamableValue();
            ui.append(<AIMessage value={textStream.value} />);
            callbacks[streamEvent.run_id] = textStream;
          }

          callbacks[streamEvent.run_id].append(chunk.text);
        }
      }

      lastEventValue = streamEvent;
    }

    // resolve the promise, allowing the client to continue
    resolve(lastEventValue?.data.output);

    // Close the UI stream for all text streams.
    Object.values(callbacks).forEach((cb) => cb.done());

    // Close the main UI stream for component streams yielded by tools.
    ui.done();
  })();

  return { ui: ui.value, lastEvent };
}
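
// Usage sketch (not part of this file): a server action elsewhere in the app
// might call `streamRunnableUI` roughly like this, where `agentExecutor` is a
// hypothetical runnable or compiled graph defined by the application:
//
//   async function agent(input: { input: string }) {
//     "use server";
//     return streamRunnableUI(agentExecutor, { input: input.input });
//   }
//
// The returned `ui` node is rendered by the client, and `lastEvent` can be
// awaited to obtain the final output once streaming has finished.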

/**
 * Exposes these endpoints to the client.
 * The actions are wrapped in a provider component so that imports of
 * client components resolve properly.
 *
 * TODO: replace with `createAI` instead, even though that
 * implicitly handles state management.
 *
 * See https://github.com/vercel/next.js/pull/59615
 * @param actions Record of server actions to expose to the client
 */
export function exposeEndpoints<T extends Record<string, unknown>>(
  actions: T,
): {
  (props: { children: ReactNode }): Promise<JSX.Element>;
  $$types?: T;
} {
  return async function AI(props: { children: ReactNode }) {
    return <AIProvider actions={actions}>{props.children}</AIProvider>;
  };
}
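
// Usage sketch (an assumption, not part of this file): the wrapped provider is
// typically exported once with the app's server actions and rendered near the
// root so that client components can reach the actions through `AIProvider`:
//
//   export const EndpointsContext = exposeEndpoints({ agent });
//
//   // in a layout or page component:
//   // <EndpointsContext>{children}</EndpointsContext>
//
// Here `agent` is a hypothetical server action such as the one sketched above.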

/**
 * Polyfill to emulate the upcoming Promise.withResolvers
 */
export function withResolvers<T>() {
  let resolve: (value: T) => void;
  let reject: (reason?: any) => void;

  const innerPromise = new Promise<T>((res, rej) => {
    resolve = res;
    reject = rej;
  });

  // @ts-expect-error
  return [innerPromise, resolve, reject] as const;
}