o
    "jtk                     @   st   d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZ ddlmZ ddlmZ g ZG dd	 d	eZdS )
    N)core)CompiledProgramExecutorProgramVariabledefault_main_programdefault_startup_program   )wait_server_ready   )RuntimeBasec                       s   e Zd Z fddZdd Zdd Zdd Z		d1d
dZdd Ze	g fddZ
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd$d% Z	&d2d'd(Z			)	*d3d+d,Zd-d. Zd/d0 Z  ZS )4ParameterServerRuntimec                    s   t    d | _d S N)super__init___communicatorself	__class__ z/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/runtime/parameter_server_runtime.pyr   $   s   

zParameterServerRuntime.__init__c                 C   s<   || _ |d | _|d | _|d | _|  | _|  | _d S )N
role_makerorigin_main_programorigin_startup_program)contextr   r   r   _get_distributed_strategyasync_strategybuild_compiled_startegycompiled_strategy)r   r   r   r   r   _set_basic_info(   s   



z&ParameterServerRuntime._set_basic_infoc                 C   sx   d }ddl m} | jd }|jd }|js|dkr| }|jr(|dkr(| }|jr4|dkr4||}|s:td|S )Nr   )StrategyFactoryvalid_strategyk_stepsz+k_steps must be invalid value, please check)	]paddle.incubate.distributed.fleet.parameter_server.distribute_transpiler.distributed_strategyr!   r   a_sync_configsZa_syncZcreate_sync_strategyZcreate_async_strategyZcreate_geo_strategy
ValueError)r   Zstrategyr!   dist_strategyr#   r   r   r   r   0   s   


z0ParameterServerRuntime._get_distributed_strategyc                 C   s&   ddl m} || j| j| j| j}|S )Nr   )CompileTimeStrategy)<paddle.incubate.distributed.fleet.parameter_server.ir.publicr(   r   r   r   )r   r(   Zcompiled_configr   r   r   r   H   s   z.ParameterServerRuntime.build_compiled_startegyNc              
      s  t d usJ g }t }| } fdd}tt|t  }	|d u r&| j}ddlm	}
 |	D ]Y}t
|ts7J |
|j\}}}tjj||}tj||}tj|s_td|j d| tj|r|jdi d|gitj||| j | j |jd	d
 || q.|| d S )Nc                    s
   | j  v S r   )name)varvarnamesr   r   _in_varnames]   s   
z@ParameterServerRuntime._load_sparse_params.<locals>._in_varnamesr   _get_varname_partszSelectedRows var z can not find at Zsparse_tensor_loadOut)	file_pathZ
node_indexZnode_numshape)typeZinputsZoutputsattrs)varsr   global_blocklistfilterr   	list_varsr   r)   r0   
isinstancer   r*   paddlestaticioZ_clone_var_in_blockospathjoinexistsr&   isfile	append_opr   Z_server_indexZ_server_numr3   appendrun)r   executordirnamer-   main_programZ
check_varsZ	load_progZ
load_blockr.   	load_varsr0   Zeach_varorigin_varname_Znew_varZvar_pathr   r,   r   _load_sparse_paramsU   sB   z*ParameterServerRuntime._load_sparse_paramsc           
      C   sV   ddl m} ddlm} | }|D ]}||\}}}tj|||}	|||	 qd S )Nr   )LargeScaleKVr/   )paddle.distributed.communicatorrN   r)   r0   r?   r@   rA   load)
r   rH   r-   rN   r0   Zscale_kvvarnamerK   rL   Z
sparse_dirr   r   r   _load_distributed_params   s   z/ParameterServerRuntime._load_distributed_paramsc                    s    fdd}|S )Nc                    s   | j  v rdS ddlm} || j \}}}|drdS |dkr"dS | j tjjj	ks@| j tjjj
ks@| j tjjjkrBdS | jS )NFr   r/   z@GRADZlearning_rate_0)r*   r)   r0   endswithdescr4   r   ZVarDescZVarTypeZFEED_MINIBATCHZ
FETCH_LISTZREADERZpersistable)r+   r0   rK   rL   exclude_var_namesr   r   is_valid   s   

