o
    "j^                     @   s
  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
mZ d dlmZ d dlmZ d dlmZmZmZ d dlmZmZmZ d d	lmZ g ZG d
d dZdd Zdd Zdd Zdd Zdd Z dd Z!dd Z"dd Z#G dd dZ$d$d"d#Z%dS )%    N)core)
get_device)_get_trainers_numget_cluster_and_pod)use_paddlecloud)get_cluster_from_args)
DeviceModeblock_windows_and_macoscheck_backend)_prepare_trainer_env_print_argumentsget_host_name_ip)	set_flagsc                   @   s   e Zd Zdd ZdS )ParallelEnvArgsc                 C   s(   d | _ d | _d | _d | _d| _d | _d S )NT)cluster_node_ipsnode_ipr   started_portprint_configselected_devices)self r   Y/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/spawn.py__init__/   s   
zParallelEnvArgs.__init__N)__name__
__module____qualname__r   r   r   r   r   r   .   s    r   c                 C   sL   g d}g d}| D ]}||vr#||v rt d| t q
td| q
d S )N)start_methodipsgpusxpusr   backend)r   r   r   r   r   zThe config option (%s) of `paddle.distributed.spawn` is deprecated. Please use the latest config options stated in the `spawn` API documentation.zFThe config option (%s) of `paddle.distributed.spawn` is not supported.)warningswarnDeprecationWarning
ValueError)optionsZsupported_optionsZdeprecated_optionskeyr   r   r   _options_valid_checkG   s$   r'   c                  C   sf   t  } d| v rt S d| v rt S d| v rt S | t v r+t| dd S t	d|  d)Ngpuxpucpu:r   I`paddle.distributed.spawn` does not support parallel training on device `` now.)
r   r   get_cuda_device_countget_xpu_device_countmultiprocessing	cpu_countget_available_custom_deviceget_custom_device_countsplitRuntimeErrorZdevicer   r   r   _get_default_nprocsh   s   
r7   c                  C   sJ   t  } d| v r	dS d| v rdS d| v rdS | t v rdS td|  d	)
Nr(   ncclr)   bkclr*   glooxcclr,   r-   )r   r   r2   r5   r6   r   r   r   _get_default_backendx   s   
r<   c                 C   s>   d }dd |  dD }t|dkr|d }|S t \}}|S )Nc                 S   s   g | ]}|  qS r   )strip.0xr   r   r   
<listcomp>   s    z _get_node_ip.<locals>.<listcomp>,   r   )r4   lenr   )r   r   Znode_ips_r   r   r   _get_node_ip   s   
rF   c                    s  d|vs
|d dkrt  |d< t|d  t|d  g }t }|dd |_|jd u r;|dd |_|jd u r;d|_|d dkr|dd |_|jd u rT|dd |_td	d }|d u sb|d
krndd t	t
 D  n|d |jd u rt | k rtdt | f d fddt	d| D |_nP|jd}t|| krtdt|| f |D ]}| vrtd|d qn$|d dkrU|dd |_|jd u r|dd |_tdd }|d u s|d
krdd t	t
 D  n|d |jd u r't | k rtdt | f d fddt	d| D |_n|jd}t|| kr>tdt|| f |D ]}| vrRtd|d q@n|d dkrtd d|_d |_|j|_|dd d u sxJ dt|jdd ksJ d!t d ksJ d"nX|d d#krd |_t
 d }td$| d%d }|d u s|d
