o
    "j2                     @   s  d dl Z d dlZd dlmZ d dlZd dlmZ d dlmZm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d d	lT d d
lmZ d dlmZmZmZ g dZdd Zdd Zdd ZG dd dZG dd deZG dd dZG dd deZ G dd dZ!G dd dZ"G dd  d e"Z#G d!d" d"e"Z$G d#d$ d$e"Z%G d%d& d&e%Z&G d'd( d(e"Z'G d)d* d*Z(G d+d, d,e(Z)G d-d. d.Z*G d/d0 d0e*Z+G d1d2 d2Z,G d3d4 d4Z-G d5d6 d6eZ.dS )7    N)text_format)fleet)CommunicatorHeterClient)wait_server_ready)the_one_ps_pb2)RuntimeBase)Coordinator)*)core)CompiledProgramExecutorProgram)TableSparseTableGeoSparseTableBarrierTableTensorTable
DenseTablec                 C   s@   | d }t |D ]\}}t||kr|| d | |f  S qdS )Norigin_main_programsorigin_startup_programs)NNN)	enumerateid)context
program_idZprogramsiprogram r   a/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/ps/the_one_ps.pyget_program_by_id+   s   r   c                 C   s   t ||\}}}| jD ]6}t|st|sq|dd }|| kr(|jdks-|jdkrC|dr@|ddkr@|d  S  dS qd S )NWr   lookup_tablelookup_table_v2table_classnoneMemorySparseTable)	r   global_blockopsis_distributed_sparse_opis_sparse_opinputtypeZhas_attrattr)varnamer   r   main_programstartup_programidxop
param_namer   r   r   parse_table_class3   s   

r3   c                 C   s   t ||\}}}d}| D ]}|j|kr*|jd }td| d| d| j   nq| j}	| jdkrD|	|d krCtd|d |	n|	|krPtd||	| j	}
| jdkrj|
|d krhtd	|d |
d S |
|d
 krztd|d
 |
d S )Nr      z	new var: z, SparseAccessor   zIThe fea_dim is wrong, it will be sparse_embedding_dim + 2: {}, but got {}zEThe fea_dim is wrong, it will be sparse_embedding_dim: {}, but got {}zLThe embedx_dim is wrong, it will be sparse_embedding_dim - 1: {}, but got {}   zLThe embedx_dim is wrong, it will be sparse_embedding_dim - 3: {}, but got {})
r   	list_varsnameshapeprintfea_dimaccessor_class
ValueErrorformat
embedx_dim)accessor_protor-   r   r   r.   r/   r0   embedding_dimvarr<   r@   r   r   r   check_embedding_dimF   sN   



rD   c                   @      e Zd Zdd Zdd ZdS )Servicec                 C      d S Nr   selfr   r   r   __init__q      zService.__init__c                 C   s"   d|_ d|_d|_d|_d|_d S )NZBrpcPsServerZBrpcPsClientZBrpcPsServicer      )server_classclient_classZservice_classZstart_server_portZserver_thread_numrJ   Zservice_protor   r   r   _sett   s
   
zService._setN__name__
__module____qualname__rK   rQ   r   r   r   r   rF   p       rF   c                       $   e Zd Z fddZdd Z  ZS )
GpuServicec                       t    d S rH   superrK   rI   	__class__r   r   rK   }      zGpuService.__init__c                 C   s   d|_ d|_d S )NZPsLocalServerZPsLocalClient)rN   rO   rP   r   r   r   rQ      s   
zGpuService._setrS   rT   rU   rK   rQ   __classcell__r   r   r\   r   rX   |       rX   c                   @   rE   )Accessorc                 C   s   d| _ d | _d| _d| _d S )N r   )r=   Z	optimizerZfeature_dimrB   rI   r   r   r   rK      s   
zAccessor.__init__c                 C   s&  t ||\}}}d}	| D ]}
|
j|kr|
jd }	 nq|ds-|d r*d|_nd|_|ds@|jdkr=|	d |_n|	|_|d	sU|jdkrP|	d |_n|	d
 |_|ds]d|_|j	}|dshd|_