z7ParameterServerRuntime.__exclude_vars.<locals>.is_validr   )rV   rW   r   rU   r   Z__exclude_vars   s   z%ParameterServerRuntime.__exclude_varsc                    s~   fdd} fdd}ddl m}m} ddlm}m}  j }t|  j	d }|j
d	 }	|	rHt j   jjrH j rHt j  || j}
|
rVd
di}nd
di}t j|rh| }|| t j|rv| }|| |rz|nd } j } j r jjdd}n jjdd}ddlm} ||j||  _ j||  j s j  d S t !d d S )Nc                     s$   i }  j  | d<  j  | d< | S )NZpserver_endpoints
trainer_id)r   _get_pserver_endpoints_worker_index)kwargsr   r   r   sync_strategy_envs   s   z?ParameterServerRuntime._init_worker.<locals>.sync_strategy_envsc                     s:   ddl m   fdd} i }j |d< |  |d< |S )Nr   get_sparse_tablenamesc            
   	      s"  i } g d| d< dg| d< g d| d< g d| d<  j d} j d	}t|d
kr/tdg }|D ]X}j  j| }|ddd |jD g}j jD ]:}|j	| 
 v r||dd
 kr|j	g}| |j	 D ]}	|t||	 qk|d| |d|  nqPq3d|S )N)seedmeanZstdZgaussian_randomvalueZfill_constant)r_   minmaxZuniform_randomZtruncated_gaussian_randomTFr   z[GeoStrategy can not support large scale embeding now, please use paddle.static.nn.embedding,c                 S      g | ]}t |qS r   str).0dimr   r   r   
<listcomp>       zlParameterServerRuntime._init_worker.<locals>.geo_strategy_envs.<locals>.get_sparse_attrs.<locals>.<listcomp>r1   &:#)r   lenr&   r7   r6   rA   r3   r   opsr4   keysoutputrE   rg   attr)
Zopt_init_mapZdist_varnamessparse_varnamesZ
init_attrsZ
value_nameZ	value_varZ
value_attropZ	init_attrrs   )r^   r   r   r   get_sparse_attrs   sD   


