o
    "j2                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZ edg dZd	Zd
ZdZdad add Zdd Zdd Zdd Zdd Zd(ddZddefddZddefddZdd Zdd Zd d! Zd"d# Z d$d% Z!d&d' Z"dS ))    N)
namedtuple)core)Node)
PythonFunc
_serialize)logger
WorkerInfo)namerankipportiic                 C   s   | a d S N_barrier_store)store r   [/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/rpc/rpc.py_set_barrier_store&   s   r   c                   C   s   b d S r   r   r   r   r   r   _del_barrier_store+   s   r   c                 C   s(   t t| |||}tt|| d S r   )pickledumpsr   r   setstr)r	   r
   r   r   Z	self_infor   r   r   _set_self_info0   s   r   c                 C   sX   g }t  }t| D ] }ttt|}|j|vsJ d||j |	| q	|S )Nz:The Worker name must be unique, but name `{}` is repeated.)
r   ranger   loadsr   getr   r	   addappend)
world_size	all_infossr
   infor   r   r   _exchange_all_service_infos5   s   r$   c                  C   s$   t  } |  }|  }| d| S )N:)r   Zget_host_ipZget_free_port)noder   Z	free_portr   r   r   _gen_endpointB   s   r'   c                 C   sX  |du rt tjd n|}|du rt tjd n|}tdd}|du r't }td| d|  |dur8|ntjd }|d\}}t |}t td	d
}tj	|||dk||d}t
| |d\}	}
t |
}
t| ||	|
 t|}g }|D ]}t|j|j|j|j}|| qzt| | t  t|| t  td| d dS )a  
    init rpc.

    Args:
        name (str): worker name.
        rank (int, optional): worker id, default is None.
        world_size (int, optional): number of workers, default is None.
        master_endpoint (str, optional): id address of master, other nodes communicate with the master to
            get the information of all worker nodes, default is None.

    Returns:
        None.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8001")

            >>> rpc.shutdown()

    NZPADDLE_TRAINER_IDZPADDLE_TRAINERS_NUMZPADDLE_WORKER_ENDPOINTTrainer z: worker endpoint: ZPADDLE_MASTER_ENDPOINTr%   ZFLAGS_stop_check_timeoutZ900r   )timeoutz: Init RPC done!)intosenvirongetenvr'   r   r#   splitr   ZTCPStorer   r   r$   r   r	   r
   r   r   r   Zinit_and_set_agent_instanceZrpc_start_worker_barrier_never_timeoutZrpc_start_client)r	   r
   r    Zmaster_endpointZworker_endpointZmaster_addrZmaster_portZstop_check_timeoutr   r   r   r!   Zc_infosZ	node_infor#   r   r   r   init_rpcI   sN   
r0   c                 C   s   t | ||||}| S )a  
    Make a blocking RPC call to run function ``fn`` on worker ``to``. Attention: Users must use this API in a secure network environment.

    Args:
        to (str): name of the destination worker.
        fn (fn): a callable function, such as Python callables.
        args (tuple, optional): the argument tuple for the ``fn`` invocation, default is None.
        kwargs (dict, optional): is a dictionary of keyword arguments for the ``fn``
                       invocation, default is None.
        timeout (int, optional): timeout in seconds to use for this RPC. If
                                   the RPC does not complete in this amount of
                                   time, an exception indicating it has
                                   timed out will be raised. A value less than or equal to 0
                                   indicates an infinite timeout, i.e. a timeout
                                   error will never be raised. The default value is -1.

    Returns:
        Returns the result of running ``fn`` with ``args`` and ``kwargs``.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> def add(a, b):
            ...     return a + b

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...         master_endpoint="127.0.0.1:8002")

            >>> ret = rpc.rpc_sync("worker0", add, args=(2, 3))
            >>> rpc.shutdown()

    )_invoke_rpcwait)tofnargskwargsr)   Zfutr   r   r   rpc_sync   s   $r7   c                 C   s   t | ||||S )a  
    Make a non-blocking RPC call to run function ``fn`` on worker ``to``. Attention: Users must use this API in a secure network environment.

    Args:
        to (str): name of the destination worker.
        fn (fn): a callable function, such as Python callables.
        args (tuple, optional): the argument tuple for the ``fn`` invocation, default is None.
        kwargs (dict, optional): is a dictionary of keyword arguments for the ``fn``
                       invocation, default is None.
        timeout (int, optional): timeout in seconds to use for this RPC. If
                                   the RPC does not complete in this amount of
                                   time, an exception indicating it has
                                   timed out will be raised. A value less than or equal to 0
                                   indicates an infinite timeout, i.e. a timeout
                                   error will never be raised. The default value is -1.

    Returns:
        Returns a :class:`FutureWrapper` object that can be waited
        on. When completed, the return value of ``fn`` on ``args`` and
        ``kwargs`` can be got by `fut.wait()`.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> def add(a, b):
            ...     return a + b

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...         master_endpoint="127.0.0.1:8003")

            >>> fut = rpc.rpc_async("worker0", add, args=(2, 3))
            >>> print(fut.wait())
            5

            >>> rpc.shutdown()

    )r1   )r3   r4   r5   r6   r)   r   r   r   	rpc_async   s   )r8   c                 C   sR   |r|nd}|r
