U
    Mh7                     @   st  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% d d	l&m'Z' d dl(Z(d dl)Z(d dl*Z(d dl+m,Z- d dl.m/Z/ d d
l0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z: d dl;m<Z<m=Z=m>Z> d dl?Z?ej@ejAd eBeCZDG dd deZEeEddeEddeEddeEddeEddeEddeEddeEddeEdd eEd!d"eEd#d$eEd%d&eEd'd(eEd)d*eEd+d,eEd-d.eEd/d0d1ZFeG d2d3 d3ZGd4d5 ZHd6d7 ZId8d9 ZJd:d; ZKd<d= ZLd>d? ZMd@dA ZNdBdC ZOdDdE ZPdFdG ZQdHdI ZRdJdK ZSdLdM ZTdNdO ZUdPdQ ZVdRdS ZWdTdU ZXe4dVdWdXedYdZdXd[dXfd\d]ZYe8rd^ZZne[e\d_d`ZZdadbiZ]e7rdce]dd< ddedfZ^e[dgdhdiZ_edjdk Z`de[e[e[dldmdnZae[ebdodpdqZcdade e	je efdr< de eb ddsdtduZgddgdvdwZhdxZiG dydz dze9Zjeebe"e# f e$e#d{d|d}Zkdalemdgd~dZndd ZodeZeifddZpG dd de9ZqG dd de/jrZsG dd de/jrZtedddZuG dd de(jvjwj9ZxG dd dejZyG dd de9ZzdS )    N)contextmanager)	dataclass)	timedelta)Enum)partialreducewraps)StringIO)Dict
NamedTupleOptionalUnionListAnyCallableTuple)patch)
FILE_SCHEMAfind_free_portIS_SANDCASTLEretry_on_connect_failuresskip_but_pass_in_sandcastleskip_but_pass_in_sandcastle_ifTEST_WITH_ROCMTEST_WITH_TSANTestCase	run_tests)_install_threaded_pg_uninstall_threaded_pgProcessLocalGroup)levelc                   @   s   e Zd ZU eed< eed< dS )TestSkip	exit_codemessageN)__name__
__module____qualname__int__annotations__str r*   r*   \/var/www/html/venv/lib/python3.8/site-packages/torch/testing/_internal/common_distributed.pyr!   7   s   
r!   H   z5Skipped because distributed backend is not available.I   z Skipped due to small world size.W   zSkipped due to odd world size.J   zCUDA is not available.K   zNeed at least 1 CUDA deviceM   zNeed at least 2 CUDA devicesP   zNeed at least 3 CUDA devicesQ   zNeed at least 4 CUDA devicesR   zNeed at least 5 CUDA devicesS   zNeed at least 6 CUDA devicesT   zNeed at least 7 CUDA devicesU   zNeed at least 8 CUDA devicesL   z#c10d not compiled with NCCL supportN   zTest skipped for ROCmO   z'Test skipped because no GPU peer accessV   zHTest skipped at subprocess level, look at subprocess log for skip reasonX   z"Test skipped due to missing import)Zbackend_unavailablesmall_worldsizeodd_worldsizeno_cudazmulti-gpu-1zmulti-gpu-2zmulti-gpu-3zmulti-gpu-4zmulti-gpu-5zmulti-gpu-6zmulti-gpu-7zmulti-gpu-8nccl
skipIfRocmZno_peer_accessgenericimporterrorc                   @   s   e Zd Zi Zdddhed< e ed< ddhed< ddhed< i Zdddhed	< dddhed
< dddhed< dddhed< e ed< dS )DistTestCasesr@   mpiZuccZallgather_coalescedr   zsendrecv anysourcezcpu barrierZglooZgpucudaZddpZsubgrouppluginN)r$   r%   r&   Zskip_collectivesetZbackend_featurer*   r*   r*   r+   rD   U   s   
rD   c                    s   t   fdd}|S )zSkips if the world size exceeds the number of GPUs, ensuring that if the
    test is run, each rank has its own GPU via ``torch.cuda.device(rank)``.c                     sV   t j sttd j ttj	d }t j
 |k rLttd|  j  | |S )Nr?   
