o
    "j[                     @   sh  d dl Z d dlmZ d dlZd dlmZ d dlmZmZ d dl	m
Z
 d dlmZmZ ddlT ed	G d
d deZedG dd deZedG dd deZedG dd deZedG dd deZedG dd deZedG dd deZedG dd  d eZed!G d"d# d#eZed$G d%d& d&eZed'G d(d) d)eZdS )*    N)defaultdict)	framework)PassBaseregister_pass)core)	ParameterProgram   )*Zappend_send_ops_passc                       D   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Z  Z	S )AppendSendOpsPassc                       t    d S Nsuper__init__self	__class__ j/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/passes/ps_trainer_pass.pyr         zAppendSendOpsPass.__init__c                 C      dS NTr   r   r   r   r   _check_self!      zAppendSendOpsPass._check_selfc                 C   r   r   r   r   Z
other_passr   r   r   _check_conflict$   r   z!AppendSendOpsPass._check_conflictc           	         s|   |t krg }n	 fdd|D }g }|tjtjfv r$  jt d}  jdd|id|id|gd|d	|t	t
id
 |S )Nc                    s   g | ]	}   j| qS r   )global_blockvars).0Z	union_varprogramr   r   
<listcomp>-   s    z5AppendSendOpsPass._append_send_op.<locals>.<listcomp>)namesendXOutZsend_varnames	is_sparsetable_idtypeinputsoutputsattrs)ZSTEP_COUNTERDistributedModeSYNC
HALF_ASYNCr   
create_varr   Zgenerate_control_dev_var_name	append_opRPC_OP_ROLE_ATTR_NAMERPC_OP_ROLE_ATTR_VALUE)	r   r#   Z
union_varsqueuer)   r*   ps_modesend_input_varsdummy_outputr   r"   r   _append_send_op'   s*   
z!AppendSendOpsPass._append_send_opc              
   C   s.   |  jdd|idg id|ddttid d S )NZsend_barrierr'   r(   
trainer_idZ
half_asyncTr+   )r   r4   r5   r6   )r   r#   dummysr<   r   r   r   _append_barrier_opE   s   
z$AppendSendOpsPass._append_barrier_opc                 C   s   |j }|d }t||d d}g }| D ]O\}}	|	 r#|tjkr#q|	 s-|tjkr-q|	 t|d jj	kr:qt
|	 dkrCq|	 rIdnd}
|	 rQdn|
}
|| ||	 ||
|	 | q|tjtjfv r|t|d }| ||| d S d S )	Nr8   is_heter_ps_modeZsplit_dense_tableZlossr      r	   
role_maker)_attrsget_the_one_send_contextitemsr)   r0   ZGEOZ
program_ididblockr#   lenZremote_sparse_idsis_distributedappendr;   origin_varnamesr*   r1   r2   get_role_idr>   )r   main_programstartup_programpass_ctxr/   r8   send_ctxr=   Zmerged_namer&   r)   r<   r   r   r   _apply_single_implQ   s>   
z$AppendSendOpsPass._apply_single_impl)
__name__
__module____qualname__r   r   r   r;   r>   rQ   __classcell__r   r   r   r   r      s    r   Zdistributed_ops_passc                       L   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	  Z
S )DistributedOpsPassc                    s   t    i | _i | _d S r   )r   r   w_2_table_idemb_sizer   r   r   r   r   w   s   

zDistributedOpsPass.__init__c                 C   r   r   r   r   r   r   r   r   |   r   zDistributedOpsPass._check_selfc                 C   r   r   r   r   r   r   r   r      r   z"DistributedOpsPass._check_conflictc                    s|  |d rd S t |dkrd S d }d }d}| D ]\}}	|	d }
 |
