Skip to content

Commit

Permalink
use SSE
Browse files Browse the repository at this point in the history
  • Loading branch information
tlgimenes committed Aug 22, 2024
1 parent fe9ce04 commit f7aec70
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 150 deletions.
4 changes: 3 additions & 1 deletion daemon/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import {
} from "@std/http/server-sent-event-stream";
import { ENV_SITE_NAME } from "../engine/decofile/constants.ts";
import { createAuth } from "./auth.ts";
import { createFSAPIs } from "./fs/api.ts";
import { createGitAPIS } from "./git.ts";
import { logs } from "./loggings/stream.ts";
import { createRealtimeAPIs } from "./realtime/app.ts";
import { createFSAPIs } from "./fs/api.ts";
import { createSSE } from "./sse/api.ts";

export const DECO_SITE_NAME = Deno.env.get(ENV_SITE_NAME);
export const DECO_ENV_NAME = Deno.env.get("DECO_ENV_NAME");
Expand Down Expand Up @@ -73,6 +74,7 @@ export const createDaemonAPIs = (

app.route("/volumes/:id/files", createRealtimeAPIs());
app.route("/fs", createFSAPIs());
app.route("", createSSE());

return async (c, next) => {
const isDaemonAPI = c.req.header(DAEMON_API_SPECIFIER) ??
Expand Down
197 changes: 68 additions & 129 deletions daemon/fs/api.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as colors from "@std/fmt/colors";
import { ensureFile, walk } from "@std/fs";
import { join, SEPARATOR } from "@std/path";
import { createReadWriteLock, type RwLock } from "deco/daemon/async.ts";
import type { StatusResult } from "simple-git";
import { Hono } from "../../runtime/deps.ts";
import { sha1 } from "../../utils/sha1.ts";
import { git, lockerGitAPI } from "../git.ts";
import { broadcast } from "../sse/channel.ts";
import {
applyPatch,
type FSEvent,
Expand Down Expand Up @@ -109,156 +109,95 @@ const shouldIgnore = (path: string) =>
path.includes("/.git/") ||
path.includes("/node_modules/");

export const createFSAPIs = () => {
const app = new Hono();
const cwd = Deno.cwd();
const systemPathFromBrowser = (url: string) => {
const [_, ...segments] = url.split("/file");
const s = segments.join("/file");

const sockets: WebSocket[] = [];

const lockByPath = new Map<string, RwLock>();
const getRwLock = (filepath: string) => {
if (!lockByPath.has(filepath)) {
lockByPath.set(filepath, createReadWriteLock());
}
return join(Deno.cwd(), "/", s);
};

return lockByPath.get(filepath);
};
const browserPathFromSystem = (filepath: string) =>
filepath.replace(Deno.cwd(), "").replaceAll(SEPARATOR, "/");

const send = (msg: FSEvent, socket: WebSocket) => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(msg));
}
};
export async function* start(since: number): AsyncIterableIterator<FSEvent> {
const walker = walk(Deno.cwd(), { includeDirs: false, includeFiles: true });

const broadcast = (msg: FSEvent) => {
const msgStr = JSON.stringify(msg);
for (const socket of sockets) {
if (socket.readyState === WebSocket.OPEN) {
socket.send(msgStr);
}
for await (const entry of walker) {
if (shouldIgnore(entry.path)) {
continue;
}
};

const watchFS = async () => {
const watcher = Deno.watchFs(cwd, { recursive: true });

for await (const { kind, paths } of watcher) {
if (kind !== "create" && kind !== "remove" && kind !== "modify") {
continue;
}
const [metadata, stats] = await Promise.all([
inferMetadata(entry.path),
stat(entry.path),
]);

const [filepath] = paths;
const mtime = stats.mtime?.getTime() ?? Date.now();

if (shouldIgnore(filepath)) {
continue;
}
if (
!metadata || mtime < since
) {
continue;
}

const [status, metadata, stats] = await Promise.all([
git.status(),
inferMetadata(filepath),
stat(filepath),
]);
const filepath = browserPathFromSystem(entry.path);
yield {
type: "fs-sync",
detail: { metadata, filepath, timestamp: mtime },
};
}

broadcast({
type: "sync",
detail: {
status,
metadata,
timestamp: stats.mtime?.getTime() ?? Date.now(),
filepath: browserPathFromSystem(filepath),
},
});
}
yield {
type: "fs-snapshot",
detail: { timestamp: Date.now(), status: await git.status() },
};
}

app.use(lockerGitAPI.rlock);

app.get("/watch", (c) => {
const since = Number(c.req.query("since"));
const watchFS = async () => {
const watcher = Deno.watchFs(Deno.cwd(), { recursive: true });

if (c.req.header("Upgrade") !== "websocket") {
return new Response("Missing header Upgrade: websocket ", {
status: 400,
});
for await (const { kind, paths } of watcher) {
if (kind !== "create" && kind !== "remove" && kind !== "modify") {
continue;
}

const { response, socket } = Deno.upgradeWebSocket(c.req.raw);

socket.addEventListener(
"close",
() => {
const index = sockets.findIndex((s) => s === socket);
console.log(
colors.bold(`[admin.deco.cx][${index}]:`),
"socket is",
colors.red("closed"),
);

if (index > -1) {
sockets.splice(index, 1);
}
},
);
const [filepath] = paths;

socket.addEventListener("open", async () => {
const index = sockets.findIndex((s) => s === socket);
console.log(
colors.bold(`[admin.deco.cx][${index}]:`),
"socket is",
colors.green("open"),
);

const walker = walk(cwd, { includeDirs: false, includeFiles: true });

for await (const entry of walker) {
if (shouldIgnore(entry.path)) {
continue;
}

const [metadata, stats] = await Promise.all([
inferMetadata(entry.path),
stat(entry.path),
]);

const mtime = stats.mtime?.getTime() ?? Date.now();

if (
!metadata || mtime < since
) {
continue;
}

const filepath = browserPathFromSystem(entry.path);
send({
type: "sync",
detail: { metadata, filepath, timestamp: mtime },
}, socket);
}
if (shouldIgnore(filepath)) {
continue;
}

if (socket.readyState !== WebSocket.OPEN) {
return;
}
const [status, metadata, stats] = await Promise.all([
git.status(),
inferMetadata(filepath),
stat(filepath),
]);

send({
type: "snapshot",
detail: { timestamp: Date.now(), status: await git.status() },
}, socket);
broadcast({
type: "fs-sync",
detail: {
status,
metadata,
timestamp: stats.mtime?.getTime() ?? Date.now(),
filepath: browserPathFromSystem(filepath),
},
});
}
};

sockets.push(socket);

return response;
});
export const createFSAPIs = () => {
const app = new Hono();

const systemPathFromBrowser = (url: string) => {
const [_, ...segments] = url.split("/file");
const s = segments.join("/file");
const lockByPath = new Map<string, RwLock>();
const getRwLock = (filepath: string) => {
if (!lockByPath.has(filepath)) {
lockByPath.set(filepath, createReadWriteLock());
}

return join(cwd, "/", s);
return lockByPath.get(filepath);
};

const browserPathFromSystem = (filepath: string) =>
filepath.replace(cwd, "").replaceAll(SEPARATOR, "/");
app.use(lockerGitAPI.rlock);

app.get("/file/*", async (c) => {
const filepath = systemPathFromBrowser(c.req.raw.url);
Expand Down Expand Up @@ -346,7 +285,7 @@ export const createFSAPIs = () => {
return c.json(update);
});

watchFS().catch(console.error);

return app;
};

watchFS().catch(console.error);
4 changes: 2 additions & 2 deletions daemon/fs/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ export type Metadata = BlockMetadata | PageBlockMetadata | FileMetadata;
export type GitStatus = StatusResult;

export type FSEvent = {
type: "sync";
type: "fs-sync";
detail: SyncUpdate;
} | {
type: "snapshot";
type: "fs-snapshot";
detail: SnapshotUpdate;
};

Expand Down
81 changes: 81 additions & 0 deletions daemon/meta.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { JSONSchema } from "deco/types.ts";
import { broadcast } from "./sse/channel.ts";
import { dispatchWorkerState, worker } from "./worker.ts";

export type BlockMap = Record<string, { $ref: string; namespace: string }>;

export interface ManifestBlocks {
blocks: Record<string, BlockMap>;
}

export interface MetaInfo {
major: number;
namespace: string;
version: string;
schema: {
definitions: Record<string, JSONSchema>;
root: Record<string, JSONSchema>;
};
manifest: ManifestBlocks;
site: string;
etag?: string;
}

export type MetaEvent = {
type: "meta-info";
detail: MetaInfo;
};

let meta: PromiseWithResolvers<MetaInfo> | MetaInfo = Promise.withResolvers<
MetaInfo
>();

const metaRequest = (etag: string) =>
new Request(`https://localhost/deco/meta?waitForChanges=true`, {
method: "GET",
headers: {
"Content-Type": "application/json",
"If-None-Match": etag,
},
});

const isPromiseLike = <T>(
x: T | PromiseWithResolvers<T>,
): x is PromiseWithResolvers<T> =>
// @ts-expect-error typescript is wild
typeof x.resolve === "function" && typeof x.reject === "function";

export const start = async (): Promise<MetaEvent> => ({
type: "meta-info",
detail: isPromiseLike(meta) ? await meta.promise : meta,
});

const watchMeta = async () => {
let etag = "";

while (true) {
try {
const w = await worker();
const response = await w.fetch(metaRequest(etag));
const m: MetaInfo = await response.json();

etag = response.headers.get("etag") ?? etag;

const withEtag = { ...m, etag };

if (isPromiseLike(meta)) {
meta.resolve(withEtag);
}

meta = withEtag;

dispatchWorkerState("ready");
broadcast({ type: "meta-info", detail: meta });
} catch (error) {
dispatchWorkerState("updating");
console.error(error);
}
}
};

watchMeta().catch(console.error);
Loading

0 comments on commit f7aec70

Please sign in to comment.