import operator
from contextlib import contextmanager
from enum import Enum
from typing import Any, cast, Dict, List, Optional, Tuple

import torch
import torch.fx as fx
import torch.library
import torch.nn as nn
import torch.utils._pytree as pytree

from torch.distributed._spmd.batch_dim_utils import BatchDimAnalyzer
from torch.distributed._tensor import DeviceMesh, distribute_tensor, Replicate, Shard
from torch.distributed._tensor._op_schema import (
    OpStrategy,
    PlacementStrategy,
    StrategyType,
    TupleStrategy,
)
from torch.distributed._tensor._redistribute import redistribute_local_tensor
from torch.distributed._tensor._utils import compute_local_shape
from torch.distributed._tensor.placement_types import _Partial, DTensorSpec, Placement
from torch.fx import GraphModule
from torch.fx.experimental.proxy_tensor import make_fx
from torch.fx.passes.shape_prop import _extract_tensor_metadata
from torch.nn.utils._named_member_accessor import NamedMemberAccessor

aten = torch.ops.aten

# tag_grad is a no-op (identity) operator registered under the _spmd library.
# gradients_tagging() hooks it onto every parameter so that gradient nodes can
# be identified in the traced graph during SPMD expansion; the tagging nodes
# are removed again once the strategies have been built.
_spmd_lib_def = torch.library.Library("_spmd", "DEF")
_spmd_lib_def.define("tag_grad(Tensor self) -> Tensor")

_spmd_lib_impl = torch.library.Library("_spmd", "IMPL")
_spmd_lib_impl.impl("tag_grad", lambda x: x, "CompositeExplicitAutograd")


class DataParallelStyle(Enum):
    """This enum represents the style of the data-parallel operation.

    We have three types of Data Parallel style:
    1. DEFAULT: the default data parallel style, which represents a mix of
                replicate and fully-shard behavior. Each parameter that can be
                sharded evenly is sharded; otherwise it is replicated. This
                style avoids the potential padding needed when parameters
                cannot be sharded evenly, but it generates a mix of all_reduce
                and reduce_scatter collectives.
    2. REPLICATE: the data parallel style that replicates all model parameters.
                  This is similar to the behavior of DistributedDataParallel.
    3. FULLY_SHARD: the data parallel style that shards all model parameters.
                    This is similar to the behavior of FullyShardedDataParallel;
                    the difference is that FullyShardedDataParallel (ZeRO-3)
                    shards the model using FlatParameter-based sharding, while
                    this style shards each parameter into a DTensor.
    """

    DEFAULT = 0
    REPLICATE = 1
    FULLY_SHARD = 2
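
# Illustrative sketch (added here for clarity, not part of the original
# module): for a single parameter of shape [1024, 16] on a hypothetical 1-D
# DeviceMesh of 4 ranks, the styles translate to the following DTensor
# placements:
#
#   REPLICATE   -> Replicate() : every rank holds the full [1024, 16] tensor
#   FULLY_SHARD -> Shard(0)    : every rank holds a [256, 16] shard
#   DEFAULT     -> Shard(0) when dim 0 divides evenly by the mesh size,
#                  otherwise Replicate() for that particular parameter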


class NodeType(Enum):
    """NodeType is an enum that records the type of the tensors in the graph.

    This is used to determine the data parallel strategy.
    """

    PARAM = 0
    ACT = 1
    GRAD = 2
    STATE = 3
    NON_TENSOR = 4


class DataParallelStrategy(OpStrategy):
    """DataParallelStrategy is a special case of OpStrategy that only records the "data parallel style" placement
    strategy for each fx Node.

    It takes a list of PlacementStrategy, where each PlacementStrategy describes
    one way to distribute the tensor and computation. In the DataParallel case,
    there are two possible ways to distribute the parameters:
        1. replicate the parameter over a set of devices (DDP-like behavior)
        2. shard the parameter on its tensor dimension 0 over a set of devices
           (FSDP-like behavior).

    In addition to the strategy list, we also need to:
    1. `node_type`: record the type of each node in the graph, so that we can
        determine how to propagate in a data parallel fashion.
    2. `reduction_over_batch`: this is specifically tied to data parallel as the
        loss calculation usually results in a scalar tensor that comes from a
        reduction over the batch dimension. We need to know this information
        so that we can keep the output sharded.
    """

    def __init__(
        self,
        node_type: NodeType,
        strategy_list: List[PlacementStrategy],
        reduction_over_batch: bool = False,
    ):
        super().__init__(strategy_list)
        self.node_type = node_type
        self.reduction_over_batch = reduction_over_batch

    def __str__(self) -> str:
        return f"type: {self.node_type}, {super().__str__()}"
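
# Minimal sketch (hypothetical values, not from the original source): a
# parameter placeholder typically carries both candidate placements built by
# the helper functions defined below, e.g.
#
#   param_strategy = DataParallelStrategy(
#       NodeType.PARAM,
#       [_gen_replicate_strategy(mesh), _gen_shard_strategy(mesh, 0)],
#   )
#
# and the concrete style (replicate vs. fully shard) picks one of them later.
# A loss node that reduces over the batch dimension would instead set
# reduction_over_batch=True so that its output stays sharded.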


@contextmanager
def gradients_tagging(params: Dict[str, torch.Tensor]):
    """Tag the gradient of the parameters with a special tag, so that we can identify them during SPMD expansion.

    It's safe to trace those hooks and we would remove those nodes later.
    """
    tagging_hooks = []
    try:
        for p in params.values():
            # tag_grad is an identity op; hooking it on every parameter simply
            # marks the gradient nodes in the traced graph.
            h = p.register_hook(torch.ops._spmd.tag_grad)
            tagging_hooks.append(h)
        yield
    finally:
        # remove those hooks after tracing
        for h in tagging_hooks:
            h.remove()


def _gen_shard_strategy(
    mesh: DeviceMesh, shard_dim: int, input_specs: Optional[List[DTensorSpec]] = None
) -> PlacementStrategy:
    """Util function to generate a shard strategy on shard_dim."""
    return PlacementStrategy(
        output_specs=DTensorSpec(mesh=mesh, placements=(Shard(shard_dim),)),
        input_specs=input_specs,
    )


def _gen_replicate_strategy(
    mesh: DeviceMesh, input_specs: Optional[List[DTensorSpec]] = None
) -> PlacementStrategy:
    """Util function to generate a replicate strategy."""
    return PlacementStrategy(
        output_specs=DTensorSpec(mesh=mesh, placements=(Replicate(),)),
        input_specs=input_specs,
    )


def _gen_partial_strategy(mesh: DeviceMesh) -> PlacementStrategy:
    """Util function to generate a partial strategy."""
    # The "avg" reduction is used because data parallel gradients are averaged
    # over the batch; the partial placement records that the local tensors
    # still need an average reduction across the mesh.
    return PlacementStrategy(
        output_specs=DTensorSpec(mesh=mesh, placements=(_Partial("avg"),)),
    )
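
# Usage sketch for the tagging context manager (assumptions: `model`, `inp`
# and `named_params` are provided by the caller and tracing happens inside an
# SPMD expansion pass; this snippet is not part of the original module):
#
#   with gradients_tagging(named_params):
#       loss = model(inp).sum()
#       loss.backward()
#
# Every gradient produced inside the block flows through the no-op
# torch.ops._spmd.tag_grad, so the corresponding graph nodes can be recognized
# (and later stripped) when building the data parallel strategies.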


def build_data_parallel_strategies(
    train_step_graph: GraphModule,
    num_params: int,
    num_states: int,
    mesh: DeviceMesh,
    batch_dim: int = 0,
) -> Dict[fx.Node, StrategyType]:
    """Loop through the train step graph and build the data parallel strategy for each fx Node."""
    # Placeholders are classified as PARAM / STATE / ACT based on their
    # position (parameters first, then optimizer states, then inputs), gradient
    # nodes are discovered through the tag_grad markers, and activation
    # sharding over the batch dimension is propagated with BatchDimAnalyzer.
    # The resulting DataParallelStrategy (or TupleStrategy) for every node is
    # returned in a dict keyed by fx.Node.
    ...


def mark_data_parallel_shardings(
    train_step_graph: GraphModule,
    num_parameters: int,
    num_states: int,
    dp_strategy_map: Dict[fx.Node, StrategyType],
    parallel_mode: DataParallelStyle = DataParallelStyle.FULLY_SHARD,
) -> None:
    """Mark the sharding for the nodes in the train_step_graph."""
    # For every node with a recorded strategy, pick one PlacementStrategy
    # according to `parallel_mode` (replicate vs. fully shard) and store it in
    # node.meta["sharding"]; DataParallelStyle.DEFAULT is not implemented yet.
    ...


def _partition_val(val: Any, spec: DTensorSpec) -> Any:
    """Util function to convert a full tensor val to its local component."""
    if isinstance(val, torch.Tensor):
        local_shard = val
        if val.ndim == 0:
            # If it's already a scalar tensor, it's already local, so there is
            # nothing to do.
            return local_shard

        for idx, placement in enumerate(spec.placements):
            if placement.is_shard():
                placement = cast(Shard, placement)
                num_chunks = spec.mesh.size(mesh_dim=idx)
                my_coord = spec.mesh.get_coordinate()
                assert my_coord is not None, "current rank not in mesh!"
                my_coord_on_mesh_dim = my_coord[idx]
                local_shard = placement._split_tensor(
                    local_shard, num_chunks, with_padding=False, contiguous=False
                )[my_coord_on_mesh_dim]
        return local_shard
    elif isinstance(val, (list, tuple)):
        return val.__class__(_partition_val(v, spec) for v in val)
    else:
        raise RuntimeError(f"val type {type(val)} not supported")


def partitioner(graph: GraphModule) -> GraphModule:
    """Graph partitioner that partitions the single device graph to distributed graph."""
    # Each node's full-tensor meta["val"] is converted to its local shard via
    # _partition_val, reshard subgraphs (redistribute_local_tensor) are spliced
    # in where an input's sharding does not match the expected input spec, and
    # the shape arguments of view/reshape/expand-like ops are rewritten to
    # local shapes before the graph is linted and recompiled.
    ...


def partition_data_parallel(
    graph: GraphModule,
    model: nn.Module,
    optimizer: Optional[torch.optim.Optimizer],
    params_buffers: Dict[str, torch.Tensor],
    named_states: Dict[str, Any],
    args: Tuple[Any, ...],
    kwargs: Dict[str, Any],
    mesh: DeviceMesh,
    parallel_style: DataParallelStyle,
    input_batch_dim: int,
) -> GraphModule:
    """Partition the graph into a data parallel graph.

    This function also shards/replicates the model parameters and optimizer states to DTensors.
    """
    # Builds the per-node data parallel strategies, marks the chosen shardings
    # on the graph, partitions it with `partitioner`, and finally converts the
    # model parameters/buffers and optimizer states to DTensors (Shard(0) for
    # FULLY_SHARD, Replicate() for REPLICATE) via distribute_tensor and
    # NamedMemberAccessor; DataParallelStyle.DEFAULT is not supported yet.
    ...
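
# End-to-end sketch of how this module is typically driven (assumptions: the
# exact caller lives elsewhere in torch.distributed._spmd, and `traced_graph`,
# `params_and_buffers`, `named_states` and `train_step_args` come from that
# tracing front end; names here are illustrative only):
#
#   mesh = DeviceMesh("cuda", torch.arange(world_size))  # needs an initialized process group
#   dp_graph = partition_data_parallel(
#       traced_graph,          # GraphModule of one full train step
#       model,
#       optimizer,
#       params_and_buffers,    # Dict[str, torch.Tensor]
#       named_states,          # optimizer states keyed by parameter name
#       train_step_args,
#       {},                    # kwargs
#       mesh,
#       DataParallelStyle.FULLY_SHARD,
#       input_batch_dim=0,
#   )
#
# After the call the returned graph computes on local shards, and the model
# parameters and optimizer states have been replaced with DTensors sharded on
# their dim 0.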