drh|
d}|d}t |dkrh|d dkrh|d }|d	 }|  jv rc|  jv rc  j| }  j| }d
}ntd |st	d   j
dtjjjdd
d}  jddi d|idg|jddd   j
dtjjjdd
d}  jddi d|idg|jddd | D ]\}}	  jfdd|	D } fdd|	D }  j|	d dd  }| j| }|	d d}|	d d}|	d j}dd |	D }t	d|  fdd|	D }|d d d D ]
}  | q  jd|||||dd |i|||| j| ||d!d" qd S )#N
use_ps_gpur   Fentry:   Zshow_click_entryrA   r	   TzGShowClickEntry configured, but cannot find show/click var, will not usez+ShowClickEntry not configured, will not useshow)r%   dtypepersistablestop_gradientZfill_constantr(   )shaper_   valueindexr,   r-   r.   r/   clkc                       g | ]}  |qS r   re   r!   opall_opsr   r   r$          z8DistributedOpsPass._push_sparse_fuse.<locals>.<listcomp>c                    $   g | ]}   j|d d  qS Idsr   r   r    inputri   _programr   r   r$          W@GRADpadding_idxrI   c                 S   s   g | ]}| d qS )Zslot)attrri   r   r   r   r$      rm   zdebug zcb slots: c                    rn   )zOut@GRADr   rq   ri   rs   r   r   r$      s    Zdistributed_push_sparse)rp   WOutputsZShowsZClicksr{   )rI   rw   r*   size
use_cvm_opslotsr+   )rH   rE   Zhas_attrrx   splitr   r    warningswarnprintr3   r   ZVarDescZVarTypeZFP32
_insert_opr_   opsoutputrX   r,   
_remove_opr4   rY   )r   rt   push_sparse_opsr/   r}   r^   rf   Z	use_entryparamr   Zop_firstr[   Zshow_var_nameZclick_var_nameop_idxsr-   wr*   rw   rI   op_typer~   r.   idxr   rt   rl   r   _push_sparse_fuse   s   








z$DistributedOpsPass._push_sparse_fusec           )         s@  dd }|d rg }g }g }g }	g }
t   jd }| D ]\}}  jd}|d r6|d d} fd	d
|D }  j|d dd  }|jd | j|< |d |j	 }d}| D ]\}}||
 v rp| }qb|dkrytd|| j|< |d d}|d d}|d j} fdd
|D }| || fdd
|D }|d d d D ]	}  | qdgt | }t   jd gt | }t  jD ]\\}}tdt |jD ]"}||j| }t|D ]\} }!|!j	|v rt|||  || < qqtdt |jD ]$}||j| }"t|D ]\}#}$|$j	|"v r.t|||# ||#< qqq|d rc|| || || |	| |
|jd gt |  tg ||R  }qt|t| dkrt|dkrzt|}%nt|d }%  j|%d||dd|i|||||dd qtt |D ]$}|| }%  j|%d|| g|dd|| gi|||||dd qq|d rt |dkrt|dkrtd  j|dd|id|	i|
dddd t }&z	dd
 |D }'W n ty   tdw |&|' d d
 |
D }(|&|( d S d S d S )!Nc           "      S   s  |   }t|j}d}dgt|j }dgt|j }t|jD ]j\}}	tdt|	jD ]+}
|| dkr6 n"|	|	j|
 }t|D ]\}}|j|v rVd||< t||} nqBq,tdt|	j	D ])}