|dspd|_|j}|jdkr{d|_|dsd|_|dsd|_|dsd|_|dsd|_|dsd|_|dsd|_|dsd|_|dsd|_|dsd|_|j|jfD ]F}|ds|jdkrd |_|jd!krd"|_nd"|_|jd#ks|jd$kr"|jd%sd|j_|jd&sd'|j_|jd(sd)|j_t|jjdkr"|jjd*d+g |jd krh|j d%s?|j!d, "d-d }t#||j _|j d(sV|j!d "d-d, }t#||j _t|j jdkrh|j jd*d+g |jd"kst|jd.kr|j$d%s|j!d, "d-d }t#||j$_|j$d(s|j!d "d-d, }t#||j$_d/d0 |j%D }|j$d1s|jd!krt#|d d |j$_&nd2|j$_&|j$d3s|jd!krt#|d d |j$_'nd4|j$_'|j$d5s|jd!krt#|d d |j$_(nd6|j$_(t|j$jdkr|j$jd*d+g qd S )7Nr   r4   r=   
use_ps_gpuZCtrDymfAccessorr5   r<   r6   r@   r7   embedx_thresholdnodeid_sloti0#  feature_learning_rateg?Fnonclk_coeffg?click_coeffg      ?base_thresholddelta_thresholddelta_keep_days   show_click_decay_ratedelete_thresholddelete_after_unseen_days   ssd_unseenday_thresholdr9   sgdZSparseNaiveSGDRuleadamZSparseAdamSGDRuleZSparseAdaGradSGDRuleZStdAdaGradSGDRulelearning_rateinitial_g2sumg      @initial_rangeg-C6?g      $g      $@&ZSparseSharedAdamSGDRulec                 S   s   g | ]}| d qS )ry   )split).0xr   r   r   
<listcomp>   s    z!Accessor._set.<locals>.<listcomp>beta1_decay_rateg?beta2_decay_rateg+?ada_epsilong:0yE>))r   r8   r9   r:   HasFieldr=   r<   r@   re   graph_sgd_paramrf   rg   ctr_accessor_paramZ	zero_initrh   ri   rj   rk   rl   rn   ro   rp   rr   Zembed_sgd_paramZembedx_sgd_paramZadagradru   rv   rw   lenZweight_boundsextendZnaiveinitializersrz   floatrt   attrsr~   r   r   )rJ   rA   r-   r   r   Zcommon_accessorr.   r/   r0   rB   rC   r   r   Z	sgd_paramru   rw   Z	attr_listr   r   r   rQ      s   





























zAccessor._setNrR   r   r   r   r   rb      s    rb   c                       sL   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	  Z
S )CommonAccessorc                    sX   t    d| _d| _g | _g | _g | _d| _d| _g | _	i | _
i | _i | _|   d S )Nrc   r$   r   F)r[   rK   
table_nameentryr   paramsdimstrainer_numsyncr   opt_input_mapopt_attr_mapopt_init_mapdefine_optimize_maprI   r\   r   r   rK     s   
zCommonAccessor.__init__c                 C   s   i }ddg|d< g d|d< g d|d< dg|d< g d	|d
< ddg|d< i }g |d< g |d< g |d
< g d|d< g d|d< dg|d< i }g d|d< dg|d< g d|d< g d|d< || _ || _|| _d S )NParamNLearningRater4   rs   )r   )ZMoment1N)ZMoment2N)ZBeta1Powr4   )ZBeta2Powr4   r   rt   )r   )ZD2SumN)G2SumN)ZMomentN)MomentDecayRater4   )AdaDecayRater4   )
AdaEpsilonr4   r   
adam_d2sumsum)r   )r   r4   r   naive_adagrad)SummaryDecayRater4   summary))Zbeta1f)Zbeta2r   )epsilonr   )Zsummary_decay_rater   )seedmeanZstdZgaussian_randomvalueZfill_constant)r   minmaxZuniform_randomZtruncated_gaussian_random)r   r   r   )rJ   r   r   r   r   r   r   r   (  s,   




z"CommonAccessor.define_optimize_mapc           	      C   s   t ||\}}}| jD ]3}t|st|sq|dd }||kr1|jdkr1|d| _ d S ||kr@|jdkr@d| _ d S qd S )Nr    r   r!   r   r"   r$   )	r   r&   r'   r(   r)   r*   r+   r,   r   )	rJ   r-   r   r   r.   r/   r0   r1   r2   r   r   r   parse_entry_  s   
zCommonAccessor.parse_entryc                 C   s@   t || d }||d  |kr|S || |k r|||  S dS )Nr4   r   int)rJ   Z	total_dim	shard_num
pserver_id	blocksizer   r   r   	get_shardq  s   zCommonAccessor.get_shardc           	      C   s   d}d}|}|  jD ]2}|j| j v r=||dd kr=|jg}| j|j D ]}|t|| q(|	|} |S q|S )Nry   rc   ZOutr   )
r&   r'   r+   r   keysoutputappendstrr,   join)	rJ   Z
value_nameZo_startup_programZl_inZattr_strZorigin_var_namer1   Z	init_attrr,   r   r   r   get_initializer_attr|  s   
 z#CommonAccessor.get_initializer_attrc                 C   s  |  d }| }| d }| r| d nd}|d j}t|| \}}	}
t|d }tt|d }t	|}d }|D ]}d|j
v rY|dd |d | krY|} nqA|d u retd| g }g }g }g }t|d | _|| _|| _|jdkr|rtd	 d
}td| |d tjkr| jd }| jd }d| _nV|d r|r| jd }| jd }d| _nB| r| jd }| jd }d| _n0|r|s| jd }| jd }d| _n|jdkr|jdkrtd| j|j }| j|j }|j| _|D ])\}}|| | jdkr}|d u r|r|}n| |||}|| |dks*|dkr]| j||d  }|dkrU|jdt|
 krUt d | jdt|
  }| !|j|	}n|dkred}n|dkrmd}n
|dkrud}nd}|| q| jdkr|d u r|r|}n| |||}|| |dkr| j||d  }| !|j|	}n
|dkrd}nd}|| q|d kr|d d}|| q| j||d  }|dkr |jdt|
 kr t d | jdt|
  }|d u r|r|}n| |||}|| | !|j|	}|| q| jdkrLt"|}|D ]}d!|j
