o
    "jc,                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 dZdd ZG dd dZG d	d
 d
eZG dd deZdS )    N)KVClient)KVServeretcd://c                 C   s.   t | } | dddd }tt|S )N	candidatez127.0.0.1:8080:r   )jsonloadsgetsplitint	ipaddressIPv4Address)xZip_x r   m/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/launch/controllers/master.py
_cmp_by_ip   s   
r   c                   @   sd   e Zd ZdZdZdZdZdd Zdd Zd	d
 Z	dd Z
dd ZddeeffddZedd ZdS )MasterzK
    Master is a distributed store design to exchange info among nodes
    mainZstandbyZparticipantc                 C   s   || _ d | _d| _d | _d S )NF)ctxserverinitializedendpoint)selfr   r   r   r   __init__,   s   
zMaster.__init__c                 C      t NNotImplementedErrorr   r   r   r   stop2      zMaster.stopc                 C      d S r   r   r   statusr   r   r   
set_status5   r    zMaster.set_statusc                 C   r!   r   r   r   r   r   r   
get_status8   r    zMaster.get_statusc                 C   r!   r   r   r   r   r   r   restart_peer;   r    zMaster.restart_peerreturnc                 C   r   r   r   )r   prefixkeyvaluesizerankr   r   r   
sync_peers>   r    zMaster.sync_peersc                 C   s&   |j jr|j jtrt|S t|S r   )argsmaster
startswithETCD_PROTOCAL
ETCDMaster
HTTPMaster)clsr   r   r   r   factoryA   s   zMaster.factoryNr'   )__name__
__module____qualname____doc__MAINZSTANDBY
PATICIPANTr   r   r$   r%   r&   listr   r.   classmethodr6   r   r   r   r   r   #   s    r   c                   @   s@   e Zd Zdd Zdd Zdd Zdd Zdd
eeffddZ	dS )r4   c              
   C   s  | j rd S tj| _| jjjrh| jjj| _| jd\}}|d| jj	j
fv rgtdt   | jj	|t|sgztt|| _tj| _W n! tyf } z| jjd|  td W Y d }~q.d }~ww nZ| jj	 }| jj	j
 d| | _t|| _tj| _td tjdd d	d
g}|d| jg |tjdd   td td| td t| jjjdkr| jjd d| jv r| jd| jj	j
| _t| j| _ d| _ | !  d S )Nr   z	127.0.0.1   zstart master failed 皙?z1Copy the following command to other nodes to run./r'   z-mzpaddle.distributed.launchz--master   zP-------------------------------------------------------------------------------- r   z9--rank set in the command may not compatible in auto modeT)"r   r   r=   roler   r/   r0   r   r
   nodeiptimesleeprandomZis_server_readyr   r   r   r<   	ExceptionloggerwarningZget_free_portprintsys
executableextendargvjoinr-   replacer   client_start_server)r   rG   portecmdr   r   r   	lazy_initJ   sV   



zHTTPMaster.lazy_initc                 C   s:   | j r| j js| j   | jjd| j  d S d S d S )NzKV server start at )r   startedstartr   rL   debugr   r   r   r   r   rV   }   s   
zHTTPMaster._start_serverc                 C   s2   | j r| j js| j   | jjd d S d S d S )NzKV server stopped)r   stoppedr   r   rL   r]   r   r   r   r   _stop_server   s   
zHTTPMaster._stop_serverc                 C   s   |    d S r   )r_   r   r   r   r   r      s   zHTTPMaster.stopr'   r(   c                    s  |dk r	|gdfS | j jd |   | j j s4| jjddr"n| j jd t	
d | j j r|dk r@| jtjkr@dn|}| d	| d	| }| j j s| j||sf| j jd
 t	
d qL| j| | j jd    rt |kr| j jjrt  td}||}	||	fS |dk rt  }
|
   fdd|
D }||}	||	fS d g| }  D ]\}}||t|d	d < q||fS t	
d | j j rRg dfS )Nr@   r   Waiting peer start...   )timeoutzmaster not readyrA   ZaaaaaarB   zput value failedsync peers r*   c                    s   g | ]} | qS r   r   .0kZrjsonr   r   
<listcomp>   s    z)HTTPMaster.sync_peers.<locals>.<listcomp>r'         ?)r   rL   inforZ   r#   is_donerU   Zwait_server_readyrM   rH   rI   rE   r   r<   put
get_prefixr]   lenr/   sort_ipsortedvaluesr   indexr>   keyssortitemsr   r
   )r   r)   r*   r+   r,   r-   kyrg   retidxrt   vr   rh   r   r.      sJ   







