o
    "j                    @   s  d dl Z d dlmZ d dlZd dlmZmZmZ d dlm	Z	 d dl
mZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d
dlm Z m!Z! d
dl"m#Z# ej$j%Z%ej$& Z'g dZ(g dZ)ee j*Z+dd Z,e!dG dd de Z-dd Z.dd Z/e%j0dfddZ1dd Z2dd Z3dd  Z4d!d" Z5d#d$ Z6ej7j8j9ej7j8j:fd%d&Z;d'd( Z<d)d* Z=d+d, Z>d-d. Z?d/d0 Z@d1d2 ZAd3d4 ZBd@d6d7ZCd8d9 ZDd:d; ZEG d<d= d=ZFG d>d? d?ZGdS )A    N)reduce)ParallelModeis_data_parallel_reduce_opis_parameter_related)new_process_group)_get_comm_group
get_loggerget_var_numelinsert_dependencies_for_varsis_backward_opis_dep_skip_opis_forward_opis_loss_grad_opis_optimize_op6naive_set_dist_op_attr_for_program_by_mesh_and_mappingset_var_dist_attrget_var_size)core)default_main_programdefault_startup_program)unique_name   )PassBaseregister_pass)AutoParallelStreamType)Zcreate_py_readerZcreate_double_buffer_readerreadslicesplitZassignZsend_v2)
ZadamZadamaxZadamwZdecayed_adagradZmomentumZdgc_momentumZlars_momentumZmerged_momentumZlambZsgdc                 C   s   | j dod| j dv S )Nop_namescopez/auto_parallel/reshard)deschas_attrattrop r%   q/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/passes/auto_parallel_sharding.py_is_reshard_opL   s
   r'   Zauto_parallel_shardingc                       s   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Z  ZS )/ShardingPassc                    s   t    | dd  | dd  | dd  | dd  | dd  | dd  | dd  | dd  | d	d  | d
