U
    yhd                  	   @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZ d dlZd dlm Z  d dl!m"Z"m#Z# d d	l$m%Z% d d
l&m'Z'm(Z(m)Z)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7m8Z8m9Z9 d dl:m;Z; d dl<m=Z= ddddgZ>dZ?e@eAd< eG dd dZBeG dd dZCdZDe@dddZEG dd deZFG d d! d!eFZGG d"d# d#eFZHe2eId$d%d&ZJeIee2 eee2  d'd(d)ZKejLeejMej f e2e@e9d*d+d,ZNeejOejOe1eIePeIdd-d.d/ZQG d0d deZRG d1d deRZSG d2d3 d3e8ZTG d4d de7ZUG d5d deTe5ZVdS )6    N)ABCabstractmethod)contextmanager)	dataclass)Path)AnyCallablecastDict	GeneratorIOIterableIteratorListOptionalTupleUnion)Tensor)_get_available_device_type_get_device_module)narrow_tensor_by_index)MetadataMetadataIndexSTATE_DICT_TYPEStorageMeta)LoadItemTypeLoadPlanLoadPlannerReadItemSavePlanSavePlanner	WriteItemWriteItemType)BlockingAsyncStager)StorageReaderStorageWriterWriteResult)_create_file_view)FutureFileSystemWriterFileSystemReader
FileSystemFileSystemBase	.metadata_metadata_fnc                   @   s*   e Zd ZU dZeed< eed< eed< dS )_StorageInfoz#This is the per entry storage info.relative_pathoffsetlengthN)__name__
__module____qualname____doc__str__annotations__int r:   r:   Y/var/www/html/venv/lib/python3.8/site-packages/torch/distributed/checkpoint/filesystem.pyr/   B   s   
r/   c                   @   s   e Zd ZU eed< dS )_StoragePrefixprefixN)r3   r4   r5   r7   r8   r:   r:   r:   r;   r<   K   s   
r<   z.distcpreturnc                   C   s   t t S N)r7   uuiduuid4r:   r:   r:   r;   _generate_uuidS   s    rC   c                   @   sT   e Zd ZeeeddddZeddddZeee	e
jef  ddd	ZdS )
_TensorLoaderNsizeobjr?   c                 C   s   d S r@   r:   selfrF   rG   r:   r:   r;   addX   s    z_TensorLoader.addr>   c                 C   s   d S r@   r:   rI   r:   r:   r;   start_loading\   s    z_TensorLoader.start_loadingc                 C   s   d S r@   r:   rK   r:   r:   r;   values`   s    z_TensorLoader.values)r3   r4   r5   r   r9   objectrJ   rL   r   r   torchr   rM   r:   r:   r:   r;   rD   W   s   rD   c                   @   sX   e Zd ZeddddZeeddddZddd	d
Ze	e
ejef  dddZdS )_SerialCpuLoaderN)resolve_funr?   c                 C   s   || _ g | _d S r@   )rQ   items)rI   rQ   r:   r:   r;   __init__f   s    z_SerialCpuLoader.__init__rE   c                 C   s   | j ||f d S r@   )rR   appendrH   r:   r:   r;   rJ   j   s    z_SerialCpuLoader.addr>   c                 C   s   d S r@   r:   rK   r:   r:   r;   rL   m   s    z_SerialCpuLoader.start_loadingc                 c   sP   | j D ]D\}}| | }| }|  | kr@| }||fV  qd S r@   )rR   rQ   detachcpuZstoragerF   numelclonerI   _rG   tensorr:   r:   r;   rM   p   s    z_SerialCpuLoader.values)r3   r4   r5   r   rS   r9   rN   rJ   rL   r   r   rO   r   rM   r:   r:   r:   r;   rP   e   s   rP   c                   @   s   e Zd Zdeeej eddddZe	e
dddZeeejef  dd	d
ZddddZeeejef  dddZeeddddZddddZeeejef  dddZdS )_OverlappingCpuLoaderN@B )rQ   streaminflight_threshholdr?   c                 C   s   || _ g | _|| _d| _t | _d| _d| _|r8|j	nt
 | _	t| j	| _ttjj|p`| j | _| j| j kr| j| j  d S )Nr   F)rQ   rR   r_   in_flight_datacollectionsdequecurrent_itemsidxstarteddevice_typer   r   device_moduler	   rO   cudaStreamZcurrent_streamr^   Zwait_stream)rI   rQ   r^   r_   r:   r:   r;   rS   }   s     
 z_OverlappingCpuLoader.__init__r>   c                 C   s   | j t| jkS r@   )rd   lenrR   rK   r:   r:   r;   _done   s    z_OverlappingCpuLoader._donec                 C   sb   g }| j | jkr| j  | j | jkr^| j }|  j |d  |d   8  _ || q|S Nr   )	r`   r_   r^   synchronizerc   popleftrW   element_sizerT   )rI   drainedvalr:   r:   r;   _drain   s    

"z_OverlappingCpuLoader._drainc              	   C   s   | j | j | js| j| jk r| j| j \}}|  jd7  _| | }|j	j
| jkrl|jddd}n2|j	t	dkr|  | |j kr| }| j||f |  j| |  7  _qW 5 Q R X d S )N   rV   T)deviceZnon_blocking)rg   r^   rk   r`   r_   rR   rd   rQ   rU   rt   typerf   torO   Zuntyped_storagerF   rW   itemsizerX   rc   rT   ro   rY   r:   r:   r;   _refill   s&    
z_OverlappingCpuLoader._refillc                 C   s(   | j s
tt| jdkr"| j  | jS rl   )rk   AssertionErrorrj   rc   r^   rm   rK   r:   r:   r;   _finish   s    

z_OverlappingCpuLoader._finishrE   c                 C   s"   | j rtd| j||f d S )Nz&cannot add items after loading started)re   RuntimeErrorrR   rT   rH   r:   r:   r;   rJ      s    z_OverlappingCpuLoader.addc                 C   s0   | j r
d S d| _ | jjtdd |   d S )NTr   key)re   rR   sortoperator
itemgetterrx   rK   r:   r:   r;   rL      s
    z#_OverlappingCpuLoader.start_loadingc                 c   s<   |    | js*|  }|   |E d H  q|  E d H  d S r@   )rL   rk   rr   rx   rz   )rI   rp   r:   r:   r;   rM      s    z_OverlappingCpuLoader.values)Nr]   )r3   r4   r5   r   r   rO   ri   r9   rS   propertyboolrk   r   r   r   rN   rr   rx   r   rz   rJ   rL   r   rM   r:   r:   r:   r;   r\   |   s      
