U
    yhZ                  
   @   sT  U d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZmZ d dlmZ ddlmZmZmZmZmZmZ ddlmZ e	eee
e f  Zeddddd	ee	e ee ee	e ed
ddZG dd dZi Zeedf ed< e Zejed< eeddddZdee eeef e ee dddZ!eeej"ef e	e dddZ#dS )    N)
AnyCallableDictIterableIteratorListOptionalSequenceTupleUnion)
exposed_in   )_C_library_opsautogradlibraryTensor   )utilsztorch.library)device_typesschema)namefnmutates_argsr   r   returnc                  s&    fdd}|dkr|S ||S )a  Wraps a function into custom operator.

    Reasons why you may want to create a custom op include:
    - Wrapping a third-party library or custom kernel to work with PyTorch
    subsystems like Autograd.
    - Preventing torch.compile/export/FX tracing from peeking inside your function.

    This API is used as a decorator around a function (please see examples).
    The provided function must have type hints; these are needed to interface
    with PyTorch's various subsystems.

    Args:
        name (str): A name for the custom op that looks like "{namespace}::{name}",
            e.g. "mylib::my_linear". The name is used as the op's stable identifier
            in PyTorch subsystems (e.g. torch.export, FX graphs).
            To avoid name collisions, please use your project name as the namespace;
            e.g. all custom ops in pytorch/fbgemm use "fbgemm" as the namespace.
        mutates_args (Iterable[str]): The names of args that the function mutates.
            This MUST be accurate, otherwise, the behavior is undefined.
        device_types (None | str | Sequence[str]): The device type(s) the function
            is valid for. If no device type is provided, then the function
            is used as the default implementation for all device types.
            Examples: "cpu", "cuda".
        schema (None | str): A schema string for the operator. If None
            (recommended) we'll infer a schema for the operator from its type
            annotations. We recommend letting us infer a schema unless you
            have a specific reason not to.
            Example: "(Tensor x, int y) -> (Tensor, Tensor)".

    .. note::
        We recommend not passing in a ``schema`` arg and instead letting us infer
        it from the type annotations. It is error-prone to write your own schema.
        You may wish to provide your own schema if our interpretation of
        the type annotation is not what you want.
        For more info on how to write a schema string, see
        `here <https://github.com/pytorch/pytorch/blob/main/aten/src/ATen/native/README.md#func>`_

    Examples::
        >>> import torch
        >>> from torch import Tensor
        >>> from torch.library import custom_op
        >>> import numpy as np
        >>>
        >>> @custom_op("mylib::numpy_sin", mutates_args=())
        >>> def numpy_sin(x: Tensor) -> Tensor:
        >>>     x_np = x.cpu().numpy()
        >>>     y_np = np.sin(x_np)
        >>>     return torch.from_numpy(y_np).to(device=x.device)
        >>>
        >>> x = torch.randn(3)
        >>> y = numpy_sin(x)
        >>> assert torch.allclose(y, x.sin())
        >>>
        >>> # Example of a custom op that only works for one device type.
        >>> @custom_op("mylib::numpy_sin_cpu", mutates_args=(), device_types="cpu")
        >>> def numpy_sin_cpu(x: Tensor) -> Tensor:
        >>>     x_np = x.numpy()
        >>>     y_np = np.sin(x_np)
        >>>     return torch.from_numpy(y_np)
        >>>
        >>> x = torch.randn(3)
        >>> y = numpy_sin_cpu(x)
        >>> assert torch.allclose(y, x.sin())
        >>>
        >>> # Example of a custom op that mutates an input
        >>> @custom_op("mylib::numpy_sin_inplace", mutates_args={"x"}, device_types="cpu")
        >>> def numpy_sin_inplace(x: Tensor) -> None:
        >>>     x_np = x.numpy()
        >>>     np.sin(x_np, out=x_np)
        >>>
        >>> x = torch.randn(3)
        >>> expected = x.sin()
        >>> numpy_sin_inplace(x)
        >>> assert torch.allclose(x, expected)

    c                    s   dd l }d kr*dd l}|jj| }n}d\}}t|||| }d k	rt }|jj	j
