o
    "j                     @   s   d Z ddlZddlZddlmZmZ ddlmZ g ZG dd dZ	G dd dZ
G d	d
 d
ZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZG dd deZdS )zDefinition of Role Makers.    N)ManagerProcess)basec                   @   s   e Zd ZdZdZdZdS )Role         N)__name__
__module____qualname__WORKERSERVERXPU r   r   m/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/distributed/fleet/role_maker.pyr      s    r   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )MockBarrierz|
    MockBarrier is a empty impletation for barrier
    mock as a real barrier for never-barrier in a specific scenario
    c                 C      dS )z+
        dummy barrier, do nothing
        Nr   selfr   r   r   barrier%      zMockBarrier.barrierc                 C   r   )z/
        dummy all barrier, do nothing
        Nr   r   r   r   r   barrier_all+   r   zMockBarrier.barrier_allc                 C   s   |S )zg
        dummy all reduce, do nothing
        Args:
            obj(any): obj to do all reduce
        r   r   objr   r   r   
all_reduce1   s   zMockBarrier.all_reducec                 C   s   |gS )zg
        dummy all gather, do nothing
        Args:
            obj(any): obj to do all gather
        r   r   r   r   r   
all_gather9   s   zMockBarrier.all_gatherN)r	   r
   r   __doc__r   r   r   r   r   r   r   r   r      s    r   c                   @   s   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zd"ddZdd Zdd  Zd!S )#RoleMakerBasez
    RoleMakerBase is a base class for assigning a role to current process
    in distributed training.
    A paddle developer can implement RoleMakerBase to design a role maker
    for worker or pserver assignment.
    c                 C   s"   g | _ g | _d| _d | _d| _d S )NF)_worker_endpoints_server_endpoints_role_is_generated_role_current_idr   r   r   r   __init__J   s
   
zRoleMakerBase.__init__c                 C      t d)z7
        return is_worker() of current process
        +Please implement this method in child classNotImplementedErrorr   r   r   r   	is_workerQ      zRoleMakerBase.is_workerc                 C   r%   )z7
        return is_server() of current process
        r&   r'   r   r   r   r   	is_serverW   r*   zRoleMakerBase.is_serverc                 C   r%   )z
        Check whether the node is the first instance of worker.
        Returns:
            bool: True if this is the first node of worker,
                  False if not.
        r&   r'   r   r   r   r   is_first_worker]      zRoleMakerBase.is_first_workerc                 C   r%   )zc
        Get current total worker number.

        Returns:
            int: worker number
        r&   r'   r   r   r   r   
worker_numf   r-   zRoleMakerBase.worker_numc                 C   s   |   r|  S |  S N)r)   worker_indexserver_indexr   r   r   r   role_ido   s   zRoleMakerBase.role_idc                 C   r%   )zS
        Get current worker id.

        Returns:
            int: node id
        r&   r'   r   r   r   r   r0   r   r-   zRoleMakerBase.worker_indexc                 C   r%   )zS
        Get current server id.

        Returns:
            int: node id
        r&   r'   r   r   r   r   r1   {   r-   zRoleMakerBase.server_indexc                 C      | j S )z*
        return trainer endpoints
        )r   r   r   r   r   get_trainer_endpoints      z#RoleMakerBase.get_trainer_endpointsc                 C   r3   )z*
        return pserver endpoints
        )r    r   r   r   r   get_pserver_endpoints   r5   z#RoleMakerBase.get_pserver_endpointsc                 C   s   d | j| j| j| jS )NzDrole: {}, current_id: {}, worker_endpoints: {}, server_endpoints: {})formatr"   r#   r   r    r   r   r   r   	to_string   s   zRoleMakerBase.to_stringc                 C      t d dS )
        all gather between trainers and pservers

        Args:
            input(int|float): input value

        Returns:
            return a list of values
        z0warning: RoleMakerBase does not have all gather.Nprintr   inputr   r   r   r         