r\   )itemr?   c                 C   sB   d}| j d k	st| j jD ]}||9 }q| j jj}|tj| S Nrs   )Ztensor_datary   rF   
propertiesdtyperO   Z_utilsZ_element_size)r   rF   sr   r:   r:   r;   
_item_size   s    

r   )binsrR   r?   c           	      C   s   | dkr|gS dd |D }dd |D }dd t | D }dd t | D }|jtdd t|D ]\}}|||   | qd|D ]@}tt|tdd	d
 }|| | ||  t|7  < q|S )Nrs   c                 S   s   g | ]}|j tjkr|qS r:   ru   r"   BYTE_IO.0wir:   r:   r;   
<listcomp>   s      z+_split_by_size_and_type.<locals>.<listcomp>c                 S   s   g | ]}|j tjkr|qS r:   r   r   r:   r:   r;   r      s      c                 S   s   g | ]}g qS r:   r:   r   rZ   r:   r:   r;   r      s     c                 S   s   g | ]}d qS )r   r:   r   r:   r:   r;   r      s     T)r}   reverser|   r   )ranger~   r   	enumeraterT   minr   r   )	r   rR   bytes_wtensor_wZbucketsZbucket_sizesir   rd   r:   r:   r;   _split_by_size_and_type   s    r   )r^   data
write_itemstorage_keyr?   c                 C   s   |   }|jtjkr4t|tjs$t| |	  n:t|t
jsDt|jt
dksXtt
|ttt |  |   | }t|j|t|||dS )NrV   )indexZsize_in_bytesstorage_data)tellru   r"   r   
isinstanceioBytesIOry   write	getbufferrO   r   rt   saver	   r   bytesr&   r   r/   )r^   r   r   r   r1   r2   r:   r:   r;   _write_item   s    
r   )create_stream
file_queueresult_queueplannerr_   	use_fsyncthread_countr?   c              	   C   s|  z\|  \}}}	tj }
tt|
d }|dkr^tj sF|r^| r^|dkr^t|j|d}n
t	|j}dd |	D }|D ]}|
t|| qz|  dd |	D }g }| |d}|D ]"}||}|t|||| q| D ]&\}}|jst|t|||| q|rFzt|  W n tk
rD   t  Y nX W 5 Q R X || qW n tjk
rv   Y nX d S )Nrs   r   )r_   c                 S   s   g | ]}|j tjkr|qS r:   r   r   r:   r:   r;   r   6  s      z+_write_files_from_queue.<locals>.<listcomp>c                 S   s   g | ]}|j tjkr|qS r:   r   r   r:   r:   r;   r   ;  s      wb)
get_nowaitrO   Z_CZ_get_privateuse1_backend_namegetattrrh   Zis_availabler\   Zresolve_datarP   rJ   r   rL   rT   r   rM   Zis_cpury   osfsyncfilenoAttributeErrorsyncputqueueEmpty)r   r   r   r   r_   r   r   	file_namer   Zwrite_itemsZcustom_backend_nameZcustom_device_modloaderr   r   r   Zwrite_resultsr^   r   r[   r:   r:   r;   _write_files_from_queue  s\    	


r   c                   @   s4  e Zd Zeeeeejf ee	e
jddf dddZeeeejf eeeejf dddZeeeejf eeejf ddd	d
Zeeeejf eeejf dddZeeeejf ddddZeeeeejf edddZeeeejf edddZeeeejf ddddZdS )r,   Npathmoder?   c                 C   s   d S r@   r:   )rI   r   r   r:   r:   r;   r   V  s    zFileSystemBase.create_streamr   suffixr?   c                 C   s   d S r@   r:   rI   r   r   r:   r:   r;   concat_path]  s    zFileSystemBase.concat_pathr   new_pathr?   c                 C   s   d S r@   r:   rI   r   r   r:   r:   r;   renamec  s    zFileSystemBase.renamer   r?   c                 C   s   d S r@   r:   rI   r   r:   r:   r;   	init_pathi  s    zFileSystemBase.init_pathc                 C   s   d S r@   r:   r   r:   r:   r;   mkdirm  s    zFileSystemBase.mkdircheckpoint_idr?   c                 C   s   d S r@   r:   clsr   r:   r:   r;   validate_checkpoint_idq  s    z%FileSystemBase.validate_checkpoint_idc                 C   s   d S r@   r:   r   r:   r:   r;   existsv  s    zFileSystemBase.existsc                 C   s   d S r@   r:   r   r:   r:   r;   rm_filez  s    zFileSystemBase.rm_file)r3   r4   r5   r   r   r   r7   r   PathLiker   r   IOBaser   r   r   r   r   classmethodr   r   r   r   r:   r:   r:   r;   r,   U  s6      &c                   @   s  e Zd Zeeeejf eee	j
ddf dddZeeejf eeeejf dddZeeejf eeejf dd	d
Zeeejf eeejf ddddZeeejf ddddZeeeejf edddZeeejf edddZeeejf ddddZdS )r+   Nr   c              	   c   s.   t t||}t tj|V  W 5 Q R X d S r@   )r	   r   openr   r   )rI   r   r   r^   r:   r:   r;   r     s    zFileSystem.create_streamr   c                 C   s   t t|| S r@   )r	   r   r   r:   r:   r;   r     s    zFileSystem.concat_pathr   c                 C   s   t |tst|}|S r@   )r   r   r   r:   r:   r;   r     s    
zFileSystem.init_pathr   c                 C   s   t t|t t| d S r@   )r	   r   r   r   r:   r:   r;   r     s    zFileSystem.renamec                 C   s   t t|jddd d S )NT)parentsexist_ok)r	   r   r   r   r:   r:   r;   r     s    zFileSystem.mkdirr   c                 C   sR   t |trdS dt|krdS t|jD ]$}| r(tt|tjr( dS q(dS )NTz://F)r   r   r7   r   r   r   accessW_OK)r   r   pr:   r:   r;   r     s    
z!FileSystem.validate_checkpoint_idc                 C   s   t t| S r@   )r	   r   r   r   r:   r:   r;   r     s    zFileSystem.existsc                 C   s   t t|  d S r@   )r	   r   unlinkr   r:   r:   r;   r     s    zFileSystem.rm_file)r3   r4   r5   r   r   r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r:   r:   r:   r;   r+     s&     $ c                       s4  e Zd ZdZd$eeejf eee	e	ee
e
dd	 fddZd%eeejdf dd	d
dZeddddZeedddZee ee dddZeeeee  dddZeeee  ddddZee dddZeeeejf dddZeeeejf dd d!Zeeeejf ed	d"d#Z  Z S )&_FileSystemWritera  
    Basic implementation of StorageWriter using file IO.

    This implementation makes the following assumptions and simplifications:

    * The checkpoint path is an empty or non-existing directory.
    * File creation is atomic

    The checkpoint consist of one file per write request plus
    a `.metadata` file with the serialized metadata.

    Trs   逖 N)	r   single_file_per_rank
