o
    "j                     @   s,  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZ ddlmZ d	d
lmZ g Zdd ZdZdd Zdd Zdd ZG dd dZG dd dZG dd dZG dd dZG dd dZG dd dZG d d! d!ZG d"d# d#ZG d$d% d%ZG d&d' d'ZG d(d) d)eZ dS )*    N)base)core)CompiledProgram)Executor)Program   )wait_server_ready   )RuntimeBasec                 C   s   d dg|  S )N  )join)indent r   l/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/runtime/the_one_ps.pyconv_indent   s   r   z.shardc                 C   s   ddl m}m} | jD ]6}||s||sq|dd }|| kr(|jdks-|jdkrC|dr@|ddkr@|d  S  dS qd S )	Nr   is_distributed_sparse_opis_sparse_opWlookup_tablelookup_table_v2table_classnoneMemorySparseTable)	<paddle.incubate.distributed.fleet.parameter_server.ir.publicr   r   global_blockopsinputtypeZhas_attrattr)varnameo_main_programr   r   op
param_namer   r   r   parse_table_class%   s   

r%   c                 C   s  d}|  D ]}|j|kr|jd } nq| dsd| _| ds%|| _| ds/|d | _| ds7d| _| j}|d	sBd
|_	|dsJd|_
|dsRd|_|dsZd|_|dsbd|_|dsjd|_|dsrd|_|dszd|_|dsd|_| j| jfD ]}|dsd|_|jdks|jdkr|jdsd|j_|jdsd|j_|jdsd|j_t|jjdkr|jjdd g |jd!kr|jdsd|j_|jdsd|j_t|jjdkr|jjdd g |jd"krD|jdsd#|j_|jdsd|j_|jd$sd%|j_|jd&s'd'|j_|jd(s2d)|j_t|jjdkrD|jjdd g qd S )*Nr   r	   accessor_classZCtrCommonAccessorfea_dim
embedx_dim   embedx_thresholdnonclk_coeffg?click_coeffg      ?base_thresholddelta_thresholddelta_keep_days   show_click_decay_ratedelete_thresholddelete_after_unseen_days   ssd_unseenday_thresholdnameZSparseAdaGradSGDRuleZStdAdaGradSGDRulelearning_rateg?initial_g2sumg      @initial_rangeg-C6?g      $g      $@ZSparseNaiveSGDRuleZSparseAdamSGDRulegMbP?beta1_decay_rateg?beta2_decay_rateg+?ada_epsilong:0yE>) 	list_varsr6   shapeHasFieldr&   r'   r(   r*   ctr_accessor_paramr+   r,   r-   r.   r/   r1   r2   r3   r5   Zembed_sgd_paramZembedx_sgd_paramZadagradr7   r8   r9   lenZweight_boundsextendZnaiveadamr:   r;   r<   )accessorr!   r"   embedding_dimvarr@   Z	sgd_paramr   r   r   get_default_accessor_proto<   s   



















rG   c                 C   sr   d}|  D ]}|j|kr|jd } nq| j}||kr$td||| j}||d kr7td|d |d S )Nr   r	   zEThe fea_dim is wrong, it will be sparse_embedding_dim: {}, but got {}r)   zLThe embedx_dim is wrong, it will be sparse_embedding_dim - 3: {}, but got {})r=   r6   r>   r'   
ValueErrorformatr(   )rD   r!   r"   rE   rF   r'   r(   r   r   r   check_embedding_dim   s*   

rJ   c                   @      e Zd Zdd Zdd ZdS )Accessorc                 C   s"   d| _ d | _d| _d| _d | _d S )Nr   )r&   	optimizerfeature_dimrE   selfr   r   r   __init__   
   