|| dkrj n |	
|	j	|
 }t|D ]\}}|j|v rd||< t||}qvq`q tt|jD ]}
||
 dkr||
 dkrtd  d S q||k rg }t|d t|D ]}
||
 dkr||j|
 |
f qt|D ]\}
}	g }t }||	d  ||	d  d}|t|k r|| }|j| }	g }tdt|	j	D ]}|	
|	j	| }|| qt|d |d dD ]h}|j| }||v r-qd}tdt|jD ]0}||j| }tt|D ]}|| D ]}||v rXd} nqM|r_ nqG|rf nq7|r|| rytd   d S || ||j|  q|d }|t|k s|  |D ]J}|j|}||j| j |j|d |d  ||j| _|j|}||} ||}!|j|| |||  |||! |d }qq|j t|jksJ tt|jD ]}
|j|
|j|
 jks	J qd S d S )	Nry   r   rA   zunable to re-arrange dags order to combine distributed embedding ops because a op both needs embedding table's output as input and produces ids as the same embedding table's inputFTzDunable to re-arrange dags order to combine distributed embedding opsr	   )r   rH   r   	enumeraterangeoutput_namesr   r%   maxinput_namesrr   minr   r   rJ   setaddsortdescr   	copy_fromr   popinsertZop_sizerj   )"r#   r-   r.   r   Zmin_output_indexZmax_input_indexZinput_indexesZoutput_indexesr   rj   ioutsin_idin_varinsout_idout_varZmove_opsr7   visitedstartposZ	op_inputskjZop1foundtyre   r   Z	insert_opZinput_stateZoutput_stater   r   r   dag_check_up_and_reorder   s   











!

";zFDistributedOpsPass._pull_sparse_fuse.<locals>.dag_check_up_and_reorderrZ   rA    r?   r   	op_devicec                    rn   ro   rq   ri   rs   r   r   r$   ]  ru   z8DistributedOpsPass._pull_sparse_fuse.<locals>.<listcomp>rz   Zparam_name_to_grad_namery   z0can not find suitable sparse table, please checkrw   rI   c                    rn   )r(   r   )r   r    r   ri   rs   r   r   r$   u  ru   c                    rg   r   rh   ri   rk   r   r   r$   {  rm   Zdistributed_lookup_table)rp   rz   r{   )rI   rw   r*   Zlookup_table_versionr   rd   z,There can't be ops before embedding in gpupspull_gpups_sparserp   r(   T)r|   rI   r)   c                 S   s   g | ]}t |jqS r   )intr%   )r!   varr   r   r   r$     rm   z<The slot name in gpups Should be able to convert to integer.c                 S   s   g | ]}|d  qS )r]   r   )r!   xr   r   r   r$     s    )rH   r   r   rE   rx   r    rr   rb   rY   r%   rK   r*   
ValueErrorrX   r,   r   r   r   r   r   r   r   r   extendr   r   PSGPUZset_slot_vectorZset_slot_dim_vector))r   rt   pull_sparse_opsr/   rP   r   Zgpups_inputs_idxsZgpups_outputs_idxsZgpups_inputsZgpups_outputsZgpups_w_sizeZgpups_min_distributed_idxr   r   r   r-   r   Z	grad_namer*   r%   ctxrw   rI   r   r.   r   r   Zinputs_idxsZoutputs_idxsrj   r   r   r   r   r   r   r   Zdistributed_idxr   Zgpu_slotZgpu_mf_sizesr   r   r   _pull_sparse_fuse   s   _












z$DistributedOpsPass._pull_sparse_fusec                 C   sT  i }i }i }i }d}|  jD ]^}|jt v rf|ddu rf|t|j d }	|d r<|d s<|	|dd d 7 }	|	|d v rCq||	g }|| |||	< ||	g }
|
|dd  |
||	< |jd	krmd}q|  jD ]1}|jt	 v r|t	|j d }	|	|v r|dd ||	 v r||	g }|| |||	< qs|||fS )
NFZremote_prefetchTr   r?   Zis_fl_ps_moderp   local_sparseZcvm)
r   r   r,   ZSPARSE_OP_TYPE_DICTkeysrx   rr   getrJ   ZSPARSE_GRAD_OP_TYPE_DICT)r   rt   r/   r   Zpull_sparse_idsr   r   r}   rj   
param_nameZidsr   r   r   _get_pull_sparse_ops  s@   



z'DistributedOpsPass._get_pull_sparse_opsc           	      C   s^   |j }| ||\}}}td|d  t||d d}| |||| | |||| d S )Nz,is_heter_ps_mode in distributed_ops_pass {}?r?   r@   )rC   r   r   formatrD   r   r   )	r   rM   rN   rO   r/   r   r   r}   rP   r   r   r   rQ     s    
z%DistributedOpsPass._apply_single_impl)rR   rS   rT   r   r   r   r   r   r   rQ   rU   r   r   r   r   rW   u   s    n n'rW   Zdelete_optimizer_passc                       r   )DeleteOptimizesPassc                    r   r   r   r   r   r   r   r     r   zDeleteOptimizesPass.__init__c                 C   r   r   r   r   r   r   r   r     r   zDeleteOptimizesPass._check_selfc                 C   r   r   r   r   r   r   r   r     r   z#DeleteOptimizesPass._check_conflictc                 C   s   g }g }g }g }|D ]}| |j q
|D ]}| |j | |d qtt|}tt|}td||| |D ]}	|	|v rDq=|	|vrM||	 q=tt|}
t|	 | |
D ]}	|	 
|	rm|	 |	 q]d S )Nop_role_varzSremote_optimize_vars: {}, remote_optimize_op_role_vars: {}, local_optimize_vars: {})r   input_arg_namesrx   listr   r   r   rJ   
delete_opsr   has_var_remove_var)r   rt   remote_optimize_opslocal_optimize_opslocal_optimize_varsremote_optimize_varsremote_optimize_op_role_varsoptimize_need_delete_varsrj   r   need_delete_optimize_varsr   r   r   _delete_optimizer_op_and_vars"  sF   
z1DeleteOptimizesPass._delete_optimizer_op_and_varsc                 C   s:   |d   jd }|  j|j|j|j|j|jdd d S )Norigin_main_programlearning_rate_0T)r%   rb   r_   r,   	lod_levelr`   )r   r    r3   r%   rb   r_   r,   r   )r   rM   r/   Zlr_varr   r   r   _add_lr_varJ  s   
zDeleteOptimizesPass._add_lr_varc           	      C   sr   |j }t|}t||d }t|}|| tt|t| }| ||| t|d dr7| || d S d S )Nremote_sparser   Zlr_scheduler)	rC   get_optimize_opsZ
get_lr_opsr   r   r   r   hasattrr   )	r   rM   rN   rO   r/   all_optimize_opsr   Zlr_opsr   r   r   r   rQ   X  s    
z&DeleteOptimizesPass._apply_single_impl)
rR   rS   rT   r   r   r   r   r   rQ   rU   r   r   r   r   r     s    (r   Zdelete_extra_optimizer_passc                       4   e Zd Z fddZdd Zdd Zdd Z  ZS )	DeleteExtraOptimizerPassc                    r   r   r   r   r   r   r   r   m  r   z!DeleteExtraOptimizerPass.__init__c                 C   r   r   r   r   r   r   r   r   p  r   z$DeleteExtraOptimizerPass._check_selfc                 C   r   r   r   r   r   r   r   r   s  r   z(DeleteExtraOptimizerPass._check_conflictc                 C   sH  |j }g }g }g }t|}t||d }	tt|t|	 }
g }|
D ]}||j q"|	D ]}||j ||d q-tt|}tt|}|D ]}||v rSqLd|krXqL||vra|| qLtt|}g }|D ]}g }| j	D ]}||j
v r|| qu|| qlt| | |D ]}| |r| | qd S )Nr   r   r   )rC   r   r   r   r   r   rx   rJ   r   r   output_arg_namesr   r   r   )r   rM   rN   rO   r/   r   r   r   r   r   r   r   rj   r   r   Zinit_opsZparam_init_opr   r   r   rQ   v  sT   


