From 97dd2a2bd13a6a182cd667e9303cccde54006aeb Mon Sep 17 00:00:00 2001 From: n4ze3m Date: Sat, 10 Aug 2024 01:00:00 +0530 Subject: [PATCH] chore: Add autoSyncDataSources column to Bot table and update related code --- app/ui/src/@types/bot.ts | 1 + .../components/Bot/Settings/SettingsBody.tsx | 9 +++ app/ui/src/routes/settings/application.tsx | 11 ++- server/package.json | 1 + server/prisma/migrations/q_14_3/migration.sql | 2 + server/prisma/migrations/q_14_4/migration.sql | 2 + server/prisma/schema.prisma | 2 + server/src/app.ts | 12 ++++ server/src/cron/index.ts | 70 +++++++++++++++++++ server/src/handlers/api/v1/admin/type.ts | 1 + .../src/queue/controllers/crawl.controller.ts | 59 +++++++++------- .../queue/controllers/sitemap.controller.ts | 55 +++++++++------ server/src/schema/api/v1/admin/index.ts | 2 + server/yarn.lock | 18 +++++ 14 files changed, 195 insertions(+), 50 deletions(-) create mode 100644 server/prisma/migrations/q_14_3/migration.sql create mode 100644 server/prisma/migrations/q_14_4/migration.sql create mode 100644 server/src/cron/index.ts diff --git a/app/ui/src/@types/bot.ts b/app/ui/src/@types/bot.ts index d6ba8202..aac427e2 100644 --- a/app/ui/src/@types/bot.ts +++ b/app/ui/src/@types/bot.ts @@ -21,6 +21,7 @@ export type BotSettings = { semanticSearchSimilarityScore: string; inactivityTimeout: number; autoResetSession: boolean; + autoSyncDataSources: boolean; }; chatModel: { label: string; diff --git a/app/ui/src/components/Bot/Settings/SettingsBody.tsx b/app/ui/src/components/Bot/Settings/SettingsBody.tsx index 5a30ffbe..2f2def2a 100644 --- a/app/ui/src/components/Bot/Settings/SettingsBody.tsx +++ b/app/ui/src/components/Bot/Settings/SettingsBody.tsx @@ -168,6 +168,7 @@ export const SettingsBody: React.FC = ({ semanticSearchSimilarityScore: data.semanticSearchSimilarityScore, autoResetSession: data.autoResetSession, inactivityTimeout: data.inactivityTimeout, + autoSyncDataSources: data.autoSyncDataSources, }} form={form} requiredMark={false} @@ -453,6 +454,14 @@ export const SettingsBody: React.FC = ({ placeholder="Enter inactivity timeout" /> + + + + diff --git a/app/ui/src/routes/settings/application.tsx b/app/ui/src/routes/settings/application.tsx index c8812fb1..f9f0a25f 100644 --- a/app/ui/src/routes/settings/application.tsx +++ b/app/ui/src/routes/settings/application.tsx @@ -29,6 +29,7 @@ export default function SettingsApplicationRoot() { dynamicallyFetchOllamaModels: boolean; ollamaURL: string; fileUploadSizeLimit: number; + refetchDatasource: boolean; }; }); @@ -173,9 +174,15 @@ export default function SettingsApplicationRoot() { ]} tooltip="Default is 10" > - - + + +
diff --git a/server/package.json b/server/package.json index 6e7774ff..38494f84 100644 --- a/server/package.json +++ b/server/package.json @@ -61,6 +61,7 @@ "cohere-ai": "^6.2.1", "concurrently": "^7.0.0", "copyfiles": "^2.4.1", + "cron": "^3.1.7", "d3-dsv": "2", "date-fns": "^3.6.0", "discord.js": "^14.11.0", diff --git a/server/prisma/migrations/q_14_3/migration.sql b/server/prisma/migrations/q_14_3/migration.sql new file mode 100644 index 00000000..ce2026bf --- /dev/null +++ b/server/prisma/migrations/q_14_3/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "DialoqbaseSettings" ADD COLUMN "refetchDatasource" BOOLEAN NOT NULL DEFAULT false; diff --git a/server/prisma/migrations/q_14_4/migration.sql b/server/prisma/migrations/q_14_4/migration.sql new file mode 100644 index 00000000..a42621a5 --- /dev/null +++ b/server/prisma/migrations/q_14_4/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "Bot" ADD COLUMN "autoSyncDataSources" BOOLEAN DEFAULT false; diff --git a/server/prisma/schema.prisma b/server/prisma/schema.prisma index 46134c8a..406972cd 100644 --- a/server/prisma/schema.prisma +++ b/server/prisma/schema.prisma @@ -44,6 +44,7 @@ model Bot { bot_api_key String? bot_model_api_key String? options Json? @default("{}") @db.Json + autoSyncDataSources Boolean? @default(false) BotAppearance BotAppearance[] document BotDocument[] BotIntegration BotIntegration[] @@ -106,6 +107,7 @@ model DialoqbaseSettings { defaultEmbeddingModel String @default("dialoqbase_eb_text-embedding-ada-002") ollamaURL String? @default("http://host.docker.internal:11434") usePuppeteerFetch Boolean? @default(false) + refetchDatasource Boolean @default(false) } model BotIntegration { diff --git a/server/src/app.ts b/server/src/app.ts index cec4fe06..76ee7bb9 100644 --- a/server/src/app.ts +++ b/server/src/app.ts @@ -13,6 +13,9 @@ import swaggerUi from "@fastify/swagger-ui"; import { pathToFileURL } from "url"; import { Worker } from "bullmq"; import { parseRedisUrl } from "./utils/redis"; +import { CronJob } from 'cron'; +import { processDatasourceCron } from "./cron/index"; + declare module "fastify" { interface Session { is_bot_allowed: boolean; @@ -103,8 +106,17 @@ const worker = new Worker("vector", workerUrl, { useWorkerThreads: workerThreads === "true", }); +const job = new CronJob( + process.env.DB_CRON_TIME || '0 0 0 * * *', + processDatasourceCron, + null, + true, + process.env.DB_CRON_TIMEZONE +); + process.on("SIGINT", async () => { await worker.close(); + job.stop(); process.exit(); }); diff --git a/server/src/cron/index.ts b/server/src/cron/index.ts new file mode 100644 index 00000000..4181d0b4 --- /dev/null +++ b/server/src/cron/index.ts @@ -0,0 +1,70 @@ +import { PrismaClient } from "@prisma/client"; +import { getSettings } from "../utils/common"; +import { queue } from "../queue/q"; +const prisma = new PrismaClient(); + +async function processDatasourceCron() { + try { + await prisma.$connect(); + const setting = await getSettings(prisma); + + if (!setting.refetchDatasource) { + return; + } + + console.log("[CRON] Processing datasource cron"); + + + const dataSources = await prisma.botSource.findMany({ + where: { + bot: { + autoSyncDataSources: true + }, + type: { + in: [ + "website", + "crawl", + "sitemap", + ] + } + }, + include: { + bot: true + } + }) + + for (const dataSource of dataSources) { + + await prisma.botDocument.deleteMany({ + where: { + botId: dataSource.botId, + sourceId: dataSource.id, + }, + }); + await queue.add( + "process", + [ + { + ...dataSource, + embedding: dataSource.bot.embedding, + }, + ], + { + jobId: dataSource.id, + removeOnComplete: true, + removeOnFail: true, + } + ); + } + + + console.log("[CRON] Finished processing datasource cron"); + + } catch (error) { + console.error(error); + } finally { + await prisma.$disconnect(); + } +} + +export { processDatasourceCron }; \ No newline at end of file diff --git a/server/src/handlers/api/v1/admin/type.ts b/server/src/handlers/api/v1/admin/type.ts index 3a68be82..a7514a27 100644 --- a/server/src/handlers/api/v1/admin/type.ts +++ b/server/src/handlers/api/v1/admin/type.ts @@ -5,6 +5,7 @@ export type UpdateDialoqbaseSettingsRequest = { allowUserToRegister: boolean; usePuppeteerFetch: boolean; fileUploadSizeLimit: number; + refetchDatasource: boolean; }; }; diff --git a/server/src/queue/controllers/crawl.controller.ts b/server/src/queue/controllers/crawl.controller.ts index 18bff46c..cabc1215 100644 --- a/server/src/queue/controllers/crawl.controller.ts +++ b/server/src/queue/controllers/crawl.controller.ts @@ -12,37 +12,46 @@ export const crawlQueueController = async (source: QSource) => { const links = Array.from(data?.links || []); for (const link of links) { - const newSource = await prisma.botSource.create({ - data: { + const existingSource = await prisma.botSource.findFirst({ + where: { botId: source.botId, content: link, - isPending: true, - status: "PENDING", - type: "website", }, }); - await websiteQueueController( - { - ...newSource, - embedding: source.embedding, - chunkOverlap: source.chunkOverlap, - chunkSize: source.chunkSize, - usePuppeteerFetch: source.usePuppeteerFetch, - doNotClosePuppeteer: true, - }, - prisma - ); + if (!existingSource) { + const newSource = await prisma.botSource.create({ + data: { + botId: source.botId, + content: link, + isPending: true, + status: "PENDING", + type: "website", + }, + }); - await prisma.botSource.update({ - where: { - id: newSource.id, - }, - data: { - status: "FINISHED", - isPending: false, - }, - }); + await websiteQueueController( + { + ...newSource, + embedding: source.embedding, + chunkOverlap: source.chunkOverlap, + chunkSize: source.chunkSize, + usePuppeteerFetch: source.usePuppeteerFetch, + doNotClosePuppeteer: true, + }, + prisma + ); + + await prisma.botSource.update({ + where: { + id: newSource.id, + }, + data: { + status: "FINISHED", + isPending: false, + }, + }); + } } await closePuppeteer() diff --git a/server/src/queue/controllers/sitemap.controller.ts b/server/src/queue/controllers/sitemap.controller.ts index 93137bd2..1aad3408 100644 --- a/server/src/queue/controllers/sitemap.controller.ts +++ b/server/src/queue/controllers/sitemap.controller.ts @@ -33,34 +33,43 @@ export const sitemapQueueController = async (source: QSource) => { const links = data.sites; for (const link of links) { - const newSource = await prisma.botSource.create({ - data: { + const existingSource = await prisma.botSource.findFirst({ + where: { botId: source.botId, content: link, - isPending: true, - status: "PENDING", - type: "website", }, }); - await websiteQueueController( - { - ...newSource, - embedding: source.embedding, - chunkSize: source.chunkSize, - chunkOverlap: source.chunkOverlap, - }, - prisma - ); + if (!existingSource) { + const newSource = await prisma.botSource.create({ + data: { + botId: source.botId, + content: link, + isPending: true, + status: "PENDING", + type: "website", + }, + }); - await prisma.botSource.update({ - where: { - id: newSource.id, - }, - data: { - status: "FINISHED", - isPending: false, - }, - }); + await websiteQueueController( + { + ...newSource, + embedding: source.embedding, + chunkSize: source.chunkSize, + chunkOverlap: source.chunkOverlap, + }, + prisma + ); + + await prisma.botSource.update({ + where: { + id: newSource.id, + }, + data: { + status: "FINISHED", + isPending: false, + }, + }); + } } }; diff --git a/server/src/schema/api/v1/admin/index.ts b/server/src/schema/api/v1/admin/index.ts index fdbd383f..8149cb61 100644 --- a/server/src/schema/api/v1/admin/index.ts +++ b/server/src/schema/api/v1/admin/index.ts @@ -24,6 +24,7 @@ export const dialoqbaseSettingsSchema: FastifySchema = { ollamaURL: { type: "string" }, usePuppeteerFetch: { type: "boolean" }, fileUploadSizeLimit: { type: "number" }, + refetchDatasource: { type: "boolean" }, }, }, }; @@ -51,6 +52,7 @@ export const updateDialoqbaseSettingsSchema: FastifySchema = { ollamaURL: { type: "string" }, usePuppeteerFetch: { type: "boolean" }, fileUploadSizeLimit: { type: "number" }, + refetchDatasource: { type: "boolean" }, }, }, response: { diff --git a/server/yarn.lock b/server/yarn.lock index 1c63f5b5..26437ebc 100644 --- a/server/yarn.lock +++ b/server/yarn.lock @@ -1427,6 +1427,11 @@ resolved "https://registry.yarnpkg.com/@types/long/-/long-4.0.2.tgz#b74129719fc8d11c01868010082d483b7545591a" integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA== +"@types/luxon@~3.4.0": + version "3.4.2" + resolved "https://registry.yarnpkg.com/@types/luxon/-/luxon-3.4.2.tgz#e4fc7214a420173cea47739c33cdf10874694db7" + integrity sha512-TifLZlFudklWlMBfhubvgqTXRzLDI5pCbGa4P8a3wPyUQSW+1xQ5eDsreP9DWHX3tjq1ke96uYG/nwundroWcA== + "@types/mime@*": version "3.0.1" resolved "https://registry.yarnpkg.com/@types/mime/-/mime-3.0.1.tgz#5f8f2bca0a5863cb69bc0b0acd88c96cb1d4ae10" @@ -2705,6 +2710,14 @@ cron-parser@^4.6.0: dependencies: luxon "^3.2.1" +cron@^3.1.7: + version "3.1.7" + resolved "https://registry.yarnpkg.com/cron/-/cron-3.1.7.tgz#3423d618ba625e78458fff8cb67001672d49ba0d" + integrity sha512-tlBg7ARsAMQLzgwqVxy8AZl/qlTc5nibqYwtNGoCrd+cV+ugI+tvZC1oT/8dFH8W455YrywGykx/KMmAqOr7Jw== + dependencies: + "@types/luxon" "~3.4.0" + luxon "~3.4.0" + cross-fetch@^3.1.5: version "3.1.8" resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.8.tgz#0327eba65fd68a7d119f8fb2bf9334a1a7956f82" @@ -5313,6 +5326,11 @@ luxon@^3.2.1: resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.3.0.tgz#d73ab5b5d2b49a461c47cedbc7e73309b4805b48" integrity sha512-An0UCfG/rSiqtAIiBPO0Y9/zAnHUZxAMiCpTd5h2smgsj7GGmcenvrvww2cqNA8/4A5ZrD1gJpHN2mIHZQF+Mg== +luxon@~3.4.0: + version "3.4.4" + resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.4.4.tgz#cf20dc27dc532ba41a169c43fdcc0063601577af" + integrity sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA== + m3u8stream@^0.8.6: version "0.8.6" resolved "https://registry.yarnpkg.com/m3u8stream/-/m3u8stream-0.8.6.tgz#0d6de4ce8ee69731734e6b616e7b05dd9d9a55b1"