import argparse
import os
from enum import Enum
from typing import cast, Dict, List, Optional, Union

import torch
import torch.distributed as dist
from torch.distributed._shard._utils import narrow_tensor_by_index
from torch.distributed.checkpoint import FileSystemReader, FileSystemWriter
from torch.distributed.checkpoint._nested_dict import flatten_state_dict
from torch.distributed.checkpoint.default_planner import (
    _EmptyStateDictLoadPlanner,
    DefaultLoadPlanner,
)
from torch.distributed.checkpoint.metadata import (
    Metadata,
    STATE_DICT_TYPE,
    STORAGE_TYPES,
    TensorProperties,
    TensorStorageMetadata,
)
from torch.distributed.checkpoint.planner import LoadItemType, LoadPlan, LoadPlanner
from torch.distributed.checkpoint.planner_helpers import _create_chunk_list
from torch.distributed.checkpoint.state_dict_loader import _load_state_dict
from torch.distributed.checkpoint.state_dict_saver import _save_state_dict
from torch.distributed.checkpoint.storage import StorageReader
from torch.futures import Future

__all__ = [
    "dcp_to_torch_save",
    "torch_save_to_dcp",
    "BroadcastingTorchSaveReader",
    "DynamicMetaLoadPlanner",
]


class BroadcastingTorchSaveReader(StorageReader):
    """
    StorageReader for reading a Torch Save file. This reader will read the entire checkpoint
    on the coordinator rank, and then broadcast and shard each tensor to all ranks.

    .. note:: Intended to be used with DynamicMetaLoadPlanner

    .. warning::
        Current implementation only supports loading Tensors.

    >>> # xdoctest: +SKIP("undefined vars")
    >>> sd = {"model": model}
    >>> dcp.load(
    >>>    sd,
    >>>    storage_reader=BroadcastingTorchSaveReader(),
    >>>    planner=DynamicMetaLoadPlanner(),
    >>>    checkpoint_id="path_to_model.pt"
    >>> )
    """

    def __init__(
        self,
        checkpoint_id: Optional[Union[str, os.PathLike]] = None,
        coordinator_rank: int = 0,
    ) -> None:
        self.checkpoint_id = checkpoint_id
        self.coordinator_rank = coordinator_rank

    def read_metadata(self) -> Metadata:
        """Extends the default StorageReader to support building the metadata file."""
        # Metadata is built by DynamicMetaLoadPlanner.set_up_planner, since we are
        # not reading a metadata file from disk
        return Metadata(state_dict_metadata={})

    def read_data(self, plan: LoadPlan, planner: LoadPlanner) -> Future[None]:
        """
        Reads torch save data on the coordinator rank, then broadcasts each tensor.
        This incurs a communication cost, but avoids having to load the entire
        checkpoint on every rank, hopefully preventing OOM issues.
        """
        planner = cast(DefaultLoadPlanner, planner)

        # data is read in on the coordinator rank, and broadcast afterwards
        if self.is_coordinator:
            assert self.checkpoint_id is not None
            torch_state_dict = torch.load(
                self.checkpoint_id, map_location="cpu", weights_only=False
            )
            if planner.flatten_state_dict:
                torch_state_dict, _ = flatten_state_dict(torch_state_dict)
        else:
            torch_state_dict = None

        for req in plan.items:
            if req.type == LoadItemType.BYTE_IO:
                raise RuntimeError(
                    f"Non-tensor value identified at {req.storage_index.fqn}. "
                    f"At this time {type(self).__name__} only supports loading Tensors."
                )

            # the coordinator rank holds the full tensor; every other rank receives
            # it through the broadcast and then narrows out its own shard
            if self.is_coordinator:
                tensor = torch_state_dict[req.storage_index.fqn].cuda()
            else:
                tensor = torch.empty_like(planner.state_dict[req.storage_index.fqn])

            dist.broadcast(tensor, src=self.coordinator_rank, async_op=False)

            tensor = narrow_tensor_by_index(tensor, req.storage_offsets, req.lengths)
            target_tensor = planner.resolve_tensor(req).detach()
            assert target_tensor.size() == tensor.size(), (
                f"req {req.storage_index} mismatch sizes, "
                f"{target_tensor.size()} vs {tensor.size()}"
            )
            target_tensor.copy_(tensor)
            planner.commit_tensor(req, target_tensor)

        fut: Future = Future()
        fut.set_result(None)
        return fut

    def set_up_storage_reader(self, metadata: Metadata, is_coordinator: bool) -> None:
        """Implementation of the StorageReader method."""
        self.is_coordinator = is_coordinator
        if self.is_coordinator:
            assert dist.get_rank() == self.coordinator_rank

        assert self.checkpoint_id is not None

    def prepare_local_plan(self, plan: LoadPlan) -> LoadPlan:
        """Implementation of the StorageReader method."""
        return plan

    def prepare_global_plan(self, global_plan: List[LoadPlan]) -> List[LoadPlan]:
        """Implementation of the StorageReader method."""
        return global_plan

    def reset(self, checkpoint_id: Union[str, os.PathLike, None] = None) -> None:
        """Implementation of the StorageReader method."""
        self.checkpoint_id = checkpoint_id

    @classmethod
    def validate_checkpoint_id(cls, checkpoint_id: Union[str, os.PathLike]) -> bool:
        """Implementation of the StorageReader method."""
        return os.path.isfile(checkpoint_id)


class DynamicMetaLoadPlanner(DefaultLoadPlanner):
    """
    Extension of DefaultLoadPlanner, which creates a new Metadata object based on the passed in state dict,
    avoiding the need to read metadata from disk. This is useful when reading formats which don't have a
    metadata file, like Torch Save files.

    .. note:: Intended to be used with BroadcastingTorchSaveReader

    .. warning::
        Current implementation only supports loading Tensors.

    >>> # xdoctest: +SKIP("undefined vars")
    >>> sd = {"model": model}
    >>> dcp.load(
    >>>    sd,
    >>>    storage_reader=BroadcastingTorchSaveReader(),
    >>>    planner=DynamicMetaLoadPlanner(),
    >>>    checkpoint_id="path_to_model.pt"
    >>> )
    NF)r:   rD   r1   r!   c                    s~   t  ||| i }| j D ]N\}}t|sLtd| dt| j dt	t
|jd| t|||< qt|d| _dS )zdSetups of the planner, extnding default behavior by creating the Metadata object from the state dictr-   r.   r/   )dtyper&   N)superset_up_plannerr:   r5   r3   Z	is_tensorr7   r6   r8   r   r   rZ   r?   r   r   rD   )r"   r:   rD   r1   r'   keyrB   	__class__r#   r$   r\      s    


z%DynamicMetaLoadPlanner.set_up_planner)NF)
r8   rR   rS   rT   r   r   r   rX   r\   __classcell__r#   r#   r^   r$   r      s     )dcp_checkpoint_dirtorch_save_pathc                 C   s*   i }t |t| t dd t|| dS )aq  
    Given a directory containing a DCP checkpoint, this function will convert it into a
    Torch save file.

    Args:
        dcp_checkpoint_dir: Directory containing the DCP checkpoint.
        torch_save_path: Filename to store the converted Torch save file.

    .. warning::
        To avoid OOM, it's recommended to only run this function on a single rank.
    T)Zstorage_readerr*   no_distN)r   r	   r   r3   save)ra   rb   sdr#   r#   r$   r      s    )rb   ra   c                 C   s$   t j| dd}t|t|dd dS )aB  
def torch_save_to_dcp(
    torch_save_path: Union[str, os.PathLike],
    dcp_checkpoint_dir: Union[str, os.PathLike],
):
    """
    Given the location of a torch save file, converts it into a DCP checkpoint.

    Args:
        torch_save_path: Filename of the Torch save file.
        dcp_checkpoint_dir: Directory to store the DCP checkpoint.

    .. warning::
        To avoid OOM, it's recommended to only run this function on a single rank.
    F)r,   T)Zstorage_writerrc   N)r3   r4   r   r
   )rb   ra   r:   r#   r#   r$   r      s      __main__c                   @   s   e Zd ZdZdZdS )
if __name__ == "__main__":

    class FormatMode(Enum):
        TORCH_TO_DCP = "torch_to_dcp"
        DCP_TO_TORCH = "dcp_to_torch"

    # Parse command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "mode",
        type=str,
        help="Conversion mode",
        choices=[m.value for m in FormatMode],
        default=FormatMode.TORCH_TO_DCP,
    )
    parser.add_argument("src", type=str, help="Path to the source model")
    parser.add_argument("dst", type=str, help="Path to the destination model")
    args = parser.parse_args()

    print(
        f"Converting checkpoint from {args.src} to {args.dst} using method: '{args.mode}'"
    )
    checkpoint_missing_warning = (
        f"No checkpoint found at {args.src}. Skipping conversion."
    )

    if args.mode == FormatMode.TORCH_TO_DCP.value:
        if os.path.isfile(args.src):
            torch_save_to_dcp(args.src, args.dst)
        else:
            print(checkpoint_missing_warning)
    elif args.mode == FormatMode.DCP_TO_TORCH.value:
        if os.path.isdir(args.src):
            dcp_to_torch_save(args.src, args.dst)
        else:
            print(checkpoint_missing_warning)
    else:
        raise ValueError(f"Unknown conversion mode: {args.mode}")
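# Minimal usage sketches for the reverse conversion and for the CLI entry point
# above; the paths are hypothetical, and both are best run on a single rank to
# avoid OOM. As a Python API:
#
#     from torch.distributed.checkpoint.format_utils import torch_save_to_dcp
#
#     torch_save_to_dcp("model.pt", "checkpoint/")  # torch.save file -> DCP directory
#
# or, equivalently, through the __main__ block of this module:
#
#     python -m torch.distributed.checkpoint.format_utils torch_to_dcp model.pt checkpoint/
#     python -m torch.distributed.checkpoint.format_utils dcp_to_torch checkpoint/ model.pt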