d  | dd  | dg  | dd t | _g | _i | _d| _d | _g | _	d S )Ndist_contextstagesharding_degreedegreeenable_overlapparam_comm_stream_numgrad_comm_stream_numparam_bucket_size_numelgrad_bucket_size_numelpartition_algorenable_hierarchical_commparams_gradsglobal_rankF)
super__init__set_attrset	dp_groupssharding_infosvarname_to_sharding_infosharding_hybrid_dpouter_dp_groupshared_params_gradsself	__class__r%   r&   r8   X   s(   

zShardingPass.__init__c                 C   sF  |  dd u r	dS |  ddvrdS |  dd ur+t|  dtr(|  ddkr*dS n|  dd urDt|  dtrA|  ddkrCdS ndS t|  dd	krQdS t|  d
tr`|  d
d	k rbdS |  dd u rkdS |  dd u rtdS |  dd u r}dS |  dd u rdS |  dd u rdS |  dd u rdS |  dd u rdS dS )Nr)   Fr*   )r         r+   r   r,   r4   r   r5   r-   r.   r/   r0   r1   r2   r3   T)get_attr
isinstanceintlenrA   r%   r%   r&   _check_selfn   sT   zShardingPass._check_selfc                 C   s   dS )NTr%   )rB   Z
other_passr%   r%   r&   _check_conflict      zShardingPass._check_conflictc                 C   s@  |  d| _t|  dp|  d| _t|  d| _t|  d| _|  d| _t|  d| _t|  d| _|  d	| _	| jd
ksI| jd
krP| jsPJ dt|  d| _
t|  d| _|  d| _|  d}| | }}| || |jD ]}| || | | | || q}|d| j | || d S )Nr)   r+   r,   r*   r5   r-   r.   r/   r3   r   z3multiple comm stream need enable_overlap to be Truer0   r1   r2   r4   )rG   _dist_contextrI   sharding_world_sizer*   r5   r-   r.   r/   r3   r0   r1   r2   global_block_build_sharding_groupsblocks_shard_optimizer_shard_gradient_synchronization_shard_parameterr9   r@   _optimization_pass)rB   main_programstartup_programcontextr4   
main_blockstartup_blockblockr%   r%   r&   _apply_single_impl   sF   


zShardingPass._apply_single_implc                 C   s   |  | | || d S N) _collective_data_parallel_groups_build_sharding_infos)rB   rZ   r4   r%   r%   r&   rQ      s   
z#ShardingPass._build_sharding_groupsc                 C   st   |j D ]#}t|r|jtv rqt|rqt| j|| j}|d ur&| j	| qt
| jdkr8tdt
| jd S )Nr   zuSo far Only and Exactly one data parallel group in network are supported, but got [{}] different data parallel groups)opsr   type	_skip_opsr'   +_inference_data_parallel_group_for_operatorr5   rN   r;   addrJ   NotImplementedErrorformat)rB   rZ   r$   groupr%   r%   r&   r_      s$   

z-ShardingPass._collective_data_parallel_groupsc                 C   sV  t ||| j}| jD ]}|j| jksJ d| j|j|j| j dks.J d| j|j| j|jv s>J d| j|jt|| jksPJ dt|| j|j| jkrd| _	| j
dk s`J | jdk sgJ t| jdksrJ d	t|j| j| j\}}t|}t|| _n|}|| j_t|| j|| j}| j| |jD ]}|| j|j< qq
d S )
NzBsharding world size [{}] should not larger than dp world size [{}]r   zBsharding world size [{}] should be divisible by dp world size [{}]zBcurrent ranks [{}] does NOT belong to the data parallel group [{}]zDnumber of parameters [{}] is not enough to be shard among [{}] ranksTrE   r   zthybrid sharding and data parallelism are supported only when there is excatly one data parallel group in the network)re_order_programrN   r;   nranksrO   rg   r5   ranksrJ   r>   r.   r/   _get_dp_and_sharding_groupsr   r?   Z_sharding_groupShardingInfor2   r<   appendparamsr=   name)rB   rZ   r4   dp_groupr?   sharding_groupsharding_infoparamr%   r%   r&   r`      sb   


z"ShardingPass._build_sharding_infosc                 C   s0   |  | | | | || | || dS )z
        sharding all optimizer related ops and vars, include:
        gradient clip ops & vars
        weight decay ops & vars
        optimizer ops and states
        N)_shard_amp_related_op_and_vars_shard_weight_decay_shard_optimizer_ops_and_states_insert_optimizer_broadcasts)rB   rZ   r[   r%   r%   r&   rS     s   

zShardingPass._shard_optimizerc                 C   sf  | j dk rd S ttt|jD ]\}}t||r@| j dkr@|jd }|d |d }| |s?|j	|dd |j
|dd q|jdv rg }|jdD ]}|d |d }| |rb|| qM|rt|jd| |jd| q|jd	kr|d
}|jd }	|j|	 }
|j	|dd |j|dd|
id|
jd|
jddt|id q|j	|dd q|  d S )Nr   r   @Fsync)check_finite_and_unscaleZupdate_loss_scalingXOutr|   op_roleZfill_constantshapedtypevaluerb   outputsattrs)r*   reversedlist	enumeratera   _is_param_grad_fp32_cast_opoutput_arg_namesfind_is_parameter_in_local_shard
_remove_op_remove_varrb   r    inputrn   	set_inputZ
set_outputr"   vars_insert_op_without_syncr   r   OP_ROLE_KEY_sync_with_cpp)rB   rZ   idxr$   output_name
param_nameZ
reversed_x
input_namer   out_nameZout_varr%   r%   r&   ru   *  sL   









z+ShardingPass._shard_amp_related_op_and_varsc                 C   s  | j dk rd S g d}t }t }tt|jD ]8\}}t|s!q|j|v rP|dd }|d |d }| 	|sP|
| |jdv rP|jD ]}	|
|	 qHqttt|jD ]\}}t|scqZ||v rn|j|dd qZ|D ]	}
|j|
dd qqtt|jD ]_\}}t|sq|jd	krg }|jD ]}||vr|| q|jd| |jd }t| jD ],\}}|j|| d
 dd|gid|gid|jjddddttjid}| j||}q nq|  d S )NrE   )Zelementwise_mulsquared_l2_normclip_by_normr}   r   @GRAD)r   r   Frz   sumr   c_allreduce_sumr~   ring_idr   z /gradient_clip_model_parallelismuse_calc_streamTrb   Zinputsr   r   )r*   r:   r   r   ra   _is_gradient_clip_oprb   r   r   r   re   r   r   r   r   input_arg_namesrn   r    r   r<   Z
_insert_oprh   idr   OpRoleOptimizerN    get_tensor_dist_attr_for_programvarr   )rB   rZ   Zremoved_op_typeZremoved_op_idxZremoved_tmp_varr   r$   r   r   r   varnameZreserved_varsZsum_op_outputirs   new_op	dist_attrr%   r%   r&   _shard_gradient_clip[  sn   










	 z!ShardingPass._shard_gradient_clipc                 C   sF   | j dk rd S ttt|jD ]\}}t|sqtd|  d S )NrE   z$weight decay is NOT supported by now)r*   r   r   r   ra   _is_weight_decay_oprf   r   )rB   rZ   r   r$   r%   r%   r&   rv     s   
z ShardingPass._shard_weight_decayc                    sB  g }t tt|jD ]K\}}t|s nB|jtv rVd|jv s!J t|	ddks,J |	dd  | 
 sM| fdd|jD  |j|dd q| j|   qt tt|jD ]\}}t|jdkry|jd |v ry|j|dd q`|D ]}||r|j|dd ||r|j|dd q||  |  d S )NParamr   r   c                    s   g | ]}| kr|qS r%   r%   ).0r   r   r%   r&   