WORLD_SIZE
multi-gpu-)torchrF   is_availablesysexit
TEST_SKIPSr"   r'   osenvirondevice_count)argskwargs
world_sizefuncr*   r+   wrapperk   s    
zskip_if_no_gpu.<locals>.wrapperr   rW   rX   r*   rV   r+   skip_if_no_gpug   s    	r[   c                    s   t   fdd}|S )Nc                     s:   t jd dkr0tt jd dkr0ttd j  | |S )NBACKENDrE   rI      r=   rP   rQ   r'   rM   rN   rO   r"   rS   rT   rV   r*   r+   rX   y   s     z(skip_if_small_worldsize.<locals>.wrapperrY   rZ   r*   rV   r+   skip_if_small_worldsizex   s    r`   c                    s   t   fdd}|S )Nc                     s>   t jd dkr4tt jd d dkr4ttd j  | |S )Nr\   rE   rI   r]      r>   r^   r_   rV   r*   r+   rX      s    $z&skip_if_odd_worldsize.<locals>.wrapperrY   rZ   r*   rV   r+   skip_if_odd_worldsize   s    rb   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc                     s<    dkr.t j k r.ttd  j n
| |S d S Nr@   rJ   )rK   rF   rR   rM   rN   rO   r"   r_   )backendrW   nr*   r+   rX      s    zCrequire_n_gpus_for_nccl_backend.<locals>.decorator.<locals>.wrapperrY   rZ   rd   re   rV   r+   	decorator   s    z2require_n_gpus_for_nccl_backend.<locals>.decoratorr*   )re   rd   rg   r*   rf   r+   require_n_gpus_for_nccl_backend   s    
rh   c                  C   s   dd } | S )Nc                    s   t   fdd}|S )Nc                     sF   zddl m}m}  | |W S  tk
r@   ttd j Y nX d S )Nr   )AutoModelForMaskedLM
BertConfigrC   )Ztransformersri   rj   ImportErrorrM   rN   rO   r"   )rS   rT   ri   rj   rV   r*   r+   rX      s
    z?import_transformers_or_skip.<locals>.decorator.<locals>.wrapperrY   rZ   r*   rV   r+   rg      s    z.import_transformers_or_skip.<locals>.decoratorr*   )rg   r*   r*   r+   import_transformers_or_skip   s    rl   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc                     s<   t j r"t j kr" | |S ttd  j d S )NrJ   rK   rF   rL   rR   rM   rN   rO   r"   r_   )rW   xr*   r+   rX      s    
z4skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrY   rZ   rn   rV   r+   rg      s    z#skip_if_lt_x_gpu.<locals>.decoratorr*   )rn   rg   r*   ro   r+   skip_if_lt_x_gpu   s    	rp   c                    s    fdd}|S )Nc                    s   t   fdd}|S )Nc                     sN    dkr| |S t j r4t j kr4| |S ttd  j d S rc   rm   r_   )rd   rW   rn   r*   r+   rX      s
    

z9nccl_skip_if_lt_x_gpu.<locals>.decorator.<locals>.wrapperrY   rZ   rd   rn   rV   r+   rg      s    z(nccl_skip_if_lt_x_gpu.<locals>.decoratorr*   )rd   rn   rg   r*   rq   r+   nccl_skip_if_lt_x_gpu   s    rr   c                 C   st   |   }d|kstd|ks td|ks,t|d }|ddkrF|n|dd }||ksptd| d| d S )	N	iterationZ	has_errorerrorz
Exception raised from r   zDid not find expected z in ddp logging data error: )Z_get_ddp_logging_dataAssertionErrorfindsplit)Z	model_DDPZ
err_substrZddp_logging_dataZlogging_erractualr*   r*   r+   verify_ddp_error_logged   s    rz   c                    s   t   fdd}|S )aJ  
    Convenience decorator to set/unset TORCH_NCCL_BLOCKING_WAIT flag. Note that use of
    this decorator will override the setting of TORCH_NCCL_ASYNC_ERROR_HANDLING for
    the particular test. After the test, both TORCH_NCCL_BLOCKING_WAIT and
    TORCH_NCCL_ASYNC_ERROR_HANDLING will be restored to their original values.
    c               	      s   zt jd }t jd= W n tk
r.   d }Y nX z,zt jd }W n tk
rX   d }Y nX W 5 dt jd< X z | |}|W S |d k	r|t jd< |d k	r|t jd< X d S )NZTORCH_NCCL_ASYNC_ERROR_HANDLING1ZTORCH_NCCL_BLOCKING_WAIT)rP   rQ   KeyError)rS   rT   Z cached_nccl_async_error_handlingZcached_nccl_blocking_waitretrV   r*   r+   rX      s0    

z(with_nccl_blocking_wait.<locals>.wrapperrY   rZ   r*   rV   r+   with_nccl_blocking_wait   s    "r~   c                    s    fdd}|S )zK
    Runs a test for each distributed debug level specified in levels.
    c                    s   t   fdd}|S )Nc                     sR   t jdd }D ]:}|t jd< t   | |}t  |d k	r|t jd< q|S )NZTORCH_DISTRIBUTED_DEBUG)rP   rQ   getc10dZset_debug_level_from_envbarrier)rS   rT   Z	old_levelr    r}   )rW   levelsr*   r+   rX     s    