|ni }t t|||}|d }|dkrtn|}t| ||}|S )Nr   i  r   )r   r   _MAX_RPC_TIMEOUT_MSr   Z
invoke_rpc)r3   r4   r5   r6   r)   Z
serial_objZ
timeout_msfuturer   r   r   r1      s   r1   c                    s   t jtd|dk rd S dtt d  td7 adk}fdd}|rA fd	d
td|D }t td d || d S  td g}|| t t d d S )N)days   zBarrier//   r   c                    sp   t   }t| dkr6t d t   | }tj|dkr%td|  ttdd | } t| dks
d S d S )Nr   g?)secondsz4Keys {} are not ready sinck rank {} is waiting them.c                 S   s   t t| dkS )Nr>   )r*   r   r   )keyr   r   r   <lambda>  s    zC_barrier_never_timeout.<locals>._check_keys_ready.<locals>.<lambda>)	timelensleepdatetime	timedeltaRuntimeErrorformatlistfilter)	wait_keys
start_timeZelapse_time)global_rankr)   r   r   _check_keys_ready   s   
z1_barrier_never_timeout.<locals>._check_keys_readyc                    s   g | ]} t | qS r   )r   ).0r
   )barrier_prefixr   r   
<listcomp>  s    z*_barrier_never_timeout.<locals>.<listcomp>)rE   rF   _BARRIER_TIMEOUT_MAX_DAYSr   _barrier_countr   r   r   )rM   Zglobal_world_sizeZ	is_masterrN   rK   r   )rP   rM   r)   r   r/      s    
r/   c                  C   sD   t  } | j}tt }t|| t  t  t	d| d dS )a+  
    Perform a shutdown of the RPC agent, stop the worker and destroy the agent.
    This will block until all local and remote RPC processes reach this method
    and wait for all outstanding work to complete.

    Returns:
        None.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc

            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8004")

            >>> rpc.shutdown()

    r(   z: rpc shutdown!N)
get_current_worker_infor
   rC   get_all_worker_infosr/   r   Zrpc_stop_workerr   r   r#   )r#   r
   r    r   r   r   shutdown  s   

rV   c                 C   s
   t | S )a  
    Get worker information by worker name.

    Args:
        name (str): name of the worker.

    Returns:
        class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc
            >>> import os

            >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9002"
            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8005")

            >>> print(rpc.get_worker_info("worker0"))
            {name: worker0, rank: 0, ip: 127.0.0.1, port: 9002}

            >>> rpc.shutdown()

    )r   Zrpc_get_worker_info)r	   r   r   r   get_worker_info5  s   
rW   c                   C      t  S )aY  
    Get all worker informations.

    Returns:
        List[WorkerInfo].

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc
            >>> import os

            >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9003"
            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...         master_endpoint="127.0.0.1:8006")

            >>> print(rpc.get_all_worker_infos())
            [{name: worker0, rank: 0, ip: 127.0.0.1, port: 9003}]

            >>> rpc.shutdown()

    )r   Zrpc_get_all_worker_infosr   r   r   r   rU   S     rU   c                   C   rX   )a  
    Get current worker information.

    Returns:
        class `WorkerInfo` with attribute `name`, `rank`, `ip` and `port`.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle.distributed.rpc as rpc
            >>> import os

            >>> os.environ["PADDLE_WORKER_ENDPOINT"] = "127.0.0.1:9004"
            >>> rpc.init_rpc("worker0", rank=0, world_size=1,
            ...             master_endpoint="127.0.0.1:8007")

            >>> print(rpc.get_current_worker_info())
            {name: worker0, rank: 0, ip: 127.0.0.1, port: 9004}

            >>> rpc.shutdown()

    )r   Zrpc_get_current_worker_infor   r   r   r   rT   n  rY   rT   )NNN)#rE   r+   r   rB   collectionsr   Zpaddle.baser   Z!paddle.distributed.launch.contextr   Zpaddle.distributed.rpc.internalr   r   Z%paddle.distributed.utils.launch_utilsr   r   Z_DEFAULT_RPC_TIMEOUTr9   rR   r   rS   r   r   r   r$   r'   r0   r7   r8   r1   r/   rV   rW   rU   rT   r   r   r   r   <module>   s:   
F(,
)