Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V1] Simplify Shutdown #11659

Merged
merged 23 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions vllm/v1/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,6 @@ def sigquit_handler(signum, frame):

self.output_handler: Optional[asyncio.Task] = None

def __del__(self):
self.shutdown()

@classmethod
def from_engine_args(
cls,
Expand Down Expand Up @@ -136,15 +133,6 @@ def from_engine_args(
stat_loggers=stat_loggers,
)

def shutdown(self):
"""Shutdown, cleaning up the background proc and IPC."""

if engine_core := getattr(self, "engine_core", None):
engine_core.shutdown()

if handler := getattr(self, "output_handler", None):
handler.cancel()

@classmethod
def _get_executor_cls(cls, vllm_config: VllmConfig) -> Type[Executor]:
executor_class: Type[Executor]
Expand Down
9 changes: 0 additions & 9 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ def step(self) -> List[EngineCoreOutput]:
scheduler_output, output)
return engine_core_outputs

def shutdown(self):
self.model_executor.shutdown()

def profile(self, is_start: bool = True):
self.model_executor.profile(is_start)

Expand Down Expand Up @@ -187,7 +184,6 @@ def signal_handler(signum, frame):
signal.signal(signal.SIGINT, signal_handler)

parent_process = psutil.Process().parent()
engine_core = None
try:
engine_core = EngineCoreProc(*args, **kwargs)
engine_core.run_busy_loop()
Expand All @@ -200,11 +196,6 @@ def signal_handler(signum, frame):
logger.error("EngineCore hit an exception: %s", traceback)
parent_process.send_signal(signal.SIGQUIT)

finally:
if engine_core is not None:
engine_core.shutdown()
engine_core = None

def run_busy_loop(self):
"""Core busy loop of the EngineCore."""

Expand Down
21 changes: 5 additions & 16 deletions vllm/v1/engine/core_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import List, Optional, Type
import weakref
from typing import List, Type

import msgspec
import zmq
Expand Down Expand Up @@ -52,9 +53,6 @@ def make_client(

return InprocClient(vllm_config, executor_class, log_stats)

def shutdown(self):
pass

def get_output(self) -> List[EngineCoreOutput]:
raise NotImplementedError

Expand Down Expand Up @@ -104,12 +102,6 @@ def add_request(self, request: EngineCoreRequest) -> None:
def abort_requests(self, request_ids: List[str]) -> None:
self.engine_core.abort_requests(request_ids)

def shutdown(self):
self.engine_core.shutdown()

def __del__(self):
self.shutdown()

def profile(self, is_start: bool = True) -> None:
self.engine_core.profile(is_start)

Expand All @@ -134,6 +126,9 @@ def __init__(
executor_class: Type[Executor],
log_stats: bool = False,
):
# Ensure cleanup of ZMQ during GC.
self._finalizer = weakref.finalize(self, self.shutdown)

# Serialization setup.
self.encoder = PickleEncoder()
self.decoder = msgspec.msgpack.Decoder(EngineCoreOutputs)
Expand All @@ -153,7 +148,6 @@ def __init__(
zmq.constants.PUSH)

# Start EngineCore in background process.
self.proc_handle: Optional[BackgroundProcHandle]
self.proc_handle = BackgroundProcHandle(
input_path=input_path,
output_path=output_path,
Expand All @@ -166,13 +160,8 @@ def __init__(
})

def shutdown(self):
# Shut down the zmq context.
self.ctx.destroy(linger=0)

if hasattr(self, "proc_handle") and self.proc_handle:
self.proc_handle.shutdown()
self.proc_handle = None


class SyncMPClient(MPClient):
"""Synchronous client for multi-proc EngineCore."""
Expand Down
7 changes: 0 additions & 7 deletions vllm/v1/engine/llm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,3 @@ def get_tokenizer_group(
f"found type: {type(tokenizer_group)}")

return tokenizer_group

def __del__(self):
self.shutdown()

def shutdown(self):
if engine_core := getattr(self, "engine_core", None):
engine_core.shutdown()
4 changes: 1 addition & 3 deletions vllm/v1/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def __init__(
target_fn: Callable,
process_kwargs: Dict[Any, Any],
):
# Ensure cleanup of background process during GC.
self._finalizer = weakref.finalize(self, self.shutdown)

context = get_mp_context()
Expand All @@ -114,9 +115,6 @@ def __init__(
raise RuntimeError(f"{process_name} initialization failed. "
"See root cause above.")

def __del__(self):
self.shutdown()

def shutdown(self):
# Shutdown the process if needed.
if hasattr(self, "proc") and self.proc.is_alive():
Expand Down
Loading