o
    "j:                     @   s2   d Z ddlZddlZddlZg ZG dd dZdS )zParameter Server utils    Nc                   @   sH   e Zd ZdZdddZ	dddZdd Zdd	d
Zdd Zdd Z	dS )DistributedInferz>
    Utility class for distributed infer of PaddlePaddle.
    Nc                 C   sB   |r|  | _ntj   | _|r|| _ntj | _d | _d S N)cloneorigin_main_programpaddlestaticdefault_main_programorigin_startup_programdefault_startup_programsparse_table_maps)selfmain_programstartup_program r   g/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/utils/ps_util.py__init__   s   
zDistributedInfer.__init__c                 C   s   ddl m} |jjd u r`|j|d tj }| }d|_|j	||d}|j
|| jd | r<|j|d |  n|tj  |  | || tj }	| j}	tj }
| j}
d S d S )Nr   fleet)
role_makerT)strategy)r   dirname)paddle.distributedr   _runtime_handleinitr   	optimizerZSGDZDistributedStrategyZa_syncZdistributed_optimizerZminimizer	   Z	is_serverZinit_serverZ
run_serverrunr   r
   Zinit_worker_init_dense_paramsr   r   )r   exeZlossr   r   r   Zfake_optimizerr   r   Zglobal_startup_programZglobal_main_programr   r   r   init_distributed_infer_env-   s.   




z+DistributedInfer.init_distributed_infer_envc                 C   s^   ddl m} | jd u r,i | _|jjj}| D ]\}}|jr+|d}| | j|< qq| jS )Nr   r   z@GRAD)	r   r   r   r   Z	_send_ctxitemsZ	is_sparsestriptable_id)r   r   Zsend_ctxZgradnamectxparamr   r   r   _get_sparse_table_mapK   s   


z&DistributedInfer._get_sparse_table_mapc                    sr   |    d ur5|d ur7dd | j D }fdd|D } fdd|D }tjj| | j|d d S d S d S )Nc                 S   s   g | ]}t jj|r|qS r   )r   r   ioZis_persistable.0vr   r   r   
<listcomp>]   s    z7DistributedInfer._init_dense_params.<locals>.<listcomp>c                    s    g | ]}|j  vr|j |fqS r   )namer'   )r   r   r   r*   b   s
    
c              	      s.   g | ]}t jt j |d  r|d qS )r      )ospathisfilejoinr'   r   r   r   r*   g   s    )r   vars)r%   r   Z	list_varsr   r   Z	load_vars)r   r   r   Zall_persist_varsZdense_persist_varsZneed_load_varsr   )r   r   r   r   Y   s$   


z#DistributedInfer._init_dense_paramsc                 C   s   |   }| | j|}|S r   )r%   _convert_programr   )r   varname2tablesZconvert_programr   r   r   get_dist_infer_programs   s
   z'DistributedInfer.get_dist_infer_programc                    s    fdd}||}|S )Nc                    sD   ddd  fdd}fdd}|}t d || S )NW)Zlookup_tableZlookup_table_v2c                    sf   i }|   jD ])}|j  v r0|ddu r0| |j d }||g }|| |||< q|S )NZremote_prefetchTr   )global_blockopstypekeysattrinputgetappend)_programpull_sparse_opsop
param_namer7   )SPARSE_OP_TYPE_DICTr   r   _get_pull_sparse_ops~   s   
z]DistributedInfer._convert_program.<locals>.distributed_ops_pass.<locals>._get_pull_sparse_opsc                    s<  dd }|  D ]\}} j fdd|D } j|d dd  }|j vr9td|j d|j }|d d	}|d d
}	|d j	}
fdd|D }|||  fdd|D }|d d d D ]	} 
| qpdgt| }t jd gt| }t jD ]X\}}tdt|jD ]!}||j| }t|D ]\}}|j|v rt||| ||< qqtdt|jD ]!}||j| }t|D ]\}}|j|v rt||| ||< qqqt|t| dkrt|d } j|d||dd|i|	||d|
dd qtdd S )Nc           "      S   s  |   }t|j}d}dgt|j }dgt|j }t|jD ]j\}}	tdt|	jD ]+}
|| dkr6 n"|	|	j|
 }t|D ]\}}|j|v rVd||< t||} nqBq,tdt|	j	D ])}
|| dkrj n |	
|	j	|
 }t|D ]\}}|j|v rd||< t||}qvq`q tt|jD ]}
