
Add react hook to openai module #94

Merged 10 commits on Oct 27, 2023
156 changes: 154 additions & 2 deletions src/llms/openai.ts
@@ -9,9 +9,11 @@
*/

import { isLiveChannelMessageEvent, LiveChannelAddress, LiveChannelMessageEvent, LiveChannelScope } from "@grafana/data";
-import { getBackendSrv, getGrafanaLiveSrv, logDebug } from "@grafana/runtime";
+import { getBackendSrv, getGrafanaLiveSrv, logDebug, /* logError */ } from "@grafana/runtime";

-import { pipe, Observable, UnaryFunction } from "rxjs";
+import React, { useEffect, useCallback, useState } from 'react';
+import { useAsync } from 'react-use';
+import { pipe, Observable, UnaryFunction, Subscription } from "rxjs";
import { filter, map, scan, takeWhile, tap } from "rxjs/operators";

import { LLM_PLUGIN_ID, LLM_PLUGIN_ROUTE, setLLMPluginVersion } from "./constants";
@@ -388,3 +390,153 @@ export const enabled = async (): Promise<OpenAIHealthDetails> => {
    { configured: details.openAI, ok: details.openAI } :
    details.openAI;
}

export enum StreamStatus {
  IDLE = 'idle',
  GENERATING = 'generating',
  COMPLETED = 'completed',
}

export const TIMEOUT = 10000;

export type OpenAIStreamState = {
  setMessages: React.Dispatch<React.SetStateAction<Message[]>>;
  reply: string;
  streamStatus: StreamStatus;
  error: Error | undefined;
  value:
    | {
        enabled: boolean | undefined;
        stream?: undefined;
      }
    | {
        enabled: boolean | undefined;
        stream: Subscription;
      }
    | undefined;
};

/**
 * A custom React hook for managing an OpenAI stream that communicates with the provided model.
 *
 * @param {string} [model='gpt-4'] - The OpenAI model to use for communication.
 * @param {number} [temperature=1] - The temperature value for text generation (default is 1).
 * @param {function} [notifyError] - A callback function for handling errors.
 *
 * @returns {OpenAIStreamState} - An object containing the state of the OpenAI stream.
 * @property {function} setMessages - A function to update the list of messages in the stream.
 * @property {string} reply - The most recent reply received from the OpenAI stream.
 * @property {StreamStatus} streamStatus - The status of the stream ("idle", "generating", or "completed").
 * @property {Error|undefined} error - An error object if an error occurs, or undefined if no error.
 * @property {object|undefined} value - The current value of the stream.
 * @property {boolean|undefined} value.enabled - Indicates whether the stream is enabled (true or false).
 * @property {Subscription|undefined} value.stream - The stream subscription object if the stream is active, or undefined if not.
 */
Review comment on lines +442 to +457:

Contributor:
Not sure how easy this will be but could you add a super minimal example to the JSDoc? It can be similar to the examples in the other exposed functions.

Contributor Author:
I don't think this is feasible because there is so much handling to do, but once we have a working example elsewhere perhaps we can link it?
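In the meantime, a minimal sketch of what a consumer of the hook could look like. The component, prompt text, and import path below are illustrative assumptions, not part of this PR; it also assumes the module's Message shape follows the usual OpenAI role/content convention.

// Hypothetical consumer component; illustrative only, not part of this PR.
import React from 'react';
import { StreamStatus, useOpenAIStream } from './llms/openai';

function ExplainPanel({ question }: { question: string }) {
  const { setMessages, reply, streamStatus, error } = useOpenAIStream('gpt-4', 1);

  // Setting a non-empty messages array kicks off a new stream.
  const onGenerate = () =>
    setMessages([
      { role: 'system', content: 'You are a concise assistant.' },
      { role: 'user', content: question },
    ]);

  if (error) {
    return <div>Something went wrong: {error.message}</div>;
  }

  return (
    <div>
      <button onClick={onGenerate} disabled={streamStatus === StreamStatus.GENERATING}>
        Generate
      </button>
      {/* `reply` grows as stream chunks arrive; streamStatus becomes "completed" at the end. */}
      <pre>{reply}</pre>
    </div>
  );
}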

export function useOpenAIStream(
  model = 'gpt-4',
  temperature = 1,
  notifyError: (title: string, text?: string, traceId?: string) => void = () => {},
): OpenAIStreamState {
  // The messages array to send to the LLM.
  const [messages, setMessages] = useState<Message[]>([]);
  // The latest reply from the LLM.
  const [reply, setReply] = useState('');
  const [streamStatus, setStreamStatus] = useState<StreamStatus>(StreamStatus.IDLE);
  const [error, setError] = useState<Error>();

  const onError = useCallback(
    (e: Error) => {
      setStreamStatus(StreamStatus.IDLE);
      setMessages([]);
      setError(e);
      notifyError(
        'Failed to generate content using OpenAI',
        'Please try again or, if the problem persists, contact your organization admin.'
      );
      console.error(e);
      // logError(e, { messages: JSON.stringify(messages), model, temperature: String(temperature) });
    },
    [notifyError]
  );

  const { error: enabledError, value: isEnabled } = useAsync(
    async () => await enabled().then((response) => response.ok),
    [enabled]
  );

  const { error: asyncError, value } = useAsync(async () => {
    if (!isEnabled || !messages.length) {
      return { enabled: isEnabled };
    }

    setStreamStatus(StreamStatus.GENERATING);
    setError(undefined);
    // Stream the completions. Each element is the next stream chunk.
    const stream = streamChatCompletions({
      model,
      temperature,
      messages,
    }).pipe(
      // Accumulate the stream content into a stream of strings, where each
      // element contains the accumulated message so far.
      accumulateContent()
      // The stream is just a regular Observable, so we can use standard rxjs
      // functionality to update state, e.g. recording when the stream
      // has completed.
      // (The operator decision tree on the rxjs website is a useful resource:
      // https://rxjs.dev/operator-decision-tree.)
    );
    // Subscribe to the stream and update the state for each returned value.
    return {
      enabled: isEnabled,
      stream: stream.subscribe({
        next: setReply,
        error: onError,
        complete: () => {
          setStreamStatus(StreamStatus.COMPLETED);
          setTimeout(() => {
            setStreamStatus(StreamStatus.IDLE);
          });
          setMessages([]);
          setError(undefined);
        },
      }),
    };
  }, [messages, isEnabled]);

  // Unsubscribe from the stream when the component unmounts.
  useEffect(() => {
    return () => {
      if (value?.stream) {
        value.stream.unsubscribe();
      }
    };
  }, [value]);

  // If the stream stays in the generating state without producing a reply
  // for TIMEOUT milliseconds, treat it as timed out.
  useEffect(() => {
    let timeout: NodeJS.Timeout | undefined;
    if (streamStatus === StreamStatus.GENERATING && reply === '') {
      timeout = setTimeout(() => {
        onError(new Error(`OpenAI stream timed out after ${TIMEOUT}ms`));
      }, TIMEOUT);
    }
    return () => {
      timeout && clearTimeout(timeout);
    };
  }, [streamStatus, reply, onError]);

  if (asyncError || enabledError) {
    setError(asyncError || enabledError);
  }

  return {
    setMessages,
    reply,
    streamStatus,
    error,
    value,
  };
}
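For reference, accumulateContent is defined elsewhere in this module and is not shown in this diff. A rough sketch of what such an operator can look like, assuming chunks follow OpenAI's streaming delta shape: the ChatCompletionsChunk interface here is illustrative, and the real implementation likely does more (the takeWhile import above suggests it also handles end-of-stream messages).

import { Observable, UnaryFunction, pipe } from 'rxjs';
import { map, scan } from 'rxjs/operators';

// Illustrative chunk shape; the module's real types may differ.
interface ChatCompletionsChunk {
  choices: Array<{ delta?: { content?: string } }>;
}

// Fold per-chunk content deltas into a running string, so each emitted
// element is the full reply accumulated so far.
function accumulateContent(): UnaryFunction<Observable<ChatCompletionsChunk>, Observable<string>> {
  return pipe(
    map((chunk) => chunk.choices[0]?.delta?.content ?? ''),
    scan((accumulated, delta) => accumulated + delta, '')
  );
}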