o
    "j,                     @   sP   d dl Z d dlZddlmZ ddlmZmZ G dd deZG dd	 d	eZdS )
    N   )
DeviceType   )ControleMode
Controllerc                       sJ   e Zd Z fddZedd Zdd Zdd Zd	d
 ZdddZ	  Z
S )CollectiveControllerc                    s   d | _ t | d S )N)_tuner_run_modesuper__init__)selfctx	__class__ q/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/launch/controllers/collective.pyr
      s   zCollectiveController.__init__c                 C   s*   |r|j | j d tj|j_dS dS )N enabledTF)loggerdebug__name__r   
COLLECTIVEargsrun_modeclsr   r   r   r   enable   s
   
zCollectiveController.enablec                 C   sV   |   }|rd S | jjjd u r| jjjr| jjjr|  S | jjjd u r&d}| |S )NT)	_build_pod_with_tunerr   r   master
start_portips_build_pod_with_argsauto_parallel_config_build_pod_with_master)r   Zskip_runr   r   r   	build_pod%   s   
zCollectiveController.build_podc              
   C   s  | j jj}|d urtj|s| j jd |ds"| j jd t	|d}t
| }|dd| _W d    n1 s@w   Y  | j jd| j  d| j j  }|  }| jd	v r| j jjd
|d|dd|t| j jj  | d	}d}| j||dd | jdkrdS dS )Nzauto_parallel_conf not exists!z.jsonz2auto_parallel_config should be a json format file!rZtuner_run_modetuner_and_runztuner_run_mode is: z
127.0.0.1:)
tuner_onlyr$   10Ztuner)	PADDLE_AUTO_PARALLEL_CONFIGPADDLE_TRAINERS_NUMPADDLE_TRAINER_ENDPOINTSPADDLE_TRAINER_IDPADDLE_CURRENT_ENDPOINTZFLAGS_selected_gpusPADDLE_AUTO_PARALLEL_STAGEPADDLE_GLOBAL_SIZEPADDLE_LOCAL_SIZEz	tuner.logT)envslog_fileZis_initr%   F)r   r   r    ospathexistsr   warningendswithopenjsonloadsreadgetr   infonodeget_free_portpod_replicasintZnnodesadd_container)r   r    ZrobjZauto_parallel_dataZendpointr?   er1   r   r   r   r   5   sB   



z*CollectiveController._build_pod_with_tunerc           	         s      j_t jjj jjjd} fdd|D } jj	
d|   jjj|v r<| jjj jj nd} |  jjj } jjj jjj}t jjD ]}t|  jj ||  | t| |||  ||  t| t|d	}td|dk r|dd|i  jd ur| jjjd	d
 t|dkrو jjjjtjkr| jjj   jjdkr||d|i n|||| i n|ddi d| } j||d q[dS )N,c                    s0   g | ]}t  jjD ]}| d |  q
qS :)rangepodreplicas).0hpr   r   r   r   
<listcomp>a   s
    z=CollectiveController._build_pod_with_args.<locals>.<listcomp>zjob endpoints: r   )	r.   r/   PADDLE_GLOBAL_RANKPADDLE_LOCAL_RANKPADDLE_NNODESr,   r+   r)   PADDLE_RANK_IN_NODE  r*   runr(   r-   r   PADDLE_DISTRI_BACKENDgloo
workerlog.r0   r1   T)r?   rG   rH   r@   r   r   r   r   splitr   r   r=   ipindexsave_pod_logdeviceget_selected_device_keyget_selected_devicesdevicesrF   lenstrjoinupdater   r    dtyper   CUSTOM_DEVICEget_custom_device_envsrA   )	r   r   job_endpointsrank_offsetselected_dev_keyselected_dev_listirB   r1   r   rL   r   r   [   sZ   




z)CollectiveController._build_pod_with_argsTc                    s      j_t jjj j_ jj } fdd jj	 jjD }t
 jj jj jj jjjj jjj d| d|d} jd jj d jj| jj jj\}}| j_t|dk rjd	S d