z:with_dist_debug_levels.<locals>.decorator.<locals>.wrapperrY   rZ   r   rV   r+   rg     s    z)with_dist_debug_levels.<locals>.decoratorr*   )r   rg   r*   r   r+   with_dist_debug_levels  s    r   c                   C   s   t t  dS )Nz+c10d was not compiled with the Gloo backend)r   r   Zis_gloo_availabler*   r*   r*   r+   requires_gloo&  s    r   c                 C   sD   t  stdS ttjj | k d|  dtjj  d| S d S )N+c10d was not compiled with the NCCL backendz0Requires NCCL version greater than or equal to: z	, found: z
, reason: )r   is_nccl_availabler   r   rK   rF   r@   version)r   msgr*   r*   r+   requires_nccl_version-  s    r   c                   C   s   t t  dS )Nr   )r   r   r   r*   r*   r*   r+   requires_nccl9  s    r   c                   C   s   t t  dS )Nz*c10d was not compiled with the UCC backend)r   r   Zis_ucc_availabler*   r*   r*   r+   requires_ucc?  s    r   c                   C   s   t t  dS )Nz*c10d was not compiled with the MPI backend)r   r   Zis_mpi_availabler*   r*   r*   r+   requires_mpiE  s    r   c                    s   d _ t  fdd}|S )zSkips a test for ROCmTc                     s"   t s | |S ttd j d S )NrA   )r   rM   rN   rO   r"   r_   rV   r*   r+   rX   P  s    
zskip_if_rocm.<locals>.wrapper)skip_if_rocmr   rZ   r*   rV   r+   r   L  s    r   c                   C   s   t tjdkdS )Nwin32z8This unit test case is not supported on Windows platform)r   rM   platformr*   r*   r*   r+   skip_if_win32Y  s    r   	localhostra   T   )minutesFc           	      C   sL   t  }|r2t|tdd }tjj| ||||S tj| |||||dS dS )zL
    Creates a TCP store. Retries if the chosen port is already in use.
    ra   )Zmilliseconds)wait_for_workers	use_libuvN)r   r'   r   rK   classesZ	dist_c10dZTCPStorer   )	addrrU   	is_mastertimeoutr   Z	jit_classr   portZtimeout_millisecondr*   r*   r+   create_tcp_store`  s$             r   i  Z!DISTRIBUTED_TESTS_DEFAULT_TIMEOUTZ300Ztest_ddp_uneven_inputsi     Ztest_join_kwargsc                 C   s2   t jdks| d kr tjjddS tjj| dS d S )Nr   z	127.0.0.1)hostnameZ	interface)rM   r   r   ZProcessGroupGloocreate_devicer   r*   r*   r+   r     s    r   returnc                 C   s   t | dd tS N.ru   )TIMEOUT_OVERRIDEr   rx   TIMEOUT_DEFAULT)Ztest_idr*   r*   r+   get_timeout  s    r   c               	   c   sR   t  t   } }tjtj }}z | | t_t_tjtjfV  W 5 || t_t_X d S N)r	   rM   stdoutstderr)Znew_outZnew_errZold_outZold_errr*   r*   r+   captured_output  s    r   )rankrU   
num_inputsc              
      sx   dt t t t ddd}t ddd  fd	d
t|ddt|ddt|ddt|ddt|ddt|ddfD S )z
    Generate a number of basic test cases for sparse reduction.
    These cover tensors with a varying number of sparse dimensions and a varying
    number of dense dimensions. The only reduction operation we support is sum.
    ra   r   )r   rU   sparse_dims
dense_dimsc              	   S   s   t t | d d| d f}|gdd t|D  }t|d D ](}t |t d| d f}|| qBt | d gdd t|D  }t |||S )Nra   c                 S   s   g | ]}d qS r]   r*   .0_r*   r*   r+   
<listcomp>  s     z@simple_sparse_reduce_tests.<locals>.generate.<locals>.<listcomp>c                 S   s   g | ]}d qS r   r*   r   r*   r*   r+   r     s     )	rK   ZreshapeZarangerangecatZzerosappendZonesZsparse_coo_tensor)r   rU   r   r   indicesshaper   valuesr*   r*   r+   generate  s    "z,simple_sparse_reduce_tests.<locals>.generate)rU   c                    s    t tj fddtD S )Nc                    s   g | ]} |qS r*   r*   )r   r   fnrU   r*   r+   r     s     zCsimple_sparse_reduce_tests.<locals>.compute_sum.<locals>.<listcomp>)r   operatoraddr   r   r*   r   r+   compute_sum  s     z/simple_sparse_reduce_tests.<locals>.compute_sumc                    sD   g | ]<  fd dt D  fddt D fqS )c                    s"   g | ]}  |  qS r*   r*   r   i)r   r   r   rU   r*   r+   r     s   z9simple_sparse_reduce_tests.<locals>.<listcomp>.<listcomp>c                    s   g | ]}  qS r*   r*   r   )r   r   r   rU   r*   r+   r     s     )r   )r   r   r   r   rU   r   r+   r     s   z.simple_sparse_reduce_tests.<locals>.<listcomp>)r   r]      )r   )ra   r   )r'   r   )r   rU   r   r   r*   r   r+   simple_sparse_reduce_tests  s    	





r   )rU   rd   c                    sB   t j }t|d | |kr&||    fddt| D }|S )zMultigpu tests are designed to simulate the multi nodes with multi
    GPUs on each node. Nccl backend requires equal #GPUs in each process.
    On a single node, all visible GPUs are evenly
    divided to subsets, each process only uses a subset.
    ra   c                    s*   i | ]"}|t |  |d     qS ra   )listr   ZnGPUs_per_processZvisible_devicesr*   r+   
<dictcomp>  s    z(init_multigpu_helper.<locals>.<dictcomp>)rK   rF   rR   r   )rU   rd   ZnGPUsZrank_to_GPUr*   r   r+   init_multigpu_helper  s    
r   tmp_dir)init_methodr   c                 C   s   t  atjtjd< ttjtjd ttjtjd tjtjd}t| | d k	rn| tjd< nt	tj|d tjd< d S )NZTEMP_DIRr   Ztest_dirZinit_dirZINIT_METHODZshared_init_file)
tempfileTemporaryDirectoryr   namerP   rQ   mkdirpathjoinr   )r   Zinit_dir_pathr*   r*   r+   initialize_temp_directories  s    
 r   c                   C   s   t d k	rt   d S r   )r   cleanupr*   r*   r*   r+   cleanup_temp_dir  s    r      c                       s:  e Zd ZdZdZedddZeedddZ	dd	 Z