zAccessor.__init__c                 C   sv   d}d}|d| j  d7 }|d| j d7 }|d| j d7 }|d7 }| jd ur0|| j|7 }|t||t|S )	N{}accessor {{{}
{}}}r   zaccessor_class: "" z	fea_dim: r   zembedx_dim: 
)r&   rO   rE   rN   	to_stringrI   r   )rQ   r   accessor_strattrsr   r   r   rW      s   
zAccessor.to_stringN__name__
__module____qualname__rR   rW   r   r   r   r   rL          rL   c                   @   sD   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dS )CommonAccessorc                 C   s`   d| _ d | _d | _g | _g | _g | _d| _d| _d | _d | _	g | _
i | _i | _i | _|   d S )Nr   r   false)r&   
table_nameentryrY   paramsdimstrainer_numsync	table_num	table_diminitializersopt_input_mapopt_attr_mapopt_init_mapdefine_optimize_maprP   r   r   r   rR      s   zCommonAccessor.__init__c                 C   s   i }ddg|d< g d|d< g d|d< dg|d< g d	|d
< i }g |d< g |d< g |d
< g d|d< g d|d< i }g d|d< dg|d< g d|d< g d|d< || _ || _|| _d S )NParamNLearningRater	   sgd)rn   )ZMoment1N)ZMoment2N)ZBeta1Powr	   )ZBeta2Powr	   rp   rC   )rn   )ZD2SumN)G2SumN)ZMomentN)MomentDecayRater	   )AdaDecayRater	   )
AdaEpsilonr	   rp   
adam_d2sumsum)rn   )rs   r	   rp   naive_adagrad))Zbeta1f)Zbeta2rz   )epsilonrz   )seedmeanZstdZgaussian_randomvalueZfill_constant)r|   minmaxZuniform_randomZtruncated_gaussian_random)rk   rj   rl   )rQ   rj   rk   rl   r   r   r   rm      s(   



z"CommonAccessor.define_optimize_mapc                 C   s   ddl m}m} | jD ]3}||s||sq|dd }||kr1|jdkr1|d| _ d S ||kr@|jdkr@d| _ d S qd S )Nr   r   r   r   rb   r   r   )	r   r   r   r   r   r   r   r    rb   )rQ   r!   r"   r   r   r#   r$   r   r   r   parse_entry   s   zCommonAccessor.parse_entryc                 C   s@   t || d }||d  |kr|S || |k r|||  S dS )Nr	   r   int)rQ   Z	total_dim	shard_num
pserver_id	blocksizer   r   r   	get_shard  s   zCommonAccessor.get_shardc           	      C   s   d}d}|}|  jD ]2}|j| j v r=||dd kr=|jg}| j|j D ]}|t|| q(|	|} |S q|S )N&r   ZOutr   )
r   r   r   rl   keysoutputappendstrr    r   )	rQ   Z
value_nameZo_startup_programZl_inZattr_strZorigin_var_namer#   Z	init_attrr    r   r   r   get_initializer_attr  s   
 z#CommonAccessor.get_initializer_attrc              	   C   sP  ddl m} |j| }| \}	}
| }t| }||	}d }|D ]}d|jv r7|dd |kr7|} nq#|d u rCt	d| g }g }g }g }|
 | _|| _|| _|jdkrc|rctd d}td| | rz| jd	 }| jd	 }d	| _n5|jr|r| jd
 }| jd
 }d| _n"|r|s| jd }| jd }d| _n| j|j }| j|j }|j| _|D ]\}}|| | jdkr'|d u r|r|}n| |||}|| |dks|dkr|	 j||d  }|dkr|jdkrtd |	 jd }| |j|
}n|dkrd}n|dkrd}n
|dkrd}nd}|| q|dkr9|d d}|| q|	 j||d  }|dkr\|jdkr\td |	 jd }|d u rn|rg|}n| |||}|| | |j|
}|| q|D ]\}}||}|d||t|g q|| _ || _!|| _"|| _#d S )Nr   )_get_optimize_opsro   zcan not find optimizer for rC   z8optimization algorithm is not adam, set adam_d2sum FalseFzadam_d2sum:rx   ry   rr   rw   rq   learning_rate_0zwill support decay soonrt   zfill_constant&0.99ru   zfill_constant&0.9999rv   zfill_constant&1.0e-8zfill_constant&0rs   r	   r   )$r   r   grad_name_to_param_nameZget_origin_programsget_role_idrA   get_ps_endpointsZinput_namesr   rH   get_trainersre   rg   rh   r   printis_geo_moderj   rk   r&   
use_ps_gpur   r   r   varsr6   warningswarnr   r    r   r   rc   rd   ri   rY   )rQ   Z	grad_name	is_sparsesizeZ
single_dimcompiled_strategyrw   r   r$   main_programstartup_programr   Zpserver_numZoptimizer_opsZoopr#   rc   rd   rY   ri   Zparam_varnamesZattr_varnamesZformal_namer>   paraminitializerZattr_varnametype_r~   r   r   r   parse_by_optimizer)  s   	

























z!CommonAccessor.parse_by_optimizerc                 C   s  d}d}|d| j  d7 }| jr|d| j d7 }| jr%|d| j d7 }|d| j d7 }|d	| j d7 }| jrC|d
| j d7 }| jrO|d| j d7 }| jD ]
}|d| d7 }qR| jD ]
}|d| d7 }q`| j	D ]
}|d| d7 }qn|d7 }|
t||t|S )Nz{}common {{{}
{}}}r   zname: "rU   ztable_name: "zentry: "ztrainer_num: r   zsync: ztable_num: ztable_dim: z	params: "zdims: zinitializers: "rV   )r&   ra   rb   re   rf   rg   rh   rc   rd   ri   rI   r   )rQ   r   rX   rY   r   dimr   r   r   r   rW     s.   