sync_filesr   per_thread_copy_ahead	overwriteargskwargsr?   c           	         sJ   t    t | _| j|| _|| _|| _|| _|| _	t
 | _|| _dS )a  
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.
            overwrite: Whether to allow overwriting existing checkpoints. Defaults to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        N)superrS   r+   fsr   r   r   r   r   r   rC   save_idr   )	rI   r   r   r   r   r   r   r   r   	__class__r:   r;   rS     s    
z_FileSystemWriter.__init__r   c                 C   s   |r| j || _t | _d S r@   )r   r   r   rC   r   rI   r   r:   r:   r;   reset  s    z_FileSystemWriter.reset)is_coordinatorr?   c                 C   s   d S r@   r:   )rI   r   r:   r:   r;   set_up_storage_writer  s    z'_FileSystemWriter.set_up_storage_writerplanr?   c                 C   sV   | j | j | j | jrR| jr@td| j d| jd ntd| jd|S )Nz#Detected an existing checkpoint in z#, overwriting since self.overwrite=z. Past version 2.5 of PyTorch, `overwrite` will default to False. Set this variable to True to maintain this functionality or False to raise when an existing checkpoint is found.z-Checkpoint already exists and self.overwrite=.)	r   r   r   r   metadata_pathr   warningswarnr{   rI   r   r:   r:   r;   prepare_local_plan  s    z$_FileSystemWriter.prepare_local_planplansr?   c                 C   s   dd t |D }|S )Nc                 S   s*   g | ]"\}}t j|td | ddqS )__rZ   )r   )dataclassesreplacer<   )r   r   r   r:   r:   r;   r     s   z9_FileSystemWriter.prepare_global_plan.<locals>.<listcomp>)r   )rI   r   Z	new_plansr:   r:   r;   prepare_global_plan  s    z%_FileSystemWriter.prepare_global_planr   r   r?   c              
      sj  |j d  fdd}t }| jrbt| j|jD ]*}| }| j| j	|}|
