U
    yh                     @   s  d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlmZ ddlmZmZmZ dd	lmZ d
dddddgZe eZ G dd deZ!G dd deZ"G dd deZ#d"eej$ ee% dddZ&d#eej$ ee% ee'ej(f dddZ)G dd
 d
e#Z*G dd de*Z+G dd de*Z,G dd de#Z-G d d de-Z.G d!d de-Z/dS )$    N)ABCabstractmethod)defaultdict)Enum)AnyCallableDictList
NamedTupleOptionalTupleUnion)record_function   )merge_chunkssplit_args_kwargs_into_chunksTensorChunkSpec)_PipelineStageBasePipelineScheduleSinglePipelineScheduleMultiSchedule1F1BScheduleGPipeScheduleInterleaved1F1BScheduleLoopedBFSc                   @   s   e Zd ZdZdZdd ZdS )_ComputationTyper      c                 C   s   | t jkrdS dS d S )NFB)r   FORWARDself r!   X/var/www/html/venv/lib/python3.8/site-packages/torch/distributed/pipelining/schedules.py__str__"   s    
z_ComputationType.__str__N)__name__
__module____qualname__r   BACKWARDr#   r!   r!   r!   r"   r      s   r   c                   @   s.   e Zd ZU eed< eed< eed< dd ZdS )_Actioncomputation_typemicrobatch_indexstage_indexc                 C   s   | j  | j d| j S )NZ_s)r)   r*   r+   r   r!   r!   r"   __repr__.   s    z_Action.__repr__N)r$   r%   r&   r   __annotations__intr,   r!   r!   r!   r"   r(   )   s   
r(   c                
   @   s*  e Zd Zdeeedejf  eee	df  ee
ee	f  eee
eef ee f  dddZdd Zdd	 Zd
d Zedee ee ee ee dddZedddee dddZdee ee ee ee dddZdd Zd eedf ee
eef  dddZee edddZdS )!_PipelineScheduleN.n_microbatchesloss_fnargs_chunk_speckwargs_chunk_specoutput_merge_specc                 C   sH   || _ || _|| _|| _|| _| jd k	| _g | _td| j	j
  d S )NzUsing )_n_microbatches_loss_fn_args_chunk_spec_kwargs_chunk_spec_output_merge_spec_has_backward_internal_lossesloggerinfo	__class__r$   )r    r1   r2   r3   r4   r5   r!   r!   r"   __init__3   s    	z_PipelineSchedule.__init__c                 C   s,   |j r(| jr(| ||| }| j| d S N)is_lastr;   _compute_lossr<   append)r    stageoutput
target_mbsmb_indexlossr!   r!   r"   _maybe_compute_lossP   s    z%_PipelineSchedule._maybe_compute_lossc                 C   sj   d|  kot | jk n  }|jr8| jr8|r8| j| S t | jdkrb|sbtd| d| j nd S d S )Nr   zLoss for microbatch z6 is not available. Available losses for microbatches: )lenr<   rB   r;   RuntimeError)r    rE   rH   Zvalid_indexr!   r!   r"   _maybe_get_lossU   s    
z!_PipelineSchedule._maybe_get_lossc                 C   s|   t |ts|g}tdd |D }|rn|dk	rnt| j| jkrZtd| j dt| j |  || j | j  dS )zB
        Update the losses to those in the internal state
        c                 s   s   | ]}|j V  qd S rA   )rB   .0rE   r!   r!   r"   	<genexpr>h   s     z3_PipelineSchedule._update_losses.<locals>.<genexpr>N
