U
    yhO                     @   s   d dl mZ d dlmZ d dlmZ dZdZdddd	gZee	d
ddZ
eeedddZdeeeee	ee dddZeeedddZdeee	dddd	ZdS )    )	timedelta)List)contextmanagerz/num_membersz/last_memberstore_timeoutget_allsynchronizebarrier)timeoutc                 c   s*   | j }| t|d dV  | | dS )z
    This sets the timeout and then restores the old timeout when the context
    manager exits.

    Args:
        store: the store to set the timeout on
        timeout: the timeout to set
    )secondsN)r	   Zset_timeoutr   )storer	   Zold_timeout r   W/var/www/html/venv/lib/python3.8/site-packages/torch/distributed/elastic/utils/store.pyr      s    )rankprefix
world_sizec                    sF   |   fddt|D }t| |  dd}|dkrB| | |S )aa  
    Given a store and a prefix, the method goes through the array of keys
    of the following format: ``{prefix}{idx}``, where idx is in a range
    from 0 to size, and tries to retrieve the data.

    The Rank0 process waits at the end to make sure all other processes
    finished the procedure before exiting.

    Usage

    ::

     values = get_all(store, 'torchelastic/data', 3)
     value1 = values[0] # retrieves the data for key torchelastic/data0
     value2 = values[1] # retrieves the data for key torchelastic/data1
     value3 = values[2] # retrieves the data for key torchelastic/data2

    c                    s   g | ]}  | qS r   r   ).0idxr   r   r   
<listcomp>8   s     zget_all.<locals>.<listcomp>z	/finishedr   r   
key_prefixr   )Z	multi_getrange_barrier_nonblockingget)r   r   r   r   Zdata_arrZbarrier_keyr   r   r   r   $   s    
,  )datar   r   r   r	   returnc              
   C   sH   t | |4 | | | | t| |||}|W  5 Q R  S Q R X dS )aT  
    Synchronizes ``world_size`` agents between each other using the underlying c10d store.
    The ``data`` will be available on each of the agents.

    Note: The data on the path is not deleted, as a result there can be stale data if
        you use the same key_prefix twice.

    Time complexity: O(N) per worker, O(N^2) globally.
    N)r   setr   )r   r   r   r   r   r	   Z
agent_datar   r   r   r   I   s    )r   r   r   c                 C   s4   |t  }|t }| |d}||kr0| |d |S )zq
    Does all the non-blocking operations for a barrier and returns the final key
    that can be waited on.
       z<val_ignored>)_NUM_MEMBERS_LAST_MEMBER_CHECKINaddr   )r   r   r   Znum_members_keylast_member_keyr   r   r   r   r   `   s    r   N)r   r   barrier_timeoutr   c              	   C   s2   t | | t| ||d}| | W 5 Q R X dS )a  
    A global lock between agents. This will pause all workers until at least
    ``world_size`` workers respond.

    This uses a fast incrementing index to assign waiting ranks and a success
    flag set by the last worker.

    Time complexity: O(1) per worker, O(N) globally.

    Note: Since the data is not removed from the store, the barrier can be used
        once per unique ``key_prefix``.
    r   N)r   r   r   )r   r   r   r#   r"   r   r   r   r   p   s    )r   )r   )datetimer   typingr   
contextlibr   r   r    __all__floatr   intstrr   bytesr   r   r   r   r   r   r   <module>
   s2   +    