Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce (experimental) DALI proxy #5726

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
25 changes: 18 additions & 7 deletions dali/python/nvidia/dali/external_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,27 @@
)


def _get_shape(data):
if hasattr(data, "shape"):
return data.shape() if callable(data.shape) else data.shape
elif hasattr(data, "__array_interface__"):
return data.__array_interface__["shape"]
elif hasattr(data, "__cuda_array_interface__"):
return data.__cuda_array_interface__["shape"]
elif hasattr(data, "__array__"):
return data.__array__().shape
else:
raise RuntimeError(f"Don't know how to extract the shape out of {type(data)}")


def _get_batch_shape(data):
    """Return the per-sample shapes of a batch.

    Parameters
    ----------
    data : list, tuple, TensorListCPU, TensorListGPU, or a dense tensor-like
        The batch to inspect.

    Returns
    -------
    (shapes, uniform) : tuple
        ``shapes`` is a list with one shape per sample. ``uniform`` is True
        when all samples are guaranteed to share the same shape (dense
        tensor input or an empty batch), False otherwise.

    NOTE: this span of the diff interleaved the pre-change branches
    (inline ``x.shape``/``x.shape()`` probing) with the post-change ones;
    the merged text was invalid. This is the resolved post-change version,
    which delegates all shape probing to ``_get_shape``.
    """
    if isinstance(data, (list, tuple, _tensors.TensorListCPU, _tensors.TensorListGPU)):
        # Sample-by-sample container: query each element individually.
        if len(data) == 0:
            return [], True
        return [_get_shape(x) for x in data], False
    else:
        # Dense tensor: the outermost dimension is the batch dimension and
        # every sample shares the remaining dimensions.
        shape = _get_shape(data)
        return [shape[1:]] * shape[0], True


Expand Down Expand Up @@ -68,6 +77,8 @@ def to_numpy(x):
return x.asnumpy()
elif _types._is_torch_tensor(x):
return x.numpy()
elif hasattr(x, "__array__"):
return x.__array__()
else:
return x

Expand All @@ -79,7 +90,7 @@ def to_numpy(x):
if layout is not None:
_check_data_batch(data, batch_size, layout)
data = type(data)(data, layout)
elif isinstance(data, list):
elif isinstance(data, (list, tuple)):
inputs = []
checked = False
for datum in data:
Expand Down
7 changes: 7 additions & 0 deletions dali/python/nvidia/dali/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,12 @@ def is_restored_from_checkpoint(self):
"""If True, this pipeline was restored from checkpoint."""
return self._is_restored_from_checkpoint

@property
def num_outputs(self):
    """Number of pipeline outputs.

    Building the pipeline is required before the output count is known;
    ``build()`` is idempotent, so repeated access is cheap after the
    first call.
    """
    # NOTE(review): presumably self._num_outputs is assigned during graph
    # construction (the diff adds it in _build_graph) — verify it is always
    # set once build() returns.
    self.build()
    return self._num_outputs
jantonguirao marked this conversation as resolved.
Show resolved Hide resolved

def output_dtype(self) -> list:
"""Data types expected at the outputs."""
self.build()
Expand Down Expand Up @@ -854,6 +860,7 @@ def contains_nested_datanode(nested):
self._require_no_foreign_ops("The pipeline does not support checkpointing")

self._graph_outputs = outputs
self._num_outputs = len(self._graph_outputs)
self._setup_input_callbacks()
self._disable_pruned_external_source_instances()
self._py_graph_built = True
Expand Down
66 changes: 4 additions & 62 deletions dali/python/nvidia/dali/plugin/pytorch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,89 +15,31 @@
import sys

from typing import Union, Optional
from typing import Any, Dict, List
from typing import Dict, List

from nvidia.dali import internal as _internal
from nvidia.dali import ops
from nvidia.dali import types
from nvidia.dali.backend import TensorCPU, TensorGPU, TensorListCPU, TensorListGPU
from nvidia.dali.backend import TensorCPU, TensorGPU, TensorListGPU
from nvidia.dali.pipeline import Pipeline