d.eedd fddZdd fddZdd fddZedddZddddZddddZG dd deZeedddZeeeedddd Zedd!d"d#Zddd$d%Zddd&d'Zddd(d)Zddd*d+Zeedd,d-Z  ZS )/MultiProcessTestCaseru   
   r   c                 C   s   dS )NFr*   selfr*   r*   r+   _should_stop_test_suite  s    z,MultiProcessTestCase._should_stop_test_suitec                 C   s   t S r   DEFAULT_WORLD_SIZEr   r*   r*   r+   rU     s    zMultiProcessTestCase.world_sizec                    s    t   fdd}t|| S )Nc                    s"   | j | jkr|   n   d S r   )r   MAIN_PROCESS_RANK_join_processesr   r   r*   r+   rX     s    z1MultiProcessTestCase.join_or_run.<locals>.wrapperr   types
MethodTyper   r   rX   r*   r   r+   join_or_run  s    z MultiProcessTestCase.join_or_runrunTestN)method_name
methodNamer   c                    s8   |dkr|}t  | t| |}t| || | d S )Nr   super__init__getattrsetattrr   )r   r   r   r   	__class__r*   r+   r   #  s
    
zMultiProcessTestCase.__init__c                    s8   t    g | _g | _| j| _tjddj| _	i | _
d S )NF)delete)r   setUpskip_return_code_checks	processesr   r   r   NamedTemporaryFiler   	file_namepid_to_piper   r   r*   r+   r   ,  s    
zMultiProcessTestCase.setUpc                    s(   t    | jD ]}|  qg | _d S r   )r   tearDownr   	terminate)r   pr   r*   r+   r   5  s    


zMultiProcessTestCase.tearDownc                 C   s   |   dd S r   idrx   r   r*   r*   r+   _current_test_name?  s    z'MultiProcessTestCase._current_test_namec                 C   s   g | _ tt| jD ]j}tj \}}|| jjdt	| || 
 | j|fd}|  td||j || j|j< | j | qd S )Nzprocess )targetr   rS   zStarted process %s with pid %s)r   r   r'   rU   rK   multiprocessingPiper   _runr)   r   r   startloggerinfopidr   r   )r   procr   Zparent_connZ
child_connprocessr*   r*   r+   _start_processesC  s    
z%MultiProcessTestCase._start_processesc                 C   s   t jdj}| | d S )Nspawn)rK   r   Zget_contextProcessr  )r   r  r*   r*   r+   _spawn_processesQ  s    z%MultiProcessTestCase._spawn_processesc                   @   s   e Zd ZdZdS )zMultiProcessTestCase.Eventra   N)r$   r%   r&   GET_TRACEBACKr*   r*   r*   r+   EventU  s   r  r   c              	   C   s   t d| tj| |g}| |kr| jr:t d| d S |  }t d|| |tjj	krt
jdd<}t| |  |d | |  t d| W 5 Q R X ||krd S qd S )Nz*Starting event listener thread for rank %sz:Pipe closed for process %s, stopping event listener threadzReceived event %s on process %szr+)moder   zProcess %s sent traceback)r  r  r   
connectionwaitclosedrecvr   r  r  r   r   faulthandlerZdump_tracebackflushseeksendread)parent_pipeZsignal_piper   Zready_pipeseventZtmp_filer*   r*   r+   _event_listenerX  s(     

z$MultiProcessTestCase._event_listenerr   	test_namer   r   c                 C   s$   | |}||_ ||_||| d S r   r   r   run_testclsr   r  r   r  r   r*   r*   r+   r   w  s    zMultiProcessTestCase._run)r  r   c              
   C   sL  t jjdd\}}tjtj||| jfdd}|  t	j
dkrTt	j
dkrTt jd dtjd< zzt| |  W n tjk
r } z*td	| j|t| t	td
 j W 5 d }~X Y nV tk
r } z6tdt | jtj  |t  t	tj  W 5 d }~X Y nX W 5 |d k	r(|d  |d k	s6t|  |  X d S )NF)ZduplexT)r   rS   daemonr   darwinr{   TORCH_SHOW_CPP_STACKTRACESz4Process %s skipping test %s for following reason: %srB   z;Caught exception: 
%s exiting process %s with exit code: %s)!rK   r   r   	threadingThreadr   r  r   r  rM   r   _CZ'_set_print_stack_traces_on_fatal_signalrP   rQ   r  rv   r   closer   unittestSkipTestr  r  r)   rN   rO   r"   	Exceptionrt   	traceback
format_excTEST_ERROR_EXIT_CODE)r   r  r  Zsignal_recv_pipeZsignal_send_pipeZevent_listener_threadseer*   r*   r+   r  ~  sD    

      "

zMultiProcessTestCase.run_testc                 C   s  g }t | jD ]p\}}|jd kr| j|j }z |tjj |	||f W q t
k
r| } ztd|| W 5 d }~X Y qX q|D ]\}}zH|dr|jrtd| W q| }td|| ntd| W q t
k
r } ztd|| W 5 d }~X Y qX qd S )NzBEncountered error while trying to get traceback for process %s: %sr   z5Pipe closed for process %s, cannot retrieve tracebackz)Process %s timed out with traceback: 

