generated from deepgram/oss-repo-template
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.py
234 lines (191 loc) · 6.71 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import pyaudio
import asyncio
import websockets
import os
import json
import threading
import janus
import queue
import sys
import requests
# Troubleshooting notes
# If you usually close your laptop instead of shutting it down each night, restart it before running
# this script: PortAudio can be temperamental if it isn't shut down cleanly (e.g. after stopping the
# program with Ctrl+C).
# Use Postman (or a similar HTTP client) to test the API key and endpoint.
# Deepgram Voice Agent code using Azure OpenAI Services
# Deepgram Voice Agent websocket endpoint.
VOICE_AGENT_URL = "wss://agent.deepgram.com/agent"
# Azure OpenAI endpoint used for the agent's "think" step (placeholder - replace before use).
AZURE_URL = "Your Azure OpenAI endpoint here."
# Instructions given to the agent (placeholder - replace before use).
PROMPT = "Your prompt here."
# Deepgram text-to-speech voice.
VOICE = "aura-orion-en"
# Deepgram speech-to-text model.
LISTEN = "nova-2"
# Model served by Azure OpenAI Services.
LLM_MODEL = "gpt-4o-mini"

# Microphone capture: 16 kHz mono, read in 0.05 s chunks.
USER_AUDIO_SAMPLE_RATE = 16000
USER_AUDIO_SECS_PER_CHUNK = 0.05
USER_AUDIO_SAMPLES_PER_CHUNK = round(USER_AUDIO_SECS_PER_CHUNK * USER_AUDIO_SAMPLE_RATE)

# Agent playback: 16 kHz 16-bit samples, i.e. two bytes per sample.
AGENT_AUDIO_SAMPLE_RATE = 16000
AGENT_AUDIO_BYTES_PER_SEC = AGENT_AUDIO_SAMPLE_RATE * 2

# Session configuration; serialized with json.dumps and sent as the first websocket message.
SETTINGS = {
    "type": "SettingsConfiguration",
    "audio": {
        "input": {"encoding": "linear16", "sample_rate": USER_AUDIO_SAMPLE_RATE},
        # container "none": presumably raw samples with no file header - confirm with Deepgram docs.
        "output": {
            "encoding": "linear16",
            "sample_rate": AGENT_AUDIO_SAMPLE_RATE,
            "container": "none",
        },
    },
    "agent": {
        "listen": {"model": LISTEN},
        "think": {
            "provider": {
                "type": "custom",
                "url": AZURE_URL,
                # The key is read once, at import time; it is None if the variable is unset.
                "headers": [
                    {"key": "api-key", "value": os.environ.get("AZURE_OPENAI_API_KEY")},
                ],
            },
            "model": LLM_MODEL,
            "instructions": PROMPT,
        },
        "speak": {"model": VOICE},
    },
    "context": {
        # Prior LLM message history, e.g. to restore a conversation after the websocket drops.
        "messages": [],
        # Whether to replay the last message when it is an assistant message.
        "replay": False,
    },
}

# Captured microphone chunks waiting to be sent over the websocket.
mic_audio_queue = asyncio.Queue()
def callback(input_data, frame_count, time_info, status_flag):
    """PyAudio input-stream callback: enqueue each captured chunk.

    Puts the raw chunk onto ``mic_audio_queue`` and hands it back to PyAudio
    together with ``pyaudio.paContinue`` so that capture keeps running.

    NOTE(review): PyAudio invokes stream callbacks on its own thread, while
    asyncio.Queue is not thread-safe; scheduling the put through
    ``loop.call_soon_threadsafe`` would be safer.
    """
    mic_audio_queue.put_nowait(input_data)
    result = input_data, pyaudio.paContinue
    return result