zCommonAccessor.to_stringN)
r[   r\   r]   rR   rm   r   r   r   r   rW   r   r   r   r   r_      s    5 r_   c                   @   rK   )Tensorc                 C   s"   d | _ d | _d | _d | _d| _d S )NF)main_program_idstartup_program_idfeed_var_namefetch_var_nametensor_table_classrP   r   r   r   rR     rS   zTensor.__init__c                 C   s   d}d}|dt | j d7 }|dt | j d7 }|dt | j d7 }|dt | j d7 }|d	t | j d7 }|d
7 }|t||t|S )Nz{}tensor {{{}
{}}}r   zfeed_var_name: "rU   zfetch_var_name: "zstartup_program_id: r   zmain_program_id: ztensor_table_class: "rV   )r   r   r   r   r   r   rI   r   )rQ   r   Zprogram_strrY   r   r   r   rW     s   zTensor.to_stringNrZ   r   r   r   r   r     r^   r   c                   @   rK   )Tablec                 C   s4   d| _ d | _d| _d | _d | _d | _d | _d | _d S )NrM   )idr   r   r   rD   commontensoraccessor_protorP   r   r   r   rR     s   
zTable.__init__c                 C   s  d}d}|d| j  d7 }|d| j d7 }|d| j d7 }|d| j 7 }|d	7 }|d
7 }| jd urId}|t|| jt|}||d	 7 }n| jd urZ|| j|7 }|d	7 }| j	d urk|| j	|7 }|d	7 }| j
d ur||| j
|7 }|d	7 }|t||t|S )Nz {}downpour_table_param {{{}
{}}}r   z
table_id: r   ztable_class: "rU   zshard_num: ztype: rV   r   rT   )r   r   r   r   r   rI   r   rD   rW   r   r   )rQ   r   Z	table_strrY   rX   r   r   r   rW     s0   



zTable.to_stringNrZ   r   r   r   r   r     s    
r   c                   @   rK   )Servicec                 C   s"   d| _ d| _d| _d| _d| _d S )NZBrpcPsServerZBrpcPsClientZBrpcPsServicer      )server_classclient_classservice_classstart_server_portserver_thread_numrP   r   r   r   rR     rS   zService.__init__c                 C   sx   d}d}|d| j  d7 }|d| j d7 }|d| j d7 }|d| j d7 }|d	| j d7 }|t||t|S )
Nz{}service_param {{{}
{}}}r   zserver_class: "rU   zclient_class: "zservice_class: "zstart_server_port: r   zserver_thread_num: )r   r   r   r   r   rI   r   )rQ   r   Zservice_strrY   r   r   r   rW   "  s   zService.to_stringNrZ   r   r   r   r   r     r^   r   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
DownpourServerc                 C   s   d | _ g | _d S N)servicetablesrP   r   r   r   rR   2  s   
zDownpourServer.__init__c                 C   s
   || _ d S r   )r   )rQ   r   r   r   r   set_service_param6     