zXParameterServerRuntime._init_worker.<locals>.geo_strategy_envs.<locals>.get_sparse_attrsZtrainersZsparse_attrs)r)   r^   r   Z_worker_num)rv   r[   r   r]   r   geo_strategy_envs   s   -
z>ParameterServerRuntime._init_worker.<locals>.geo_strategy_envsr   )GeoStrategySyncStrategy)_get_lr_ops_has_global_stepr"   launch_barrierZneed_global_step10   )	recv_typer   )Communicatorz'communicator has been initialized, skip)"r$   rx   ry   r)   rz   r{   r   Zget_trainer_runtime_configprintr   r%   r
   r   rY   _is_heter_parameter_server_modeZ
_is_workerZ_get_heter_worker_endpointsr   r;   updater   Zget_communicator_send_contextZis_geo_modeget_communicator_recv_contextrO   r   modeZget_communicator_flagsr   Zinit_with_ctxZ
is_runningstartwarningswarn)r   r\   rw   rx   ry   rz   r{   Ztrainer_configr'   r|   Zlrsr[   Z
geo_kwargsZsync_kwargsZsend_ctxZrecv_ctxr   r   r   r   _init_worker   sT   7








z#ParameterServerRuntime._init_workerc                 C   s   t t }| jjrH| jd jd  }|dvrtd| | j	 rH|dkr7t t
ttdd}|S |dkrHt tttd	d}|S )
Nr"   heter_worker_device_guard)GPUXPUZCPUz Heter Worker Not Support Device r   ZFLAGS_selected_gpusr~   r   ZFLAGS_selected_xpus)r   r<   ZCPUPlacer   r   r   r%   upperr&   _is_heter_workerZ	CUDAPlaceintr?   getenvZXPUPlace)r   rG   r   r   r   r   _get_executor,  s4   
z$ParameterServerRuntime._get_executorc                 O   sp  t |dkr
tdt |dkr|d }nd }|  }| j r/| jd jd r/t| j  |	t
  | j r@|   d S | jd}g }|D ]
}|| j|7 }qJtt|}| jd}g }	|D ]
}|	| j|7 }	qett|	}	ttt|| | |	 t  }
|sd S tj|std|tjj|t ||
d	 | j|||| d
 | j|||	 d d S )Nr   z-init server can only accept 1 args: `dirname`r   r"   r|   FTz There is no directory named '%s'rI   rH   r6   )rG   rH   r-   )rH   r-   )ro   r&   r   r   r   r   r%   r
   rY   rF   r   r   r   Zget_sparse_varname_on_psZget_optimize_varname_on_psr8   setr9   r   %_ParameterServerRuntime__exclude_varsr   r:   r?   r@   isdirr<   r=   rJ   rM   rR   )r   argsr[   Zmodel_dirnamerG   rt   Z sparse_related_optimize_varnamesvar_nameZdistribtued_varnamesZ%distributed_related_optimize_varnamesremaining_varsr   r   r   _init_serverG  s   





z#ParameterServerRuntime._init_serverc                 C   s   |   }|t  d S r   )r   rF   r   r   rG   r   r   r   _run_server  s   z"ParameterServerRuntime._run_serverc                 C   s   | j   |  }|  d S r   )r   stopr   closer   r   r   r   _stop_worker  s   
z#ParameterServerRuntime._stop_workerc                    s   g d}i }g |d< ddg|d< dg|d< ddg|d	< d
g|d< d
g|d< g d|d< dg|d< ddg|d< i }ddg|d< dg|d	< ||vrOt d|| fdd|| D }||vrdg }||fS  fdd|| D }||fS )N)	sgdadamadagradadamaxmomentumlars_momentumrmspropdecayed_adagradftrlr   Z	moment1_0Z	moment2_0r   Zmoment_0r   Z
inf_norm_0r   Z
velocity_0r   r   )Z
momentum_0Zmean_square_0Zmean_grad_0r   r   Z	squared_0Zlinear_0r   Zbeta1_pow_acc_0Zbeta2_pow_acc_0zCfleet can not support optimizer: {}, only this can be supported: {}c                       g | ]} d  | qS rL   r   rh   val
param_namer   r   rj         z@ParameterServerRuntime._get_optimizer_status.<locals>.<listcomp>c                    r   r   r   r   r   r   r   rj     r   )r&   format)r   ru   r   Zsupported_optsZreshaped_val_mapZorishaped_val_mapZreshaped_namesZorigin_namesr   r   r   _get_optimizer_status  s<   






z,ParameterServerRuntime._get_optimizer_statusc                 C   sR   ddl m} || j}|D ]}d|jv r&d|jv r&|dd |kr&|  S qd S )Nr   )_get_optimize_opsParamZLearningRate)r)   r   r   Zinput_namesinput)r   r   r   optsru   r   r   r   _get_optimizer_op  s   


z(ParameterServerRuntime._get_optimizer_opc                 C   s   | j   t }| }g }| D ]e\}}	t|	 dkr"td|	 d }
||
 | 	|
}| 
|j|
\}}|
g| | D ]4}| j j| }|jd| j |jddd |jD g|jg|jgd|	 tj||jd	d
 qBq|| |S )Nr    Dense can not support split now.r   	recv_saverd   c                 S   re   r   rf   rh   ir   r   r   rj     rk   z=ParameterServerRuntime._save_dense_params.<locals>.<listcomp>FrX   r3   slice_shapesslice_varnamesremote_varnames	is_sparse	endpointsr2   r4   r5   )r   recvr   r7   itemsro   origin_varnamesr&   rE   r   r   r4   r   r6   rD   r   rZ   r3   rA   r*   split_endpointsr?   r@   rF   )r   rG   rH   r   rI   progblock
local_varsr*   var_ctxrQ   	optimizerreshaped_varnamesr   r   r+   r   r   r   _save_dense_params  s<   