v rJ|d!d |d | krJ|} nq0|D ]\}}|#|}|d"$|t|g qN|| _%|| _&|| _'|| _(d S )#Nr   r4   user_defined_strategy
role_makerr   grad_name_to_param_namezcan not find optimizer for rt   z8optimization algorithm is not adam, set adam_d2sum FalseFzadam_d2sum:ps_moder   rd   r   rs   r   r   z8The dense optimizer in PS is only supported SGD or Adam!r   learning_rate_zwill support decay soonr   zfill_constant&0.99r   zfill_constant&0.9999r   zfill_constant&1.0e-8zfill_constant&0r   zfill_constant&0.999999r   Z	BatchSizery   ))origin_varnames	is_sparsesectionsr   r   r   get_role_idr   get_ps_endpointsZget_optimize_opsZinput_namesr*   r>   get_trainersr   	table_num	table_dimr+   r;   DistributedModeGEOr   r   r=   Zis_datanorm_tabler   r   r&   varsr9   r   warningswarnr   Zget_datanorm_opsr,   r   r   r   r   r   )rJ   ctxr   Z	grad_namer   sizeZ
single_dimr   r.   r/   r0   r   Zpserver_numZoptimizer_opsZoopr1   r   r   r   r   Zparam_varnamesZattr_varnamesZformal_namer:   paramZinitializerZdatanorm_opsZattr_varnametype_r   r   r   r   parse_by_optimizer  s  



































z!CommonAccessor.parse_by_optimizerc                 C   st   | j |_| j|_|j| j |j| j |j| j | j|_| j|_| j	|_	| j
|_
| j|_d| j|_d S )N#)r=   r9   r   r   r   r   r   r   r   r   r   r   r   r   r,   rJ   protor   r   r   rQ   F  s   zCommonAccessor._set)rS   rT   rU   rK   r   r   r   r   r   rQ   r`   r   r   r\   r   r     s    7 6r   c                   @   rE   )Tensorc                 C   
   || _ d S rH   )tensor_dict)rJ   Ztesnor_dcitr   r   r   rK   U     
zTensor.__init__c                 C   sT   | j dd|_| j dd|_| j dd|_| j dd|_| j dd|_d S )Nmain_program_idr   startup_program_idfeed_var_namerc   fetch_var_nametensor_table_class)r   getr   r   r   r   r   )rJ   Ztensor_protor   r   r   rQ   X  s   
zTensor._setNrR   r   r   r   r   r   T  rV   r   c                   @   rE   )r   c                 C   s2   d | _ d| _d | _t | _d| _t | _d | _d S )Nrx      )r#   r   r+   rb   accessorr   commontensorrI   r   r   r   rK   g  s   
zTable.__init__c                 C   rG   rH   r   )rJ   table_protor   r   r   rQ   p  rL   z
Table._setNrR   r   r   r   r   r   f  s    	r   c                       rW   )r   c                    s^   t    d | _d| _d| j_d| j_g | j_g | j_	|d | _
|d | _|| _|d | _d S )Nr   CommMergeAccessorrc   is_heter_ps_moder   is_sync)r[   rK   r+   r   r   r=   r   r   r   r   r   r   r0   r   )rJ   r   r0   r\   r   r   rK   u  s   


zBarrierTable.__init__c                 C   s   | j |_d|_d|_tj|_d|j_d|j_	d|j_
d|j_d|j_| j|j_d|j_t| j}| jr<|t| j 7 }||j_d S )Nr   r   r   r   rc   Zbarrier_tabler$   )r0   table_idr#   r   r   PS_OTHER_TABLEr+   r   r=   r<   r@   r   r9   r   r   r   r   r   r   r   r   _get_heter_worker_endpointsr   )rJ   r   r   r   r   r   rQ     s   

zBarrierTable._setr_   r   r   r\   r   r   t  s    r   c                       rW   )r   c                    s    t    || _|| _|| _d S rH   )r[   rK   r0   r   r   )rJ   r0   r   r   r\   r   r   rK     s   