<listcomp>  s
    z@ShardingPass._shard_optimizer_ops_and_states.<locals>.<listcomp>Frz   )r   r   r   ra   r   rb   _supported_optimizer_typeZinput_namesrJ   r   r   extendr   r   r@   rn   _get_param_gradhas_varr   r   )rB   rZ   r[   Zshould_removed_optimizer_statesr   r$   r   r%   r   r&   rw     s@   




z,ShardingPass._shard_optimizer_ops_and_statesc                 C   s   | j dks
| jdkrd S | jD ]S}|jD ]M}||jsJ ||js&J |jdd|id|id|jjd|	|jdd	t
tjid
}|ddtj  | j|}|d usWJ t||j|j| j qq|  d S )NrE   r   c_broadcastr}   r~   r   rootr   Tr   r   /)r*   r0   r<   ro   r   rp   	append_oprh   r   get_var_rankr   r   r   	_set_attrr   DataParallelrN   r   r   process_meshdims_mappingr   )rB   rZ   r[   rs   rt   r   Zparam_dist_attrr%   r%   r&   rx     s<   



z)ShardingPass._insert_optimizer_broadcastsc                 C   s"   || j v sJ | j | }||S r^   )r=   is_in_local_shard)rB   r   rs   r%   r%   r&   r     s   

z)ShardingPass._is_parameter_in_local_shardc                 C   s2   || j v sJ | j | }||}|d usJ |S r^   )r=   get_param_grad)rB   r   rs   Zp_gr%   r%   r&   r     s
   

zShardingPass._get_param_gradc           
   	   C   s  | j dk rd S dd | jD }ttt|jD ]h\}}t||ra|jd }t|}| j	| }t
||||jj||| j}| jrF||sP|j|d dd n|d| jj |d	d
tj  t||r|jd }	t|	}| j	| }||s|j|dd q|  d S )NrE   c                 S      g | ]}|j qS r%   r   r   rh   r%   r%   r&   r         z@ShardingPass._shard_gradient_synchronization.<locals>.<listcomp>r   r   Frz   r   r   r   )r*   r;   r   r   r   ra   _is_param_grad_allreduce_opr   _get_base_name_from_grad_namer=   _insert_reduce_oprh   r   r   rN   r>   r   r   r   r?   r   r   _is_param_grad_sum_opr   r   )
rB   rZ   dp_ring_idsr   r$   r   	base_namers   Z	reduce_opr   r%   r%   r&   rT     sB   



	




z,ShardingPass._shard_gradient_synchronizationc                 C   s  | j dk rd S dd | jD }| jD ]O}||\}}g }|D ]}|| dkr5|||jkr5|| q ttt	|j
D ]r\}	}
t|
rHq?|
jD ]e}t||
|jr_| |s^|| qK||vrdqK||}||jkrq|}n-t|d }||}|j||j|jdd}| j|}t| j||j|j}|
|| t||	||j||jj|
d| j qKq?ttt	|j
D ]&\}	}
|
j d	krq|
jd }|
j!d }||v r|j"|	dd
 |j#|dd
 qttt	|j
D ]Y\}	}
t$|
j!dksJ |
j!d }|
j dkr)|
d|v r)| j%r!|||jkr!|
&d| j%j n|j"|	dd
 q|
