o
    "j.x                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	m
Z
 g ZejjZG dd dZG dd	 d	eZG d
d deZG dd deZG dd deZdS )    N)unique_name)wait_server_ready)core)default_main_programdefault_startup_programc                   @   sd   e Zd ZdZdd Zdd Zdd Zdd	 Z	
dddZdd Z	dd Z
dd Zdd Zdd ZdS )
Collective c                 C   sN   || _ d | _d | _d | _d | _d | _d | _d | _tj	}|
 | _| | _d S N)nrings	endpointscurrent_endpointother_endpointsnranksrankstartup_programmain_programr   op_proto_and_checker_makerZkOpRoleAttrNameop_role_keyZkOpRoleVarAttrNameop_role_var_key)selfr
   Zop_maker r   r/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/ps/utils/collective_transpiler.py__init__!   s   
zCollective.__init__c           	      C   s  t |tr
|d}|| _|d u rt | _|| _|d u r t | _t|| _| jdkr8| j	dkr8| j	dkr8t
d|dk r@t
d|| _||vrOt
d|t||| _|| _|rit|}|d d  }|| || _|| _| j | j_|   | j | j_|   d S )	N,   single_process_multi_threadboxz the number of endpoints must > 1r   zrank must >= 0z current endpoint %s is not in %s)
isinstancestrsplitr   r   r   r   lenr   mode
ValueErrorr   r   r   remover   	wait_portcloneZ_origin_program_transpile_startup_program_transpile_main_program)	r   r   r   r   r   r   r$   r   r   r   r   r   	transpile.   sD   







zCollective.transpilec                 C   s   t d)Nz'call the inherited method of subclasses)NotImplementedErrorr   r   r   r   r'   g   s   z"Collective._transpile_main_programc              	   C   s:   t | jD ]}| | j| j| j| j|| j q|   d S r	   )	ranger
   _init_communicatorr   r   r   r   r$   _broadcast_params)r   ring_idr   r   r   r&   j   s   z%Collective._transpile_startup_programFc                 C   s6  d |}t|}	|d d  }
|
| | }|dkr"|r"t|
 | }t ro|jt	ddtj
jjd}dd t|D }|jdi d	|id
|d|d|
| jtjid |jdd|ii d|	d
|d|d|| jtjid d S t r|jt	ddtj
jjd}|jdi d	|id
|d|d|
| jtjid |s|jdd|ii d|	d
|d|d|| jtjid d S |jdd|ii d|	d|d|| jtjid d S tj jtj v r|jt	ddtj
jjd}dd t|D }|jdi d	|id
|d|d|
| jtjid |jdd|ii d|	d
|d|d|| jtjid d S d S )Nr   r   Zbkcl_idT)namepersistabletypec                 S      i | ]\}}||qS r   r   .0idxer   r   r   
<dictcomp>       z1Collective._init_communicator.<locals>.<dictcomp>Zc_gen_bkcl_idOutr   Zendpointr   r1   inputsoutputsattrsZc_comm_initXr   r.   r   Znccl_idZc_gen_nccl_idZc_comm_init_multitrainerZ	ntrainersZ
trainer_idZxccl_idc                 S   r2   r   r   r3   r   r   r   r7      r8   Zc_gen_xccl_id)joinr    r#   global_blockr   r   Zis_compiled_with_xpu
create_varr   generateVarDescVarTypeZRAW	enumerate	append_opr   OpRoleForwardZis_compiled_with_cudapaddledistributedZParallelEnvZdevice_typeZdeviceZget_all_custom_device_type)r   programr   r   r   r.   r$   Zhas_multitrainerZendpoints_strr   r   blockZbkcl_id_varZendpoint_to_index_mapZnccl_id_varZxccl_id_varr   r   r   r,   v   s   







zCollective._init_communicatorc                 C   s   | j  }d}| D ]"}|jrq|d | j }|jdd|id|id|dd| jtjid	 qt	| jD ]}|jd
d|id|id|| jtjid	 q3d S )Nr   Zc_broadcastr>   r9   r.   rootr   r:   c_sync_comm_stream)
r   r@   iter_parametersis_distributedr
   rF   r   rG   rH   r+   )r   rL   r.   paramr   r   r   r-      s.   
