Skip to content

vllm.v1.engine.tensor_ipc

Tensor IPC transport via torch.multiprocessing.Queue.

This module contains the queue-based transport logic for sharing tensors between processes (e.g., API server -> engine core). The msgpack layer emits/consumes lightweight :class:TensorIpcData values, while transport state such as request association, handle generation, queue routing, buffering, and cleanup lives here.

TensorIpcData dataclass

Data sent via torch.multiprocessing.Queue for zero-copy IPC.

Contains the tensor_id and the actual tensor. The tensor is shared in memory (GPU or CPU) for efficient inter-process communication.

Source code in vllm/v1/engine/tensor_ipc.py
@dataclasses.dataclass
class TensorIpcData:
    """
    Data sent via torch.multiprocessing.Queue for zero-copy IPC.

    Contains the tensor_id and the actual tensor. The tensor is
    shared in memory (GPU or CPU) for efficient inter-process communication.
    """

    sender_id: str
    message_id: int
    tensor_id: int
    tensor: torch.Tensor

TensorIpcReceiver

Receive-side logic for tensor IPC via torch.multiprocessing.Queue.

Wraps the queue receive logic previously embedded in MsgpackDecoder.

Source code in vllm/v1/engine/tensor_ipc.py
class TensorIpcReceiver:
    """Receive-side logic for tensor IPC via torch.multiprocessing.Queue.

    Wraps the queue receive logic previously embedded in MsgpackDecoder.
    """

    def __init__(self, queue: TensorIpcQueue):
        self.queue = queue
        self._tensor_buffers = defaultdict[str, _Sender](_Sender)

    def __call__(
        self, dtype: str, shape: tuple[int, ...], meta: dict[str, Any]
    ) -> torch.Tensor:
        """Retrieve a tensor from torch.multiprocessing.Queue.

        Uses a drain-and-buffer pattern: drains all available tensors from
        the queue, buffering them, until the requested tensor is found.
        Works for CUDA and CPU.
        """

        # Create lookup key from handle
        sender_id: str = meta["sender_id"]
        message_id: int = meta["message_id"]
        tensor_id: int = meta["tensor_id"]

        # Drain all available tensors. We save them regardless if this is
        # the one we're waiting for as they may arrive out of order from
        # multiple producers.
        while True:
            sender = self._tensor_buffers.get(sender_id)
            if sender is not None:
                tensors = sender.tensors
                tensor = tensors.get(message_id, {}).pop(tensor_id, None)
                if tensor is not None:
                    if sender.current_message_id != message_id:
                        while tensors and (mid := next(iter(tensors))) < message_id:
                            if sender.tensors.pop(mid):
                                logger.warning(
                                    "Discarding %d stale tensors from sender %s",
                                    sender_id,
                                )
                        sender.current_message_id = message_id
                    logger.debug(
                        "Received tensor %s from sender %s for (shape=%s, device=%s) "
                        "via IPC queue (shared memory)",
                        (message_id, tensor_id),
                        sender_id,
                        tensor.shape,
                        tensor.device,
                    )
                    return tensor

            ipc_data: TensorIpcData = self.queue.get(timeout=10.0)

            # Store tensor
            sender = self._tensor_buffers[ipc_data.sender_id]
            if sender.current_message_id > ipc_data.message_id:
                logger.warning(
                    "Ignoring stale tensor from sender %s", ipc_data.sender_id
                )
                continue

            sender.tensors.setdefault(ipc_data.message_id, {})[ipc_data.tensor_id] = (
                ipc_data.tensor
            )

__call__

__call__(
    dtype: str, shape: tuple[int, ...], meta: dict[str, Any]
) -> Tensor

Retrieve a tensor from torch.multiprocessing.Queue.

Uses a drain-and-buffer pattern: drains all available tensors from the queue, buffering them, until the requested tensor is found. Works for CUDA and CPU.