%sz6Could not retrieve traceback for timed out process: %s)	enumerater   exitcoder   r  r  r   r  r  r   ConnectionErrorr  rt   pollr  r  r  )r   Zpipesr   r  piper/  r   r+  r*   r*   r+   _get_timedout_process_traceback  sL    
  
      z4MultiProcessTestCase._get_timedout_process_tracebackc              	   C   s2  t |  }t }d}zt| jD ]P\}}|jt	j
kr$td| d|j d tj }|D ]}	|	  q^d} qvq$|r|qtdd | jD rqt | }
|
|kr|   td| d	 | jD ]}|  qqtd
 qt | }|| jkr| | n
| | W 5 | j D ]}|  qX d S )NFProcess z terminated with exit code z", terminating remaining processes.Tc                 s   s   | ]}|j d k	V  qd S r   )r1  )r   r   r*   r*   r+   	<genexpr>  s     z7MultiProcessTestCase._join_processes.<locals>.<genexpr>zTiming out after z" seconds and killing subprocesses.g?)r   r   timer   r   r'  r0  r   r1  r   r-  printrK   r   active_childrenr   allr5  sleepr   _check_no_test_errors_check_return_codes)r   r   r   
start_timeZsubprocess_errorr4  r   r   r:  acelapsedelapsed_timer*   r*   r+   r     sD    




z$MultiProcessTestCase._join_processesc                 C   sH   t | jD ]8\}}|jdkr2td| d| d| | j|j q
dS )zV
        Checks that we didn't have any errors thrown in the child processes.
        Nr6  z timed out after  seconds)r0  r   r1  RuntimeErrorassertNotEqualr-  )r   rB  r   r   r*   r*   r+   r=    s    
z*MultiProcessTestCase._check_no_test_errorsc           
   
   C   sH  | j std dS | j d }dd t| j D }|r~d}|D ]6\}}| j|j  }|d| dtj d	| d
7 }q>t	|t| j D ]R\}}|j
dkrt	d| d| d| j|j
|j
d| d|j
 d|j
 d qt D ]<}	|j
|	jkrtrtd|  |	j  dS t|	jq| j|j
dd|j
 d|j d dS )z
        Checks that the return codes of all spawned processes match, and skips
        tests if they returned a return code indicating a skipping condition.
        z<Note: no subprocesses were spawned, test was likely skipped.Nr   c                 S   s$   g | ]\}}|j tjkr||fqS r*   )r1  r   r-  )r   r   r   r*   r*   r+   r     s   z<MultiProcessTestCase._check_return_codes.<locals>.<listcomp> r6  z exited with error code z and exception:

 terminated or timed out after rC  zExpect process z+ exit code to match Process 0 exit code of z
, but got )r   6Skipping %s on sandcastle for the following reason: %sz Expected zero exit code but got z
 for pid: )r   r  warningr0  r   r  r  r   r-  rD  r1  assertEqualrO   r   r"   r   r  r   r#   r(  r)  )
r   rB  Zfirst_processZerrored_processesrt   r   r  error_messager   skipr*   r*   r+   r>  	  sP    


  z(MultiProcessTestCase._check_return_codesc                 C   s
   | j dkS )Nr   r  r   r*   r*   r+   r   I  s    zMultiProcessTestCase.is_master)r   r   )r$   r%   r&   r   r-  boolr   propertyr'   rU   r   r)   r   r   r   r   r  r
  r   r  staticmethodr  classmethodr   r  r5  r   r=  r>  r   __classcell__r*   r*   r   r+   r     s0   		
+%0@r   )subtest_configtest_fntest_kwargsc           
   
   O   s   t | }dd |D }dd |D }tj| D ]T}tt||}	| jf |	( tj	  ||||	 tj	  W 5 Q R X t
  q2dS )a\  
    Runs a test function given by ``test_fn`` as a subtest according to the
    configurations specified by ``subtest_config``. This amortizes the
    costly setup overhead (including process spawn and initializing the
    process group) over the subtests.

    Args:
        subtest_config (Dict[str, List[Any]]): A mapping from subtest
            keyword argument name to a list of its possible values.
        test_fn (Callable): A callable that runs the actual test.
        test_args: Positional arguments to pass to ``test_fn``.
        test_kwargs: Keyword arguments to pass to ``test_fn``.
    c                 S   s   g | ]}|d  qS )r   r*   r   itemr*   r*   r+   r   d  s     z run_subtests.<locals>.<listcomp>c                 S   s   g | ]}|d  qS r   r*   rV  r*   r*   r+   r   e  s     N)r   items	itertoolsproductdictzipZsubTestrK   _dynamoresetr   r   )
Zcls_instrS  rT  Z	test_argsrU  Zsubtest_config_itemsZsubtest_config_keysZsubtest_config_valuesr   Zsubtest_kwargsr*   r*   r+   run_subtestsN  s    
r_  c                   C   sL   t dk	rt S z"tjdddddgddjd	ka W n tk
rF   da Y nX t S )
a   
    If shell command `fi_info -p efa -t FI_EP_RDM` returns exit code 0 then we assume that the machine has
    Libfabric EFA interfaces and EFA software components installed,
    see https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/efa-start.html.
    NZfi_infoz-pZefaz-tZ	FI_EP_RDMF)checkr   )EFA_PROBE_RESULT