z+DeleteExtraOptimizerPass._apply_single_implrR   rS   rT   r   r   r   rQ   rU   r   r   r   r   r   k  
    r   Zfake_init_ops_passc                       r   )FakeInitOpsPassc                    r   r   r   r   r   r   r   r     r   zFakeInitOpsPass.__init__c                 C   r   r   r   r   r   r   r   r     r   zFakeInitOpsPass._check_selfc                 C   r   r   r   r   r   r   r   r     r   zFakeInitOpsPass._check_conflictc                 C   s,   t |d d}t |d d}tt|| S )NZorigin_main_programsTF)Zget_sparse_tablenamesr   r   )r   r/   Zdist_varnamesZsparse_varnamesr   r   r   _get_sparse_table_names  s   z'FakeInitOpsPass._get_sparse_table_namesc           
   	   C   s   |D ]^}|  j| }t|dd   d |d v rqg }|  jD ]}||jv r2|| q&t|}|dkrCt	dt| |d }	|  j
di d|id	|	d	id
 t|  | qd S )Nr\   r   ry   r   rA   z&table init op num should be 1, now is Z	fake_initr(   rb   r+   )r   r    strr   stripr   r   rJ   rH   r   r4   rx   r   )
r   rN   Zsparse_table_namesr/   Z
table_nameZ	table_varZtable_param_init_oprj   Zinit_op_numZtable_init_opr   r   r   _fake_init_sparsetable  s2   


z&FakeInitOpsPass._fake_init_sparsetablec                 C   s"   |j }| |}| ||| d S r   )rC   r   r   )r   rM   rN   rO   r/   Zsparse_tablesr   r   r   rQ     s   
z"FakeInitOpsPass._apply_single_impl)
rR   rS   rT   r   r   r   r   r   rQ   rU   r   r   r   r   r     s    	r   Zps_gpu_passc                       rV   )	PsGpuPassc                    r   r   r   r   r   r   r   r     r   zPsGpuPass.__init__c                 C   r   r   r   r   r   r   r   r     r   zPsGpuPass._check_selfc                 C   r   r   r   r   r   r   r   r     r   zPsGpuPass._check_conflictc           
      C   s   d}t t| jD ]\}}|jdkr|}q| jD ]L}|jdkr)|jdkr)qt|jt g \}}|D ]1}| j	|d }|
| |tt tj| |}	| j|d |	 |   q6qd S )Nry   lookup_table_gradpull_box_sparser   rA   )r   r   r   r   r,   r   Zget_grad_op_descr   r   r   r   	_set_attrZop_role_attr_namebackwardpaddleZstaticOperatorr   _sync_with_cpp)
r   r#   Zinsert_indexr   rj   Zgrad_op_descZop_grad_to_varop_descZnew_op_descZnew_opr   r   r   _add_push_box_sparse_op  s0   



z!PsGpuPass._add_push_box_sparse_opc                 C   s*  i }t t| jD ]\}}|jdkr |dD ]}d||< qqg }g }g }t|D ]4}d|jvr3q+|dD ]&}||v r^||	d |jD ]}	|	dkrPqI||	D ]}
|
|
 qUqIq8q+t t|}t t|}|D ]}
|
|vry|
|
 qnt t|}|D ]}| |r| | qd S )Nr   rz   rA   Paramr   ZLearningRate)r   r   r   r   r,   rr   r   r   r   rx   rJ   r   r   r   )r   r#   Zembedding_wr   rj   r%   Zoptimize_varsZoptimize_op_role_varsr   key_namer   r   r   r   r   _remove_optimizer_var  sF   



	
