Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V1] [4/N] API Server: ZMQ/MP Utilities #11541

Merged
merged 18 commits into from
Dec 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/requirements-docs.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ openai # Required by docs/source/serving/openai_compatible_server.md's vllm.entr
fastapi # Required by docs/source/serving/openai_compatible_server.md's vllm.entrypoints.openai.cli_args
partial-json-parser # Required by docs/source/serving/openai_compatible_server.md's vllm.entrypoints.openai.cli_args
requests
zmq
13 changes: 4 additions & 9 deletions tests/v1/engine/test_engine_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from vllm import SamplingParams
from vllm.engine.arg_utils import EngineArgs
from vllm.platforms import current_platform
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine import EngineCoreRequest
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.engine.core import EngineCore
Expand Down Expand Up @@ -43,13 +42,11 @@ def test_engine_core(monkeypatch):
m.setenv("VLLM_USE_V1", "1")
"""Setup the EngineCore."""
engine_args = EngineArgs(model=MODEL_NAME)
vllm_config = engine_args.create_engine_config(
usage_context=UsageContext.UNKNOWN_CONTEXT)
vllm_config = engine_args.create_engine_config()
executor_class = AsyncLLM._get_executor_cls(vllm_config)

engine_core = EngineCore(vllm_config=vllm_config,
executor_class=executor_class,
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class=executor_class)
"""Test basic request lifecycle."""

# First request.
Expand Down Expand Up @@ -151,13 +148,11 @@ def test_engine_core_advanced_sampling(monkeypatch):
m.setenv("VLLM_USE_V1", "1")
"""Setup the EngineCore."""
engine_args = EngineArgs(model=MODEL_NAME)
vllm_config = engine_args.create_engine_config(
usage_context=UsageContext.UNKNOWN_CONTEXT)
vllm_config = engine_args.create_engine_config()
executor_class = AsyncLLM._get_executor_cls(vllm_config)

engine_core = EngineCore(vllm_config=vllm_config,
executor_class=executor_class,
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class=executor_class)
"""Test basic request lifecycle."""
# First request.
request: EngineCoreRequest = make_request()
Expand Down
10 changes: 4 additions & 6 deletions tests/v1/engine/test_engine_core_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,10 @@ def test_engine_core_client(monkeypatch, multiprocessing_mode: bool):
UsageContext.UNKNOWN_CONTEXT)
executor_class = AsyncLLM._get_executor_cls(vllm_config)
client = EngineCoreClient.make_client(
vllm_config,
executor_class,
UsageContext.UNKNOWN_CONTEXT,
multiprocess_mode=multiprocessing_mode,
asyncio_mode=False,
vllm_config=vllm_config,
executor_class=executor_class,
)

MAX_TOKENS = 20
Expand Down Expand Up @@ -158,11 +157,10 @@ async def test_engine_core_client_asyncio(monkeypatch):
usage_context=UsageContext.UNKNOWN_CONTEXT)
executor_class = AsyncLLM._get_executor_cls(vllm_config)
client = EngineCoreClient.make_client(
vllm_config,
executor_class,
UsageContext.UNKNOWN_CONTEXT,
multiprocess_mode=True,
asyncio_mode=True,
vllm_config=vllm_config,
executor_class=executor_class,
)

MAX_TOKENS = 20
Expand Down
11 changes: 10 additions & 1 deletion vllm/entrypoints/openai/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from vllm.logger import init_logger
from vllm.usage.usage_lib import UsageContext
from vllm.utils import (FlexibleArgumentParser, get_open_zmq_ipc_path,
is_valid_ipv6_address, set_ulimit)
is_valid_ipv6_address, kill_process_tree, set_ulimit)
from vllm.version import __version__ as VLLM_VERSION

TIMEOUT_KEEP_ALIVE = 5 # seconds
Expand Down Expand Up @@ -737,6 +737,15 @@ def signal_handler(*_) -> None:

signal.signal(signal.SIGTERM, signal_handler)

# The child processes will send SIGQUIT to this process when
# any error happens. This process then clean up the whole tree.
# TODO(rob): move this into AsyncLLM.__init__ once we remove
# the context manager below.
def sigquit_handler(signum, frame):
kill_process_tree(os.getpid())

signal.signal(signal.SIGQUIT, sigquit_handler)

async with build_async_engine_client(args) as engine_client:
app = build_app(args)

Expand Down
22 changes: 1 addition & 21 deletions vllm/executor/multiproc_worker_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import multiprocessing
import os
import sys
import threading
Expand All @@ -13,10 +12,9 @@

import torch

import vllm.envs as envs
from vllm.logger import init_logger
from vllm.triton_utils.importing import HAS_TRITON
from vllm.utils import cuda_is_initialized
from vllm.utils import _check_multiproc_method, get_mp_context

if HAS_TRITON:
from vllm.triton_utils import maybe_set_triton_cache_manager
Expand Down Expand Up @@ -274,24 +272,6 @@ def write_with_prefix(s: str):
file.write = write_with_prefix # type: ignore[method-assign]