krd&d t	t
|D  n|d t | k rtd't | |f d fd(dt	d| D |_|d)d |_|jd u rt|j|_|d*d |_|dd |_|jd u rt |_|d dkr-tt	d| }t|t j!|\}	}
nt"|\}	}
|
j#D ]}|$t%|	||d  q6|d+d,|_&|j&rTt'| |S )-Nr    autor   r   z	127.0.0.1r8   r   r   ZCUDA_VISIBLE_DEVICES c                 S      g | ]}t |qS r   strr>   r   r   r   rA          z,_get_subprocess_env_list.<locals>.<listcomp>rB   zthe number of visible devices(%d) is less than the number of spawn processes(%d), please ensure that the correct `nprocs` argument is passed or the environment variable `CUDA_VISIBLE_DEVICES` is correctly configured.c                       g | ]}t  | qS r   rJ   r>   Zenv_devices_listr   r   rA          r   zThe number of selected devices(%s) is not equal to the number of spawn processes(%d), please ensure that the correct `nprocs` and `gpus` arguments are passed.zCThe selected gpu card {} cannot found in CUDA_VISIBLE_DEVICES ({}).r9   r   ZXPU_VISIBLE_DEVICESc                 S   rI   r   rJ   r>   r   r   r   rA      rL   zthe number of visible devices(%d) is less than the number of spawn processes(%d), please ensure that the correct `nprocs` argument is passed or the environment variable `XPU_VISIBLE_DEVICES` is correctly configured.c                    rM   r   rJ   r>   rN   r   r   rA      rO   zThe number of selected devices(%s) is not equal to the number of spawn processes(%d), please ensure that the correct `nprocs` and `xpus` arguments are passed.zBThe selected xpu card {} cannot found in XPU_VISIBLE_DEVICES ({}).r:   zYour model will be trained under CPUONLY mode by using GLOO,because CPUPlace is specified manually or your installed PaddlePaddle only support CPU Device.Tr   z.CPUONLY spawn doesn't support use paddle cloudrC   zJCPUONLY spawn only support single trainer, that is len(ips)=1, but got %s.z+CPUONLY spawn doesn't support multi-trainerr;   ZFLAGS_selected_sc                 S   rI   r   rJ   r>   r   r   r   rA     s    zthe number of visible devices(%d) is less than the number of spawn processes(%d), please ensure that the correct `nprocs` argument is passed or the environment variable `FLAGS_selected_%ss` is correctly configured.c                    rM   r   rJ   r>   rN   r   r   rA   *  rO   r   r   r   F)(r<   r
   r	   r   getr   r   osgetenvranger   r.   r4   rD   r5   joinr$   formatr/   r!   r"   Zpaddle_cpuonlyr   r   Zget_all_custom_device_typer3   r   rF   r   r   listr   r   ZCPUr   Ztrainersappendr   r   r   )nprocsr%   Zprocesses_env_listargsZenv_devicesZselected_device_listZcard_idZcustom_device_nameZdevices_per_procZclusterZpodZtrainerr   rN   r   _get_subprocess_env_list   s  











	









r[   c                   C   s    t jdd  t jdd  d S )NZ
http_proxyZhttps_proxy)rR   environpopr   r   r   r   _remove_risky_envO  s   r^   c                 C   sR   |dkrt d| d i n|dkrt d| d i n	 | D ]	}| | tj|< qd S )Nr8   ZFLAGS_selected_gpusr9   ZFLAGS_selected_xpus)r   rR   r\   )env_dictr    var_namer   r   r   _set_trainer_envV  s   
ra   c                 C   sp   zt   t|| | | }|| W d S  ty   Y d S  ty7   dd l}||  td Y d S w )Nr   rC   )	r^   ra   putKeyboardInterrupt	Exception	traceback
format_excsysexit)funcrZ   error_queuereturn_queuer_   r    resultre   r   r   r   _func_wrappern  s   
rm   c                   @   s&   e Zd Zdd ZdddZdd ZdS )	MultiprocessContextc                 C   s*   || _ || _|| _dd t|D | _d S )Nc                 S   s   i | ]\}}|j |qS r   )sentinel)r?   indexprocessr   r   r   
<dictcomp>  s    z0MultiprocessContext.__init__.<locals>.<dictcomp>)error_queuesreturn_queues	processes	enumerate	sentinels)r   ru   rs   rt   r   r   r   r     s   zMultiprocessContext.__init__Nc                 C   s   t | jdkr	dS tjj| j |d}d }|D ]}| j|}| j| }|  |j	dkr2|} nq|d u r>t | jdkS | jD ]}|
 rK|  |  qA| | d S )Nr   T)timeout)rD   rw   r0   
connectionwaitkeysr]   ru   rU   exitcodeis_alive	terminate_throw_exception)r   rx   readyerror_indexro   rp   rq   r   r   r   rU     s*   




zMultiprocessContext.joinc                 C   sv   | j |  r(| j| j}|dk r t| j}td||f td||f | j |  }d| }||7 }t|)Nr   z%Process %d terminated with signal %s.z(Process %d terminated with exit code %d.z

----------------------------------------------
Process %d terminated with the following error:
----------------------------------------------

)	rs   emptyru   r|   signalSignalsnamerd   rQ   )r   r   r|   r   Zoriginal_tracemsgr   r   r   r     s*   z$MultiprocessContext._throw_exception)N)r   r   r   r   rU   r   r   r   r   r   rn     s    
rn   r   TFc              
   K   s   t | |dkrt }t||}|dd}|du rd}t|}g }	g }