zTensorTable.__init__c                 C   sb   | j |_tj|_| jdd|_d|j_	| jdd|j
_t| j|j
_t| j}||j d S )Nr   rc   r   r   )r0   r   r   r   r+   r   r   r#   r   r=   r   r   r   r   r   r   rQ   r   )rJ   r   r   r   r   r   rQ     s   
zTensorTable._setr_   r   r   r\   r   r         r   c                       rW   )r   c                    s.   t    || _|| _d | _d| _t | _d S )Nr%   )r[   rK   r   r   r+   r#   rb   r   rJ   r   send_ctxr\   r   r   rK     s   
zSparseTable.__init__c                 C   sN  | j }| st| dk s| sd S | |_| j|_tj|_	| j
|_
|jtt| jd kr<tt| jd |_| jd | d  | j_| j|| j | j| jj| | j | jd rednd| j_| j|j td| jj  | jd	 j}| }|D ]}|j| jjkr|} nq|d
r|j|_nd|_td |dr|j
|_
n| jd rd|_
td nd|_
td |dr|j|_|dr|j|_|dr|j|_|dr|j|_|dr|j|_|j  dkrtd |j!|j"  | j|j| jj| | j| j t#|j| jj| | j d S )Nr4   r   r   r   r   TFznew table_name: r   r#   r%   z'The PS mode must use MemorySparseTable.r   rd   %   zHThe shard_num of sparse table is not set, use default value 37 in gpups.i  zJThe shard_num of sparse table is not set, use default value 1000 in cpups.enable_sparse_table_cachesparse_table_cache_ratesparse_table_cache_file_numenable_revertshard_merge_ratez;The accessor of sparse table is not set, use default value.)$r   is_tensor_tabler   r   r   r   r#   r   PS_SPARSE_TABLEr+   r   r   r   r   r   r   r   r   r   r   rQ   r;   Zsparse_table_configsaddr   r   r   r   r   r   r   r   ZByteSizeZParseFromStringZSerializeToStringrD   )rJ   r   r   Zall_table_protoZusr_table_protor   r   r   r   rQ     s   












zSparseTable._setr_   r   r   r\   r   r     s    r   c                       rW   )r   c                    s0   t  || d| _| jd tjkrtdd S )NZMemorySparseGeoTabler   znot geo sparse table!)r[   rK   r#   r   r   r   r>   r   r\   r   r   rK     s
   zGeoSparseTable.__init__c                 C   s   | j }| st| dk s| sd S | |_| j|_tj|_	| j
|_
d|j_| d |j_| d |j_| jd | d  | j_| j|| j | j| jj| | j d| j_| j|j d S )Nr4   r   r   r   F)r   r   r   r   r   r   r#   r   r   r+   r   r   r=   r   r<   r@   r   r   r   r   r   r   r   rQ   rJ   r   r   r   r   r   rQ   "  s.   

zGeoSparseTable._setr_   r   r   r\   r   r     r   r   c                       rW   )r   c                    s"   t    || _|| _t | _d S rH   )r[   rK   r   r   rb   r   r   r\   r   r   rK   ?  s   
zDenseTable.__init__c                 C   s   | j }| st| dk s| rd S | |_tj|_d|_	d|_
d|j_| d |j_d|j_d| j_| j|| j | j| jj| | j | jd rTdnd	| j_| j|j d S )
Nr4   ZMemoryDenseTabler   r   r   ZMergedDenser   TF)r   r   r   r   r   r   r   ZPS_DENSE_TABLEr+   r#   r   r   r=   r   r<   r@   r   r   r   r   r   r   r   rQ   r   r   r   r   rQ   E  s*   
zDenseTable._setr_   r   r   r\   r   r   >  r   r   c                   @   rE   )Serverc                 C   rG   rH   r   rI   r   r   r   rK   c  rL   zServer.__init__c                 C   rG   rH   r   rI   r   r   r   rQ   f  rL   zServer._setNrR   r   r   r   r   r   b  rV   r   c                       rW   )DownpourServerc                    rY   rH   rZ   rI   r\   r   r   rK   k  r^   zDownpourServer.__init__c                 C   rG   rH   r   rI   r   r   r   rQ   n  rL   zDownpourServer._setr_   r   r   r\   r   r   j  ra   r   c                   @   rE   )Workerc                 C   rG   rH   r   rI   r   r   r   rK   s  rL   zWorker.__init__c                 C   rG   rH   r   rI   r   r   r   rQ   v  rL   zWorker._setNrR   r   r   r   r   r   r  rV   r   c                       rW   )DownpourWorkerc                    rY   rH   rZ   rI   r\   r   r   rK   {  r^   zDownpourWorker.__init__c                 C   rG   rH   r   rI   r   r   r   rQ   ~  rL   zDownpourWorker._setr_   r   r   r\   r   r   z  ra   r   c                   @   rE   )fsClientc                 C   r   rH   )fs_client_param)rJ   r   r   r   r   rK     r   zfsClient.__init__c                 C   s<   t | jsd S | jj|_| jj|_| jj|_| jj|_d S rH   )r   MessageToStringr   uriuserpasswdZ
hadoop_binr   r   r   r   rQ     s   


zfsClient._setNrR   r   r   r   r   r     rV   r   c                   @   sL   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dS )PsDescBuilderc                 C   s   || _ |d | _|d | _|d | _|d | _d | _t| j | jd| _i | _g | _	| 
 | _|  | _|  | _t | _t | _d S )Nr   r   r   rd   split_dense_table)r   r   r   r   rd   barrier_table_idget_the_one_send_contextr   tensor_table_dict_server_sub_program_get_tablestables_get_serviceservice_get_fs_client	fs_clientr   ZPSParameterps_descZFLParameterZfl_desc)rJ   r   r   r   r   rK     s    







zPsDescBuilder.__init__c                 C   sX   d}| j s| jt j g }| j D ]}|t d t|t| jd  |d7 }q|S )Nr   r   r   r4   )	r   r  r   r   descglobalsr   r   r   )rJ   Zprogram_idxr  r   r   r   r   _get_tensor_tables  s   

