diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index c5840f8462..132c607863 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -28,18 +28,16 @@ import { getPinoTransport } from '@hyperdx/node-opentelemetry' import { PRODUCTION } from '@magickml/config' if (PRODUCTION) { - initLogger({ - name: 'cloud-agent-worker', - transport: { - targets: [ - getPinoTransport('info') - ] - }, - level: 'info', - }) + initLogger({ + name: 'cloud-agent-worker', + transport: { + targets: [getPinoTransport('info')] + }, + level: 'info' + }) } else { - initLogger({ name: 'cloud-agent-worker' }) -} + initLogger({ name: 'cloud-agent-worker' }) +} const logger = getLogger() // log handle errors @@ -69,7 +67,7 @@ const routes: Route[] = [...spells, ...apis, ...serverRoutes] * form and multipart-json requests, and routes. */ async function init() { - await initApp() + await initApp('server') await initAgentCommander() // load plugins await (async () => { diff --git a/package-lock.json b/package-lock.json index fd80b3ebca..58a42ff93f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -78,6 +78,7 @@ "@use-gesture/react": "10.2.27", "@welldone-software/why-did-you-render": "7.0.1", "axios": "1.4.0", + "axios-retry": "^3.8.0", "bullmq": "4.6.0", "class-variance-authority": "^0.7.0", "classnames": "2.3.2", @@ -94,6 +95,7 @@ "ethers": "5.7.2", "expletives": "0.1.5", "express": "4.18.2", + "feathers-permissions": "^2.1.4", "feathers-sync": "3.0.3", "flatted": "3.2.7", "flexlayout-react": "0.7.7", @@ -22472,6 +22474,15 @@ "proxy-from-env": "^1.1.0" } }, + "node_modules/axios-retry": { + "version": "3.8.0", + "resolved": "https://registry.npmjs.org/axios-retry/-/axios-retry-3.8.0.tgz", + "integrity": "sha512-CfIsQyWNc5/AE7x/UEReRUadiBmQeoBpSEC+4QyGLJMswTsP1tz0GW2YYPnE7w9+ESMef5zOgLDFpHynNyEZ1w==", + "dependencies": { + "@babel/runtime": "^7.15.4", + "is-retry-allowed": "^2.2.0" + } + }, "node_modules/axobject-query": { "version": "3.2.1", "dev": true, @@ -29206,6 +29217,30 @@ "pend": "~1.2.0" } }, + "node_modules/feathers-permissions": { + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/feathers-permissions/-/feathers-permissions-2.1.4.tgz", + "integrity": "sha512-7z6nBw0FqKQUS31WwRSyXG9LHesCxryVtkhP3L87pvwFB1WeFEMiOwvP4toeYaBmzZ1aFsHDIVECgEIQ9naxtg==", + "dependencies": { + "@feathersjs/errors": "^4.5.8", + "debug": "^4.2.0", + "lodash": "^4.17.20" + }, + "engines": { + "node": ">= 10" + } + }, + "node_modules/feathers-permissions/node_modules/@feathersjs/errors": { + "version": "4.5.17", + "resolved": "https://registry.npmjs.org/@feathersjs/errors/-/errors-4.5.17.tgz", + "integrity": "sha512-HY1YJV/9d5wKd3RPNaWggOhAX4NmOulr5EvBMMm6jaMizJ7UMRUgZmqyRtuHL4h+u2LoLmWv9+wO3V+uCFoULg==", + "dependencies": { + "debug": "^4.3.3" + }, + "engines": { + "node": ">= 10" + } + }, "node_modules/feathers-sync": { "version": "3.0.3", "license": "MIT", @@ -32750,6 +32785,17 @@ "node": ">=0.10.0" } }, + "node_modules/is-retry-allowed": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/is-retry-allowed/-/is-retry-allowed-2.2.0.tgz", + "integrity": "sha512-XVm7LOeLpTW4jV19QSH38vkswxoLud8sQ57YwJVTPWdiaI9I8keEhGFpBlslyVsgdQy4Opg8QOLb8YRgsyZiQg==", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-root": { "version": "2.1.0", "license": "MIT", diff --git a/package.json b/package.json index ae406bf7b7..9af58e1e83 100644 --- a/package.json +++ b/package.json @@ -109,6 +109,7 @@ "@use-gesture/react": "10.2.27", "@welldone-software/why-did-you-render": "7.0.1", "axios": "1.4.0", + "axios-retry": "^3.8.0", "bullmq": "4.6.0", "class-variance-authority": "^0.7.0", "classnames": "2.3.2", @@ -125,6 +126,7 @@ "ethers": "5.7.2", "expletives": "0.1.5", "express": "4.18.2", + "feathers-permissions": "^2.1.4", "feathers-sync": "3.0.3", "flatted": "3.2.7", "flexlayout-react": "0.7.7", @@ -206,7 +208,6 @@ "tailwindcss-animate": "^1.0.7", "tesseract.js": "4.0.6", "three": "^0.154.0", - "ts-node": "10.9.1", "tslib": "^2.3.0", "twitter-api-v2": "1.14.3", "typechat": "^0.0.10", diff --git a/packages/agents/src/lib/Agent.ts b/packages/agents/src/lib/Agent.ts index 9f1ce4aa17..1d0ef4da38 100644 --- a/packages/agents/src/lib/Agent.ts +++ b/packages/agents/src/lib/Agent.ts @@ -201,6 +201,7 @@ export class Agent implements AgentInterface { async runWorker(job: Job) { // the job name is the agent id. Only run if the agent id matches. + this.logger.debug('running worker', { id: this.id, data: job.data }) if (this.id !== job.data.agentId) return const { data } = job @@ -235,6 +236,7 @@ export class Agent implements AgentInterface { ...this.secrets, ...data.secrets, }, + sessionId: data?.sessionId, publicVariables: this.publicVariables, runSubspell: data.runSubspell, app, @@ -269,6 +271,7 @@ export class Agent implements AgentInterface { export interface AgentRunJob { inputs: MagickSpellInput + sessionId?: string jobId: string agentId: string spellId: string diff --git a/packages/agents/src/lib/AgentCommander.ts b/packages/agents/src/lib/AgentCommander.ts index 753e5c280e..b480f97287 100644 --- a/packages/agents/src/lib/AgentCommander.ts +++ b/packages/agents/src/lib/AgentCommander.ts @@ -15,7 +15,8 @@ import { AgentResult, AgentRunJob } from './Agent' import { AGENT_RESPONSE_TIMEOUT_MSEC } from '@magickml/config' export type RunRootSpellArgs = { - agent: Agent + agent?: Agent + agentId?: string inputs: MagickSpellInput componentName?: string runSubspell?: boolean @@ -25,6 +26,7 @@ export type RunRootSpellArgs = { isSubSpell?: boolean currentJob?: Job subSpellDepth?: number + sessionId?: string } interface AgentCommanderArgs { @@ -41,7 +43,10 @@ export class AgentCommander extends EventEmitter { } runSpellWithResponse(args: RunRootSpellArgs) { - const { agent } = args + const { agentId, agent } = args + const id = agentId || agent?.id + if (!id) throw new Error('Agent or agent id is required') + return new Promise((resolve, reject) => { ;(async () => { setTimeout(() => { @@ -50,7 +55,7 @@ export class AgentCommander extends EventEmitter { let jobId: null | string = null - const agentMessageName = AGENT_RUN_RESULT(agent.id) + const agentMessageName = AGENT_RUN_RESULT(id) this.pubSub.subscribe(agentMessageName, (data: AgentResult) => { if (data.result.error) { @@ -66,7 +71,7 @@ export class AgentCommander extends EventEmitter { } }) - const agentErrorName = AGENT_RUN_ERROR(agent.id) + const agentErrorName = AGENT_RUN_ERROR(id) this.pubSub.subscribe(agentErrorName, (data: AgentResult) => { if (data.jobId === jobId) { this.pubSub.unsubscribe(agentErrorName) @@ -83,7 +88,7 @@ export class AgentCommander extends EventEmitter { private runRootSpellArgsToString( jobId: string, { - agent, + agentId, inputs, componentName, runSubspell, @@ -91,38 +96,46 @@ export class AgentCommander extends EventEmitter { publicVariables, spellId, subSpellDepth, + sessionId, }: RunRootSpellArgs ) { return JSON.stringify({ jobId, - agentId: agent.id, - spellId: spellId || agent.rootSpellId, + agentId, + spellId: spellId, inputs, componentName, runSubspell, secrets, publicVariables, subSpellDepth, + sessionId, }) } async runSubSpell(args: RunRootSpellArgs) { - const { agent } = args + const { agentId, agent } = args + const id = agentId || agent?.id + if (!id) throw new Error('Agent or agent id is required') + const jobId = uuidv4() await this.pubSub.publish( - AGENT_RUN_JOB(agent.id), + AGENT_RUN_JOB(id), this.runRootSpellArgsToString(jobId, args) ) return jobId } async runSpell(args: RunRootSpellArgs) { - const { agent } = args - this.logger.debug(`Running Spell on Agent: ${agent.id}`) - this.logger.debug(AGENT_RUN_JOB(agent.id)) + const { agent, agentId } = args + const id = agentId || agent?.id + if (!id) throw new Error('Agent or agent id is required') + + this.logger.debug(`Running Spell on Agent: ${id}`) + this.logger.debug(AGENT_RUN_JOB(id)) const jobId = uuidv4() await this.pubSub.publish( - AGENT_RUN_JOB(agent.id), + AGENT_RUN_JOB(id), this.runRootSpellArgsToString(jobId, args) ) return jobId diff --git a/packages/cloud-agent-manager/src/lib/CloudAgentManager.ts b/packages/cloud-agent-manager/src/lib/CloudAgentManager.ts index 4ac3d28370..1d8c398184 100644 --- a/packages/cloud-agent-manager/src/lib/CloudAgentManager.ts +++ b/packages/cloud-agent-manager/src/lib/CloudAgentManager.ts @@ -1,10 +1,15 @@ -import pino from "pino" -import { diff, unique } from "radash" -import { AGENT_DELETE, AGENT_DELETE_JOB, AGENT_UPDATE_JOB, getLogger } from "@magickml/core" -import type { Reporter } from "./Reporters" -import { type PubSub, type MessageQueue, app } from "@magickml/server-core" -import { Agent } from "packages/core/server/src/services/agents/agents.schema" -import { HEARTBEAT_MSEC, MANAGER_WARM_UP_MSEC } from "@magickml/config" +import pino from 'pino' +import { diff, unique } from 'radash' +import { + AGENT_DELETE, + AGENT_DELETE_JOB, + AGENT_UPDATE_JOB, + getLogger, +} from '@magickml/core' +import type { Reporter } from './Reporters' +import { type PubSub, type MessageQueue, app } from '@magickml/server-core' +import { Agent } from 'packages/core/server/src/services/agents/agents.schema' +import { HEARTBEAT_MSEC, MANAGER_WARM_UP_MSEC } from '@magickml/config' interface CloudAgentManagerConstructor { pubSub: PubSub @@ -42,18 +47,26 @@ export class CloudAgentManager { this.logger.info(`Agent Updated: ${agent.id}`) if (agent.enabled) { - this.logger.info(`Agent ${agent.id} enabled, adding to cloud agent worker`) - await this.newQueue.addJob('agent:new', {agentId: agent.id}) + this.logger.info( + `Agent ${agent.id} enabled, adding to cloud agent worker` + ) + await this.newQueue.addJob('agent:new', { agentId: agent.id }) this.logger.debug(`Agent create job for ${agent.id} added`) return } - this.pubSub.publish(AGENT_UPDATE_JOB(agent.id), JSON.stringify({ agentId: agent.id })) + this.pubSub.publish( + AGENT_UPDATE_JOB(agent.id), + JSON.stringify({ agentId: agent.id }) + ) }) this.agentStateReporter.on(AGENT_DELETE, async (data: unknown) => { const agent = data as Agent - this.pubSub.publish(AGENT_DELETE_JOB(agent.id), JSON.stringify({ agentId: agent.id })) + this.pubSub.publish( + AGENT_DELETE_JOB(agent.id), + JSON.stringify({ agentId: agent.id }) + ) }) } @@ -61,10 +74,13 @@ export class CloudAgentManager { const deduped = unique(agents) const diffAgents = diff(agents, deduped) - this.logger.info("deduping agents %o", diffAgents) - diffAgents.forEach(async (agentId) => { - await this.pubSub.publish(AGENT_DELETE_JOB(agentId), JSON.stringify({ agentId: agentId })) - await this.newQueue.addJob('agent:new', {agentId: agentId}) + this.logger.trace('deduping agents %o', diffAgents) + diffAgents.forEach(async agentId => { + await this.pubSub.publish( + AGENT_DELETE_JOB(agentId), + JSON.stringify({ agentId: agentId }) + ) + await this.newQueue.addJob('agent:new', { agentId: agentId }) }) return deduped @@ -72,39 +88,51 @@ export class CloudAgentManager { // Eventually we'll need this heartbeat to keep track of running agents on workers async heartbeat() { - this.logger.debug("Started heartbeat") + this.logger.debug('Started heartbeat') let agentsOfWorkers: string[] = [] - this.pubSub.subscribe("heartbeat-pong", async (agents: string[]) => { - this.logger.debug("Got heartbeat pong") + this.pubSub.subscribe('heartbeat-pong', async (agents: string[]) => { + this.logger.trace('Got heartbeat pong') agents.forEach(a => agentsOfWorkers.push(a)) agentsOfWorkers = await this.dedupeAgents(agentsOfWorkers) }) - await this.pubSub.publish("heartbeat-ping", "{}") - - setTimeout(() => - setInterval(async () => { - this.logger.debug(`Starting Heartbeat update`) - const enabledAgents = await app.service('agents').find({ - query: { - enabled: true, - }, - }) - - const agentDiff = diff(enabledAgents.data.map(a => a.id), Array.from(agentsOfWorkers)) - const agentsToUpdate = enabledAgents.data.filter(a => agentDiff.includes(a.id)) - - if (agentDiff.length > 0) { - this.logger.info(`Found ${agentDiff.length} agents to Update`) - const agentPromises: Promise[] = [] - for (const agent of agentsToUpdate) { - this.logger.debug(`Adding agent ${agent.id} to cloud agent worker`) - agentPromises.push(this.newQueue.addJob('agent:new', {agentId: agent.id})) + await this.pubSub.publish('heartbeat-ping', '{}') + + setTimeout( + () => + setInterval(async () => { + this.logger.trace(`Starting Heartbeat update`) + const enabledAgents = await app.service('agents').find({ + query: { + enabled: true, + }, + }) + + const agentDiff = diff( + enabledAgents.data.map(a => a.id), + Array.from(agentsOfWorkers) + ) + const agentsToUpdate = enabledAgents.data.filter(a => + agentDiff.includes(a.id) + ) + + if (agentDiff.length > 0) { + this.logger.info(`Found ${agentDiff.length} agents to Update`) + const agentPromises: Promise[] = [] + for (const agent of agentsToUpdate) { + this.logger.debug( + `Adding agent ${agent.id} to cloud agent worker` + ) + agentPromises.push( + this.newQueue.addJob('agent:new', { agentId: agent.id }) + ) + } + + await Promise.all(agentPromises) } - - await Promise.all(agentPromises) - } - agentsOfWorkers = []; - this.pubSub.publish("heartbeat-ping", "{}") - }, HEARTBEAT_MSEC), MANAGER_WARM_UP_MSEC) + agentsOfWorkers = [] + this.pubSub.publish('heartbeat-ping', '{}') + }, HEARTBEAT_MSEC), + MANAGER_WARM_UP_MSEC + ) } } diff --git a/packages/cloud-agent-worker/src/lib/cloud-agent-worker.ts b/packages/cloud-agent-worker/src/lib/cloud-agent-worker.ts index 894df2b2cf..ac991fa93e 100644 --- a/packages/cloud-agent-worker/src/lib/cloud-agent-worker.ts +++ b/packages/cloud-agent-worker/src/lib/cloud-agent-worker.ts @@ -40,7 +40,7 @@ export class CloudAgentWorker extends AgentManager { }) this.pubSub.subscribe('heartbeat-ping', async () => { - this.logger.debug('Got heartbeat ping') + this.logger.trace('Got heartbeat ping') const agentIds = Object.keys(this.currentAgents) this.pubSub.publish('heartbeat-pong', JSON.stringify(agentIds)) }) diff --git a/packages/config/src/config.ts b/packages/config/src/config.ts index 8f78a2ddda..caeb909150 100644 --- a/packages/config/src/config.ts +++ b/packages/config/src/config.ts @@ -122,6 +122,10 @@ export const AGENT_RESPONSE_TIMEOUT_MSEC = export const CLOUD_AGENT_KEY = getVarForEnvironment('CLOUD_AGENT_KEY') || v4() +export const BACKOFF_RETRY_LIMIT = Number( + getVarForEnvironment('BACKOFF_RETRY_LIMIT') || 0 +) + export const AWS_ACCESS_KEY = getVarForEnvironment('AWS_ACCESS_KEY') || '' export const AWS_SECRET_KEY = getVarForEnvironment('AWS_SECRET_KEY') || '' export const AWS_REGION = getVarForEnvironment('AWS_REGION') || '' diff --git a/packages/core/client/src/components/Drawer/CustomNode.tsx b/packages/core/client/src/components/Drawer/CustomNode.tsx index 17804c5b39..70232a8021 100644 --- a/packages/core/client/src/components/Drawer/CustomNode.tsx +++ b/packages/core/client/src/components/Drawer/CustomNode.tsx @@ -1,4 +1,4 @@ -import React, { useState } from 'react' +import { useState,useEffect } from 'react' import Typography from '@mui/material/Typography' import { NodeModel } from '@minoru/react-dnd-treeview' import ChevronRightIcon from '@mui/icons-material/ChevronRight' @@ -138,6 +138,8 @@ export const CustomNode: React.FC = props => { return } + await dispatch(closeTab(props.node.id)) + const spell: any = props.node.id const response: any = await patchSpell({ id: props.node.id, @@ -145,17 +147,21 @@ export const CustomNode: React.FC = props => { name: newName, }, }) - + if (response.error) { enqueueSnackbar('Error saving spell', { variant: 'error', }) return } - - enqueueSnackbar('Spell saved', { variant: 'success' }) - - dispatch(closeTab(props.node.id)) + + if (response){ + enqueueSnackbar('Spell saved', { variant: 'success' }) + dispatch( + closeTab(props.node.id ) + ) + } + dispatch( openTab({ name: props.node.id + '-' + encodeURIComponent(btoa(newName)), @@ -165,7 +171,6 @@ export const CustomNode: React.FC = props => { ) setToDelete(spell) setIsAdded(true) - setIsRenaming(false) } const setClassSelectedFile = () => { @@ -174,13 +179,17 @@ export const CustomNode: React.FC = props => { return activeTab?.name === props.node.text ? styles.isSelected : '' } - React.useEffect(() => { + useEffect(() => { if (isRenaming) { const renameInput = document?.querySelector('.rename-input') as HTMLElement if (renameInput) renameInput.focus() } }, [isRenaming]) + // useEffect(() => { + // setIsRenaming(false) + // }, [props.node, newName]) + return (
{ // if the route is to the api service, skip auth if (context.path === 'api') { + context.params.user = { + id: 'api', + permissions: ['admin', 'owner'], + } + return next() } // if we are authenticated with the API key, skip auth if (context.params.authenticated && context.params.apiKey) { + // set the user to the api user for all permissions here + context.params.user = { + id: 'api', + permissions: ['admin', 'owner'], + } return next() } @@ -225,7 +243,11 @@ export async function initApp() { ], }, before: { - all: [], + all: [ + checkPermissions({ + roles: ['admin', 'owner', 'public'], + }), + ], }, after: {}, error: {}, diff --git a/packages/core/server/src/index.ts b/packages/core/server/src/index.ts index 56ae8e9cb3..6d39958f49 100644 --- a/packages/core/server/src/index.ts +++ b/packages/core/server/src/index.ts @@ -1,47 +1,49 @@ -// DOCUMENTED +// DOCUMENTED /** * This file exports several modules that provide functions and classes for various parts of the application. * Please see individual modules for detailed documentation on their usage. */ // Export all API related modules -export * from './api/apis'; -export * from './api'; +export * from './api/apis' +export * from './api' // Export the main app module -export * from './app'; +export * from './app' // Configuration-related exports -export * from './config/configuration'; -export * from './config/types'; -export * from './config/validators'; +export * from './config/configuration' +export * from './config/types' +export * from './config/validators' // Declaration of types -export * from './declarations'; +export * from './declarations' // Export file server -export * from './servers/fileServer'; +export * from './servers/fileServer' // Export google text-to-speech and speech-to-text servers -export * from './servers/googleSpeechToText'; -export * from './servers/googleTextToSpeech'; +export * from './servers/googleSpeechToText' +export * from './servers/googleTextToSpeech' // Export hooks -export * from './hooks'; +export * from './hooks' // Export the database client -export * from './dbClient'; +export * from './dbClient' // Export service modules -export * from './services'; +export * from './services' // Export socket related modules -export * from './sockets/sockets'; +export * from './sockets/sockets' //Export the tiktalknet server -export * from './servers/tiktalknet'; +export * from './servers/tiktalknet' // Export metering functions -export * from './metering'; +export * from './metering' // tools for intrasystem communication export * from './communication' + +export * from './lib/feathersPermissions' diff --git a/packages/core/server/src/lib/feathersPermissions.ts b/packages/core/server/src/lib/feathersPermissions.ts new file mode 100644 index 0000000000..3103479ab9 --- /dev/null +++ b/packages/core/server/src/lib/feathersPermissions.ts @@ -0,0 +1,97 @@ +import { Forbidden } from '@feathersjs/errors' +import { HookContext } from '@feathersjs/feathers' +import get from 'lodash/get' +import createDebug from 'debug' + +const debug = createDebug('feathers-permissions') + +type Options = { + entity?: string + field?: string + error?: boolean + permissions?: string[] | Function + roles?: string[] | Function +} + +export function checkPermissions(options: Options = {}) { + const { + entity: entityName = 'user', + field = 'permissions', + error = true, + } = options + const permissions = options.permissions || options.roles + + if (!Array.isArray(permissions) && typeof permissions !== 'function') { + throw new Error( + "'roles' option for feathers-permissions hook must be an array or a function" + ) + } + + return async (context: HookContext) => { + const { params, type, method } = context + const currentPermissions = await Promise.resolve( + typeof permissions === 'function' ? permissions(context) : permissions + ) + + if (type !== 'before') { + throw new Error( + "The feathers-permissions hook should only be used as a 'before' hook." + ) + } + + debug('Running checkPermissions hook with options:', { + entityName, + field, + roles: permissions, + }) + + const entity = context.params[entityName] + + if (!entity) { + debug( + `hook.params.${entityName} does not exist. If you were expecting it to be defined check your hook order and your idField options in your auth config.` + ) + + if (params.provider) { + throw new Forbidden( + 'You do not have the correct permissions (invalid permission entity).' + ) + } + + return context + } + + // Normalize permissions. They can either be a comma separated string or an array. + const value = get(entity, field, []) + const permissionList = + typeof value === 'string' + ? value.split(',').map(current => current.trim()) + : value + const requiredPermissions = ['*', `*:${method}`] + + currentPermissions.forEach(permission => + requiredPermissions.push( + `${permission}`, + `${permission}:*`, + `${permission}:${method}` + ) + ) + + debug('Required Permissions', requiredPermissions) + + const permitted = permissionList.some(current => + requiredPermissions.includes(current) + ) + + if (error !== false && !permitted) { + throw new Forbidden('You do not have the correct permissions.') + } + + context.params = { + permitted, + ...params, + } + + return context + } +} diff --git a/packages/core/server/src/services/agentImage/agentImage.ts b/packages/core/server/src/services/agentImage/agentImage.ts index ff773571ad..a9f5346d5d 100644 --- a/packages/core/server/src/services/agentImage/agentImage.ts +++ b/packages/core/server/src/services/agentImage/agentImage.ts @@ -1,11 +1,20 @@ import { Application } from '../../declarations' +import { checkPermissions } from '../../lib/feathersPermissions' import { AgentImageService } from './agentImage.class' export const agentImage = (app: Application) => { app.use('agentImage', new AgentImageService()) // Add any necessary hooks here - app.service('agentImage').hooks({}) + app.service('agentImage').hooks({ + before: { + all: [ + checkPermissions({ + roles: ['owner', 'agent'], + }), + ], + }, + }) } declare module '../../declarations' { diff --git a/packages/core/server/src/services/agents/agents.class.ts b/packages/core/server/src/services/agents/agents.class.ts index bc60e356af..0eddbbaf2d 100644 --- a/packages/core/server/src/services/agents/agents.class.ts +++ b/packages/core/server/src/services/agents/agents.class.ts @@ -4,10 +4,10 @@ import type { Params } from '@feathersjs/feathers' import { KnexService } from '@feathersjs/knex' import type { KnexAdapterParams, KnexAdapterOptions } from '@feathersjs/knex' import { app } from '@magickml/server-core' -import md5 from 'md5'; +import md5 from 'md5' import type { Application } from '../../declarations' import type { Agent, AgentData, AgentPatch, AgentQuery } from './agents.schema' -import { Queue } from 'bullmq' +import { RunRootSpellArgs } from '@magickml/agents' // Define AgentParams type based on KnexAdapterParams with AgentQuery export type AgentParams = KnexAdapterParams @@ -31,14 +31,10 @@ export class AgentService< ServiceParams extends Params = AgentParams > extends KnexService { app: Application - runQueue: Queue constructor(options: KnexAdapterOptions, app: Application) { super(options) this.app = app - this.runQueue = new Queue(`agent:run`, { - connection: app.get('redis'), - }) } // we use this ping to avoid firing a patched event on the agent @@ -53,22 +49,21 @@ export class AgentService< return { data: query } } - async run(data: AgentRunData) { + async run(data: Omit) { if (!data.agentId) throw new Error('agentId is required') // probably need to authenticate the request here against project id // add the job to the queueD - const job = await this.runQueue.add(data.agentId, { - ...data, - }) + + const agentCommander = this.app.get('agentCommander') + const response = await agentCommander.runSpellWithResponse(data) // return the job id - return { jobId: job.id } + return { response } } async create( data: AgentData | AgentData[] | any ): Promise { - // ADDING REST API KEY TO AGENT's DATA if (data.data) { data.data = JSON.stringify({ @@ -100,4 +95,3 @@ export const getOptions = (app: Application): KnexAdapterOptions => { multi: ['remove'], } } - diff --git a/packages/core/server/src/services/agents/agents.ts b/packages/core/server/src/services/agents/agents.ts index 5212698f44..f3dca43d0c 100644 --- a/packages/core/server/src/services/agents/agents.ts +++ b/packages/core/server/src/services/agents/agents.ts @@ -5,7 +5,6 @@ */ // Import necessary modules and functions -import * as BullMQ from 'bullmq' import { hooks as schemaHooks } from '@feathersjs/schema' import { agentDataValidator, @@ -22,12 +21,35 @@ import type { Application, HookContext } from '../../declarations' import { AgentService, getOptions } from './agents.class' import { jsonResolver } from '../utils' import { v4 as uuidv4 } from 'uuid' +import { checkPermissions } from '../../lib/feathersPermissions' // Re-export agents.class and agents.schema export * from './agents.class' export * from './agents.schema' -const AGENT_EVENTS = ['log', 'result', 'spell'] +function removeUnwantedProperties(obj: any, keysToRemove: string[]): any { + // Base cases + if (obj === null || typeof obj !== 'object') { + return obj + } + + // For arrays, iterate over each item + if (Array.isArray(obj)) { + return obj.map(item => removeUnwantedProperties(item, keysToRemove)) + } + + // Create a new object without the unwanted properties + const result: any = {} + for (const key of Object.keys(obj)) { + if (!keysToRemove.includes(key)) { + result[key] = removeUnwantedProperties(obj[key], keysToRemove) + } + } + + return result +} + +const AGENT_EVENTS = ['log', 'result', 'spell', 'run'] /** * Configure the agent service by registering it, its hooks, and its options. @@ -44,7 +66,7 @@ export const agent = (app: Application) => { // this handles relaying all agent messages up to connected clients. pubSub.patternSubscribe('agent*', (message, channel) => { - if (app.get('isAgent')) return + if (app.get('environment') !== 'server') return // parse the channel from agent:agentId:messageType const agentId = channel.split(':')[1] @@ -57,43 +79,31 @@ export const agent = (app: Application) => { app.service('agents').emit('log', { channel, agentId, + project: agentId, data: { - message: `Unknown message type ${messageType}`, + message: `Unknown message type ${messageType} on channel ${channel}`, }, }) } + // remove unwanted properties from the message + // embeddings and spells are large data packages we don't need on the client + const cleanMessage = removeUnwantedProperties(message, [ + 'embedding', + 'spell', + ]) + // this is where we relay messages up based upon the time. // note for every custom type we need to add it to the above // todo harder typing on all message transports app.service('agents').emit(messageType, { - ...message, + ...cleanMessage, messageType, channel, agentId, }) }) - // todo more predictable channel names and method for handling message queues - // similar to the above - new BullMQ.Worker( - 'agent:run:result', - async job => { - // we wil shuttle this message from here back up a socket to the client - const { agentId, projectId, originalData } = job.data - // emit custom events via the agent service - app.service('agents').emit('result', { - channel: `agent:${agentId}`, - sessionId: originalData.sessionId, - projectId, - data: job.data, - }) - }, - { - connection: app.get('redis'), - } - ) - // Initialize hooks for the agent service app.service('agents').hooks({ around: { @@ -105,6 +115,9 @@ export const agent = (app: Application) => { }, before: { all: [ + checkPermissions({ + roles: ['owner', 'agent'], + }), schemaHooks.validateQuery(agentQueryValidator), schemaHooks.resolveQuery(agentQueryResolver), ], diff --git a/packages/core/server/src/services/documents/documents.class.ts b/packages/core/server/src/services/documents/documents.class.ts index e9356ba671..660b50f5aa 100644 --- a/packages/core/server/src/services/documents/documents.class.ts +++ b/packages/core/server/src/services/documents/documents.class.ts @@ -126,8 +126,6 @@ export class DocumentService< ) { const param = params.query - console.log('param!!!!!!!', param) - const querys = await db('documents') .joinRaw( 'inner join embeddings on documents.id = embeddings."documentId" and embeddings.index = 0' diff --git a/packages/core/server/src/services/documents/documents.ts b/packages/core/server/src/services/documents/documents.ts index 6d32d85043..d8d3c199da 100644 --- a/packages/core/server/src/services/documents/documents.ts +++ b/packages/core/server/src/services/documents/documents.ts @@ -9,6 +9,7 @@ import { documentQueryResolver, documentQueryValidator, } from './documents.schema' +import { checkPermissions } from '../../lib/feathersPermissions' // Array with 1536 elements containing 0 const nullArray = new Array(1536).fill(0) @@ -37,6 +38,9 @@ export const document = (app: Application) => { }, before: { all: [ + checkPermissions({ + roles: ['owner', 'documents'], + }), schemaHooks.validateQuery(documentQueryValidator), schemaHooks.resolveQuery(documentQueryResolver), ], diff --git a/packages/core/server/src/services/events/events.ts b/packages/core/server/src/services/events/events.ts index ad00f7de3b..4662965695 100644 --- a/packages/core/server/src/services/events/events.ts +++ b/packages/core/server/src/services/events/events.ts @@ -13,6 +13,7 @@ import { import { Application, HookContext } from '../../declarations' import { EventService, getOptions } from './events.class' +import { checkPermissions } from '../../lib/feathersPermissions' /** * Export the Event class and event schema @@ -45,6 +46,9 @@ export const event = (app: Application) => { }, before: { all: [ + checkPermissions({ + roles: ['owner', 'events'], + }), schemaHooks.validateQuery(eventQueryValidator), schemaHooks.resolveQuery(eventQueryResolver), ], diff --git a/packages/core/server/src/services/projects/projects.ts b/packages/core/server/src/services/projects/projects.ts index ad5ba1d496..1933b1b7fe 100644 --- a/packages/core/server/src/services/projects/projects.ts +++ b/packages/core/server/src/services/projects/projects.ts @@ -1,8 +1,9 @@ -// DOCUMENTED +// DOCUMENTED // For more information about this file, see https://dove.feathersjs.com/guides/cli/service.html -import type { Application } from '../../declarations'; -import { ProjectsService } from './projects.class'; -export * from './projects.class'; +import type { Application } from '../../declarations' +import { checkPermissions } from '../../lib/feathersPermissions' +import { ProjectsService } from './projects.class' +export * from './projects.class' /** * Configure function that registers the service and its hooks. @@ -16,7 +17,7 @@ export const projects = (app: Application) => { methods: ['find', 'create'], // You can add additional custom events to be sent to clients here events: [], - }); + }) // Initialize hooks app.service('projects').hooks({ @@ -24,7 +25,11 @@ export const projects = (app: Application) => { all: [], }, before: { - all: [], + all: [ + checkPermissions({ + roles: ['owner', 'projects'], + }), + ], find: [], create: [], }, @@ -34,8 +39,8 @@ export const projects = (app: Application) => { error: { all: [], }, - }); -}; + }) +} // Add this service to the service type index declare module '../../declarations' { @@ -43,6 +48,6 @@ declare module '../../declarations' { * Interface for ServiceTypes */ interface ServiceTypes { - 'projects': ProjectsService; + projects: ProjectsService } -} \ No newline at end of file +} diff --git a/packages/core/server/src/services/requests/requests.ts b/packages/core/server/src/services/requests/requests.ts index 92bc4d5319..baa9c061bf 100644 --- a/packages/core/server/src/services/requests/requests.ts +++ b/packages/core/server/src/services/requests/requests.ts @@ -17,6 +17,7 @@ import { import type { Application } from '../../declarations' import { AnalyticsParams, RequestService, getOptions } from './requests.class' +import { checkPermissions } from '../../lib/feathersPermissions' // Exporting all functions and classes to be used by other modules export * from './requests.class' @@ -50,6 +51,9 @@ export const request = (app: Application): void => { }, before: { all: [ + checkPermissions({ + roles: ['owner', 'requests'], + }), // Push `requestQueryValidator` and `requestQueryResolver` hooks that validate and resolve QueryParams respectively schemaHooks.validateQuery(requestQueryValidator), schemaHooks.resolveQuery(requestQueryResolver), diff --git a/packages/core/server/src/services/spell-runner/spell-runner.class.ts b/packages/core/server/src/services/spell-runner/spell-runner.class.ts index 03836a7e36..4ec9f44950 100644 --- a/packages/core/server/src/services/spell-runner/spell-runner.class.ts +++ b/packages/core/server/src/services/spell-runner/spell-runner.class.ts @@ -68,24 +68,27 @@ export class SpellRunnerService< data: CreateData, params?: ServiceParams ): Promise | void> { + const logger = app.get('logger') if (!app.userSpellManagers) return {} - if (!params) return console.error('No params present in service') + if (!params) return logger.error('No params present in service') const { user } = params as any - if (!user) return console.error('No user is present in service') + if (!user) return logger.error('No user is present in service') const { inputs, projectId, secrets, publicVariables, id } = data const decodedId = id.length > 36 ? id.slice(0, 36) : id const spellManager = app.userSpellManagers.get(user.id) - if (!spellManager) return console.error('No spell manager found for user!') + if (!spellManager) return logger.error('No spell manager found for user!') if (!spellManager.hasSpellRunner(decodedId)) { const spell = await getSpell({ app, id: decodedId, projectId }) await spellManager.load(spell as SpellInterface) } + logger.debug('Running playtest spell %s', id) + const result = await spellManager.run({ spellId: id, inputs, diff --git a/packages/core/server/src/services/spell-runner/spell-runner.ts b/packages/core/server/src/services/spell-runner/spell-runner.ts index 1b5f69bc5b..39b9ef8415 100644 --- a/packages/core/server/src/services/spell-runner/spell-runner.ts +++ b/packages/core/server/src/services/spell-runner/spell-runner.ts @@ -1,17 +1,17 @@ -// DOCUMENTED +// DOCUMENTED /** * This module provides a configure function that registers the spell-runner service and its hooks on a Feathers application instance. * @packageDocumentation */ - -import type { Application } from '../../declarations'; -import { checkForSpellInManager } from '../../hooks/spellmanagerHooks'; -import { SpellRunnerService } from './spell-runner.class'; +import checkPermissions from 'feathers-permissions' +import type { Application } from '../../declarations' +import { checkForSpellInManager } from '../../hooks/spellmanagerHooks' +import { SpellRunnerService } from './spell-runner.class' /** * Exports all members of the `SpellRunnerService` module. */ -export * from './spell-runner.class'; +export * from './spell-runner.class' /** * Configures a Feathers application instance by registering the `spell-runner` service and its hooks on it. @@ -24,7 +24,7 @@ export const spellRunner = (app: Application): void => { methods: ['get', 'create', 'update'], // You can add additional custom events to be sent to clients here events: [], - }); + }) // Initialize hooks for the `spell-runner` service app.service('spell-runner').hooks({ @@ -32,7 +32,11 @@ export const spellRunner = (app: Application): void => { all: [], }, before: { - all: [], + all: [ + checkPermissions({ + roles: ['owner', 'spell-runner'], + }) as any, + ], get: [], create: [checkForSpellInManager], // Only check for the spell in the manager before creating it. update: [], @@ -43,14 +47,14 @@ export const spellRunner = (app: Application): void => { error: { all: [], }, - }); -}; + }) +} /** * Augments the `ServiceTypes` interface of the Feathers application so that it includes the `spell-runner` service. */ declare module '../../declarations' { interface ServiceTypes { - 'spell-runner': SpellRunnerService; + 'spell-runner': SpellRunnerService } -} \ No newline at end of file +} diff --git a/packages/core/server/src/services/spells/spells.ts b/packages/core/server/src/services/spells/spells.ts index 512b9fe19a..acc5b13942 100644 --- a/packages/core/server/src/services/spells/spells.ts +++ b/packages/core/server/src/services/spells/spells.ts @@ -4,6 +4,7 @@ */ // Imports +import checkPermissions from 'feathers-permissions' import { hooks as schemaHooks } from '@feathersjs/schema' import type { Application } from '../../declarations' import { checkForSpellInManager } from '../../hooks/spellmanagerHooks' @@ -56,42 +57,15 @@ export const spell = (app: Application) => { all: [ schemaHooks.validateQuery(spellQueryValidator), schemaHooks.resolveQuery(spellQueryResolver), + checkPermissions({ + roles: ['owner', 'spells'], + }) as any, ], find: [], get: [], create: [ schemaHooks.validateData(spellDataValidator), schemaHooks.resolveData(spellDataResolver), - // async (context: HookContext) => { - // const { data, service } = context - // context.data = { - // [service.id]: uuidv4(), - // ...data, - // } - // await context.service - // .find({ - // query: { - // projectId: data.projectId, - // name: data.name, - // }, - // }) - // .then(async param => { - // if (param.data.length >= 1) { - // await context.service - // .find({ - // query: { - // projectId: data.projectId, - // name: { - // $ilike: data.name + ' (%)', - // }, - // }, - // }) - // .then(val => { - // context.data.name = data.name + ' (' + (1 + val.data.length) + ')' - // }) - // } - // }) - // }, ], patch: [ schemaHooks.validateData(spellPatchValidator), diff --git a/packages/core/server/src/services/tasks/tasks.ts b/packages/core/server/src/services/tasks/tasks.ts index 029e426aeb..0157fb618c 100644 --- a/packages/core/server/src/services/tasks/tasks.ts +++ b/packages/core/server/src/services/tasks/tasks.ts @@ -1,22 +1,23 @@ -// DOCUMENTED -import { hooks as schemaHooks } from '@feathersjs/schema'; +// DOCUMENTED +import { hooks as schemaHooks } from '@feathersjs/schema' import { taskExternalResolver, taskPatchResolver, taskPatchValidator, taskQueryResolver, taskQueryValidator, - taskResolver -} from './tasks.schema'; + taskResolver, +} from './tasks.schema' -import { Application } from '../../declarations'; -import { TaskService, getOptions } from './tasks.class'; +import { Application } from '../../declarations' +import { TaskService, getOptions } from './tasks.class' +import { checkPermissions } from '../../lib/feathersPermissions' /** * Export the Task class and task schema */ -export * from './tasks.class'; -export * from './tasks.schema'; +export * from './tasks.class' +export * from './tasks.schema' /** * A configure function that registers the service and its hooks via `app.configure` @@ -25,11 +26,11 @@ export * from './tasks.schema'; export const task = (app: Application) => { // Register our service on the Feathers application app.use('tasks', new TaskService(getOptions(app)), { - methods: ['find', 'get', 'create', 'patch', 'remove'] - }); + methods: ['find', 'get', 'create', 'patch', 'remove'], + }) // Initialize hooks - + app.service('tasks').hooks({ around: { all: [ @@ -39,6 +40,9 @@ export const task = (app: Application) => { }, before: { all: [ + checkPermissions({ + roles: ['owner', 'tasks'], + }), schemaHooks.validateQuery(taskQueryValidator), schemaHooks.resolveQuery(taskQueryResolver), ], @@ -58,12 +62,12 @@ export const task = (app: Application) => { error: { all: [], }, - }); -}; + }) +} // Add this service to the service type index declare module '../../declarations' { interface ServiceTypes { - tasks: TaskService; + tasks: TaskService } } diff --git a/packages/core/server/src/sockets/channels.ts b/packages/core/server/src/sockets/channels.ts index 34bb362b86..afe9d8c3ca 100644 --- a/packages/core/server/src/sockets/channels.ts +++ b/packages/core/server/src/sockets/channels.ts @@ -13,6 +13,8 @@ export default function (app: Application): void { return } + const logger = app.get('logger') + /** * Handle new real-time connections. * @param connection - The new real-time connection. @@ -34,16 +36,27 @@ export default function (app: Application): void { */ app.on('login', (authResult: any, { connection }: any): void => { // Return early if there's no real-time connection (e.g. during REST login) + logger.debug(`CHANNELS: Login event for ${authResult.user.id}`) if (!connection) { + logger.debug(`CHANNELS: No connection for ${authResult.user.id}`) return } // Remove the connection from the anonymous channel app.channel('anonymous').leave(connection) - app.channel('authenticated').join(connection) + if (authResult.sessionId) { + logger.debug(`CHANNELS: Joining session id ${authResult.sessionId}`) + const sessionId = authResult.sessionId + app.channel(sessionId).join(connection) + return + } + + logger.debug( + 'CHANNELS: Joining authenticated channel for project %s', + authResult.project + ) app.channel(authResult.project).join(connection) - // Additional custom channels can be set up and joined here }) /** @@ -52,18 +65,40 @@ export default function (app: Application): void { * @param hook - The hook context. */ app.publish((data: any, context) => { + // get the user from the context + if (app.get('environment') !== 'server') return + + const sessionId = + context.params?.sessionId || + data.sessionId || + data.originalData?.sessionId + // Session IDs are used when we are running a spell in a session + // Currently this is only used for the cloud web client + if (sessionId) { + // conly send the right events up the right channel + logger.trace(`CHANNELS: Publishing to session ${sessionId}!`) + + // Lets not relay up all the patch events + if (context.method === 'patch') return + + // Publish all events to the authenticated user channel + const channel = app.channel(sessionId) + return channel + } + const projectId = context.params?.projectId || context.result.projectId || context.data?.projectId || data.projectId - // don't publish if we are an agent - if (app.get('isAgent')) return - // Lets not relay up all the patch events - if (context.method === 'patch') return + if (context.method === 'patch' || !projectId) return + logger.trace( + `CHANNELS: Publishing event path ${context.path} to project %s!`, + projectId + ) // Publish all events to the authenticated user channel const channel = app.channel(projectId) return channel diff --git a/packages/core/server/src/sockets/sockets.ts b/packages/core/server/src/sockets/sockets.ts index 3e0d8312be..460040d3e8 100644 --- a/packages/core/server/src/sockets/sockets.ts +++ b/packages/core/server/src/sockets/sockets.ts @@ -7,11 +7,13 @@ import { SpellManager } from '@magickml/core' * @returns {(io: any) => void} - A function that takes an `io` instance and sets up socket connections. */ const handleSockets = (app: any) => { + const logger = app.get('logger') return (io: any) => { /** * Set up a connection event listener for incoming sockets. */ io.on('connection', async function (socket: any) { + logger.debug('Socket connected', socket.id) // // Use a custom header for the handshake. let token = socket?.handshake?.query?.token @@ -25,7 +27,7 @@ const handleSockets = (app: any) => { } if (!token) - return console.error( + return logger.error( 'No token provided in handshake query. Socket connection failed.' ) @@ -39,9 +41,16 @@ const handleSockets = (app: any) => { .service('authentication') .verifyAccessToken(token) + logger.debug( + `Socket connection for user ${payload.user.id}: %o`, + payload + ) + const user = payload.user // Attach the user info to the params for use in services socket.feathers.user = user + if (payload.sessionId) socket.feathers.sessionId = payload.sessionId + // Instantiate the interface within the runner rather than the spell manager to avoid shared state issues. const spellManager = new SpellManager({ socket, @@ -50,10 +59,11 @@ const handleSockets = (app: any) => { }) app.userSpellManagers.set(user.id, spellManager) // emit login event to be handled by global app login methods for channels + logger.debug('Emitting login event for connection') app.emit('login', payload, { connection: socket.feathers }) socket.emit('connected') } catch (error: any) { - console.error('Authentication error:', error.message) + logger.error('Authentication error: %o', error.message) socket.emit('auth_error', 'Authentication failed.') return } diff --git a/packages/core/shared/src/communication/agentEventTypes.ts b/packages/core/shared/src/communication/agentEventTypes.ts index 5e98b2c9b1..0f0ae8aea5 100644 --- a/packages/core/shared/src/communication/agentEventTypes.ts +++ b/packages/core/shared/src/communication/agentEventTypes.ts @@ -1,5 +1,7 @@ export const AGENT_RUN_ERROR = (agentId: string) => `agent:${agentId}:run:error` -export const AGENT_RUN_RESULT = (agentId: string) => `agent:${agentId}:run:result` +export const AGENT_RUN_RESULT = (agentId: string) => + `agent:${agentId}:run:result` +export const AGENT_SPELL = (agentId: string) => `agent:${agentId}:spell` export const AGENT_LOG = (agentId: string) => `agent:${agentId}:log` export const AGENT_WARN = (agentId: string) => `agent:${agentId}:warn` export const AGENT_ERROR = (agentId: string) => `agent:${agentId}:error` diff --git a/packages/core/shared/src/logger/index.ts b/packages/core/shared/src/logger/index.ts index 709bacef9d..02bfff5835 100644 --- a/packages/core/shared/src/logger/index.ts +++ b/packages/core/shared/src/logger/index.ts @@ -8,16 +8,17 @@ const defaultLoggerOpts = {} export const initLogger = (opts: object = defaultLoggerOpts) => { if (NODE_ENV === 'development') { logger = pino({ + level: PINO_LOG_LEVEL, transport: { - targets: [ - { - target: 'pino-pretty', - level: PINO_LOG_LEVEL, - options: { - colorize: true, - }, - } - ] + targets: [ + { + target: 'pino-pretty', + level: PINO_LOG_LEVEL, + options: { + colorize: true, + }, + }, + ], }, ...opts, }) @@ -25,7 +26,7 @@ export const initLogger = (opts: object = defaultLoggerOpts) => { return } - logger = pino(opts) + logger = pino(opts) } export const getLogger: () => pino.Logger = () => { diff --git a/packages/core/shared/src/nodes/io/Output.ts b/packages/core/shared/src/nodes/io/Output.ts index bc53e968a0..29ac030adb 100644 --- a/packages/core/shared/src/nodes/io/Output.ts +++ b/packages/core/shared/src/nodes/io/Output.ts @@ -27,10 +27,12 @@ const defaultOutputTypes = [ }, ] + /** * Output component */ export class Output extends MagickComponent { + /** * Constructor for Output component */ @@ -49,7 +51,6 @@ export class Output extends MagickComponent { ) this.common = true - this.module = { nodeType: 'output', socket: anySocket, @@ -162,9 +163,9 @@ export class Output extends MagickComponent { const t = agent.outputTypes.find(t => t.name === outputType) // Find outputType in outputTypes where name is outputType if (!t) { - console.error('output type is not defined', t) + this.logger.error({ outputType, finalType: t }, 'Output type is not defined') } else if (!t.handler) { - console.error('output type handler is not defined', t) + this.logger.error({ outputType, finalType: t }, 'Output type is not defined') } else { t.handler({ output, diff --git a/packages/core/shared/src/nodes/io/Skill.ts b/packages/core/shared/src/nodes/io/Skill.ts index a02b4e33d2..82864475d0 100644 --- a/packages/core/shared/src/nodes/io/Skill.ts +++ b/packages/core/shared/src/nodes/io/Skill.ts @@ -137,10 +137,11 @@ export class Skill extends MagickComponent> { 'Input - Default': task, }, runSubspell: false, - agent: agent, + agentId: agent.id, secrets: agent?.secrets ?? secrets, app: module.app, publicVariables: {}, + spellId: agent.rootSpellId as string, } const outputs = await app .get('agentCommander') diff --git a/packages/core/shared/src/nodes/io/Spell.ts b/packages/core/shared/src/nodes/io/Spell.ts index 4040f80a1c..1d190f4a04 100644 --- a/packages/core/shared/src/nodes/io/Spell.ts +++ b/packages/core/shared/src/nodes/io/Spell.ts @@ -337,7 +337,7 @@ export class SpellComponent extends MagickComponent< spellId: node.data.spellId as string, inputs: flattenedInputs, runSubspell: true, - agent: agent, + agentId: agent.id, secrets: agent?.secrets ?? secrets, app, publicVariables: variables, diff --git a/packages/core/shared/src/nodes/io/SpellByName.ts b/packages/core/shared/src/nodes/io/SpellByName.ts index c065acce5a..d53ae828b4 100644 --- a/packages/core/shared/src/nodes/io/SpellByName.ts +++ b/packages/core/shared/src/nodes/io/SpellByName.ts @@ -131,6 +131,7 @@ export class SpellByName extends MagickComponent> { 'Input - Default': event, }, runSubspell: false, + spellId, agent: agent, secrets: agent?.secrets ?? secrets, app: module.app, diff --git a/packages/core/shared/src/plugins/modulePlugin/module.ts b/packages/core/shared/src/plugins/modulePlugin/module.ts index dbe87abc13..db4f616a73 100644 --- a/packages/core/shared/src/plugins/modulePlugin/module.ts +++ b/packages/core/shared/src/plugins/modulePlugin/module.ts @@ -1,8 +1,17 @@ +type ReadArgs = { + inputs: Record + secrets?: Record + publicVariables?: Record + app?: any + sessionId?: string +} + export class Module { secrets?: Record - publicVariables?: Record + publicVariables?: Record inputs: Record outputs: Record + sessionId?: string app?: any // set to App, but move App to engine first constructor() { this.inputs = {} @@ -12,11 +21,12 @@ export class Module { this.app = null } - read({ inputs, secrets, publicVariables, app }) { + read({ inputs, secrets, publicVariables, app, sessionId }: ReadArgs) { this.inputs = inputs this.secrets = secrets || ({} as Record) - this.publicVariables = publicVariables || ({} as Record) + this.publicVariables = publicVariables || ({} as Record) this.app = app + this.sessionId = sessionId } write(outputs: Record) { diff --git a/packages/core/shared/src/plugins/remotePlugin/index.ts b/packages/core/shared/src/plugins/remotePlugin/index.ts index 278f5288e4..580ad0587b 100644 --- a/packages/core/shared/src/plugins/remotePlugin/index.ts +++ b/packages/core/shared/src/plugins/remotePlugin/index.ts @@ -155,7 +155,11 @@ function install( // the event is agent:agentId:spell since we are in the spellrunnner emit({ eventType, + sessionId: context.module.sessionId || null, nodeId: node.id, + component: component.name, + outputType: node.data.outputType || null, + name: node.data.name, output: result, input: inputs, }) @@ -167,6 +171,10 @@ function install( output: null, eventType, input: inputs, + component: component.name, + name: node.data.name, + outputType: node.data.outputType || null, + sessionId: context.module.sessionId || null, error: { message: err.message, stack: err.stack, diff --git a/packages/core/shared/src/spellManager/SpellManager.ts b/packages/core/shared/src/spellManager/SpellManager.ts index a70b3de65d..6fd1446a43 100644 --- a/packages/core/shared/src/spellManager/SpellManager.ts +++ b/packages/core/shared/src/spellManager/SpellManager.ts @@ -153,7 +153,8 @@ export default class SpellManager { secrets, publicVariables, app, - agent: this.agent, + agentId: this.agent.id, + spellId: this.agent.rootSpellId as string, }) this.agent?.publishEvent(`${spellId}:run`, { diff --git a/packages/core/shared/src/spellManager/SpellRunner.ts b/packages/core/shared/src/spellManager/SpellRunner.ts index 96f23aaba2..05e2d9c11d 100644 --- a/packages/core/shared/src/spellManager/SpellRunner.ts +++ b/packages/core/shared/src/spellManager/SpellRunner.ts @@ -16,8 +16,10 @@ import SpellManager from './SpellManager' import { getLogger } from '../logger' import { NodeData } from 'rete/types/core/data' import { SPELLRUNNER_BUSY_TIMEOUT_MSEC } from '@magickml/config' +import { AGENT_SPELL } from '../communication/agentEventTypes' export type RunComponentArgs = { + sessionId?: string inputs: MagickSpellInput agent?: any componentName?: string @@ -68,14 +70,6 @@ class SpellRunner { }) } - publish(event, message) { - if (!this.agent) return - this.agent.publishEvent(`spell:${this.currentSpell.id}`, { - ...message, - event, - }) - } - emit(_message) { // same message emitted from server or agent const message = { @@ -88,12 +82,15 @@ class SpellRunner { if (!this.agent) { // if we aren't in an agent, we are on the server. // Emit the event directly via the agent service + this.logger.trace( + 'SPELLRUNNER: Emitting spell event from sandbox %o', + message + ) this.app.service('agents').emit('spell', message) } else { // handle the case of the emit being run on an agent not the server - console.log('emitting from new!!!!') // to do we probably want these events to be constants somewhere - this.agent.publishEvent('spell', message) + this.agent.publishEvent(AGENT_SPELL(this.agent.id), message) } } @@ -304,6 +301,7 @@ class SpellRunner { runSubspell = false, secrets, publicVariables, + sessionId, app, }: RunComponentArgs) { this.busy = true @@ -326,6 +324,7 @@ class SpellRunner { secrets, publicVariables, app, + sessionId, }) const component = this._getComponent( diff --git a/packages/core/shared/src/types.ts b/packages/core/shared/src/types.ts index 821948504a..9b5bd9db8d 100644 --- a/packages/core/shared/src/types.ts +++ b/packages/core/shared/src/types.ts @@ -614,6 +614,7 @@ export type ModuleContext = { app?: Application inputs: Record outputs: Record + sessionId?: string } projectId: string currentSpell: Spell diff --git a/packages/plugins/discord/server/src/connectors/discord.ts b/packages/plugins/discord/server/src/connectors/discord.ts index bf64654d19..1917eca840 100644 --- a/packages/plugins/discord/server/src/connectors/discord.ts +++ b/packages/plugins/discord/server/src/connectors/discord.ts @@ -266,7 +266,7 @@ export class DiscordConnector { } // add the sender if not already in the list - if (!entities.includes(author.username)) { + if (!entities.includes(this.client.user.username)) { entities.push(this.client.user.username) } @@ -285,7 +285,7 @@ export class DiscordConnector { [`Input - Discord (${inputType})`]: { connector: `Discord (${inputType})`, content: content, - sender: author.username, + sender: author.id, observer: this.client.user.username, client: 'discord', channel: message.channel.id, diff --git a/packages/plugins/discord/shared/src/nodes/utils.ts b/packages/plugins/discord/shared/src/nodes/utils.ts index cf6177d2c6..820fce3ee3 100644 --- a/packages/plugins/discord/shared/src/nodes/utils.ts +++ b/packages/plugins/discord/shared/src/nodes/utils.ts @@ -68,7 +68,7 @@ export async function runSpell( 'Input - Discord (Text)': { connector: 'Discord (Text)', content: content, - sender: ' message.author.username', + sender: ' message.author.id', observer: ' message.author.username', client: 'discord', channel: 'message.channel.id', diff --git a/packages/plugins/googleai/server/src/functions/makeChatCompletion.ts b/packages/plugins/googleai/server/src/functions/makeChatCompletion.ts index 37df308d1f..6f479c39ea 100644 --- a/packages/plugins/googleai/server/src/functions/makeChatCompletion.ts +++ b/packages/plugins/googleai/server/src/functions/makeChatCompletion.ts @@ -113,15 +113,14 @@ export async function makeChatCompletion( nodeId: node.id, }) - // Save metering event - trackGoogleAIUsage({ - projectId: context.projectId, - model: node?.data?.model as string, - callCount: 1, - wordCount: wordCount(result), - }) - if (result) { + // Save metering event + trackGoogleAIUsage({ + projectId: context.projectId, + model: node?.data?.model as string, + callCount: 1, + wordCount: wordCount(result), + }) return { success: true, result } } diff --git a/packages/plugins/intent/server/src/services/intent/intent.ts b/packages/plugins/intent/server/src/services/intent/intent.ts index 5614959829..6ab11fd8fd 100644 --- a/packages/plugins/intent/server/src/services/intent/intent.ts +++ b/packages/plugins/intent/server/src/services/intent/intent.ts @@ -11,7 +11,11 @@ import { intentQueryResolver, intentQueryValidator } from './intent.schema' import { v4 as uuidv4 } from 'uuid' import pgvector from 'pgvector/pg' // Import types and classes -import type { Application, HookContext } from '@magickml/server-core' +import { + checkPermissions, + type Application, + type HookContext, +} from '@magickml/server-core' import { IntentService, getOptions } from './intent.class' // Array with 1536 elements containing 0 @@ -55,6 +59,9 @@ export const intent = (app: Application) => { }, before: { all: [ + checkPermissions({ + roles: ['owner', 'agent'], + }), schemaHooks.validateQuery(intentQueryValidator), schemaHooks.resolveQuery(intentQueryResolver), ], diff --git a/packages/plugins/openai/server/src/functions/makeChatCompletion.ts b/packages/plugins/openai/server/src/functions/makeChatCompletion.ts index 50084ec650..46dc8e0c6d 100644 --- a/packages/plugins/openai/server/src/functions/makeChatCompletion.ts +++ b/packages/plugins/openai/server/src/functions/makeChatCompletion.ts @@ -6,9 +6,14 @@ import { } from '@magickml/core' import axios from 'axios' import { OPENAI_ENDPOINT } from '../constants' -import { DEFAULT_OPENAI_KEY, PRODUCTION } from '@magickml/config' +import { + DEFAULT_OPENAI_KEY, + PRODUCTION, + BACKOFF_RETRY_LIMIT, +} from '@magickml/config' import { GPT4_MODELS } from '@magickml/plugin-openai-shared' import { trackOpenAIUsage } from '@magickml/server-core' +import axiosRetry from 'axios-retry' /** * Generate a completion text based on prior chat conversation input. @@ -109,6 +114,16 @@ export async function makeChatCompletion( } try { + // Exponential back-off retry delay between requests + axiosRetry(axios, { + retries: BACKOFF_RETRY_LIMIT, + retryDelay: axiosRetry.exponentialDelay, + shouldResetTimeout: true, + retryCondition: error => { + return error?.response?.status === 429 + }, + }) + const start = Date.now() // Make the API call to OpenAI const completion = await axios.post( diff --git a/packages/plugins/openai/server/src/functions/makeTextCompletion.ts b/packages/plugins/openai/server/src/functions/makeTextCompletion.ts index 922e33e7dd..83f9577a1e 100644 --- a/packages/plugins/openai/server/src/functions/makeTextCompletion.ts +++ b/packages/plugins/openai/server/src/functions/makeTextCompletion.ts @@ -2,9 +2,14 @@ import { CompletionHandlerInputData, saveRequest } from '@magickml/core' import axios from 'axios' import { OPENAI_ENDPOINT } from '../constants' -import { DEFAULT_OPENAI_KEY, PRODUCTION } from '@magickml/config' +import { + DEFAULT_OPENAI_KEY, + PRODUCTION, + BACKOFF_RETRY_LIMIT, +} from '@magickml/config' import { GPT4_MODELS } from '@magickml/plugin-openai-shared' import { trackOpenAIUsage } from '@magickml/server-core' +import axiosRetry from 'axios-retry' /** * Makes an API request to an AI text completion service. @@ -72,6 +77,16 @@ export async function makeTextCompletion( // Make the API request and handle the response. try { + // Exponential back-off retry delay between requests + axiosRetry(axios, { + retries: BACKOFF_RETRY_LIMIT, + retryDelay: axiosRetry.exponentialDelay, + shouldResetTimeout: true, + retryCondition: error => { + return error?.response?.status === 429 + }, + }) + const start = Date.now() const resp = await axios.post(`${OPENAI_ENDPOINT}/completions`, settings, { headers: headers, diff --git a/packages/plugins/rest/server/src/services/agentHttp/agentHttp.class.ts b/packages/plugins/rest/server/src/services/agentHttp/agentHttp.class.ts index 8b3ab0bcbc..94bb40fb02 100644 --- a/packages/plugins/rest/server/src/services/agentHttp/agentHttp.class.ts +++ b/packages/plugins/rest/server/src/services/agentHttp/agentHttp.class.ts @@ -68,6 +68,7 @@ const getAgent = async ( type Request = { agent: Agent spellId?: string + sessionId?: string inputs: { [key: string]: { connector: string @@ -88,6 +89,7 @@ type RequestData = { content: string spellId?: string isCloud?: boolean + sessionId?: string secrets?: { [key: string]: string } @@ -108,7 +110,6 @@ const formatRequest = async ( params: any ): Promise => { const { - spellId, content, isCloud = false, secrets = {}, @@ -116,6 +117,7 @@ const formatRequest = async ( sender = 'api', client = 'rest', channel = 'rest', + sessionId, } = data // validate if method is GET, POST, PATCH, DELETE @@ -146,7 +148,8 @@ const formatRequest = async ( return { agent, - spellId, + spellId: agent.rootSpellId as string, + sessionId: sessionId, inputs: { [`Input - REST API (${method})`]: { connector: `REST API (${method})`, diff --git a/packages/plugins/rest/server/src/services/agentHttp/agentHttp.ts b/packages/plugins/rest/server/src/services/agentHttp/agentHttp.ts index 7da97c385e..c4d65fb8fb 100644 --- a/packages/plugins/rest/server/src/services/agentHttp/agentHttp.ts +++ b/packages/plugins/rest/server/src/services/agentHttp/agentHttp.ts @@ -17,7 +17,7 @@ import { } from './agentHttp.schema' // Import types and classes -import type { Application } from '@magickml/server-core' +import { checkPermissions, type Application } from '@magickml/server-core' import { AgentHttpService } from './agentHttp.class' // Add this service to the service type index @@ -63,6 +63,11 @@ export const agentHttp = (app: Application) => { ], }, before: { + all: [ + checkPermissions({ + roles: ['owner', 'agentHttp'], + }), + ], get: [ schemaHooks.validateQuery(agentHttpQueryValidator), schemaHooks.resolveQuery(agentHttpQueryResolver),