# NOTE: this file was recovered from a compiled (.pyc) artifact of
# paddle/distributed/fleet/meta_optimizers/sharding/utils.py. Docstrings and
# op/attribute strings below are verbatim from that artifact; control flow is
# a best-effort reconstruction.
import os
import re
from functools import reduce

import paddle
from paddle.distributed.fleet.meta_optimizers.common import (
    OP_ROLE_KEY,
    OpRole,
    is_backward_op,
    is_loss_grad_op,
    is_optimizer_op,
)
from paddle.framework import core
from paddle.utils import unique_name


def check_broadcast(block):
    """
    if a var is broadcasted, it should have a sync_comm before
    this var is used, if not, raise error.
    if the broadcasted var has a fill_constant op, the fill_constant
    op should stay forward before the broadcast op, and before a
    sync_calc op. Otherwise, raise error.

    should ignore and skip broadcast_op of inner_parallelism (e.g. Megatron)
    """
    broadcast_vars = {}
    for idx, op in enumerate(block.ops):
        if op.type == "c_broadcast":
            if not op.all_attrs()["use_calc_stream"]:
                var_name = op.desc.input_arg_names()[0]
                if "@BroadCast" in var_name:
                    if var_name in broadcast_vars:
                        raise ValueError(
                            "var_name already exists: {}, the old pos is {}, "
                            "the new pos is {}".format(
                                var_name,
                                broadcast_vars[var_name]["broadcast_pos"],
                                idx,
                            )
                        )
                    broadcast_vars[var_name] = {
                        "fill_constant_pos": -1,
                        "broadcast_pos": idx,
                    }

    for idx, op in enumerate(block.ops):
        if op.type == "fill_constant":
            var_name = op.desc.output_arg_names()[0]
            if var_name in broadcast_vars:
                broadcast_vars[var_name]["fill_constant_pos"] = idx
            continue

    last_sync_comm_op_idx = -1
    last_sync_calc_op_idx = -1
    for idx, op in enumerate(block.ops):
        if op.type == "c_sync_comm_stream":
            last_sync_comm_op_idx = idx
            continue

        if op.type == "c_sync_calc_stream":
            last_sync_calc_op_idx = idx
            continue

        if op.type == "c_broadcast" and not op.all_attrs()["use_calc_stream"]:
            var_name = op.desc.input_arg_names()[0]
            if "@BroadCast" in var_name:
                if broadcast_vars[var_name]["fill_constant_pos"] != -1:
                    assert last_sync_calc_op_idx != -1
                    assert (
                        broadcast_vars[var_name]["fill_constant_pos"]
                        < last_sync_calc_op_idx
                    )
                    assert last_sync_calc_op_idx < idx
                continue

        for input_name in op.desc.input_arg_names():
            if input_name in broadcast_vars:
                assert broadcast_vars[input_name]["broadcast_pos"] != -1
                assert (
                    broadcast_vars[input_name]["broadcast_pos"]
                    < last_sync_comm_op_idx
                )
                assert last_sync_comm_op_idx < idx


def check_allreduce_sum(block, shard, sharding_ring_id, dp_ring_id=-1):
    """
    the op order should be:
        grad:
            - 0: op that generate Var
            - 1: sync_calc
            - 2: reduce_sum_sharding (allreduce --> reduce)
            - 3: sync_comm
            - 4: allreduce_sum_dp (dp_grads)
            - 5: sync_comm (dp_grads)
            - 6: op that use Var (dp_grads & sum)

    should ignore and skip allreduce_op of inner_parallelism (e.g. Megatron)
    """
    vars_status = {}
    dp_grads_status = {}
    idx_last_grad_allreduce = -1
    idx_amp_allreduce = -1
    idx_gradient_clip_allreduce = -1

    for idx, op in enumerate(block.ops):
        # collectives of inner parallelism run on the calc stream and are skipped
        if op.type == "c_allreduce_sum" or op.type == "c_reduce_sum":
            if not op.all_attrs()["use_calc_stream"]:
                ring_id = op.desc.attr("ring_id")
                var_name = op.desc.input_arg_names()[0]
                param = var_name.split("@")[0]

                assert 'sum' in var_name or '@GRAD' in var_name
                if 'sum' in var_name or not shard.has_param(param):
                    vars_status[var_name] = -1
                else:
                    dp_grads_status[var_name] = -1

                if ring_id != sharding_ring_id:
                    assert shard.has_param(param)
                    assert ring_id == dp_ring_id

                if "sum" in var_name:
                    idx_amp_allreduce = idx
                elif "@GRAD" in var_name:
                    idx_last_grad_allreduce = idx

        if op.type == "c_allreduce_max":
            idx_gradient_clip_allreduce = idx

    for op in block.ops:
        if op.type == "c_sync_calc_stream":
            for var_name in vars_status:
                if vars_status[var_name] == 0:
                    vars_status[var_name] = 1
            for var_name in dp_grads_status:
                if dp_grads_status[var_name] == 0:
                    dp_grads_status[var_name] = 1

        elif op.type == "c_allreduce_sum" or op.type == "c_reduce_sum":
            if not op.all_attrs()["use_calc_stream"]:
                var_name = op.desc.input_arg_names()[0]
                ring_id = op.desc.attr("ring_id")
                if ring_id == sharding_ring_id:
                    assert (
                        op.type == "c_reduce_sum"
                    ), "Grad in Sharding group should be reduce rather than allreduce"
                    if var_name in vars_status:
                        _status = vars_status[var_name]
                    else:
                        _status = dp_grads_status[var_name]
                    if _status == -1:
                        raise ValueError(
                            f"{var_name} is not generated, but you are "
                            "trying to all-reduce it"
                        )
                    if _status == 0:
                        raise ValueError(
                            "There should be a sync_calc op after generate Var: "
                            f"{var_name} and before the c_allreduce_sum op"
                        )
                    assert _status == 1
                    if var_name in vars_status:
                        vars_status[var_name] = 2
                    else:
                        dp_grads_status[var_name] = 2
                else:
                    assert ring_id == dp_ring_id
                    param = var_name.split("@")[0]
                    assert shard.has_param(param)
                    assert dp_grads_status[var_name] == 3
                    dp_grads_status[var_name] = 4

        elif op.type == "c_sync_comm_stream":
            if op.desc.attr("ring_id") == sharding_ring_id:
                for var_name in op.desc.input_arg_names():
                    if var_name in vars_status:
                        assert vars_status[var_name] == 2
                        vars_status[var_name] = 3
                    elif var_name in dp_grads_status:
                        assert dp_grads_status[var_name] == 2
                        dp_grads_status[var_name] = 3
            else:
                for var_name in op.desc.input_arg_names():
                    param = var_name.split("@")[0]
                    assert op.desc.attr("ring_id") == dp_ring_id
                    assert shard.has_param(param)
                    assert dp_grads_status[var_name] == 4
                    dp_grads_status[var_name] = 5

        else:
            for input_name in op.desc.input_arg_names():
                if input_name in vars_status:
                    if vars_status[input_name] != 3:
                        raise ValueError(
                            "There should be a sync_comm op after allreduce "
                            f"the Var: {input_name}"
                        )
                    raise ValueError(
                        f"The reduce output grad [{input_name}] should NOT be "
                        "used in Non-root rank."
                    )
                if input_name in dp_grads_status:
                    if dp_ring_id == -1:
                        if dp_grads_status[input_name] != 3:
                            raise ValueError(
                                "There should be a sync_comm op after "
                                f"allreduce the Var: {input_name}"
                            )
                    else:
                        if dp_grads_status[input_name] != 5:
                            raise ValueError(
                                "The grad in shard should be allreduced and "
                                f"synced twice before usage {input_name}"
                            )

            for output_name in op.desc.output_arg_names():
                if output_name in vars_status and vars_status[output_name] == -1:
                    vars_status[output_name] = 0
                if (
                    output_name in dp_grads_status
                    and dp_grads_status[output_name] == -1
                ):
                    dp_grads_status[output_name] = 0

    # the amp (found_inf) allreduce and the global-norm-clip allreduce must
    # both come after the last grad allreduce
    if idx_amp_allreduce != -1:
        assert idx_amp_allreduce > idx_last_grad_allreduce
    if idx_gradient_clip_allreduce != -1:
        assert idx_gradient_clip_allreduce > idx_last_grad_allreduce
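
# Illustration (added for documentation, not part of the original module):
# the status lattice that check_allreduce_sum walks for one sharded gradient.
# The op names below are made up; only the ordering mirrors the checker above.
_EXAMPLE_GRAD_OP_SEQUENCE = [
    "mul_grad",            # -1 -> 0: op that generates x@GRAD
    "c_sync_calc_stream",  # 0 -> 1
    "c_reduce_sum",        # 1 -> 2, on the sharding ring
    "c_sync_comm_stream",  # 2 -> 3, on the sharding ring
    "c_allreduce_sum",     # 3 -> 4, on the dp ring (dp grads only)
    "c_sync_comm_stream",  # 4 -> 5, on the dp ring
    "adam",                # consumer: requires status 3 (or 5 with dp)
]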

def get_valid_op_role(block, insert_idx):
    """
    return OpRole.Forward or OpRole.Backward
    """
    op_role = block.ops[insert_idx].attr('op_role')
    if insert_idx >= len(block.ops) or int(op_role) in [
        int(OpRole.Backward),
        int(OpRole.Optimize),
    ]:
        return OpRole.Backward
    if int(op_role) in [int(OpRole.Forward), int(OpRole.Loss)]:
        return OpRole.Forward

    return get_valid_op_role(block, insert_idx + 1)


def insert_sync_calc_op(block, insert_idx, calc_dep_vars):
    """
    _insert_sync_calc_op
    """
    op_role = get_valid_op_role(block, insert_idx)
    block._insert_op_without_sync(
        insert_idx,
        type='c_sync_calc_stream',
        inputs={'X': calc_dep_vars},
        outputs={'Out': calc_dep_vars},
        attrs={OP_ROLE_KEY: op_role},
    )


def insert_sync_comm_op(block, insert_idx, ring_id, comm_dep_vars):
    """
    insert sync_comm_op for single var
    """
    op_role = get_valid_op_role(block, insert_idx)
    block._insert_op_without_sync(
        insert_idx,
        type='c_sync_comm_stream',
        inputs={'X': comm_dep_vars},
        outputs={'Out': comm_dep_vars},
        attrs={'ring_id': ring_id, OP_ROLE_KEY: op_role},
    )
    return 1


def insert_sync_comm_ops(block, insert_idx, ring_id, comm_dep_vars):
    """
    insert sync_comm_op for vars
    """
    if len(comm_dep_vars) == 0:
        return 0

    op_role = get_valid_op_role(block, insert_idx)
    block._insert_op_without_sync(
        insert_idx,
        type='c_sync_comm_stream',
        inputs={'X': comm_dep_vars},
        outputs={'Out': comm_dep_vars},
        attrs={'ring_id': int(ring_id), OP_ROLE_KEY: op_role},
    )
    return 1


def insert_fill_constant_ops(block, insert_idx, fill_constant_vars):
    """
    _add_fill_constant_ops
    """
    op_role = get_valid_op_role(block, insert_idx)
    for broadcast_name in fill_constant_vars:
        broadcast_var = block.var(broadcast_name)
        block._insert_op_without_sync(
            insert_idx,
            type="fill_constant",
            outputs={"Out": broadcast_var.name},
            attrs={
                "shape": broadcast_var.shape,
                "dtype": broadcast_var.dtype,
                "value": 0.0,
                OP_ROLE_KEY: op_role,
            },
        )


def insert_cast_ops(block, insert_idx, cast_ops):
    """
    _add_cast_ops
    """
    op_role = get_valid_op_role(block, insert_idx)
    for fp16_name, fp32_name in cast_ops.items():
        block._insert_op_without_sync(
            insert_idx,
            type="cast",
            inputs={"X": fp32_name},
            outputs={"Out": fp16_name},
            attrs={
                "in_dtype": core.VarDesc.VarType.FP32,
                "out_dtype": core.VarDesc.VarType.FP16,
                OP_ROLE_KEY: op_role,
            },
        )


def insert_allreduce_ops(
    block,
    insert_idx,
    ring_id,
    allreduce_vars,
    op_role=OpRole.Backward,
    use_calc_stream=False,
    user_defined_strategy=None,
):
    """
    _add_allreduce_ops
    """
    if len(allreduce_vars) == 0:
        return

    if (
        user_defined_strategy
        and user_defined_strategy.fuse_all_reduce_ops
        and not user_defined_strategy.fuse_grad_merge
    ):
        # with fuse_grad_merge, grads were already fused during gradient
        # merge, so they must not be fused again here
        insert_fused_allreduce_ops(
            block,
            insert_idx,
            ring_id,
            allreduce_vars,
            op_role,
            use_calc_stream,
            user_defined_strategy.fuse_grad_size_in_MB,
        )
    else:
        for var in allreduce_vars:
            block._insert_op_without_sync(
                insert_idx,
                type='c_allreduce_sum',
                inputs={'X': var},
                outputs={'Out': var},
                attrs={
                    'ring_id': ring_id,
                    'use_calc_stream': use_calc_stream,
                    OP_ROLE_KEY: op_role,
                },
            )
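
# Illustrative usage (added for documentation, not part of the original
# module): a pass that wants a plain data-parallel allreduce of two grads
# would call the helper above with hypothetical names like these. Note that
# ops inserted at `idx` end up before the op currently at `idx`.
def _example_allreduce_two_grads(block, idx, ring_id):
    insert_allreduce_ops(block, idx, ring_id, ['w0@GRAD', 'w1@GRAD'])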

class FuseHelper:
    @staticmethod
    def sort_vars_by_dtype(block, vars_name):
        fp32_vars = []
        fp16_vars = []
        other_vars = []
        for var in vars_name:
            dtype = block.var(var).dtype
            if dtype == paddle.float32:
                fp32_vars.append(var)
            elif dtype == paddle.float16:
                fp16_vars.append(var)
            else:
                other_vars.append(var)
        assert len(other_vars) == 0, "only support fp32/fp16 vars for fuse"

        fp32_vars.extend(fp16_vars)
        return fp32_vars

    @staticmethod
    def get_fused_groups(block, vars_name, fuse_size=32.0):
        """coalesce tensor, get fused group"""
        groups = []
        cur_size = 0.0
        last_dtype = None
        for var_name in vars_name:
            real_var = block.var(var_name)
            var_size = get_var_size(real_var)
            if (
                cur_size + var_size > fuse_size
                or len(groups) == 0
                or real_var.dtype != last_dtype
            ):
                groups.append([real_var])
                cur_size = var_size
                last_dtype = real_var.dtype
            else:
                groups[-1].append(real_var)
                cur_size += var_size
        return groups

    @staticmethod
    def insert_coalesce_tensor(
        block, index, groups, op_role=OpRole.Backward, prefix="Output"
    ):
        fused_vars = []
        insert_num = 0
        for group in groups:
            assert len(group) >= 1
            if len(group) == 1:
                # a single var needs no fusion
                fused_vars.append(group[0])
                continue

            fused_var = block.create_var(
                name=unique_name.generate(f'Fused{prefix}_{group[0].name}'),
                dtype=group[0].dtype,
                persistable=False,
                stop_gradient=True,
            )
            fused_vars.append(fused_var)
            block._insert_op_without_sync(
                index,
                type="coalesce_tensor",
                inputs={"Input": group},
                outputs={"Output": group, "FusedOutput": fused_var},
                attrs={
                    "copy_data": True,
                    "use_align": True,
                    "dtype": group[0].dtype,
                    OP_ROLE_KEY: op_role,
                },
            )
            insert_num += 1

        return fused_vars, insert_num
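
# Sketch (documentation only, hypothetical names): how the fused insert_*
# helpers below use FuseHelper -- bucket vars into ~32 MB single-dtype groups,
# then coalesce each bucket into one fused var for a single collective.
def _example_fuse_buckets(block, idx, grad_names):
    groups = FuseHelper.get_fused_groups(block, grad_names, fuse_size=32.0)
    fused_vars, insert_num = FuseHelper.insert_coalesce_tensor(
        block, idx, groups, prefix="Grad"
    )
    return fused_vars, insert_num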

def insert_fused_allreduce_ops(
    block,
    insert_idx,
    ring_id,
    allreduce_vars,
    op_role=OpRole.Backward,
    use_calc_stream=False,
    fuse_grad_size_in_MB=32,
):
    groups = FuseHelper.get_fused_groups(
        block, allreduce_vars, fuse_grad_size_in_MB
    )

    fused_vars, insert_num = FuseHelper.insert_coalesce_tensor(
        block, insert_idx, groups, op_role, prefix="Grad"
    )

    for fused_var in fused_vars:
        block._insert_op_without_sync(
            insert_idx + insert_num,
            type='c_allreduce_sum',
            inputs={'X': fused_var},
            outputs={'Out': fused_var},
            attrs={
                'ring_id': ring_id,
                'use_calc_stream': use_calc_stream,
                OP_ROLE_KEY: op_role,
            },
        )
        if not use_calc_stream:
            block._insert_op_without_sync(
                insert_idx + insert_num,
                type='c_sync_calc_stream',
                inputs={'X': fused_var},
                outputs={'Out': fused_var},
                attrs={OP_ROLE_KEY: op_role},
            )

def insert_fused_reduce_ops(
    block,
    insert_idx,
    ring_id,
    reduce_vars,
    shard,
    op_role=OpRole.Backward,
    use_calc_stream=False,
    rank=None,
    fuse_grad_size=32,
):
    nranks = shard.worker_num
    device_to_vars = [[] for _ in range(nranks)]

    for var in reduce_vars:
        root_id = get_grad_device(var, shard)
        assert 0 <= root_id < nranks, (
            "root_id should be >= 0 and < nranks, "
            f"but now nranks={nranks}, the root_id of var={var} is {root_id}"
        )
        device_to_vars[root_id].append(var)

    for root_id, vars_name in enumerate(device_to_vars):
        vars_name = FuseHelper.sort_vars_by_dtype(block, vars_name)
        groups = FuseHelper.get_fused_groups(block, vars_name, fuse_grad_size)

        fused_vars, insert_num = FuseHelper.insert_coalesce_tensor(
            block, insert_idx, groups, op_role, prefix="Grad"
        )

        for fused_var in fused_vars:
            block._insert_op_without_sync(
                insert_idx + insert_num,
                type='c_reduce_sum',
                inputs={'X': fused_var},
                outputs={'Out': fused_var},
                attrs={
                    'ring_id': ring_id,
                    'root_id': root_id,
                    'use_calc_stream': use_calc_stream,
                    OP_ROLE_KEY: op_role,
                },
            )
            if not use_calc_stream:
                block._insert_op_without_sync(
                    insert_idx + insert_num,
                    type='c_sync_calc_stream',
                    inputs={'X': fused_var},
                    outputs={'Out': fused_var},
                    attrs={OP_ROLE_KEY: op_role},
                )

    return [] if rank is None else device_to_vars[rank]

def insert_reduce_ops(
    block,
    insert_idx,
    ring_id,
    reduce_vars,
    shard,
    op_role=OpRole.Backward,
    use_calc_stream=False,
    rank=None,
    strategy=None,
):
    """
    _add_reduce_ops
    """
    if strategy and strategy.fuse_all_reduce_ops and not strategy.fuse_grad_merge:
        return insert_fused_reduce_ops(
            block,
            insert_idx,
            ring_id,
            reduce_vars,
            shard,
            op_role,
            use_calc_stream,
            rank,
            strategy.fuse_grad_size_in_MB,
        )

    grad_in_this_device = []
    for var in reduce_vars:
        grad_var = var
        if strategy and strategy.fuse_all_reduce_ops and strategy.fuse_grad_merge:
            grad_var = var.replace('FusedMergedGrad_', '')
        root_id = get_grad_device(grad_var, shard)
        assert (
            root_id >= 0
        ), f"root id should be a non-negative integer, but now root id is {root_id}"
        if rank is not None and rank == root_id:
            grad_in_this_device.append(var)
        block._insert_op_without_sync(
            insert_idx,
            type='c_reduce_sum',
            inputs={'X': var},
            outputs={'Out': var},
            attrs={
                'ring_id': ring_id,
                'root_id': root_id,
                'use_calc_stream': use_calc_stream,
                OP_ROLE_KEY: op_role,
            },
        )

    return grad_in_this_device
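
# Sketch (documentation only): sharding syncs grads with reduce-to-owner
# rather than allreduce; only the rank owning the parameter (per
# shard.global_param2device) keeps the reduced grad and gets it returned.
def _example_reduce_to_owner(block, idx, ring_id, grad_names, shard, rank):
    return insert_reduce_ops(block, idx, ring_id, grad_names, shard, rank=rank)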

def insert_fused_broadcast_param_ops(
    block,
    insert_idx,
    ring_id,
    params,
    shard,
    op_role=OpRole.Optimize,
    use_calc_stream=False,
    rank=None,
    fuse_size=32,
):
    nranks = shard.worker_num
    device_to_vars = [[] for _ in range(nranks)]

    for var in params:
        root_id = shard.device(var)
        assert 0 <= root_id < nranks, (
            "root_id should be >= 0 and < nranks, "
            f"but now nranks={nranks}, the root_id of var={var} is {root_id}"
        )
        device_to_vars[root_id].append(var)

    for root_id, vars_name in enumerate(device_to_vars):
        vars_name = FuseHelper.sort_vars_by_dtype(block, vars_name)
        groups = FuseHelper.get_fused_groups(block, vars_name, fuse_size)

        fused_vars, insert_num = FuseHelper.insert_coalesce_tensor(
            block, insert_idx, groups, op_role, prefix="Param"
        )

        for fused_var in fused_vars:
            block._insert_op_without_sync(
                insert_idx + insert_num,
                type='c_broadcast',
                inputs={'X': fused_var},
                outputs={'Out': fused_var},
                attrs={
                    'ring_id': ring_id,
                    'root': root_id,
                    'use_calc_stream': use_calc_stream,
                    OP_ROLE_KEY: op_role,
                },
            )
            if not use_calc_stream:
                block._insert_op_without_sync(
                    insert_idx + insert_num,
                    type='c_sync_calc_stream',
                    inputs={'X': fused_var},
                    outputs={'Out': fused_var},
                    attrs={OP_ROLE_KEY: op_role},
                )

    return [] if rank is None else device_to_vars[rank]

def insert_broadcast_param_ops(
    block,
    insert_idx,
    ring_id,
    params,
    shard,
    op_role=OpRole.Optimize,
    use_calc_stream=False,
    rank=None,
    strategy=None,
):
    """
    add broadcast param ops
    """
    if strategy and strategy.fuse_all_reduce_ops:
        return insert_fused_broadcast_param_ops(
            block,
            insert_idx,
            ring_id,
            params,
            shard,
            op_role,
            use_calc_stream,
            rank,
            strategy.fuse_grad_size_in_MB,
        )

    param_in_this_device = []
    for param in params:
        root_id = shard.device(param)
        assert (
            root_id >= 0
        ), f"root id should be a non-negative integer, but now root id is {root_id}"
        if rank is not None and rank == root_id:
            param_in_this_device.append(param)
        block._insert_op_without_sync(
            insert_idx,
            type='c_broadcast',
            inputs={'X': param},
            outputs={'Out': param},
            attrs={
                'ring_id': ring_id,
                'root': root_id,
                'use_calc_stream': use_calc_stream,
                OP_ROLE_KEY: op_role,
            },
        )

    return param_in_this_device

def fuse_opt_broadcast_param_ops(
    block, ring_id, shard, op_role=OpRole.Optimize, strategy=None
):
    """
    fuse optimizer sharding broadcast param ops
    """
    if strategy is None or not strategy.fuse_all_reduce_ops:
        return

    fuse_size = strategy.fuse_grad_size_in_MB
    nranks = shard.worker_num
    device_to_vars = [[] for _ in range(nranks)]

    # strip the trailing run of optimizer c_broadcast ops, remembering which
    # rank owns each param, then re-emit them fused per owner rank
    for idx, op in reversed(list(enumerate(block.ops))):
        if not is_optimizer_op(op) or op.type != 'c_broadcast':
            break
        var = op.input_arg_names[0]
        root_id = op.attr('root')
        device_to_vars[root_id].insert(0, var)
        block._remove_op(idx, sync=False)

    insert_idx = idx + 1
    for root_id, vars_name in enumerate(device_to_vars):
        vars_name = FuseHelper.sort_vars_by_dtype(block, vars_name)
        groups = FuseHelper.get_fused_groups(block, vars_name, fuse_size)

        fused_vars, insert_num = FuseHelper.insert_coalesce_tensor(
            block, insert_idx, groups, op_role, prefix="Param"
        )

        for fused_var in fused_vars:
            block._insert_op_without_sync(
                insert_idx + insert_num,
                type='c_broadcast',
                inputs={'X': fused_var},
                outputs={'Out': fused_var},
                attrs={
                    'ring_id': ring_id,
                    'root': root_id,
                    'use_calc_stream': True,
                    OP_ROLE_KEY: op_role,
                },
            )

    block._sync_with_cpp()


def get_grad_device(grad_name, shard):
    assert "@GRAD" in grad_name, f"[{grad_name}] should be a grad variable."
    base_name = None
    # NOTE: mind the traversal order
    possible_suffixes = [
        '.cast_fp16@GRAD@MERGED',
        '.cast_fp16@GRAD',
        '@GRAD@MERGED@FP16',
        '@GRAD@MERGED',
        '@GRAD',
    ]
    for suffix in possible_suffixes:
        if suffix in grad_name:
            base_name = re.sub(suffix, '', grad_name)
            break

    assert (
        base_name in shard.global_param2device
    ), f"[{base_name}] should be a param variable."

    return shard.global_param2device[base_name]
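
# Worked example of the suffix stripping in get_grad_device (assumed var
# names, same re.sub calls): every grad alias maps back to its parameter
# before the owner lookup.
def _example_grad_base_name():
    assert re.sub('@GRAD@MERGED@FP16', '', 'fc_0.w_0@GRAD@MERGED@FP16') == 'fc_0.w_0'
    assert re.sub('.cast_fp16@GRAD', '', 'fc_0.w_0.cast_fp16@GRAD') == 'fc_0.w_0'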

def get_first_check_finite_and_unscale_op_idx(block, raise_error=True):
    for idx, op in enumerate(block.ops):
        if op.type == "check_finite_and_unscale":
            return idx

    if raise_error:
        raise ValueError(
            "amp is turned on but check_finite_and_unscale op does not "
            "exist in main block"
        )

    return -1


def get_first_optimize_op_idx(block):
    first_opt_op_idx = None
    for index, op in reversed(tuple(enumerate(block.ops))):
        if is_backward_op(op) and first_opt_op_idx is None:
            first_opt_op_idx = index + 1
            break
    return first_opt_op_idx


def insert_broadcast_ops(block, insert_idx, ring_id, broadcast2root):
    """
    _add_broadcast_ops
    """
    op_role = get_valid_op_role(block, insert_idx)
    for broadcast_name, root_device in broadcast2root:
        block._insert_op_without_sync(
            insert_idx,
            type='c_broadcast',
            inputs={'X': broadcast_name},
            outputs={'Out': broadcast_name},
            attrs={
                'ring_id': ring_id,
                'root': root_device,
                OP_ROLE_KEY: op_role,
            },
        )


DtypeToSize = {
    core.VarDesc.VarType.FP16: 2,
    core.VarDesc.VarType.FP32: 4,
    core.VarDesc.VarType.FP64: 8,
    core.VarDesc.VarType.INT16: 2,
    core.VarDesc.VarType.INT32: 4,
    core.VarDesc.VarType.INT64: 8,
    core.VarDesc.VarType.BOOL: 1,
    core.VarDesc.VarType.UINT8: 1,
}


def get_var_size(param):
    """
    input:
        - param: var
    return:
        var size in MB
    """
    assert -1 not in param.shape
    return (
        reduce(lambda x, y: x * y, param.shape)
        * DtypeToSize[param.dtype]
        / 1024.0
        / 1024.0
    )


def insert_scale_loss_grad_ops(block, scale=1.0):
    """
    In order to keep the learning rate consistent in different numbers of
    training workers, we scale the loss grad by the number of workers.
    """
    for idx, op in reversed(list(enumerate(block.ops))):
        if is_loss_grad_op(op):
            assert op.type == 'fill_constant', (
                "loss_grad_op must be fill_constant op, "
                f"but this op is {op.type}"
            )
            assert op.has_attr('value')
            loss_scale = float(op.attr('value'))
            loss_scale = loss_scale / scale
            op._set_attr('value', loss_scale)
            break
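
# Worked example of get_var_size's arithmetic: a [1024, 1024] FP32 var is
# 1024 * 1024 elements * 4 bytes = 4.0 MB (see the dtype table above).
def _example_var_size_math():
    numel = 1024 * 1024
    assert numel * DtypeToSize[core.VarDesc.VarType.FP32] / 1024.0 / 1024.0 == 4.0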

def comm_analyse(main_program):
    """
    Analyse the parameter size that needs to be broadcast/allreduced during
    sharding training
    """
    reduce_vars = {}
    broadcast_vars = {}
    block = main_program.global_block()
    for op in block.ops:
        if op.type == "c_broadcast":
            var_name = op.desc.input_arg_names()[0]
            # convert MB to KB
            broadcast_vars[var_name] = get_var_size(block.var(var_name)) * 1024.0
        elif op.type == "c_allreduce_sum":
            var_name = op.desc.input_arg_names()[0]
            reduce_vars[var_name] = get_var_size(block.var(var_name)) * 1024.0

    varsize_count = {}
    gap = 1

    for k, v in broadcast_vars.items():
        print(f"broadcast: {k}: {v} KB")
        if int(v / gap) in varsize_count:
            varsize_count[int(v / gap)] += 1
        else:
            varsize_count[int(v / gap)] = 1

    for k, v in reduce_vars.items():
        print(f"allreduce: {k}: {v} KB")
        if int(v / gap) in varsize_count:
            varsize_count[int(v / gap)] += 1
        else:
            varsize_count[int(v / gap)] = 1

    with open("nccl_size.txt", 'w') as f:
        sorted_varsize = sorted(varsize_count.items(), key=lambda x: x[0])
        for varsize, count in sorted_varsize:
            print(f"NCCL size {varsize}~{varsize + 1} KB: {count}")
            f.write(f"NCCL size {varsize}~{varsize + 1} KB: {count}\n")
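
# comm_analyse output sketch: one "broadcast: <var>: <size> KB" or
# "allreduce: <var>: <size> KB" line per var on stdout, plus a 1 KB-bucket
# histogram (e.g. "NCCL size 3~4 KB: 12") written to nccl_size.txt.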

def add_sync_comm(program, sharding_ring_id):
    """
    When a test program is cloned from the sharding main program, part of the
    sync_comm ops may be pruned by mistake; this function adds the missing
    sync_comm ops back to the test program.
    """
    assert sharding_ring_id >= 0, "sharding_ring_id should be larger than zero"
    block = program.global_block()
    not_sync_vars = set()
    for op in block.ops:
        if op.type in ["c_broadcast", "c_allreduce"]:
            for input_name in op.desc.input_arg_names():
                not_sync_vars.add(input_name)
        if op.type == "c_sync_comm_stream":
            for input_name in op.desc.input_arg_names():
                not_sync_vars.remove(input_name)
    if not_sync_vars:
        block.append_op(
            type='c_sync_comm_stream',
            inputs={'X': list(not_sync_vars)},
            outputs={'Out': list(not_sync_vars)},
            attrs={
                'ring_id': sharding_ring_id,
                'op_role': core.op_proto_and_checker_maker.OpRole.Forward,
            },
        )


def save_persistables(exe, dirname, main_program, filename=None):
    """
    When sharding is used, part of the persistable vars are unique and are partitioned across different ranks,
    and part of persistable vars are duplicated and exist in all the ranks with different values.
    This function handles the model saving for sharding training.
    """
    # pipeline mode: the persistables live in the section program
    if main_program._pipeline_opt:
        main_program = main_program._pipeline_opt['section_program']

    def is_opt_vars(var):
        # currently only Momentum- and Adam-style optimizer states (plus EMA,
        # offload and fp16-cast slots) are compatible with sharding
        checks = [
            "_moment1_0",
            "_moment2_0",
            "_beta1_pow_acc_0",
            "_beta2_pow_acc_0",
            "_velocity_0",
            "_ema_0",
            "@offload_0",
            ".cast_fp16",
        ]
        for check in checks:
            if var.name.endswith(check) and var.persistable:
                return True
        return False

    def is_gradient_merge_vars(var):
        # (sic) the suffix spelling matches the gradient-merge pass
        return var.name.endswith("@GradiantMerge")

    def is_trainable(var):
        return isinstance(var, paddle.base.framework.Parameter) and var.trainable

    def sharding_predicate(var):
        return (
            is_trainable(var) or is_opt_vars(var) or is_gradient_merge_vars(var)
        )

    if int(os.environ.get('PADDLE_TRAINER_ID', 0)) == 0:
        paddle.distributed.io.save_persistables(
            exe, dirname, main_program=main_program, filename=filename
        )
    else:
        paddle.static.save_vars(
            exe,
            dirname,
            main_program=main_program,
            predicate=sharding_predicate,
            filename=None,
        )


def append_naive_sync(block, sync_var, ring_id):
    # naive sync within the global group: fill a flag var with 1, allreduce it
    # on the calc stream, then wait on the calc stream
    block.append_op(
        type="fill_constant",
        outputs={"Out": sync_var},
        attrs={
            "shape": sync_var.shape,
            "dtype": sync_var.dtype,
            "value": int(1),
        },
    )
    block.append_op(
        type='c_allreduce_sum',
        inputs={'X': sync_var},
        outputs={'Out': sync_var},
        attrs={
            'ring_id': ring_id,
            'use_calc_stream': True,
            OP_ROLE_KEY: OpRole.Forward,
        },
    )
    block.append_op(
        type='c_sync_calc_stream',
        inputs={'X': [sync_var]},
        outputs={'Out': [sync_var]},
        attrs={OP_ROLE_KEY: OpRole.Forward},
    )
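
# Smoke sketch (added for documentation; guarded so importing this module
# never runs it). On an empty static program the checkers are no-ops.
if __name__ == '__main__':
    paddle.enable_static()
    prog = paddle.static.Program()
    check_broadcast(prog.global_block())
    check_allreduce_sum(prog.global_block(), shard=None, sharding_ring_id=0)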