||
 dkr||
 dkrtd  d S q||k rg }t|d t|D ]}
||
 dkr||j|
 |
f qt|D ]\}
}	g }t }||	d  ||	d  d}|t|k r|| }|j| }	g }tdt|	j	D ]}|	
|	j	| }|| qt|d |d dD ]h}|j| }||v r-qd}tdt|jD ]0}||j| }tt|D ]}|| D ]}||v rXd} nqM|r_ nqG|rf nq7|r|| rytd   d S || ||j|  q|d }|t|k s|  |D ]J}|j|}||j| j |j|d |d  ||j| _|j|}||} ||}!|j|| |||  |||! |d }qq|j t|jksJ tt|jD ]}
|j|
|j|
 jks	J qd S d S )	Nr   r,   zunable to re-arrange dags order to combine distributed embedding ops because a op both needs embedding table's output as input and produces ids as the same embedding table's inputFTzDunable to re-arrange dags order to combine distributed embedding ops   )r6   lenr7   	enumeraterangeoutput_namesoutputr+   maxinput_namesr;   minwarningswarnr=   setaddsortdesc
_insert_opZ	copy_from
_remove_oppopinsertZop_sizer@   )"programinputsoutputsr6   Zmin_output_indexZmax_input_indexZinput_indexesZoutput_indexesidxr@   ioutsin_idin_varinsout_idout_varZmove_opsqueuevisitedstartposZ	op_inputskjZop1foundtyindexrS   Z	insert_opZinput_stateZoutput_stater   r   r   dag_check_up_and_reorder   s   










#




Kz|DistributedInfer._convert_program.<locals>.distributed_ops_pass.<locals>._pull_sparse_fuse.<locals>.dag_check_up_and_reorderc                    $   g | ]}   j|d d  qS )Idsr   )r6   r1   r;   r(   r@   rX   r   r   r*         znDistributedInfer._convert_program.<locals>.distributed_ops_pass.<locals>._pull_sparse_fuse.<locals>.<listcomp>r   r5   zcan not find variable z!, please check your configurationpadding_idxis_distributedc                    rn   )ZOutr   )r6   r1   rJ   rp   rq   r   r   r*     rr   c                    s   g | ]}  |qS r   )rl   rp   Zall_opsr   r   r*     s    rD   r,   Zdistributed_lookup_table)ro   r5   ZOutputsT)rt   rs   r"   Zis_testZlookup_table_version)rl   r8   rY   rZ   attrsz9something wrong with Fleet, submit a issue is recommended)r    r6   r7   r1   r;   r+   r9   
ValueErrorr:   r8   rU   rF   rG   rH   rI   rJ   rK   rL   rM   rT   )r>   r?   rm   r$   r7   rY   wr"   rs   rt   Zop_typerZ   Zop_idxsr[   Zinputs_idxsZoutputs_idxsr@   r\   r]   r^   r_   r`   ra   rb   Zdistributed_idx)rX   r3   ru   r   _pull_sparse_fuse   s   t






zZDistributedInfer._convert_program.<locals>.distributed_ops_pass.<locals>._pull_sparse_fusezBlookup_table will be forced to test mode when use DistributedInfer)rN   rO   )rX   rC   ry   r?   r3   )rB   rX   r   distributed_ops_pass{   s   
 @
z?DistributedInfer._convert_program.<locals>.distributed_ops_passr   )r   r   r3   r{   Zcovert_programr   rz   r   r2   z   s    Wz!DistributedInfer._convert_program)NN)
__name__
__module____qualname____doc__r   r   r%   r   r4   r2   r   r   r   r   r      s    


r   )r   r-   rN   r   __all__r   r   r   r   r   <module>   s   