j dkrD||v rD|||jkrD|j"|	dd
 q|D ]}|||jkr`|j#|dd
 |j#|dd
 qGq|'  |'  d S )NrF   c                 S   r   r%   r   r   r%   r%   r&   r   1  r   z1ShardingPass._shard_parameter.<locals>.<listcomp>r   z
@BroadCastF)rp   r   r   persistabler   castrz   r   r   r   )(r*   r;   r<   "get_broadcast_vars_and_param_usager   
local_rankrn   r   r   r   ra   r   r   _is_param_fp16_cast_opparam_namesr   r   generater   
create_varr   r   rN   r   r   r   r   _rename_input_insert_init_and_broadcast_oprh   r   r"   rb   r   r   r   rJ   r?   r   r   )rB   rZ   r[   r   rs   Zneed_broadcast_varsparam_usageZnot_used_param_namer   r   r$   r   	root_rankbroadcast_varname	input_varZnew_varZref_dist_attrZout_var_dist_attrr   r%   r%   r&   rU   -  s   







4




zShardingPass._shard_parameterc                 C   s   | j dkrd S d| _d| _d| _t| jdks!J dt| j| jd }tj	||@ | 
| | jdkrU| j dkrC| | n"| j dkr]| | W d    d S W d    d S W d    d S W d    d S 1 spw   Y  d S )	Nr   Zsharding_coalesce_grad_Zsharding_coalesce_param_r6   z^gradient synchronization optimization only support one sharding group right now, but got [{}].r   rE   rF   )r*   grad_coalesce_prefixparam_coalesce_prefixcomm_op_scheduling_priorityrJ   r<   rg   paddleZstaticZprogram_guard_gradient_sync_optimizationr0   &_fuse_overlap_parameter_comm_stage_two(_fuse_overlap_parameter_comm_stage_three)rB   rW   rX   rs   r%   r%   r&   rV     s2   





"zShardingPass._optimization_passc                 C   sL   | j dkr
| js
d S t  }t  }| ||\}}| |||| d S )Nr   )r1   r-   r   rP   r   _group_grads_overlap_grad_comm)rB   rs   rZ   r[   coalesce_to_group_mapgrad_name_to_group_mapr%   r%   r&   r     s   

z(ShardingPass._gradient_sync_optimizationc                 C   s  t   }t  }t|| j\}}td td| jt|	 t|	  i }| j
rcg | _|jj}t| jD ]}|dkrE|j}	nt|dd}	| j|	tjjd q;td| j i | _t|	 D ]\}}
t|
dksuJ t|
dkrt| jt| }|j||
jddd	 |j||
jddd	|
_|jd
d|
ji|
j|
jdddddd|
jt t!j"id n|
jd |
_td|t#dd |
jD  t$d| ddd |
jD  d |
||
jj%< || j }| j| d }| j| d }|jdd|
jid|
jid|j&d|
j'ddt t!j(id}|| j|< |)d d!t*j+  | j
r-||j,_-| j.|j,_/qii }t|j0D ]\}}t1|r|2dd }|j| }|| }
d }| j
rX|j,j-}t|	 | jk rqt3|}|j|2d"d  }n|j0|| j  }t1|sJ d#|j|2dd  }||g|g|fg||< t|
jdkr|
j4r|
jd$ }|| ||g|g|f || |d |g|
j|f q5t5|	 dd%}|D ]2}|| d d d$ D ]%\}}}}t6||||| j7t!j(d$gd&d&d'd(
}| j
r||j,_-| j.|j,_/qېq|8  d S ))NzSharding Stage2 Optimization:zFParam Bucket size is [{}], [{}] Parameters are fused into [{}] Bucketsr   TZforce_new_group
comm_groupcomm_streamz/Parameter Communication would use [{}] streams.r   rp   r   r   Zstop_gradientcoalesce_tensorInputOutputZFusedOutput	copy_data	use_alignr   r   zBucket[{}] size [{}]MB.c                 S      g | ]}t |qS r%   r   r   pr%   r%   r&   r   #      zGShardingPass._fuse_overlap_parameter_comm_stage_two.<locals>.<listcomp>zBucket[z] parameters: c                 S   r   r%   rp   r   r%   r%   r&   r   '  r   .r   r   r   r}   r~   r   r   r   r   r   ZParamOutz:Unexpected: sharding broadcast pre op should be broadcast.r6   reverseFZsharding_stage2_broadcast_depr   Zis_recomputer{   r   )9r   rP   r   group_paramr0   _loggerinforg   rJ   keysr-   Zparam_comm_group_stream_pairsrh   rk   ranger.   r   rn   r   ZSHARDING_STREAMr   Zop_to_stream_idxr   r   r   r   strr   r   coalesce_varr   r   r   r   ZForwardr   debugrp   r   rankr   r   r   r   r   execution_streamr   scheduling_priorityra   is_sharding_param_broadcast_opoutput_get_broadcast_first_depend_opr   sortedr
   rN   r   )rB   rs   rZ   r[   group_to_param_mapparam_to_group_mapZbroadcast_var_to_group_maprk   r   rh   Zparam_groupZcoalesce_var_nameZcomm_stream_idxr   r   r   dep_mapr$   r   broadcast_varZ	prior_varZpre_opZ	last_gradindicer   
prior_vars	post_vars	depend_opr%   r%   r&   r     s2  









z3ShardingPass._fuse_overlap_parameter_comm_stage_twoc                 C   s   d S r^   r%   )rB   rs   r%   r%   r&   r     rM   z5ShardingPass._fuse_overlap_parameter_comm_stage_threec           )      C   s<  |j }| jdk rd| _d}|D ]}t|r|}q|du rdS |jd }t| j}g }t }	dd }
d}|t|k r	|| }t|r|jdksKJ d|jd }t	|}|
|}||}|||rq||	vsjJ ||| n|| t| j}||| t|jdkr|d |_d}t|||  r|d7 }t|||  s|||  }|jd }||_|	| |j| | jr||rd	|_||d  jd
ksJ d||d  jd |ksJ d|j|d  |d7 }n|
||r|| t| j}|d7 }|t|k s:t|jdkr|| td td| jt|	t| i }i }i }i }g }t|D ]`\}}t|jdkr|jt| jt | |j!dd	d|_"|||j< |j# }|||< |$|j |jr~|j# }|||< |$|j n|jd |_"|jD ]}|||j%< q|||j"j%< q8t|& }t|& }t|}|'|}t|dksJ |'|}t|dksJ |'|}t|dksJ t(t)t|j D ]\} }| |v r||  }|jd }||jd j%ksJ d|jd j%||*||j"j% |+||j"j% | |v r|j,| dd | |v r||  }|jd j%}!|!|jv s>J d|!t |dd |jD }"g }#g }$|jD ]}%|%j-}&|#$|& |$t|& qM|j.| dd|"i|"|j"ddddd	d|j!d|#d|$t/t0j1id}'t2|| ||j|j"| j3t0j1dgdddd 
}(q|4  ||fS )!a  
        conditions for gradients to be grouped:
            1. group size < grad_bucket_size_numel
            2. same dp group (TODO)
            3. same src rank
            4. same dtype
            5. dependency: grad would NOT be used by other ops within group segment

        main logic:
            1. record coalesce group
            2. record all dp allreduce/reduce op idx

            3. insert coalesce op
            4. insert coalesce dependency (avoid allocate memory too early)
            5. modify and remove allreduce/reduce op
            6. ensure sharding-dp hybrid parallel logic

        gradients inside same group would be fuse into one coalesce tensor
        r   Nr   c                 S   s2   t | j| j }dd |jD }t||dkS )Nc                 S   s   h | ]}|j qS r%   r   r   r   r%   r%   r&   	<setcomp>  r   zHShardingPass._group_grads.<locals>.op_depend_on_group.<locals>.<setcomp>r   )r:   r   r   r   rJ   intersection)r$   rh   Zvars_Z	var_namesr%   r%   r&   op_depend_on_group  s   z5ShardingPass._group_grads.<locals>.op_depend_on_groupc_reduce_sumzZSharding should reduce grad first and than allreduce if Hybrid Sharding with Data-ParallelrE   Tr   z@Hybrid Sharding with Data-Parallel should sync same gradient varz-Sharding Gradient Communication Optimization:zIGradient Bucket size is [{}], [{}] Gradients are fused into [{}] Buckets.Fr   r6   z4Unexpected: it is supposed to sync [{}] but got [{}]rz   z=Unexpected: op is supposed to generate grad [{}] but got [{}]c                 S   r   r%   r   )r   gradr%   r%   r&   r   G  r   z-ShardingPass._group_grads.<locals>.<listcomp>r   r   r   r   r   r   concated_shapesconcated_ranksr   Zsharding_grad_coalesce_depr   )5ra   r1   r   r   VarGroupr:   rJ   r   rb   r   r   r   
