import inspect
import os
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from typing import cast, Optional, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import _offload_state_dict_to_cpu
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata, STATE_DICT_TYPE
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import AsyncStager
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


__all__ = ["save_state_dict", "save", "async_save"]


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    # Deprecated entry point: delegate to the shared implementation also used by `save`.
    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
) -> Metadata:
    """
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` , and ``DTensor`` by having each rank only save their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save` and that all data in state_dict belongs to it.

    .. note::
        When saving a checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard groups should call `save`, and the corresponding process group needs
        to be passed in.
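
        For example, a hypothetical sketch, assuming the model was built on a 2-D
        ``device_mesh`` whose first dimension replicates across nodes and whose
        second dimension shards within a node (the mesh layout and variable names
        are illustrative, not part of this API):

        >>> # xdoctest: +SKIP
        >>> shard_group = device_mesh.get_group(mesh_dim=1)  # intra-node shard group
        >>> if device_mesh.get_coordinate()[0] == 0:  # only the first replica saves
        >>>     torch.distributed.checkpoint.save(
        >>>         state_dict=state_dict,
        >>>         storage_writer=fs_storage_writer,
        >>>         process_group=shard_group,
        >>>     )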

    .. note::
        If no process group is available, this function assumes the intention is to save the
        state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )

    .. note::
        This function uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
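
        A minimal sketch of that setup, assuming one GPU per process and a
        NCCL-backed default process group (the device-selection policy shown
        here is one common choice, not a requirement):

        >>> # xdoctest: +SKIP
        >>> torch.cuda.set_device(torch.distributed.get_rank() % torch.cuda.device_count())
        >>> torch.distributed.checkpoint.save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )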
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = not (dist.is_available() and dist.is_initialized())
    if no_dist:
        warnings.warn(
            "torch.distributed is unavailable or uninitialized, assuming the intent "
            "is to save in a single process."
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
        )


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
) -> Future:
    """Asynchronous version of ``save``. This code first de-stages the state_dict on the CPU, and then calls
    `save` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)

    Returns:
        Future: A future holding the resultant Metadata object from `save`.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter("/checkpoint/1")
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()
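
    A common pattern (sketch) is to keep at most one asynchronous checkpoint in
    flight by waiting on the previous future before issuing the next save:

        >>> # xdoctest: +SKIP
        >>> if checkpoint_future is not None:
        >>>     checkpoint_future.result()  # block until the previous save finishes
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        >>>     state_dict=state_dict,
        >>>     storage_writer=fs_storage_writer,
        >>> )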

    z'torch.distributed.checkpoint.async_savecpuzfA CPU backend must be enabled for async save; try initializing process group with 'cpu:gloo,cuda:nccl'Fr)   )Z
type_checkr   )max_workersr'   c                    s    j ddS )NF)wait)shutdown)fexecutorr%   r&   <lambda>       zasync_save.<locals>.<lambda>)r+   r,   r-   r.   r/   r0   r   ZdeviceZ_device_typesAssertionErrorr   r   r	   r3   
isinstancer   Zstager   r   Zsubmitr   Zadd_done_callbackZ should_synchronize_after_executeZsynchronize_staging)r   r(   r   r!   r   ZpgZstaged_state_dictr8   r%   r9   r&   r      s>    6
 

)r   r"   c                 C   s4   i }|   D ]"\}}t|tr&| n|||< q|S )z]Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object.)itemsr>   r   r   )r   Zstateful_state_dictkeyelemr%   r%   r&   r3      s
    r3   c                    s   t jd t|| |d kr(t d k	s4td i }tdd  }d k	rX||d< tf |fdd}tf |fdd}	d||	 tf | fdd	}
tf |fd
d}	d|
|S )Nz,torch.distributed.checkpoint.save_state_dictr(   c                     st   d k	st  } dtjjkr@td  j nj|  jd 	 j 
 }|}|S )Nstorage_metazThe function definition for SavePlanner.set_up_planner has been updated to include the storage_meta argument. Please update your implementation to include this parameter.)r   rB   is_coordinator)r=   rB   inspect	signatureZset_up_planner
parametersr1   r2   rC   Zset_up_storage_writerZcreate_local_planZprepare_local_plan)rB   Z
local_plan)distWr!   r   r   r%   r&   
local_step  s     
z$_save_state_dict.<locals>.local_stepc                    s(   d k	st | \}  | } | S N)r=   Zcreate_global_planZprepare_global_plan)Zall_local_plans)global_metadatar!   r   r%   r&   global_step3  s    
z%_save_state_dict.<locals>.global_stepZplanc                     s2   d k	st  } | }|  | S rI   )r=   Zfinish_plan
write_datar6   value)Zfinal_local_planZ
all_writes)central_planr!   r   r%   r&   rL   >  s
    
z$_save_state_dict.<locals>.write_datac                    s    d k	st j | d  S )N)metadataresults)r=   finish)Zall_results)rJ   r   r%   r&   finish_checkpointG  s    z+_save_state_dict.<locals>.finish_checkpointwrite)
r+   r,   r-   r   r
   r=   getattrr   Zreduce_scatterZ
all_reduce)r   r   r   r   r    r!   Zckpt_kwargsZckpt_idrH   rK   rL   rR   r%   )rN   rG   rJ   r!   r   r   r&   r$     s&    r$   )Nr   FN)Nr   FN)6rD   osr1   concurrent.futuresr   r   typingr   r   r   typing_extensionsr   r+   Ztorch.distributeddistributedr.   Z#torch.distributed._state_dict_utilsr   Z+torch.distributed.checkpoint._storage_utilsr	   Z,torch.distributed.checkpoint.default_plannerr
   Z#torch.distributed.checkpoint.loggerr   Z%torch.distributed.checkpoint.metadatar   r   Z$torch.distributed.checkpoint.plannerr   r   Z$torch.distributed.checkpoint.stagingr   Z%torch.distributed.checkpoint.statefulr   Z$torch.distributed.checkpoint.storager   Z"torch.distributed.distributed_c10dr   utilsr   r   r   __all__FutureWarningZProcessGroupintboolr   strPathLiker   r   r3   r$   r%   r%   r%   r&   <module>   s   
    f[    