zCollective._broadcast_paramsc                 C   s>   | j |jvrdS t| | j  }|ttj@ o|ttj@ S )NF)r   
attr_namesint	all_attrsrG   BackwardZLoss)r   opZop_roler   r   r   _is_loss_grad_op  s   zCollective._is_loss_grad_opc                 C   (   | j |jv ot| | j  ttj@ S r	   )r   rS   rT   rU   rG   rV   r   rW   r   r   r   _is_backward_op  
   zCollective._is_backward_opc                 C   s   d|j v od|j v od|j v S )NParamGradLearningRate)Zinput_namesrZ   r   r   r   _is_update_op  s
   
zCollective._is_update_opc                 C   rY   r	   )r   rS   rT   rU   rG   OptimizerZ   r   r   r   _is_optimizer_op"  r\   zCollective._is_optimizer_opN)F)__name__
__module____qualname____doc__r   r(   r'   r&   r,   r-   rX   r[   r`   rb   r   r   r   r   r      s    9
r   c                   @   2   e Zd ZdZdddZdd Zdd Zd	d
 ZdS )GradAllReducer      c                 C   s   t | | d| _d S )NZgrad_allreduce)r   r   r!   r   r
   r   r   r   r   +     
zGradAllReduce.__init__c                 C   s   |    |   d S r	   )_insert_scale_loss_grad_ops_insert_allreduce_opsr*   r   r   r   r'   /  s   z%GradAllReduce._transpile_main_programc              
   C   sv   | j  }ttt|jD ]*\}}| |r8|j|jd  }|j	|d dd|id|idd| j
 | jtjid qdS )	z
        In order to keep the learning rate consistent in different numbers of
        training workers, we scale the loss grad by the number of workers
        r   r   scaler>   r9         ?r:   N)r   r@   reversedlistrE   opsrX   varsZoutput_arg_names
_insert_opr   r   rG   rV   )r   rL   r5   rW   Zloss_grad_varr   r   r   rl   3  s   


z)GradAllReduce._insert_scale_loss_grad_opsc           
      C   s  | j  }d}d }ttt|jD ]\}}| |r| j|jv r|	 | j }t
|dkr/qt
|d dks9J |}tdt
|dD ]O}|j||  }	|j||d   }|	jrYqC||krw|d7 }|j|dd|id|i| jtjid |d7 }|d | j }|j|d	d|id|id
|| jtjid qCq|d u rd S t|jD ]*\}}| |rt| jD ]}|j|| dd|id|id
|| jtjid q d S qd S )NrM   r   ri   r   c_sync_calc_streamr>   r9   r:   c_allreduce_sumr.   rO   )r   r@   rp   rq   rE   rr   r[   r   rS   rU   r    r+   rs   rQ   rt   r   rG   rV   r
   rb   )
r   rL   r.   gradr5   rW   op_role_varoffsetirR   r   r   r   rm   G  sn   



z#GradAllReduce._insert_allreduce_opsNri   )rc   rd   re   rf   r   r'   rl   rm   r   r   r   r   rh   (  s    
rh   c                   @   rg   )LocalSGDr   ri   c                 C   s   t | | d| _d| _d S )Nz	@SNAPSHOTZ	local_sgd)r   r   snapshot_keyr!   rj   r   r   r   r     s   
zLocalSGD.__init__c                 C   s   t |  | j }g }| D ]
}|js|| q|D ]#}|j| |j	|j
ddd}|jdd|gid|gi| jtjid qd S )NT)r/   shaper0   stop_gradientassignr>   r9   r:   )r   r&   r   r@   rP   rQ   appendrA   snapshot_namer/   r~   rF   r   rG   rH   )r   rL   Znon_dist_paramsrR   snapshotr   r   r   r&     s*   




z#LocalSGD._transpile_startup_programc                 C   s
   || j  S r	   )r}   )r   
param_namer   r   r   r     s   
zLocalSGD.snapshot_namec           	   
   C   s  | j  }g }d}ttt|jD ]y\}}| |r|j|dd  }|j	r)q|j