acceptablecollectrn   r   coalesce_op_idxr   coalesce_dep_varnamere   reduce_op_indicesr>   r   allreduce_op_indicesr   r   rg   r   r   r   r   r   r   r   r   popr   rp   r   r  r   r   r   Z_rename_outputr   r   r   r   r   Backwardr
   rN   r   ))rB   r\   rs   ra   Zfirst_backward_opr$   Zfirst_backward_varname	cur_groupZgrad_groupsZgrouped_grad_namesr  r   	grad_namer   r  Zgrad_varjZdep_opZdep_varnamer   r   Zmodify_reduce_op_mapZcoalesce_op_mapZremove_reduce_op_indicesrh   Zlast_reduce_op_idxZlast_allreduce_op_idxr  Zcoalesce_op_setZmodify_op_setZremove_op_setZconfilctr   Zfirst_grad_nameZ
grad_namesr  r  Zgrad_r   Zcoalesce_opr  r%   r%   r&   r     sN  














4

	













zShardingPass._group_gradsc           )         s~  | j sdS g | _|jj}t| jD ]}|dkr|j}nt|dd}d| }| j||d q|j}	i }
d}i }t	|	D ]\}}t
|r|jdkrKq=|| j }|||< | j| d }| j| d	 }|d
d }|| }|jj|ksuJ t|jdkr||jd |j|fg|
|< |d }| jr|jr|d7 }|
| ||j|j|f ||j_| j|j_|d|j | jr|jr|	|d  }|jdksJ |d
d |ksJ ||j_| j|j_|d7 }|d7 }|d7 }q=t|
 dd}|D ],}|