zHTTPMaster.sync_peersNr7   )
r8   r9   r:   rZ   rV   r_   r   r>   r   r.   r   r   r   r   r4   I   s    3r4   c                       sj   e Zd Z fddZddeeffddZddd	Zd
d Zdd Z	dd Z
dd Zdd Zdd Z  ZS )r3   c                    sz   t  | | jjjr| jjjd| _dd l}ddlm	} | j
d\}}| r3|||d| _d S |j||d| _d S )Nr   r   r@   )
ETCDClientr   )hostrW   )superr   r   r/   r0   stripr   etcd3Zutils.etcd_clientr{   r
   Zis_auto_tuner_moderU   )r   r   r   r{   r|   rW   	__class__r   r   r      s   
zETCDMaster.__init__r'   r(   c                    s  |dk r	|gdfS | j jd | d| d| }| j| | j jd| d|  | j j s| j||	d t
| j|}t|}| j jd|  t||kr| j jjrud	d
 |D ttd}||}	||	fS |dk rdd
 |D  dd
 |D }
|
  dd
 |D  fdd
|
D }||}	||	fS dg| }|D ]%\}}t|j dd }|dk r| j jd| d | ||< q||fS td | j j r3dS dS )z
        sync_peers gather all value for key under scope prefix
        result always be sorted either by rank or alphabet of pod.name
        r@   r   r`   rB   z
sync path z value latin-1rc   c                 S      g | ]}|d    qS r   decoderf   ir   r   r   ri          z)ETCDMaster.sync_peers.<locals>.<listcomp>rd   c                 S      g | ]	}|d  j  qS rC   r*   r   r   r   r   r   ri          c                 S   r   r   r   r   r   r   r   ri      r   c                 S   r   r   r   r   r   r   r   ri      r   c                    s   g | ]	}  | qS r   )rs   re   rt   rr   r   r   ri      r   Nr'   zrank z error in syncrj   )r   rL   rk   rU   delete_prefixr]   r#   rl   rm   encoder>   rn   copydeepcopyro   r/   rp   rq   r   rs   ru   r   r*   r   r
   errorrH   rI   )r   r)   r*   r+   r,   r-   pathresultrx   ry   Zsorted_keysrz   rg   iir   r   r   r.      sD   






zETCDMaster.sync_peers
   c                    s   t drjjd d S d| _j d_jj jj d  jj	 
dd fdd	}jj| fd
d}tjd|dd_j  d S )Nheartbeat_prefixzHeartbeat already donez/paddle/z
/heartbeatrB   r   leasec                    s    j j  d S r   )r   r#   Zrestart)eventr   r   r   _beat_watch  s   z2ETCDMaster.register_heartbeat.<locals>._beat_watchc               
      s   j j sOz   vr$jj dd j j	d W n t
yA }  zj jd|   W Y d } ~ nd } ~ ww td  j j rj j	d j d S )Nr   r   zHeartbeat register againzHeartbeat error r@   zHeartbeat done)r   r#   rl   refreshfetch_peer_aliverU   rm   r   rL   r]   rK   r   rH   rI   Zcancel_watch)rX   Z	beat_pathZ
beat_watchr   pod_idr   ttlr   r   
_heartbeat  s"    z1ETCDMaster.register_heartbeat.<locals>._heartbeat	heartbeatT)nametargetdaemon)hasattrr   rL   rM   
job_prefixr   rU   r   r   rm   r   Zadd_watch_prefix_callback	threadingThreadbeat_threadr\   )r   Zjob_idr   r   r   r   r   r   r   register_heartbeat   s$   
zETCDMaster.register_heartbeatc                 C   s0   dd | j | jD }| jjd|  |S )Nc                 S   r   r   r   r   r   r   r   ri   (  s    z/ETCDMaster.fetch_peer_alive.<locals>.<listcomp>zpeer alive )rU   rn   r   r   rL   r]   )r   Z
peer_aliver   r   r   r   '  s
   zETCDMaster.fetch_peer_alivec                 C   s   |dkr|nd}t   | }t|  }| jj sQt   |k rQt|  }||kr.d|fS ||kr@t   | }|}t d nt d | jj sQt   |k s t|  }||krc||krcd|fS d|fS )NrC      Tg?rj   F)rH   ro   r   r   r#   rl   rI   )r   Zreplicas_minZreplicas_maxrb   endZnp_prenpr   r   r   wait_peer_ready.  s"   
zETCDMaster.wait_peer_readyc                 C   s   | j | j d S r   )rU   r   r   r   r   r   r   r&   F  s   zETCDMaster.restart_peerc                 C   s4   | j j| j|d| j ddsJ d| d S )Nr   iX  r   zset status failed )rU   rm   r   r   r   r"   r   r   r   r$   I  s   
zETCDMaster.set_statusc                 C   s&   | j | jd }|d ur| S dS )Nr    )rU   r	   r   r   )r   r+   r   r   r   r%   P  s   zETCDMaster.get_statusc                 C   s   t | dr| jj  d S d S )Nr   )r   r   r#   doner   r   r   r   r   T  s   
zETCDMaster.stopr7   )r   )r8   r9   r:   r   r>   r   r.   r   r   r   r&   r$   r%   r   __classcell__r   r   r   r   r3      s    
1*r3   )r   r   r   rJ   rO   r   rH   Z)paddle.distributed.launch.utils.kv_clientr   Z)paddle.distributed.launch.utils.kv_serverr   r2   r   r   r4   r3   r   r   r   r   <module>   s   &q