|||f q4n4|jD ],}| }| j| j	|}|
|||gf qht }	g }
td| jD ]<}tjt| jj||	|| j| j| jfd}|  |
| qt| jj||	|| j| j| jd |
D ]}|  qg }z||	 7 }q&W n, tjk
rd   t }|| | Y S X d S )Nr   c                     s   j    t }  d7  | S r   )r=   DEFAULT_SUFFIX)r   Z
file_countZstorage_planr:   r;   gen_file  s    z._FileSystemWriter.write_data.<locals>.gen_filers   )targetr   )r   r   r   r   r_   r   r   )r   r   Queuer   r   r   rR   r   r   r   r   r   	threadingThreadr   r   r   r   startrT   joinr   r   r(   
set_result)rI   r   r   r   r   Zbucketr   r   r   r   threadsrZ   tresfutr:   r   r;   
write_data  s^    


z_FileSystemWriter.write_data)metadataresultsr?   c              	   C   s   t  }|D ]}|dd |D  q
||_|  |_tt| j| jt	 d}| j
|dH}t|| | jrzt|  W n tk
r   t  Y nX W 5 Q R X | j| jr| j| j | j|| j d S )Nc                 S   s   i | ]}|j |jqS r:   )r   r   )r   wrr:   r:   r;   
<dictcomp>C  s      z,_FileSystemWriter.finish.<locals>.<dictcomp>z.tmpr   )dictupdater   storage_metar	   r   r   r   r   r.   r   pickledumpr   r   r   r   r   r   r   r   r   r   )rI   r  r  Z
storage_mdZwr_listZtmp_pathmetadata_filer:   r:   r;   finish@  s     
z_FileSystemWriter.finishr>   c                 C   s   t | j| jdS )N)r   r   )r   r   r   rK   r:   r:   r;   r  W  s    z_FileSystemWriter.storage_metac                 C   s   t t| j| jtS r@   )r	   r   r   r   r   r.   rK   r:   r:   r;   r   Z  s    z_FileSystemWriter.metadata_pathc                 C   s   | j S zT
        return the checkpoint_id that will be used to save the checkpoint.
        r   rK   r:   r:   r;   r   ^  s    z_FileSystemWriter.checkpoint_idc                 C   s
   t |S r@   r+   r   r   r:   r:   r;   r   e  s    z(_FileSystemWriter.validate_checkpoint_id)TTrs   r   T)N)!r3   r4   r5   r6   r   r7   r   r   r   r9   r   rS   r   r   r   r   r   r   r    r(   r&   r
  r   r  r   r   r  r   r   r   r   r   __classcell__r:   r:   r   r;   r     sB        "	
Er   c                       s   e Zd Zeeejf dd fddZee	j
dddZdeeejdf ddd	d
Zeeed dddZedddZeeddddZeedddZee ee dddZeeeejf dddZeeeejf edddZ  ZS )r*   Nr   c                    s4   t    t | _| j|| _t | _t | _	d S r@   )
r   rS   r+   r   r   r   r  r   rC   load_idr   r   r:   r;   rS   k  s
    