z)ParameterServerRuntime._save_dense_paramsc                 C   s*  t  }| }g }| D ]\}}	t|	 dkrtd|	 d }
||
 | |
}| |j	|
\}}| j
 j|
 }g }ddd |jdd  D }|	 D ]}|t||  qS|jd| j |j||	 |	 d|	 t| j tj||jd		d
 |D ]J}| j
 j| }g }g }tt|	 D ]}|| d|  || q|jd| j |j|||d|	 t| j tj||jd		d
 q|D ]6}| j
 j| }|jd| j |jddd |jD g|g|gd|	 d d tj||jdd
 qq|| | S )Nr   r   r   rd   c                 S   re   r   rf   r   r   r   r   rj     rk   z>ParameterServerRuntime._save_sparse_params.<locals>.<listcomp>r   T)	rX   r3   r   r   r   r   r   Zpserver_numr2   r   z.blockc                 S   re   r   rf   r   r   r   r   rj   X  rk   Fr   )r   r7   r   ro   r   r&   rE   r   r   r4   r   r6   rA   r3   sectionsrg   rD   r   rZ   split_varnamesr   rY   r?   r@   r*   rangerF   rq   )r   rG   rH   r   rI   r   r   r   r*   r   rQ   r   r   r   r+   r   Zdims1sectionZreshaped_varnamer   r   r   rK   r   r   r   _save_sparse_params	  s   




z*ParameterServerRuntime._save_sparse_paramsc           	   
   C   sZ   t  }| }| D ]\}}|jd||| | | |dd q|| | S )NZcheckpoint_notify)rQ   r   r   r   r   rH   r   )r   r7   r   rD   r   r   rF   rq   )	r   rG   rH   r   r   r   r   r*   r   r   r   r   _save_distributed_paramsc  s   
z/ParameterServerRuntime._save_distributed_paramsc                 C   s   | j jddd}| j jddd}| j jddd}| ||||}| ||||}	| ||||}
|t|	 t|
 }ttt||	 }t
jj||||d d S )Nr   T)r   Zuse_origin_programr	      r   )r   r   r   r   r   r8   r9   r   r   r:   r<   r=   Z	save_vars)r   rG   rH   rI   r   Z	dense_ctxZ
sparse_ctxZdistributed_ctxZrecv_dense_varnamesZrecv_sparse_varnamesZrecv_distributed_varnamesZsaved_varnamesr   r   r   r   _save_distributed_persistablesw  sH   
z5ParameterServerRuntime._save_distributed_persistablesr   c                 K   sJ   t |ts	td|du r| j }t |trtd| |||| dS )a  
        This function filters out all variables with `persistable==True` from the
        give `main_program` and then saves these variables to the folder `dirname`
        or file `filename`.

        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None; if you would like to save all variables in a
        single file, use `filename` to specify the file name.
        zHin fleet.save_persistables() function, executor must be as Executor typeNzkin fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed)r;   r   	TypeErrorr   Zget_origin_ps_main_programr   r   )r   rG   rH   rI   r   r[   r   r   r   _ps_inference_save_persistables  s   


z6ParameterServerRuntime._ps_inference_save_persistablesTFc                 C   s   t |ts	td|dur%t |trtdtjjj||||||d dS tjj||||| j|d d}t	j
||}	t|	d}
|
 }W d   n1 sOw   Y  t|}|t  | j|||dd dS )	z
        Prune the given `main_program` to build a new program especially for inference,
        and then save it and all related parameters to given `dirname` by the `executor`.
        zKin fleet.save_inference_model() function, executor must be as Executor typeNznin fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed)programlegacy_formatZ	__model__rbr   )r   )r;   r   r   r   r<   r=   r>   Zsave_inference_modelr   r?   r@   rA   openreadr   Zparse_from_stringZ_copy_dist_param_info_fromr   r   )r   rG   rH   Zfeeded_varsZtarget_varsrI   Zexport_for_deploymentr   Zmodel_basenameZmodel_filenamefZprogram_desc_strr   r   r   r   "_ps_inference_save_inference_model  sF   


		


z9ParameterServerRuntime._ps_inference_save_inference_modelc                 O      | j |i | d S r   )r   r   r   r[   r   r   r   _save_inference_model     z,ParameterServerRuntime._save_inference_modelc                 O   r   r   )r   r   r   r   r   _save_persistables  r   z)ParameterServerRuntime._save_persistablesr   )Nr   )NTF)__name__
__module____qualname__r   r    r   r   rM   rR   staticmethodr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   __classcell__r   r   r   r   r   #   s<    
0 R3&Z0
%
7r   )r?   r   r<   Zpaddle.frameworkr   Zpaddle.staticr   r   r   r   r   r   Zbase.private_helper_functionr
   Zruntime_baser   __all__r   r   r   r   r   <module>   s    	