# Interface skeleton recovered from the CPython 3.8 bytecode of
# /var/www/html/venv/lib/python3.8/site-packages/torch/testing/_internal/common_fsdp.py.
# Docstrings, signatures, and clearly visible logic are preserved; bodies that
# are not recoverable from the .pyc are elided with ``...``.

import contextlib
import os
import re
import sys
import warnings
from abc import ABC, abstractmethod
from contextlib import nullcontext
from copy import deepcopy
from enum import auto, Enum
from functools import wraps
from typing import (
    Any,
    Callable,
    Dict,
    List,
    no_type_check,
    Optional,
    Tuple,
    Type,
    Union,
)
from unittest import mock

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
from torch.distributed._composable import checkpoint
from torch.distributed._composable.fsdp import fully_shard
from torch.distributed._composable.fsdp._fsdp_param_group import (
    FSDPParamGroup,
    RegisterPostBackwardFunction,
)
from torch.distributed._tensor import distribute_tensor, DTensor, Shard
from torch.distributed.device_mesh import DeviceMesh
from torch.distributed.fsdp import CPUOffload, FullyShardedDataParallel as FSDP
from torch.distributed.fsdp._common_utils import TrainingState
from torch.distributed.fsdp._init_utils import NO_RESHARD_AFTER_FORWARD_STRATEGIES
from torch.distributed.fsdp.fully_sharded_data_parallel import (
    BackwardPrefetch,
    MixedPrecision,
    ShardingStrategy,
)
from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
from torch.distributed.fsdp.wrap import always_wrap_policy, ModuleWrapPolicy, wrap
from torch.distributed.tensor.parallel import (
    ColwiseParallel,
    parallelize_module,
    RowwiseParallel,
    SequenceParallel,
)
from torch.nn import TransformerDecoderLayer, TransformerEncoderLayer
from torch.nn.parallel.distributed import DistributedDataParallel as DDP
from torch.testing._internal.common_distributed import (
    MultiProcessTestCase,
    MultiThreadedTestCase,
    run_subtests,
    TEST_SKIPS,
)
from torch.testing._internal.common_utils import FILE_SCHEMA, get_cycles_per_ms
from torch.utils._triton import has_triton


class FSDPInitMode(Enum):
    # No FSDP wrapping
    NO_FSDP = auto()
    # FSDP recursive wrapping
    RECURSIVE = auto()


class CUDAInitMode(Enum):
    # Move the model to CUDA before passing it to the FSDP constructor
    CUDA_BEFORE = auto()
    # Move the model to CUDA after passing it to the FSDP constructor
    CUDA_AFTER = auto()
    # Keep the model on CPU
    CUDA_NEVER = auto()


class FSDPTestModel(nn.Module, ABC):
    """This defines the interface expected from all models used commonly for
    FSDP unit tests."""

    @abstractmethod
    def get_input(self, device) -> Tuple[torch.Tensor, ...]:
        """Returns an input for the model as a tuple."""
        ...

    @abstractmethod
    def get_loss(self, input, output) -> torch.Tensor:
        """Returns the loss given the input and output."""
        ...

    @abstractmethod
    def run_backward(self, loss) -> None:
        """Runs the backward pass (e.g. including ``loss.backward()``)."""
        ...

    @staticmethod
    @abstractmethod
    def init(*args: Any, **kwargs: Any) -> nn.Module:
        """Initializes an instance of this model."""
        ...

def _assert_module_states(
    model: nn.Module,
    process_group: dist.ProcessGroup,
    assert_fn: Callable,
):
    """
    All-gathers module states across ranks and calls ``assert_fn`` on each pair
    of corresponding states from rank 0 and a nonzero rank. For example, if
    ``assert_fn`` is ``self.assertEqual()``, then this checks that all module
    states are equal across ranks.
    """
    # Collects named parameters and buffers on CPU, all-gathers them with
    # ``dist.all_gather_object``, and compares every nonzero rank's states
    # against rank 0's; details elided.
    ...


def _zero_model(
    model: nn.Module,
    zero_buffers: bool = False,
    summon_full: bool = True,
):
    """Zeros the parameters and optionally buffers of ``model`` in place."""
    ctx = FSDP.summon_full_params(model) if summon_full else nullcontext()
    with ctx:
        for param in model.parameters():
            with torch.no_grad():
                param.zero_()
        if zero_buffers:
            for buffer in model.buffers():
                with torch.no_grad():
                    buffer.zero_()


def _get_state_dict(model, cpu_offload=False, half=False):
    if not cpu_offload:
        model = model.cuda()
    if half:
        model.half()
    return model.state_dict()


def subtest_name(test_name_mapping, *args):
    return "_".join(
        [test_name_mapping[str(s)] if s is not None else "none" for s in args]
    )


def _broadcast_state_dict(rank, state_dict):
    # Moves the rank-0 state dict to CPU, broadcasts it via
    # ``dist.broadcast_object_list``, and moves the received tensors back to
    # GPU; details elided.
    ...


def get_full_params(model: nn.Module, recurse: bool = True):
    """
    Returns the full unsharded parameters of ``model``. Any FSDP-managed
    parameters offloaded to CPU are moved to GPU in the returned list.

    Args:
        recurse (bool): If ``False``, only unshards the parameters immediate to
            ``model``; if ``True``, recurses through the module hierarchy
            rooted at ``model``.
    """
    with FSDP.summon_full_params(model, recurse=recurse):
        return deepcopy(list(model.parameters()))
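
# A small sketch (assumed usage, not part of the recovered module) of how the
# helper above is typically combined with a reference model: unshard the FSDP
# model's parameters and compare them element-wise against a local module.
def _example_compare_full_params(fsdp_model: FSDP, ref_model: nn.Module) -> None:
    full_params = get_full_params(fsdp_model)  # unsharded copies
    for full_param, ref_param in zip(full_params, ref_model.parameters()):
        torch.testing.assert_close(full_param.cpu(), ref_param.detach().cpu())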

def _maybe_cuda(model: nn.Module, move_to_cuda: bool):
    return model.cuda() if move_to_cuda else model


def _maybe_wrap_fsdp(model: nn.Module, wrap_fsdp: bool, *args, **kwargs):
    return model if not wrap_fsdp else FSDP(model, *args, **kwargs)


class DummyProcessGroup:
    def __init__(self, rank: int, size: int):
        self._rank = rank
        self._size = size

    def rank(self) -> int:
        return self._rank

    def size(self) -> int:
        return self._size

    def allreduce(self, *args, **kwargs):
        dist_wait = mock.Mock()

        def get_future():
            future = torch.futures.Future()
            future.set_result(1)
            return future

        dist_wait.get_future = get_future
        return dist_wait


class TransformerWithSharedParams(FSDPTestModel):
    def __init__(
        self,
        group: dist.ProcessGroup,
        cuda_init_mode: CUDAInitMode,
        add_bn: bool,
        deterministic: bool,
    ):
        # Builds an ``nn.Transformer`` with tied embedding and
        # output-projection weights, a ``vocab_bias`` parameter, a
        # ``long_buffer`` buffer, and an optional ``BatchNorm1d``; sizes and
        # details elided.
        ...

    def get_input(self, device):
        # Returns deterministic ``(src, tgt)`` index tensors on ``device``
        ...

    def forward(self, src_ids, tgt_ids):
        ...

    def get_loss(self, input, output):
        # Sum-reduced cross-entropy of the flattened output vs. target ids
        ...

    def run_backward(self, loss):
        loss.backward()

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        add_bn: bool = True,
    ) -> Union[nn.Module, FSDP]:
        """
        Initializes a :class:`TransformerWithSharedParams` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps with
                top-level FSDP. By default, the top-level FSDP uses the
                ``ModuleWrapPolicy`` for encoder and decoder layers, but a
                different auto wrap policy may be specified via
                ``fsdp_kwargs``.
            cuda_init_mode (CUDAInitMode): Determines model movement to CUDA.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
            add_bn (bool): Whether to include batch norm in the model.
        """
        # ``NO_FSDP`` returns the plain module; ``RECURSIVE`` wraps with
        # top-level FSDP (defaulting ``auto_wrap_policy`` to a
        # ``ModuleWrapPolicy`` over ``TransformerEncoderLayer`` and
        # ``TransformerDecoderLayer``, and special-casing the hybrid sharding
        # strategies that take a process-group tuple), moving to CUDA for
        # ``CUDA_AFTER``; otherwise raises
        # ``ValueError("Unsupported FSDP init mode: ...")``. Details elided.
        ...

    def get_ignored_modules(self):
        return [self.transformer]
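
# Sketch (assumed usage, not part of the recovered module) of constructing
# this model the way the docstring above describes: recursive FSDP wrapping
# with the model moved to CUDA before wrapping.
def _example_init_transformer(group: dist.ProcessGroup):
    return TransformerWithSharedParams.init(
        group,
        FSDPInitMode.RECURSIVE,
        CUDAInitMode.CUDA_BEFORE,
        fsdp_kwargs={"sharding_strategy": ShardingStrategy.FULL_SHARD},
        deterministic=True,
    )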

class NestedWrappedModule(FSDPTestModel):
    def __init__(
        self,
        group: dist.ProcessGroup,
        wrap_fsdp: bool,
        cuda_init_mode: CUDAInitMode,
        deterministic: bool,
        **fsdp_kwargs,
    ):
        # Builds a nested ``nn.Sequential`` of ``nn.Linear`` layers, wrapping
        # some of the inner modules with FSDP when ``wrap_fsdp=True``;
        # details elided.
        ...

    def get_input(self, device):
        # Returns a tuple with one deterministic random tensor on ``device``,
        # seeded by rank
        ...

    def forward(self, x):
        return self.module(x)

    def get_loss(self, input, output):
        loss = output.sum()
        return loss

    def run_backward(self, loss):
        loss.backward()

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
    ) -> nn.Module:
        """
        Initializes a :class:`NestedWrappedModule` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps some nested
                modules with FSDP but not the top-level module. The model may
                later be wrapped with a top-level FSDP external to this method
                if desired.
            cuda_init_mode (CUDAInitMode): Determines model movement to CUDA.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
        """
        if fsdp_kwargs is None:
            fsdp_kwargs = {}
        if fsdp_init_mode == FSDPInitMode.NO_FSDP:
            return NestedWrappedModule(
                group,
                wrap_fsdp=False,
                cuda_init_mode=cuda_init_mode,
                deterministic=deterministic,
            )
        elif fsdp_init_mode == FSDPInitMode.RECURSIVE:
            fsdp_model = NestedWrappedModule(
                group,
                wrap_fsdp=True,
                cuda_init_mode=cuda_init_mode,
                deterministic=deterministic,
                **fsdp_kwargs,
            )
            if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
                fsdp_model = fsdp_model.cuda()
            return fsdp_model
        raise ValueError(f"Unsupported FSDP init mode: {fsdp_init_mode}")

class AlwaysWrapNestedWrappedModule(NestedWrappedModule):
    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
    ):
        """
        Initializes a :class:`NestedWrappedModule` instance, but unlike
        :meth:`NestedWrappedModule.init`, for the ``RECURSIVE`` init mode, this
        wraps with top-level FSDP and the ``always_wrap_policy()`` auto wrap
        policy.
        """
        model = super(
            AlwaysWrapNestedWrappedModule, AlwaysWrapNestedWrappedModule
        ).init(
            group=group,
            fsdp_init_mode=FSDPInitMode.NO_FSDP,
            cuda_init_mode=cuda_init_mode,
            fsdp_kwargs=fsdp_kwargs,
            deterministic=deterministic,
        )
        if fsdp_init_mode == FSDPInitMode.NO_FSDP:
            return model
        elif fsdp_init_mode == FSDPInitMode.RECURSIVE:
            fsdp_kwargs = fsdp_kwargs or {}
            fsdp_model = FSDP(
                model, auto_wrap_policy=always_wrap_policy, **fsdp_kwargs
            )
            if cuda_init_mode == CUDAInitMode.CUDA_AFTER:
                fsdp_model = fsdp_model.cuda()
            return fsdp_model


class NonUniformReqGradNWM(NestedWrappedModule):
    def __init__(
        self,
        group: dist.ProcessGroup,
        wrap_fsdp: bool,
        cuda_init_mode: CUDAInitMode,
        deterministic: bool,
    ):
        # Like ``NestedWrappedModule``, but nests a second ``nn.Sequential``
        # container; details elided.
        ...

    @staticmethod
    def _set_nonuniform_req_grad(model, req_grad_mask) -> None:
        for n, p in model.named_parameters():
            if not re.match(req_grad_mask, n):
                p.requires_grad_(False)

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
    ):
        """
        Initializes a :class:`NestedWrappedModule` instance, but unlike
        :meth:`NestedWrappedModule.init`, it wraps a second :class:`torch.nn.Sequential`
        container to enable the desired non-uniform ``requires_grad``
        ``use_orig_params=True`` tests. For both ``RECURSIVE`` and ``NO_FSDP``
        init modes, freezes all parameters except the last two to validate
        ``ShardedGradScaler`` support for ranks with no (non-zero sized) local shards in
        FSDP ``use_orig_params=True`` mode.
        """
        # Freezes every parameter not matching
        # ``re.compile(r"module\.2.*\.1.*")`` via ``_set_nonuniform_req_grad``
        # on the constructed model; details elided.
        ...


class ModuleWithDelay(FSDPTestModel):
    """This class wraps a :class:`FSDPTestModel` to optionally add a delay
    after computing the loss and/or before the gradient reduction."""

    def __init__(
        self,
        module: nn.Module,
        delay_after_loss_ms: int,
        delay_before_reduction_ms: int,
    ):
        super().__init__()
        self.delay_after_loss_ms = delay_after_loss_ms
        self.delay_before_reduction_ms = delay_before_reduction_ms
        self.module = module

    def get_input(self, device):
        return self.module.get_input(device)

    def forward(self, x):
        return self.module(x)

    def get_loss(self, input, output):
        loss = self.module.get_loss(input, output)
        if self.delay_after_loss_ms > 0:
            torch.cuda._sleep(int(self.delay_after_loss_ms * get_cycles_per_ms()))
        return loss

    def run_backward(self, loss):
        orig_reduce_scatter = torch.distributed.reduce_scatter_tensor

        def _delayed_reduce_scatter(*args, **kwargs):
            if self.delay_before_reduction_ms > 0:
                torch.cuda._sleep(
                    int(self.delay_before_reduction_ms * get_cycles_per_ms())
                )
            return orig_reduce_scatter(*args, **kwargs)

        with mock.patch(
            "torch.distributed.reduce_scatter_tensor", _delayed_reduce_scatter
        ):
            self.module.run_backward(loss)

    @staticmethod
    def init(
        module_class: Type[FSDPTestModel],
        *model_args: Any,
        delay_after_loss_ms: int,
        delay_before_reduction_ms: int,
        **model_kwargs: Any,
    ):
        """
        Args:
            module_class (Type[FSDPTestModel]): Wrapped module class to which
                to add delays.
            model_args: Positional arguments forwarded to the ``module_class``
                ``init()``.
            delay_after_loss_ms (int): Delay after computing the loss/before
                the optimizer step (in ms).
            delay_before_reduction_ms (int): Delay before reduce-scattering
                gradients (in ms).
            model_kwargs: Keyword arguments forwarded to the ``module_class``
                ``init()``.
        """
        return ModuleWithDelay(
            module_class.init(*model_args, **model_kwargs),
            delay_after_loss_ms,
            delay_before_reduction_ms,
        )
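
# Sketch (assumed usage, not part of the recovered module): build a nested
# model whose loss computation and gradient reduce-scatter are each delayed,
# which is useful for widening race windows in communication-overlap tests.
def _example_delayed_model(group: dist.ProcessGroup) -> ModuleWithDelay:
    return ModuleWithDelay.init(
        NestedWrappedModule,
        group,
        FSDPInitMode.RECURSIVE,
        CUDAInitMode.CUDA_BEFORE,
        delay_after_loss_ms=250,
        delay_before_reduction_ms=250,
    )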

class NestedWrappedModuleWithDelay(ModuleWithDelay):
    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode = CUDAInitMode.CUDA_AFTER,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        delay_after_loss_ms: int = 0,
        delay_before_reduction_ms: int = 0,
    ):
        return ModuleWithDelay.init(
            NestedWrappedModule,
            group=group,
            fsdp_init_mode=fsdp_init_mode,
            cuda_init_mode=cuda_init_mode,
            fsdp_kwargs=fsdp_kwargs,
            deterministic=deterministic,
            delay_after_loss_ms=delay_after_loss_ms,
            delay_before_reduction_ms=delay_before_reduction_ms,
        )


class DummyDDP(nn.Module):
    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, *args, **kwargs):
        return self.module(*args, **kwargs)


class MixtureOfExperts(NestedWrappedModule):
    def __init__(
        self,
        group: dist.ProcessGroup,
        wrap_fsdp: bool,
        cuda_init_mode: CUDAInitMode,
        delay_before_free_ms: int,
        deterministic: bool,
        **fsdp_kwargs,
    ):
        # Builds shared linear layers plus a per-rank "expert" layer whose
        # parameters are tagged with ``p.expert = True`` and, when
        # ``wrap_fsdp=True``, wrapped with FSDP over a single-rank process
        # group; details elided.
        ...

    def forward(self, x):
        # When ``delay_before_free_ms > 0`` and the expert is FSDP-wrapped,
        # patches ``torch.distributed.fsdp._runtime_utils._reshard`` with a
        # variant that sleeps before resharding; details elided.
        ...

    def run_backward(self, loss):
        loss.backward()
        if not self.wrap_fsdp:
            # Manually reduce gradients if not wrapped in FSDP
            with torch.no_grad():
                for p in self.parameters():
                    if hasattr(p, "expert"):
                        continue  # expert params are not reduced across ranks
                    if p.grad is not None:
                        p.grad.div_(self.world_size)
                        torch.distributed.all_reduce(p.grad, group=self.group)

    @staticmethod
    def init(
        group: dist.ProcessGroup,
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        fsdp_kwargs: Optional[Dict[str, Any]] = None,
        deterministic: bool = False,
        delay_before_free_ms: int = 0,
    ):
        """
        Initializes a :class:`MixtureOfExperts` instance.

        Args:
            fsdp_init_mode (FSDPInitMode): If ``NO_FSDP``, then does not wrap
                any modules with FSDP. If ``RECURSIVE``, then wraps some nested
                modules with FSDP, including the expert and shared layers, but
                not the top-level module. The model may later be wrapped with a
                top-level FSDP external to this method if desired.
            cuda_init_mode (CUDAInitMode): Determines model movement to CUDA.
            fsdp_kwargs (Optional[Dict[str, Any]]): Optional keyword arguments
                forwarded to the FSDP constructor.
            deterministic (bool): Whether to make the model deterministic
                across constructions.
            delay_before_free_ms (int): Delay before resharding expert
                parameters in the forward pass (in ms).
        """
        # Mirrors :meth:`NestedWrappedModule.init` but threads
        # ``delay_before_free_ms`` through, raising
        # ``ValueError("Unsupported FSDP init mode: ...")`` for unknown modes;
        # details elided.
        ...

class MLP(nn.Module):
    def __init__(
        self,
        dim: int,
        device: Optional[torch.device] = None,
        *,
        bias: bool = True,
        with_buffer: bool = False,
        dim_multiplier: int = 4,
    ):
        super().__init__()
        self.in_proj = nn.Linear(dim, dim_multiplier * dim, device=device, bias=bias)
        self.out_proj = nn.Linear(dim_multiplier * dim, dim, device=device, bias=bias)
        if with_buffer:
            self.register_buffer("buffer", torch.randn((dim,), device=device))
        else:
            self.buffer = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        z = self.in_proj(x)
        z = F.relu(z)
        z = self.out_proj(z)
        z = F.relu(z)
        if self.buffer is not None:
            z = z + self.buffer
        return z

    def reset_parameters(self):
        if self.buffer is not None:
            torch.nn.init.normal_(self.buffer)


class MLPStack(nn.Sequential):
    def __init__(self, mlp_dim: int, *, with_seq_parallel: bool = False):
        modules = [
            MLP(mlp_dim, dim_multiplier=3),
            MLP(mlp_dim),
            MLP(mlp_dim, dim_multiplier=3),
        ]
        if with_seq_parallel:
            modules.append(nn.LayerNorm(mlp_dim, bias=False))
        super().__init__(*modules)
        self.with_seq_parallel = with_seq_parallel

    def parallelize(
        self,
        tp_mesh: DeviceMesh,
        dp_mesh: DeviceMesh,
        use_activation_checkpointing: bool,
        **fsdp_kwargs,
    ) -> "MLPStack":
        # Applies tensor parallelism with ``parallelize_module`` over
        # ``tp_mesh`` (colwise ``in_proj``, rowwise ``out_proj``, plus
        # ``SequenceParallel`` for the layer norm when
        # ``with_seq_parallel=True``), then optional ``checkpoint`` and
        # ``fully_shard`` over ``dp_mesh`` per ``MLP``, and finally
        # ``fully_shard`` on the whole stack; returns ``self``. Details
        # elided.
        ...
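
# Sketch (assumed usage, not part of the recovered module) of driving the 2D
# parallelism helper above: build a 2 x 2 DP-by-TP device mesh (4 ranks
# assumed) and parallelize the stack across it.
def _example_parallelize_mlp_stack() -> "MLPStack":
    from torch.distributed.device_mesh import init_device_mesh

    mesh = init_device_mesh("cuda", (2, 2), mesh_dim_names=("dp", "tp"))
    model = MLPStack(16)
    return model.parallelize(
        tp_mesh=mesh["tp"],
        dp_mesh=mesh["dp"],
        use_activation_checkpointing=False,
    )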

class DoubleLinear(nn.Module):
    """
    This can be used for returning multiple outputs from a module
    (``use_second_linear=True``) or for having an unused module (``False``).
    """

    def __init__(self, dim: int, use_second_linear: bool = True):
        super().__init__()
        self.lin1 = nn.Linear(dim, dim)
        self.lin2 = nn.Linear(dim, dim)
        self.relu = nn.ReLU()
        self.use_second_linear = use_second_linear

    def forward(
        self, x: torch.Tensor
    ) -> Union[Tuple[torch.Tensor, torch.Tensor], torch.Tensor]:
        if self.use_second_linear:
            return self.relu(self.lin1(x)), self.relu(self.lin2(x))
        return self.relu(self.lin1(x))


@contextlib.contextmanager
def patch_all_gather(new_all_gather_into_tensor: Callable):
    orig_all_gather = dist.all_gather_into_tensor
    dist.barrier()
    dist.all_gather_into_tensor = new_all_gather_into_tensor
    try:
        yield
    finally:
        dist.barrier()
        dist.all_gather_into_tensor = orig_all_gather


@contextlib.contextmanager
def patch_reduce_scatter(new_reduce_scatter_tensor: Callable):
    orig_reduce_scatter = dist.reduce_scatter_tensor
    dist.barrier()
    dist.reduce_scatter_tensor = new_reduce_scatter_tensor
    try:
        yield
    finally:
        dist.barrier()
        dist.reduce_scatter_tensor = orig_reduce_scatter


@contextlib.contextmanager
def patch_all_reduce(new_all_reduce: Callable):
    orig_all_reduce = dist.all_reduce
    dist.barrier()
    dist.all_reduce = new_all_reduce
    try:
        yield
    finally:
        dist.barrier()
        dist.all_reduce = orig_all_reduce


@no_type_check
@contextlib.contextmanager
def patch_unshard(new_unshard: Callable):
    orig_unshard = FSDPParamGroup.unshard
    dist.barrier()
    FSDPParamGroup.unshard = new_unshard
    try:
        yield
    finally:
        dist.barrier()
        FSDPParamGroup.unshard = orig_unshard


@no_type_check
@contextlib.contextmanager
def patch_post_backward(new_post_backward: Callable):
    orig_post_backward = FSDPParamGroup.post_backward
    dist.barrier()
    FSDPParamGroup.post_backward = new_post_backward
    try:
        yield
    finally:
        dist.barrier()
        FSDPParamGroup.post_backward = orig_post_backward


@no_type_check
@contextlib.contextmanager
def patch_register_post_backward_hook_backward(new_backward: Callable):
    orig_backward = RegisterPostBackwardFunction.backward
    dist.barrier()
    RegisterPostBackwardFunction.backward = new_backward
    try:
        yield
    finally:
        dist.barrier()
        RegisterPostBackwardFunction.backward = orig_backward


def reduce_scatter_with_assert(
    cls,
    orig_reduce_scatter: Callable,
    assert_fn: Callable,  # e.g. ``self.assertEqual``
    *args: Any,
    **kwargs: Any,
):
    if len(args) > 0:
        output = args[0]
    elif "output" in kwargs:
        output = kwargs["output"]
    else:
        raise AssertionError(
            f"Cannot get reduce-scatter output from\nargs: {args}\nkwargs: {kwargs}"
        )
    assert_fn(output)
    return orig_reduce_scatter(*args, **kwargs)


def check_sharded_parity(
    cls,
    replicated_module: nn.Module,
    sharded_module: nn.Module,
    prefixes_to_ignore: Tuple[str, ...] = (),
):
    # For each (replicated, sharded) named-parameter pair, strips the ignored
    # prefixes, shards the replicated parameter with ``distribute_tensor``
    # using the sharded parameter's mesh and placements, and asserts equality
    # of the local shards (and likewise for gradients). FSDP's
    # ``(Shard(0), Shard(0))`` layout differs from ``distribute_tensor()``,
    # so equality cannot be checked for that layout. Details elided.
    ...


class FSDPTestMultiThread(MultiThreadedTestCase):
    @property
    def world_size(self):
        # CUDA device count when CUDA is available; a fixed fallback otherwise
        # (exact constant not recoverable)
        ...

    def setUp(self):
        super().setUp()
        self._spawn_threads()

    def run_subtests(self, *args, **kwargs):
        return run_subtests(self, *args, **kwargs)

    def perThreadSetUp(self):
        torch._dynamo.reset()

    def perThreadTearDown(self):
        torch._dynamo.reset()


class FSDPTest(MultiProcessTestCase):
    def setUp(self):
        super().setUp()
        # Disable NCCL desync debugging for these tests
        os.environ["TORCH_NCCL_DESYNC_DEBUG"] = "0"
        self._spawn_processes()

    @property
    def world_size(self):
        # Capped CUDA device count when CUDA is available; a fixed fallback
        # otherwise (exact constants not recoverable)
        ...

    @property
    def process_group(self):
        return dist.distributed_c10d._get_default_group()

    @property
    def init_method(self):
        return FILE_SCHEMA + self.file_name

    def _check_cpu_offload(self, fsdp_model, cpu_offload):
        self.assertEqual(cpu_offload, fsdp_model.cpu_offload)

    def _check_backward_prefetch(self, fsdp_model, backward_prefetch):
        self.assertEqual(backward_prefetch, fsdp_model.backward_prefetch)

    def _check_forward_prefetch(self, fsdp_model, forward_prefetch):
        self.assertEqual(forward_prefetch, fsdp_model.forward_prefetch)

    def run_subtests(self, *args, **kwargs):
        return run_subtests(self, *args, **kwargs)

    @classmethod
    def _run(cls, rank, test_name, file_name, pipe):
        # Prints ``dist init r={rank}, world={world_size}``, initializes the
        # "nccl" (or "gloo") process group via ``init_method``, exits with the
        # ``backend_unavailable`` skip code on failure, pins the CUDA device
        # to ``rank % device_count``, runs the test between barriers, and
        # destroys the process group; details elided.
        ...

    def _train_for_several_steps(
        self,
        model: nn.Module,
        num_steps: int,
        autocast: bool,
        lr: float = 0.01,
        fsdp_cpu_offload: Optional[CPUOffload] = None,
        save_model: bool = False,
        mixed_precision: Optional[MixedPrecision] = None,
        enable_sharded_grad_scaler: bool = False,
        use_pure_fp16: bool = False,
        sharded_grad_scaler_kwargs: Optional[Dict[str, Any]] = None,
    ):
        # Runs SGD (momentum 0.9) for ``num_steps`` with optional AMP
        # autocast, ``ShardedGradScaler``, pure-FP16 input casting, and a
        # state-dict save/reload round trip, asserting the expected loss
        # dtypes and returning the detached final loss; details elided.
        ...

    def _test_fsdp_parity(
        self,
        model_class: Type[FSDPTestModel],
        fsdp_init_mode: FSDPInitMode,
        cuda_init_mode: CUDAInitMode,
        ref_init_fn: Optional[Callable] = None,
        num_iters: int = 2,
        save_model: bool = True,
        cpu_offload: CPUOffload = CPUOffload(),
        backward_prefetch: Optional[BackwardPrefetch] = None,
        sharding_strategy: Optional[ShardingStrategy] = None,
        mixed_precision: Optional[MixedPrecision] = None,
        forward_prefetch: bool = False,
        use_orig_params: bool = False,
        enable_sharded_grad_scaler: bool = False,
        use_pure_fp16: bool = False,
        init_kwargs: Optional[Dict[str, Any]] = None,
        sharded_grad_scaler_kwargs: Optional[Dict[str, Any]] = None,
        **fsdp_kwargs,
    ):
        """
        Tests FSDP training against a reference, which defaults to DDP but
        may be customized with ``ref_init_fn``.

        Args:
            model_class (Type[FSDPTestModel]): A model class that inherits from
                ``FSDPTestModel``, which defines the expected interface.
            fsdp_init_mode (FSDPInitMode): The mode to initialize the
                FSDP-wrapped model. This should not be ``NO_FSDP``.
            ref_init_fn (Optional[Callable]): A callable to invoke that wraps a
                non-wrapped model to construct the reference model, where this
                wrapper should provide data parallel semantics. If ``None``,
                then the callable defaults to the DDP constructor.
        """
        # Asserts ``fsdp_init_mode != FSDPInitMode.NO_FSDP`` ("Expects an FSDP
        # init mode that wraps with FSDP"), trains the DDP (or
        # ``ref_init_fn``-wrapped) reference and the FSDP model for
        # ``num_iters`` iterations each, checks CPU-offload device placement
        # (expecting "An FSDP-managed module with parameter CPU offloading
        # enabled has parameters on cuda" where applicable), and compares
        # losses and unsharded parameters ("FSDP did not match DDP"); details
        # elided.
        ...
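
# Sketch (assumed usage, not part of the recovered module) of a concrete test
# case built on the harness above: sweep CPU-offload configurations with
# ``run_subtests`` and check DDP-vs-FSDP parity for each.
class _ExampleFSDPParityTest(FSDPTest):
    def test_nested_wrapped_parity(self):
        self.run_subtests(
            {
                "cpu_offload": [
                    CPUOffload(offload_params=False),
                    CPUOffload(offload_params=True),
                ]
            },
            self._test_parity_with_offload,
        )

    def _test_parity_with_offload(self, cpu_offload: CPUOffload):
        self._test_fsdp_parity(
            NestedWrappedModule,
            FSDPInitMode.RECURSIVE,
            cuda_init_mode=CUDAInitMode.CUDA_BEFORE,
            cpu_offload=cpu_offload,
        )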

def test_compiled_fsdp(compile_compute_on_module: Optional[type] = None):
    def fully_shard_with_compiled_compute(*args, **kwargs):
        torch.distributed._composable.fsdp.fully_shard(*args, **kwargs)
        if compile_compute_on_module is None or isinstance(
            args[0], compile_compute_on_module
        ):
            args[0].compile()

    class FullyShardMode(Enum):
        EAGER = auto()
        COMPILED_COMPUTE = auto()

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            original_fully_shard = torch.distributed._composable.fsdp.fully_shard
            for mode in FullyShardMode:
                if mode != FullyShardMode.EAGER and not has_triton():
                    warnings.warn("Inductor on GPU needs Triton and recent GPU arch")
                    continue
                original_skip_fsdp_hooks = torch._dynamo.config.skip_fsdp_hooks
                original_compile_threads = torch._inductor.config.compile_threads
                torch.distributed.barrier()

                if mode == FullyShardMode.EAGER:
                    fully_shard_patch = original_fully_shard
                elif mode == FullyShardMode.COMPILED_COMPUTE:
                    torch._dynamo.config.skip_fsdp_hooks = True
                    torch._inductor.config.compile_threads = 1
                    fully_shard_patch = fully_shard_with_compiled_compute
                else:
                    raise NotImplementedError(
                        f"Need to implement FullyShardMode={mode}"
                    )

                # Swap the module-global ``fully_shard`` seen by ``func``
                func.__globals__[original_fully_shard.__name__] = fully_shard_patch
                func(*args, **kwargs)
                torch.distributed.barrier()
                func.__globals__[original_fully_shard.__name__] = original_fully_shard
                torch._dynamo.config.skip_fsdp_hooks = original_skip_fsdp_hooks
                torch._inductor.config.compile_threads = original_compile_threads

        return wrapper

    return decorator


class SkipModule(nn.Module):
    def __init__(self):
        super().__init__()
        self.lin = nn.Linear(10, 10, bias=False)

    def forward(self, x):
        return self.lin(x)


class NestedLinear(nn.Module):
    def __init__(self, fsdp_wrap):
        super().__init__()
        if fsdp_wrap:
            self.nested_linear = wrap(nn.Linear(10, 10, bias=False).cuda())
        else:
            self.nested_linear = nn.Linear(10, 10, bias=False).cuda()

    def forward(self, x):
        return self.nested_linear(x)


class SkipModel(nn.Module):
    def __init__(self, double_nest):
        super().__init__()
        self.linear = nn.Linear(10, 10, bias=False).cuda()
        self.linear_skip = SkipModule().cuda()
        self.nested_linear = wrap(NestedLinear(fsdp_wrap=double_nest))

    def forward(self, x):
        x = self.linear(x)
        x = self.linear_skip(x)
        x = self.nested_linear(x)
        return x
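
# Sketch (assumed usage, not part of the recovered module) of the
# ``test_compiled_fsdp`` decorator above: the same test body runs once per
# ``FullyShardMode``, with ``fully_shard`` swapped for the compiled-compute
# variant on the second pass. Compilation is restricted to ``MLP`` modules.
class _ExampleCompiledFSDPTest(FSDPTestMultiThread):
    @test_compiled_fsdp(compile_compute_on_module=MLP)
    def test_mlp_fully_shard(self):
        model = MLPStack(16).cuda()
        fully_shard(model)  # patched per mode by the decorator
        x = torch.randn(2, 16, device="cuda")
        model(x).sum().backward()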