z DownpourServer.set_service_paramc                 C   "   t |ts	td| j| d S Nzonly support instance Table
isinstancer   rH   r   r   rQ   tabler   r   r   append_tables9     
zDownpourServer.append_tablesc                 C   s`   d}d}|d7 }|d7 }|| j |7 }| jD ]}|d7 }|||7 }q|t||t|S )Nz!{}downpour_server_param {{{}
{}}}r   r   rV   )r   rW   r   rI   r   )rQ   r   
server_str
table_strsr   r   r   r   rW   >  s   
zDownpourServer.to_stringN)r[   r\   r]   rR   r   r   rW   r   r   r   r   r   1  s
    r   c                   @   $   e Zd Zdd Zdd Zdd ZdS )Serverc                 C   
   g | _ d S r   )serversrP   r   r   r   rR   P  r   zServer.__init__c                 C   r   )Nz$only support instance DownpourServer)r   r   rH   r   r   )rQ   serverr   r   r   
add_serverS  r   zServer.add_serverc                 C   8   d}d}d}| j D ]}|d7 }|||7 }q	||S )Nzserver_param {{{}
}}r   r   rV   )r   rW   rI   )rQ   r   r   Zservers_strr   r   r   r   __str__X     

zServer.__str__N)r[   r\   r]   rR   r   r   r   r   r   r   r   O      r   c                   @   r   )DownpourWorkerc                 C   r   r   )r   rP   r   r   r   rR   d  r   zDownpourWorker.__init__c                 C   r   r   r   r   r   r   r   r   g  r   zDownpourWorker.append_tablesc                 C   sH   d}d}|d7 }| j D ]}|d7 }|||7 }q|t||t|S )Nz!{}downpour_worker_param {{{}
{}}}r   r   rV   )r   rW   rI   r   )rQ   r   
worker_strr   r   r   r   r   rW   l  s   
zDownpourWorker.to_stringN)r[   r\   r]   rR   r   rW   r   r   r   r   r   c  r   r   c                   @   r   )Workerc                 C   r   r   )workersrP   r   r   r   rR   z  r   zWorker.__init__c                 C   r   )Nz$only support instance DownpourWorker)r   r   rH   r   r   )rQ   workerr   r   r   
add_worker}  r   zWorker.add_workerc                 C   r   )Nzworker_param {{{}
}}r   r   rV   )r   rW   rI   )rQ   r   r   Zworkers_strr   r   r   r   r     r   zWorker.__str__N)r[   r\   r]   rR   r   r   r   r   r   r   r   y  r   r   c                   @   rK   )fsClientc                 C   s*   || _ |j| _|j| _|j| _|j| _d S r   )protouriuserpasswdZ
hadoop_bin)rQ   r   r   r   r   rR     s
   zfsClient.__init__c                 C   s.   ddl m} || j}|rd}||S dS )Nr   text_formatzfs_client_param {{
{}}}r   )google.protobufr   MessageToStringr   rI   )rQ   r   	proto_txtZfs_strr   r   r   rW     s   
zfsClient.to_stringNrZ   r   r   r   r   r     r^   r   c                       s   e Zd Z fddZdd Zdd Zdd Zd	d
 Zde	 fddZ
dd Zdd Zd5ddZdd Zdd Zeg fddZdd Zdd Z	d6d d!Z	d7d"d#Z		$	d8d%d&Zd'd( Zd)d* Zd+d, Z	d9d-d.Zd/d0 Zd1d2 Zd:d3d4Z  ZS );TheOnePSRuntimec                    s2   t    d | _d | _tj | _g | _d | _	d S r   )
superrR   _communicator_serverr   r   DistFleetWrapper_worker_server_sub_program_heter_clientrP   	__class__r   r   rR     s   

zTheOnePSRuntime.__init__c                 C   s<   || _ |d | _|d | _|d | _|  | _|  | _d S )N
role_makerorigin_main_programorigin_startup_program)contextr   r   r   _get_distributed_strategyasync_strategybuild_compiled_startegyr   )rQ   r   r   r   r   _set_basic_info  s   



zTheOnePSRuntime._set_basic_infoc                 C   s   d }ddl m} | jd }|jd }|js|dkr| }|jr(|dkr(| }|jr4|dkr4||}|s:td|jd rBd|_	|S )Nr   )StrategyFactoryvalid_strategyk_stepsz+k_steps must be invalid value, please checkr   T)
]paddle.incubate.distributed.fleet.parameter_server.distribute_transpiler.distributed_strategyr   r   a_sync_configsZa_syncZcreate_sync_strategyZcreate_async_strategyZcreate_geo_strategyrH   r   )rQ   Zstrategyr   dist_strategyr   r   r   r   r     s   



z)TheOnePSRuntime._get_distributed_strategyc                 C   s4   ddl m} || j| j| j| j}| jjrd|_|S )Nr   )CompileTimeStrategyT)r   r   r   r   r   r   )rQ   r   Zcompiled_configr   r   r   r     s   z'TheOnePSRuntime.build_compiled_startegyc           &         s  ddl m}  j } jd|d} jd|d} jd }|jd }|rI jd jj}|j	s2i |_	d|j	d< t
d	}d
d |dD |j	d<  fdd}	t|d t| }
tdd}||
 W d    n1 snw   Y  ttt
dd}|rtd|
   j }g }t|D ]\}}|d\}}tj|t||}||  q jj jjd} jj jj jj|d} j }ttt
dd}|rtd|
  td |D ]}t| d||   q|D ]}t| d||   qi }d|d<  j  |d<  j! |d< |j"d j#D ]}|j$dkr*|j%|d<  nqt& j|r;|	 }|'| dd l(m)}m*} ||j+||,  _- j-.|||
|t/  dd!l0m1} |j23   j-4 }t&|t5rt6|dkr j7|d }t&|t5st89d" |g} j-:|  j-;  td# ntd$  jd }ttt
d%d}  j< rÈ jjrÈ jjddd&}!n|}!| sӈ j-=|! |j23   j->|! |j23   j-? s j-@  nt89d' |jd( }"tt
d)d*}#|"rL|#rNtA jB   jjr jC g krtA jC   jjrPg }$ jD g kr1 jD }$g }% jC g kr@ jC }%||%|$ j   _Ed S d S d S d S )+Nr   )SyncStrategyF	is_serveris_syncTr   r   ZlossFLAGS_selected_gpusc                 S   s   g | ]}t |qS r   r   ).0sr   r   r   
<listcomp>  s    z0TheOnePSRuntime._init_worker.<locals>.<listcomp>,Zworker_placesc                     s$   i }  j  | d<  j  | d< | S )NZpserver_endpoints
trainer_id)r   _get_pserver_endpointsZ_worker_index)kwargsrP   r   r   sync_strategy_envs  s   z8TheOnePSRuntime._init_worker.<locals>.sync_strategy_envsrV   r   wPSERVER_DEBUG0z	worker: 
:)split_dense_table)r
  use_origin_programZep_listzcommunicator send_ctx:z: Zneed_global_stepr  trainersBarrierTableZbarrier_table_id)CommunicatorHeterClientfleetz!gloo may not initialize correctlyzcreate c2c connection donezcannot create c2c connectionZ	TEST_MODE)r
  r  z'communicator has been initialized, skiplaunch_barrierZFLAGS_LAUNCH_BARRIER1)Fr   r   r   is_sync_mode_get_fleet_protor   r   blockprogramZ
_fleet_optosgetenvsplitr   openwriteboolr   r   r   	enumerater   r   PSHostr   serialize_to_stringget_the_one_recv_contextr   _is_heter_parameter_server_modeget_the_one_send_contextr   Zget_trainer_runtime_config_role_idZ_worker_numr   r   r   r   r   updateZpaddle.distributed.communicatorr  r  modeZget_communicator_flagsr   Zinit_with_ctxglobal_scopepaddle.distributedr  utilbarrierZget_client_infolistrA   Z_all_gatherr   r   Zset_clientsZ"create_client_to_client_connection_is_first_workerinit_params
pull_denseZ
is_runningstartr   r  Z_get_next_trainersZ_get_previous_trainersr   )&rQ   r   r   r   r   r   r   r   Zgpus_envr  r   rz   debug	endpointsstring_hostsidxephostportpshostZ	dense_mapsend_ctxZtrainer_configkeyr  r   Zsync_kwargsr  r  r  infoZall_infoZis_testr-  r  Zlaunch_barrier_flagZprevious_trainersZnext_trainersr   rP   r   _init_worker  s   






















zTheOnePSRuntime._init_workerrM   c                 C   s   | j ||| d S r   )r   Zpush_sparse_param)rQ   var_nametable_idscoper   r   r   _push_sparse_paramw  s   z"TheOnePSRuntime._push_sparse_paramc                 C   s   t t  }| jjrF| j rF| j  }|dvr"td| |dkr5tt 	t
tdd}|S |dkrFtt t
tdd}|S )N)GPUXPUZCPUz Heter Worker Not Support Device r@  r   r  rA  ZFLAGS_selected_xpus)r   r   ZCPUPlacer   r"  Z_is_heter_workerZ_heter_device_typeupperrH   Z	CUDAPlacer   r  r  ZXPUPlace)rQ   executorZheter_device_typer   r   r   _get_executor|  s,   
zTheOnePSRuntime._get_executorc                    s   dd fddfddfdd  fd	d
}|rPt  }t }t }jd }|jd }	|	r>d|_d|_|| | }
|
|_|	| |S t
 }t }| }
|
|_|| |S )Nc                 S   sR   t  }d|_d |_|  r|  d |_|  d |_|S |  d |_d|_|S )NCommMergeAccessorr   r	   )rL   r&   rN   r   sectionsrO   rE   )ctxrD   r   r   r   _build_merge_accessor  s   z?TheOnePSRuntime._get_fleet_proto.<locals>._build_merge_accessorc                    s   t  }| |_d|_d|_d|_t }d|_d |_d|_d|_	||_
t }d|_ j } jjr9|t j 7 }||_d|_g |_g |_||_|S )NPS_OTHER_TABLEr     rE  r   barrier_tabler   )r   r   r   r   r   rL   r&   rN   rO   rE   rD   r_   ra   r   r   r   r"  rA   _get_heter_worker_endpointsre   rY   rd   rc   r   )r3  r   rD   r   re   rP   r   r   _build_barrier_table  s0   
z>TheOnePSRuntime._get_fleet_proto.<locals>._build_barrier_tablec                    s   t  }| |_d|_|d |_d|_t }d|_d |_d|_d|_	||_
t }|d |_ j |_d|_g |_g |_||_t }|d |_|d	 |_|d |_|d
 |_|d |_||_|S )NrI  r   rJ  rE  r   r   r   r   r   r   )r   r   r   r   r   rL   r&   rN   rO   rE   rD   r_   ra   r   r   re   rY   rd   rc   r   r   r   r   r   r   r   r   )r3  Ztensor_dictr   rD   r   r   rP   r   r   _build_tensor_table  s4   






z=TheOnePSRuntime._get_fleet_proto.<locals>._build_tensor_tablec                    s   j  }d}|D ]J}|| d d ur(||| d< j|| d j |d7 }|| d d urE||| d< j|| d j |d7 } t| || }| | q	| S )Nr   r   r   r	   r   r   )r   get_tensor_table_dictr   r   descrA   )r   tensor_table_dictZprogram_idxra   Z	new_table)rN  rQ   r   r   _add_tensor_table  s4   
z;TheOnePSRuntime._get_fleet_proto.<locals>._add_tensor_tablec               	      s  j jdjjd} g }t|  D ]\}\}}| s%t| dk r&qt	 }|
 |_t }| rd|_d|_j j| d  |_j  rOd|_njd j}| }|D ]}	|	j|jkrg|	} nq[|d	rr|j|_nt|jj|_|jd
krd
|_td |dr|j|_nd|_td |j dkrtd t|j|jj t|j|jj ddl m!}
 |
"|j|_#nd|_d|_d|_d|_jd j$}|%| d | |& d | r|& d ndj | | r|'|jj r
d|_(nd|_(||_)|jd
kr|}||_|*| qj + }t|dkr4 |}n
t, }j-*|j. t|}|*| |S )NT)r  r
  r	   PS_SPARSE_TABLErJ  r   ZMemorySparseGeoTableuser_defined_strategyr   r   z'The PS mode must use MemorySparseTable.r   i  zAThe shard_num of sparse table is not set, use default value 1000.z;The accessor of sparse table is not set, use default value.r   ZPS_DENSE_TABLEZMemoryDenseTableZMergedDensetruer`   )/r   r#  r   r"  r  itemsZis_tensor_tablerA   Zorigin_varnamesr   r=  r   r_   r   r   r   r   ra   r   r   r   Zsparse_table_configsaddr?   r%   r   r   r   rD   ZByteSizerG   rJ   r   r   r   r   rw   r   rF  r   rf   r   r   rO  r   r   rP  )r8  r   r3  r6   rG  r   r   Zall_table_protoZtable_protor   r   rw   rD   rQ  Zempty_porgramrK  )rR  rM  rH  r   rQ   r   r   _get_tables  s   










	


