U
    Mh/                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZmZ d dlmZ ddd	gZed
ddZedddZedG dd dee ZedG dd	 d	eZedG dd dee ZdS )    N)defaultdict)AnyCallableDefaultDictIteratorListOptionalSizedTypeVar)functional_datapipe)	DataChunkIterDataPipe)_check_unpickable_fnBatcherIterDataPipeGrouperIterDataPipeUnBatcherIterDataPipeT_coT)	covariantnamec                 C   sR   | dkr:t jd|  d|  dtdd ttjjjjj	| S t
dt d|  d S )	N)ZSHARDING_PRIORITIESZShardingFilterIterDataPipe`zc` from `torch.utils.data.datapipes.iter.grouping` is going to be removed in PyTorch 2.1Please use `z5` from the `torch.utils.data.datapipes.iter.sharding`   )category
stacklevelzmodule z has no attribute )warningswarnFutureWarninggetattrtorchutilsdataZ	datapipesiterZshardingAttributeError__name__r    r$   Z/var/www/html/venv/lib/python3.8/site-packages/torch/utils/data/datapipes/iter/grouping.py__getattr__   s     r&   batchc                       sl   e Zd ZU dZeed< eed< eed< defeeedd fdd	Z	e
e d
ddZed
ddZ  ZS )r   a2  
    Creates mini-batches of data (functional name: ``batch``).

    An outer dimension will be added as ``batch_size`` if ``drop_last`` is set to ``True``, or ``length % batch_size`` for the
    last batch if ``drop_last`` is set to ``False``.

    Args:
        datapipe: Iterable DataPipe being batched
        batch_size: The size of each batch
        drop_last: Option to drop the last batch if it's not full
        wrapper_class: wrapper to apply onto each batch (type ``List``) before yielding,
            defaults to ``DataChunk``

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> dp = IterableWrapper(range(10))
        >>> dp = dp.batch(batch_size=3, drop_last=True)
        >>> list(dp)
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    datapipe
batch_size	drop_lastFN)r(   r)   r*   returnc                    s6   |dkst dt   || _|| _|| _|| _d S )Nr   z+Batch size is required to be larger than 0!)AssertionErrorsuper__init__r(   r)   r*   wrapper_class)selfr(   r)   r*   r/   	__class__r$   r%   r.   :   s    
zBatcherIterDataPipe.__init__r+   c                 c   sZ   g }| j D ],}|| t|| jkr
| |V  g }q
t|dkrV| jsV| |V  d S Nr   )r(   appendlenr)   r/   r*   )r0   r'   xr$   r$   r%   __iter__G   s    

zBatcherIterDataPipe.__iter__c                 C   sV   t | jtr>| jr"t| j| j S t| j| j d | j S ntt| j dd S )N   z# instance doesn't have valid length)	
isinstancer(   r	   r*   r6   r)   	TypeErrortyper#   r0   r$   r$   r%   __len__R   s
    zBatcherIterDataPipe.__len__)r#   
__module____qualname____doc__r   __annotations__intboolr   r.   r   r8   r>   __classcell__r$   r$   r1   r%   r      s   
Zunbatchc                   @   s2   e Zd ZdZdeedddZdd Zdd	 Zd
S )r   a   
    Undos batching of data (functional name: ``unbatch``).

    In other words, it flattens the data up to the specified level within a batched DataPipe.

    Args:
        datapipe: Iterable DataPipe being un-batched
        unbatch_level: Defaults to ``1`` (only flattening the top level). If set to ``2``,
            it will flatten the top two levels, and ``-1`` will flatten the entire DataPipe.

    Example:
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> source_dp = IterableWrapper([[[0, 1], [2]], [[3, 4], [5]], [[6]]])
        >>> dp1 = source_dp.unbatch()
        >>> list(dp1)
        [[0, 1], [2], [3, 4], [5], [6]]
        >>> dp2 = source_dp.unbatch(unbatch_level=2)
        >>> list(dp2)
        [0, 1, 2, 3, 4, 5, 6]
    r9   r(   unbatch_levelc                 C   s   || _ || _d S NrF   )r0   r(   rG   r$   r$   r%   r.   t   s    zUnBatcherIterDataPipe.__init__c                 c   s&   | j D ]}| j|| jdE d H  qd S )NrG   )r(   _diverG   )r0   elementr$   r$   r%   r8   z   s    