subprocessrun
returncodeFileNotFoundErrorr*   r*   r*   r+   has_efat  s    
rf  c                   C   s   t  rddgS dS )a  
    If the machine has Libfabric EFA interfaces and EFA software components installed it may cause
    'RuntimeError: In operator() at tensorpipe/common/ibv.h:172 "": Operation not supported' if tensorpipe
    uses InfiniBand transport, so we exclude it from tensorpipe transports,
    see https://github.com/pytorch/pytorch/issues/73885 and https://github.com/pytorch/pytorch/issues/65022
    ZshmZuvN)rf  r*   r*   r*   r+   tp_transports  s    rg  c                    s:   dkrt t|dS dd  t fdd}|S )z+
    Wrapper to use with a test method
    N)r   rU   c                    sf   t  t }fdd fdd}g }tD ]*}tj|||fd}|  || q6|S )Nc                      s    t jjkS r   r   distributed_c10dZ_worldr*   worldr*   r+   world_is_valid  s    zaspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.world_is_validc              
      sx   t jd| |d zPz
   W n@ tk
r^ } z"tj| t f t	
| W 5 d }~X Y nX W 5  rrt   X d S )Nthreadedrd   r   rU   store)r   init_process_groupdestroy_process_groupBaseExceptionMultiThreadedTestCaseexception_queueputrM   exc_infor   exception_handle)r   Zworld_pgro  ex)callbackrl  rU   r*   r+   worker  s       
 zYspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threads.<locals>.workerr   rS   )r   r   	HashStorer   r$  r%  r  r   )rU   ry  global_storerz  threadsr   tr*   )ry  rk  rl  rU   r+   #_run_test_method_with_multi_threads  s    zIspawn_threads_and_init_comms.<locals>._run_test_method_with_multi_threadsc              	      sL   t jjd z( fdd}t| W 5 t jjd X d S )NTFc                      s   f S r   r*   r*   )rS   rW   rT   r   r*   r+   <lambda>      z?spawn_threads_and_init_comms.<locals>.wrapper.<locals>.<lambda>)rK   r&  _distributed_c10d_set_thread_isolation_moders  _join_threads)r   rS   rT   r~  r  rW   rU   )rS   rT   r   r+   rX     s
    z-spawn_threads_and_init_comms.<locals>.wrapper)r   spawn_threads_and_init_commsr   )rW   r   rU   rX   r*   r  r+   r    s      
r  c                       s   e Zd ZdZe ZdZdd Zd'e	dd fdd	Z
d
d Zdd Zdd fddZ fddZdd Zedd Zdd Zedd Zedd ZeedddZee	ddd Zd(d!d"d#d$Zd)d!d"d%d&Z  ZS )*rs  a5  
    Test runner that runs all tests with the in-proc process group using
    multiple threads with the threaded process group.

    Each test spawns world_size threads and run the test method in each thread.

    Difference from regular MultiProcess test runner:
    Must explicitly defines SetUp and call self._spawn_threads() to run the tests.
    Cannot use setUp / tearDown (must use perThreadSetup / perThreadShutdown)
        to set up / tear down each thread when running each test.
    No global state possible
        How bad of a limitation is this?
    ru   c                    s    t   fdd}t|| S )Nc                    s&   | j | jkr| | j  n   d S r   )r   MAIN_THREAD_RANKr  r~  r   r   r*   r+   rX     s    z2MultiThreadedTestCase.join_or_run.<locals>.wrapperr   r   r*   r   r+   r     s    z!MultiThreadedTestCase.join_or_runr   N)r   r   c                    s.   t  | t| |d }t| || | d S r   r   )r   r   rT  r   r*   r+   r     s    zMultiThreadedTestCase.__init__c                 C   s   d S r   r*   r   r*   r*   r+   perThreadSetUp  s    z$MultiThreadedTestCase.perThreadSetUpc                 C   s   d S r   r*   r   r*   r*   r+   perThreadTearDown  s    z'MultiThreadedTestCase.perThreadTearDownr   c                    s&   t    | j| _g | _dtjd< dS )z
        setUp only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadSetUp
        r{   r#  N)r   r   r  r   r~  rP   rQ   r   r   r*   r+   r     s    
zMultiThreadedTestCase.setUpc                    s   t    g | _dS )z
        tearDown only set up things in the main thread, if you want to configure things
        in the spawned threads, use perThreadTearDown
        N)r   r   r~  r   r   r*   r+   r     s    
zMultiThreadedTestCase.tearDownc                    s   t jjd | j}t  t | j_	 fdd}| s@t
dt| jD ]2}tj| jj||| jfd}|  | j| qJdS )zk
        class method to spawn threads and run test, use this method in the SetUp of your TestCase
        Tc                      s    t jjkS r   rh  r*   rj  r*   r+   rl    s    z<MultiThreadedTestCase._spawn_threads.<locals>.world_is_validzInvalid worldr{  N)rK   r&  r  r  r   r   r   r|  r   r}  rD  r   rU   r$  r%  r   r  r~  r   )r   r  rl  r   r  r*   rj  r+   _spawn_threads  s    z$MultiThreadedTestCase._spawn_threadsc                 C   sH   | |}||_ t|dr6t |_tj|j_tj|j_	|
||| d S )N_tls)r   hasattrr$  localr  r   Z
_precision	precisionZ_rel_tolZrel_tolrun_test_with_threaded_pg)r   r  r   rU   r   r*   r*   r+   r     s    