z5TheOnePSRuntime._get_fleet_proto.<locals>._get_tablesr   r   ZPsLocalServerZPsLocalClient)r   r   r   r   r   r   r   r   r   r   r   r   r   )rQ   r   r   r  rX  r   Zdownpour_serverr   r   r   r   r   Zdownpour_workerr   )rR  rM  rH  rN  r   rQ   r   r    s2    v




z TheOnePSRuntime._get_fleet_protoNc                 K   s  | j  }| j  }| j  }| j  }| jjr!|t| j 7 }| j	d|d}t
|}	t| jd j}
|	d |
  }	tttdd}|rOtd|	  g }t|D ]\}}|d\}}tj|t||}||  qUtj | _| j|	|||| j d	d
lm } || j!d}|| j!d}|| }|d u r|}n|D ]}||vrt"d#|q|}|d u s|sd S i }|j$d	 j%D ]}|j&dkr|j'd ur|j(||j'j)< qtj*+|}| j, }|D ]}|| }| j-|d| qd S )NTr   rT  rV   r  r  z	server: 
r	  r   get_sparse_tablenamesFz6fleet.init server can only load sparse variables in {}rS  ).r   r   r   r  r   r   r"  rA   rL  r  r   r   r   Zfs_client_paramrW   r  r   r  r  r   r  r  r   r   r  r   r   r   r   Zinit_serverr   r   rZ  r   rH   rI   r   r   r   r   r   ra   pathnormpathr$  Zload_sparse)rQ   dirnameZ	var_namesr  Zrole_idr1  r   r  r   r   Z	fs_clientr0  r2  r3  r4  r5  r6  r7  rZ  Zdist_varnamessparse_varnamesdistributed_varnamesZload_varnamesr<  Zsparse_table_mapsr   r   r=  r   r   r   _init_server  sh   