zPsGpuPass._remove_optimizer_varc           
      C   s&  i }g }g }t t| jD ]+\}}|jdkr:|dD ]}d||< || || q|dD ]}d||< q3qt t| jD ](\}}|jdksR|jdkrSqD|jD ]}||D ]}	|	|v rj||  nq]qVqDt t	|}|j
dd |D ]	}| | q{|D ]	}| | qd S )	Nr   rv   rA   rz   r   r   T)reverse)r   r   r   r   r,   r   rJ   rr   r   r   r   r   r   )
r   r#   Zlookup_table_grad_varZremove_op_indexZ
remove_varr   rj   r%   r   r   r   r   r   $_remove_lookup_table_grad_op_and_var   s<   




z.PsGpuPass._remove_lookup_table_grad_op_and_varc                 C   s(   |j }| | | | | | d S r   )rC   r   r   r   )r   rM   rN   rO   r/   r   r   r   rQ   =  s   

zPsGpuPass._apply_single_impl)rR   rS   rT   r   r   r   r   r   r   rQ   rU   r   r   r   r   r     s    #r   Zps_transpile_passc                       r   )	PsTranspilePassc                    r   r   r   r   r   r   r   r   F  r   zPsTranspilePass.__init__c                 C   r   r   r   r   r   r   r   r   I  r   zPsTranspilePass._check_selfc                 C   r   r   r   r   r   r   r   r   L  r   zPsTranspilePass._check_conflictc                 C   sD   ddl m} |j}| }t }|j|||d |d |d dd d S )Nr	   )SingleProcessMultiThreadr<   Ztrainer_endpointscurrent_endpointF)rN   rM   rankZ	endpointsr   Z	wait_port)Ztranspiler.collectiver   rC   Zget_dist_envZ	transpile)r   rM   rN   rO   r   r/   r   envr   r   r   rQ   O  s   
z"PsTranspilePass._apply_single_implr   r   r   r   r   r   D  r   r   Zsplit_heter_worker_ops_passc                       s<   e Zd Z fddZdd Zdd Zdd Zd	d
 Z  ZS )SplitHeterWorkerOpsPassc                    r   r   r   r   r   r   r   r   a  r   z SplitHeterWorkerOpsPass.__init__c                 C   r   r   r   r   r   r   r   r   d  r   z#SplitHeterWorkerOpsPass._check_selfc                 C   r   r   r   r   r   r   r   r   g  r   z'SplitHeterWorkerOpsPass._check_conflictc           !      C   s  g }g }g }	|j d }
|d }|  }t| }||d  d }||d  d }||
}|| t|D ]\}}t|||| q9||d  d d }t	|||| ||d  d d }t	|||| t
|j}|t
|k r||
}|| t|D ]\}}t|||| q||d  d d }t	|||| ||d  d d }t	|||| t|||dd}||d d	 t|j  n4t|D ]\}}t|||| q||d  d d }t	|||| ||d  d d }t	|||| |}t|||dd}||d d	 t|j  t
|j}|t
|d kr't|||||||}t|||||||d
}t||||d  d d }	g }g }t|} d|d|dt|dt
t|dt|d|d dttddtti}| jddg ii |d d S )NrA   rB   forwardr   entranceexitr,   block_input_var_namer\   Fpersistablesmessage_to_block_idoptimize_blocksendpointfanin
pserver_iddistributed_moder8   rpc_exec_thread_numCPU_NUM    heter_listen_and_servr'   r+   )Z
num_blocksZ_heter_device_typelowerr   _get_stage_id_create_blockrJ   r   Zblock_append_opZadd_vars_by_var_listrH   r   get_communicate_var_infor   r   Zinsert_communicate_opadd_send_opZget_ps_endpointsget_heter_worker_endpointget_previous_stage_trainersrL   osgetenvr5   r6   r   r4   )!r   r#   r/   heter_programprogram_block_ops_list	heter_opsblock_var_detailoptimizer_blockgrad_to_block_idZsend_grad_var_listZpre_block_idxrB   Zcurrent_deviceZstage_idZheter_block_ops_forwardZheter_block_ops_backwardZheter_block_rj   Zentrance_varsZ	exit_varsZfirst_op_index_fpZheter_block_bpbp_entrance_varsZbp_exit_varsbackward_comm_infoZforward_comm_infoZfirst_op_index_bp
static_varZstatic_var_bpr9   r:   Zpserver_endpointsr   r   r   _create_heter_programj  s   








	
z-SplitHeterWorkerOpsPass._create_heter_programc                 C   st   |j }d}t||\}}}}	t|dkrtd |}dS t|	}	t||	|}
tj	 }| 
||||	||
 |}dS )z
        split heter worker program from origin-program
        1. find heter op (located on different device)
        2. find input&output of every heter-block
        3. create heter worker program, add listen&serv op
        cpur   zuCurrently running in Heter Parameter Server mode, but no OP running on heterogeneous devices, Please check your code.N)rC   find_heter_opsrH   r   r   union_forward_gradient_opfind_block_jointsr   r   r   r  )r   rM   rN   rO   r/   Zdefault_deveicer#   r  r  program_block_opsblock_vars_detailr  r   r   r   rQ     s2   
