U
    Mh
                     @   s|   d dl mZmZmZ d dlmZ d dlmZ d dlm	Z	 ddgZ
G dd de	ZG dd	 d	eZed
G dd deZdS )    )DictSizedTuple)functional_datapipe)IterDataPipe)IntEnumSHARDING_PRIORITIESShardingFilterIterDataPipec                   @   s   e Zd ZdZdZdZdS )r            N)__name__
__module____qualname__DEFAULTZDISTRIBUTEDZMULTIPROCESSING r   r   Z/var/www/html/venv/lib/python3.8/site-packages/torch/utils/data/datapipes/iter/sharding.pyr      s   c                   @   s   e Zd ZeeedddZdS )_ShardingIterDataPipe)num_of_instancesinstance_idsharding_groupc                 C   s   t d S N)NotImplementedErrorselfr   r   r   r   r   r   apply_sharding   s    z$_ShardingIterDataPipe.apply_shardingN)r   r   r   intr   r   r   r   r   r   r      s   r   Zsharding_filterc                   @   sF   e Zd ZdZdedddZejfddZdd	 Z	d
d Z
dd ZdS )r	   ao  
    Wrapper that allows DataPipe to be sharded (functional name: ``sharding_filter``).

    After ``apply_sharding`` is called, each instance of the DataPipe (on different workers) will have every `n`-th element of the
    original DataPipe, where `n` equals to the number of instances.

    Args:
        source_datapipe: Iterable DataPipe that will be sharded
    N)source_datapipec                 C   s*   || _ || _i | _d| _d| _|   d S Nr
   r   )r   sharding_group_filtergroupsr   r   _update_num_of_instances)r   r   r   r   r   r   __init__)   s    z#ShardingFilterIterDataPipe.__init__c                 C   sv   ||krt d| d| d|tjkrHt| jr\tj| jkr\tdntj| jkr\td||f| j|< |   d S )Nzinstance_id(z*) should be smaller than num_of_instances()z8ShardingFilter cannot mix DEFAULT and non DEFAULT groups)
ValueErrorr   r   lenr    	Exceptionr!   r   r   r   r   r   1   s    

z)ShardingFilterIterDataPipe.apply_shardingc                 C   s   g }t | j D ](}| jd ks*|| jkr|| j|  q|  d| _d| _|D ]*\}}|  j| j| 7  _|  j|9  _qTd S r   )sortedr    keysr   appendreverser   r   )r   Zsorted_sharding_groupskeyZgroup_num_of_instancesZgroup_instance_idr   r   r   r!   =   s    z3ShardingFilterIterDataPipe._update_num_of_instancesc                 c   s.   t | jD ]\}}|| j | jkr
|V  q
d S r   )	enumerater   r   r   )r   iitemr   r   r   __iter__L   s    z#ShardingFilterIterDataPipe.__iter__c                 C   sR   t | jtr:t| j| j | jt| j| j k r4dnd S tt| j dd S )Nr
   r   z# instance doesn't have valid length)	
isinstancer   r   r%   r   r   	TypeErrortyper   )r   r   r   r   __len__Q   s
    z"ShardingFilterIterDataPipe.__len__)N)r   r   r   __doc__r   r"   r   r   r   r!   r/   r3   r   r   r   r   r	      s   
N)typingr   r   r   Z%torch.utils.data.datapipes._decoratorr   Z#torch.utils.data.datapipes.datapiper   enumr   __all__r   r   r	   r   r   r   r   <module>   s   