o
    "j!                     @   s\   d dl Z d dlmZmZ d dlmZ d dlmZ g Zda	dd Z
dd Zd	d
 Zdd ZdS )    N)ManagerProcess)core)wait_server_readyc                 C   sf   ddl m} |t| |d}|  d}|dds| s-t| |dds| r|  d S )Nr   )KVServer)size   runningF)	Z*paddle.distributed.fleet.utils.http_serverr   intstartgetZshould_stoptimesleepstop)portZhttp_server_dr   r   Zhttp_serverZwait_seconds r   f/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/parallel_with_gloo.py_start_kv_server   s   
r   c                 C   s   |dk du s
J dt  }| }d|d< | dkr8d|i}ttt|dd ||fd	}d
|_d
|d< |  t|g t	
 }| |_||_|dd |_t|dd |_d|_d|_t	|at  | dkrvd|d< |  dS dS )u  
    Initialize parallel environment with gloo for cpu only.

    Args:
        - rank_id（int, required) - the index of current rank;
        - rank_num (int, required) - the number of ranks in this parallel env;
        - server_endpoint (str, required) - endpoint of server to init gloo context in ip:port format;

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set()

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_init(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)

            >>> def test_gloo_init_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_init,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_init_with_multiprocess(2)
       FzRrank_num should greater than or equal to 2 for parallel environment initialzation.r	   r   Z_worker:   )targetargsTi  i N)r   dictr   r   r
   splitdaemonr   r   r   ZGlooParallelStrategyrankrank_num
ip_addressZip_portZinit_secondsZrun_secondsZGlooParallelContext_global_gloo_ctxinitjoin)Zrank_idr   Zserver_endpointmanagerZhttp_server_statusr   Zhttp_server_procZgloo_strategyr   r   r   gloo_init_parallel_env)   s>   8

r#   c                   C   s   t dusJ dt   dS )a	  
    Call barrier function with initialized gloo context.

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set()

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_barrier(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)
            ...     paddle.distributed.gloo_barrier()

            >>> def test_gloo_barrier_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_barrier,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_barrier_with_multiprocess(2)
    Nzgloo context is not initialzed.)r   Zbarrierr   r   r   r   gloo_barrier   s   6r$   c                   C   s   t dur
t   dS dS )a?  
    Release the parallel environment initialized by gloo

    Args:
        None

    Returns:
        None

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import multiprocessing
            >>> from contextlib import closing
            >>> import socket

            >>> port_set = set()

            >>> def find_free_port():
            ...     def _free_port():
            ...         with closing(socket.socket(socket.AF_INET,
            ...             socket.SOCK_STREAM)) as s:
            ...             s.bind(('', 0))
            ...             return s.getsockname()[1]
            ...     while True:
            ...         port = _free_port()
            ...         if port not in port_set:
            ...             port_set.add(port)
            ...             return port

            >>> def test_gloo_release(id, rank_num, server_endpoint):
            ...     paddle.distributed.gloo_init_parallel_env(
            ...         id, rank_num, server_endpoint)
            ...     paddle.distributed.gloo_barrier()
            ...     paddle.distributed.gloo_release()

            >>> def test_gloo_release_with_multiprocess(num_of_ranks):
            ...     jobs = []
            ...     server_endpoint = "127.0.0.1:%s" % (find_free_port())
            ...     for id in range(num_of_ranks):
            ...         p = multiprocessing.Process(
            ...             target=test_gloo_release,
            ...             args=(id, num_of_ranks, server_endpoint))
            ...         jobs.append(p)
            ...         p.start()
            ...     for proc in jobs:
            ...         proc.join()

            >>> if __name__ == '__main__':
            ...     # Arg: number of ranks (processes)
            ...     test_gloo_release_with_multiprocess(2)
    N)r   releaser   r   r   r   gloo_release   s   7r&   )r   multiprocessingr   r   Zpaddle.baser   Z5paddle.distributed.fleet.base.private_helper_functionr   __all__r   r   r#   r$   r&   r   r   r   r   <module>   s   a: