Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add httpx proxy client for embedded model #199

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 43 additions & 11 deletions backend/app/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
from __future__ import annotations

import os
from typing import Any, BinaryIO, List, Optional
import httpx
import logging
from typing import Any, BinaryIO, List, Optional, Union
from urllib.parse import urlparse

from langchain_text_splitters import RecursiveCharacterTextSplitter, TextSplitter
from langchain_community.document_loaders.blob_loaders.schema import Blob
Expand All @@ -25,6 +28,8 @@
from app.ingest import ingest_blob
from app.parsing import MIMETYPE_BASED_PARSER

logger = logging.getLogger(__name__)


def _guess_mimetype(file_bytes: bytes) -> str:
"""Guess the mime-type of a file."""
Expand Down Expand Up @@ -52,6 +57,34 @@ def _convert_ingestion_input_to_blob(data: BinaryIO) -> Blob:
)


def _get_http_client(use_async: bool = False) -> Union[httpx.Client, httpx.AsyncClient]:
"""
Create and return a httpx.Client or httpx.AsyncClient instance, configured with a proxy if available and valid.

The method checks for a PROXY_URL environment variable. If a valid proxy URL is found,
the client is configured to use this proxy. Otherwise, a default client is returned.

Args:
use_async (bool): Flag indicating whether to return an asynchronous HTTP client.

Returns:
An instance of httpx.Client or httpx.AsyncClient configured with or without a proxy based on the environment configuration.
"""
proxy_url = os.getenv("PROXY_URL")
client_kwargs = {}
if proxy_url:
parsed_url = urlparse(proxy_url)
if parsed_url.scheme and parsed_url.netloc:
client_kwargs["proxies"] = proxy_url
else:
logger.warning("Invalid proxy URL provided. Proceeding without proxy.")

if use_async:
return httpx.AsyncClient(**client_kwargs)
else:
return httpx.Client(**client_kwargs)


class IngestRunnable(RunnableSerializable[BinaryIO, List[str]]):
"""Runnable for ingesting files into a vectorstore."""

Expand All @@ -72,25 +105,25 @@ class Config:
@property
def namespace(self) -> str:
if (self.assistant_id is None and self.thread_id is None) or (
self.assistant_id is not None and self.thread_id is not None
self.assistant_id is not None and self.thread_id is not None
):
raise ValueError(
"Exactly one of assistant_id or thread_id must be provided"
)
return self.assistant_id if self.assistant_id is not None else self.thread_id

def invoke(
self, input: BinaryIO, config: Optional[RunnableConfig] = None
self, input: BinaryIO, config: Optional[RunnableConfig] = None
) -> List[str]:
return self.batch([input], config)

def batch(
self,
inputs: List[BinaryIO],
config: RunnableConfig | List[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
self,
inputs: List[BinaryIO],
config: RunnableConfig | List[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> List:
"""Ingest a batch of files into the vectorstore."""
ids = []
Expand Down Expand Up @@ -118,11 +151,10 @@ def batch(
)
vstore = PGVector(
connection_string=PG_CONNECTION_STRING,
embedding_function=OpenAIEmbeddings(),
embedding_function=OpenAIEmbeddings(http_client=_get_http_client()),
use_jsonb=True,
)


ingest_runnable = IngestRunnable(
text_splitter=RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200),
vectorstore=vstore,
Expand Down
Loading