o
    "jV                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
mZ d dlmZ ddlmZ eddZdZd	Zd
ZdZG dd dZG dd dZG dd dZG dd dZdS )    N)cloud_utilslaunch_utils)
get_logger   )getenv_or_backupINFOELASTICe   f   x   <   c                   @   s   e Zd ZdZdZdS )ElasticLevel      N)__name__
__module____qualname__FAULT_TOLERANCEr    r   r   i/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/elastic/manager.pyr   +   s    r   c                   @   s    e Zd ZdZdZdZdZdZdS )ElasticStatus	completederrorZholdZrestartexitN)r   r   r   	COMPLETEDERRORHOLDRESTARTEXITr   r   r   r   r   0   s    r   c                   @   s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )LauncherInterfacec                 C   s   || _ g | _d S N)argsprocs)selfr!   r   r   r   __init__9   s   
zLauncherInterface.__init__c                 C   s  t jdkr5| jD ]'}|j d u r/t t |jjtj	 |j
r%|j
  td|jj  qtd | jD ] }|j d u rX|j  |j
rN|j
  td|jj  q8tddD ]+}d}| jD ]}|j d u ryt |jjtj d}qe|std	  dS td q^dS )
Nntzterminate process group gid:r   zterminate process id:r   2   FTzterminated all the procs)osnamer"   procpollkillpggetpgidpidsignalSIGTERMZlog_fncloseloggerinfotimesleep	terminaterangekillSIGKILL)r#   pstepaliver   r   r   _terminate_procs=   s8   








z"LauncherInterface._terminate_procsc                 C   s   d}d }| j D ]0}|j }|d u rd}q|dkr7|tkr&td |  S td td|j| |}q|s@|d u r@dS |S )NFTr   z+return form elastic auto parallel re-launchzABORT!!! ABORT!!! ABORT!!!z<ERROR rank {} error with exit code {}, check log for detail.)	r"   r)   r*   ELASTIC_AUTO_PARALLEL_EXIT_CODEr1   r2   r   formatrank)r#   r;   resultr9   retr   r   r   _check_procs]   s*   



zLauncherInterface._check_procsc                 C      t r    NotImplementedErrorr#   r   r   r   launcht      zLauncherInterface.launchc                 C   rC   r    rD   rF   r   r   r   stopw   rH   zLauncherInterface.stopc                 C   rC   r    rD   rF   r   r   r   watchz   rH   zLauncherInterface.watchN)	r   r   r   r$   r<   rB   rG   rI   rJ   r   r   r   r   r   8   s     r   c                	   @   s   e Zd Zdd Z	d,dedededefdd	Zd-ddZdd Z	defddZ
dd Zdd Zd.defddZdd Zdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS )/ElasticManagerc                    s  |_ |jp
td}|jptd}|j\__|j	p$td}|j
p/ttdd}|jp7td}|r<|n _	t|\__ttdt_ttdt d _t rtd	d
_tjd_ttdd_tdd
_tdd
}|d_n2|jptd	d
_jd}	t|	_ttdd_|	jj_fdd|	D _dj	jf _ t!"dj  t!"dj dj  ttdt#j$_%jjksjdkrjdkrt#j$_%t!"d jdkrjjkrt#j&_%t!"d |s5tdr5tdr5d'tdtd}t!(d| d|  g _)d_*d_+d_,d _-|r^d |vs^|r^jsnt!"d!'||j d_.d S d"_.|_/d#| _0j0d$ _1j0d% _2j0d& _3d
4d'd( t5d)D }
j1 d*|
 t66  _7	 j/8j0d+ fd,d-}j/9j1|}j/:  fd.d/}t;j<d0|d"d1}|=  j/j8j7j >d2d3 j/8j3j d4j >d2 fd5d6}j/?j3|}||g_@d _Ad S )7NZPADDLE_ELASTIC_SERVERZPADDLE_ELASTIC_JOB_IDZPOD_IPZPADDLE_ELASTIC_SCALEr   ZPADDLE_ELASTIC_FORCEZPADDLE_ELASTIC_TIMEOUTZPADDLE_ELASTIC_TTLPADDLE_TRAINERS ,ZPADDLE_PORTZ6170DISTRIBUTED_TRAINER_ENDPOINTSPADDLE_TRAINER_ENDPOINTSZFLAGS_START_PORTc                    s   g | ]	}d | j f qS %s:%d)