from nvidia.dali.plugin.base_iterator import _DaliBaseIterator
from nvidia.dali.plugin.base_iterator import LastBatchPolicy

import torch
import torch.utils.dlpack as torch_dlpack # noqa: F401
import ctypes
import numpy as np

from . import fn # noqa: F401
from . import experimental # noqa: F401

from nvidia.dali.plugin.pytorch.torch_utils import to_torch_type, feed_ndarray
from nvidia.dali.plugin.pytorch._torch_function import TorchPythonFunction as TorchPythonFunction

_internal._adjust_operator_module(TorchPythonFunction, sys.modules[__name__], [])

ops._wrap_op(TorchPythonFunction, "fn", __name__)


# Mapping from DALI element types to the corresponding PyTorch dtypes,
# used to validate/convert tensors handed over to PyTorch.
# NOTE(review): the diff imports a same-named mapping from
# nvidia.dali.plugin.pytorch.torch_utils — confirm the two stay in sync
# (this local copy appears to be the one being removed).
to_torch_type = {
    types.DALIDataType.FLOAT: torch.float32,
    types.DALIDataType.FLOAT64: torch.float64,
    types.DALIDataType.FLOAT16: torch.float16,
    types.DALIDataType.UINT8: torch.uint8,
    types.DALIDataType.INT8: torch.int8,
    types.DALIDataType.BOOL: torch.bool,
    types.DALIDataType.INT16: torch.int16,
    types.DALIDataType.INT32: torch.int32,
    types.DALIDataType.INT64: torch.int64,
}


def feed_ndarray(
    dali_tensor: Union[TensorCPU, TensorGPU, TensorListCPU, TensorListGPU],
    arr: torch.Tensor,
    cuda_stream: Union[torch.cuda.Stream, Any, None] = None,
) -> torch.Tensor:
    """Copy the contents of a DALI tensor into a pre-allocated PyTorch tensor.

    Parameters
    ----------
    dali_tensor : nvidia.dali.backend.TensorCPU or nvidia.dali.backend.TensorGPU
        Tensor from which to copy.
    arr : torch.Tensor
        Destination of the copy; must already have the matching dtype and shape.
    cuda_stream : torch.cuda.Stream, cudaStream_t or any value castable to cudaStream_t
        CUDA stream used for the copy (an internal user stream is selected
        when omitted). Using PyTorch's current stream is usually the right
        choice, e.g. when copying into a tensor allocated with torch.zeros(...).

    Returns
    -------
    torch.Tensor
        The destination tensor ``arr``.
    """
    expected_dtype = to_torch_type[dali_tensor.dtype]
    assert expected_dtype == arr.dtype, (
        "The element type of DALI Tensor/TensorList"
        " doesn't match the element type of the target PyTorch Tensor: "
        "{} vs {}".format(expected_dtype, arr.dtype)
    )

    src_shape = dali_tensor.shape()
    dst_shape = list(arr.size())
    assert src_shape == dst_shape, (
        "Shapes do not match: DALI tensor has size {0}, "
        "but PyTorch Tensor has size {1}".format(src_shape, dst_shape)
    )

    # A user-supplied stream means the copy may be asynchronous on that stream.
    async_copy = cuda_stream is not None
    stream_ptr = types._raw_cuda_stream_ptr(cuda_stream)

    # Raw data pointer of the destination, as a C void* for the backend call.
    dst_ptr = ctypes.c_void_p(arr.data_ptr())
    if isinstance(dali_tensor, (TensorGPU, TensorListGPU)):
        dali_tensor.copy_to_external(dst_ptr, stream_ptr, non_blocking=async_copy)
    else:
        # CPU tensors copy synchronously; no stream involved.
        dali_tensor.copy_to_external(dst_ptr)
    return arr


class DALIGenericIterator(_DaliBaseIterator):
"""
General DALI iterator for PyTorch. It can return any number of
Expand Down
15 changes: 15 additions & 0 deletions dali/python/nvidia/dali/plugin/pytorch/experimental/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import proxy # noqa: F401
Loading
Loading