z PsDescBuilder._get_tensor_tablesc                 C   s  g }t | j D ]b\}\}}td||| | r\| jtjkrO| jd r0|d d | jd v s5| jd sB|	t
 d | j| q	|	t
 d | j| q	|	t
 d | j| q	| jsk|	t
 d | j| q	|  | _|| j |	t
 d | jt| |S )Nzidx, name, ctx:local_sparser   r   r   r   )r   r   itemsr;   r   r   r   r   r   r   r
  rd   r  Ztensor_tablesr   r   )rJ   r  r0   r9   r   r   r   r   r    s0   
zPsDescBuilder._get_tablesc                 C   s   | j rt S t S rH   )rd   rX   rF   rI   r   r   r   r    s   zPsDescBuilder._get_servicec                 C   s   t | jd jS )Nr   )r   r   r   rI   r   r   r   r    s   zPsDescBuilder._get_fs_clientc                 C   rG   rH   r   )rJ   Zclient_infor   r   r   build_fl_client_desc  rL   z"PsDescBuilder.build_fl_client_descc                 C   s   | j D ]+}| jjjj }|| | jjjj }|| t	|t
kr.| jd u r.|j| _q| j| jjjj | j| jj t| jS rH   )r  r  Zworker_paramZdownpour_worker_paramdownpour_table_paramr   rQ   server_paramdownpour_server_paramr+   r   r   r0   r  service_paramr  r   r   r   rJ   tabler   r   r   r   build_worker_desc  s   



zPsDescBuilder.build_worker_descc                 C   s   i | _ | jD ]"}| jjjj }|| |jt	j
kr(|jd ur(|j| j |jj< q| j| jjjj | j| jj t| jS rH   )sparse_table_mapsr  r  r  r  r  r   rQ   r+   r   r   r   r   r   r  r  r  r   r   r   r  r   r   r   build_server_desc  s"   



zPsDescBuilder.build_server_descN)rS   rT   rU   rK   r  r  r  r  r  r  r  r   r   r   r   r     s    r   c                       s0  e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd ZdBddZ	dBddZ
dd ZdCddZdd Zdd Zeg fddZdd Z	dBddZd d! Z	"dDd#d$Z		%	"dEd&d'Zd(d) Zd*d+ Zd,d- Zd.d/ Z	dFd0d1Zd2d3 Zd4d5 Zd6d7 Zd8d9 Zd:d; Zd<d= Z d>d? Z!dBd@dAZ"  Z#S )GTheOnePSRuntimec                    s<   t    d | _d | _t | _d | _g | _d | _	d | _
d S rH   )r[   rK   _communicator_serverr   DistFleetWrapper_worker_coordinatorr  _heter_client	_send_ctxrI   r\   r   r   rK     s   


zTheOnePSRuntime.__init__c           	      C   s*  || _ |d | _t| j| _tttdd| _|d | _	|
d| j	g| _| j| j d< |
d|d g| j d< | jj| j d< | j d | _t|d	 | j d
< | j d
 j| j d< |d	 jd | j d< | j d tjkrldnd| j d< i | j d< i | j d< |d jd | j d< |d jd | j d< td| j d | j d  t| j  t| j| _t| j| _g | _t| jD ]\}}|d\}}t|t||}| j |!  q| jj"| _#g | _$| j#rtd| j  | j% }t|D ]\}}|d\}}t|t||}| j$ |!  qt&| j | _'d S )Nr   ZPSERVER_DEBUG0origin_main_programr   r   Zorigin_startup_programr   valid_strategytrainerr   rd   TFr   r   Ztensor_tabler   r  Zremote_sparsez+fl-ps > local_sparse: {}, remote_sparse: {}:zfl-ps > all ps addrs: )(r   r   r   role_idboolr   osgetenvdebugr"  r   r   _is_heter_parameter_server_moder   ZTrainerRuntimeConfigmodea_sync_configsr   SYNCtrainer_desc_configsr;   r?   Zbuild_var_distributedZget_trainer_endpointstrainer_endpointsr   	endpointsstring_hostsr   rz   r   ZPSHostr   Zserialize_to_stringZ_with_coordinatorwith_coordinatorcoordinator_hostsZ_get_coordinator_endpointsr   ps_desc_builder)	rJ   r   r0   ephostportZpshostZcoordinator_endpointsipr   r   r   _set_basic_info  sz   












zTheOnePSRuntime._set_basic_infoc                 C   l   g }|  D ]-\}}| rqt| j| \}}}|| }	| }
||
 }| j|	|