zTheOnePSRuntime._init_serverc                 C   s.   | j  }|d\}}| j|t| d S )Nr	  )r   Zget_ps_endpointr  r   Z
run_serverr   )rQ   r4  r5  r6  r   r   r   _run_server  s   
zTheOnePSRuntime._run_serverc                 C   s6   | j   | jjr| jd usJ d| j  d S d S )Nz/heter client should not be None in heterps mode)r   stopr   r"  r   rP   r   r   r   _stop_worker  s   
zTheOnePSRuntime._stop_workerc                    s    fdd}|S )Nc                    s   | j  v rdS ddlm} || j \}}}|drdS |dkr"dS | j tjjj	ks@| j tjjj
ks@| j tjjjkrBdS | jS )NFr   )_get_varname_partsz@GRADr   )r6   r   rd  endswithrP  r   r   ZVarDescZVarTypeZFEED_MINIBATCHZ
FETCH_LISTZREADERZpersistable)rF   rd  Zorigin_varname_exclude_var_namesr   r   is_valid  s   

z0TheOnePSRuntime.__exclude_vars.<locals>.is_validr   )rh  ri  r   rg  r   Z__exclude_vars  s   zTheOnePSRuntime.__exclude_varsc                 C   s.   | ds
| drd}|S tj|d}|S )Nafs:hdfs:./dnn_plugin
dnn_plugin)
startswithr  r[  r   )rQ   r]  
model_pathr   r   r   _get_inference_model_path  s
   z)TheOnePSRuntime._get_inference_model_pathc                 C   s   ddl m} || jjd}g }| |}	| D ]%\}
}|d |vr0z	| j|
|	 W n   Y | j|
|| |	| q|S )Nr   rY  T)
r   rZ  r   r   rp  rV  r   Zrecv_and_save_modelZsave_one_modelrB   )rQ   rC  r]  r   r   r&  rZ  r_  valuesro  r   namesr   r   r   _save_sparse_params  s   
z#TheOnePSRuntime._save_sparse_paramsr   c                 C   s   | j jd| jjdd}| j jd| jjdd}| |||||}g }| D ]	\}	}
||
 q%| j| |}t	t