Expecting z losses but got )	
isinstancelistanyrK   r<   r6   rL   clearextend)r    stageslossesZcontains_last_stager!   r!   r"   _update_lossesa   s    
z _PipelineSchedule._update_lossesarg_mbs	kwarg_mbsrG   rX   c                 C   s   t dS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the schedule
        implementation.

        Args:
            microbatches: list of microbatch args.
        NNotImplementedError)r    r[   r\   rG   rX   r!   r!   r"   _step_microbatchesx   s    z$_PipelineSchedule._step_microbatchestargetrX   rX   c                O   s   t dS a  
        Run one iteration of the pipeline schedule with *whole-batch* input.
        Will chunk the input into microbatches automatically, and go through the
        microbatches according to the schedule implementation.

        args: positional arguments to the model (as in non-pipeline case).
        kwargs: keyword arguments to the model (as in non-pipeline case).
        target: target for the loss function.
        losses: a list to store the losses for each microbatch.
        Nr]   )r    ra   rX   argskwargsr!   r!   r"   step   s    z_PipelineSchedule.stepc                    s   t d fdd}|dk	r&||d ndg j }|dk	rF||d ni g j }|dk	rd||d |dk	rt|tstd	t| ||fS )
z*
        Pre-process/check inputs
        )namec                    sR   t | ts t| dt|  t|  jkrNtd j d| dt|  d S )Nz must be a list but got a rQ    z	 but got )rR   rS   	TypeErrortyperK   r6   
ValueError)Zmbsrg   r   r!   r"   check_type_and_len   s    
z;_PipelineSchedule._check_inputs.<locals>.check_type_and_lenNr[   r!   r\   rG   z losses must be a list but got a )strr6   rR   rS   ri   rj   )r    r[   r\   rG   rX   rl   r!   r   r"   _check_inputs   s    

z_PipelineSchedule._check_inputsc                 C   s   |  ||S rA   )r7   )r    rF   ra   r!   r!   r"   rC      s    z_PipelineSchedule._compute_loss)rd   re   c                 C   sF   |s|r*t ||| j| j| j\}}||fS dg| j i g| j fS dS )zj
        Splits a full-batch input into chunks (i.e. microbatches) and returns
        the chunks
        r!   N)r   r6   r8   r9   )r    rd   re   
args_splitkwargs_splitr!   r!   r"   _split_inputs   s    	z_PipelineSchedule._split_inputs)output_chunksreturnc                 C   s   t || jS )z
        Merge output chunks back to a batch state.
        If output_merge_spec is None, the utility will merge output chunks by dimension 0 (batch dim).
        )r   r:   )r    rr   r!   r!   r"   _merge_outputs   s    z _PipelineSchedule._merge_outputs)NNNN)NNNN)NNNN)N)r$   r%   r&   r.   r   r   torchZTensorr   r   r   rm   r   r   r@   rJ   rM   rY   r   r	   r_   rf   rn   rC   rq   rt   r!   r!   r!   r"   r/   2   sX               & 
r/   )p2p_opsdescc                 C   sD   t | dkrdS |r| dnd}td| |   t|  S )zt
    Simple wrapper over batch_isend_irecv from torch.distributed, which just adds a descriptive logger on top.
    r   Nz,  z