zUnBatcherIterDataPipe.__iter__c                 c   s   |dk rt d|dkrNt|ttfrF|D ]}| j|ddE d H  q*q|V  nT|dkr^|V  nDt|ttfr|D ]}| j||d dE d H  qpntd| j dd S )Nz unbatch_level must be -1 or >= 0rI   r   r9   zunbatch_level z" exceeds the depth of the DataPipe)
ValueErrorr:   listr   rJ   
IndexErrorrG   )r0   rK   rG   itemr$   r$   r%   rJ   ~   s    zUnBatcherIterDataPipe._diveN)r9   )	r#   r?   r@   rA   r   rC   r.   r8   rJ   r$   r$   r$   r%   r   \   s    groupbyc                	   @   s   e Zd ZdZddddddee eegef ee	e
e	 e
e	 edddZd	d
 Zdd ZddddZdd Zdd Zdd ZdS )r   a!
  
    Groups data from IterDataPipe by keys from ``group_key_fn``, yielding a ``DataChunk`` with batch size up to ``group_size``.

    (functional name: ``groupby``).

    The samples are read sequentially from the source ``datapipe``, and a batch of samples belonging to the same group
    will be yielded as soon as the size of the batch reaches ``group_size``. When the buffer is full,
    the DataPipe will yield the largest batch with the same key, provided that its size is larger
    than ``guaranteed_group_size``. If its size is smaller, it will be dropped if ``drop_remaining=True``.

    After iterating through the entirety of source ``datapipe``, everything not dropped due to the buffer capacity
    will be yielded from the buffer, even if the group sizes are smaller than ``guaranteed_group_size``.

    Args:
        datapipe: Iterable datapipe to be grouped
        group_key_fn: Function used to generate group key from the data of the source datapipe
        keep_key: Option to yield the matching key along with the items in a tuple,
            resulting in `(key, [items])` otherwise returning [items]
        buffer_size: The size of buffer for ungrouped data
        group_size: The max size of each group, a batch is yielded as soon as it reaches this size
        guaranteed_group_size: The guaranteed minimum group size to be yielded in case the buffer is full
        drop_remaining: Specifies if the group smaller than ``guaranteed_group_size`` will be dropped from buffer
            when the buffer is full

    Example:
        >>> import os
        >>> # xdoctest: +SKIP
        >>> from torchdata.datapipes.iter import IterableWrapper
        >>> def group_fn(file):
        ...     return os.path.basename(file).split(".")[0]
        >>> source_dp = IterableWrapper(["a.png", "b.png", "a.json", "b.json", "a.jpg", "c.json"])
        >>> dp0 = source_dp.groupby(group_key_fn=group_fn)
        >>> list(dp0)
        [['a.png', 'a.json', 'a.jpg'], ['b.png', 'b.json'], ['c.json']]
        >>> # A group is yielded as soon as its size equals to `group_size`
        >>> dp1 = source_dp.groupby(group_key_fn=group_fn, group_size=2)
        >>> list(dp1)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
        >>> # Scenario where `buffer` is full, and group 'a' needs to be yielded since its size > `guaranteed_group_size`
        >>> dp2 = source_dp.groupby(group_key_fn=group_fn, buffer_size=3, group_size=3, guaranteed_group_size=2)
        >>> list(dp2)
        [['a.png', 'a.json'], ['b.png', 'b.json'], ['a.jpg'], ['c.json']]
    Fi'  N)keep_keybuffer_size
group_sizeguaranteed_group_sizedrop_remaining)r(   group_key_fnrR   rS   rT   rU   rV   c                C   s   t | || _|| _|| _|| _tt| _d| _|| _	d | _
|d k	rl|d k	rld|  k r`|ksfn t|| _
|d k	r|d k	rd|  k r|ksn t|| _
|| _t| _d S r4   )r   r(   rW   rR   max_buffer_sizer   rN   buffer_elementscurr_buffer_sizerT   rU   r,   rV   r   r/   )r0   r(   rW   rR   rS   rT   rU   rV   r$   r$   r%   r.      s"    	
"zGrouperIterDataPipe.__init__c                 C   s   d }d}d }| j  D ](}t| j | |krt| j | }|}q| jd k	rn|| jk rn| jsntdt| j | | jd ks|| jkr| j | }|  j|8  _| j |= |S )Nr   zFailed to group items)rY   keysr6   rU   rV   RuntimeErrorstrrZ   )r0   Zbiggest_keyZbiggest_sizeresult_to_yieldZfindkeyr$   r$   r%   _remove_biggest_key   s    
z'GrouperIterDataPipe._remove_biggest_keyc                 c   s"  | j D ]}| |}| j| | |  jd7  _| jd k	r| jt| j| kr| | j| }| jrn||fn|V  |  jt| j| 8  _| j|= | j| j	kr| 
 }|d k	r| |}| jr||fn|V  qt| j D ]>}| | j|}|  jt|8  _| jr||fn|V  qd S )Nr9   )r(   rW   rY   r5   rZ   rT   r6   r/   rR   rX   r_   tupler[   pop)r0   r7   keyresultr^   r$   r$   r%   r8      s$    


zGrouperIterDataPipe.__iter__r3   c                 C   s   d| _ tt| _d S r4   )rZ   r   rN   rY   r=   r$   r$   r%   reset  s    zGrouperIterDataPipe.resetc              
   C   sD   | j | j| j| j| j| j| j| j| j| j	f
}t
jd k	r@t
|S |S rH   )r(   rW   rR   rX   rT   rU   rV   r/   _valid_iterator_id_number_of_samples_yieldedr   Zgetstate_hookr0   stater$   r$   r%   __getstate__  s    

z GrouperIterDataPipe.__getstate__c                 C   s@   |\
| _ | _| _| _| _| _| _| _| _| _	d| _
tt| _d S r4   )r(   rW   rR   rX   rT   rU   rV   r/   re   rf   rZ   r   rN   rY   rg   r$   r$   r%   __setstate__  s    z GrouperIterDataPipe.__setstate__c                 C   s   | j   d S rH   )rY   clearr=   r$   r$   r%   __del__,  s    zGrouperIterDataPipe.__del__)r#   r?   r@   rA   r   r   r   r   rD   rC   r   r.   r_   r8   rd   ri   rj   rl   r$   r$   r$   r%   r      s*   0)r   collectionsr   typingr   r   r   r   r   r   r	   r
   Z(torch.utils.data.datapipes.iter.shardingr   Z%torch.utils.data.datapipes._decoratorr   Z#torch.utils.data.datapipes.datapiper   r   Z'torch.utils.data.datapipes.utils.commonr   __all__r   r]   r&   r   r   r   r$   r$   r$   r%   <module>   s$   (
=4