| ddd D ] \}}}}t||||| jtjdgdddd
}||j_| j|j_qq| jr9d| j  dks/J |j}|j}| j! | j!  fdd|D }t"#d t"#d| j! d t"#d| d t|| j  ksmJ  fdd|D }t|ksJ t"#d| d g }g }t| jD ]} |t|dd |t|dd qt$t%t	|jD ]\}}t
|r7|jdksJ || }!||! }"||! }#|d
d }| j r|jj}t&|'d}$d}%|$ krd}%|$ }&|d|#j |d|& |%r7|$ }'|j(|d dd|id
|id|"jd|'ddt)tjid}(|(d d!t*j+  | j r7||(j_| j|(j_q|,  dS )"a  
        overlap gradient communication with backward & optimizer computation.

        1. assign gradient communications to grad comm stream
        2. for coalesce gradient communication:
            2.1 insert before communication dependencies
            2.2 insert after communication dependencies only when need
        3. there is not need to add explicit dependencies for non-coalesce gradient communication

        P.S. this overlap pass is ONLY adapted for standalone executor (graph based) and stream awared allocator.
        Nr   Tr   Zsharding_grad_comm_streamr   r   r   r   r~   r   r6   r   r   FZsharding_grad_comm_depr      c                    s   g | ]
}|  kr|qS r%   r%   r   r  )nranks_per_noderelative_idx_in_noder%   r&   r     
    z3ShardingPass._overlap_grad_comm.<locals>.<listcomp>z:Sharding Gradient Hierarchical Communication Optimization.zcurrent global rank idx: r   zlocal inter node ranks idx: c                    s   g | ]
}|  kr|qS r%   r%   r%  )node_idxr&  r%   r&   r     r(  zlocal intra node ranks idx: r  root_idr}   r   r   r   r   )-r-   Zgrad_comm_group_stream_pairsrh   rk   r   r/   r   rn   ra   r   r   rb   r  r   rp   rJ   r   r>   r   r   r  r   r  r   r   r  r   r
   rN   r   r   r3   rO   r5   r   r   r   r   rI   r"   r   r   r   r   r   ))rB   r\   rs   r   r   rk   r   rh   streamra   r
  Zreduce_op_countZgrad_comm_op_to_stream_idxr   r$   Z
stream_idxr   r   Zreduce_varnameZ
grad_groupZpost_idxZnext_opr  r  r  r  Zglobal_groupZglobal_ranksZinter_node_ranksZintra_node_ranksZinter_node_groupsZintra_node_groups_Zgrad_comm_stream_idxZinter_node_groupZintra_node_groupZdst_rankZin_peerZintra_node_dstZinter_node_dstr   r%   )r)  r&  r'  r&   r   s  s6  


	








zShardingPass._overlap_grad_comm)__name__
__module____qualname__r8   rK   rL   r]   rQ   r_   r`   rS   ru   r   rv   rw   rx   r   r   rT   rU   rV   r   r   r   r   r   __classcell__r%   r%   rC   r&   r(   V   s4    (091D(#*y < dr(   c                 C   s&   | j D ]}|jtv r|  S qtd)NzCould not find optimizer op.)ra   rb   r   	Exception)r\   r$   r%   r%   r&   r  W  s
   

r  c                 C   s   |  |}||}	| j|dd|id|id|d|ddt|id}
|
d	d
tj  t|
|	j|	j	| ||krU| j|dd|j
id|jd|jt|id}
t|
|	j|	j	| dS dS )z%
    empty op for initialization
    r   r}   r~   r   r   r   Tr   r   r   emptyr   r   r   N)r   r   r   r   r   r   r   r   r   r   rp   r   r   )r\   
insert_idxr   r   r   r   r   r)   r  Zbroadcast_var_dist_attrr   r%   r%   r&   r   _  sN   

r   Tc           
      C   s~   |dksJ d| | j |dd|gid|gid|d|d|t|id	}|| |}	t||	j|	j| |d
dtj	  |S )Nr   z5root id should be a positive int, but now root id is r  r}   r~   r   r*  r   r   r   r   )
r   r   r   r   r   r   r   r   r   r   )
r\   r3  Z
reduce_varr   r*  r)   r   r   r   r   r%   r%   r&   r     s,   
r   c                 C   s<   d}d}t | | |g}t| |||}t| |||}||fS )Nr   r   )rJ   r   )Zorigin_groupZsharding_group_sizer  Zdp_axisZsharding_axisr   rq   rr   r%   r%   r&   rl     s   rl   c                 C      | j do| j ddS )Nr   z/gradient_clipr    r!   r"   
startswithr#   r%   r%   r&   r     
   r   c                 C   r4  )Nr   z/regularizationr5  r#   r%   r%   r&   r     r7  r   c                 C   s`   t |sdS t| |tjjjtjjjsdS |jd }|d |d }| 	|s*dS | 
|jS )NFr   ry   )r   _is_desired_cast_opr   VarDescVarTypeFP16FP32r   r   r   r   is_parameter)r\   r$   r   r   r%   r%   r&   r     s   

r   c                 C   s4   t |rdS t| |sdS |jd }||vrdS dS )NFr   T)r   r8  r   )r\   r$   ro   r   r%   r%   r&   r     s   