z*SplitHeterWorkerOpsPass._apply_single_impl)	rR   rS   rT   r   r   r   r  rQ   rU   r   r   r   r   r   _  s     "r   Zsplit_trainer_ops_passc                       sT   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
  ZS )SplitTrainerOpsPassc                    r   r   r   r   r   r   r   r   0  r   zSplitTrainerOpsPass.__init__c                 C   r   r   r   r   r   r   r   r   3  r   zSplitTrainerOpsPass._check_selfc                 C   r   r   r   r   r   r   r   r   6  r   z#SplitTrainerOpsPass._check_conflictc                 C   s   |  j}|d }d}|D ]}	t|	t|kr||	} nq|dks%J t|  | g }
|d }|dkrxt|}|| d d }
t||d |
}|  j|dd|  j|
d  id	g id
dd|
dg dg d|d d|dg dt	|t
tid |
S )Nr   ry   rB   rA   r   r   send_and_recvr'   r(   modesend_var_namemicrobatch_idrecv_var_namemessage_namer   next_endpointsprevious_endpointsr<   rd   )r   r   r   re   delete_same_opsget_next_stage_trainersr  r   r    rL   r5   r6   )r   r#   r/   heter_block_indexops_listr  Zall_opZstart_opZfirst_op_idxrj   Zentrance_varrB   Znext_heter_worker_endpoints	comm_infor   r   r   _replace_ops_by_communicate_op9  sH   




z2SplitTrainerOpsPass._replace_ops_by_communicate_opc                 C   s   t |d D ]\}}|d }|d }|jj|kr|d |= qt |d D ]\}}|d }|d }|jj|kr>|d |=  d S q$t |d D ]\}}|d }|d }|jj|kr_|d |=  d S qEd S )NZmerged_variables_pairsr   rA   Zmerged_dense_pairsZmerged_sparse_pairs)r   Z
merged_varr%   )r   var_namer/   re   pairr   Zvar_gradr   r   r   _remove_var_pair_by_grade  s*   


z,SplitTrainerOpsPass._remove_var_pair_by_gradc                 C   s   || d d || d d  }g }g }t |D ]&}t|| |\}	}
|	D ]}|dd }||v r=|| || q&qtt|}t| | |D ]}| || qNd S )Nr   r   r   z@GRADr   )	find_send_opZfind_op_input_outputr   r   rJ   r   r   r   r/  )r   r#   r/   r)  r  r   Zneed_remove_send_opZneed_remove_grad_varrj   Z
input_listr  r-  Zorigin_var_nameZgrad_var_namer   r   r   _remove_trainer_send_opz  s,   


z+SplitTrainerOpsPass._remove_trainer_send_opc                 C   s:  g }t dt|D ]!}|| d || d  }|| |||||7 }| |||| q	g }	g }
|d d }t| | t|| t||||}|d d d }t|d|dd}|
	|d d t
|j  |		| |d	 }d
|
d|	dt|dddt|d|d dttddtti}| jdddg ii |d d S )NrA   r   r   r   r   r   r   r\   rB   r   r   r   r   r   r   r8   r   r  r  r  r'   rd   )r   rH   r,  r1  r'  r   Zdelete_trainer_useless_varZcreate_backward_blockr  rJ   r   r   get_trainer_endpointrL   r   r  r  r5   r6   r   )r   r#   Zorigin_programr/   r  r  r  r)  r*  r  r  Zbp_ops_listZbackward_blockr  r  rB   r   r   r   _create_trainer_program  sf   





z+SplitTrainerOpsPass._create_trainer_programc                 C   sR   |j }d}t||\}}}}	t|	}	t||	|}
| }| ||||	|
 |}dS )z
        split cpu-trainer program from origin-program
        1. find heter op (located on different device)
        2. find input&output of every heter-block
        3. create cpu-trainer program, add send&recv op
        r  N)rC   r  r  r  cloner3  )r   rM   rN   rO   r/   Zdefault_device_r#   r  Zdefault_opsr  r  Ztrainer_programr   r   r   rQ     s$   z&SplitTrainerOpsPass._apply_single_impl)rR   rS   rT   r   r   r   r,  r/  r1  r3  rQ   rU   r   r   r   r   r  .  s    ,Nr  Zset_heter_pipeline_opt_passc                       r   )	SetHeterPipelineOptPassc                    r   r   r   r   r   r   r   r      r   z SetHeterPipelineOptPass.__init__c                 C   r   r   r   r   r   r   r   r     r   z#SetHeterPipelineOptPass._check_selfc                 C   r   r   r   r   r   r   r   r     r   z'SetHeterPipelineOptPass._check_conflictc                 C   s   |j }|d }|d jd }|t| d | dd|_dd| t| t| d t| ||| dd