D ]"}|jd k	rb|jjrb||j qb|tkrtd d d| d| |  |S )Nr   ::z3Attempted to create a custom op with `mutates_args=z` and `schema=z*. The schema suggests that the op mutates z`which is different from what was provided to us in `mutates_args`. Please make these consistent.)torchZtorch._custom_op.implZ
_custom_opimplZinfer_schemasplitCustomOpDefset_opoverload_schema	arguments
alias_infois_writeaddr   
ValueErrorregister_kernel)r   r   
schema_str	namespaceopnameresultexpectedargr   r   r   r    K/var/www/html/venv/lib/python3.8/site-packages/torch/_library/custom_ops.pyinnerq   s$    zcustom_op.<locals>.innerNr1   )r   r   r   r   r   r3   r1   r0   r2   	custom_op   s    Wr4   c                   @   s   e Zd ZdZeeeeddddZeedddZedd	d
Z	de
ee edddZeedddZddeee ddddZddddZdd ZdS )r    a  CustomOpDef is a wrapper around a function that turns it into a custom op.

    It has various methods for registering additional behavior for this
    custom op.

    You should not instantiate CustomOpDef directly; instead, use the
    :func:`torch.library.custom_op` API.
    N)r+   r   r   r   r   c                 C   sV   || _ || _|| _|| _i | _d | _d | _d | _t| j | j| _	| 
  | t| j< d S N)
_namespace_namer#   _init_fn_backend_fns_abstract_fn_setup_context_fn_backward_fnget_library_allowing_overwrite_lib_register_to_dispatcherOPDEFS	_qualname)selfr+   r   r   r   r1   r1   r2   __init__   s    zCustomOpDef.__init__)r   c                 C   s   | j  d| j S )Nr   )r6   r7   rB   r1   r1   r2   rA      s    zCustomOpDef._qualnamec                 C   s   d| j  dS )Nz<CustomOpDef(z)>)rA   rD   r1   r1   r2   __repr__   s    zCustomOpDef.__repr__)r   r   r   c                   s"    fdd}|dkr|S ||S )a  Register an implementation for a device type for this operator.

        Some valid device_types are: "cpu", "cuda", "xla", "mps", "ipu", "xpu".
        This API may be used as a decorator.

        Args:
            fn (Callable): The function to register as the implementation for
                the given device types.
            device_types (str | Sequence[str]): The device device_types to register an impl to.

        Examples::
            >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_CUDA)
            >>> import torch
            >>> from torch import Tensor
            >>> from torch.library import custom_op
            >>> import numpy as np
            >>>
            >>> # Create a custom op that works on cpu
            >>> @custom_op("mylib::numpy_sin", mutates_args=(), device_types="cpu")
            >>> def numpy_sin(x: Tensor) -> Tensor:
            >>>     x_np = x.numpy()
            >>>     y_np = np.sin(x_np)
            >>>     return torch.from_numpy(y_np)
            >>>
            >>> # Add implementations for the cuda device
            >>> @numpy_sin.register_kernel("cuda")
            >>> def _(x):
            >>>     x_np = x.cpu().numpy()
            >>>     y_np = np.sin(x_np)
            >>>     return torch.from_numpy(y_np).to(device=x.device)
            >>>
            >>> x_cpu = torch.randn(3)
            >>> x_cuda = x_cpu.cuda()
            >>> assert torch.allclose(numpy_sin(x_cpu), x_cpu.sin())
            >>> assert torch.allclose(numpy_sin(x_cuda), x_cuda.sin())

        c                    s   d kst trg}nt}|D ]Z  jkrv fdd} d kr^jj|d njj|t  | j < q&| S )Nc            	         s   dd t | |D }j  | |}|}t|ts8|f}t |i D ]V}t| }t| |krj  }t|}tdj	 d| d|