batch_p2p )rK   r=   debugdistZbatch_isend_irecvpop)rv   rw   Zdesc_strr!   r!   r"   
_batch_p2p   s
    r|   )rv   rw   rs   c                 C   s`   t t}i }t| dkr|S | D ]}||j | q t| D ]\}}t||d||< qB|S )z
    Sorts the list of P2P ops by the peer rank, and then calls
    batch_isend_irecv. Return a dictionary of works by peer rank. This function
    helps us avoid hangs in case of skip connections.
    r   rw   )r   rS   rK   peerrD   sorteditemsr|   )rv   rw   Zops_by_peerZwork_by_peeropr~   opsr!   r!   r"   _sorted_batch_p2p   s    r   c                       s   e Zd ZdZdeeee eee	df  ee
ee	f  eee
eef ee f  d fddZdddee dd	d
Z  ZS )r   z
    Base class for single-stage schedules.
    Implements the `step` method.
    Derived classes should implement `_step_microbatches`.
    N.)rE   r1   r2   r3   r4   r5   c                    sL   t  j|||||d || _|j| _| j| j_|| | jrH|| d S )Nr0   )	superr@   _stage
num_stages_num_stagesr;   has_backward_prepare_forward_infra_prepare_backward_infra)r    rE   r1   r2   r3   r4   r5   r?   r!   r"   r@     s    


zPipelineScheduleSingle.__init__r`   rb   c                O   sh   | j   | ||\}}|dk	r6tt|| j}nd}| |||| | j jr`| 	| j j
S dS dS rc   )r   clear_runtime_statesrq   rS   ru   tensor_splitr6   r_   rB   rt   rr   )r    ra   rX   rd   re   ro   rp   targets_splitr!   r!   r"   rf   -  s    
zPipelineScheduleSingle.step)NNNN)r$   r%   r&   __doc__r   r.   r   r   r   r   r   rm   r   r   r@   r	   rf   __classcell__r!   r!   r   r"   r   	  s   
    c                   @   s6   e Zd ZdZdee ee ee ee dddZdS )r   z^
    The GPipe schedule.
    Will go through all the microbatches in a fill-drain manner.
    NrZ   c              
   C   s  |  ||||\}}g }t| jD ]}td| r | j|}t|dd}| D ]}	|	  qV| j	||| || }
| j
|}t|dd}||  W 5 Q R X td| jj d|  | | j|
|| q"|D ]}	|	  q| jsdS g }t| jD ]}td| z | j|}t|d	d}| D ]}	|	  q8| | j|}| jj||d
 | j|}t|dd}||  W 5 Q R X td| jj d|  q| | j| |D ]}	|	  qdS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the GPipe schedule.

        Args:
            microbatches: list of microbatch args.
        zForward fwd_recvr}   fwd_send[z] Forwarded microbatch Nz	Backward bwd_recvrI   bwd_sendz] Backwarded microbatch )rn   ranger6   r   r   get_fwd_recv_opsr   valueswaitforward_one_chunkget_fwd_send_opsrV   r=   ry   r+   rJ   r;   get_bwd_recv_opsrM   backward_one_chunkget_bwd_send_opsrY   )r    r[   r\   rG   rX   Zfwd_sends_to_waitir   ZworksZworkrF   Zbwd_sends_to_waitrI   r!   r!   r"   r_   U  sL    

z ScheduleGPipe._step_microbatches)NNNNr$   r%   r&   r   r   r	   r_   r!   r!   r!   r"   r   O  s       c                   @   s6   e Zd ZdZdee ee ee ee dddZdS )r   zo
    The 1F1B schedule.
    Will perform one forward and one backward on the microbatches in steady state.
    NrZ   c                 C   sP  |  ||||\}}t| j| j| jj }d}d}d}g }	t|D ]}
| j|}t|dd }rj|	  | j
||| || }|r|	  | j|}	||d krt|	dd}| | j||| |d7 }qB| j|}t|	| dd }r|	  | | j|}| jj||d | j|}|d7 }|| jkr8q| j|}t|| d	d }rb|	  | j
||| || }| | j||| | j|}	|d7 }qt|d
d}|| jk r0| j|}t|dd }r|	  | | j|}| jj||d |r|	  | j|}t|d
d}|d7 }q|r>|	  | | j| dS )z
        Run one iteration of the pipeline schedule with list of microbatches.
        Will go through all the microbatches according to the 1F1B schedule.

        Args:
            microbatches: list of microbatch args.
        r   Nr   r}   r   r   Zfwd_send_bwd_recvr   Zbwd_send_fwd_recvr   r   )rn   minr6   r   r   r+   r   r   r|   r   r   r   rJ   r   rM   r   r   rY   )r    r[   r\   rG   rX   Zwarmup_chunksfwd_mb_indexbwd_mb_indexZ	send_workZ	fwd_sends_Z	fwd_recvsZ	recv_workrF   Z	bwd_recvsZ	fuse_workrI   Z	bwd_sendsr!   r!   r"   r_     sh    

zSchedule1F1B._step_microbatches)NNNNr   r!   r!   r!   r"   r     s       c                       s   e Zd ZdZdee eee ee	e
df  eeee
f  eeeeef e	e f  d fddZdddee dd	d
Zdee ee ee ee dddZ  ZS )r   zQ
    Base class for multi-stage schedules.
    Implements the `step` method.
    N.rW   r1   r2   r3   r4   r5   c                    s   t |dkrtdt | t j|||||d | _|d j _|d j _|d j	 _
 jD ]} j|_qd fdd _i  _ jD ]}||  jr|| qd S )Nr   z9Multi-stage schedule expects at least two stages but got r0   r   c                    s   | j o jd k	S rA   )rB   r7   )rE   r   r!   r"   <lambda>O      z0PipelineScheduleMulti.__init__.<locals>.<lambda>)rK   rk   r   r@   _stagesr   r   
group_sizepp_group_size
group_rankrankr;   r   Z_should_compute_losspipeline_orderr   r   )r    rW   r1   r2   r3   r4   r5   rE   r   r   r"   r@   0  s0    	




zPipelineScheduleMulti.__init__r`   rb   c          	      O   sz   | j D ]}|  q| ||\}}|dk	r@tt|| j}nd}| |||| | j D ]}|jrZ| 	|j
  S qZdS rc   )r   r   rq   rS   ru   r   r6   r_   rB   rt   rr   )	r    ra   rX   rd   re   rE   ro   rp   r   r!   r!   r"   rf   \  s    