|_d S )	NrB   Zuser_defined_strategyZaccumulate_stepsrA   )rN   pipeline_stageheter_place
is_fl_modeZHeterPipelineTrainerZHeterSection)
ZtrainerZdevice_workerZtrainersr<   r6  Znum_pipeline_stagesZsection_programnum_microbatchesr7  r8  )	rC   Zpipeline_configsr   r  Z_heter_deviceZ_heter_pipeline_optZ_get_stage_trainersZ_role_idZ_get_num_stage)r   rM   rN   rO   r/   rB   r9  r   r   r   rQ   	  s*   

z*SetHeterPipelineOptPass._apply_single_implr   r   r   r   r   r5    r   r5  Zsplit_fl_ops_passc                       s   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Z  ZS )%SplitFlOpsPassc                    s&   t    d| _d| _d| _d| _d S )Nzgpu:0zgpu:2zgpu:1zgpu:3)r   r   PART_A_DEVICE_FlAGPART_A_JOINT_OP_DEVICE_FlAGPART_B_DEVICE_FlAGPART_B_JOINT_OP_DEVICE_FlAGr   r   r   r   r   &  s
   

zSplitFlOpsPass.__init__c                 C   r   r   r   r   r   r   r   r   -  r   zSplitFlOpsPass._check_selfc                 C   r   r   r   r   r   r   r   r   0  r   zSplitFlOpsPass._check_conflictc                 C      d S r   r   r   r   r   r   _insert_encrypt_op3  r   z!SplitFlOpsPass._insert_encrypt_opc                 C   r?  r   r   r   r   r   r   _insert_decrypt_op6  r   z!SplitFlOpsPass._insert_decrypt_opc                 C   s>   |j D ]}|jD ]}|t}|dkr|tdnd  qqd S )Nr   )blocksr   rx   OP_DEVICE_KEYr   )r   r#   rG   rj   devicer   r   r   _clear_op_device_flag9  s   


z$SplitFlOpsPass._clear_op_device_flagc           	      C   s   g | _ g | _tt}| jd}|jD ]I}|t}|| j	ks(|dks(|| j
kr3|d }| j | n|| jks=|| jkrG|d }| j| |j}| j }|| |t| qdD ]
}|| }|  q_|S )Nr   r   ab)rF  rG  )Z	partA_opsZ	partB_opsr   r   ori_main_programrG   r   rx   rC  r;  r<  rJ   r=  r>  r   r   r4   r   r   r   )	r   party_program_maprG   rj   rD  r#   r   ap_opkeyr   r   r   _split_fl_program?  s.   







z SplitFlOpsPass._split_fl_programc                 C   n   dd dd d}|j |dd| jidg id	d
d| jdg dg d|dt| jdt| jdt| jttid d S )Nforward_joint_rA   r  r	   @fl_psr  r'   r(   r   r   r!  r"  r#  r$  r%  r&  r<   r+   )	r   partA_to_partB_tensorpartA_to_partB_tensor_namer(  rB   r
  rL   r5   r6   r   rG   r   r+  r   r   r   _insert_partA_communicate_op^  ,   

z+SplitFlOpsPass._insert_partA_communicate_opc                 C   rM  )Nbackward_joint_r	   r  rA   rO  r  r'   r(   r   r   r!  r"  r#  r$  r%  r&  r<   r+   )	r   partB_to_partA_gradpartB_to_partA_grad_namer(  rB   r
  rL   r5   r6   rR  r   r   r   _insert_partB_communicate_opv  rT  z+SplitFlOpsPass._insert_partB_communicate_opc                 C   s   |D ]D}| t|rq| jt|}t|tr3|j|j|j|j	|j
|j|j|j|j|j|jd
}n||d}|j|_t|drF|j|_qd S )N)
r%   rb   r_   r,   r   ra   	trainableoptimize_attrregularizer
error_clipFrI   )Z_find_var_recursiver   ori_main_blockZ_var_recursive
isinstancer   Zcreate_parameterr%   rb   r_   r,   r   ra   rY  rZ  r[  r\  Z_clone_variabler   rI   )r   r    rG   r   Z
source_varZdest_varr   r   r   _create_var_for_block  s.   