zRoleMakerBase.all_gathersumc                 C   r9   )  
        all reduce between trainers if current role is TRAINER,
        only support array of one dim.

        Args:
            input(list/numpy.array): array of one dim
            output(list/numpy.array): array of one dim
            mode(str): "sum" or "min" or "max"
        z7warning: RoleMakerBase does not have all reduce worker.Nr;   r   r>   outputmoder   r   r   all_reduce_worker   r?   zRoleMakerBase.all_reduce_workerc                 C   r9   )E
        barrier between trainers if current role is TRAINER
        z4warning: RoleMakerBase does not have barrier worker.Nr;   r   r   r   r   barrier_worker      zRoleMakerBase.barrier_workerc                 C   r9   )E
        barrier between trainers if current role is PSERVER
        z1warning: RoleMakerBase does not have barrier all.Nr;   r   r   r   r   r      rH   zRoleMakerBase.barrier_allNr@   )r	   r
   r   r   r$   r)   r+   r,   r.   r2   r0   r1   r4   r6   r8   r   rE   rG   r   r   r   r   r   r   B   s"    				
r   c                       sh   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdd Z  ZS )MPIRoleMakerz
    MPIRoleMaker is a MPI-API based role maker which is a counter-part of K8SRoleMaker
    mpi4py will be used if a developer inherits MPIRoleMaker
    c                    s:   t    ddlm} || _|j| _d| _d| _d| _dS )Init.r   )MPIN)	superr$   Zmpi4pyrM   Z
COMM_WORLD_comm_node_type_comm_ips_ip)r   rM   	__class__r   r   r$      s   

zMPIRoleMaker.__init__c                 C      | j  | _| jS )zReturn rank.)rO   ZGet_rank_rankr   r   r   r   	_get_rank      zMPIRoleMaker._get_rankc                 C   rU   )zReturn size.)rO   ZGet_size_sizer   r   r   r   	_get_size   rX   zMPIRoleMaker._get_sizec                 C   s   |    | j|S )zD
        all_gather(obj) will call MPI's allgather function
        )_barrier_allrO   	allgatherr   r   r   r   _all_gather   s   zMPIRoleMaker._all_gatherc                 C   s"   |   r| j  | j|S dS )zG
        worker_gather(obj) will call MPI's allgather function
        N)r)   rP   r   r\   r   r   r   r   _worker_gather   s   