async def run():
    """Stream microphone audio to the Deepgram Voice Agent and play its replies.

    Reads DEEPGRAM_API_KEY and AZURE_OPENAI_API_KEY from the environment and
    returns early (after printing a message) if either one is missing.
    Otherwise it opens a websocket to VOICE_AGENT_URL and runs three tasks
    concurrently:

    * ``microphone`` captures 16-bit mono audio with PyAudio;
    * ``sender`` sends SETTINGS once, then forwards queued microphone chunks;
    * ``receiver`` prints text messages and plays binary audio via Speaker.

    As soon as any task finishes (e.g. the server closes the socket), the
    others are cancelled, so the session ends instead of hanging.
    """
    global mic_audio_queue

    dg_api_key = os.environ.get("DEEPGRAM_API_KEY")
    if dg_api_key is None:
        print("DEEPGRAM_API_KEY env var not present")
        return
    azure_api_key = os.environ.get("AZURE_OPENAI_API_KEY")
    if azure_api_key is None:
        print("AZURE_OPENAI_API_KEY env var not present")
        return

    # Create the queue inside the running loop. Before Python 3.10, an
    # asyncio.Queue built at import time is bound to a different loop than
    # the one asyncio.run() starts, so awaiting get() on it would fail.
    # A fresh queue per session also drops any stale audio.
    mic_audio_queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    def on_mic_audio(input_data, frame_count, time_info, status_flag):
        # PyAudio calls stream callbacks on its own thread, and asyncio.Queue
        # is not thread-safe, so hand the chunk over to the event-loop thread.
        loop.call_soon_threadsafe(mic_audio_queue.put_nowait, input_data)
        return (input_data, pyaudio.paContinue)

    async with websockets.connect(
        VOICE_AGENT_URL,
        # NOTE(review): the default client in websockets >= 14 names this
        # argument `additional_headers`; `extra_headers` belongs to the legacy
        # client. Confirm against the installed version.
        extra_headers={"Authorization": f"Token {dg_api_key}"},
    ) as ws:

        async def microphone():
            # Capture until cancelled; always release the stream and PortAudio.
            audio = pyaudio.PyAudio()
            stream = None
            try:
                stream = audio.open(
                    format=pyaudio.paInt16,
                    rate=USER_AUDIO_SAMPLE_RATE,
                    input=True,
                    frames_per_buffer=USER_AUDIO_SAMPLES_PER_CHUNK,
                    stream_callback=on_mic_audio,
                    channels=1,
                )
                stream.start_stream()
                while stream.is_active():
                    await asyncio.sleep(0.1)
            finally:
                if stream is not None:
                    stream.stop_stream()
                    stream.close()
                audio.terminate()

        async def sender(ws):
            # The configuration message must precede any audio.
            await ws.send(json.dumps(SETTINGS))
            try:
                while True:
                    data = await mic_audio_queue.get()
                    await ws.send(data)
            except Exception as e:
                print("Error while sending: " + str(e))
                raise

        async def receiver(ws):
            # Text frames are JSON events; binary frames are agent audio.
            try:
                speaker = Speaker()
                with speaker:
                    async for message in ws:
                        if isinstance(message, str):
                            print(message)
                            # Barge-in: drop queued agent audio once the user talks.
                            if json.loads(message).get("type") == "UserStartedSpeaking":
                                speaker.stop()
                        elif isinstance(message, bytes):
                            await speaker.play(message)
            except Exception as e:
                print(e)

        tasks = [
            asyncio.ensure_future(microphone()),
            asyncio.ensure_future(sender(ws)),
            asyncio.ensure_future(receiver(ws)),
        ]
        # microphone() never returns on its own, so waiting for ALL tasks would
        # hang after the socket closes; stop as soon as any task is done.
        _, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # Let cancellations finish (running cleanup) and retrieve any task
        # exceptions so they are not reported as "never retrieved".
        await asyncio.gather(*tasks, return_exceptions=True)
def main():
    """Run one voice-agent session to completion on a fresh event loop."""
    session = run()
    asyncio.run(session)
def _play(audio_out, stream, stop):
while not stop.is_set():
try:
# Janus sync queue mimics the API of queue.Queue, and async queue mimics the API of
# asyncio.Queue. So for this line check these docs:
# https://docs.python.org/3/library/queue.html#queue.Queue.get.
#
# The timeout of 0.05 is to prevent this line from going into an uninterruptible wait,
# which can interfere with shutting down the program on some systems.
data = audio_out.sync_q.get(True, 0.05)
# In PyAudio's "blocking mode," the `write` function will block until playback is
# finished. This is why we can stop playback very quickly by simply stopping this loop;
# there is never more than 1 chunk of audio awaiting playback inside PyAudio.
# Read more: https://people.csail.mit.edu/hubert/pyaudio/docs/#example-blocking-mode-audio-i-o
stream.write(data)
except queue.Empty:
pass
class Speaker:
    """Play agent audio on a background thread while the event loop keeps running.

    Use as a context manager. Entering opens a PyAudio output stream
    (16-bit mono at AGENT_AUDIO_SAMPLE_RATE) and starts a daemon thread
    running ``_play``. Exiting stops that thread and releases both the stream
    and the PyAudio instance. Chunks travel through a ``janus.Queue``:
    coroutines ``await play(chunk)`` on its async side, and the playback
    thread reads its sync side.
    """

    def __init__(self):
        self._audio = None   # pyaudio.PyAudio instance owned while entered
        self._queue = None   # janus.Queue of audio chunks
        self._stream = None  # PyAudio output stream
        self._thread = None  # playback thread running _play
        self._stop = None    # threading.Event telling _play to exit

    def __enter__(self):
        """Open the output stream, start the playback thread, and return self."""
        self._audio = pyaudio.PyAudio()
        try:
            self._stream = self._audio.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=AGENT_AUDIO_SAMPLE_RATE,
                input=False,
                output=True,
            )
        except Exception:
            # __exit__ is not called when __enter__ raises, so release here.
            self._audio.terminate()
            self._audio = None
            raise
        self._queue = janus.Queue()
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._stop), daemon=True
        )
        self._thread.start()
        # Enables `with Speaker() as speaker:`; callers ignoring it are unaffected.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop playback and release audio resources; exceptions propagate."""
        # Join the playback thread before closing the stream it writes to.
        self._stop.set()
        self._thread.join()
        try:
            self._stream.close()
        finally:
            # Always release PortAudio; otherwise each session leaks a
            # PyAudio instance.
            self._audio.terminate()
            self._audio = None
            self._stream = None
            self._queue = None
            self._thread = None
            self._stop = None

    async def play(self, data):
        """Queue one chunk of agent audio for the playback thread."""
        return await self._queue.async_q.put(data)

    def stop(self):
        """Discard audio that is queued but not yet handed to the stream.

        A chunk the playback thread has already passed to ``stream.write``
        still finishes. Does nothing when the speaker has not been entered.
        """
        if self._queue and self._queue.async_q:
            while not self._queue.async_q.empty():
                try:
                    self._queue.async_q.get_nowait()
                except janus.QueueEmpty:
                    break
if __name__ == "__main__":
    # Use main()'s return value as the exit status; None or any falsy value means 0.
    status = main()
    sys.exit(status if status else 0)