z$SplitFlOpsPass._create_var_for_blockc           	      C   s   |t |jk r||}n| }t|D ](\}}|j }||j |t	|
t	 |j |j  }| || q|  |S r   )rH   rB  rG   r  r   r   r4   r   r   rC  rx   r   r   r_  r   )	r   op_listr#   Z	block_idxZ	new_blockr  rj   rJ  r    r   r   r   _get_block_by_idx  s   
z SplitFlOpsPass._get_block_by_idxc                 C   :   d}|j D ]}t|r|t|kr|  S |d7 }q|S Nr   rA   )r   Zis_forward_oprx   rC  r   rG   flagop_idxrj   r   r   r   _find_joint_forward_op     

z%SplitFlOpsPass._find_joint_forward_opc                 C   rb  rc  )r   Zis_backward_oprx   rC  rd  r   r   r   _find_joint_backward_op  rh  z&SplitFlOpsPass._find_joint_backward_opc                 C   s   |  ||}|j| }|j }| ||}|j| }|j }tt|t| | _g | _	| jD ]}| j	
| j| q1d S r   )ri  r   r   r   rg  r   r   r   rW  rV  rJ   r]  r   )r   rG   re  rf  rj   Zvars1Zvars2r-  r   r   r   _get_partB_to_partA_grad  s   




z'SplitFlOpsPass._get_partB_to_partA_gradc                 C   s(   | j }t||\}}t||t|| S r   )rH  Zfind_ops_list_input_outputZscreen_persistables)r   
bp_op_listr#   Zbp_op_inputZbp_op_outputr   r   r   _find_dense_grad_vars  s   
z$SplitFlOpsPass._find_dense_grad_varsc                 C   s`  |  || j}g }tt|jD ]&}|j| }|| ||kr6|j d }|j | _| j	
|| _ nq| || jd}| ||d  t|}t|}	| ||	 | jd}
dd dd d}|d t|
j }d|gd	|
gd
t| jdddt| jd| jdttddtti}|
jdddg ii |d t| j}t|| |  |}t!| j|
| d S )Nr   rA   rU  r	   r  rO  r\   r   r   r   r   r   r   r   r  r  r  r'   rd   )"rg  r<  r   rH   r   rJ   r   r   rQ  r]  r   rP  ra  partA_programrS  get_bp_op_listZ#get_distributed_push_sparse_op_listr   r   r2  rB   rL   r8   r   r  r  r5   r6   r   r0  rH  r'  rl  r  )r   rG   rf  r`  r   rj   Zout_namefirst_blockrk  Zpush_sparse_op_listsecond_blockblock_input_flagr  r/   Zsend_opsdense_grad_varsr   r   r   _get_partA_program  sT   







z!SplitFlOpsPass._get_partA_programc                 C   s\  |  || j}| || j}d}g }g }g }|jD ]}||k r%|| n||kr/|| n|| |d7 }q| || jd}	| || jd}
| |
t| | || jd}
t	|
}| 
|}t| j|
| dd dd d}|d t|
j }d|gd	|
gd
t| jdtt| jddd| jdttddtti}|	jt|ddg ii |d d S )Nr   rA   rN  r  r	   rO  r\   r   r   r   r   r   r   r   r  r  r  r'   rd   )rg  r>  ri  r   rJ   ra  partB_programrX  rH   rn  rl  r  rH  r   r   r	  rB   r
  r8   r   r  r  r5   r6   r   )r   rG   Zop_idx1Zop_idx2Zop_cntZop_list1Zop_list2Zop_list3rj   ro  rp  rk  rr  rq  r  r/   r   r   r   _get_partB_program  sT   






z!SplitFlOpsPass._get_partB_programc           	      C   s
  |j }|d | _|d | _|d | _|| _|d| _|  }|d }td }t	|| | 
| | j |d }td }t	|| | jsetj | _| |  | j|j d	< | | j t| j d S tj | _| |  | j|j d
< | | j t| j d S )NrB   r8   Zis_heter_workerr   rF  z6_fl_A_main_program.prototxtrG  z6_fl_B_main_program.prototxtZpart_a_main_programZpart_b_main_program)rC   rB   r8   Z	is_part_brH  rG   r]  rL  Zps_log_root_dirZdebug_programrj  r   r<  r   r   r   rm  rs  rE  Zcheck_programrt  ru  )	r   rM   rN   rO   r/   rI  Zprog_aZ
_main_fileZprog_br   r   r   rQ   J  s6   





z!SplitFlOpsPass._apply_single_impl)rR   rS   rT   r   r   r   r@  rA  rE  rL  rS  rX  r_  ra  rg  ri  rj  rl  rs  ru  rQ   rU   r   r   r   r   r:  $  s&    			5:r:  )r  _collectionsr   r   Zpaddle.baser   Z#paddle.distributed.passes.pass_baser   r   Zpaddle.frameworkr   Zpaddle.staticr   r   Zps.utils.publicr   rW   r   r   r   r   r   r   r  r5  r:  r   r   r   r   <module>   sF   X   $S86h O P%