| || q|S rH   )	r  r   r   r   r   r   r  push_dense_paramsr   rJ   scopesr   recv_mapall_var_namesr9   r   _r0   scoper   	var_namesr   r   r   _init_all_paramsQ     z TheOnePSRuntime._init_all_paramsc                 C   r;  rH   )	r  r   r   r   r   r   r  pull_dense_paramsr   r=  r   r   r   _pull_all_dense_  rE  zTheOnePSRuntime._pull_all_densec           
      C   `   g }|  D ]'\}}| rq| t|krq| }|| }	| j|||	 ||	 q|S rH   )r  r   r   r   r   r  r<  r   
rJ   r   rB  r   r?  r@  r9   r   r   rC  r   r   r   _init_paramsm     zTheOnePSRuntime._init_paramsc           
      C   rH  rH   )r  r   r   r   r   r  rF  r   rI  r   r   r   _pull_dense{  rK  zTheOnePSRuntime._pull_denseNc                    s   j  } jd r0 jd jj}|jsi |_d|jd< td}dd |dD }||jd<  fd	d
}t	 j j
d}t j j
 jd}| _ jd } jr~td|  td |D ]}	t|	 d||	   q`|D ]}	t|	 d||	   qpi }
d|
d<  j |
d<  j |
d<  j j|
d<  jd tjkr| }|
| td|   j| j j  j
st j _td j  td j  td j   jr j  j j j  jd tj!ks j
rt"|j#|
|  _$ j$%||| jt&j'(  t)j*+   j, }t-|t.rIt/|dkrI j0|d }t-|t.s9t12d |g} j3|  j4  td ntd  jd }t5t6td d}|d u rst/ j7d!krmt8d"t&j'( g}t/ j7t/|krt9d#| _:|s jd tj!ks j
r j$;| n jd s jdkrtd$  <||| t)j*+   jd sĈ =||| t)j*+   jd tj!ksֈ j
r j$> s j$?  nt12d% |j@d& }t6td'd(}|rB|rDtA jB   j
r jC g krtA jC   j
rFg } jD g kr' jD }g } jC g kr6 jC }tE|| j  _Fd S d S d S d S ))Nrd   ZlossTZFLAGS_selected_gpusc                 S   s   g | ]}t |qS r   r   )r{   sr   r   r   r}     s    z0TheOnePSRuntime._init_worker.<locals>.<listcomp>,Zworker_placesc                     s$   i }  j  | d<  j  | d< | S )NZpserver_endpoints
trainer_id)r   _get_pserver_endpointsZ_worker_index)kwargsrI   r   r   sync_strategy_envs  s   z8TheOnePSRuntime._init_worker.<locals>.sync_strategy_envsr   r   Zep_listr$  zworker_desc: 
zcommunicator send_ctx:z: r!  Zneed_global_steprO  trainersr   r   zcommunicator config:zfl-ps > trainer_endpoint: zfl-ps > with_coordinator? zfl-ps > coordinator addr: r   z!gloo may not initialize correctlyzcreate c2c connection donezcannot create c2c connectionr#  Z	TEST_MODEr4   z;You must set the scope list when you have Multiple programszlen(programs) != len(scopes)z entering self._init_all_params()z'communicator has been initialized, skiplaunch_barrierZFLAGS_LAUNCH_BARRIER1)Gr5  r  r   blockr   Z
_fleet_optr(  r)  rz   get_the_one_recv_contextr   r   r1  r   r*  r;   r   _role_idZ_worker_numr   r   r.  updateZget_communicator_flagsr  Zinit_workerr2  r&  Zget_trainer_endpointZtrainer_endpointr3  r4  Zinit_fl_workerr   r   r,  r  Zinit_with_ctxpaddlestaticZglobal_scoper   utilbarrierZget_client_info
isinstancelistr   Z_all_gatherr   r   Zset_clientsZcreate_client2client_connectionr'  r   r   r>   Z
VauleErrorr>  Zinit_paramsrD  rG  Z
is_runningstartr-  r   rP  Z_get_next_trainersZ_get_previous_trainersr   r  )rJ   r>  Zworker_descr.   Zgpus_envrR  	dense_mapr   Ztrainer_configkeyrQ  Zsync_kwargsinfoZall_infoZdist_strategyZis_testrU  Zlaunch_barrier_flagZprevious_trainersZnext_trainersr   rI   r   _init_worker  s   




















zTheOnePSRuntime._init_workerc                 C   sT   | j d u rt| j| _ td| jd   td| j  | j | jd | j d S )Nz>>> curr node ip: r   z>>> all trainer endpoints: )r  r	   r2  r;   r4  r0  Zstart_coordinator)rJ   r>  r   r   r   _init_coordinator)  s   
z!TheOnePSRuntime._init_coordinatorc                 C   s   | j d u rd S | j   d S rH   )r  Zmake_fl_strategyrI   r   r   r   _make_fl_strategy3  s   
z!TheOnePSRuntime._make_fl_strategyc                 K   s
  | j  }t| j}| jr|t| j 7 }| jr td|  t	
 | _| j|| j| j|| j t| jd}t| jd}|| }|d u rI|}	n|D ]}
|
|vrXtd|qK|}	|d u sa|	scd S | j j}tj|}| j }|	D ]}
||
 }| j|d| qtd S )Nzserver_desc: 
TFz6fleet.init server can only load sparse variables in {}r!  )r5  r  r   r   r   r   r   r*  r;   r   r  r  Zinit_serverr2  r&  r  get_sparse_tablenamesr   r>   r?   r  r(  pathnormpathrY  Zload_sparse)rJ   dirnamerC  rQ  Zserver_descrT  Zdist_varnamessparse_varnamesdistributed_varnamesZload_varnamesvar_namer  r   r   r   r   r   _init_server9  sN   