t|| }dd l}|D ]}| }|j|tj||jdd qId S )NTZis_denser
  r  Fr   Zuse_binary_format)r   r!  r   r"  rs  rV  rB   r   r.  r+  filterr   _TheOnePSRuntime__exclude_varsr=   paddle	get_valuesaver  r[  r   r6   )rQ   rC  r]  r   r&  densessparsesr^  recv_dense_varnamesr   rr  Zsaved_varnamesremaining_varsrx  rF   r   r   r   r   _save_distributed_persistables  s>   
z.TheOnePSRuntime._save_distributed_persistablesc                 K   sH   t |ts	td|du r| j }t |trtd| j|| dS )a  
        This function filters out all variables with `persistable==True` from the
        give `main_program` and then saves these variables to the folder `dirname`
        or file `filename`.

        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None; if you would like to save all variables in a
        single file, use `filename` to specify the file name.
        ;in fleet.save() function, executor must be as Executor typeN^in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed)r   r   	TypeErrorr   get_origin_ps_main_programr   r   Zsave_all_model)rQ   rC  r]  r   r&  r  r   r   r   _ps_inference_save_persistablesA  s   


z/TheOnePSRuntime._ps_inference_save_persistablesTc                    s>  t |ts	tdddl}|du r| jn| t  trtd fdd|D }	|j |	|}
|
  | 	|}d}t