start_port).0iprF   r   r   
<listcomp>   s    z+ElasticManager.__init__.<locals>.<listcomp>rR   zstart job with np=z	trainers=z, trainer_endpoints_list=Z#PADDLE_ELASTIC_FAULT_TOLERANC_LEVELz+start job with ElasticLevel.FAULT_TOLERANCEz#start job with ElasticLevel.ELASTICZ PADDLE_ELASTIC_ETCD_SERVICE_HOSTZ PADDLE_ELASTIC_ETCD_SERVICE_PORTz{}:{}zinit with server z host F:z7Elastic is not enabled with server {} name {} and np {}Tz/paddle/z/nodesz/npz
/endpointsc                 s   s    | ]}t d V  qdS )abcdefghijklmnopqrstuvwxyzN)randomchoice)rT   _r   r   r   	<genexpr>   s    

z*ElasticManager.__init__.<locals>.<genexpr>   /   0c                    s`   dd  j  jD  _ jrtt jn j _td j d j  d _	d  _
d S )Nc                 S      g | ]}|d    qS r   decoderT   ir   r   r   rV          zCElasticManager.__init__.<locals>.host_call_back.<locals>.<listcomp>zhost_call_back curr_host=z, hosts:T)etcd
get_prefixnode_prefixhostslistsetr1   r2   	curr_host	need_syncelastic_startup_time)eventrF   r   r   host_call_back   s   
z/ElasticManager.__init__.<locals>.host_call_backc               
      s   	 zD   dd jjD } | rtt| n| } tdj d|   j| vrDtdj  jj	j
jdd W n  tye } ztd	| d
t   W Y d }~d S d }~ww t d  q)NTc                 S   r`   ra   rb   rd   r   r   r   rV     s    
zDElasticManager.__init__.<locals>.lease_heartbeat.<locals>.<listcomp>z[lease_heartbeat] curr_host=, hosts=z [lease_heartbeat] register host=latin-1leasez![lease_heartbeat] internal error: r   )refreshrg   rh   ri   rk   rl   r1   r2   rm   put	host_pathencode	Exceptionr   	traceback
format_excr3   r4   )rj   eZelastic_ttlZ
host_leaser#   r   r   lease_heartbeat  s:   


z0ElasticManager.__init__.<locals>.lease_heartbeatr   )r(   targetdaemonrs   rt   |c                    sn    j sd S  j jd }|d ur| nd}|d\ _  _td j  d td j d d S )Nr   rM   r   z"set DISTRIBUTED_TRAINER_ENDPOINTS rv   zset PADDLE_TRAINERS )	dist_endpointsrg   getendpoints_pathrc   splittrainersr1   r2   )rp   valueZedpsrF   r   r   endpoints_call_back-  s   z4ElasticManager.__init__.<locals>.endpoints_call_back)Br!   Zelastic_serverr'   getenvZjob_id	_parse_npnpmin_npmax_nphostscaleintforce	_get_hostr   Zget_device_proc_infoZdevice_modedevices_per_procELASTIC_TIMEOUTelastic_timeoutELASTIC_TTLrS   r   Zuse_paddlecloudr   lenr   r   r   trainer_endpoints_listips_host_to_endpointsrm   r1   r2   r   r   elastic_levelr   r>   debugrj   stoppedsigintrn   ro   enablerg   prefixri   Znp_pathr   joinr6   r3   ry   rx   Zadd_watch_prefix_callbackru   	threadingThreadstartrz   Zadd_watch_callbackwatcheslauncher)r#   r!   Zetcd_clientserverr(   r   r   r   Ztrainer_endpointsZnode_ipsZnode_tagrq   Z
host_watchr   Zkeepalived_threadr   Zendpoints_watchr   r   r   r$      s   




$