zPipelineScheduleMulti.steprZ   c                 C   sH  |  ||||\}}dd | jD }| jd | j }| jd | j }t| j| j D ]\}}	| j| }
| j| }g }|	dk	r"|	\}}}|tjkr|| }|||| || }| 	|||| |
|| nN|tjkr|| }| ||}|j||d |
|| ntd| d}|t|
k r<|
| }|dk	r|\}}}|tjkr|| jd kr||d  }|
|| n|tjkrntd| d}|t|k r|| }|dk	r$|\}}}|tjkrnB|tjkr|dkr$||d  }|
|| ntd| |rTt|  qT| | j| dS )z
        Operate on the microbatches for looped schedules (multiple stages on each rank).

        TODO: Does not use sorted_batch_isend_irecv(). As a result, this schedule does
        not support models with skip connections.
        c                 S   s   i | ]}|j |qS r!   )r+   rN   r!   r!   r"   
<dictcomp>  s     z<PipelineScheduleMulti._step_microbatches.<locals>.<dictcomp>r   Nr   zUnknown computation type r   )rn   r   r   r   	enumerater   r   r   r   rJ   rV   r   r'   rM   r   r   rk   rK   r   r   r   r|   r   rY   )r    r[   r\   rG   rX   Zstage_index_to_stageZ	prev_rankZ	next_rankZ	time_stepactionZprev_rank_opsZnext_rank_opsr   r)   rH   r+   rE   rF   rI   Zprev_rank_actionZnext_rank_actionr!   r!   r"   r_     sl    




  




z(PipelineScheduleMulti._step_microbatches)NNNN)NNNN)r$   r%   r&   r   r	   r   r.   r   r   r   r   r   rm   r   r   r@   rf   r_   r   r!   r!   r   r"   r   *  s0   	    ,%    c                	       sV   e Zd ZdZdee eee ee	e
eef ee f  d fddZdd Z  ZS )	r   ai  
    Breadth-First Pipeline Parallelism.
    See https://arxiv.org/abs/2211.05953 for details.
    Simliar to Interleaved 1F1B, Looped BFS supports multiple stages per rank.
    What is different is that when microbatches are ready for multiple local
    stages, Loops BFS will prioritizes the earlier stage, running all available
    microbatches at once.
    NrW   r1   r2   r5   c                    sB   t  j||||d i | _t| jD ]}| |}|| j|< q$d S )Nr   )r   r@   r   r   r   !_calculate_single_rank_operations)r    rW   r1   r2   r5   r   rank_opsr   r!   r"   r@     s    

