o
    *j#"                     @   s   d Z ddlZddlZddlZddlZddlmZ dadd Z	d#ddZ
	d$d	d
Zd%ddZdd Zdd Zdd Ze dd Zdd Zdd Zd&ddZdd Zdefdd Zdefd!d"ZdS )'zDistributed helpers.    Nc                    sl   g }g }t  }| D ]  fddt|D }t j| dd || q
|D ]}|tj|dd q'|S )z
    All gathers the provided tensors from all processes across machines.
    Args:
        tensors (list): tensors to perform all gather across all processes in
        all machines.
    c                    s   g | ]}t  qS  )torchZ	ones_like.0_tensorr   ~/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/modelscope/models/multi_modal/videocomposer/utils/distributed.py
<listcomp>   s    
zall_gather.<locals>.<listcomp>FZasync_opr   dim)distget_world_sizerange
all_gatherappendr   cat)tensorsZgather_listZoutput_tensor
world_sizeZtensor_placeholderZgathered_tensorr   r   r	   r      s   
r   Tc                 C   s@   | D ]	}t j|dd q|rt  }| D ]	}|d|  q| S )a-  
    All reduce the provided tensors from all processes across machines.
    Args:
        tensors (list): tensors to perform all reduce across all processes in
        all machines.
        average (bool): scales the reduced tensor by the number of overall
        processes across all machines.
    Fr   g      ?)r   
all_reducer   Zmul_)r   Zaverager   r   r   r   r	   r   %   s   
r   ncclc                 C   s6   t j|  | ||  }|| }tj||||d dS )a~  
    Initializes the default process group.
    Args:
        local_rank (int): the rank on the current local machine.
        local_world_size (int): the world size (number of processes running) on
        the current local machine.
        shard_id (int): the shard index (machine rank) of the current machine.
        num_shards (int): number of shards for distributed training.
        init_method (string): supporting three different methods for
            initializing process groups:
            "file": use shared file system to initialize the groups across
            different processes.
            "tcp": use tcp address to initialize the groups across different
        dist_backend (string): backend to use for distributed training. Options
            includes gloo, mpi and nccl, the details can be found here:
            https://pytorch.org/docs/stable/distributed.html
    )backendinit_methodr   rankN)r   cudaZ
set_devicer   init_process_group)Z
local_rankZlocal_world_sizeZshard_idZ
num_shardsr   Zdist_backendZ	proc_rankr   r   r   r	   r   8   s   
r      c                 C   s   t j rt |  dkS dS )zB
    Determines if the current process is the master process.
    r   T)r   distributedis_initializedr   get_rank)Znum_gpusr   r   r	   is_master_proc^   s   
r!   c                   C       t  sdS t  sdS t  S )z$
    Get the size of the world.
       )r   is_availabler   r   r   r   r   r	   r   h   
   r   c                   C   r"   )z.
    Get the rank of the current process.
    r   )r   r$   r   r    r   r   r   r	   r    s   r%   r    c                  C   s8   t  sdS t  sdS t  } | dkrdS t   dS )zj
    Helper function to synchronize (barrier) among all processes when
    using distributed training
    Nr#   )r   r$   r   r   Zbarrier)r   r   r   r	   synchronize~   s   r&   c                   C   s    t  dkrt jddS t jjS )z
    Return a process group based on gloo backend, containing all the ranks
    The result is cached.
    Returns:
        (group): pytorch dist group.
    r   gloo)r   )r   get_backend	new_groupgroupZWORLDr   r   r   r	   _get_global_gloo_group   s   r+   c                 C   s   t |}|dv sJ t|dkrdnd}t| }t|dkr5tt	}|
dt t|d | tj|}t|j|d}|S )a  
    Seriialize the tensor to ByteTensor. Note that only `gloo` and `nccl`
        backend is supported.
    Args:
        data (data): data to be serialized.
        group (group): pytorch dist group.
    Returns:
        tensor (ByteTensor): tensor that serialized.
    )r'   r   r'   cpur   i   @z;Rank {} trying to all-gather {:.2f} GB of data on device {})device)r   r(   r   r-   pickledumpslenlogging	getLogger__name__warningformatr    ZByteStoragefrom_bufferZ
ByteTensorto)datar*   r   r-   bufferloggerZstorager   r   r   r	   _serialize_to_tensor   s   


r;   c                    s   t j|d}|dksJ dtj  gtj jd} fddt|D }t j|||d dd |D }t	|}||krStj
|| ftj jd}tj |fdd	 | fS )
a  
    Padding all the tensors from different GPUs to the largest ones.
    Args:
        tensor (tensor): tensor to pad.
        group (group): pytorch dist group.
    Returns:
        list[int]: size of the tensor, on each rank
        Tensor: padded tensor that has the max size
    r*   r#   zHcomm.gather/all_gather must be called from ranks within the given group!Zdtyper-   c                    s"   g | ]}t jd gt j jdqS )r#   r=   )r   zerosint64r-   r   r   r   r	   r
          z*_pad_to_largest_tensor.<locals>.<listcomp>c                 S   s   g | ]}t | qS r   )intitem)r   sizer   r   r	   r
      s    r   r   )r   r   r   r   Znumelr?   r-   r   r   maxr>   uint8r   )r   r*   r   Z
local_size	size_listmax_sizepaddingr   r   r	   _pad_to_largest_tensor   s*   


rI   c                    s   t  dkr| gS |du rt }t |dkr| gS t| |t|\}t|  fdd|D }tj||d g }t||D ]\} 	 
 d| }|t| qB|S )a=  
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.

    Returns:
        list[data]: list of data gathered from each rank
    r#   Nc                    s"   g | ]}t j ft jjd qS )r=   )r   emptyrE   r-   r   rG   r   r   r	   r
      r@   z(all_gather_unaligned.<locals>.<listcomp>r<   )r   r+   r   r;   rI   rD   r   zipr,   numpytobytesr   r.   loads)r8   r*   rF   Ztensor_listZ	data_listrC   r9   r   rK   r	   all_gather_unaligned   s$   

rP   c                 C   sd   | j dkrdS | j }t | }t|D ]}tt|| |d | }t|}|| jkr/|aqdS )z?
    Initialize variables needed for distributed training.
    r#   N)ZNUM_GPUSr   r   r   listr)   ZSHARD_ID_LOCAL_PROCESS_GROUP)cfgZnum_gpus_per_machineZnum_machinesiZ
ranks_on_iZpgr   r   r	   init_distributed_training  s   


rU   returnc                   C   s$   t  sdS t  sdS t jtdS )zw
    Returns:
        The size of the per-machine process group,
        i.e. the number of processes per machine.
    r#   r<   )r   r$   r   r   rR   r   r   r   r	   get_local_size  s
   rW   c                   C   s0   t  sdS t  sdS tdusJ t jtdS )zh
    Returns:
        The rank of the current process within the local (per-machine) process group.
    r   Nr<   )r   r$   r   rR   r    r   r   r   r	   get_local_rank  s   rX   )T)r   )r   )N)__doc__	functoolsr1   r.   r   Ztorch.distributedr   r   rR   r   r   r   r!   r   r    r&   	lru_cacher+   r;   rI   rP   rU   rA   rW   rX   r   r   r   r	   <module>   s.   


&


$'