zElasticManager.__init__  ip_port_listr   rS   returnc           
         s~   g }|D ]3}| d}t|dkr|d  t|d }n| |}tt||t| }| fdd|D  qd|}	|	S )NrW   r   r   r   c                    s   g | ]}d  |f qS rQ   r   )rT   portrU   r   r   rV   M  s    z5ElasticManager._host_to_endpoints.<locals>.<listcomp>rN   )r   r   r   rk   r6   extendr   )
r#   r   r   rS   Zendpoint_listip_port	endpointsr   Zportsr   r   r   r   r   ?  s   

z!ElasticManager._host_to_endpointsFc                 C   s   t d|  | jr| j  | jsd S |r| j| jd | jD ]}| j	| q"| j
| j t| j| j}t|dkrJ| j| j d S d S )Nzmanager exist completed    1r   )r1   r2   r   rI   r   rg   rx   r   r   Zcancel_watchdeletery   rk   rh   ri   r   Zdelete_prefix)r#   r   rJ   rj   r   r   r   r   R  s   

zElasticManager.exitc                 C   s   | j jstd d S td ttj }tj| j j|tj	tj	dd
 \}}|r3td d S td|d   d S )Nzskip pre_hookzexecute pre_hook...T)envstdoutstderrshellzpre_hook exec failedzpre_hook exec result: zutf-8)r!   Zelastic_pre_hookr1   r2   copyr'   environ
subprocessPopenPIPEcommunicatewarningrc   strip)r#   Zcurrent_envouterrr   r   r   pre_hookf  s"   

zElasticManager.pre_hookr   c                 C   s   |pt dd}|d}d }}t|dkr+t|d }|dkr#dn|}d}||fS t|dkrQt|d }t|d }|dkrCdn|}||krK|n|}||fS td| d)	z1
        np format is "MIN" or "MIN:MAX"
        ZPADDLE_ELASTIC_NP0rW   r   r   r   zthe np=z) needs to be in "MIN" or "MIN:MAX" format)r'   r   r   r   r   
ValueError)r#   r   Znp_strZnp_dictr   r   r   r   r   r   x  s"   

zElasticManager._parse_npc                 C   s$   zt t t  W S    Y dS )Nz	127.0.0.1)socketgethostbynamegetfqdngethostnamerF   r   r   r   r     s   zElasticManager._get_hostc                 C   s$   | j sdS t| j| jd dkS )NTr   r   )r   r   rg   r   r   rF   r   r   r   
_completed  s   zElasticManager._completedN	host_listc              
   C   s  |r|| _ ndd | j| jD | _ | j rtt| j n| j | _ | jtjkr3t	| j | j