zMPIRoleMaker._worker_gatherc                 C      | j   dS )zD
        barrier_all() will call MPI's barrier_all function
        N)rO   r   r   r   r   r   r[         zMPIRoleMaker._barrier_allc                 C   r_   )z4
        finalize the current MPI instance.
        N)rM   ZFinalizer   r   r   r   	_finalize   r`   zMPIRoleMaker._finalizec                 C   s   | j s| j|  | _ | j S )z;
        collect current distributed job's ip list
        )rQ   rO   r\   get_local_ipr   r   r   r   _get_ips   s   zMPIRoleMaker._get_ipsc                 C   s   ddl }|| | _| jS )zReturn get local ip.r   N)socketgethostbynamegethostnamerR   )r   rd   r   r   r   rb      s   zMPIRoleMaker.get_local_ipc                 C   r%   )zU
        generate_role() should be called to identify current process's role
        r&   r'   r   r   r   r   generate_role  r*   zMPIRoleMaker.generate_role)r	   r
   r   r   r$   rW   rZ   r]   r^   r[   ra   rc   rb   rg   __classcell__r   r   rS   r   rK      s    	rK   c                       s   e Zd ZdZ fddZdd Zdd Zd)d	d
Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd)d!d"Zd#d$ Zd%d& Zd'd( Z  ZS )*MPISymetricRoleMakerz
    MPISymetricRoleMaker is designed for worker and server assignment
    under MPI. Typically, a worker and a server node will be appointed
    on each physical node. This role maker can be only used under MPI.
    c                    s    t    d| _d| _d| _dS )rL   Nr   r   )rN   r$   
_node_type_proc_per_node_pserver_rand_portr   rS   r   r   r$     s   

zMPISymetricRoleMaker.__init__c                 C   s   | j stddS )z&Check whether role has been generated.z&generate_role() should be called firstT)r!   	NameErrorr   r   r   r   _check_role_generation  s   z+MPISymetricRoleMaker._check_role_generationc                 C   s   | j s|   | |S r:   )r!   rg   r]   r=   r   r   r   r     s   

zMPISymetricRoleMaker.all_gatherr@   c                 C   s4   | j s|   |  std dS | ||| dS )rA   z8warning: current role is not worker in all_reduce_workerN)r!   rg   r)   r<   _all_reducerB   r   r   r   rE   ,  s   
z&MPISymetricRoleMaker.all_reduce_workerc                 C   s0   | j s|   |  r| j  dS td dS )rF   z5warning: current role is not worker in barrier_workerN)r!   rg   r)   rP   r   r<   r   r   r   r   rG   =  s
   z#MPISymetricRoleMaker.barrier_workerc                 C      | j s|   | j  dS rI   N)r!   rg   rO   r   r   r   r   r   r   H     z MPISymetricRoleMaker.barrier_allc                 C   s    |   r|  od|  kS dS )z[
        return whether current process is the first worker assigned by role maker
        r   F)rn   r)   r0   r   r   r   r   r,   P  s   z$MPISymetricRoleMaker.is_first_workerc                    sF    j dkrddl}|   |dd _  fdd jD }|S )zg
        get pserver endpoints
        Returns:
            endpoints(list): pserver endpoints
        r   Nia  i   c                    s   g | ]}|d  t  j qS ):)strrl   ).0xr   r   r   
<listcomp>f  s    z>MPISymetricRoleMaker.get_pserver_endpoints.<locals>.<listcomp>)rl   randomseed_server_numrandintr    )r   ry   Z	endpointsr   r   r   r6   X  s   

z*MPISymetricRoleMaker.get_pserver_endpointsc                 C   s   |   S r/   _worker_numr   r   r   r   r.   l  s   zMPISymetricRoleMaker.worker_numc                 C      |   r	| jdkS dS )zQ
        return whether current process is worker assigned by role maker
        r   Frn   rj   r   r   r   r   r)   o     
zMPISymetricRoleMaker.is_workerc                 C   r   )zQ
        return whether current process is server assigned by role maker
        r   Fr   r   r   r   r   r+   w  r   zMPISymetricRoleMaker.is_serverc                 C   s   |   rt|  | j S dS )5
        return the current number of worker
        r   )rn   intrZ   rk   r   r   r   r   r~     s   z MPISymetricRoleMaker._worker_numc                 C   s4   |   rt|  | j S |   t|  | j S z5
        return the current number of server
        )rn   r   rZ   rk   rg   r   r   r   r   r{     s   z MPISymetricRoleMaker._server_numc                 C   s0   |   rt| j| j S |   t|  d S )z,
        return the index of worker
        r   rn   r   rV   rk   rg   rZ   r   r   r   r   r0     s   z!MPISymetricRoleMaker.worker_indexc                 C   s2   |   rt| j| j S |   t|  | j S )z,
        return the index of server
        r   r   r   r   r   r1     s   z!MPISymetricRoleMaker.server_indexc                 C   sf   | j s|   |dkr| jj}n|dkr| jj}n|dkr"| jj}ntd| | jj|||d dS )rA   r@   maxminzunknown mode: %s)opN)	r!   rg   rM   ZSUMMAXZMIN
ValueErrorrP   Z	AllreducerB   r   r   r   rp     s   



z MPISymetricRoleMaker._all_reducec                 C   *   |   r|  r| j  dS dS td)@
        barrier all workers in current distributed job
        &You should check role generation firstN)rn   r)   rP   r   	Exceptionr   r   r   r   _barrier_worker  
   z$MPISymetricRoleMaker._barrier_workerc                 C   r   )@
        barrier all servers in current distributed job
        r   N)rn   r+   rP   r   r   r   r   r   r   _barrier_server  r   z$MPISymetricRoleMaker._barrier_serverc                 C   st   | j s6|  ddd | _|  ddd | _d|  | j d kr&d| _nd| _| j| j| _	d| _ dS t
d)z3
        generate currently process's role
        r   Nr   r   Tr   )r!   rc   r   r    rW   rk   rj   rO   ZSplitrP   r   r   r   r   r   rg     s   
z"MPISymetricRoleMaker.generate_rolerJ   )r	   r
   r   r   r$   rn   r   rE   rG   r   r,   r6   r.   r)   r+   r~   r{   r0   r1   rp   r   r   rg   rh   r   r   rS   r   ri   
  s*    






ri   c                       sb   e Zd ZdZd fdd	Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Z  ZS )PaddleCloudRoleMakerzF
    role maker for paddle cloud,
    base class is RoleMakerBase
    Fc                    s   t    d| _|| _d S )NF)rN   r$   r!   _is_collective)r   Zis_collectiverS   r   r   r$     s   

zPaddleCloudRoleMaker.__init__c           
   
   C   sf  | j s| jsszOtjd d}ttjd }tjd }|dvr#td|dkr2tj}ttjd }n"|d	krPtj	}tjd
 }tjd }d
||g}||}ntdW n tye }	 ztdd}	~	ww || _|| _|| _|| _n9ttdd| _tdd| _| jdksJ td| _td| _| jdusJ d| jd| _t| j| _d| _ dS dS )zGenerate role.PADDLE_PSERVERS_IP_PORT_LIST,ZPADDLE_TRAINERS_NUMTRAINING_ROLETRAINERPSERVER(TRAINING_ROLE must be PSERVER or TRAINERr   PADDLE_TRAINER_IDr   POD_IPPADDLE_PORTrt   z:something wrong with PaddleCloud, please check environmentN0ZPADDLE_TRAINING_ROLEPADDLE_TRAINER_ENDPOINTSZPADDLE_CURRENT_ENDPOINTz#can't find PADDLE_TRAINER_ENDPOINTST)r!   r   osenvironsplitr   r   r   r   r   joinindex_trainers_numr    r"   r#   getenvZ_training_roler   Z_current_endpointlen)
r   eplisttrainers_numtraining_rolerole
current_idcur_ipZ	curr_portZcurr_endpointver   r   r   rg     sb   




z"PaddleCloudRoleMaker.generate_rolec                 C      | j s|   | jS r/   r!   rg   r    r   r   r   r   r6   )     z*PaddleCloudRoleMaker.get_pserver_endpointsc                 C      | j s|   | jtjkS r/   r!   rg   r"   r   r   r   r   r   r   r)   .     zPaddleCloudRoleMaker.is_workerc                 C   r   r/   r!   rg   r"   r   r   r   r   r   r   r+   3  r   zPaddleCloudRoleMaker.is_serverc                 C   $   | j s|   | jtjko| jdkS Nr   r!   rg   r"   r   r   r#   r   r   r   r   r,   8  s   z$PaddleCloudRoleMaker.is_first_workerc                 C   r   r/   r!   rg   r#   r   r   r   r   r0   =  r   z!PaddleCloudRoleMaker.worker_indexc                 C   r   r/   r   r   r   r   r   r1   B  r   z!PaddleCloudRoleMaker.server_indexc                 C   r   r/   r!   rg   r   r   r   r   r   r.   G  r   zPaddleCloudRoleMaker.worker_num)F)r	   r
   r   r   r$   rg   r6   r)   r+   r,   r0   r1   r.   rh   r   r   rS   r   r     s    <r   c                       s  e Zd ZdZ fddZdd Zdd ZdAd	d
Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. ZdAd/d0Zd1d2 Zd3d4 Zd5d6 Zd7d8 Zd9d: Zd;d< Z d=d> Z!d?d@ Z"  Z#S )BGeneralRoleMakerg  
    This role maker is for general use, you can set os.environ to customize:
        PADDLE_PSERVERS_IP_PORT_LIST : all pservers' ip:port, separated by ','
        PADDLE_TRAINER_ENDPOINTS     : all trainers' ip:port, separated by ','
        TRAINING_ROLE                : TRAINER or PSERVER
        PADDLE_TRAINER_ID            : current trainer id (only for trainer),
                                       it is index in PADDLE_TRAINER_ENDPOINTS
        PADDLE_PSERVER_ID            : current pserver id (only for pserver)
                                       it is index in PADDLE_PSERVERS_IP_PORT_LIST
    c                    s   t    d| _|dd| _|dd| _|ddd| _|dd| _|d	d
| _	|dd| _
|dd}|dd| _g | _d | _|dkra|d| _t | _| j | _d| jd< |  | _| jdkrmdn| j| _tdd| _d S )NFZ	hdfs_name Zhdfs_ugipath/Zinit_timeout_seconds  Zrun_timeout_seconds Z
use_metricZhttp_ip_portZ
use_ps_gpurt   runningloZ
SYS_JOB_ID)rN   r$   r!   get
_hdfs_name	_hdfs_ugirstrip
_hdfs_path_init_timeout_seconds_run_timeout_seconds_use_metric_use_ps_gpu_http_ip_port_http_serverr   r   _managerdict_http_server_d$_GeneralRoleMaker__get_default_iface_ifacer   r   _prefix)r   kwargsip_portrS   r   r   r$   Y  s(   


zGeneralRoleMaker.__init__c                 C   s$  | j stjd d}tjd }tjd d}t|}|dvr%tdd| _dtjv r5ttjd | _|d	krtj	}ttjd
 }|dkrxt| j
dkrxt|t|t|t| d}t| j| j|fd| _d| j_d| jd< | j  d| _|| | _| jr
tj }|| |t| || j || j || j| j t| j
dkr|| j
d t| j
d d n| | j!d | j"| j# |$  || _%| j&s| j'r	tj( }	||	_)t||	_*| j
d |	_+t| j
d |	_,d}
d}|
|	_-||	_.tj/|	}|$  nt0 | _1n|dkrtj2}tj3ddur,ttjd }|| }ntjd }tjd }d4||g}|5|}d| _|| _tj }|| |t| || j || j || j| j t| j
dkr|| j
d t| j
d d n| | j!d | j"| j# |$  || _%tj }|| }||5| j |t| || j || j || j| j t| j
dkr|| j
d t| j
d d n| | j!d | j"| j# |$  || _1|| _6|| _7|| _8|| _9|5| j| _:t|| _;|| _<d| _ dS dS )6
        generate role for general role maker
        r   r   r   r   r   r   r   ZPADDLE_IS_BARRIER_ALL_ROLEr   r   r   )trainerpserverall)targetargsTr   r   /trainerr   r   r   PADDLE_PSERVER_IDNr   r   rt   r   /pserverr   /all)=r!   r   r   r   r   r   Z_is_barrier_allr   r   r   r   r   "_GeneralRoleMaker__start_kv_serverr   r   daemonstartrj   _cur_endpointr   coreGlooset_rankset_size
set_prefixr   	set_ifacer   set_timeout_secondsr   r   Zset_http_storeset_hdfs_storer   r   r   initrP   r   r   ZGlooParallelStrategyrankZrank_num
ip_addressr   Zinit_secondsZrun_secondsZGlooParallelContextr   	_all_commr   r   r   r   r   r    r"   r#   rV   rY   r   )r   r   r   worker_endpointsr   r   r   size_dglooZGloo_strategyZDefault_init_timeout_secondsZDefault_run_timeout_secondsr   cur_endpointr   cur_portall_listr   r   r   rg   t  s   




















 zGeneralRoleMaker.generate_rolec                 C   s
   |  |S ro   )r]   r=   r   r   r   r      s   

zGeneralRoleMaker.all_gatherr@   c                 C   s   |   sdS | ||| dS )rA   N)r)   rp   rB   r   r   r   rE     s   
z"GeneralRoleMaker.all_reduce_workerc                 C      |    dS )rF   N)r   r   r   r   r   rG     rH   zGeneralRoleMaker.barrier_workerc                 C   r   rr   )r[   r   r   r   r   r      rH   zGeneralRoleMaker.barrier_allc                 C   r   )z7
        get local endpoint of current process
        )r!   rg   r   r   r   r   r   get_local_endpoint&     z#GeneralRoleMaker.get_local_endpointc                 C   r   )z.
        get endpoint of all trainers
        )r!   rg   r   r   r   r   r   r4   .  r   z&GeneralRoleMaker.get_trainer_endpointsc                 C   r   )z.
        get endpoint of all pservers
        r   r   r   r   r   r6   6  r   z&GeneralRoleMaker.get_pserver_endpointsc                 C   r   )z3
        whether current process is worker
        r   r   r   r   r   r)   >     zGeneralRoleMaker.is_workerc                 C   r   z3
        whether current process is server
        r   r   r   r   r   r+   F  r   zGeneralRoleMaker.is_serverc                 C   r   z=
        whether current process is worker of rank 0
        r   r   r   r   r   r   r,   N     z GeneralRoleMaker.is_first_workerc                 C   r   )z-
        get index of current worker
        r   r   r   r   r   r0   V  r   zGeneralRoleMaker.worker_indexc                 C   r   )z-
        get index of current server
        r   r   r   r   r   r1   ^  r   zGeneralRoleMaker.server_indexc                 C      | j s|   |  S )z5
        retrun the current number of worker
        )r!   rg   r~   r   r   r   r   r.   f     zGeneralRoleMaker.worker_numc                 C   r   r   )r!   rg   r{   r   r   r   r   
server_numn  r   zGeneralRoleMaker.server_numc                 C   (   | j s|   |  r| j  dS dS r   N)r!   rg   r)   rP   r   r   r   r   r   r   v  
   z GeneralRoleMaker._barrier_workerc                 C   rq   )zL
        barrier all workers and servers in current distributed job
        N)r!   rg   r   r   r   r   r   r   r[     rs   zGeneralRoleMaker._barrier_allc                 C   r  )r   N)r!   rg   r+   rP   r   r   r   r   r   r     r  z GeneralRoleMaker._barrier_serverc                 C   r   )r   r   r   r   r   r   r~     r   zGeneralRoleMaker._worker_numc                 C      | j s|   t| jS r   )r!   rg   r   r    r   r   r   r   r{     s   
zGeneralRoleMaker._server_numc                 C   r   )zDefault do nothing.Nr   r   r   r   r   ra     s   zGeneralRoleMaker._finalizec                 C   sF   | j s|   t|}| j||}tt|D ]}|| ||< qdS )z
        all reduce between all workers

        Args:
            input(list|numpy.array): array of one dim
            output(list|numpy.array): array of one dim
            mode(str): "sum" or "min" or "max"
        N)r!   rg   listrP   r   ranger   )r   r>   rC   rD   Z
input_listZansir   r   r   rp     s   	zGeneralRoleMaker._all_reducec                 C   s"   | j s|   |   | j|S )z9
        gather between all workers and pservers
        )r!   rg   r[   r   r   r   r   r   r   r]     s   zGeneralRoleMaker._all_gatherc                 C   s.   | j s|   |  sdS |   | j|S )z,
        gather between all workers
        N)r!   rg   r)   r   rP   r   r   r   r   r   r^     s   zGeneralRoleMaker._worker_gatherc                 C   r   )z>
        get current rank in all workers and pservers
        )r!   rg   rV   r   r   r   r   rW     r   zGeneralRoleMaker._get_rankc                 C   r   )z;
        get total num of all workers and pservers
        )r!   rg   rY   r   r   r   r   rZ     r   zGeneralRoleMaker._get_sizec                 C   s    |   }|  }|dkr|S |S )0
        get default physical interface
        r   )1_GeneralRoleMaker__get_default_iface_from_gateway4_GeneralRoleMaker__get_default_iface_from_interfaces)r   Zdefault1Zdefault2r   r   r   Z__get_default_iface  s   z$GeneralRoleMaker.__get_default_ifacec                 C   s   t d  d}d}d}|D ]C}| }d|v r+d|v r+|d}|d}q|durU|durUd}t||kr?|| }|rU|dkrU|dkrUt||krU||   S qdS )	r	  zroute -A inet
NZGatewayZIface*z0.0.0.0r   )r   popenreadstripr   r   r   )r   resZgateway_idxZ	iface_idxitemZgatewayr   r   r   Z __get_default_iface_from_gateway  s(   
z1GeneralRoleMaker.__get_default_iface_from_gatewayc                 C   sD   t d  d}|D ]}d|v r|dd    S qdS )r	  zip -f inet addr | awk NR%3==1r  Z	BROADCASTrt   r   r   )r   r  r  r  r   )r   r  r  r   r   r   Z#__get_default_iface_from_interfaces  s   z4GeneralRoleMaker.__get_default_iface_from_interfacesc                 C   sZ   ddl m} |t| jd |}|  d}|ddr't| |dds|  d S )Nr   )KVServerr      r   F)	Z*paddle.distributed.fleet.utils.http_serverr  r   r   r   r   timesleepstop)r   Zhttp_server_dr   r  Zhttp_serverZwait_secondsr   r   r   Z__start_kv_server  s   
z"GeneralRoleMaker.__start_kv_serverrJ   )$r	   r
   r   r   r$   rg   r   rE   rG   r   r   r4   r6   r)   r+   r,   r0   r1   r.   r  r   r[   r   r~   r{   ra   rp   r]   r^   rW   rZ   r   r
  r  r   rh   r   r   rS   r   r   M  sD     
		
	r   c                   @   s@   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dS )HeterRoleMakerr   c                 C   s  | j stjd d}tjd }tjd d}t|}tjd d}t|}|dvr1td|dkrtj}ttjd	 }d
| _	|| | _
tj }	|	| |	t| |	| j |	| j |	| j| j |	| jdd | j| j |	  |	| _n|dkrtj}ttjd }d| _	|| | _
tj }	|	| |	t| |	| j |	| j |	| j| j |	| jdd | j| j |	  |	| _ns|dkrHtj}tj ddurttjd }|| }
ntjd }tjd }d!||g}
|"|
}d| _	|
| _
tj }	|	| |	t| |	| j |	| j |	| j| j |	| jdd | j| j |	  |	| _|dksR|dkrtj }	|| }|	|"| j
 |	t| |	| j |	| j |	| j| j |	| jdd | j| j |	  |	| _#tj }	|| | }|	|"| j
 |	t| |	| j |	| j |	| j| j |	| jdd | j| j |	  |	| _$|| _%|| _&|| _'|| _(|"| j
| _)t|| _*|| _+|| _,d| _ dS dS )r   r   r   r   r   ZPADDLE_XPU_ENDPOINTS)r   r   r   z/TRAINING_ROLE must be PSERVER or TRAINER or XPUr   r   r   r   r   r   ZPADDLE_XPU_IDr   z/xpur   r   Nr   r   rt   r   r   z/heterr   T)-r!   r   r   r   r   r   r   r   r   rj   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rP   r   r   r   r   r   _heter_commr   r   r    r"   r#   rV   rY   r   _xpu_endpoints)r   r   r   r   r   Zxpu_endpointsxpu_numr   r   r   r   r   r   Z
heter_listr   r   r   r   rg     s   

















zHeterRoleMaker.generate_rolec                 C   r   r   )r!   rg   r"   r   r   r   r   r   r   is_xpu  r   zHeterRoleMaker.is_xpuc                 C   r   r   )r!   rg   r"   r   r   r#   r   r   r   r   is_first_xpu  r   zHeterRoleMaker.is_first_xpuc                 C   r  r  )r!   rg   r  rP   r   r   r   r   r   _barrier_xpu  r  zHeterRoleMaker._barrier_xpuc                 C   s.   | j s|   |  s| jr| j  dS dS r  )r!   rg   r  r)   r  r   r   r   r   r   _barrier_heter  s
   zHeterRoleMaker._barrier_heterc                 C   r  ) )r!   rg   r   r  r   r   r   r   r    s   
zHeterRoleMaker.xpu_numN)
r	   r
   r   r   rg   r  r  r  r  r  r   r   r   r   r    s     		r  c                       sd   e Zd ZdZdejddf fdd	Zdd Zdd	 Zd
d Z	dd Z
dd Zdd Zdd Z  ZS )UserDefinedRoleMakerz
    UserDefinedRoleMaker is designed for worker and server assignment
    under manual. Typically, a worker and a server node will be appointed
    on each physical node, It can be assign by user.
    r   Nc                    s  t    t|tstdt|dkrtdt|tt|kr&td|D ]}t|ts3tdq(|| _	|t
jkrE|t
jkrEtd|| _t|tsQtd|dk rYtd| jt
jkri|t|kritd	|| _t|tsutd
|dkr}td|| _d S )Nz'server_endpoints must be as string listr   z:the length of server_endpoints list must be greater than 0z.server_endpoints can't have duplicate elementsz8every element in server_endpoints list must be as stringzrole must be as Rolecurrent_id must be as int-current_id must be greater than or equal to 0zZif role is Role.SERVER, current_id must be less than or equal to len(server_endpoints) - 1zworker_num must be as intz!worker_num must be greater than 0)rN   r$   
isinstancer  	TypeErrorr   r   setru   r    r   r   r   r"   r   r#   r~   )r   r   r   r.   Zserver_endpointsZserver_endpointrS   r   r   r$     sJ   





zUserDefinedRoleMaker.__init__c                 C   
   d| _ d S NTr!   r   r   r   r   rg        
z"UserDefinedRoleMaker.generate_rolec                 C      | j tjkS r/   )r"   r   r   r   r   r   r   r)   	     zUserDefinedRoleMaker.is_workerc                 C   r+  r/   )r"   r   r   r   r   r   r   r+     r,  zUserDefinedRoleMaker.is_serverc                 C   s   | j tjko
| jdkS r   )r"   r   r   r#   r   r   r   r   r,     s   z$UserDefinedRoleMaker.is_first_workerc                 C   r3   r/   r#   r   r   r   r   r0        z!UserDefinedRoleMaker.worker_indexc                 C   r3   r/   r-  r   r   r   r   r1     r.  z!UserDefinedRoleMaker.server_indexc                 C   r3   r/   r}   r   r   r   r   r.     r.  zUserDefinedRoleMaker.worker_num)r	   r
   r   r   r   r   r$   rg   r)   r+   r,   r0   r1   r.   rh   r   r   rS   r   r!    s    4r!  c                       sJ   e Zd ZdZd fdd	Zdd Zdd	 Zd
d Zdd Zdd Z	  Z
S )UserDefinedCollectiveRoleMakerzp
    UserDefinedCollectiveRoleMaker is designed for worker assignment
    under manual for collective mode.
    r   Nc                    s   t    t|tstdt|dkrtdt|tt|kr&td|D ]}t|ts3tdq(|| _	t|t
s@td|dk rHtd|t|krRtd|| _t| j	| _d S )	Nz'worker_endpoints must be as string listr   z:the length of worker_endpoints list must be greater than 0z.worker_endpoints can't have duplicate elementsz8every element in worker_endpoints list must be as stringr"  r#  zBcurrent_id must be less than or equal to len(worker_endpoints) - 1)rN   r$   r$  r  r%  r   r   r&  ru   r   r   r#   r~   )r   r   r   Zworker_endpointrS   r   r   r$   "  s8   



z'UserDefinedCollectiveRoleMaker.__init__c                 C   r'  r(  r)  r   r   r   r   rg   D  r*  z,UserDefinedCollectiveRoleMaker.generate_rolec                 C   r   r(  r   r   r   r   r   r)   G  s   z(UserDefinedCollectiveRoleMaker.is_workerc                 C   s
   | j dkS r   r-  r   r   r   r   r,   J  r*  z.UserDefinedCollectiveRoleMaker.is_first_workerc                 C   r3   r/   r-  r   r   r   r   r0   M  r.  z+UserDefinedCollectiveRoleMaker.worker_indexc                 C   r3   r/   r}   r   r   r   r   r.   P  r.  z)UserDefinedCollectiveRoleMaker.worker_num)r   N)r	   r
   r   r   r$   rg   r)   r,   r0   r.   rh   r   r   rS   r   r/    s    "r/  )r   r   r  multiprocessingr   r   Zpaddler   __all__r   r   r   rK   ri   r   r   r  r!  r/  r   r   r   r   <module>   s*   #{M Yk   G ;Q