U
    Mh#                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d	d
gZedddZedG dd
 d
ee Zdd ZedG dd	 d	eZdS )    N)
namedtuple)	CallableIteratorSizedTypeVarOptionalUnionAnyDictList)functional_datapipe)default_collate)dataframe_wrapper)IterDataPipe)_check_unpickable_fnvalidate_input_colCollatorIterDataPipeMapperIterDataPipeT_coT)	covariantmapc                       sf   e Zd ZU dZeed< eed< deedd fddZdd	 Ze	e
 d
ddZed
ddZ  ZS )r   a  
    Applies a function over each item from the source DataPipe (functional name: ``map``).

    The function can be any regular Python function or partial object. Lambda
    function is not recommended as it is not supported by pickle.

    Args:
        datapipe: Source Iterable DataPipe
        fn: Function being applied over each item
        input_col: Index or indices of data which ``fn`` is applied, such as:

            - ``None`` as default to apply ``fn`` to the data directly.
            - Integer(s) is used for list/tuple.
            - Key(s) is used for dict.

        output_col: Index of data where result of ``fn`` is placed. ``output_col`` can be specified
            only when ``input_col`` is not ``None``

            - ``None`` as default to replace the index that ``input_col`` specified; For ``input_col`` with
              multiple indices, the left-most one is used, and other indices will be removed.
            - Integer is used for list/tuple. ``-1`` represents to append result at the end.
            - Key is used for dict. New key is acceptable.

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
        >>> def add_one(x):
        ...     return x + 1
        >>> dp = IterableWrapper(range(10))
        >>> map_dp_1 = dp.map(add_one)  # Invocation via functional form is preferred
        >>> list(map_dp_1)
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        >>> # We discourage the usage of `lambda` functions as they are not serializable with `pickle`
        >>> # Use `functools.partial` or explicitly define the function instead
        >>> map_dp_2 = Mapper(dp, lambda x: x + 1)
        >>> list(map_dp_2)
        [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    datapipefnN)r   r   returnc                    sz   t    || _t| || _|| _|d kr<|d k	r<tdt|tt	frft
|dkr^td|d }|| _t|| d S )Nz3`output_col` must be None when `input_col` is None.   z3`output_col` must be a single-element list or tupler   )super__init__r   r   r   	input_col
ValueError
isinstancelisttuplelen
output_colr   )selfr   r   r   r#   	__class__ Z/var/www/html/venv/lib/python3.8/site-packages/torch/utils/data/datapipes/iter/callable.pyr   B   s    
zMapperIterDataPipe.__init__c                    s   | j d kr| jd kr|  S | j d kr4|  }nDt| j ttfrht fdd| j D }| j| }n|  | j  }t trd}t  nd}| jd krt| j ttfr| | j d < t| j dd  ddD ]
} |= qn
| | j < n"| jdkr | n
| | j< |rt S  S )	Nc                 3   s   | ]} | V  qd S Nr'   ).0coldatar'   r(   	<genexpr>`   s     z/MapperIterDataPipe._apply_fn.<locals>.<genexpr>TFr   r   )reverse)r   r#   r   r   r    r!   sortedappend)r$   r-   resargsZt_flagidxr'   r,   r(   	_apply_fnY   s,    






zMapperIterDataPipe._apply_fn)r   c                 c   s   | j D ]}| |V  qd S r)   )r   r6   )r$   r-   r'   r'   r(   __iter__|   s    
zMapperIterDataPipe.__iter__c                 C   s.   t | jtrt| jS tt| j dd S )Nz# instance doesn't have valid length)r   r   r   r"   	TypeErrortype__name__)r$   r'   r'   r(   __len__   s
    
zMapperIterDataPipe.__len__)NN)r:   
__module____qualname____doc__r   __annotations__r   r   r6   r   r   r7   intr;   __classcell__r'   r'   r%   r(   r      s   
'  #c                 C   s   t |jdkrtd|d }t|}g }g }|  D ]}||kr8tdq8|D ]}|| kr|t| | srtd| | }nHzdd lm} |j	
 }W n, tk
r }	 ztd|	W 5 d }	~	X Y nX |t| ||| }
||
 qRtd|}|| }|S )Nr   z%Only supports one DataFrame per batchr   zConversion keys missmatchz5Collate (DF)DataPipe requires callable as dict valuesz?unable to import default collation function from the TorchArrowZCollateResult)r"   items	Exception
df_wrapperZget_columnskeyscallableZtorcharrow.pytorchZpytorchZrecZDefaultr2   strr   )
conversionitemdfZcolumns_nameZtuple_namesZtuple_valuesnameZcollation_fntapevalueZtpl_clsr!   r'   r'   r(   _collate_helper   s2    



rO   Zcollatec                
       sb   e Zd ZdZedfeeeede	f e
eee	f eee	f f f  ee dd fddZ  ZS )r   af  
    Collates samples from DataPipe to Tensor(s) by a custom collate function (functional name: ``collate``).

    By default, it uses :func:`torch.utils.data.default_collate`.

    .. note::
        While writing a custom collate function, you can import :func:`torch.utils.data.default_collate` for the
        default behavior and `functools.partial` to specify any additional arguments.

    Args:
        datapipe: Iterable DataPipe being collated
        collate_fn: Customized collate function to collect and combine data or a batch of data.
            Default function collates to Tensor(s) based on data type.

    Example:
        >>> # xdoctest: +SKIP
        >>> # Convert integer data to float Tensor
        >>> class MyIterDataPipe(torch.utils.data.IterDataPipe):
        ...     def __init__(self, start, end):
        ...         super(MyIterDataPipe).__init__()
        ...         assert end > start, "this example code only works with end >= start"
        ...         self.start = start
        ...         self.end = end
        ...
        ...     def __iter__(self):
        ...         return iter(range(self.start, self.end))
        ...
        ...     def __len__(self):
        ...         return self.end - self.start
        ...
        >>> ds = MyIterDataPipe(start=3, end=7)
        >>> print(list(ds))
        [3, 4, 5, 6]
        >>> def collate_fn(batch):
        ...     return torch.tensor(batch, dtype=torch.float)
        ...
        >>> collated_ds = CollateIterDataPipe(ds, collate_fn=collate_fn)
        >>> print(list(collated_ds))
        [tensor(3.), tensor(4.), tensor(5.), tensor(6.)]
    N.)r   rH   
collate_fnr   c                    sT   |d k	rt  j||d n6t|r4t  j||d ntt|}t  j||d d S )N)r   )r   r   rF   	functoolspartialrO   )r$   r   rH   rP   r%   r'   r(   r      s    zCollatorIterDataPipe.__init__)r:   r<   r=   r>   r   r   r   r   r   r	   r
   rG   r   rA   r'   r'   r%   r(   r      s   1
)rQ   collectionsr   typingr   r   r   r   r   r   r	   r
   r   Z%torch.utils.data.datapipes._decoratorr   Ztorch.utils.data._utils.collater   Z$torch.utils.data.datapipes.dataframer   rD   Z#torch.utils.data.datapipes.datapiper   Z'torch.utils.data.datapipes.utils.commonr   r   __all__r   r   rO   r   r'   r'   r'   r(   <module>   s    ,q&