[RFC]: Disaggregated prefilling and KV cache transfer roadmap #10818

Open · 2 of 34 tasks
KuntaiDu opened this issue Dec 2, 2024 · 14 comments

@KuntaiDu
Collaborator

KuntaiDu commented Dec 2, 2024

Motivation.

Here is the roadmap for disaggregated prefill (and general-purpose kv cache transfer). Feel free to contribute 😁.

Proposed Change.

  • XpYd support (X vLLM prefill instances and Y vLLM decode instances; the TP and PP sizes will very likely differ between prefill and decode instances)
    • [Feature] Allow specifying a region-of-interest (roi) on the num_head dimension and the layer dimension (currently the roi tensor only covers the token dimension)
    • [Feature] XpYd support by building multiple connections between Xp and Yd
    • [Feature] XpYd support by letting Xp connect to one KV cache server, and connect this server to Yd
  • Building connection
    • [Usage] Keep distributed connection alive by periodically sending dummy requests.
    • [Usage] Build connection by running vllm connect ([Frontend] Disaggregate prefill decode with zmq #11791 )
    • [Feature] Allow connecting prefiller and decoder across different nodes
    • [Perf] Build connection by directly talking to the Engine instead of talking to the API server
  • Compatibility
    • [Feature] Compatible with chunked prefill
    • [Feature] Compatible with prefix caching
    • [Feature] Compatible with pipeline parallel
    • [Feature] Compatible with multi-modality
  • Asynchronous KV cache transfer
    • [Perf] KV cache prefetching
    • [Perf] layer-by-layer pipelining (by changing model forward context -- please don't change model code)
  • Communication support
    • RCCL pipe
    • RDMA pipe
    • DCN pipe
    • CXL pipe
    • Infra-specific pipe (e.g. AWS / Azure / Google cloud)
  • Better memory control
    • [Perf] Reusing vLLM page table to avoid memory fragmentation
    • [Perf] Reduce the number of tensor copies
  • Adaptivity and fault tolerance
    • [Perf] If not all KV caches in the batch are received, only perform prefilling on those tokens without KV cache
    • [Perf] Allow a prefill/decode vLLM worker to be repurposed as a decode/prefill vLLM worker
  • Third-party engine integration
  • Persistent prefix caching support
    • [Feature] Allow fetching the KV cache of some prefix tokens and then prefilling the remaining tokens
    • [Feature] Allow fetching the KV cache of some contiguous tokens in the middle and then performing prefill on the remaining tokens to blend the fetched KV cache with the remaining context
  • Orchestration
    • [Feature] A centralized orchestrator for a pool of prefill and decode workers
    • [Feature] Dynamically add / remove workers
    • [Feature] Let the orchestrator be able to observe the workers using the observability APIs already exposed by vLLM
    • [Feature] Initial routing support (send the decoding request to the most available decode instance first)

Feedback Period.

No response

CC List.

@youkaichao @zeroorhero @comaniac @rkooo567 @WoosukKwon @liweiqing1997 @ShangmingCai @Leaf996 @coolkp @sjnaj @K-Mistele @ApostaC @YaoJiayi @njhill

Any Other Things.

No response

Before submitting a new issue...

  • Make sure you already searched for relevant issues, and asked the chatbot living at the bottom right corner of the documentation page, which can answer lots of frequently asked questions.
@KuntaiDu KuntaiDu added the RFC label Dec 2, 2024
@WangErXiao
Contributor

Will the logic for model upgrades and instance service discovery be introduced in XpYd?

@KuntaiDu
Collaborator Author

KuntaiDu commented Dec 2, 2024

Will the logic for model upgrades and instance service discovery be introduced in XpYd?

Model upgrades --- not in the scope of the disaggregated prefill roadmap for now. But this IS important (RLHF-style training also needs it), so I can add it if this is a common need. Please ❤️ this message if you need this feature.
Instance service discovery --- this will be done by a request gateway, which is also an ongoing effort, but I need to discuss with other people more to figure out where we should put it (inside vLLM, as a k8s/kserve plugin, or other options).

@WangErXiao
Contributor

When using XpYd in production, model upgrades are frequent. During the upgrade period there are two versions of the model, so I think the vLLM gateway needs to pair prefill and decode instances, ensuring they are from the same model version.

@xiuqiaoli

Glad to see the progress on supporting the P/D disaggregation feature.

  1. Will this RFC support a central scheduler to determine the best prefill and decode instances to serve the current request? Many PD disaggregation papers (Mooncake, DistServe, etc.) introduce similar components that consider several metrics (instance load, KV cache locality, etc.) before choosing P/D instances.
  2. Since there are multiple choices of memory stores or storage systems for sharing KV cache, will your design introduce interfaces to plug in third-party KV cache memory/storage systems?

@KuntaiDu
Collaborator Author

KuntaiDu commented Dec 3, 2024

  1. Yes. I am discussing with people where to put the scheduler (it likely should not be implemented in Python since scalability matters for this scheduler, so I am not sure whether it should live in vllm or in another repo under vllm-project).
  2. Yes, and I am leaving APIs for third-party systems to integrate against. The key APIs are insert and drop_select.
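
To make the integration surface concrete, here is a minimal Python sketch of what a third-party KV cache buffer could look like, modeled on the insert / drop_select pattern above; the class name and exact signatures are illustrative assumptions, not vLLM's actual base class:

    # Hypothetical sketch of a third-party KV cache buffer, modeled on the
    # insert / drop_select pattern mentioned above. The class name and exact
    # signatures are illustrative and may differ from vLLM's actual interface.
    from typing import List, Optional

    import torch


    class ThirdPartyKVCacheBuffer:
        """Stores KV caches keyed by (input_tokens, roi) and serves them on demand."""

        def __init__(self) -> None:
            # Each entry: [input_tokens, roi, key, value, hidden]
            self._entries: List[List[torch.Tensor]] = []

        def insert(self, input_tokens: torch.Tensor, roi: torch.Tensor,
                   key: torch.Tensor, value: torch.Tensor,
                   hidden: torch.Tensor) -> None:
            # Called by the prefill side after it has computed the KV cache.
            self._entries.append([input_tokens, roi, key, value, hidden])

        def drop_select(self, input_tokens: torch.Tensor,
                        roi: torch.Tensor) -> List[Optional[torch.Tensor]]:
            # Called by the decode side: find a matching entry, remove it from
            # the buffer, and return it (or Nones if nothing matches).
            for i, (tokens, entry_roi, key, value, hidden) in enumerate(self._entries):
                if tokens.shape == input_tokens.shape and torch.equal(tokens, input_tokens):
                    del self._entries[i]
                    return [tokens, entry_roi, key, value, hidden]
            return [None, None, None, None, None]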

@wyzzbond

wyzzbond commented Dec 4, 2024

To better support the PD disaggregated architecture, we are actively developing a dual-tiered scheduler, implemented in Go, to optimize XpYd and request management. This upgrade has been built upon our PD disaggregated feature within vllm and is now live in our production environment, showing improved performance with good stability.

The core design of our scheduler is outlined below:

● Observability: To reduce reliance on any single inference engine, we have implemented a Go-based reverse proxy that directly collects and computes instance-level performance metrics in real time, such as TTFT, TPOT, instance load, and cache status.

● Hierarchical Scheduling System: Our system features a Cluster Level Scheduler (CLS) and an Instance Level Scheduler (ILS), aiming to maximize goodput per GPU while meeting latency SLOs. The CLS leverages a workload-aware performance-cost model to refine request routing, determining whether to use a disaggregated or colocated serving mode and pinpointing the most cost-effective GPU types. Subsequently, the ILS assigns the most suitable P/D instance pairs for incoming requests, optimizing load balancing and cache reuse.

● Dynamic P/D Adjustment: By leveraging instance-level metrics, we've developed a role shift module that periodically evaluates instance load stats and decides when to add, remove, or switch P/D instances as needed.
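
As a rough Python illustration of the ILS pairing step described above (the production scheduler is implemented in Go, and all names and scoring weights below are hypothetical):

    # Rough illustration of the ILS pairing step: pick the P/D pair that
    # balances load and cache reuse. The actual scheduler is written in Go;
    # names and weights here are hypothetical.
    from dataclasses import dataclass
    from typing import List, Tuple


    @dataclass
    class Instance:
        name: str
        load: float       # normalized load, 0.0 (idle) to 1.0 (saturated)
        cache_hit: float  # estimated prefix-cache hit ratio for this request


    def pick_pd_pair(prefills: List[Instance],
                     decodes: List[Instance]) -> Tuple[Instance, Instance]:
        """Choose the prefill/decode pair with the best load vs. cache-reuse trade-off."""

        def score(inst: Instance) -> float:
            # Lower load is better; higher cache hit is better.
            return 0.7 * (1.0 - inst.load) + 0.3 * inst.cache_hit

        return max(prefills, key=score), max(decodes, key=score)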

We are looking forward to releasing the code for our global scheduler to OSS shortly. Additional features are currently in development. We welcome any discussions and opportunities for collaboration.

@yuleil
Contributor

yuleil commented Dec 4, 2024

We (Alibaba Cloud) are actively developing a disaggregated prefilling feature for vLLM to tackle latency issues and minimize interference between prefilling and decoding. Leveraging fully asynchronous I/O, it ensures minimal overhead for PD disaggregation. This implementation has been widely verified in our production system, proving its robust stability and exceptional performance.

Design Highlights

  • Fully asynchronous: KV Cache transfer does not block computation. We observed with nsys that NCCL communication and computation are fully overlapped on different CUDA streams. (nsys timeline screenshot)

  • Control the behavior of disaggregated prefill at the request level: Our engine is designed to handle each request with the flexibility to switch between different serving strategies, ranging from single-instance serving to PD disaggregation. This architecture markedly enhances the scheduler’s potential for optimization. By merely including {"prefill_endpoint": "http://192.168.1.124:8001/v1/completions"} in the request, we can conduct PD disaggregation with the prefill instance selected by our global scheduler based on workload attributes and instance-level performance metrics. This capability allows for on-the-fly adjustment of the P/D instance ratios to optimize performance and facilitates instantaneous role transitions as required.

Workflow

PD-disaggregated inference can be enabled by using the "prefill_endpoint" parameter. However, to achieve optimal global load balancing, enhance prefix-caching affinity, and minimize the mismatch between P/D instances, a global scheduler has been incorporated into the system. The whole process is structured as follows:

(workflow diagram)
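
As a concrete sketch of this request-level control, a client (or the global scheduler on its behalf) might issue something like the following; the decode-instance URL is hypothetical, and the "prefill_endpoint" field follows the example above rather than any upstream vLLM API:

    # Hedged illustration of the request-level control described above: the
    # client (or the global scheduler acting on its behalf) adds a
    # "prefill_endpoint" field so the decode instance knows which prefill
    # instance to pull the KV cache from. The decode URL below is hypothetical
    # and the field name follows this comment, not any upstream vLLM API.
    import requests

    decode_url = "http://192.168.1.125:8000/v1/completions"  # hypothetical decode instance

    payload = {
        "model": "meta-llama/Meta-Llama-3.1-8B-Instruct",
        "prompt": "Explain disaggregated prefilling in one paragraph.",
        "max_tokens": 200,
        # Chosen by the global scheduler based on load / cache-affinity metrics.
        "prefill_endpoint": "http://192.168.1.124:8001/v1/completions",
    }

    response = requests.post(decode_url, json=payload, timeout=60)
    print(response.json())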

Performance Evaluation

Microbenchmark

A10, single instance, TP1: python benchmark_serving.py --model=meta-llama/Meta-Llama-3.1-8B-Instruct --dataset-name=random --random-input-len=1000 --random-output-len=200 --request-rate=1.3

(benchmark results)

A10, 1P1D, TP1: python benchmark_serving.py --model=meta-llama/Meta-Llama-3.1-8B-Instruct --dataset-name=random --random-input-len=1000 --random-output-len=200 --request-rate=2.6

(benchmark results)

TPOT decreased by 27%, while TTFT increased due to prefill request queuing. That can be optimized by adjusting to a more suitable P/D ratio.

Conclusion

We have developed a flexible implementation of PD disaggregation, with a special focus on XpYd support and asynchronous KV cache transfer. We hope to contribute to the community in these areas, further boosting the performance of disaggregated prefilling.

@tanzelin430

(quoting @yuleil's comment above)

@yuleil Hello, I was wondering how to use nsys to profile such a distributed system. I have plenty of experience using nsys to profile vLLM, but for PD disaggregation the prefill and decode instances run as separate processes, and I want a single nsys session to profile both instances. After checking the help docs I still cannot find a solution.

@Jeffwan
Contributor

Jeffwan commented Dec 5, 2024

Let's also add orchestration support to the roadmap. It seems that how to orchestrate such a stateful application is not covered yet; let's create a sub-task to track it.

@a32543254

Hi @KuntaiDu,
How can disaggregated prefill be compatible with chunked prefill?
As far as I know, chunked prefill splits large prefills into smaller chunks and batches them together with decode requests, which combines prefill and decode rather than separating them.
Do you mean we chunk the large input into smaller chunks only on the prefill instance, to avoid frequent dynamic adjustment of kernel dispatching and memory allocation?

@KuntaiDu
Collaborator Author

Chunked prefill is useful for controlling the peak GPU memory usage when prefilling very long contexts. So for long-context use cases it makes sense to use both.

@ANormalMan12

(quoting @yuleil's comment above)

Fully asynchronous KV cache transfer is a great feature that can reduce latency. I hope it can be merged into the main branch soon. Will it be merged, and if so, when? @yuleil

@dutsc

dutsc commented Jan 9, 2025

@KuntaiDu I am trying to implement XpYd (taking 1P3D as an example); here are my approach and the problem I ran into.

Method

  1. Implement a round-robin (polling) distribution scheme in the proxy, setting original_request_data["pd_pair"] = [0,1], [0,2], [0,3], [0,1], ... This is implemented for testing convenience; the distribution strategy can be optimized further (a minimal sketch follows this list).
  2. Pass the pd_pair parameter all the way down to the execute_model() function of the model runner, and also add the pd_pair parameter to the send_kv_caches_and_hidden_states and recv_kv_caches_and_hidden_states functions. The path of the pd_pair parameter is: send_kv_caches_and_hidden_states -> insert() -> drop_select_handler() -> send_tensor() -> send_tensor_wrapper() -> send_impl(), and likewise on the recv side. The end result is that before self.device_send_func actually runs, I set self.target_rank_for_send = rank (and similarly on the recv side). This is how I control where each request's KV cache is sent and received.
  3. In this way, I can control which P/D instances a request is executed on by changing the request's pd_pair.
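
A minimal sketch of the round-robin pd_pair assignment in step 1 (the field names follow the description above and are not upstream vLLM fields):

    # Hypothetical sketch of the round-robin pd_pair assignment in step 1.
    # "pd_pair" and "original_request_data" follow the naming above and are
    # not upstream vLLM fields.
    import itertools

    PREFILL_RANK = 0
    DECODE_RANKS = [1, 2, 3]  # 1P3D: one prefill instance, three decode instances
    _decode_cycle = itertools.cycle(DECODE_RANKS)


    def assign_pd_pair(original_request_data: dict) -> dict:
        """Attach a (prefill_rank, decode_rank) pair to the incoming request."""
        original_request_data["pd_pair"] = [PREFILL_RANK, next(_decode_cycle)]
        return original_request_data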

Problem

But the problem I encountered is: when I send a request with pd_pair=[0,2], the result is returned normally, and subsequent requests with pd_pair=[0,2] also return normally. But once I change to pd_pair=[0,1] or [0,3], the program hangs without any error.
I guess the problem is that I cannot change the sender/receiver ranks of the NCCL pipe at will. Do I need to create three connections for 1P3D and use a different connection for each send, instead of creating one connection and changing the target rank each time?

Note: even when I use TCPStore to transfer the KV cache, the same problem occurs: the system hangs after the next request changes pd_pair, and from the logs the hang happens after the D instance sends the signal. So it is not an NCCL problem at all; it is stuck somewhere else. Maybe I need to check my system implementation again.

Note

My send function is as follows:

    def _send_impl(self, tensor: Optional[torch.Tensor], rank: int) -> None:
        """
        The actual implementation of sending the tensor and its metadata to the
        target rank.

        Parameters:
            - tensor: The input tensor to be sent, or None if no tensor is
              being sent.
            - rank: The target rank to send the tensor to.
        """
        metadata = self._make_metadata(tensor)
        self._send_metadata(metadata, rank)
        if tensor is not None:
            # Point the pipe at the requested rank before issuing the send.
            self.target_rank_for_send = rank
            self.device_send_func(tensor.to(self.device),
                                  self.target_rank_for_send)

My receive function:

    def _recv_impl(self, rank: int) -> Optional[torch.Tensor]:
        """
        The actual implementation of receiving a tensor and its metadata from
        the target rank.

        Parameters:
            - rank: The rank to receive the tensor from.

        Returns:
            - buffer: The received tensor, or None if no tensor is received.
        """
        metadata = self._recv_metadata(rank)
        if metadata["dtype"] is None:
            return None
        buffer = self._prepare_recv_buffer(metadata)
        # Point the pipe at the requested rank before issuing the receive.
        self.target_rank_for_recv = rank
        self.device_recv_func(buffer, self.target_rank_for_recv)
        return buffer

@dutsc

dutsc commented Jan 10, 2025

I solved the problem of the system hanging when changing pd_pair.

Problem description

I only passed parameters in when creating the drop_select_handler thread, so after changing pd_pair I could not hand the new d_rank to the drop_select_handler thread, because parameters cannot be passed to a thread that is already running.

Solution

I use a queue to pass parameters to the drop_select_handler thread: every insert performs a d_rank_queue.put(d_rank), and each iteration of the while loop in the drop_select_handler thread first performs d_rank = d_rank_queue.get() (a minimal sketch is shown below).
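
A minimal sketch of this queue-based hand-off, reusing the names above; it illustrates the idea rather than the actual vLLM code:

    # Minimal sketch of the queue-based hand-off. The names (d_rank_queue,
    # insert, drop_select_handler) reuse those above and are illustrative,
    # not the actual vLLM implementation.
    import queue
    import threading

    d_rank_queue: "queue.Queue[int]" = queue.Queue()


    def insert(d_rank: int, payload: object) -> None:
        # Producer side: record which decode rank this transfer targets.
        d_rank_queue.put(d_rank)
        # ... enqueue/send the KV cache payload itself ...


    def drop_select_handler() -> None:
        # Consumer side: the long-running handler thread picks up the target
        # rank for each pending transfer before touching the NCCL/TCP pipe.
        while True:
            d_rank = d_rank_queue.get()  # blocks until insert() provides a rank
            # ... receive the query, match tokens/roi, then send to d_rank ...


    threading.Thread(target=drop_select_handler, daemon=True).start()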

New Bug

But I encountered a new problem:

Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/workspace/disagg/vllm/vllm/distributed/kv_transfer/kv_lookup_buffer/simple_buffer.py", line 159, in drop_select_handler
    temp_length = self._matches(self.buffer[0],
  File "/workspace/disagg/vllm/vllm/distributed/kv_transfer/kv_lookup_buffer/simple_buffer.py", line 74, in _matches
    tokens_recver = tokens_recver[roi_recver]
IndexError: The shape of the mask [63] at index 0 does not match the shape of the indexed tensor [18] at index 0

What is going on here? How can I solve this problem?
