Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions web/src/lib/backend.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,22 @@ import { type SettingsChange } from "./miniconf-mqtt-client";
import { type MqttConnectionEvent } from "./mqtt-bus";
import { Schema } from "./schema";

/**
 * Test double for a set-response channel.
 *
 * Instead of talking to a broker it appends a human-readable line to the
 * shared `calls` log for every operation, so tests can assert on the exact
 * order of interactions.
 */
class FakeResponseChannel {
  private readonly calls: string[];
  private readonly prefix: string;

  constructor(calls: string[], prefix: string) {
    this.calls = calls;
    this.prefix = prefix;
  }

  /** Logs the request and immediately reports success. */
  async set(path: string, value: unknown) {
    const encoded = JSON.stringify(value);
    const entry = `set ${this.prefix} ${path} ${encoded}`;
    this.calls.push(entry);
    const reply = { path, ok: true, code: "Ok", message: "" };
    return reply;
  }

  /** Logs that the response subscription for this prefix was released. */
  close() {
    const entry = `stopResponses ${this.prefix}`;
    this.calls.push(entry);
  }
}

class FakeClient {
readonly calls: string[] = [];
readonly schemaValue = new Schema([
Expand All @@ -22,6 +38,11 @@ class FakeClient {
return this.schemaValue;
}

/** Records the request in the call log and hands back a fake channel that writes to the same log. */
async openResponseChannel(prefix: string) {
  const entry = `watchResponses ${prefix}`;
  this.calls.push(entry);
  const channel = new FakeResponseChannel(this.calls, prefix);
  return channel;
}

watchConnection(listener: (event: MqttConnectionEvent) => void) {
this.calls.push("watchConnection");
this.connectionListener = listener;
Expand All @@ -47,11 +68,6 @@ class FakeClient {
return () => this.calls.push(`stopSettings ${prefix} ${root}`);
}

async set(prefix: string, path: string, value: unknown) {
this.calls.push(`set ${prefix} ${path} ${JSON.stringify(value)}`);
return { path, ok: true, code: "Ok", message: "" };
}

/** Delivers a settings change to the registered settings listener; a no-op when none is set. */
publishSetting(change: SettingsChange) {
  if (this.settingsListener != null) {
    this.settingsListener(change);
  }
}
Expand Down Expand Up @@ -112,6 +128,7 @@ describe("PrefixSession", () => {
await vi.advanceTimersByTimeAsync(100);
expect(client.calls).toEqual([
"watchConnection",
"watchResponses dt/device",
"watchAlive dt/device",
"schema dt/device",
"watchSettings dt/device ",
Expand All @@ -126,11 +143,12 @@ describe("PrefixSession", () => {

await session.set("/leaf", 3);
session.close();
expect(client.calls.slice(-4)).toEqual([
expect(client.calls.slice(-5)).toEqual([
"set dt/device /leaf 3",
"stopConnection",
"stopSettings dt/device ",
"stopAlive dt/device",
"stopResponses dt/device",
]);
} finally {
vi.useRealTimers();
Expand Down Expand Up @@ -158,6 +176,7 @@ describe("PrefixSession", () => {

expect(client.calls).toEqual([
"watchConnection",
"watchResponses dt/device",
"watchAlive dt/device",
"schema dt/device",
"watchSettings dt/device ",
Expand Down Expand Up @@ -223,6 +242,7 @@ describe("PrefixSession", () => {

await expect(opened).rejects.toThrow("Prefix session closed");
expect(client.calls).toContain("stopAlive dt/device");
expect(client.calls).toContain("stopResponses dt/device");
});

it("does not surface transient reconnect timeouts as app errors", async () => {
Expand Down
15 changes: 14 additions & 1 deletion web/src/lib/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
type DiscoveredPrefix,
type AliveManifest,
type SetResponse,
type SetResponseChannel,
type SettingsChange,
} from "./miniconf-mqtt-client";
import type { MqttAuth, MqttConnectionEvent } from "./mqtt-bus";
Expand Down Expand Up @@ -60,6 +61,7 @@ export class PrefixSession {
private stopConnection: (() => void) | undefined;
private stopAlive: (() => void) | undefined;
private stopSettings: (() => void) | undefined;
private responseChannel: SetResponseChannel | undefined;
private readonly mirror: SettingsMirror;

constructor(
Expand All @@ -74,6 +76,12 @@ export class PrefixSession {
async open(): Promise<void> {
const load = this.beginLoad("opening");
this.watchConnection();
this.responseChannel = await this.client.openResponseChannel(this.prefix);
if (!this.active(load)) {
this.responseChannel.close();
this.responseChannel = undefined;
return;
}
this.callbacks.status("Waiting for alive");
const alive = await this.waitInitialAlive(load);
if (!this.active(load)) {
Expand All @@ -86,8 +94,11 @@ export class PrefixSession {
}

/**
 * Writes `value` to the setting at `path` through the session's response
 * channel and reports the outcome to the session callbacks.
 *
 * @param path - Settings path to write.
 * @param value - New value for the setting.
 * @returns The response produced by the response channel.
 * @throws Error("Prefix session is not open") when no response channel is
 *   currently held, i.e. before `open()` has created one or after `close()`
 *   has released it.
 */
async set(path: string, value: unknown): Promise<SetResponse> {
  if (!this.responseChannel) {
    throw new Error("Prefix session is not open");
  }
  this.callbacks.status(`Setting ${displayPath(path)}`);
  // All requests go through the channel opened in open(); the stale direct
  // client.set(...) call is gone (it redeclared `response` and targeted a
  // client method that this change removes).
  const response = await this.responseChannel.set(path, value);
  this.callbacks.response(response);
  return response;
}
Expand All @@ -99,9 +110,11 @@ export class PrefixSession {
this.stopConnection?.();
this.stopSettings?.();
this.stopAlive?.();
this.responseChannel?.close();
this.stopConnection = undefined;
this.stopSettings = undefined;
this.stopAlive = undefined;
this.responseChannel = undefined;
this.mirror.dispose();
}

Expand Down
20 changes: 12 additions & 8 deletions web/src/lib/miniconf-mqtt-client.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect, it } from "vitest";
import { describe, expect, it, vi } from "vitest";
import { MiniconfMqttClient } from "./miniconf-mqtt-client";
import { MqttBus } from "./mqtt-bus";
import { FakeMqttClient, ResponseMqttClient } from "./mqtt-test-fixture";
Expand Down Expand Up @@ -69,12 +69,15 @@ describe("MiniconfMqttClient set", () => {
it("matches overlapping set responses by correlation data", async () => {
const mqtt = new ResponseMqttClient();
const client = new MiniconfMqttClient(new MqttBus(mqtt as never));
const responses = await client.openResponseChannel("dt/device");

const first = client.set("dt/device", "/a", 1);
const second = client.set("dt/device", "/b", 2);
for (let i = 0; i < 4; i += 1) {
await Promise.resolve();
}
const first = responses.set("/a", 1);
const second = responses.set("/b", 2);
await vi.waitFor(() => expect(mqtt.publications).toHaveLength(2));

expect(mqtt.subscriptions).toHaveLength(1);
expect(mqtt.unsubscriptions).toEqual([]);
expect(new Set(mqtt.publications.map(({ properties }) => properties.responseTopic)).size).toBe(1);

mqtt.respondToSet("/b", "Ok");
mqtt.respondToSet("/a", "BadRequest", "invalid");
Expand All @@ -90,11 +93,12 @@ describe("MiniconfMqttClient set", () => {

it("validates set path and payload at the protocol boundary", async () => {
const client = new MiniconfMqttClient(new MqttBus(new ResponseMqttClient() as never));
const responses = await client.openResponseChannel("dt/device");

await expect(client.set("dt/device", "a", 1)).rejects.toThrow(
await expect(responses.set("a", 1)).rejects.toThrow(
'Path must be empty or start with "/"',
);
await expect(client.set("dt/device", "/a", undefined)).rejects.toThrow(
await expect(responses.set("/a", undefined)).rejects.toThrow(
"Set value must be JSON-serializable",
);
});
Expand Down
145 changes: 99 additions & 46 deletions web/src/lib/miniconf-mqtt-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,19 @@ type PacketProperties = {
/**
 * Structural subset of `MqttBus`: only the members named in the union below
 * are visible through this type.
 */
export type MiniconfMqttTransport = Pick<
MqttBus,
| "close"
| "listen"
| "publish"
| "subscribe"
| "watch"
| "watchConnection"
| "withSubscription"
>;

/** Bookkeeping for one in-flight set request that is still waiting for its response. */
type PendingSetResponse = {
// Settings path of the request; copied into the resolved SetResponse.
path: string;
// Settles the caller's promise with the decoded response.
resolve: (response: SetResponse) => void;
// Fails the caller's promise (timeout, publish failure, channel closed).
reject: (error: Error) => void;
// Timeout timer handle; cleared as soon as the request is settled.
timer: ReturnType<typeof globalThis.setTimeout>;
};

/**
 * Returns the `properties` object attached to a packet, or an empty object
 * when the packet carries none.
 */
function properties(packet: Packet): PacketProperties {
  const withProperties = packet as Packet & { properties?: PacketProperties };
  const found = withProperties.properties;
  return found ?? {};
}
Expand Down Expand Up @@ -277,61 +283,108 @@ export class MiniconfMqttClient {
});
}

async set(prefix: string, path: string, value: unknown, timeout = 3000): Promise<SetResponse> {
/**
 * Subscribes to a fresh response topic under `prefix` and returns a channel
 * that issues set requests against it.
 *
 * The topic is `${prefix}/response/<nanoid>`; the returned channel receives
 * the unsubscribe callback and releases the subscription in `close()`.
 */
async openResponseChannel(prefix: string): Promise<SetResponseChannel> {
const topic = `${prefix}/response/${nanoid()}`;
// Declared before subscribing so the message callback can refer to it; it is
// only assigned once the subscription has been established.
let channel: SetResponseChannel | undefined;
const stop = await this.bus.subscribe(topic, SUBSCRIBE, (message) => {
// Messages delivered before the channel exists are dropped by the optional call.
channel?.handle(message);
});
channel = new SetResponseChannel(this.bus, prefix, topic, stop);
return channel;
}
}

/**
 * Response channel shared by all set requests issued for one prefix.
 *
 * Every request is published with the same response topic and a fresh
 * correlation value. Incoming responses are routed to the matching pending
 * request by that correlation value, so overlapping requests may complete
 * in any order.
 */
export class SetResponseChannel {
  /** In-flight requests keyed by the string form of their correlation data. */
  private readonly pending = new Map<string, PendingSetResponse>();
  private closed = false;

  /**
   * @param bus - Transport used to publish set requests.
   * @param prefix - Device prefix; requests go to `${prefix}/set<path>`.
   * @param topic - Response topic advertised in every request.
   * @param stop - Callback invoked once on `close()`; the creator passes the
   *   unsubscribe function for `topic`.
   */
  constructor(
    private readonly bus: Pick<MiniconfMqttTransport, "publish">,
    private readonly prefix: string,
    private readonly topic: string,
    private readonly stop: () => void,
  ) {}

  /**
   * Releases the channel: invokes `stop` and rejects every request that is
   * still pending with "Response channel closed". Subsequent calls do nothing.
   */
  close(): void {
    if (this.closed) {
      return;
    }
    this.closed = true;
    this.stop();
    for (const [key, pending] of this.pending) {
      this.reject(key, pending, new Error("Response channel closed"));
    }
  }

  /**
   * Publishes a set request and waits for the correlated response.
   *
   * @param path - Settings path; passed through `miniconfPath` before use.
   * @param value - Value to write; must be representable by `JSON.stringify`.
   * @param timeout - Milliseconds to wait for a response before rejecting.
   * @returns The decoded response; `ok` is true only when the response code
   *   is "Ok".
   * @throws Error when the channel is closed, the value is not
   *   JSON-serializable, publishing fails, or no response arrives in time.
   */
  async set(path: string, value: unknown, timeout = 3000): Promise<SetResponse> {
    if (this.closed) {
      throw new Error("Response channel closed");
    }
    const settingsPath = miniconfPath(path);
    const payload = JSON.stringify(value);
    if (payload === undefined) {
      throw new Error("Set value must be JSON-serializable");
    }
    const correlation = randomCorrelation();
    const key = bytesKey(correlation);

    return await new Promise<SetResponse>((resolve, reject) => {
      const timer = globalThis.setTimeout(() => {
        this.reject(key, pending, new Error("Timed out waiting for set response"));
      }, timeout);
      const pending: PendingSetResponse = {
        path: settingsPath,
        resolve,
        reject,
        timer,
      };
      // Register before publishing so a response can be matched as soon as
      // it arrives.
      this.pending.set(key, pending);
      this.bus.publish(
        `${this.prefix}/set${settingsPath}`,
        payload,
        {
          qos: 1,
          properties: {
            responseTopic: this.topic,
            correlationData: correlation as never,
            payloadFormatIndicator: true,
            // Expiry is in whole seconds, at least 1; the fallback only
            // applies when the computed value is falsy (e.g. NaN timeout).
            messageExpiryInterval: Math.max(1, Math.ceil(timeout / 1000)) || TRANSIENT_EXPIRY_S,
          },
        },
      ).catch((error: unknown) => {
        this.reject(
          key,
          pending,
          error instanceof Error ? error : new Error(String(error)),
        );
      });
    });
  }

  /**
   * Routes an incoming response to the pending request whose correlation
   * data matches. Messages without a matching pending request are ignored.
   */
  handle(message: MqttMessage): void {
    const key = bytesKey(properties(message.packet).correlationData);
    const pending = this.pending.get(key);
    if (!pending) {
      return;
    }
    this.pending.delete(key);
    globalThis.clearTimeout(pending.timer);
    // A response without a "code" user property is reported as "Error".
    const code = userProperty(message.packet, "code") || "Error";
    pending.resolve({
      path: pending.path,
      ok: code === "Ok",
      code,
      message: decode(message.payload),
    });
  }

  /**
   * Fails a pending request exactly once: if the entry has already been
   * removed (resolved, timed out, or rejected earlier) this is a no-op.
   */
  private reject(key: string, pending: PendingSetResponse, error: Error): void {
    if (!this.pending.delete(key)) {
      return;
    }
    globalThis.clearTimeout(pending.timer);
    pending.reject(error);
  }
}

function settingsChange(
Expand Down
12 changes: 0 additions & 12 deletions web/src/lib/mqtt-bus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,6 @@ describe("MQTT browser transport", () => {
expect(events).toEqual(["connected:", "error:subscribe failed"]);
});

it("does not resubscribe transient subscriptions on reconnect", async () => {
const mqtt = new FakeMqttClient();
const bus = new MqttBus(mqtt as never);

await bus.withSubscription("dt/device/response/1", { qos: 0 }, async () => {
mqtt.emit("connect");
await Promise.resolve();
});

expect(mqtt.subscriptions).toEqual(["dt/device/response/1"]);
});

it("publishes only while connected", async () => {
const mqtt = new FakeMqttClient();
const bus = new MqttBus(mqtt as never);
Expand Down
Loading
Loading