| qB|S )Nc                 S   s   h | ]}t | qS r1   )iduntyped_storage).0tensorr1   r1   r2   	<setcomp>   s   zSCustomOpDef.register_kernel.<locals>.inner.<locals>.backend_impl.<locals>.<setcomp>zTensors returned from custom ops (1) must not be inputs to the custom op and (2) may not alias any inputs or other returns. Please clone the the offending output tensors (e.g. output.clone()) or refactor your code. Offending op: z (with implementation in ))iter_tensorsr9   
isinstancetuplerF   rG   inspect	getmoduleRuntimeErrorr7   r'   )	argskwargsZstoragesr-   Ztuple_resultrI   keyr   module)device_typerB   r1   r2   backend_impl   s"    


z@CustomOpDef.register_kernel.<locals>.inner.<locals>.backend_implZCompositeExplicitAutograd)	rM   strlistr9   r>   r   r7   r   Z_dispatch_key_for_device)r   ZdtypesrW   r   rB   )rV   r2   r3      s&    
  z*CustomOpDef.register_kernel.<locals>.innerNr1   )rB   r   r   r3   r1   rZ   r2   r)      s    )3zCustomOpDef.register_kernel)r   r   c                C   s
   || _ |S )a  Register a FakeTensor implementation for this custom op.

        This is necessary to get the operator to work efficiently with torch.compile.

        The Fake impl (sometimes also known as a meta kernel or abstract impl)
        specifies the behavior of this operator on Tensors that carry no data.
        Given some input Tensors with certain properties
        (sizes/strides/storage_offset/device), it specifies what the properties of
        the output Tensors are.

        Please see :func:`torch.library.impl_abstract` for more details.

        Args:
            fn (Callable): The function to register as the FakeTensor
                implementation.

        Examples:
            >>> import torch
            >>> import numpy as np
            >>> from torch import Tensor
            >>>
            >>> # Example 1: an operator without data-dependent output shape
            >>> @torch.library.custom_op("mylib::linear", mutates_args=())
            >>> def linear(x: Tensor, weight: Tensor, bias: Tensor) -> Tensor:
            >>>     return (x @ weight.t()) + bias
            >>>
            >>> @linear.register_fake
            >>> def _(x, weight, bias):
            >>>     assert x.dim() == 2
            >>>     assert weight.dim() == 2
            >>>     assert bias.dim() == 1
            >>>     assert x.shape[1] == weight.shape[1]
            >>>     assert weight.shape[0] == bias.shape[0]
            >>>     assert x.device == weight.device
            >>>     return x.new_empty(x.size(0), weight.size(0))
            >>>
            >>> x = torch.randn(2, 2)
            >>> weight = torch.randn(2, 2)
            >>> bias = torch.randn(2)
            >>> # xdoctest: +SKIP("Requires Python <= 3.11")
            >>> out = torch.compile(linear, fullgraph=True)(x, weight, bias)
            >>> # xdoctest: +SKIP("Requires Python <= 3.11")
            >>> assert torch.allclose(out, torch.nn.functional.linear(x, weight, bias))
            >>>
            >>> # Example 2: an operator with data-dependent output shape
            >>> @torch.library.custom_op("mylib::nonzero", mutates_args=())
            >>> def nonzero(x: Tensor) -> Tensor:
            >>>     x_np = x.cpu().numpy()
            >>>     res = np.stack(np.nonzero(x_np), axis=1)
            >>>     return torch.tensor(res, device=x.device)
            >>>
            >>> @nonzero.register_fake
            >>> def _(x):
            >>>     # Number of nonzero-elements is data-dependent.
            >>>     # Since we cannot peek at the data in an abstract impl,
            >>>     # we use the ctx object to construct a new symint that
            >>>     # represents the data-dependent size.
            >>>     ctx = torch.library.get_ctx()
            >>>     nnz = ctx.new_dynamic_size()
            >>>     shape = [nnz, x.dim()]
            >>>     result = x.new_empty(shape, dtype=torch.int64)
            >>>     return result
            >>>
            >>> x = torch.tensor([0, 1, 2, 0, 0, 1])
            >>> # xdoctest: +SKIP("Requires Python <= 3.11")
            >>> out = torch.compile(nonzero, fullgraph=True)(x)
            >>> # xdoctest: +SKIP("Requires Python <= 3.11")
            >>> assert torch.allclose(out, x.nonzero())

        )r:   )rB   r   r1   r1   r2   register_fake  s    GzCustomOpDef.register_fake)setup_context)backwardr\   r   c               C   s:   | j j}tj|s*td|  d| d|| _|| _dS )ad  Register a backward formula for this custom op.

        In order for an operator to work with autograd, you need to register
        a backward formula:
        1. You must tell us how to compute gradients during the backward pass
        by providing us a "backward" function.
        2. If you need any values from the forward to compute gradients, you can
        use `setup_context` to save values for backward.

        ``backward_fn`` runs during the backward pass. It accepts ``(ctx, *grads)``:
        - ``grads`` is one or more gradients. The number of gradients matches
        the number of outputs of the operator.
        The ``ctx`` object is `the same ctx object <context_method_mixins>`_ used by
        :class:`torch.autograd.Function`. The semantics of ``backward_fn`` are the
        same as :meth:`torch.autograd.Function.backward`.

        ``setup_context(ctx, inputs, output)`` runs during the forward pass.
        Please save quantities needed for backward onto the ``ctx`` object via
        either :meth:`torch.autograd.function.FunctionCtx.save_for_backward`
        or assigning them as attributes of ``ctx``. If your custom op has
        kwarg-only arguments, we expect the signature of ``setup_context``
        to be ``setup_context(ctx, inputs, keyword_only_inputs, output)``.

        Both ``setup_context_fn`` and ``backward_fn`` must be traceable. That is,
        they may not directly access :meth:`torch.Tensor.data_ptr` and they must
        not depend on or mutate global state. If you need a non-traceable backward,
        you can make it a separate custom_op that you call inside ``backward_fn``.

        Examples:
            >>> import torch
            >>> import numpy as np
            >>> from torch import Tensor
            >>>
            >>> @torch.library.custom_op("mylib::numpy_sin", mutates_args=())
            >>> def numpy_sin(x: Tensor) -> Tensor:
            >>>     x_np = x.cpu().numpy()
            >>>     y_np = np.sin(x_np)
            >>>     return torch.from_numpy(y_np).to(device=x.device)
            >>>
            >>> def setup_context(ctx, inputs, output) -> Tensor:
            >>>     x, = inputs
            >>>     ctx.save_for_backward(x)
            >>>
            >>> def backward(ctx, grad):
            >>>     x, = ctx.saved_tensors
            >>>     return grad * x.cos()
            >>>
            >>> numpy_sin.register_autograd(backward, setup_context=setup_context)
            >>>
            >>> x = torch.randn(3, requires_grad=True)
            >>> y = numpy_sin(x)
            >>> grad_x, = torch.autograd.grad(y, x, torch.ones_like(y))
            >>> assert torch.allclose(grad_x, x.cos())
            >>>
            >>> # Example with a keyword-only arg
            >>> @torch.library.custom_op("mylib::numpy_mul", mutates_args=())
            >>> def numpy_mul(x: Tensor, *, val: float) -> Tensor:
            >>>     x_np = x.cpu().numpy()
            >>>     y_np = x_np * val
            >>>     return torch.from_numpy(y_np).to(device=x.device)
            >>>
            >>> def setup_context(ctx, inputs, keyword_only_inputs, output) -> Tensor:
            >>>     ctx.val = keyword_only_inputs["val"]
            >>>
            >>> def backward(ctx, grad):
            >>>     return grad * ctx.val
            >>>
            >>> numpy_mul.register_autograd(backward, setup_context=setup_context)
            >>>
            >>> x = torch.randn(3, requires_grad=True)
            >>> y = numpy_mul(x, val=3.14)
            >>> grad_x, = torch.autograd.grad(y, x, torch.ones_like(y))
            >>> assert torch.allclose(grad_x, torch.full_like(x, 3.14))

        z=Cannot register autograd formula for non-functional operator z with schema zP. Please create a functional operator and register an autograd formula for that.N)r"   r#   r   r   Zis_functional_schemarQ   r<   r;   )rB   r]   r\   r   r1   r1   r2   register_autograd]  s    RzCustomOpDef.register_autogradc                    s   j }jj }t|}t|r4td| |j|tj	j