| |j|jdd|jd}|j|d d|g|gdd	|gi| jtjid
 |j|d dd|id	|i| jtjid
 |d | j }|j|d dd|gid	|gid|| jtjid
 |||f qt| jD ]}|jdd|id	|id|| jtjid
 qt|D ]J}|d }|d }|jdd|gid	|gidd| j | jtjid
 |jd|g|gdd	|gi| jtjid
 |jdd|gid	|gi| jtjid
 qd S )NrM   r]   r   T)r/   r~   r0   r   dtyper   Zelementwise_sub)r>   Yr9   r:   ri   ru   r>      rv   r.   rO   rn   ro   r   )r   r@   rp   rq   rE   rr   r`   rs   inputrQ   rA   r   r/   r~   r   rt   r   rG   ra   r
   r   r+   rF   r   )	r   rL   Zordered_param_snapshotr.   r5   rW   rR   r   Zparam_snapshotr   r   r   r'     s   





	

z LocalSGD._transpile_main_programNr{   )rc   rd   re   rf   r   r&   r   r'   r   r   r   r   r|     s    
r|   c                   @   s    e Zd ZdZdd Zdd ZdS )SingleProcessMultiThreadr   c                 C   s   t | d d| _d S )Nr   r   )rh   r   r!   r*   r   r   r   r     rk   z!SingleProcessMultiThread.__init__c                 C   s    | j  }|jdddid d S )Nc_comm_init_allr.   r   r1   r=   )r   r@   rF   )r   rL   r   r   r   r&     s   
z3SingleProcessMultiThread._transpile_startup_programN)rc   rd   re   rf   r   r&   r   r   r   r   r     s    r   c                   @   sB   e Zd ZdZdddZdd Zdd	 Zd
d Zdd Zdd Z	dS )MultiThreadr   r   
all_reducec                 C   s>   t | | d| _|| _d| _tddd}t|| _	d S )Nr      FLAGS_selected_gpusz0,1,2,3,4,5,6,7,8r   )
rh   r   r!   
trans_modefuse_grad_size_in_numosgetenvr   r    gpu_num)r   r
   r   Zgpu_numsr   r   r   r   
  s   zMultiThread.__init__c              
   C   s   t | jdkr;td td| j td| j td| j| jf  t| jD ]}| | j| j| j| j|| j	d q&d S d| j
v r`td | j }|jd	ttttd
dddd d S td | j }|jd	ddid d S )Nr   z2begin to _transpile_startup_program for multi-nodezcurrent_endpoint: ztotal endpoints: zrank: %d, ring_id: %dTZxpuz:begin to _transpile_startup_program for single-node in XPUr   r   r   r   )Zdevicesr.   r   z3begin to _transpile_startup_program for single-noder.   )r    r   printr   r   r
   r+   r,   r   r$   r   r@   rF   rq   maprT   r   r   r   )r   r.   rL   r   r   r   r&     sD   



z&MultiThread._transpile_startup_programc                 C   s   |    | jdkrtd | j| j | _|   |   d S | jdkr-td |   d S | jdkrDt	t
dddkrDtd	 d S td
 |   d S )NZ
all_gatherz%begin to transpile in all-gather modeZfuse_all_reducez*begin to transpile in fuse all-reduce modeZall_reduce_xpur   r   r   zHskip transpile in all-reduce-xpu mode when number of devices is only onez%begin to transpile in all-reduce mode)rl   r   r   r   r   allgather_ranks_insert_allgather_ops_update_adam_ops_insert_fuse_allreduce_opsr    r   r   r   rm   r*   r   r   r   r'   ;  s    


z#MultiThread._transpile_main_programc                 C   s  | j  }d}d}ttt|jD ]\}}| |r| j|jv r|	 | j }t
|dkr/qt
|d dks9J |}tdt
|dD ]j}|j||  }	|j|| d | jgt|	j dtjjjdd}
|j||d	   }|	jrqqC||kr|d	7 }|j|d
d|id|i| jtjid |d	7 }|d	 | j }|j|dd|id|
id| jd|| jtjid qCq|du rdS t|jD ]*\}}| |rt| jD ]}|j|| dd|id|id|| jtjid q dS qdS )z9
        insert allgather op to the main_program
        rM   Nr   ri   