zScheduleLoopedBFS.__init__c           	      C   s   t | j}t|| j| | j}g }t|D ]}|d  q*|D ](}t| jD ]}|ttj|| qLq>d| jd |  }|	d g|  t
|D ],}t
t| jD ]}|ttj|| qq|S )Nr   r   )rK   r   r   r   rD   r6   r(   r   r   rV   reversedr'   )	r    r   n_local_stagesZstage_indicesr   r   r+   rH   post_warmup_opsr!   r!   r"   r     s,    
  z3ScheduleLoopedBFS._calculate_single_rank_operations)NN)r$   r%   r&   r   r	   r   r.   r   r   r   r   rm   r   r   r@   r   r   r!   r!   r   r"   r     s     c                       s   e Zd ZdZd
ee eee ee	e
df  eeee
f  eeeeef e	e f  d fddZeee  ddd	Z  ZS )r   ak  
    The Interleaved 1F1B schedule.
    See https://arxiv.org/pdf/2104.04473 for details.
    Will perform one forward and one backward on the microbatches in steady
    state and supports multiple stages per rank. When microbatches are ready for
    multiple local stages, Interleaved 1F1B prioritizes the earlier microbatch
    (also called "depth first").
    N.r   c           	         s   |d j | _|| j dkr2td| d| j dt j||||||d t|| _|d j| _|d j	| _	i | _
t| jD ]}| |}|| j
|< q|d S )Nr   z?Interleaved 1F1B schedule requires the number of microbatches (zD)                 to be a multiple of the number of pipeline ranks (z).r   )r   r   rk   r   r@   rK   r   r   r   groupr   r   r   )	r    rW   r1   r2   r3   r4   r5   r   r   r   r!   r"   r@   &  s,    		

z ScheduleInterleaved1F1B.__init__)rs   c                    s  fdd}| j j }| }|| }| | }td |||  fdd} fdd}tt}	tt}
g }t D ]}|d  qj j djd	        }t|D ]}|k r(||}|	|  }d	 |	|< |t	t
j|| |d	 kr|d g|  qȈ|  krD| k rn nb||}|	|  }d	 |	|< |t	t
j|| ||}|
|  }d	 |
|< |t	t
j|| q|d  ||}|
|  }d	 |
|< |t	t
j|| qtj  d	 D ]}|d  q|S )
Nc                    s8    j d  j }|d jd |    }t| j j  S )Nr   r   )r   r   r   r6   )r   Zwarmups_ops_last_stage
warmup_opsr   r!   r"   get_rank_warmup_opsN  s    zVScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.get_rank_warmup_opsz=rank %s, warmup_ops %s, 1f1b %s, cooldown_ops %s total_ops %sc                    s   | j  j }|j    S rA   )r   r   rf   Zlocal_index)r   r    r!   r"   forward_stage_indexj  s    zVScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.forward_stage_indexc                    s,   j d |  j j   }|j   S )Nr   )r   r   r   r   r    r   r!   r"   backward_stage_indexo  s    zWScheduleInterleaved1F1B._calculate_single_rank_operations.<locals>.backward_stage_indexr   r   )r   r6   r=   ry   r   r.   r   rD   r   r(   r   r   rV   r'   )r    r   r   Zmicrobatch_opsZfwd_bwd_opsZcooldown_opsZ	total_opsr   r   Zfwd_stage_mb_indexZbwd_stage_mb_indexr   r   r   r   Zfwd_stage_indexrH   r   Zbwd_stage_indexr   r!   r   r"   r   M  s~    





 



z9ScheduleInterleaved1F1B._calculate_single_rank_operations)NNNN)r$   r%   r&   r   r	   r   r.   r   r   r   r   r   rm   r   r   r@   r(   r   r   r!   r!   r   r"   r     s       ')N)N)0loggingabcr   r   collectionsr   enumr   typingr   r   r   r	   r
   r   r   r   ru   Ztorch.distributeddistributedrz   Ztorch.profilerr   Z
microbatchr   r   r   rE   r   __all__	getLoggerr$   r=   r   r(   r/   ZP2POprm   r|   r.   ZWorkr   r   r   r   r   r   r   r!   r!   r!   r"   <module>   sH   (	
	 2  FT  3@