j||}||
| | jjd| jjd	d
}| |||||}| jjd	| jjd	d
}| j| | jd jd }t|}ttt||
 }|D ]}| }|j|t
j||jd	d qdS )z
        Prune the given `main_program` to build a new program especially for inference,
        and then save it and all related parameters to given `dirname` by the `executor`.
        r  r   Nr  c                    s   g | ]	}   |qS r   )r   rF   )r   r6   r  r   r   r     s    zFTheOnePSRuntime._ps_inference_save_inference_model.<locals>.<listcomp>Z	__model__FTrt  rT  Zstat_var_namesru  )r   r   r  rx  r   r   ZstaticZnormalize_programZ_copy_dist_param_info_fromrp  r  r[  r   rz  r   r!  r   r"  rs  r   r.  r   Ztrainer_desc_configsr+  rv  r   rw  r=   ry  r6   )rQ   rC  r]  Zfeeded_var_namesZtarget_varsr   Zexport_for_deploymentr&  rx  Z	feed_varsZinfer_programro  Zmodel_basenamer|  Zsparse_namesr{  Zgenerate_varsr~  rF   r   r   r  r   "_ps_inference_save_inference_modela  sp   





z2TheOnePSRuntime._ps_inference_save_inference_modelc                 O      | j |i | d S r   )r  rQ   argsr  r   r   r   _save_inference_model     z%TheOnePSRuntime._save_inference_modelc                 O   r  r   )r  r  r   r   r   _save_persistables  r  z"TheOnePSRuntime._save_persistablesc           
      C   sd   ddl m} || jjd}g }| D ]\}}	|	d |vr"td | j||| |	|	 q|S )Nr   rY  Tz,varname is not in distributed_varnames, pass)
r   rZ  r   r   rV  r   r   r   Zload_one_tablerB   )
rQ   r]  r   r   r&  rZ  r_  rq  r   rr  r   r   r   _load_sparse_params  s   
z#TheOnePSRuntime._load_sparse_paramsc                 C   s  |d u r	| j  }t|trtd| j jd| jjdd}| j jd| jjdd}| ||||}g }|	 D ]	\}}	|
|	 q6|}
ttt|
| }|dsX|dr[d}ntj|d}d	d l}|D ]}|j|vrpqh|tj||j}|| qh| j| d S )
Nr  Trt  Frj  rk  rl  rm  r   )r   r  r   r   r  r!  r   r"  r  rV  rB   r+  rv  r   rw  r=   rn  r  r[  r   rx  r6   load	set_valuer   r-  )rQ   r]  r&  r   r{  r|  r^  r}  r   rr  Zloaded_varnamesr~  ro  rx  rF   r   r   r   r   "_ps_inference_load_inference_model  sN   


z2TheOnePSRuntime._ps_inference_load_inference_modelc                 C   s   | j || d S r   )r   
load_modelrQ   r[  r&  r   r   r   _load_distributed_persistables   s   z.TheOnePSRuntime._load_distributed_persistablesc                 C   s0   |dks|dkr|  || d S | || d S )Nr   r)   )r  r  r  r   r   r   r    s   zTheOnePSRuntime.load_modelc                 C   s|   |d ur
t d nd}ddlm} |j  | j r7| jj	d| jj
dd}| D ]\}}| j|| q+|j  d S )NznThe param threshold is not used in MemorySparseTable, if you need to shrink, please set the config of accessorr   r  FTrt  )r   r   r(  r  r)  r*  r   r,  r   r!  r"  rV  r   Zshrink_sparse_table)rQ   	thresholdr  r|  r   rr  r   r   r   _shrink
  s    

zTheOnePSRuntime._shrink)NN)r   )Nr   )NTr   )r   Nr   )r[   r\   r]   rR   r   r   r   r;  r   r'  r?  rD  r  r`  ra  rc  staticmethodrw  rp  rs  r  r  r  r  r  r  r  r  r  r  __classcell__r   r   r   r   r     sF     
 
vE

+
&
R
5r   )!r  r   rx  r   Zpaddle.baser   Zpaddle.base.compilerr   Zpaddle.base.executorr   Zpaddle.base.frameworkr   Zbase.private_helper_functionr   Zruntime_baser
   __all__r   ZPSERVER_SAVE_SUFFIXr%   rG   rJ   rL   r_   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   s8   I  %0