zFileSystemReader.__init__)sinfor?   c                 C   s   t ||j|jS r@   )r'   r1   r2   )rI   filer  r:   r:   r;   _slice_filer  s    zFileSystemReader._slice_filer   c                 C   s&   t  | _|r| j|| _t | _d S r@   )r  r   r   r   r   rC   r  r   r:   r:   r;   r   u  s    zFileSystemReader.resetr   c                 C   sj  t  }|jD ](}| j|j }|j}||g | q| D ]\}}| j| j	|}	| j
|	d}
|D ]}| j|j }| |
|}|jtjkrt||j}|d ||| qltttjttt |ddd}t||j|j}|| }|  |  ks2t!d|j d|   d|   |"| |#|| qlW 5 Q R X q>t$ }|%d  |S )	Nrbr   rV   T)Zmap_locationZweights_onlyzreq z mismatch sizes z vs )&r  rR   r   Zstorage_indexr0   
setdefaultrT   r   r   r   r   r  ru   r   r   r   r   readr2   seekZ
load_bytesr	   r   rO   loadr   r   r   Zstorage_offsetslengthsZresolve_tensorrU   rF   ry   Zcopy_Zcommit_tensorr(   r  )rI   r   r   Zper_fileZ	read_itemZitem_mdr   r0   reqsr   r^   reqZ
file_slice
read_bytesr[   Ztarget_tensorr	  r:   r:   r;   	read_data{  sL    

  

zFileSystemReader.read_datar>   c              	   C   sZ   | j | jd}| j |d}t|}W 5 Q R X t|dd d krLt |_| j	|j_	|S )Nr-   r  r  )
r   r   r   r   r  r"  r   r   r  r  )rI   r   r  r  r:   r:   r;   read_metadata  s    
zFileSystemReader.read_metadata)r  r   r?   c                 C   s   |j | _ | j d k	std S r@   )r   ry   )rI   r  r   r:   r:   r;   set_up_storage_reader  s    z&FileSystemReader.set_up_storage_readerr   c                 C   s   |S r@   r:   r   r:   r:   r;   r     s    z#FileSystemReader.prepare_local_planr   c                 C   s   |S r@   r:   )rI   r   r:   r:   r;   r     s    z$FileSystemReader.prepare_global_planc                 C   s   | j S r  r  rK   r:   r:   r;   r     s    zFileSystemReader.checkpoint_idc                 C   s
   t |S r@   r  r   r:   r:   r;   r     s    z'FileSystemReader.validate_checkpoint_id)N)r3   r4   r5   r   r7   r   r   rS   r/   r   r   r  r   r   r   r(   r'  r   r(  r   r)  r   r   r   r   r   r   r   r  r:   r:   r   r;   r*   j  s   ,c                
       sT   e Zd ZdZdeeejf eee	e	eedd fdd	Z