zMultiThreadedTestCase._runc              
   C   s   t jd||| jjd |   zVzt| |  W n@ tk
rp } z"| j	
|t f t| W 5 d}~X Y nX W 5 t   |   X dS )zd
        Run the current test associated with `test_name` using the threaded process group.
        rm  rn  N)r   rp  r   r}  r  rq  r  r   rr  rt  ru  rM   rv  r   rw  )r   r  r   rU   rx  r*   r*   r+   r  )  s        z/MultiThreadedTestCase.run_test_with_threaded_pgc              	   C   s   t }z|t|D ]B\}}|td| |	 rt
j|ttd| dd ff qt  g }| j s~| j }|| q^W 5 t  tjjd X | ||| d S )NFr   zRank failed to join in under rC  )r   r   rK   r&  r  r  r0  r   maxis_alivers  rt  ru  TimeoutErrorr   r^  emptyr   r   r>  )r   r~  r   r   idxthreadfailed_ranksZfailurer*   r*   r+   r  <  s.    


z#MultiThreadedTestCase._join_threadsc                 C   sN  d}d}|D ]\}}|d }t |tjrPtd||t| |dk rtd j}qt |trd| d| d	}	t	|	 t
|	qt |trdtj| }	t	d
|	| |d| d|	 d7 }qt |trt|jtkr|dk r|j}qt|dkrt
||dkrJt D ]:}
||
jkrtr:td||
j  d S t|
jqd S )NrF  ru   ra   z3Thread %s skipping test %s for following reason: %sr   rB   zThread rH  z	 seconds
z'Caught exception: 
%s exiting thread %sz exited with exception:
rG  rI  )
isinstancer(  r)  r  r  r)   rO   r"   r  rt   rD  r*  r   r+  format_exception
SystemExittypecoder'   lenr   r   r#   )r   r  r   r   	error_msgZ	skip_coder   rv  excr   rM  r*   r*   r+   r>  Z  sV       



  

  z)MultiThreadedTestCase._check_return_codesc                 C   s   t S r   r   r   r*   r*   r+   rU     s    z MultiThreadedTestCase.world_sizec                 C   s   |   dd S r   r   r   r*   r*   r+   r     s    z(MultiThreadedTestCase._current_test_namer   r  c                C   s   | j |kr| ||| dS )z
        The reason why we have this util function instead of
        self.assertEqual is all threads are sharing one CPU RNG
        so the assertion result is only reliable on rank 0
        N)r   rK  r   rn   yr   r   r*   r*   r+   assertEqualOnRank  s    
z'MultiThreadedTestCase.assertEqualOnRankc                C   s   | j |kr| || d S r   )r   rE  r  r*   r*   r+   assertNotEqualOnRank  s    
z*MultiThreadedTestCase.assertNotEqualOnRank)r   )N)N)r$   r%   r&   __doc__queueQueuert  r  r   r)   r   r  r  r   r   r  rQ  r   r  r  r>  rO  r'   rU   r   r  r  rR  r*   r*   r   r+   rs    s.   



0	rs  c                       sF   e Zd Zeejejf edd fddZ	ejejdddZ
  ZS )SaveForwardInputsModuleNforward_inputscast_forward_inputsr   c                    s(   t    tdd| _|| _|| _d S )Nd   )r   r   nnZLinearlr  r  r   r  r  r   r*   r+   r     s    
z SaveForwardInputsModule.__init__rn   r   c                 C   s*   || j | < | | jr$|| jjjn|S r   )r  r  r  toweightZdtyper   rn   r*   r*   r+   forward  s    
zSaveForwardInputsModule.forwardr$   r%   r&   r
   r  ModulerK   ZTensorrN  r   r  rR  r*   r*   r   r+   r    s
   
r  c                       sF   e Zd Zeejejf edd fddZ	ejejdddZ
  ZS )SaveForwardInputsModelNr  c                    s,   t    t||| _t||| _|| _d S r   )r   r   r  c1c2r  r  r   r*   r+   r     s    
zSaveForwardInputsModel.__init__r  c                 C   s   || j | < | | |S r   )r  r  r  r  r*   r*   r+   r    s    
zSaveForwardInputsModel.forwardr  r*   r*   r   r+   r    s
   
r  c                 c   s   t j|  dtjd< dtjd< |r4tjd| |d t j  t jj	j
  z
d V  W 5 t j  t jj	j
  |r|t  X d S )Nr   MASTER_ADDRZ6789MASTER_PORTr@   r   rU   )rK   rF   Z
set_devicerP   rQ   r   rp  r]  r^  utilsZcountersclearrq  )r   rU   Zinit_pgr*   r*   r+   _dynamo_dist_per_rank_init  s    




r  c                       s4   e Zd ZdZe fddZe fddZ  ZS )#DynamoDistributedSingleProcTestCasez
    Test harness for single-process dynamo distributed tests,
    initializes dist process group.

    Prefer this for simple tests, as it's easier to debug.
    c                    sh   t    | jttjddd d| _d| j | _	d| j	krHd n| jg| _