_allgatherFTr/   r~   r0   r   r   r   ru   r>   r9   r:   Zc_allgatherr   r.   rO   )r   r@   rp   rq   rE   rr   r[   r   rS   rU   r    r+   rs   rA   r   r~   r   rC   rD   FP32rQ   rt   r   rG   rV   r
   rb   )r   rL   r.   rw   r5   rW   rx   ry   rz   rR   Znew_grad_varr   r   r   r   P  s~   




z!MultiThread._insert_allgather_opsc              
      s  | j   ttt jD ]\}| r|}jdkr$jdkr$qdd  j	dd   j	dd   j	dd   j	dd   j	dd   j	d	d  d
} j	
dd   j	
dd   j	
dd   j	
dd   j	
dd  d}dddddd} fddt| jD } j|dd j	dd d  id|i| jddd |d7 }t| jD ]}|| |d <  j|j|||d |d7 }qވ | qd!S )"zC
        remove the original adam op, and add new adam ops
        ZadamZlambr]   r   r_   Moment1Moment2Beta1PowBeta2Pow)r]   r_   r   r   r   r   ParamOut
Moment1Out
Moment2OutBeta1PowOutBeta2PowOut)r   r   r   r   r   epsilonbeta1beta2	lazy_modemin_row_size_to_use_multithread)r   r   r   r   r   c              	      sD   g | ]} j d  t|  jdd  jdtjjjddqS )_r]   r   FTr   )	rA   r   rs   r   r~   r   rC   rD   r   )r4   rz   rL   rW   r   r   r   
<listcomp>  s    z0MultiThread._update_adam_ops.<locals>.<listcomp>r   r>   r   r9   )numZaxisr:   r   r^   N)r   r@   rp   rq   rE   rr   rb   r1   r   rs   outputattrr+   r   rt   Z
_remove_op)r   r5   ry   r;   r<   r=   Z
split_varsrz   r   r   r   r     sj   

		

	

zMultiThread._update_adam_opsc                 C   s  | j  }d| j }d}g }t|jD ]O}| |rb| j|jv rb| | j }t	|dkr.qt	|d dks:J dt
dt	|dD ]}|| }||}	||d  }
||
}|	jr\qB|| qBq|du ridS g }d}|D ]'}t	|dkst	|d | jks|j|kr||g |j}qo|d | qog }t|jD ]F\}}| |r|D ]8}|jtd|d j |d jdd	d
}|| |j|dd|i||ddd	dd	d|d j| jtjid q nqt|jD ]9\}}| |r#|D ]*}|j|dd|id|id|dd| jtjid |j|dd|id|i| jtjid q nqt	|dkr1|  dS t|jD ]%\}}| |rZ|j|dd|d id|d id|| jtjid  nq6|  dS )z;
        insert coalesce_tensor and all reduce ops
        r   Nri   zRvars need to be one param var followed by one grad var, but got odd number of varsr   rM   ZFusedOutput_FT)r/   r   r0   r   Zcoalesce_tensorZInput)OutputZFusedOutputZ	copy_dataZ	use_alignr   r:   rv   r>   r9   r.   Zuse_calc_streamru   rO   )r   r@   r
   rp   rr   r[   r   rS   rU   r    r+   varrQ   r   r   r   rE   rb   rA   r   rB   r/   rt   r   rG   rV   Z_sync_with_cpp)r   rL   r.   rw   Zparam_gradsrW   rx   rz   r   rR   Z	grad_namesegmentsZ
last_dtyper   Z
fused_varsr5   segmentZtmp_varZ	fused_varr   r   r   r     s   











z&MultiThread._insert_fuse_allreduce_opsN)r   r   )
rc   rd   re   rf   r   r&   r'   r   r   r   r   r   r   r   r     s    

'KHr   )r   rI   Zpaddle.baser   Z5paddle.distributed.fleet.base.private_helper_functionr   Zpaddle.frameworkr   Zpaddle.staticr   r   __all__r   rG   r   rh   r|   r   r   r   r   r   r   <module>   s     ar