r   c                 C   sn   |j dkrdS t|jdksJ t|jdksJ | |jd }| |jd }|j|ks3|j|kr5dS dS )Nr   Fr   r   T)rb   rJ   r   r   r   r   )r\   r$   Zsrc_var_typeZdst_var_typer   Z
output_varr%   r%   r&   r8    s   
r8  c                 C   s@   d }d| v r| d |  d }|S d| v r| d |  d }|S )Nz.cast_fp16@GRADr   )r   )r"  r   r%   r%   r&   r     s   r   c                 C   s8   t | sdS | jd }t|}||sdS ||jS )NFr   )r   r   r   r   r   r=  r$   r\   r   r   r%   r%   r&   r     s   

r   c                 C   sF   t | sdS | jdkrdS | jd }t|}||sdS ||jS )NFr   r   )r   rb   r   r   r   r   r=  r>  r%   r%   r&   r     s   


r   c                 C   s(   | j dko| jdotj| jdv S )Nr   r   )rb   r    r!   r   r   r"   r#   r%   r%   r&   r  #  s
   

r  c                 C   s   d }|j D ]>}t||j|sC||}|j}||}|j}t|dkr%q|d }	|	dkrC||	 dkrCt|j	|j|	| }
t
|
} |S q|S )Nr   r6   r   )r   r   r\   get_op_dist_attr_for_programr   Zget_input_dims_mappingr   rJ   r   Zprocess_idsr   )Zrank_idr$   r)   rq   r   r   r   Zinput_dim_mappingZ
mesh_shapeZbatch_size_axisZgroup_ranksr%   r%   r&   rd   +  s,   


 rd   c           	      C   s   i }d}g }| D ]}t |}||7 }|||f qdd t|D }d}d}|D ]\}}||d |d  | kr=|d7 }|| | ||7 }q)|S )z
    shard the continouse param into same rank and divide the forward&backward computation into segement,
    which will favor the fuse pass in later.

    we assume that the params is already sorted by utilization order.
    g        c                 S   s   i | ]}|g qS r%   r%   r   xr%   r%   r&   
<dictcomp>U  r   z*partition_by_use_order.<locals>.<dictcomp>r   g      ?r   )r   rn   r   )	ro   
group_sizemappingZtotal_param_memZ	param2memrt   ZmemZcur_rankZmem_accur%   r%   r&   partition_by_use_orderG  s    
rE  c                 C   s   i }t |D ]}g ||< qdg| }| D ]1}|t|}|| | tdd |jd}|dks=J d|j d| d||  |7  < q|S )zJ
    use greedy alogrithm to partition parameter as even as possible.
    r   c                 S   s   | | S r^   r%   )rA  yr%   r%   r&   <lambda>l  s    z*partition_by_greedy_even.<locals>.<lambda>r   zparam [z#] should larger than 0, but it is [])r   indexminrn   r   r   rp   )ro   rC  rD  Zrank_sizesrt   r  numelr%   r%   r&   partition_by_greedy_evena  s   


rM  greedy_evenc              	   C   s   |dkr
t | |}nt| |}td | D ]%\}}td| dtdd |D  d tdd	d |D  d
 q|S )NrN  zSharding Parameter Partition:zRank:z, Parameter Size:c                 S   r   r%   r   r  r%   r%   r&   r   ~  r   z(partition_parameters.<locals>.<listcomp>z MB.zParams in this rank: c                 S   r   r%   r   r  r%   r%   r&   r     r   r   )rM  rE  r   r   itemsr   )ro   rC  Zalgorrank_to_paramskvr%   r%   r&   partition_parametersu  s   

rS  c                    s  i  |D ]\}}||f |j < qg }| jD ]}|jD ]}| v r)||vr)|| qt|t kr4 nq| jd }i }	t| j}
g }t|r|jtv rtt	t
| jD ]"\}}|jtv rwt|ddksiJ ||	|dd < || qUt|t|	ksJ |D ]}| jdd}|j|	| j ||||	|  q|D ]	}| j|dd q|   t| j|
ksJ td	| d
  fdd|D S )Nr6   r   r   r   Znop)rb   Frz   z(Sharding the Order of param being used: r   c                    s   g | ]} | qS r%   r%   r   Zpname_to_pg_pairsr%   r&   r     r   z$re_order_program.<locals>.<listcomp>)rp   ra   r   rn   rJ   r   rb   r   r   r   r   r   r   r    Z	copy_fromZset_op_dist_attr_for_programr?  r   r   r   r   )r\   Zparam_gradsr)   r   gZ	use_orderr$   r   Zlast_opZpname_to_opZnum_opsZremove_op_indicesr   Zpnamer   r%   rT  r&   ri     sL   






