# NOTE(reconstruction): this file reached us as a CPython 3.10 bytecode dump
# of paddle/distributed/fleet/meta_optimizers/sharding_optimizer.py. What
# follows is the structure recovered from that dump: imports, class layout,
# signatures, docstrings and string constants. Short methods are rebuilt from
# the recovered constants; long method bodies that could not be recovered
# verbatim are summarized in comments and elided with `...`. Details such as
# relative import depth are best-effort reconstructions.

import os

import paddle
from paddle.base import core
from paddle.incubate.optimizer import PipelineOptimizer
from paddle.static import (
    create_global_var,
    default_startup_program,
    device_guard,
)
from paddle.utils import unique_name

from ...utils.log_util import logger
from .common import (
    OP_ROLE_KEY,
    OP_ROLE_VAR_KEY,
    CollectiveHelper,
    OpRole,
    is_backward_op,
    is_optimizer_op,
    is_update_op,
)
from .meta_optimizer_base import MetaOptimizerBase
from .sharding import utils
from .sharding.fp16_helper import FP16Utils
from .sharding.gradient_clip_helper import GradientClipHelper
from .sharding.offload_helper import OffloadHelper
from .sharding.prune import ProgramDeps
from .sharding.shard import ProgramSegment, Shard
from .sharding.utils import (
    get_first_optimize_op_idx,
    get_grad_device,
    get_var_size,
    insert_allreduce_ops,
    insert_broadcast_ops,
    insert_cast_ops,
    insert_fill_constant_ops,
    insert_reduce_ops,
    insert_scale_loss_grad_ops,
    insert_sync_calc_op,
    insert_sync_comm_ops,
)
from .sharding.weight_decay_helper import WeightDecayHelper

__all__ = []


class ShardingOptimizer(MetaOptimizerBase):
    """Sharding Optimizer."""

    def __init__(self, optimizer):
        super().__init__(optimizer)
        self.inner_opt = optimizer
        self.meta_optimizers_white_list = [
            "RecomputeOptimizer",
            "AMPOptimizer",
            "LarsOptimizer",
            "LambOptimizer",
            "ASPOptimizer",
        ]
        self.meta_optimizers_black_list = []
        self._main_program = None
        self._startup_program = None
        self._segments = []
        self._params = set()
        self._broadcast_vars = set()
        self._reduced_grads_to_param = {}
        self._shard = Shard()
        self._verbose = False
        self._thread_mode = False
        self._use_calc_stream = False
        self.mp_degree = 1

    def _can_apply(self):
        if not self.role_maker._is_collective:
            return False
        if self.role_maker._worker_num() <= 1:
            return False
        return self.user_defined_strategy.sharding

    def _disable_strategy(self, dist_strategy):
        dist_strategy.sharding = False
        dist_strategy.sharding_configs = {}

    def _enable_strategy(self, dist_strategy, context):
        dist_strategy.sharding = True
        dist_strategy.sharding_configs = {"segment_broadcast_MB": 32}
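    # NOTE(reconstruction): hedged usage sketch, not part of the original
    # module. The strategy flags consumed above are normally supplied through
    # the public fleet.DistributedStrategy API:
    #
    #     import paddle.distributed.fleet as fleet
    #
    #     strategy = fleet.DistributedStrategy()
    #     strategy.sharding = True
    #     strategy.sharding_configs = {
    #         "segment_broadcast_MB": 32,
    #         "sharding_degree": 8,
    #     }
    #     optimizer = fleet.distributed_optimizer(optimizer, strategy)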
dS )	a	  get
        self._sharding_segment_strategy
        1. if by_size:    self._broadcast_MB
        2. if by_anchors: self._sharding_segment_anchors
                          self._backward_remain_anchors
                          self._forward_remain_anchors
        Zsharding_segment_strategyrM   r   z&segment size should larger than zero !segment_anchorsz-you should set the sharding segment anchors !Nz5the sharding segment strategy [{}] is not implemented)rE   rJ   str_broadcast_MBZ_sharding_segment_anchorslen_backward_remain_anchors_forward_remain_anchorsNotImplementedErrorformat_sharding_segment_strategy)r=   strategyrJ   Zsegment_strategyrA   rA   rB   _get_sharding_segment_strategym   s*   


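    # NOTE(reconstruction): hedged example, not original code - the two
    # segmentation strategies above are selected purely by sharding_configs:
    #
    #     # split whenever ~32 MB of parameters would be broadcast
    #     {"sharding_segment_strategy": "segment_broadcast_MB",
    #      "segment_broadcast_MB": 32}
    #
    #     # or split at user-named anchor variables
    #     {"sharding_segment_strategy": "segment_anchors",
    #      "segment_anchors": ["embedding_0.tmp_0"]}  # hypothetical var name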
    def _get_hybrid_degree(self):
        """get
        self.hybrid_dp
        self.sharding_degree
        self.mp_degree
        self.pp_degree
        self.dp_degree
        """
        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs

        # parallelism degrees
        sharding_degree = int(sharding_configs["sharding_degree"])
        mp_degree = int(sharding_configs["mp_degree"])
        pp_degree = int(sharding_configs["pp_degree"])
        dp_degree = int(sharding_configs["dp_degree"])
        global_world_size = self.role_maker._worker_num()

        assert sharding_degree > 0, "sharding degree must be larger than zero"
        if pp_degree > 1:
            assert strategy.pipeline is True

        if os.getenv("PADDLE_MANUAL_PIPELINE_STAGE", None):
            assert (
                pp_degree == 2
            ), "For manually set pipeline, only pp_degree = 2 is supported."
            assert (
                global_world_size == mp_degree * sharding_degree * dp_degree
            ), "global work size [{}], mp_degree [{}], sharding_degree [{}], dp_degree [{}].".format(
                global_world_size, mp_degree, sharding_degree, dp_degree
            )
        else:
            assert (
                global_world_size
                == mp_degree * sharding_degree * pp_degree * dp_degree
            ), "global work size [{}], mp_degree [{}], sharding_degree [{}], pp_degree [{}], dp_degree [{}].".format(
                global_world_size,
                mp_degree,
                sharding_degree,
                pp_degree,
                dp_degree,
            )

        if sharding_configs["hybrid_dp"]:
            logger.warning(
                "[hybrid_dp] API setting is deprecated. Now when "
                "dp_degree >= 2, its will be in hybrid dp mode automatically"
            )
            assert dp_degree >= 1

        self.hybrid_dp = dp_degree > 1
        self.sharding_degree = sharding_degree
        self.mp_degree = mp_degree
        self.pp_degree = pp_degree
        self.dp_degree = dp_degree
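    # NOTE(reconstruction): hedged arithmetic note, not original code - the
    # assertion above simply factorizes the world size, e.g. 32 ranks can be
    # mp_degree(2) * sharding_degree(4) * pp_degree(2) * dp_degree(2):
    #
    #     assert global_world_size == (
    #         mp_degree * sharding_degree * pp_degree * dp_degree
    #     )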
    def _get_hybrid_dp_mode(self):
        """get
        self.hybrid_dp_mode = 'pp_hybrid_dp' or 'sharding_hybrid_dp'
        self.gradient_merge_mode = 'pp_gm' or 'sharding_gm'
        self._gradient_merge_acc_step
        self.pp_allreduce_in_optimize
        self._optimizer_sharding
        """
        strategy = self.user_defined_strategy
        sharding_configs = strategy.sharding_configs

        # dp mode: pure dp is the outermost parallelism
        dp_mode = None
        if self.hybrid_dp:
            if self.pp_degree > 1:
                dp_mode = "pp_hybrid_dp"
            else:
                assert self.sharding_degree > 1, (
                    "by now we only support five kind of hybrid dp: "
                    "sharding_hybrid_dp, mp_sharding_hybrid_dp, pp_hybrid_dp, "
                    "mp_sharding_pp_hybrid_dp, sharding_pp_hybrid_dp."
                )
                dp_mode = "sharding_hybrid_dp"

        # gradient merge
        gm_mode = None
        gm_acc_step = int(sharding_configs["gradient_merge_acc_step"])
        if self.pp_degree <= 1:
            gm_mode = "sharding_gm"
            self._grad2merged_grad = {}
        else:
            gm_mode = "pp_gm"
            gm_acc_step = strategy.pipeline_configs["accumulate_steps"]
            gradient_scale_configs = strategy.gradient_scale_configs
            assert gradient_scale_configs["scale_strategy"] == "avg", (
                'For pipeline mode, the gradient scale mode should be "avg", '
                "but got {}".format(gradient_scale_configs["scale_strategy"])
            )
            self.scale_gradient = gradient_scale_configs["scale_gradient"]
        if gm_acc_step > 1:
            logger.info(
                "Gradient merge in [{}], acc step = [{}]".format(
                    gm_mode, gm_acc_step
                )
            )

        optimizer_sharding = False
        if (
            self.sharding_degree == 1
            and self.dp_degree > 1
            and sharding_configs["_dp_as_optimizer_sharding"]
            and self.pp_degree > 1
        ):
            optimizer_sharding = True

        self.hybrid_dp_mode = dp_mode
        self.gradient_merge_mode = gm_mode
        self._gradient_merge_acc_step = gm_acc_step
        self._optimizer_sharding = optimizer_sharding
        self.pp_allreduce_in_optimize = sharding_configs[
            "pp_allreduce_in_optimize"
        ]
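    # NOTE(reconstruction): hedged summary, not original code - the mode
    # selection above collapses to:
    #
    #     dp_mode = "pp_hybrid_dp" if pp_degree > 1 else "sharding_hybrid_dp"
    #     gm_mode = "pp_gm" if pp_degree > 1 else "sharding_gm"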
    def _inner_opt_minimize(
        self, loss, startup_program, parameter_list, no_grad_set
    ):
        # Recovered behaviour: raises ValueError("self.inner_opt of
        # ShardingOptimizer should not be None.") when no inner optimizer is
        # set. When pp_degree > 1 the inner optimizer is wrapped in a
        # PipelineOptimizer (schedule_mode / micro_batch_size taken from
        # pipeline_configs) and minimized through program._pipeline_opt,
        # recording self.pipeline_pair and self.pp_ring_map; otherwise
        # self.inner_opt.minimize(...) is called directly. With self._verbose
        # the resulting program is dumped to "main_%d". Returns
        # (optimize_ops, params_grads). Full body not recoverable from the
        # bytecode dump.
        ...
    def _apply_sharding_pass(self, params_grads):
        # Recovered behaviour: no-op when sharding_degree == 1; otherwise
        # builds the shard (_build_shard), splits the program
        # (_split_program), inserts broadcast/allreduce ops
        # (_add_broadcast_allreduce), syncs with cpp, then prunes the main
        # program (over the mp / sharding / pp rings) and the startup
        # program for this rank.
        ...
| j| j| jg | || j
 dS )zouter dp as optimizer shardingFN)rv   r1   r   r2   r   dp_rankr^   r   r7   r   r8   r   r   
dp_ring_idr   )r=   r   r   r   r   r   rA   rA   rB   _apply_opt_sharding_pass  s   


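    # NOTE(reconstruction): hedged sketch, not original code - optimizer
    # sharding needs a deterministic owner rank per parameter; conceptually:
    #
    #     def owner_rank(param_name, dp_degree):   # hypothetical helper
    #         return hash(param_name) % dp_degree
    #
    # (the real placement logic lives in Shard / get_grad_device.)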
    def _insert_allreduce_for_pp(self, params_grads):
        # Recovered behaviour: only active when pp_degree > 1. Accumulates
        # gradients through the pipeline optimizer, logs
        # "Pipeline Persistable grad is ...", inserts reduce ops for
        # "PP-Sharding grad is ..." / "Optimizer grad in this rank ...",
        # broadcasts "Optimizer param in this rank ..." when optimizer
        # sharding is on, and finally inserts allreduce ops over the dp ring
        # in pp_hybrid_dp mode.
        ...

    def _avg_grad_merge_after_sum(self, main_block, accumulated_grad_names):
        # Recovered behaviour: when AMP dynamic loss scaling is enabled, folds
        # the 1/acc_step average into the check_finite_and_unscale loss-scale
        # var (via a `scale` op writing to "<loss_scale>@TMP", bias 0.0);
        # otherwise inserts `scale` ops (scale = 1.0 / acc_step) right after
        # the c_sync_comm_stream op that precedes the optimize ops.
        ...
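    # NOTE(reconstruction): hedged arithmetic note, not original code - with
    # "sum" accumulation over k micro-steps, averaging afterwards is one
    # rescale, which is what the inserted `scale` op applies:
    #
    #     merged_grad = sum(micro_grads)           # k accumulated steps
    #     merged_grad = merged_grad * (1.0 / k)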
    def _adapt_amp_clip_without_sharding(self):
        # Recovered behaviour: when sharding is disabled (or replaced by
        # optimizer sharding) it still syncs AMP nan/inf checking and
        # global-norm gradient clipping across the mp and pp rings
        # (FP16Utils.sync_amp_check_nan_inf and
        # GradientClipHelper.sync_global_norm).
        ...

    def _insert_loss_grad_scale_op(self):
        # Recovered behaviour: scales loss@GRAD by the global data-parallel
        # degree via insert_scale_loss_grad_ops, then syncs with cpp.
        ...
    def _apply_optimize_offload_pass(self, params_grads):
        # Recovered behaviour: driven by sharding_configs. Logs
        # "Sharding with optimize offload !" and offloads optimizer state /
        # fp32 master params via OffloadHelper, or logs
        # "Sharding with optimize cast !" and casts fp32 params inside the
        # optimizer when only optimize_cast is set (with fused broadcast of
        # params under optimizer sharding).
        ...

    def _dump_program_for_debug(self):
        # Recovered behaviour: writes the startup and main programs of this
        # sharding rank to "start_sharding_%d" and "main_sharding_%d".
        ...
"z)ShardingOptimizer._dump_program_for_debugNc                 C   s   d| _ d| _|   |   |   |   | ||||\}}|   | | | 	| | 
| |   |   | | |   |   |   |   tdd }|s]|   ||fS )Nr
   Z!FLAGS_dynamic_static_unified_comm)_nrings_shardingZ
_nrings_dpr[   rg   rw   _build_groupsr   
_init_commr   r   r   r   r   r   _sharding_gradient_merge_initialization_broadcast"_recreate_not_persist_param_as_varr   paddleZ	get_flags_wait)r=   r   r   r   r   r   r   Zuse_new_commrA   rA   rB   minimize_impl  s4   




zShardingOptimizer.minimize_implc              	   C   sh   | j |d  | j |d  g}| j|d krdnd}tdd d u r2| jj| j| j|||ddd d S d S )Nr   r
   r_   Fsync)pp_group_endpointsr   rd   re   _collective_helper_init_communicatorr2   current_endpoint)r=   pairr}   r   r   rA   rA   rB   _init_pair_comm  s   
z!ShardingOptimizer._init_pair_commc              	   C   s   t dd d u r| jj| j| j| j| j| jddd | j	D ]'}|d d |d  }| j
| }td| d|  | j|v rD| || qd S )	Nr_   Fr   r   i  r
   zpp pair:z, ring_id: )rd   re   r   r   r2   r   r   r   r   r   r   r	   rr   r   )r=   r   r   Zpair_keyr}   rA   rA   rB   _init_pipeline_comm  s$   


z%ShardingOptimizer._init_pipeline_commc              	   C   s   | j  }| jdkr| jj| j | j| j| j| jddd | j	dkr3| jj| j | j| j
| j| jddd | jdkr=| | | jdkrT| jj| j | j| j| j| jddd |  d S )Nr
   Fr   )r2   r   r<   r   r   r   mp_group_endpointsr   r   r\   sharding_group_endpointsr   r   r]   r   r^   dp_group_endpointsr   r   r   )r=   r   rA   rA   rB   r     sD   






zShardingOptimizer._init_commc                 C   s8   dd |D | _ | j||| | j| j | _d S )Nc                 S   s   h | ]}|d  j qS r   r   r   rA   rA   rB   	<setcomp>-  r   z1ShardingOptimizer._build_shard.<locals>.<setcomp>)r5   r8   setupZfind_broadcast_paramsr1   r   r6   )r=   r   Z
shard_rankZ
shard_sizerA   rA   rB   r   +  s
   
zShardingOptimizer._build_shardc                 C   s8   | j d d  }|| j }| jdkr| j|| d S d S )Nr   )global_endpointsr|   r   r   )r=   	endpointsr   rA   rA   rB   r   5  s
   

zShardingOptimizer._waitc                 C   s.   |d |_ | jd| t|}|d |_|S )Nr
   r   )
_start_idxr3   insertr   _end_idx)r=   segmentop_idxr   Znew_segmentrA   rA   rB   collect_segment=  s
   

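    # NOTE(reconstruction): hedged sketch, not original code - the recovered
    # constants suggest pipeline pairs are keyed as pair[0] * 1000 + pair[1]
    # and mapped to dedicated ring ids (>= 20) via self.pp_ring_map:
    #
    #     pair_key = pair[0] * 1000 + pair[1]
    #     ring_id = self.pp_ring_map[pair_key]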
    def _split_program(self, block):
        # Recovered behaviour: walks the block ops in reverse starting from
        # the last backward op. With the "segment_broadcast_MB" strategy it
        # closes a segment whenever the accumulated broadcast-parameter size
        # (get_var_size) exceeds self._broadcast_MB; with "segment_anchors"
        # it closes a segment at each user anchor, asserting each anchor is
        # met exactly once ("segment anchor [..] met twice !"). It also fills
        # self._param2broadcast (renaming params to "<param>@BroadCast"),
        # self._fill_constant_vars, self._cast_ops and the per-segment
        # allreduce/broadcast var lists, and asserts both remain-anchor lists
        # are empty afterwards. With self._verbose it logs
        # "Sharding broadcast: [{}] times [{}]" plus each segment's
        # start/end op.
        ...
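    # NOTE(reconstruction): hedged sketch, not original code - segmenting by
    # broadcast size reduces to an accumulator over the reversed op list:
    #
    #     acc_mb = 0.0
    #     for idx, op in reversed(list(enumerate(block.ops))):
    #         for param in broadcast_params_of(op):   # hypothetical helper
    #             acc_mb += get_var_size(param)       # size in MB
    #         if acc_mb > self._broadcast_MB:
    #             segment = self.collect_segment(segment, idx, block)
    #             acc_mb = 0.0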
    def _prune_main_program(self, block, shard, rings):
        """
        calculate deps from allreduce op to optimize op,
        remove ops and vars not needed in this worker

        1. prune regularization (weight decay)
        2. prune cast_fp32_to_fp16; update amp is-finite checking
        3. prune gradient_clip related; update global_norm_sum
        4. prune optimizer op + param + gradient

        """
        # Recovered behaviour: WeightDecayHelper().prune_weight_decay,
        # FP16Utils.prune_fp16 and GradientClipHelper().prune_gradient_clip
        # run first; a ProgramDeps graph is then built from the
        # c_allreduce_sum outputs and every op/var this shard does not own is
        # cropped (conditional_block sub-blocks and `concat` inputs
        # included), while c_gen_*_id / c_comm_init / send_v2 / recv_v2 ops
        # are kept intact.
        ...
    def _add_broadcast_allreduce(self, block):
        """
        add broadcast allreduce op
        if enable gradient_merge, insert related ops

        if combined with pipeline(grad accumulate),
        the grad allreduce should be done in optimize role
        """
        # Recovered behaviour: iterates the collected segments in reverse,
        # inserting broadcast / allreduce (or reduce) ops plus the
        # sync-calc / sync-comm ops between them; under sharding_gm with
        # acc_step > 1 it first creates the persistable "@GradiantMerge"
        # vars and their merge ops via
        # create_persistable_gradients_and_insert_merge_ops.
        ...

    def _prune_startup_program(self, block, shard):
        # Recovered behaviour: removes every startup op whose output params
        # this shard does not own (params kept under optimizer sharding) and
        # drops the corresponding vars, then syncs with cpp.
        ...
    def _build_groups(self):
        """
        pre-assign ring ids
            mp: 0
            sharding: 1
            pure-dp: 2
            global: 3
            pp: 4
            pp-pair: >= 20
        if one parallelism is not enable: -1
        and only support parallelism hierarchy: mp --> sharding --> pp --> dp
        """
        # Recovered behaviour: derives rank / ring-id / endpoint lists for
        # every enabled parallelism from the trainer endpoints, asserting
        # that the global world size is divisible by each degree, and logs
        # the full layout ("global word size: ...", "mp group size: ...",
        # "sharding group size: ...", "pp group size: ...",
        # "pure dp group size: ...", "Hybrid DP mode turn on !").
        ...

    def _recreate_not_persist_param_as_var(self):
        # Recovered behaviour: for both the startup and main programs,
        # re-creates every non-persistable parameter as a plain var,
        # preserving shape, dtype, lod_level, stop_gradient, trainable and
        # is_distributed, then syncs with cpp.
        ...
    def _initialization_broadcast(self):
        """
        this function is to ensure the initialization between dp group to be
        identical when hybrid-dp is used, and the initialization of
        not distributed param between mp group to be identical.
        """
        # Recovered behaviour: no-op unless dp_degree > 1 or mp_degree > 1;
        # otherwise appends c_broadcast ops (root 0) over the dp ring for all
        # params and over the mp ring for params not marked is_distributed.
        ...

    def create_persistable_gradients_and_insert_merge_ops(
        self, main_block, startup_block, insert_idx, grad_names, shard
    ):
        # Recovered behaviour: for every grad this shard owns, creates a
        # persistable "<grad>@GradiantMerge" var (asserting against shared
        # weights: "grad [{}] already in grad2merged_grad, maybe you meet
        # sharing weight case !"), inserts an elementwise_add accumulation op
        # into the main block and a fill_constant(0.0) initializer into the
        # startup block.
        ...

    def _create_gm_cond(self, main_block):
        # Recovered behaviour: builds the gradient-merge condition on CPU:
        # persistable int32 vars for acc_step, zero and current step, then
        # increment -> elementwise_mod(step, acc_step) -> equal(.., zero),
        # returning the bool cond var.
        ...
    def _true_apply_gradient(self):
        """
        allreduce grad@gradientmerge in dp group
        grad@gradientmerge / acc_step
        re-create all optimize ops of origin main block and rename them
            cast(backward)
            amp
            clip
            opt
        # fill constant grad@gradientmerge

        """
        # Recovered behaviour: inside the cond block it c_allreduce_sum's the
        # merged grads over the dp ring in sharding&DP mode (asserting
        # "dp_ring_id should larger than 0 when in sharding&DP mode"), scales
        # them by 1.0 / acc_step, re-creates the original optimize ops
        # (copying vars into the cond block as needed), and finally
        # fill_constant-zeroes the merged grads for the next accumulation
        # window.
        ...
    def _sharding_gradient_merge(self):
        """
        copy all optimize ops in origin main block
        remove all optimize ops in origin main block
        create cond block

        """
        # Recovered behaviour: only for gradient_merge_mode == "sharding_gm"
        # with acc_step > 1: moves the optimizer-role op descs into
        # self.original_optimize_ops_desc, builds the condition var via
        # _create_gm_cond and wraps _true_apply_gradient in a
        # conditional_block op with a STEP_SCOPES step scope.
        ...


class ThreadShardingOptimizer(ShardingOptimizer):
    """Sharding Optimizer."""

    def __init__(self, optimizer):
        super().__init__(optimizer)
        self.inner_opt = optimizer
        # first entry recovered verbatim; the remaining entries are
        # back-references in the dump, most plausibly this set:
        self.meta_optimizers_white_list = [
            "ParameterServerOptimizer",
            "RecomputeOptimizer",
            "AMPOptimizer",
            "LarsOptimizer",
            "LambOptimizer",
            "ASPOptimizer",
        ]
        self._thread_mode = True
        self._use_calc_stream = False
        op_maker = core.op_proto_and_checker_maker
        self.op_role_key = op_maker.kOpRoleAttrName()
    def _prune_main_program(self, block, shard, rings):
        """
        rename BroadCast param

        """
        # Recovered behaviour: strips the "@BroadCast" suffix from every op
        # input/output (renaming back to the original param), removes the now
        # unused broadcast vars, logs "remove broadcast param count=..." and
        # syncs with cpp.
        ...

    def _prune_startup_program(self, block, shard):
        """
        not need process
        """
        ...

    def minimize_impl(
        self, loss, startup_program=None, parameter_list=None, no_grad_set=None
    ):
        """
        reset start program and main program
        """
        # Recovered behaviour: honours sharding_configs["mp_degree"], runs
        # the parent minimize_impl, then switches the global startup program
        # to self._startup_program via fluid.framework.
        ...

    def _init_comm(self):
        # Recovered behaviour: checks that the endpoint list matches the
        # node count ("end points not equal node nums") and creates the
        # mp / sharding / dp communicators through _init_communicator.
        ...

    def _wait(self):
        # Recovered behaviour: rank 0 waits for the other trainer endpoints
        # via paddle.fluid.transpiler.details.wait_server_ready.
        ...

    def _init_communicator(
        self, program, current_endpoint, endpoints, rank, ring_id
    ):
        # Recovered behaviour: single-node setups fall back to a
        # c_comm_init_all op; multi-node setups create a persistable RAW
        # "nccl_id" var, then append c_gen_nccl_id and
        # c_comm_init_multitrainer ops keyed by ntrainers / trainer_id /
        # ring_id.
        ...