o
    "j                      @   sf   d Z ddlZddlmZ ddlmZ g ZG dd dZG dd deZG d	d
 d
Z	G dd dZ
dS )z
Communicator is used for async distribute training in distribute_transpiler mode.
It's a wrapper of a cpp class Communicator and should be used inside fleet API.
    N)DistributedMode)corec                   @   s|   e Zd ZdddZ	dddZ			d d	d
Zdd Zdd Zdd Zdd Z	dd Z
dd Zdd Zdd Zd!ddZdS )"CommunicatorNc                 C   s   |du r|du r
i }n.|t jkrd|d |d< t|d |d< t|d |d< t|d |d< t|d |d< d}|t jkrCd}n|t jkrKd	}n|t jkrSd
}n|t jkrZd}|| _|| _d| _	d| _
d| _dS )a  
        Communicator is used for async distribute training in distribute_transpiler mode.
        It's a wrapper of a cpp class Communicator and should be used inside fleet API.

        Args:
            program(Program): the trainers program after transpile of distribute_transpiler.
            It's used by communicator to extract the information to do communication.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        N,Zpserver_endpointsZtrainers
trainer_idZneed_global_stepZbarrier_table_idSYNCASYNC
HALF_ASYNCGEO)r   r   joinstrr   r	   r
   modeenvscommunicator_	send_ctx_	recv_ctx_)selfr   kwargsr   Zmode_str r   `/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/communicator.py__init__)   s4   





zCommunicator.__init__c              	   C   s>   |d u r	t j }t| j|||||| j| _|| _|| _	d S N)
paddlestaticglobal_scoper   ZDistCommunicatorr   r   r   r   r   )r   send_ctxZrecv_ctxZ	proto_txtZunit64_hostsscoper   r   r   init_with_ctx`   s   
	
zCommunicator.init_with_ctx  '     c                 C   s   | j ||| d S r   )r   "create_client_to_client_connection)r   Zpserver_timeout_msZpserver_connect_timeout_msZ	max_retryr   r   r   r!   q   s   z/Communicator.create_client_to_client_connectionc                 C   s
   | j  S r   )r   get_client_infor   r   r   r   r"   {   s   
zCommunicator.get_client_infoc                 C      | j | d S r   )r   set_clients)r   Z	host_listr   r   r   r%   ~      zCommunicator.set_clientsc                 C   $   | j du rtd dS | j   dS )a  
        Start communicator. Should call before training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        Nz;you must call init_with_ctx first to init comm before start)r   printstartr#   r   r   r   r)         
zCommunicator.startc                 C   r'   )a  
        Stop communicator. Should call after training process.

        Returns:
            None

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.start()
                >>> comm.stop()
        N:you must call init_with_ctx first to init comm before stop)r   r(   stopr#   r   r   r   r,      r*   zCommunicator.stopc                 C   r'   )aZ  
        Get communicator is running or stop.

        Returns:
            bool

        Examples:
            .. code-block:: python

                >>> import paddle

                >>> prog = paddle.static.Program()
                >>> comm = paddle.distributed.communicator.Communicator(prog)
                >>> comm.is_running()
        Nr+   )r   r(   
is_runningr#   r   r   r   r-      s   
zCommunicator.is_runningc                 C      | j   d S r   )r   recvr#   r   r   r   r/         zCommunicator.recvc                 C   r$   r   )r   init_paramsr   contextr   r   r   r1      r&   zCommunicator.init_paramsc                 C   r$   r   )r   
pull_denser2   r   r   r   r4      r&   zCommunicator.pull_densec                 C   sh   |d u r	t j }|  stdt|tsJ t|tsJ |dkr*| j| 	 }| j
||| d S )NzTCommunicator should init first. Using fleet.init_worker() before push_sparse_param()r5   )r   r   r   r-   
ValueError
isinstancer   intr   table_idr   push_sparse_param)r   var_namer9   r   r   r   r   r:      s   
zCommunicator.push_sparse_param)NNr   )r   r   r    )r5   N)__name__
__module____qualname__r   r   r!   r"   r%   r)   r,   r-   r/   r1   r4   r:   r   r   r   r   r   (   s"    
8


r   c                       s6   e Zd Zd
 fdd	Zdd Zdd Zdd	 Z  ZS )FLCommunicatorNc                    s8   d }t  || i }i }d}d| _| |||| d S )N ZWITH_COORDINATOR)superr   r   r   )r   Zps_hostsr   r   r   Z	dense_mapZprototxt	__class__r   r   r      s   zFLCommunicator.__init__c                 C   s    | j d ur| j || d S d S r   )r   start_coordinator)r   Zself_endpointZtrainer_endpointsr   r   r   rD      s
   
z FLCommunicator.start_coordinatorc                 C   s"   | j d ur| j | d S td)Nzself.communicator_ is null)r   save_fl_strategyr6   )r   mpr   r   r   rE      s   
zFLCommunicator.save_fl_strategyc                 C   s   i }| j d ur| j  }|S r   )r   query_fl_clients_info)r   Zinfo_mpr   r   r   rG      s   

z$FLCommunicator.query_fl_clients_infor   )r<   r=   r>   r   rD   rE   rG   __classcell__r   r   rB   r   r?      s
    	r?   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
LargeScaleKVc                 C   s   t  | _d S r   )r   rI   scale_kvr#   r   r   r   r      r0   zLargeScaleKV.__init__c                 C      | j || d S r   )rJ   saver   varnamedirnamer   r   r   rL         zLargeScaleKV.savec                 C   rK   r   )rJ   loadrM   r   r   r   rQ      rP   zLargeScaleKV.loadc                 C   s   | j |S r   )rJ   size)r   rN   r   r   r   rR      s   zLargeScaleKV.sizeN)r<   r=   r>   r   rL   rQ   rR   r   r   r   r   rI      s
    rI   c                   @   s   e Zd Zdd Zdd ZdS )HeterClientc                 C   s   t |||| _d S r   )r   rS   heter_client_)r   ZendpointZprevious_endpointr   r   r   r   r     s   
zHeterClient.__init__c                 C   r.   r   )rT   r,   r#   r   r   r   r,   
  r0   zHeterClient.stopN)r<   r=   r>   r   r,   r   r   r   r   rS     s    rS   )__doc__r   Z"paddle.distributed.ps.utils.publicr   Zpaddle.frameworkr   __all__r   r?   rI   rS   r   r   r   r   <module>   s    2