zTheOnePSRuntime._init_serverc                 C   s.   t | j}|d\}}| j|t| d S )Nr%  )Zget_ps_endpointr   rz   r  Z
run_serverr   )rJ   r6  r7  r8  r   r   r   _run_serverj  s   
zTheOnePSRuntime._run_serverc                 C   sN   | j d tjkr| j  | j  | jr%| jd usJ d| j  d S d S )Nr   z/heter client should not be None in heterps mode)	r   r   r   r  stopr  Zstop_workerr   r  rI   r   r   r   _stop_workero  s   

zTheOnePSRuntime._stop_workerc                    s    fdd}|S )Nc                    s   | j  v rdS ddlm} || j \}}}|drdS |dr#dS | j tjj	j
ksA| j tjj	jksA| j tjj	jkrCdS | jS )NFr4   )_get_varname_partsz@GRADr   )r9   Zutils.publicrs  endswith
startswithr	  r+   r   ZVarDescZVarTypeZFEED_MINIBATCHZ
FETCH_LISTZREADERZpersistable)rC   rs  Zorigin_varnamerA  exclude_var_namesr   r   is_valid{  s   


z0TheOnePSRuntime.__exclude_vars.<locals>.is_validr   )rw  rx  r   rv  r   Z__exclude_varsy  s   zTheOnePSRuntime.__exclude_varsc                 C   s.   | ds
| drd}|S tj|d}|S )Nzafs:zhdfs:z./dnn_pluginZ
dnn_plugin)ru  r(  ri  r   )rJ   rk  
model_pathr   r   r   _get_inference_model_path  s
   z)TheOnePSRuntime._get_inference_model_pathc                    s   t | j| jd}t| j| j| jd} d u st| jdkr"| jd  |  |||}|d u r0|n|}	 fdd|	D }
dd l}|j	
| |j	j|d |
|d W d    d S 1 s\w   Y  d S )	Nr   rS  r4   r   c                       g | ]	}   |qS r   r&   rC   )r{   r   r   r   r   r}     s    z9TheOnePSRuntime._ps_save_dense_params.<locals>.<listcomp>z./)r   filename)rX  r   r   r   r1  r   r   rL  r[  r\  Zscope_guardZ	save_vars)rJ   executorrk  rB  r   rC  rb  r   Zdense_var_namesZsave_var_namesr   r[  r   r}  r   _ps_save_dense_params  s&   

"z%TheOnePSRuntime._ps_save_dense_paramsc                 C   sr   t | jd}g }| |}| D ]%\}	}
|
d |vr)z	| j|	| W n   Y | j|	|| ||
 q|S NTr   )rh  r   rz  r  r  Zrecv_and_save_modelsave_one_modelr   )rJ   r  rk  r   r.   r,  rm  valuesry  r   namesr   r   r   _save_sparse_params  s   
z#TheOnePSRuntime._save_sparse_paramsr   c                 K   sH   t |ts	td|du r| jd }t |trtd| j|| dS )a  
        This function filters out all variables with `persistable==True` from the
        give `main_program` and then saves these variables to the folder `dirname`
        or file `filename`.

        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None; if you would like to save all variables in a
        single file, use `filename` to specify the file name.
        ;in fleet.save() function, executor must be as Executor typeNr"  ^in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed)r_  r   	TypeErrorr   r   r  Zsave_all_model)rJ   r  rk  r.   r,  rQ  r   r   r   _save_distributed_persistables  s   


z.TheOnePSRuntime._save_distributed_persistablesTc                    s|  t |ts	tdddl}|du r| jd n| t| jt \}	}	}
| j|
 }t	d|
 t  t