d |D } jjd|   | tdd |D }tdd |d | D }	 |d d }	|	dd  }
|
tjd< dd |D }|r j   jjj } jjj  jjj!}t" jjD ]}|	|  jj ||  |  jj || ||  | t#|d
}td|dk r|$dd|i  j%d ur|$ jjj&dd t|dkrO jjjjt'j(kr3|$ jjj)   jjdkrE|$|d|i n|$||| i n|$ddi d| } j*||d qdS )Nc                    s    g | ]} j jj d | qS rD   )r   r=   rZ   )rI   rK   r   r   r   rM      s    z?CollectiveController._build_pod_with_master.<locals>.<listcomp>rE   rC   )namerankrH   re   	candidate	endpoints/z/infor   Fc                 S   s   g | ]}t |qS r   )r8   r9   rI   rl   r   r   r   rM      s    zsync peers done c                 S      g | ]}|d  qS rH   r   rs   r   r   r   rM          c                 S   rt   ru   r   rs   r   r   r   rM      rv   r   rp   ZCOLLECTIVE_MASTER_IPc                 S   rt   )rq   r   rs   r   r   r   rM      rv   )
ZPADDLE_MASTERr.   r/   rN   rO   rP   r,   r+   r)   rQ   rR   r*   rS   rT   rU   rV   rW   rX   T)+r?   rG   rH   r@   r   r   ro   r=   r>   Zget_free_portsr8   dumpsrn   r]   re   rZ   rc   r   Z
sync_peersjobidra   r   r   r\   sumrY   stripr2   environresetr^   r_   r`   rF   rb   rd   r   r    r   rf   rg   rA   )r   Z	reset_podportrq   dataZ	peer_listro   Zglobal_sizeri   Zcollective_masterZcollective_master_iprh   rj   rk   rl   rB   r1   r   rm   r   r!      s   






z+CollectiveController._build_pod_with_master)T)r   
__module____qualname__r
   classmethodr   r"   r   r   r!   __classcell__r   r   r   r   r      s    
	&?r   c                   @   s(   e Zd Zedd Zdd Zdd ZdS )CollectiveElasticControllerc                 C   s<   |j jr|j jdr|j| j d tj|j _dS dS )Nzetcd://r   TF)	r   r   
startswithr   r   r   r   r   r   r   r   r   r   r     s
   
z"CollectiveElasticController.enablec                 C   s4   | j jdkr| jjd | j| j j| jj d S )Ndefaultz?Using default job name may cause conflict, add --job_id in args)	rx   ry   r   r   r5   r   Zregister_heartbeatrG   rn   rm   r   r   r   register
  s
   z$CollectiveElasticController.registerc                 C   s.  t | jjj}| jjr|n|d }|   | jj| jjj	kr| 
  | jjd | j| jj| jj|\}}|r>|| j_n!| jjd| j  | j r^| jjd dd l}|d n+| jjd| j  |  soq| j| jjj |   |  rn	| jj| jjj	ks| jjd| j  d S )	N
   zWaiting peer ready...zpeer not ready z&Failed to start peer, auto tuner exit.r   zRun z	Job done )r@   r   r   Zelastic_timeoutrx   Zelasticr   rG   ZrestartZmax_restartZ	build_jobr   r<   r   Zwait_peer_readyZreplicas_minZreplicas_maxrH   r5   Zis_auto_tuner_modesysexitr   r"   Z
set_statusstatusZRUNNINGZ
deploy_podwatch)r   timeoutokrH   r   r   r   r   rS     s8   


!zCollectiveElasticController.runN)r   r   r   r   r   r   rS   r   r   r   r   r      s
    
r   )	r8   r2   Zcontext.devicer   
controllerr   r   r   r   r   r   r   r   <module>   s    k