def _check_multiproc_method():
if (cuda_is_initialized()
and os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn"):
logger.warning("CUDA was previously initialized. We must use "
"the `spawn` multiprocessing start method. Setting "
"VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. "
"See https://docs.vllm.ai/en/latest/getting_started/"
"debugging.html#python-multiprocessing "
"for more information.")
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"


def get_mp_context():
_check_multiproc_method()
mp_method = envs.VLLM_WORKER_MULTIPROC_METHOD
return multiprocessing.get_context(mp_method)


def set_multiprocessing_worker_envs(parallel_config):
""" Set up environment variables that should be used when there are workers
in a multiprocessing environment. This should be called by the parent
Expand Down
90 changes: 87 additions & 3 deletions vllm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import importlib.util
import inspect
import ipaddress
import multiprocessing
import os
import re
import resource
Expand All @@ -20,6 +21,7 @@
import tempfile
import threading
import time
import traceback
import uuid
import warnings
import weakref
Expand All @@ -29,8 +31,9 @@
from dataclasses import dataclass, field
from functools import lru_cache, partial, wraps
from typing import (TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable,
Dict, Generator, Generic, List, Literal, NamedTuple,
Optional, Tuple, Type, TypeVar, Union, overload)
Dict, Generator, Generic, Iterator, List, Literal,
NamedTuple, Optional, Tuple, Type, TypeVar, Union,
overload)
from uuid import uuid4

import numpy as np
Expand All @@ -39,6 +42,8 @@
import torch
import torch.types
import yaml
import zmq
import zmq.asyncio
from packaging.version import Version
from torch.library import Library
from typing_extensions import ParamSpec, TypeIs, assert_never
Expand Down Expand Up @@ -1844,7 +1849,7 @@ def memory_profiling(
result.non_kv_cache_memory_in_bytes = result.non_torch_increase_in_bytes + result.torch_peak_increase_in_bytes + result.weights_memory_in_bytes # noqa


# Adapted from: https://github.com/sgl-project/sglang/blob/f46f394f4d4dbe4aae85403dec006199b34d2840/python/sglang/srt/utils.py#L630 # noqa: E501Curre
# Adapted from: https://github.com/sgl-project/sglang/blob/v0.4.1/python/sglang/srt/utils.py#L630 # noqa: E501
def set_ulimit(target_soft_limit=65535):
resource_type = resource.RLIMIT_NOFILE
current_soft, current_hard = resource.getrlimit(resource_type)
Expand All @@ -1859,3 +1864,82 @@ def set_ulimit(target_soft_limit=65535):
"with error %s. This can cause fd limit errors like"
"`OSError: [Errno 24] Too many open files`. Consider "
"increasing with ulimit -n", current_soft, e)


# Adapted from: https://github.com/sgl-project/sglang/blob/v0.4.1/python/sglang/utils.py#L28 # noqa: E501
def get_exception_traceback():
etype, value, tb = sys.exc_info()
err_str = "".join(traceback.format_exception(etype, value, tb))
return err_str


# Adapted from: https://github.com/sgl-project/sglang/blob/v0.4.1/python/sglang/srt/utils.py#L783 # noqa: E501
def make_zmq_socket(
ctx: Union[zmq.asyncio.Context, zmq.Context], # type: ignore[name-defined]
path: str,
type: Any,
) -> Union[zmq.Socket, zmq.asyncio.Socket]: # type: ignore[name-defined]
"""Make a ZMQ socket with the proper bind/connect semantics."""

mem = psutil.virtual_memory()
socket = ctx.socket(type)

# Calculate buffer size based on system memory
total_mem = mem.total / 1024**3
available_mem = mem.available / 1024**3
# For systems with substantial memory (>32GB total, >16GB available):
# - Set a large 0.5GB buffer to improve throughput
# For systems with less memory:
# - Use system default (-1) to avoid excessive memory consumption
if total_mem > 32 and available_mem > 16:
buf_size = int(0.5 * 1024**3) # 0.5GB in bytes
else:
buf_size = -1 # Use system default buffer size

if type == zmq.constants.PULL:
socket.setsockopt(zmq.constants.RCVHWM, 0)
socket.setsockopt(zmq.constants.RCVBUF, buf_size)
socket.connect(path)
elif type == zmq.constants.PUSH:
socket.setsockopt(zmq.constants.SNDHWM, 0)
socket.setsockopt(zmq.constants.SNDBUF, buf_size)
socket.bind(path)
else:
raise ValueError(f"Unknown Socket Type: {type}")

return socket


@contextlib.contextmanager
def zmq_socket_ctx(
path: str,
type: Any) -> Iterator[zmq.Socket]: # type: ignore[name-defined]
"""Context manager for a ZMQ socket"""

ctx = zmq.Context(io_threads=2) # type: ignore[attr-defined]
try:
yield make_zmq_socket(ctx, path, type)

except KeyboardInterrupt:
logger.debug("Got Keyboard Interrupt.")

finally:
ctx.destroy(linger=0)


def _check_multiproc_method():
if (cuda_is_initialized()
and os.environ.get("VLLM_WORKER_MULTIPROC_METHOD") != "spawn"):
logger.warning("CUDA was previously initialized. We must use "
"the `spawn` multiprocessing start method. Setting "
"VLLM_WORKER_MULTIPROC_METHOD to 'spawn'. "
"See https://docs.vllm.ai/en/latest/getting_started/"
"debugging.html#python-multiprocessing "
"for more information.")
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"


def get_mp_context():
_check_multiproc_method()
mp_method = envs.VLLM_WORKER_MULTIPROC_METHOD
return multiprocessing.get_context(mp_method)
6 changes: 3 additions & 3 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ def __init__(

# EngineCore (starts the engine in background process).
self.engine_core = EngineCoreClient.make_client(
vllm_config=vllm_config,
executor_class=executor_class,
usage_context=usage_context,
multiprocess_mode=True,
asyncio_mode=True,
vllm_config=vllm_config,
executor_class=executor_class,
log_stats=self.log_stats,
)

self.output_handler: Optional[asyncio.Task] = None
Expand Down
Loading
Loading