g }t|D ]1}| }| }|jt	| ||||| |d fd}||_
|  |	| |
| || q+t||	|
}|sg|S | sp	 | rk|S )a  
    Start multiple processes with ``spawn`` method for parallel training.

    .. note::
        ``spawn`` now only supports GPU or XPU collective mode. The collective mode
        of GPU and XPU cannot be started at the same time, so the option `gpus` and
        `xpus` cannot be configured at the same time.

    Args:
        func (function): The target function is called by spawned process.
            This function need to be able to pickled, so it must be defined
            at the top level of a module.
        args (list|tuple, optional): Arguments passed to ``func``.
        nprocs (int, optional): Number of processed to start. Default: -1.
            when nprocs is -1, the available device will be obtained from
            the environment variable when the model is executed: If use GPU,
            the currently available device ID is obtained from the environment
            variable CUDA_VISIBLE_DEVICES; If use XPU, the currently available
            device ID is obtained from the environment variable XPU_VISIBLE_DEVICES.
        join (bool, optional): Perform a blocking join on all spawned processes.
            Default: True.
        daemon (bool, optional): The spawned processes' daemon flag. Default: False.
        **options(dict, optional): Other initial parallel execution environment
            configuration options. The following options are currently supported:
            (1) start_method (string): the way to start a process.
            The start method can be ``spawn`` , ``fork`` , ``forkserver`` .
            Because the CUDA runtime does not support the ``fork`` start method,
            when use CUDA in subprocesses, we should start process by ``spawn``
            or ``forkserver`` method. Default: "spawn" ;
            (2) gpus (string): The training process will run on the
            selected gpus, such as "0,1,2,3". Default: None;
            (3) xpus (string): The training process will run on the
            selected xpus, such as "0,1,2,3". Default: None;
            (5) ips (string): Paddle cluster nodes ips, such as
            "192.168.0.16,192.168.0.17". Default: "127.0.0.1" .

    Returns:
        ``MultiprocessContext`` object, it hold the spawned processes.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle
            >>> import paddle.nn as nn
            >>> import paddle.optimizer as opt
            >>> import paddle.distributed as dist

            >>> class LinearNet(nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self._linear1 = nn.Linear(10, 10)
            ...         self._linear2 = nn.Linear(10, 1)
            ...     def forward(self, x):
            ...         return self._linear2(self._linear1(x))

            >>> def train(print_result=False):
            ...     # 1. initialize parallel environment
            ...     group = dist.init_parallel_env()
            ...     process_group = group.process_group if group else None
            ...     # 2. create data parallel layer & optimizer
            ...     layer = LinearNet()
            ...     dp_layer = paddle.DataParallel(layer, group = process_group)
            ...     loss_fn = nn.MSELoss()
            ...     adam = opt.Adam(
            ...         learning_rate=0.001, parameters=dp_layer.parameters())
            ...     # 3. run layer
            ...     inputs = paddle.randn([10, 10], 'float32')
            ...     outputs = dp_layer(inputs)
            ...     labels = paddle.randn([10, 1], 'float32')
            ...     loss = loss_fn(outputs, labels)
            ...     if print_result is True:
            ...         print("loss:", loss.numpy())
            ...     loss.backward()
            ...     adam.step()
            ...     adam.clear_grad()

            >>> # Usage 1: only pass function.
            >>> # If your training method no need any argument, and
            >>> # use all visible devices for parallel training.
            >>> if __name__ == '__main__':
            ...     dist.spawn(train)

            >>> # Usage 2: pass function and arguments.
            >>> # If your training method need some arguments, and
            >>> # use all visible devices for parallel training.
            >>> if __name__ == '__main__':
            ...     dist.spawn(train, args=(True,))

            >>> # Usage 3: pass function, arguments and nprocs.
            >>> # If your training method need some arguments, and
            >>> # only use part of visible devices for parallel training.
            >>> # If your machine hold 8 cards {0,1,2,3,4,5,6,7},
            >>> # this case will use cards {0,1}; If you set
            >>> # CUDA_VISIBLE_DEVICES=4,5,6,7, this case will use
            >>> # cards {4,5}
            >>> if __name__ == '__main__':
            ...     dist.spawn(train, args=(True,), nprocs=2)

            >>> # Usage 4: pass function, arguments, nprocs and gpus.
            >>> # If your training method need some arguments, and
            >>> # only use part of visible devices for parallel training,
            >>> # but you can't set your machine's environment variable
            >>> # CUDA_VISIBLE_DEVICES, such as it is None or all cards
            >>> # {0,1,2,3,4,5,6,7}, you can pass `gpus` to
            >>> # select the GPU cards you want to use. For example,
            >>> # this case will use cards {4,5} if your machine hold 8 cards.
            >>> if __name__ == '__main__':
            ...     dist.spawn(train, args=(True,), nprocs=2, gpus='4,5')

    r   r   Nspawnr    )targetrZ   )r'   r7   r[   rQ   r0   Zget_contextrT   SimpleQueueProcessrm   daemonstartrX   rn   rU   )ri   rZ   rY   rU   r   r%   Zprocs_env_listr   mprs   rt   ru   irj   rk   rq   contextr   r   r   r     sH   r



r   )r   r   TF)&r0   rR   r   rg   r!   Zpaddle.baser   Zpaddle.devicer   Zpaddle.distributed.cloud_utilsr   r   Z$paddle.distributed.fleet.cloud_utilsr   Zpaddle.distributed.fleet.launchr   Z%paddle.distributed.fleet.launch_utilsr   r	   r
   Z%paddle.distributed.utils.launch_utilsr   r   r   Zpaddle.frameworkr   __all__r   r'   r7   r<   rF   r[   r^   ra   rm   rn   r   r   r   r   r   <module>   s4   !
 >B