eed
 fddZ  ZS )r)   r   Trs   r   FN)r   r   r   r   r   cache_staged_state_dictr   r?   c              	      s   t  j|||||||d dS )aM  
        Initialize the writer pointing to `path`.

        Args:
            path: directory where the checkpoint will be written to.
            single_file_per_rank: Produce one file per rank instead of one file per tensor/blob. Default to True.
            sync_files : force files to be synced to permanent storage. Default to True.
            thread_count: Number of IO threads to use to write. Default to 1.
            per_thread_copy_ahead: How many bytes to copy from the GPU ahead of saving then. Default 10Mb.
            cache_staged_state_dict: Whether to cache the staged state_dict. This option decreases staging latency
                at the cost of increases memory usage. Additionally, if this parameter is set to True, it's the expectation
                that the stager is maintained and re-used for multiple dcp.async_save calls. Default to False.
            overwrite: Whether to allow overwriting existing checkpoints. Defaults to True.

        N. B. If sync_files is disabled, there's no guarantee that the checkpoint will be consistent in the case of a failure.
        )r   r   r   r   r   r*  r   N)r   rS   )rI   r   r   r   r   r   r*  r   r   r:   r;   rS     s    zFileSystemWriter.__init__)
state_dictr?   c                    s   d| _ t |S )zOverride of AsyncStager.stager   )r   r   stage)rI   r+  r   r:   r;   r,    s    zFileSystemWriter.stage)TTrs   r   FT)r3   r4   r5   r6   r   r7   r   r   r   r9   rS   r   r,  r  r:   r:   r   r;   r)     s$         $)Wra   r   r   r   r   r  r   r  rA   r   abcr   r   
contextlibr   r   pathlibr   typingr   r   r	   r
   r   r   r   r   r   r   r   r   rO   r   Ztorch._utilsr   r   Ztorch.distributed._shard._utilsr   Z%torch.distributed.checkpoint.metadatar   r   r   r   Z$torch.distributed.checkpoint.plannerr   r   r   r   r   r    r!   r"   Z$torch.distributed.checkpoint.stagingr#   Z$torch.distributed.checkpoint.storager$   r%   r&   Z"torch.distributed.checkpoint.utilsr'   Ztorch.futuresr(   __all__r.   r7   r8   r/   r<   r   rC   rD   rP   r\   r9   r   r   r   r   r   r   r   r   r,   r+   r   r*   r)   r:   r:   r:   r;   <module>   sv    8(
ZD*/ =^