tj	jgd tjj_fdd}|jj|dd tjj}|jj|ddd	 jj  jrЇ fd
d}|jj|ddd	 d S )NzUcustom_op with kwarg-only Tensor args. Please make your tensors not kwarg-only. Got: )tagsc                     sB    j d kr6tj jrd S td  d jj d j | |S )Nz&There was no fake impl registered for zM. This is necessary for torch.compile/export/fx tracing to work. Please use `z$.register_fake` to add an fake impl.)r:   r   r   Zcan_generate_trivial_fake_implr"   rQ   r8   __name__)rR   rS   rD   r1   r2   	fake_impl  s    
z6CustomOpDef._register_to_dispatcher.<locals>.fake_impl   )Z_stacklevelZAutogradT)Zwith_keysetc              
      s   t j ||D ]`\}}|js q|jjs*qt|trBtj	| qt|t
tfr|D ]}t|trTtj	| qTqt ( jj| tj@ f||W  5 Q R  S Q R X d S r5   )r   r   Z
zip_schemar%   r&   rM   r   r   graphZincrement_versionrN   rY   r   Z!_AutoDispatchBelowADInplaceOrViewr"   Z
redispatchZ_after_ADInplaceOrView_keyset)ZkeysetrR   rS   r/   valvr   rB   r1   r2   adinplaceorview_impl  s&    


zACustomOpDef._register_to_dispatcher.<locals>.adinplaceorview_implZADInplaceOrView)r>   r7   r#   r   Zparse_schemar   Zhas_kwarg_only_tensorsNotImplementedErrordefineTagZpt2_compliant_tagZneeds_fixed_stride_orderr   Z	lookup_oprA   r"   Z_register_faker   Zmake_autograd_implr   Z
is_mutable)rB   libr*   Z
cpp_schemara   Zautograd_implrg   r1   rf   r2   r?     s2    

z#CustomOpDef._register_to_dispatcherc                 O   s   | j ||S r5   )r"   )rB   rR   rS   r1   r1   r2   __call__  s    zCustomOpDef.__call__)N)r`   
__module____qualname____doc__rX   r   rC   propertyrA   rE   device_types_tr   r)   r[   r^   r?   rl   r1   r1   r1   r2   r       s(   	  `O]?r    zlibrary.LibraryOPDEF_TO_LIBr@   )r+   r   r   c                 C   s@   |  d| }|t kr(t |   t |= t| d}|t |< |S )Nr   ZFRAGMENT)rr   Z_destroyr   Library)r+   r   qualnamerk   r1   r1   r2   r=     s    r=   )rR   rS   allowed_nestingr   c                 #   sD    fdd}| D ]}||E d H  q|  D ]}||E d H  q,d S )Nc                 3   sF   t | tr| V  n0 dkrBt | ttfrBtt| i  d E d H  d S )Nr   r   )rM   r   rN   rY   rL   )r/   ru   r1   r2   check'  s    
ziter_tensors.<locals>.check)values)rR   rS   ru   rw   r/   kwargr1   rv   r2   rL   $  s
    rL   )opr   c                 C   sB   t | tr| S t | tjr | j} t | ts.t| tkr>t|  S d S r5   )rM   r    r   
OpOverloadr7   rX   AssertionErrorr@   )rz   r1   r1   r2   _maybe_get_opdef3  s    
r}   )N)r   )$rO   weakreftypingr   r   r   r   r   r   r   r	   r
   r   Ztorch.utils._exposed_inr    r   r   r   r   r   r   r   rX   rq   r4   r    rr   __annotations__WeakValueDictionaryr@   r=   intrL   r{   r}   r1   r1   r1   r2   <module>   sJ    0  v     
 