ri   c                 C   s   i }i }g }t |}| jD ]>}| |j}|||r"||| n
t |}||| | |j|_||v r@|| |j n|jg||< |||j< q||fS )zA
    param are group by:
    rank id
    fuse_size
    dtype
    )r  ro   r   rp   r  r  r   rn   )rs   Z	fuse_sizer  r	  Zbucketr!  rt   r  r%   r%   r&   r     s$   
r   c                   @   s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )rm   c                 C   s   || _ dd |D | _t| jtt| jksJ ddd |D | _dd | jD | _|j| _|| _|j	
| j| _|| _t| j| j| j| _i | _|   d S )Nc                 S   s   i | ]
\}}|j ||fqS r%   r   )r   r   rU  r%   r%   r&   rB    s    z)ShardingInfo.__init__.<locals>.<dictcomp>z&found duplicated param in params_gradsc                 S   s   g | ]\}}|qS r%   r%   )r   r   r,  r%   r%   r&   r     r   z)ShardingInfo.__init__.<locals>.<listcomp>c                 S   r   r%   r   r   r%   r%   r&   r     r   )rh   r4   rJ   r:   ro   r   rj   rC  r5   rk   rI  r   r2   rS  rP  param_to_rank_map_param_to_rank)rB   rh   r  r4   r2   r%   r%   r&   r8     s$   
zShardingInfo.__init__c                 C   s.   | j  D ]\}}|D ]}|| j|j< qqdS )z@
        mapping parameters to the rank which holds it.
        N)rP  rO  rV  rp   )rB   r  ro   rt   r%   r%   r&   rW    s
   zShardingInfo._map_param_to_rankc                 C   s   || j v r
| j | S dS )Nr6   )rV  )rB   r   r%   r%   r&   r     s   

zShardingInfo.get_var_rankc                 C   s   |  || jkS r^   )r   r   rB   r   r%   r%   r&   r     s   zShardingInfo.is_in_local_shardc                 C   s   t  }t  }i }dd | jD }|jD ]}t|rq|jD ]}|| jv r,||  d7  < qq|jD ]2}t||| js;q1|jd }|jd }|| || |||< ||  d8  < | j| | j|< q1|	 D ]\}	}
|
dkru||	 qh||fS )Nc                 S   s   i | ]}|d qS )r   r%   r@  r%   r%   r&   rB    r   zCShardingInfo.get_broadcast_vars_and_param_usage.<locals>.<dictcomp>r   r   )
r:   r   ra   r   r   r   r   re   rV  rO  )rB   r\   Zbroadcast_varsZfp16_paramsZfp16_to_fp32r   r$   r   r   rt   usager%   r%   r&   r     s6   








z/ShardingInfo.get_broadcast_vars_and_param_usagec                 C   sB   |  |std| d|| jvrtd| d| j|d S )Nzparam[z] not in current rank.z] not in params_grads)r   
ValueErrorr4   getrX  r%   r%   r&   r   #  s
   

zShardingInfo.get_param_gradN)	r-  r.  r/  r8   rW  r   r   r   r   r%   r%   r%   r&   rm     s    rm   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
r  c                 C   sF   || _ d | _d| _d| _g | _d | _d | _d | _g | _g | _	d| _
d S )Nr6   r   F)max_siezr   r  rL  r   r   r  r  r  r  r   )rB   max_sizer%   r%   r&   r8   ,  s   
zVarGroup.__init__c                 C   sH   | j dkrdS |j| jkrdS || jkrdS | j t| | jkr"dS dS )Nr   TF)rL  r   r  r	   r\  rB   rt   r  r%   r%   r&   r  9  s   

zVarGroup.acceptablec                 C   s0   |j | _ || _|  jt|7  _| j| d S r^   )r   r  rL  r	   r   rn   r^  r%   r%   r&   r  E  s   zVarGroup.collectc                 C   s
   t | jS r^   )rJ   r   rA   r%   r%   r&   __len__K  s   
zVarGroup.__len__N)r-  r.  r/  r8   r  r  r_  r%   r%   r%   r&   r  +  s
    r  )rN  )Hlogging	functoolsr   r   Z8paddle.distributed.auto_parallel.static.operators.commonr   r   r   Z5paddle.distributed.auto_parallel.static.process_groupr   Z-paddle.distributed.auto_parallel.static.utilsr   r   r	   r
   r   r   r   r   r   r   r   Z7paddle.distributed.fleet.meta_optimizers.sharding.utilsr   Zpaddle.frameworkr   Zpaddle.staticr   r   Zpaddle.utilsr   Z	pass_baser   r   Z
pass_utilsr   Zop_proto_and_checker_makerr   ZkOpRoleAttrNamer   rc   r   INFOr   r'   r(   r  r   r   r   rl   r   r   r   r   r9  r:  r<  r;  r8  r   r   r   r  rd   rE  rM  rS  ri   r   rm   r  r%   r%   r%   r&   <module>   sn   4



          
?
$
	
4"P