tjd| jdd	 d S )
Nr   Z12355)r  r  r   zcuda:rF   r@   ra   r  )r   
setUpClassZ_exit_stackenter_contextr   r[  rP   rQ   r   ZdeviceZ
device_idsr   rp  r   r   r*   r+   r    s    
	z.DynamoDistributedSingleProcTestCase.setUpClassc                    s   t   t   d S r   )r   rq  r   tearDownClassr  r   r*   r+   r    s    z1DynamoDistributedSingleProcTestCase.tearDownClass)r$   r%   r&   r  rQ  r  r  rR  r*   r*   r   r+   r    s
   r  c                       sV   e Zd ZdZ fddZ fddZeedddZe	ee
e
d	d
ddZ  ZS )"DynamoDistributedMultiProcTestCasea   
    Use this for tests that actually run on multiple GPUs.

    Decorate tests with @skip_if_lt_x_gpu(ngpu)

    Note: MultiProcTestCase spawns processes per test and is slow.
    Prefer MultiThreadedTestCase for most tests. Perhaps use this one
    sparingly for integration tests.
    c                    s   t    |   d S r   )r   r   r
  r   r   r*   r+   r     s    
z(DynamoDistributedMultiProcTestCase.setUpc                    s4   t    zt| j W n tk
r.   Y nX d S r   )r   r   rP   remover   OSErrorr   r   r*   r+   r     s
    
z+DynamoDistributedMultiProcTestCase.tearDownr   c                 C   s
   t j S r   )rK   rF   rR   r   r*   r*   r+   rU   
  s    z-DynamoDistributedMultiProcTestCase.world_sizeNr  c                 C   s$   | |}||_ ||_||| d S r   r  r  r*   r*   r+   r     s    z'DynamoDistributedMultiProcTestCase._run)r$   r%   r&   r  r   r   rO  r'   rU   rQ  r)   r   rR  r*   r*   r   r+   r    s   	r  c                       s   e Zd ZU dZeed< dZeed< dZee	 ed< e
eje	ddd	Ze
dddZe
 fddZe
 fddZe
deeee	 dddZ  ZS )MultiProcContinousTestr]   rU   ru   r   N	rdvz_filer   c                 C   s   t ddS )z
        ProcessGroup backend str.
        To be customized by sub test classes, e.g. "nccl".
        Here we raise error.
        z/Please implement backend_str in your test classN)NotImplementedErrorr  r*   r*   r+   backend_str   s    z"MultiProcContinousTest.backend_strFc                 C   s   dS )z
        ProcessGroup init options.
        To be customized by sub test classes, e.g. ProcessGroupNCCLOpTest
        Here we return None.
        Nr*   )r   Zhigh_priority_streamr*   r*   r+   opts*  s    zMultiProcContinousTest.optsc                    s   t    d| j  kr"| jk s<n td| j d| j | jrTt| j| j}nd}|  }| 	 }t
d| tj|| j| j||d tj | _t
d| j d dS )	z
        Class-scope test fixture. Run once for entire test class, before any test starts.
        Set up the process group.
        r   zBRank must be set and in the range of 0 to world_size. World size: z Rank: NzTesting backend=)rd   rU   r   ro  Z
pg_optionsRank z setup complete)r   r  r   rU   rD  r  r   Z	FileStorer  r  r9  rp  ri  Z_get_default_groupZpg)r   ro  r  rd   r   r*   r+   r  3  s(    
z!MultiProcContinousTest.setUpClassc                    sT   t   t   | jr>zt| j W n tk
r<   Y nX td| j	 d dS )z
        Class-scope test fixture. Run once for entire test class, after all tests finish.
        Tear down the process group.
        r  z teardown completeN)
r   rq  r   r  r  rP   r  r  r9  r   r  r   r*   r+   r  R  s    
z$MultiProcContinousTest.tearDownClass)r   rU   r  c                 C   s   || _ || _|| _t  dS )ad  
        This is an entry point for each rank to run the tests in `MultiProcContinousTest`.
        In this entry point, we set the class variables for the test class.
        Then we run all tests.

        Note:
        - This helper only works for a subclass of `MultiProcContinousTest`.

        Example:
        - See `test_c10d_ops_nccl.py`.
        N)r   rU   r  r   )r   r   rU   r  r*   r*   r+   run_rankb  s    zMultiProcContinousTest.run_rank)F)N)r$   r%   r&   rU   r'   r(   r   r  r   r)   rQ  abcabstractmethodr  r  r  r  r  rR  r*   r*   r   r+   r    s&   
 r  )N)ra   )N)T){r  r  rY  loggingr   rP   r  rb  rM   r   r$  r8  r+  r   r(  
contextlibr   dataclassesr   datetimer   enumr   	functoolsr   r   r   ior	   typingr
   r   r   r   r   r   r   r   Zunittest.mockr   rK   Ztorch._dynamo.test_caseZtorch.cuda.ncclZtorch.distributeddistributedr   Ztorch.nnr  Z$torch.testing._internal.common_utilsr   r   r   r   r   r   r   r   r   r   Z5torch.testing._internal.distributed.multi_threaded_pgr   r   r   r   basicConfigINFO	getLoggerr$   r  r!   rO   rD   r[   r`   rb   rh   rl   rp   rr   rz   r~   r   r   r   r   r   r   r   r   r   r   r'   getenvr   r   r   r   r   r)   r   r   r   r(   r   r   r   r   r_  ra  rN  rf  rg  r  rs  r  r  r  r  r]  Z	test_caser  r  r  r*   r*   r*   r+   <module>   s    (0
  .


,  M#  
8 Z "