# torch/distributed/checkpoint/optimizer.py
# Source reconstructed from the compiled Python 3.8 module: identifiers, constants,
# and docstrings are taken from the bytecode; exact formatting and comments are inferred.

import dataclasses
from typing import cast, Dict, List, Optional, Sequence, Tuple, Union

import torch
import torch.distributed as dist
from torch._utils import _get_device_module
from torch.distributed._shard.sharded_tensor.api import ShardedTensor
from torch.distributed._shard.sharded_tensor.metadata import (
    TensorProperties as ShardTensorProperties,
)
from torch.distributed._shard.sharded_tensor.shard import Shard
from torch.distributed._shard.sharding_spec.chunk_sharding_spec import ChunkShardingSpec
from torch.distributed._tensor import DTensor
from torch.distributed.checkpoint._nested_dict import unflatten_state_dict
from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner
from torch.distributed.checkpoint.metadata import (
    BytesStorageMetadata,
    ChunkStorageMetadata,
    Metadata,
    MetadataIndex,
    STATE_DICT_TYPE,
    TensorProperties,
    TensorStorageMetadata,
)
from torch.distributed.checkpoint.planner import LoadPlan, LoadPlanner
from torch.distributed.checkpoint.planner_helpers import (
    _create_read_items,
    create_read_items_for_chunk_list,
)
from torch.distributed.checkpoint.state_dict_loader import load_state_dict
from torch.distributed.checkpoint.storage import StorageReader
from torch.distributed.checkpoint.utils import (
    _element_wise_add,
    _element_wise_sub,
    _normalize_device_info,
)
from torch.distributed.distributed_c10d import _get_default_group
from torch.distributed.fsdp._shard_utils import _create_chunk_sharded_tensor
from torch.distributed.remote_device import _remote_device

# Maps a state_dict key to the (offset, size) of the current rank's TP slice.
STATE_DICT_2D_LAYOUT = Dict[str, Tuple[Optional[Sequence[int]], Sequence[int]]]

__all__ = ["load_sharded_optimizer_state_dict"]


def _gen_rank_device(global_rank: int, device_type: str = "cuda") -> str:
    if device_type == "cpu":
        return "cpu"
    device_module = _get_device_module(device_type)
    if device_module.is_available():
        return _normalize_device_info(
            device_type, global_rank % device_module.device_count()
        )
    return "cpu"


def _create_colwise_spec(
    pg: Optional[dist.ProcessGroup] = None,
) -> ChunkShardingSpec:
    pg_device_type = dist.distributed_c10d._get_pg_default_device(pg).type
    if pg is None:
        placements = [
            f"rank:{idx}/{_gen_rank_device(idx, pg_device_type)}"
            for idx in range(dist.get_world_size())
        ]
    else:
        placements = [
            f"rank:{idx}/{_gen_rank_device(dist.get_global_rank(pg, idx), pg_device_type)}"
            for idx in range(pg.size())
        ]
    return ChunkShardingSpec(
        dim=0,
        placements=cast(List[Union[_remote_device, str]], placements),
    )


def _is_nested_tensor(val: torch.Tensor) -> bool:
    if type(val) is ShardedTensor:
        if len(val.local_shards()) == 0:
            return False
        if type(val.local_shards()[0].tensor) is ShardedTensor:
            return True
        if type(val.local_shards()[0].tensor) is DTensor:
            raise ValueError("Cannot handle DTensor nested insided ShardedTensor")
    elif type(val) is DTensor and (
        type(val._local_tensor) is DTensor or type(val._local_tensor) is ShardedTensor
    ):
        raise ValueError("Cannot handle nested DTensor")
    return False


def _alloc_tensor(
    props: TensorProperties, size: Sequence[int], device_type: str = "cuda"
) -> torch.Tensor:
    # Allocate an empty tensor with the checkpointed properties on the current device.
    return torch.empty(
        size=size,
        dtype=props.dtype,
        layout=props.layout,
        requires_grad=props.requires_grad,
        pin_memory=props.pin_memory,
        device=cast(torch.device, _get_device_module(device_type).current_device()),
    )


def _get_state_dict_2d_layout(
    state_dict: STATE_DICT_TYPE,
) -> Tuple[STATE_DICT_2D_LAYOUT, Optional[dist.ProcessGroup]]:
    """
    Load the right TP slice of the optimizer state.

    This is not easy since the per-tensor slicing can't be inferred from checkpoint metadata.
    We take advantage of the model state_dict producing a sliced ST to figure out what we need to load.
    This is pretty fragile and it might be easier for FSDP to compute this info for us.
    Returns a dictionary whose keys are the same as the state_dict's and whose values are a tuple of
    (offset, size) for the current rank TP slice.
    N.B. The state_dict *MUST* come from FSDP.sharded_state_dict.
    """
    specs: STATE_DICT_2D_LAYOUT = {}
    dp_pg: Optional[dist.ProcessGroup] = None
    for key, value in state_dict.items():
        specs[key] = (None, value.size())
        if _is_nested_tensor(value):
            assert (
                len(value.local_shards()) == 1
            ), "Cannot handle ST with multiple shards"
            assert isinstance(
                value, ShardedTensor
            ), "Can only handle nested ShardedTensor"
            shard = value.local_shards()[0]
            specs[key] = (
                shard.metadata.shard_offsets,
                shard.metadata.shard_sizes,
            )
            dp_pg = shard.tensor._process_group

    return (
        specs,
        dp_pg,
    )


class _ReaderWithOffset(DefaultLoadPlanner):
    translation: Dict[MetadataIndex, MetadataIndex]
    state_dict: STATE_DICT_TYPE
    metadata: Metadata

    def __init__(self, fqn_to_offset: Dict[str, Sequence[int]]) -> None:
        super().__init__()
        self.fqn_to_offset = fqn_to_offset
        self.metadata = Metadata({})
        self.state_dict = {}
        self.translation = {}

    def create_local_plan(self) -> LoadPlan:
        requests = []
        self.translation = {}
        for fqn, obj in self.state_dict.items():
            md = self.metadata.state_dict_metadata[fqn]
            if not isinstance(obj, ShardedTensor):
                requests += _create_read_items(fqn, md, obj)
                continue

            if fqn not in self.fqn_to_offset:
                requests += _create_read_items(fqn, md, obj)
                continue

            offset = self.fqn_to_offset[fqn]

            assert len(obj.local_shards()) == 1
            original_shard = obj.local_shards()[0]
            # Shift the locally allocated chunk by the TP-slice offset so the read
            # targets the right region of the (unsliced) tensor in the checkpoint.
            local_chunks = [
                ChunkStorageMetadata(
                    offsets=torch.Size(
                        _element_wise_add(original_shard.metadata.shard_offsets, offset)
                    ),
                    sizes=torch.Size(original_shard.metadata.shard_sizes),
                )
            ]

            reqs = create_read_items_for_chunk_list(
                fqn, cast(TensorStorageMetadata, md), local_chunks
            )
            # Remember how to map the shifted (storage-facing) indices back to the
            # local shard so lookup_tensor can find the destination tensor.
            for ri in reqs:
                assert ri.dest_index.offset is not None
                original_offset = _element_wise_sub(ri.dest_index.offset, offset)
                original_index = dataclasses.replace(
                    ri.dest_index, offset=torch.Size(original_offset)
                )
                self.translation[ri.dest_index] = original_index

            requests += reqs
        return LoadPlan(requests)

    def lookup_tensor(self, index: MetadataIndex) -> torch.Tensor:
        return super().lookup_tensor(self.translation.get(index, index))


def load_sharded_optimizer_state_dict(
    model_state_dict: STATE_DICT_TYPE,
    optimizer_key: str,
    storage_reader: StorageReader,
    planner: Optional[LoadPlanner] = None,
) -> STATE_DICT_TYPE:
    """
    Load a state_dict in conjunction with FSDP sharded optimizer state.

    This is the current recommended way to checkpoint FSDP.
    >>> # xdoctest: +SKIP
    >>> import torch.distributed.checkpoint as dist_cp
    >>> # Save
    >>> model: torch.nn.Module
    >>> optim_params = model.parameters()
    >>> optim = torch.optim.SGD(optim_params, lr=0.01)
    >>> # Save
    >>> with FSDP.state_dict_type(model, StateDictType.SHARDED_STATE_DICT):
    >>>     state_dict = {
    >>>         "optimizer": FSDP.optim_state_dict(model, optim),
    >>>         "model": model.state_dict()
    >>>     }
    >>>     dist_cp.save_state_dict(
    >>>         state_dict=state_dict,
    >>>         storage_writer=dist_cp.FileSystemWriter("checkpoint"),
    >>>         planner=dist_cp.DefaultSavePlanner(),
    >>>     )
    >>>
    >>> # Load
    >>> with FSDP.state_dict_type(model_tp, StateDictType.SHARDED_STATE_DICT):
    >>>     model_state_dict = model_tp.state_dict()
    >>>     checkpoint = {
    >>>         "model": model_state_dict
    >>>     }
    >>>     dist_cp.load_state_dict(
    >>>         state_dict=checkpoint,
    >>>         storage_reader=dist_cp.FileSystemReader(checkpoint_file),
    >>>         planner=dist_cp.DefaultLoadPlanner(),
    >>>     )
    >>>     model.load_state_dict(checkpoint["model"])
    >>>
    >>>     optim_state = dist_cp.load_sharded_optimizer_state_dict(
    >>>         model_state_dict,
    >>>         optimizer_key="optimizer",
    >>>         storage_reader=dist_cp.FileSystemReader("checkpoint"),
    >>>     )
    >>>
    >>>     flattened_osd = FSDP.optim_state_dict_to_load(
    >>>         model, optim, optim_state["optimizer"]
    >>>     )
    >>>
    >>>     optim.load_state_dict(flattened_osd)
    """
    metadata = storage_reader.read_metadata()

    layout_specs, dp_pg = _get_state_dict_2d_layout(model_state_dict)
    dp_pg_device_type = dist.distributed_c10d._get_pg_default_device(dp_pg).type
    device_module = _get_device_module(dp_pg_device_type)

    if dp_pg is None:
        placements = []
        for i in range(dist.get_world_size()):
            device_info = _normalize_device_info(
                dp_pg_device_type, i % device_module.device_count()
            )
            placements.append(f"rank:{i}/{device_info}")
        sharding_spec = ChunkShardingSpec(dim=0, placements=placements)
    else:
        sharding_spec = _create_colwise_spec(dp_pg)

    # Allocate the destination tensors for the optimizer values described by the
    # checkpoint metadata, mirroring the sharding of the model state_dict.
    state_dict: STATE_DICT_TYPE = {}
    fqn_to_offset: Dict[str, Sequence[int]] = {}
    for key, value in metadata.state_dict_metadata.items():
        key_path = metadata.planner_data[key]
        if key_path[0] != optimizer_key:
            continue

        if isinstance(value, BytesStorageMetadata):
            state_dict[key] = "<bytes_io>"
            continue

        # value: TensorStorageMetadata
        if value.size.numel() == 1:
            # Scalar state (e.g. "step") is replicated rather than sharded.
            state_dict[key] = _alloc_tensor(
                value.properties, value.size, dp_pg_device_type
            )
        elif dp_pg is None:
            state_dict[key] = _create_chunk_sharded_tensor(
                _alloc_tensor(value.properties, value.size, dp_pg_device_type),
                rank=dist.get_rank(),
                world_size=dist.get_world_size(),
                num_devices_per_node=device_module.device_count(),
                pg=_get_default_group(),
            )
        else:
            spec_key = key_path[2]
            alloc_size = layout_specs.get(spec_key, (None, value.size))[1]

            properties = ShardTensorProperties(
                dtype=value.properties.dtype,
                layout=value.properties.layout,
                requires_grad=value.properties.requires_grad,
                memory_format=value.properties.memory_format,
                pin_memory=value.properties.pin_memory,
            )

            st_md = sharding_spec.build_metadata(torch.Size(alloc_size), properties)
            local_shards = []
            current_rank = dist.get_rank(dp_pg)
            for shard_md in st_md.shards_metadata:
                if cast(_remote_device, shard_md.placement).rank() != current_rank:
                    continue
                local_shards.append(
                    Shard(
                        tensor=_alloc_tensor(
                            value.properties, shard_md.shard_sizes, dp_pg_device_type
                        ),
                        metadata=shard_md,
                    )
                )

            st = ShardedTensor._init_from_local_shards_and_global_metadata(
                local_shards, st_md, process_group=dp_pg
            )

            if spec_key in layout_specs and layout_specs[spec_key][0] is not None:
                fqn_to_offset[key] = cast(Sequence[int], layout_specs[spec_key][0])

            state_dict[key] = st

    # Read the optimizer values from storage; when a 2D (FSDP + TP) layout was
    # detected, shift the reads by the TP-slice offsets recorded above.
    load_state_dict(
        state_dict=state_dict,
        storage_reader=storage_reader,
        planner=_ReaderWithOffset(fqn_to_offset) if dp_pg is not None else planner,
    )

    state_dict = unflatten_state_dict(state_dict, metadata.planner_data)

    return state_dict
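
# A worked example of the offset arithmetic used above (hypothetical numbers, for
# illustration only, not part of the module's API): suppose the checkpointed
# optimizer tensor for some fqn has global shape (4096, 2048), this rank's TP slice
# starts at offset [2048, 0], and FSDP allocated the local chunk at offset [1024, 0]
# *within* that slice. Then _ReaderWithOffset.create_local_plan issues the read at
#     _element_wise_add([1024, 0], [2048, 0])  ->  [3072, 0]
# in the checkpoint, and records
#     translation[MetadataIndex(fqn, offset=(3072, 0))] = MetadataIndex(fqn, offset=(1024, 0))
# so that lookup_tensor can resolve the storage-facing index back (via
# _element_wise_sub) to the locally allocated shard that receives the data.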