Source code in vllm/v1/engine/tensor_ipc.py
def __call__(
    self, dtype: str, shape: tuple[int, ...], meta: dict[str, Any]
) -> torch.Tensor:
    """Retrieve a tensor from torch.multiprocessing.Queue.

    Uses a drain-and-buffer pattern: drains all available tensors from
    the queue, buffering them, until the requested tensor is found.
    Works for CUDA and CPU.
    """

    # Create lookup key from handle
    sender_id: str = meta["sender_id"]
    message_id: int = meta["message_id"]
    tensor_id: int = meta["tensor_id"]

    # Drain all available tensors. We save them regardless if this is
    # the one we're waiting for as they may arrive out of order from
    # multiple producers.
    while True:
        sender = self._tensor_buffers.get(sender_id)
        if sender is not None:
            tensors = sender.tensors
            tensor = tensors.get(message_id, {}).pop(tensor_id, None)
            if tensor is not None:
                if sender.current_message_id != message_id:
                    while tensors and (mid := next(iter(tensors))) < message_id:
                        if sender.tensors.pop(mid):
                            logger.warning(
                                "Discarding %d stale tensors from sender %s",
                                sender_id,
                            )
                    sender.current_message_id = message_id
                logger.debug(
                    "Received tensor %s from sender %s for (shape=%s, device=%s) "
                    "via IPC queue (shared memory)",
                    (message_id, tensor_id),
                    sender_id,
                    tensor.shape,
                    tensor.device,
                )
                return tensor

        ipc_data: TensorIpcData = self.queue.get(timeout=10.0)

        # Store tensor
        sender = self._tensor_buffers[ipc_data.sender_id]
        if sender.current_message_id > ipc_data.message_id:
            logger.warning(
                "Ignoring stale tensor from sender %s", ipc_data.sender_id
            )
            continue

        sender.tensors.setdefault(ipc_data.message_id, {})[ipc_data.tensor_id] = (
            ipc_data.tensor
        )

TensorIpcSender

Bases: OOBTensorConsumer

Send-side logic for tensor IPC via torch.multiprocessing.Queue.

Uses a single queue targeting rank 0 (the only rank that consumes multimodal tensors during TP>1 / PP>1. Note: DP>1 not supported).

Source code in vllm/v1/engine/tensor_ipc.py
class TensorIpcSender(OOBTensorConsumer):
    """Send-side logic for tensor IPC via torch.multiprocessing.Queue.

    Uses a single queue targeting rank 0 (the only rank that consumes
    multimodal tensors during TP>1 / PP>1. Note: DP>1 not supported).
    """

    def __init__(self, queue: TensorIpcQueue):
        self.queue = queue
        self._tensor_id_counter = 0
        self._message_counter = 0
        self._sender_id = uuid.uuid4().hex[:8]

    def set_target_engine(self, target_engine: int) -> None:
        if target_engine != 0:
            raise IndexError(
                "TensorIpcSender only supports a single queue; "
                f"got target engine {target_engine}"
            )

    def new_message(self) -> None:
        self._message_counter += 1
        self._tensor_id_counter = 0

    def __call__(self, tensor: torch.Tensor) -> dict[str, Any] | None:
        """Send tensor via queue, return its handle. Returns None if failed."""
        try:
            # Move tensor to shared memory for IPC
            # This is required for proper inter-process communication
            if not tensor.is_shared():
                tensor = tensor.share_memory_()

            metadata = {
                "sender_id": self._sender_id,
                "message_id": self._message_counter,
                "tensor_id": self._tensor_id_counter,
            }

            self._tensor_id_counter += 1

            ipc_data = TensorIpcData(**metadata, tensor=tensor)  # type: ignore[arg-type]

            # Use a timeout to avoid blocking indefinitely
            self.queue.put(ipc_data, timeout=10.0)

            logger.debug(
                "Sent tensor %s for (shape=%s, device=%s) "
                "via IPC queue (shared memory)",
                metadata,
                tensor.shape,
                tensor.device,
            )

            return metadata
        except Exception as e:
            logger.warning(
                "Failed to send tensor via IPC queue: %s. "
                "Falling back to standard serialization.",
                e,
            )
            return None

__call__

__call__(tensor: Tensor) -> dict[str, Any] | None

Send tensor via queue, return its handle. Returns None if failed.

Source code in vllm/v1/engine/tensor_ipc.py
def __call__(self, tensor: torch.Tensor) -> dict[str, Any] | None:
    """Send tensor via queue, return its handle. Returns None if failed."""
    try:
        # Move tensor to shared memory for IPC
        # This is required for proper inter-process communication
        if not tensor.is_shared():
            tensor = tensor.share_memory_()

        metadata = {
            "sender_id": self._sender_id,
            "message_id": self._message_counter,
            "tensor_id": self._tensor_id_counter,
        }

        self._tensor_id_counter += 1

        ipc_data = TensorIpcData(**metadata, tensor=tensor)  # type: ignore[arg-type]

        # Use a timeout to avoid blocking indefinitely
        self.queue.put(ipc_data, timeout=10.0)

        logger.debug(
            "Sent tensor %s for (shape=%s, device=%s) "
            "via IPC queue (shared memory)",
            metadata,
            tensor.shape,
            tensor.device,
        )

        return metadata
    except Exception as e:
        logger.warning(
            "Failed to send tensor via IPC queue: %s. "
            "Falling back to standard serialization.",
            e,
        )
        return None