r6td fdd|D }|j ||}|  | |}d}tj||}||| t| jd	| jd
}| |||||}t| j| jd}t| j| j| jd}|  ||| | jd jd }t|}ttt|| }|D ]}||}|j|tj||j dd qdS )z
        Prune the given `main_program` to build a new program especially for inference,
        and then save it and all related parameters to given `dirname` by the `executor`.
        r  r   Nzsave inference model scope idx:r  c                    r{  r   r|  )r{   r9   r}  r   r   r}     s    zFTheOnePSRuntime._ps_inference_save_inference_model.<locals>.<listcomp>Z	__model__FZis_denser   r   rS  r   Zstat_var_namesT)Zuse_binary_format)!r_  r   r  r[  r   r   r   r   r>  r;   r   r\  Znormalize_programZ_copy_dist_param_info_fromrz  r(  ri  r   saverX  r   r  r   r1  rL  r/  r`  filterr  _TheOnePSRuntime__exclude_varsr8   	get_valuer9   )rJ   r  rk  Zfeeded_var_namesZtarget_varsr.   Zexport_for_deploymentr,  r[  rA  r0   rB  Z	feed_varsZinfer_programry  Zmodel_basenamesparsesZsparse_namesrb  r   Zgenerate_varsremaining_varsrC   r   r   r}  r   "_ps_inference_save_inference_model  s   









z2TheOnePSRuntime._ps_inference_save_inference_modelc                 K   s   | dd}| dd}| j  tj  d}| j r#| j|}tj  | j r6| j	|||| tj  d}| j rJ| j
|||}tj  |S )Nr,  r4   r   r   g        rx   )r   r  Zclient_flushr   r]  r^  r   _is_first_workerZget_cache_thresholdZcache_shuffleZ
save_cache)rJ   rk  rQ  r,  r   Zcache_thresholdZfeasign_numr   r   r   _save_cache_model;  s    







z!TheOnePSRuntime._save_cache_modelc                 C   s<   t j  | jd s| j r| j||| t j  d S Nrd   )r   r]  r^  r   r   r  r  Zsave_cache_table)rJ   r   Zpass_idZmem_cache_key_thresholdr   r   r   _save_cache_tableS  s   
z!TheOnePSRuntime._save_cache_tablec                 C   s&   t j  | j r| j  d S d S rH   )r   r]  r^  r   r  r  Zcheck_save_pre_patch_donerI   r   r   r   _check_save_pre_patch_done[  s   

z*TheOnePSRuntime._check_save_pre_patch_donec           	      C   sV   t | jd}g }| D ]\}}|d |vrtd | j||| || q|S )NTr   z,varname is not in distributed_varnames, pass)rh  r   r  r   r   r  load_one_tabler   )	rJ   rk  r   r.   r,  rm  r  r   r  r   r   r   _load_sparse_params`  s   
z#TheOnePSRuntime._load_sparse_paramsc                 C   s2  |d u r	| j d n|}t| jt|\}}}| j| }td| t|tr)tdt	| jd| j
d}| ||||}t	| j| j
d}	t| j| j
| jd}
g }|	 D ]	\}}|| qR|}ttt|| }| |}dd l}|D ]}|j|vr}qu|tj||j}||| qu| |||
|	 d S )Nr   zload inference model scope idx:r  Fr  r   rS  )r   r   r   r   r>  r;   r_  r   r  rX  r   r  r   r1  r  r   r`  r  r  r  r8   rz  r[  r9   loadr(  ri  r   	set_valuerJ  )rJ   rk  r,  r.   rA  r0   rB  r  rl  rb  r   Zrecv_dense_varnamesr  Zloaded_varnamesr  ry  r[  rC   r   r   r   r   "_ps_inference_load_inference_modeln  sZ   





z2TheOnePSRuntime._ps_inference_load_inference_modelc                 C   2   t j  | j r| j||| t j  d S rH   )r   r]  r^  r   r  r  r  rJ   r   ri  r,  r   r   r   _save_one_table     

zTheOnePSRuntime._save_one_tablec                 O   2   t j  | j r| j|i | t j  d S rH   )r   r]  r^  r   r  r  rJ   argsrQ  r   r   r   _save_dense_params  r  z"TheOnePSRuntime._save_dense_paramsc                 O   s<   t j  | jd s| j r| j|i | t j  d S r  )r   r]  r^  r   r   r  r  r  r   r   r   _save_persistables  s   
z"TheOnePSRuntime._save_persistablesc                 O   r  rH   )r   r]  r^  r   r  r  r  r   r   r   _save_inference_model  r  z%TheOnePSRuntime._save_inference_modelc                 C   r  rH   )r   r]  r^  r   r  r  r  r  r   r   r   _load_one_table  r  zTheOnePSRuntime._load_one_tablec                 C   s:   t j  | jd s| j r| j|| t j  d S r  )r   r]  r^  r   r   r  r  Z
load_modelrJ   ri  r,  r   r   r   _load_persistables  s   
z"TheOnePSRuntime._load_persistablesc                 C   s.   t j  | j r| || t j  d S rH   )r   r]  r^  r   r  r  r  r   r   r   _load_inference_model  s   

z%TheOnePSRuntime._load_inference_modelc                 C   sx   |d ur
t d nd}tj  | jd s| j r5t| jd| jj	d}|
 D ]\}}| j|| q)tj  d S )NznThe param threshold is not used in MemorySparseTable, if you need to shrink, please set the config of accessorr   rd   Fr  )r   r   r   r]  r^  r   r   r  rX  r+  r  r  Zshrink_sparse_table)rJ   	thresholdr  r   r  r   r   r   _shrink  s   
zTheOnePSRuntime._shrinkrH   )NN)Nr   r  )r   N)$rS   rT   rU   rK   r:  rD  rG  rJ  rL  re  rf  rg  ro  rp  rr  staticmethodr  rz  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r`   r   r   r\   r   r    sP    
B
 
!

1


#
Y
<r  )/r(  r   Zgoogle.protobufr   r[  Zpaddle.distributedr   Zpaddle.distributed.communicatorr   r   Z5paddle.distributed.fleet.base.private_helper_functionr   Zpaddle.distributed.fleet.protor   Z-paddle.distributed.fleet.runtime.runtime_baser   Z!paddle.distributed.ps.coordinatorr	   Z"paddle.distributed.ps.utils.publicZpaddle.frameworkr   Zpaddle.staticr   r   r   __all__r   r3   rD   rF   rX   rb   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r   r   r   <module>   sL   
*	   >#m#$u