import copy
from typing import Any, cast, List, Optional, Tuple

import torch
import torch.distributed as dist
import torch.distributed._shard.sharding_spec as shard_spec
import torch.distributed.distributed_c10d as c10d
from torch.distributed._shard.sharded_tensor import (
    Shard,
    ShardedTensor,
    ShardedTensorMetadata,
    TensorProperties,
)
from torch.distributed._shard.sharding_spec import ShardMetadata
from torch.distributed._shard.sharding_spec.chunk_sharding_spec import (
    ChunkShardingSpec,
)
from torch.distributed._tensor import DeviceMesh, DTensor, Replicate, Shard as DShard
from torch.distributed.device_mesh import _mesh_resources
from torch.distributed.fsdp._common_utils import _set_fsdp_flattened
from torch.distributed.fsdp._fsdp_extensions import FSDPExtensions
from torch.distributed.fsdp._shard_utils import _create_chunk_sharded_tensor
from torch.distributed.remote_device import _remote_device
from torch.distributed.tensor.parallel._data_parallel_utils import (
    _flatten_tensor,
    _unflatten_tensor,
)

__all__ = ["DTensorExtensions"]


def _get_box(tensor: DTensor) -> Tuple[torch.Size, torch.Size]:
    device_mesh = tensor.device_mesh
    assert device_mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

    placement = tensor.placements[0]
    offsets = [0] * len(tensor.size())
    num_chunks = device_mesh.size(mesh_dim=0)

    if tensor.placements[0].is_shard():
        shard_dim = cast(DShard, placement).dim
        chunk_size = tensor.size(shard_dim) // num_chunks
        offsets[shard_dim] = chunk_size

    return (torch.Size(offsets), tensor._local_tensor.size())
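
# Worked example (illustrative only, not from the original module): for a
# DTensor of global shape [8, 4] sharded on dim 0 over a hypothetical 1D mesh
# of 4 ranks, each local shard is [2, 4] and the per-rank offset step is
# 8 // 4 = 2 rows, so
#
#   _get_box(dt)        -> (torch.Size([2, 0]), torch.Size([2, 4]))
#   _get_box_for(dt, 3) -> (torch.Size([6, 0]), torch.Size([2, 4]))
#
# i.e. rank i's shard begins at row 2 * i of the global tensor.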


def _get_box_for(tensor: DTensor, idx: int) -> Tuple[torch.Size, torch.Size]:
    offsets, size = _get_box(tensor)
    return (torch.Size([val * idx for val in offsets]), size)


def _get_local_box(tensor: DTensor) -> Tuple[torch.Size, torch.Size]:
    device_mesh = tensor.device_mesh
    coord = device_mesh.get_coordinate()
    assert coord is not None
    return _get_box_for(tensor, coord[0])


def _create_shard_md_from_dt(dt: DTensor, current_rank: int) -> ShardMetadata:
    mesh = dt.device_mesh
    assert mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

    offsets, sizes = _get_local_box(dt)
    return ShardMetadata(
        shard_offsets=list(offsets),
        shard_sizes=list(sizes),
        placement=f"rank:{current_rank}/{dt._local_tensor.device}",
    )


def _create_sharded_tensor_md_from_dt(
    dt: DTensor, dt_pg: c10d.ProcessGroup
) -> ShardedTensorMetadata:
    # This is where it gets tricky: we have to produce a ShardedTensor that has
    # full coverage and yet has only one valid shard for the current rank.
    shards_md = []
    my_rank = dist.get_rank(dt_pg)
    scapegoat_rank = 0 if my_rank > 0 else 1

    if dt.placements[0].is_shard():
        shard_count = dt_pg.size()
    else:
        shard_count = 1

    for i in range(shard_count):
        offsets, sizes = _get_box_for(dt, i)
        shards_md.append(
            ShardMetadata(
                shard_offsets=list(offsets),
                shard_sizes=list(sizes),
                placement=(
                    f"rank:{scapegoat_rank if i > 0 else my_rank}"
                    f"/{dt._local_tensor.device}"
                ),
            )
        )

    return ShardedTensorMetadata(
        shards_metadata=shards_md,
        size=dt.size(),
        tensor_properties=TensorProperties(
            dtype=dt.dtype,
            layout=dt.layout,
            requires_grad=dt.requires_grad,
        ),
    )


def _get_dt_pg(dt: DTensor) -> c10d.ProcessGroup:
    mesh = dt.device_mesh
    assert mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"
    return mesh.get_group()
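
# Shape sketch for the metadata above (hedged; the exact placement ranks depend
# on the calling rank): for a [4] DTensor sharded over a 2-rank group the
# metadata enumerates both chunks,
#
#   ShardMetadata(shard_offsets=[0], shard_sizes=[2], placement="rank:<a>/cuda:0")
#   ShardMetadata(shard_offsets=[2], shard_sizes=[2], placement="rank:<b>/cuda:0")
#
# giving full coverage even though only the current rank's chunk is backed by
# real data; the other chunks are attributed to the scapegoat rank.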


def _rewrite_spec_if_needed(
    spec: shard_spec.ShardingSpec, tensor: torch.Tensor, rank: int
) -> shard_spec.ShardingSpec:
    """
    Rewrite ``spec`` to match the device of ``tensor``.

    FSDP.sharded_optim_state_dict sneakily ships optimizer state to CPU, so if the
    original ShardingSpec produces CUDA metadata, ShardedTensor construction bombs.
    """
    if not isinstance(spec, ChunkShardingSpec):
        return spec

    # Check whether this rank's placement actually disagrees with the tensor's device.
    rewrite = False
    for p in spec.placements:
        p = cast(_remote_device, p)
        if p.rank() == rank and p.device() != tensor.device:
            rewrite = True
            break

    if rewrite:
        spec = copy.deepcopy(spec)
        for i, placement in enumerate(spec.placements):
            placement = cast(_remote_device, placement)
            if placement.rank() == rank and placement.device() != tensor.device:
                spec.placements[i] = _remote_device(f"rank:{rank}/{tensor.device}")

    return spec
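
# Hypothetical example of the rewrite: if rank 1 is handed a CPU tensor while
# the spec still points at CUDA, only rank 1's placement is rewritten (in a
# deep copy); the other ranks' placements are left alone.
#
#   spec = ChunkShardingSpec(dim=0, placements=["rank:0/cuda:0", "rank:1/cuda:1"])
#   cpu_t = torch.zeros(4)
#   _rewrite_spec_if_needed(spec, cpu_t, rank=1).placements[1]  # rank:1/cpu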


def _chunk_tensor(
    tensor: torch.Tensor,
    rank: int,
    world_size: int,
    num_devices_per_node: int,
    pg: dist.ProcessGroup,
) -> torch.Tensor:
    if type(tensor) is ShardedTensor:
        assert len(tensor.local_shards()) == 1

        inner_param = tensor.local_tensor()
        inner_st = _create_chunk_sharded_tensor(
            inner_param,
            rank,
            world_size,
            num_devices_per_node,
            pg,
        )

        outer_local_shard = tensor.local_shards()[0]
        shards: List[Shard] = [
            Shard(inner_st, copy.deepcopy(outer_local_shard.metadata))
        ]
        st_meta = copy.deepcopy(tensor.metadata())
        st_meta.tensor_properties.requires_grad = False

        st_outer = ShardedTensor._init_from_local_shards_and_global_metadata(
            shards,
            sharded_tensor_metadata=st_meta,
            process_group=tensor._process_group,
            init_rrefs=False,
        )
        return st_outer
    elif type(tensor) is DTensor:
        device_mesh = tensor.device_mesh
        assert device_mesh.ndim == 1, "Only 1D DeviceMeshes currently handled"

        inner_param = tensor._local_tensor
        inner_st = _create_chunk_sharded_tensor(
            inner_param,
            rank,
            world_size,
            torch.cuda.device_count(),
            pg,
        )

        dt_pg = _get_dt_pg(tensor)
        # Wrap the inner ST in an outer shard whose metadata describes this
        # rank's slice of the original DTensor.
        shards = [
            Shard(inner_st, _create_shard_md_from_dt(tensor, dist.get_rank(dt_pg)))
        ]
        st_meta = _create_sharded_tensor_md_from_dt(tensor, dt_pg)
        st_meta.tensor_properties.requires_grad = False

        st_outer = ShardedTensor._init_from_local_shards_and_global_metadata(
            shards,
            sharded_tensor_metadata=st_meta,
            process_group=dt_pg,
            init_rrefs=False,
        )
        return st_outer
    else:
        return _create_chunk_sharded_tensor(
            tensor,
            rank,
            world_size,
            num_devices_per_node,
            pg,
        )
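
# Structure note: the DTensor branch above yields a two-level ShardedTensor.
# The outer ST carries TP-group metadata with a single real local shard, and
# that shard's tensor is itself an inner ST chunked over the FSDP group:
#
#   outer ShardedTensor (TP metadata, one real shard)
#     -> Shard
#        -> inner ShardedTensor (FSDP chunks via _create_chunk_sharded_tensor)
#           -> local torch.Tensor chunk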


def _chunk_dtensor(
    tensor: torch.Tensor,
    rank: int,
    device_mesh: DeviceMesh,
) -> DTensor:
    """
    Shard a tensor to chunks along the first dimension.

    The local rank will get its corresponding chunk as the local tensor to create a DTensor.
    """
    parent_mesh = _mesh_resources.get_parent_mesh(device_mesh)
    if parent_mesh is None:
        raise RuntimeError("No parent device_mesh is found for FSDP device_mesh.")
    if parent_mesh.ndim < 2:
        raise RuntimeError(
            f"Found parent device_mesh of ndim={parent_mesh.ndim}, "
            "but meshes must be at least 2D."
        )

    # Clone and detach so the chunk is a new leaf tensor outside the current graph.
    tensor = tensor.clone().detach()

    if isinstance(tensor, torch.Tensor) and not isinstance(tensor, DTensor):
        # A layer TP did not touch (e.g. it was absent from the parallelize_plan):
        # replicate across the TP dimension and shard dim 0 across the FSDP
        # dimension, i.e. go from (Replicate(), Replicate()) to (Shard(0), Replicate()).
        replicate_placements = [Replicate() for _ in range(parent_mesh.ndim)]
        shard_placements = [Replicate() for _ in range(parent_mesh.ndim)]
        shard_placements[0] = DShard(0)

        return DTensor.from_local(
            tensor, parent_mesh, replicate_placements, run_check=False
        ).redistribute(
            device_mesh=parent_mesh,
            placements=shard_placements,
        )
    else:
        tp_placements = tensor.placements
        tp_placement = tp_placements[0]

        tensor = tensor.to_local()

        # A TP-sharded DTensor: keep the TP placement on the innermost mesh
        # dimension and shard dim 0 across the FSDP dimension, i.e. go from
        # (Replicate(), tp_placement) to (Shard(0), tp_placement).
        replicate_placements = [Replicate() for _ in range(parent_mesh.ndim)]
        replicate_placements[-1] = tp_placement
        shard_placements = [Replicate() for _ in range(parent_mesh.ndim)]
        shard_placements[0] = DShard(0)
        shard_placements[-1] = tp_placement

        return DTensor.from_local(
            tensor, parent_mesh, replicate_placements, run_check=False
        ).redistribute(
            device_mesh=parent_mesh,
            placements=shard_placements,
        )
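
# Placement summary for the two branches above (parent mesh ordered [FSDP, TP]):
#
#   plain tensor:       (Replicate(), Replicate())  -> (Shard(0), Replicate())
#   TP-sharded DTensor: (Replicate(), tp_placement) -> (Shard(0), tp_placement)
#
# In both cases FSDP takes over the outermost mesh dimension via Shard(0).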


def _pre_load_state_dict(
    tensor: torch.Tensor,
) -> Tuple[torch.Tensor, List[Shard]]:
    shards = cast(ShardedTensor, tensor).local_shards()
    if len(shards) == 1 and type(shards[0].tensor) is ShardedTensor:
        inner_tensor = shards[0].tensor
        shards = inner_tensor.local_shards()
        tensor = inner_tensor

    return (tensor, shards if len(shards) > 0 else [])
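
# This undoes one level of the nesting created by _chunk_tensor: for a 2D
# (TP + FSDP) parameter it returns the inner ShardedTensor and its local
# shards, so state_dict loading resolves the actual FSDP chunks.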


def _all_gather_dtensor(
    tensor: DTensor,
    parent_mesh: Optional[DeviceMesh],
) -> torch.Tensor:
    """All gather a DTensor in its FSDP dimension and return the local tensor."""
    assert parent_mesh == tensor.device_mesh

    placements = list(copy.deepcopy(tensor.placements))
    # Replicate every mesh dimension except the innermost (TP) one, e.g.
    # [Shard(0), tp_placement] -> [Replicate(), tp_placement].
    for i in range(0, len(placements) - 1):
        placements[i] = Replicate()
    tensor = tensor.redistribute(
        device_mesh=tensor.device_mesh,
        placements=placements,
    )

    return tensor.to_local()


class DTensorExtensions(FSDPExtensions):
    """
    DTensorExtensions is the TensorFlattener extension needed for 2D FSDP + TP.

    This is the implementation for FSDPExtensions defined in
    https://github.com/pytorch/pytorch/blob/main/torch/distributed/fsdp/_fsdp_extensions.py
    N)r   c                    s*   t    d | _|| _tj| j| _d S N)super__init__compute_streamdevice_handler&   Z_dynamodisablepost_unflatten_transform)selfru   	__class__r,   r-   rs   G  s    
zDTensorExtensions.__init__r   c                 C   s   t |S rq   )r   rx   r   r,   r,   r-   pre_flatten_transformO  s    z'DTensorExtensions.pre_flatten_transform)r   param_extensionr   c              
   C   sT   | j p| j }| j|. t||| j| j d}t| |W  5 Q R  S Q R X d S )N)ru   rt   )rt   ru   Zcurrent_streamstreamr   r   )rx   r   r}   r~   resultr,   r,   r-   rw   U  s    z*DTensorExtensions.post_unflatten_transform)r   rM   rT   rU   rV   r<   r   c                 C   s   t |||||S rq   )r\   )rx   r   rM   rT   rU   rV   r<   r,   r,   r-   chunk_tensorh  s    	zDTensorExtensions.chunk_tensorr]   c                 C   s   t |||S rq   )rm   )rx   r   rM   r   r,   r,   r-   chunk_dtensors  s    zDTensorExtensions.chunk_dtensorc                 C   s   t |S rq   )rn   r{   r,   r,   r-   pre_load_state_dict_transform{  s    z/DTensorExtensions.pre_load_state_dict_transformro   c                 C   s
   t ||S rq   )rp   )rx   r   rl   r,   r,   r-   all_gather_dtensor  s    z$DTensorExtensions.all_gather_dtensor)N)__name__
__module____qualname____doc__rs   r&   ri   r   r   r   r|   rw   intrE   ProcessGroupr<   r   r   r   r   r   r   r   r   __classcell__r,   r,   ry   r-   r   @  s>   
  
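
# Usage sketch (hedged: FSDP normally constructs this extension itself, passing
# its device handle, when it runs over one dimension of a 2D DeviceMesh; `param`
# is a hypothetical DTensor parameter shown only to illustrate the round trip):
#
#   ext = DTensorExtensions(device_handle=torch.cuda)
#   local, spec = ext.pre_flatten_transform(param)       # DTensor -> local tensor (+ spec)
#   restored = ext.post_unflatten_transform(local, spec)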