kr1dS dS | jtjkrt	| j }|| j
krEdS | jsMt | _|| jkrWd | _dS || jkr|| jk rt | j }|| jkrtd| d| j d| d| j  dS dS d | _dS dS )	Nc                 S   r`   ra   rb   rd   r   r   r   rV     rf   z)ElasticManager._match.<locals>.<listcomp>TFzawait for timeout, you can set value by PADDLE_ELASTIC_TIMEOUT,                         hosts_num=z	, min_np=z(,                         interval_time=z, elastic_timeout=)rj   rg   rh   ri   rk   rl   r   r   r   r   r   r   ro   r3   r   r   r   r1   r2   )r#   r   Z	hosts_numZinterval_timer   r   r   _match  sL   




zElasticManager._matchc                 C   s$   | j | j| d| d d S )Nr   rs   )rg   rx   r   rz   )r#   r   rj   r   r   r   _update_endpoint  s   zElasticManager._update_endpointc                 C   s   t tdd}td| j d| j  | j| jv r=| jtjd< | jtjd< t	d| j d t	d	| j d d S | j
| j}|d
krW| j
| | j
|< | j| j
|< n| tjd< ddd | j
D }|| j_|tjd< d S )NPADDLE_TRAINER_IDzself.curr_host=z, self.dist_endpoints=rO   rL   z)update env DISTRIBUTED_TRAINER_ENDPOINTS rv   zupdate env PADDLE_TRAINERS r   rN   c                 S      g | ]	}| d d qS rW   r   r   rT   	host_portr   r   r   rV         z9ElasticManager._update_fault_tolrance.<locals>.<listcomp>)r   r'   r   r1   r   rm   r   r   r   r2   rj   indexr   r!   r   )r#   r?   idxrj   r   r   r   _update_fault_tolrance  s(   z%ElasticManager._update_fault_tolrancec              
   C   s   t | j}tdt| j d| j d| j d|  | jD ]}||vr*|| qt	|
| jtjd< ddd |D }|| j_|tjd	< t|| _d|tjd
< | jtjd< || _d S )Nzelastic scale out, from  to rr   , host_endpoints=r   rN   c                 S   r   r   r   r   r   r   r   rV     r   z<ElasticManager._update_elastic_scale_out.<locals>.<listcomp>rL   rP   rO   )r   deepcopyr   r1   r2   r   rj   r   appendstrr   rm   r'   r   r   r!   r   r   )r#   host_endpointsZcurr_host_portrj   r   r   r   _update_elastic_scale_out  s(   $






z(ElasticManager._update_elastic_scale_outc              
   C   s|  t | j}td| j dt| j d| j d|  i }g }t| jD ]!\}}|	|}|t| jd krA|
|sA|||< q%|| q%d}g }tt| jD ]}|
|sit|dkri|| ||< |d7 }||
| qRtd|  || _dd	 |D }	d
|	}
| || j}|
| j_t|	| jtjd< |
tjd< t|| _d
|tjd< |tjd< | ||
 d S )Nzelastic scale in, from r   rr   r   r   r   z#elastic scale in, sorted_endpoints=c                 S   r   r   r   )rT   r   r   r   r   rV     r   z;ElasticManager._update_elastic_scale_in.<locals>.<listcomp>rN   r   rL   rP   rO   )r   r   r   r1   r2   r   r   rj   	enumerater   r   r   r6   r   r   r   r!   r   r   rm   r'   r   r   )r#   r   Zendpoints_dictZunsorted_endpointsidr   r   Z
idle_indexZsorted_endpointsZip_listrj   Znew_endpointsr   r   r   _update_elastic_scale_in  sD   $








z'ElasticManager._update_elastic_scale_inc                 C   s   t | jdksJ d| jtjkr|   d S t | j| jkr.td| j  |   d S t | j| jkr<| 	  d S | 
  d S )Nr   zhosts emptyzelastic startup, hosts=)r   rj   r   r   r   r   r   r1   r2   r   r   rF   r   r   r   _update_hosts/  s   zElasticManager._update_hostsc                 C   sp   | j sd S d}| js6|  rtd| j  |   d S td| j d| j  |d7 }t	d | jr
d S )Nr   zready with hosts znot ready for np z with hosts r   )
r   r   r   r1   r2   rj   r   r   r3   r4   )r#   r   r   r   r   wait@  s   
zElasticManager.waitc                 C   s$   | j rd S || j| _| j  d S r    )r   r!   r   rG   )r#   r   r   r   r   runO  s   zElasticManager.runc                 C   s   | j rd| _ | jsm| j }td|  |d urRtd|  |tkr3td | j  t	j
S |dkr9dnd}| j|d |rFt	jS | jtjkrOt	jS t	jS |  se|  r]| j re| j  t	j
S td | jr	| jru| j  t	jS )	NFzlauncher.watch():zjob exit with code zjob re-launch for auto parallelr   T)r   r   )rn   r   r   rJ   r1   r   r2   r=   rI   r   r   r   r   r   r   r   r   r   r   r   r3   r4   r   )r#   rA   r   r   r   r   rJ   V  s4   





zElasticManager.watchc                 C   s   | j r|   || _d| _d S )NT)r   r   r   r   )r#   r   framer   r   r   signal_handlerz  s   
zElasticManager.signal_handler)r   )Fr    )r   r   r   r$   rk   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rJ   r   r   r   r   r   rK   ~   s8     B

)1$rK   )r   r'   rY   r.   r   r   r   r3   r|   Zpaddle.distributed.fleetr   r   Z"paddle.distributed.utils.log_utilsr   Z
backup_envr   r1   ZELASTIC_EXIT_CODEr=   r   r   r   r   r   rK   r   r   r   r   <module>   s*   
F