o
    "jG                    @   sz   d dl Z d dlZd dlmZ d dlmZmZ d dlZd dl	Z	d dl
mZmZ d dlmZmZmZmZ g ZG dd dZdS )    N)defaultdict)
cmp_to_keyreduce)coreunique_name)	ParameterProgramdefault_startup_programin_dygraph_modec                   @   sp  e Zd ZdZdZddZdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd[d#d$Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1d2 Zd3d4 Zd5d6 Zd7d8 Z	"d\d:d;Zd<d= Z	"d[d>d?Z d@dA Z!dBdC Z"dDdE Z#dFdG Z$dHdI Z%dJdK Z&dLdM Z'dNdO Z(dPdQ Z)dRdS Z*dTdU Z+dVdW Z,	"d]dXdYZ-d"S )^PipelineOptimizera
  
        :api_attr: Static Graph

    Pipeline Optimizer: Make a program to run as pipeline, that is splitting a
    program into multiple sections (sub-programs) and each section run on a
    device to enable the training of large scale models and the use of
    heterogeneous devices. Meanwhile, all sections run in the stype of pipeline.

    Args:
        optimizer (Optimizer): The optimizer to use, such as SGD.
        num_microbatches (int): Number of microbatches. [Optional. Default:1].
        start_cpu_core_id (int): The first cpu core id to use. [Optional. Default:0].

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.base as base
            >>> import paddle.base.layers as layers
            >>> import numpy as np

            >>> paddle.enable_static()
            >>> with base.device_guard("gpu:0"):
            ...     x = paddle.static.data(name='x', shape=[-1, 1], dtype='int64', lod_level=0)
            ...     y = paddle.static.data(name='y', shape=[-1, 1], dtype='int64', lod_level=0)
            ...     data_loader = base.io.DataLoader.from_generator(
            ...         feed_list=[x, y],
            ...         capacity=64,
            ...         use_double_buffer=True,
            ...         iterable=False)

            ...     emb_x = layers.embedding(input=x, param_attr=base.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
            ...     emb_y = layers.embedding(input=y, param_attr=base.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)

            >>> with base.device_guard("gpu:1"):
            ...     concat = layers.concat([emb_x, emb_y], axis=1)
            ...     fc = paddle.static.nn.fc(x=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
            ...     loss = paddle.mean(fc)
            >>> optimizer = paddle.optimizer.SGD(learning_rate=0.5)
            >>> optimizer = paddle.incubate.optimizer.PipelineOptimizer(optimizer)
            >>> optimizer.minimize(loss)

            >>> def train_reader():
            ...     for _ in range(4):
            ...         x = np.random.random(size=[1]).astype('int64')
            ...         y = np.random.random(size=[1]).astype('int64')
            ...         yield x, y
            >>> data_loader.set_sample_generator(train_reader, batch_size=1)

            >>> place = paddle.CUDAPlace(0)
            >>> exe = paddle.static.Executor(place)
            >>> exe.run(paddle.static.default_startup_program())
            >>> batch_size = 1
            >>> data_loader.start()
            >>> exe.train_from_dataset(
            ...         paddle.static.default_main_program())
            >>> data_loader.reset()
       r   c                 C   s
  d| _ t r
d| _ t rtdtjjtjj	j
jf}t||s-td| dt| d|| _| j| _t| jdrE| jj| _t| jds:|dksMJ d	|| _|d
ksXJ d|| _d | _tj}|j| _| | _| | _| | _d | _g | _ i | _!d | _"d | _#d S )Ncpugpuz,In dygraph, don't support PipelineOptimizer.zGThe 'optimizer' parameter for PipelineOptimizer must be an instance of z, but the given type is .	inner_optr   z*num_microbatches must be a positive value.r   z1start_cpu_core_id must be a non-negative integer.)$_devicer   is_compiled_with_cudar
   	Exceptionpaddle	optimizerZ	OptimizerZstaticamp	decoratorZOptimizerWithMixedPrecision
isinstance
ValueErrortype
_optimizer_origin_optimizerhasattrr   _num_microbatches_start_cpu_core_idZ_place_listZop_proto_and_checker_makerZOpRole_op_roleZkOpRoleAttrName_op_role_keyZkOpRoleVarAttrName_op_role_var_keyZkOpDeviceAttrName_op_device_key_param_device_map_pipeline_pair_pp_ring_mapoutput_var_to_opinput_var_to_op)selfr   num_microbatchesstart_cpu_core_idZvalid_optimizersZop_maker r,   c/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/optimizer/pipeline.py__init__^   sP   








zPipelineOptimizer.__init__c           	      C   sL  |j | }|j d }||}d}|jdkrIt|d }|j|dgdd}|j|d | dd|id	|id
|j	d|j	| j
| jjid |d7 }|j|d | |jdkrWdndd|jdkr`|n|id	|jdkrj|n|id| j| j
| jjddid |d7 }|jdkr|j|d | dd|id	|id
|j	d|j	| j
| jjid |d7 }|S )zj
        Insert allreduce op to sync global information for global
        gradient clip and amp.
        r   
reduce_anyZ_cast_int32r   Zint32)nameshapedtypecastXOutin_dtype	out_dtyper   inputsoutputsattrsZc_allreduce_maxZc_allreduce_sumring_iduse_calc_streamT)opsdescoutput_arg_namesvarr   r   generate
create_var
_insert_opr2   r!   r    Optimizeglobal_ring_id)	r)   op_idxblockopout_nameZout_varoffsetZtemp_var_nameZtemp_varr,   r,   r-   _insert_allreduce_op   s\   










z&PipelineOptimizer._insert_allreduce_opc                 C   s  t  }d}d}|j }||| k rbd}|j| }g }	|jdkr)| |r)d}n|jdkrN| |rN|jdD ]}
||
rE|	|
 q9|j	d|	 n|jdkru|jdD ]}
||
re|	|
 qY|j	d|	 |j
d|	 n]|jd	kr|jdD ]}
||
r|	|
 q|j	d|	 |j
d|	 t|	dkr|| |d
8 }qn&|jdkr| |r|jdD ]}
||
r|	|
 q|j	d|	 d}|j |j  }|D ]a}||v sd|v rq|| |t|rq|t|}|jtjjjkr|j|tjjj|jd}n&t|tr3|j|j|j|j|j|j|j |j!|j"|j#|j$d
}n|%|d}| &|| q|d
7 }| j'sK|sLq| (|d
 |}||7 }||7 }||| k s|)  d S )Nr   Fr/   Tconcatr4   update_loss_scalingr5   check_finite_and_unscaler   sumZ_blocking_queue)r0   r   persistable)
r0   r1   r2   r   	lod_levelstop_gradient	trainableoptimize_attrregularizer
error_clip)*setr?   op_sizer>   r   _is_optimize_opinputZ_find_var_recursiveappendZ	set_inputZ
set_outputlen
_remove_op_is_gradient_clip_opinput_arg_namesr@   addstrZ_var_recursiver   VarDescVarTypeZREADERrC   rQ   r   r   Zcreate_parameterr0   r1   r2   rR   rS   rT   rU   rV   rW   Z_clone_variable_clone_var_attruse_shardingrL   _sync_with_cpp)r)   rH   Z	ori_blockZused_var_setZadded_op_numrG   rY   Zshould_insertrI   Z
reserved_x
input_namevarsrA   Z
source_varZdest_varZinserted_opsr,   r,   r-   _create_vars   s   













OzPipelineOptimizer._create_varsc                 C   s@   | j |jv sJ t|| j }|t| jj@ o|t| jj@ S N)r!   
attr_namesintattrr    BackwardLoss)r)   rI   op_roler,   r,   r-   _is_loss_grad_op  s
   z"PipelineOptimizer._is_loss_grad_opc                 C   s(   | j |jv ot|| j t| jjkS rk   )r!   rl   rm   rn   r    Forwardr)   rI   r,   r,   r-   _is_forward_op!     z PipelineOptimizer._is_forward_opc                 C   (   | j |jv ot|| j t| jj@ S rk   )r!   rl   rm   rn   r    ro   rt   r,   r,   r-   _is_backward_op&  rv   z!PipelineOptimizer._is_backward_opc                 C   s,   | j |jv sJ t|| j t| jjkS rk   )r!   rl   rm   rn   r    rp   rt   r,   r,   r-   _is_loss_op+  s   zPipelineOptimizer._is_loss_opc                 C   rw   rk   )r!   rl   rm   rn   r    rE   rt   r,   r,   r-   rZ   /  rv   z!PipelineOptimizer._is_optimize_opc                 C   s   d|j v od|j v od|j v S )NParamZGradZLearningRate)input_namesrt   r,   r,   r-   _is_update_op4  s
   
zPipelineOptimizer._is_update_opc                 C   s   t t}|d}|jD ]J}|| j}|| j dkr<|D ]}|| }|j}| j	 }	|	
| |	| jd qq|| }|j}| j	 }	|	
| |	| jd qg }
|D ]}|| }|  |
| q[|
S )a  
        Split a program into sections according to devices that ops run on.
        The op whose op_device attr is "gpu:all" is copied to all sections.

        Args:
            main_program (Program): the main program
            devices: all used devices
        r   :all )r   r   rH   r>   rn   r#   r   r?   global_block	append_op	copy_from	_set_attrrg   r\   )r)   main_programZdevicesZdevice_program_maprH   rI   deviceprogramop_descap_opprogram_listkeyr,   r,   r-   _split_program;  s.   




z PipelineOptimizer._split_programc                 C   s8   d|v sd|v sJ d|d| d }| j| }|S )a  
        For adam optimizer, it will add accumulators and initialize them
        with fill_constant, and force the op device to cpu. Hence, we should
        get the real op_device attribute of the fill_constant as the device
        where the corresponding parameters on.
        Zbeta1_pow_accZbeta2_pow_acczPFor accumulators for Adam, the name must contain beta1_pow_acc or beta2_pow_acc.r   Z_beta)indexr$   )r)   var_name
param_namer   r,   r,   r-   "_get_op_device_for_startup_programa  s   
z4PipelineOptimizer._get_op_device_for_startup_programc                 C   s   |  }t }|jD ]J}|| j}|dkr)|jdksJ d|jd }| |}|r5t|	dd }nd }|r>||kr>q
|j
}	|  j
 }
|
|	 |
| jd q
|  | |  | |S )Nr   fill_constantzcFor ops in startup program with the op_device attribute of cpu, they must be of type fill_constant.r   :r   r~   )r   r   r>   rn   r#   r   r@   r   rm   splitr?   r   r   r   rg   rj   )r)   startup_programZ	device_idrH   new_startup_programrI   r   Z
output_varZdevice_indexr   r   r,   r,   r-   _split_startup_programp  s,   



z(PipelineOptimizer._split_startup_programc                 C   sj   d|v r
| dd}d|v r| dd}| j| }|du rdS d}t|D ]\}}||kr2|} |S q%|S )zM
        Find the post op that has variable named var_name as input.
        z
.cast_fp32r~   
.cast_fp16N)replacer(   reversed)r)   r   r   Zpost_ops	result_oppost_opZpost_idxr,   r,   r-   _find_post_op  s   
zPipelineOptimizer._find_post_opc                 C   sB   | j | }|du rdS d}t|D ]\}}||k r|} |S q|S )ze
        Find the previous op of op with index that outputs
        variable named var_name.
        N)r'   r   )r)   r   r   Zprev_opsr   prev_opZprev_idxr,   r,   r-   _find_prev_op  s   
zPipelineOptimizer._find_prev_opc                 C   s   | || ||| d S rk   )Z_rename_inputZ_rename_output)r)   rI   Zold_namenew_namer,   r,   r-   _rename_arg  s   zPipelineOptimizer._rename_argNc              
   C   sH   |j ||j|du r|jn||j|j|j|j|j d}| 	|| |S )z
        Create a new var for block, which has the same type,
        shape and dtype as ref_var, then rename it with the
        name `name`.
        N)r0   r1   r2   r   rR   rQ   is_dataneed_check_feed)
rC   r1   r2   r   rR   rQ   r   r?   r   re   )r)   rH   Zref_varr0   r2   Znew_varr,   r,   r-   _create_var  s   
zPipelineOptimizer._create_varc                 C   s"   |j |_ t|dr|j|_d S d S )Nis_distributed)rS   r   r   )r)   destsrcr,   r,   r-   re     s   
z!PipelineOptimizer._clone_var_attrc                 C   s&   | t }|dkr|d| S |S )zD
        Strip the grad suffix from the given variable name
        N)findr   grad_var_suffix)r)   r0   posr,   r,   r-   _strip_grad_suffix  s   z$PipelineOptimizer._strip_grad_suffixc                 C   s   |t   S )z?
        Append grad suffix to the given variable name
        )r   r   )r)   r0   r,   r,   r-   _append_grad_suffix  s   z%PipelineOptimizer._append_grad_suffixc                 C   s<   | | jr|| jnd}|r|dd dksJ d|S )z6
        Get the op_device attribute of a op.
        Nr      r   z<Now, only gpu devices are supported in pipeline parallemism.)has_attrr#   rn   )r)   rI   r   r,   r,   r-   _get_op_device_attr  s   
z%PipelineOptimizer._get_op_device_attrc                 C   sx  t | jj}|| j|kr|| j| j d dS |jdkrq| 	|rq|j
 D ]
}d|v s4J dq*t|j
 dks@J |j
 d }| ||}|ds\J |j d| || j}|shJ d	|| j| dS |jd
ks{|jdkr| 	|s| |r| ||j
dd }	|| j|	| j dS |jdkr| |st|jdkrt|jdksJ |jd }
|jd }d|v r| ||}|| j|| j dS | ||j
dd }	|| j|	| j dS | |rVd}|j||  | jr|j||  | js.|d7 }|j||  | jr|j||  | jr|j||  | j}|s@J dt|D ]}|j||  | j| qDdS | |r|jd
kr|d}t|dkspJ | |d }| j| }|| j| dS | |s| |r| j|jv sJ d|| j}t|dksJ d|d }| j| }|jdks|jdks|jdks|jdks|jdkr| j d}|| j| dS |jdks|jdkr|| j| j d || j| jj |jd }||}d|_dS g d}|j|v s'J d| d|j | |s/J || j| j d dS )a  
        Add op_device attrribute for ops that have not that attribute set.
        We use "gpu:all" to represent the op should be put on all
        sub-programs, such as lr-related ops. Note that: "gpu:all"
        is only used by pipeline as an indicator.
        r}   rP   z@RENAME@z3The op must be sum used to accumulate renamed vars.r   r   	op_devicez has no op_device attr for var z$The post op must have op_device set.r3   scaler4   memcpyz@Fetchz1Please put you program within device_guard scope.r5   zEgradient_clip and regularization ops must have op_role_var attribute.   zHop_role_var for gradient_clip regularization ops must have two elements.sqrtr   Zelementwise_maxZelementwise_divZalloc_float_statusZclear_float_statusT)rN   r/   rM   rP   rO   r   z9For other ops without op_device set, they must be one of z, but it is N) rm   r    LRSchedrn   r!   r   r#   r   r   rx   r?   r`   r]   r@   r   r   ru   r   r[   rZ   ry   r>   rangeoutputr   r$   r_   _is_regularization_opr"   rl   rA   rQ   )r)   rI   idxrH   Zlrsched_roler0   rJ   r   r   r   rh   Zoutput_namerK   i	grad_namer   op_role_varZfloat_status_nameZfloat_status_varZother_known_opsr,   r,   r-   _add_op_device_attr_for_op  s   
 







z,PipelineOptimizer._add_op_device_attr_for_opc                 C   sl   t t|jD ],\}}|jdks|jdks|jdkr&|| j| j d q| |r,q| ||| qdS )ze
        Add op_device attrribute for ops in block that have
        not that attribute set.
        create_py_readerreadZcreate_double_buffer_readerr}   N)		enumeratelistr>   r   r   r#   r   r   r   )r)   rH   r   rI   r,   r,   r-   _add_op_device_attrb  s   



z%PipelineOptimizer._add_op_device_attrc                 C   sr  g }t | jjt | jjt | jjt | jjt | jjt | jjt | jjB g}|jD ]}||j	sH|j	dkrD|
| jt | jjksHJ d|| js[J d|j	 d| j d|
| j}t ||v sqJ d||j	||| jsJ d|j	 d| j d|
| j}|sJ d|j	 d|| j d	krq+|d
d }|dksJ d||vr|| q+|S )z
        Check whether ops in a block have both the op_device and the
        op_role attributes set.
        Then, return all devices in order.
        Zconditional_blockz`Now, the only supported op without kernel is conditional_block, and its op role must be LRSched.zop (z	) has no z attribute.z&op_role {} for op {} must be one of {}zop_device attribute for op z has not been set.r}   r   r   r   z<Now only gpu devices are supported for pipeline parallelism.)rm   r    r   rs   ro   rp   rE   r>   _has_kernelr   rn   r!   r   formatr#   r   r   r\   )r)   rH   device_listZvalid_op_role_valuerI   rq   r   Zdev_typer,   r,   r-   _check_validationy  sX   








z#PipelineOptimizer._check_validationc                    s  i d}t tjD ]\r} nqd|dt tjD ]\j}|j dkr8q%jD ]		}|j	rFq;d}
	}|du r]	jvrXq;j	 }|si|rg|jnd}|du su|j dkrvq;||kr{q;	vrg 	< ||f	 v rq;|dd d fdd  	f
dd	t|dd
 t|dd
  q;q%  dS )zp
        Insert a pair of send and recv ops for every two
        consecutive ops on different devices.
        Nr   )r   first_optimize_indexr}   r   c                    sr     } }|s|sJ d  |r&|| k s$J d||  d S |r5|| ks7J d||  d S d S )Nzdsend/recv in pipeline should only be inserted in forward or backward,please check the op_role of op=zIn forward, send/recv can only be passed forward, but now prev_stage={} great than cur_stage={}, please check op_device of op={}zIn backward, send/recv can only be passed backward, but now prev_stage={} less than cur_stage={}, please check op_device of op={})ru   rx   r   )cur_idprev_idZ
is_forwardZis_backward)rI   r)   r,   r-   _check_stage  s.   




zKPipelineOptimizer._insert_sendrecv_ops_for_boundaries.<locals>._check_stagec                    sX  t |  }t | }||f	 v rd S | | dkr5| d | | | d  	 ||f d S | | dk rT| d | | | d  	 ||f d S t| | dks^J 	 ||f j}j	 }|| f}|d |  }|jvrj| jj|< j} jd7  _nj| }j	dkrj
d  dd|ij|j|dd	d
dd|id d  d7  < t|j}	|	d dk rԈjn|	d |	d< j
d  dd|gid|	d|jj|j|dd	d
dd|id d  d7  < d S j	dkrt|j}	|	d dk rjn|	d |	d< t|	}
jdko1|
j dk}d|jv rt|jdd dd }|}j
d  dd|gid|gid|	d|jj|j|dd	id d  d7  < d S  | | j
d  dd|gid|gij|j|id d  d7  < |jdd }|}t|trd	nd}j
d  |r|rdndd|ij|j|ddd|d
ddjdjid d  d7  < d }t|tjjkrd }jj}n}jj}j
|d  dd|gid|gij|j|d|id}t|tjjkr4|dd  d  d7  < j
d  |rA|rCdnd!d|gid|	d|jj|j|dd	d
dd|djdji	d d  d7  < |r|sj
d  d"d|gid|gij|j|dd	ddd#jd$jid d  d7  < d S d S d S td%j	 d&)'Nr   r     zF-then-Br   send_v2r4   r=   Tpeerr<   r   r   r9   r;   r   recv_v2r5   	out_shaper2   r   r   r:   r;   1F1BZsubprogZassignr   r   r9   r:   r;   Zc_sync_calc_stream@FZpartial_sendnumidr   c_sync_comm_streampipeline_flagr~   partial_recvpartial_allgathernranksrankz@Now only 'F-then-B' and '1F1B' are supported.The given value is r   ) rb   r\   absrn   r!   ri   r%   r<   r&   schedule_mode_insert_op_without_syncr#   r   r1   micro_batch_sizer2   npprod	mp_degreer0   r   rA   r   r   mp_rankrm   r    ro   rE   rs   r   r   )r   r   Zcur_devZprev_devrq   rA   pairpair_keyr<   Z	var_shapeZnumelZuse_mpZorigin_nameZassociate_varZprefix_nameZ
prefix_varZis_paraminsert_indexZnew_op_roleZsync_comm_op
r   _insert_send_recvrH   Zdevice_typeZextra_index_infor   Zinput_var_to_devicerI   r)   r   r,   r-   r     sp  


















zPPipelineOptimizer._insert_sendrecv_ops_for_boundaries.<locals>._insert_send_recvr   )r   r   r>   rZ   rn   r#   r   r`   rA   r   r   r$   r   rm   rg   )r)   rH   r   Z
cur_devicerA   Zprev_devicer   r,   r   r-   #_insert_sendrecv_ops_for_boundaries  sb   




 V    z5PipelineOptimizer._insert_sendrecv_ops_for_boundariesc                 C   s   | j dkrdS tttt|jD ]2\}}| |rD|jdks(J d|j |ds/J t	|
d}|| j  }|d|  dS qdS )zJ
        Scale the loss corresponding to number of micro-batches.
        r   Nr   z6loss_grad_op must be fill_constant op, but this op is value)r   r   tupler   r   r>   rr   r   r   floatrn   r   )r)   rH   r   rI   Z
loss_scaler,   r,   r-   _insert_loss_scale  s    


z$PipelineOptimizer._insert_loss_scalec           
      C   s   t |jD ]=\}}| |sq|j}|j}|| }|jdks#|jdkr$q|D ]}t |vr/q&|t }|d }	| 	|||	 q&qd S )Nr3   r   @MERGED)
r   r>   rZ   r`   r@   r   r   r   stripr   )
r)   rH   r   rI   r{   output_namesZin_out_namesr0   r   Znew_grad_namer,   r,   r-   _rename_gradient_var_name  s    
z+PipelineOptimizer._rename_gradient_var_nameFc                 C   s  |r|j nd}|r|jr| |||j|}|S g }d}|rdnd}	|r&tjnd}
tttt	|j
D ]4\}}| |rd|jdkrd|jd }|jd }|d| jv rd|dd	|ks^J || q3| |rq|du rq|d
 }| |rh| j|jv rh|| j}t|dkrq3t|d dksJ tdt|dD ]}d}|| }||sqd|v rq|t  }||	 }||s| ||j| ||
 ||sJ ||}||}d|_|j || di d|gid|j!d|j"dt#d| j$| j%j&j'id |d
7 }||d
  }|j| }d|v }||u}|rG|d }| ||||
}d|_|j || dd|id|id|j"d|j"| j$| j%j(id |d
7 }|}|j || dd||gid|i| j$| j%j(id |d
7 }|)| qq3|sn|S d}tttt	|j
D ]\}}| |r|du r|d
 } nq{|dusJ |D ]M}|dd	}|dd	}||s| ||j| | ||sJ ||}||}d|_|j |dd|id|id|j"d|j"| j$| j%j&id q|S )zz
        Create a new merged gradient for each parameter and accumulate the
        corresponding gradient to it.
        FN@MERGED@FP16r   r3   r   @GRADr   r~   r   r   
@BroadCastTr   r5   r1   r2   r   r   	cast_fp16@TMPr4   r6   r7   rP   z@FP16z@GRAD@MERGED@FP16)*fp16_allreduceZfuse_grad_merge_accumulate_gradients_with_fuseZfuse_grad_size_in_MBr   float16r   r   r   r   r>   rZ   r   r`   r@   r   r$   r   r^   rx   r"   rl   rn   r]   r   has_varr   r   r   ri   rA   rQ   rD   r1   r2   r   r!   r    rE   r   ro   r\   )r)   rH   Zpp_allreduce_in_optimizeZstrategyshardr   Zfused_gradient_namesZmerged_gradient_namesfirst_opt_op_idxmerged_suffixr2   r   rI   in_namerJ   r   r   rK   r   Zparam_grad_nameZmerged_param_grad_nameZparam_grad_varZmerged_param_grad_varr   Zgrad_varis_fp16_grad	need_castcast_grad_var_namecast_grad_varfp16_grad_nameZfp16_grad_varr,   r,   r-   _accumulate_gradients  s   

 










	



z'PipelineOptimizer._accumulate_gradientsc           *      C   s  |  ||}g }|rdnd}|rtjntj}d}	d }
|D ]n\}}||}|j|t  | ||jddd}||}t	|drD|j
|_
| |}t|dksZ|	| |ksZ|j|
krk||g|g|gf |j}
d}	q|d	 d | |d	 d
 | |d	 d | |	|7 }	qg }g }|D ]H}|d }|d }|jd|d j |d jddd}|d jtjkrdnd}|d|d j  }|j||d jddd}|| || qt|t|ksJ t|t|ksJ d }t|jD ]\}}| |r|d u r|} nq|d usJ d}tt|D ]s}|| }|| }|| d }|| d
 } || d }!|j|| dd| i||dddddddd|d j| j| jjdtdddid |d
7 }|j|| dd| i|!|dddddddddddd|!d j| j| jjjid |d
7 }q||7 }d}tt|D ]a}|| }|| }d|jv }"|"|u}#|#r|jd }$|j|$|ddd}%|j|| dd|id |%id!|jd"|%j| j| jjid# |d
7 }|%}|j|| d$d||gid |i| j| jjid# |d
7 }q|rR|D ]R\}}||}|t  d }&||&sJ ||&}'|t  d }(|j|(tj|jddd})|j|| dd|'id |)id!tjd"tj| j| jjid# |d
7 }qtt|D ]
}|| j||< qX||fS )%Nr   r   g        TF)r0   r2   r1   rQ   rS   r   r   r   r   r   Z
FusedGrad_)r0   r2   rQ   rS   zFusedMergedGrad.cast_fp16.ZFusedMergedGrad_Zcoalesce_tensorZInput)OutputZFusedOutputZuser_defined_size_of_dtypeZ	copy_dataZ	use_alignr2   Zset_constantZnpuZconstantr8   r   r   r3   r4   r5   r6   r7   r   rP   )_sort_grad_param_by_dtyper   r   float32rA   rC   r   r   r1   r   r   _get_var_sizer]   r2   r\   r0   r   r>   rx   r   r   r!   r    ro   Zis_compiled_with_custom_devicerE   r   rD   r   )*r)   
main_blockfp16
fused_sizegrad_param_pairsr   Zgrad_param_segmentsr   r2   Zcur_sizeZ
last_dtypeZgradparamZ	real_gradZmerged_grad_varZ
real_paramZtmp_sizeZfused_gradientsfused_merged_gradientsZgrad_param_segmentZgrad_segmentZmerged_grad_segmentZ
fused_gradZfused_merged_grad_name_prefixZfused_merged_grad_nameZfused_merged_gradZfirst_back_op_idxr   rI   rK   r   ZgradsparamsZmerged_gradsr   r   r   r  r  Z	fp16_gradZfp32_grad_nameZ	fp32_gradr,   r,   r-   &_insert_accumulate_gradients_with_fuse  sV  





	











z8PipelineOptimizer._insert_accumulate_gradients_with_fusec                 C   s  d }g }t ttt|jD ]\}}| |r?|jdkr?|jd }	|jd }
|
	d| j
v r?|	dd|
ks9J || q| |rV|d u rV|d }|t|jkrV d S | |r| j|jv r|| j}t|dkrnqt|d dksxJ tdt|dD ]}|| }||sqd|v rq|||d  || f qqt|dkrd S |r|jnd}d	d
 t|D }|D ]"}|r||d nd}d|  kr|k sJ  J || | qg }|D ]}| |||||\}}||7 }q|  |S )Nr3   r   r   r   r~   r   r   r   c                 S   s   g | ]}g qS r,   r,   ).0r  r,   r,   r-   
<listcomp>  s    zEPipelineOptimizer._accumulate_gradients_with_fuse.<locals>.<listcomp>)r   r   r   r   r>   rZ   r   r`   r@   r   r$   r   r^   rx   r]   r"   rl   rn   r   r   r\   Z
worker_numr   r  rg   )r)   r	  r
  r  r   r   r  r   rI   r   rJ   r   r   r   r   Zdevice_to_pairsr   Zroot_idZall_fused_merged_gradientspairsr  r,   r,   r-   r   |  sb   






z1PipelineOptimizer._accumulate_gradients_with_fusec           	      C   sx   g }g }g }|D ]%}| |d j}|tjkr|| q|tjkr(|| q|| q|}|| || |S )Nr   )rA   r2   r   r  r\   r   extend)	r)   r	  r  Z
fp16_pairsZ
fp32_pairsZother_pairsr  r2   Zsorted_pairsr,   r,   r-   r    s   



z+PipelineOptimizer._sort_grad_param_by_dtypec                 C   s   t jjjdt jjjdt jjjdt jjjdt jjjdt jjjdt jjj	dt jjj
dt jjjdi	}d|jvs6J tdd |jd||j  d d S )	Nr         r   r   c                 S   s   | | S rk   r,   )xyr,   r,   r-   <lambda>  s    z1PipelineOptimizer._get_var_size.<locals>.<lambda>g      @)r   rc   rd   ZFP16ZBF16ZFP32ZFP64ZINT16ZINT32ZINT64ZBOOLZUINT8r1   r   r2   )r)   rA   Zdtype_to_sizer,   r,   r-   r    s&   








zPipelineOptimizer._get_var_sizec                 C   s   |j }|D ]E}|djD ]<}|dsq|dj}||}|jdd}|jD ]}	|	j}
|j }|	|
 q)|
  | || |d| qqd S )Nr   Z	sub_block)Z
parent_idx)r   rH   r>   r   rn   r   Z_create_blockr?   r   r   rg   rj   r   )r)   r	  r   r   progrI   Zorigin_sub_block_idZorigin_sub_blockZnew_sub_blockZsub_opr   r   r,   r,   r-   _add_sub_blocks  s"   



z!PipelineOptimizer._add_sub_blocksc                 C   s0   |j D ]}||jsq|| j}|  S d S rk   )r>   r   r   rn   r#   )r)   rH   rI   r   r,   r,   r-   _get_device_info  s   
z"PipelineOptimizer._get_device_infoc                 C   s  i }|D ]0}| d}|jD ]%}|dkrq||}|jsq||vr&g ||< ||| vr3|| | qqt| D ]}t|| dkrJ|| q;i }	| D ]S}|| D ]L}| d}|j	D ]A}
|
j
dksw|
j
dksw|
j
dksw|
j
dkrxqa|
| jt| jjjkrqa||
j v r||	vsJ d| d	|
 d
||	|<  nqaqWqQ| D ]}||	vrq|	| }| d}| |}t|dd }|| }|D ]}||krq| d}| |}t|dd }||f}|d | }|| jvr| j| | j| j|< | j}|  jd7  _n| j| }|jddd||i| j|dd| j| jjd|d|id |jddd||gid||jd||j| j|dd| j| jjd|d|id |jddd||gid||gi| j|| j| jjd|id qqdS )zu
        Special Case: process persistable vars that exist in
        multiple sections, e.g., shared weight
        r   Zdouble_buffer_0r   r   r   r   rN   z two sections write the same var(z): second op r   r   r   r   r4   r=   Fr   r<   r   r5   r   r2   r   r   r   N)rH   ri   rA   rQ   r\   r   keysr]   popr>   r   rn   r!   rm   r    rE   r   r?   r@   r  r   r%   r<   r&   rD   r#   r1   r2   )r)   r   Zstartup_progr   Zvar_infor  rH   r   rA   Z
write_inforI   Z
write_progZwrite_blockZwrite_deviceZwrite_dev_indexZ	all_progsZ
read_blockZread_deviceZread_dev_indexr   r   r<   r,   r,   r-   +_process_persistable_vars_in_multi_sections  s   	




















z=PipelineOptimizer._process_persistable_vars_in_multi_sectionsc                 C      |j do|j ddS )Nop_namescopez/gradient_clipr?   r   rn   
startswithrt   r,   r,   r-   r_   v  
   z&PipelineOptimizer._is_gradient_clip_opc                 C   r   )Nr!  z/regularizationr"  rt   r,   r,   r-   r   {  r$  z'PipelineOptimizer._is_regularization_opc                 C   s   |j dod|j dv S )Nr!  zweight decay)r?   r   rn   rt   r,   r,   r-   _is_weight_decay_op  s
   z%PipelineOptimizer._is_weight_decay_opc                 C   sh   t t}t t}t|jD ]"\}}|jD ]}|| ||g q|jD ]}|| ||g q#q||fS )z2
        Get info of op input and output.
        )r   r   r   r>   r`   r\   r@   )r)   rH   r'   r(   r   rI   r   r,   r,   r-   _get_input_output_info  s   

z(PipelineOptimizer._get_input_output_infoc           
      C   s
  | j dkrdS |d}| jdkrdnd}d}t|jD ]\}}|j|kr.| |r.|} nq|du r5dS d}tt|jD ]@\}}||krH n7|jdkr~|dr~|j	d }|
|}	|j|| d	d
 |d8 }|j|dd|	gid|	gi| j| jjid q>|  dS )zC
        optimize forward send's sync_comm_stream schedule
        r   Nr   r   r   r   r   r   F)syncnopr4   r5   r   )r   rH   r   r   r>   r   rx   r   r   r`   rA   r^   r   r!   r    ro   rg   )
r)   r   rH   Z	recv_typeZbackward_recv_indexr   rI   rK   r   rA   r,   r,   r-   _optimize_forward_send_sync  s<   



z-PipelineOptimizer._optimize_forward_send_syncc                 C   s  d}d}|  }t|  j}t|D ]}d}|  j| }t|| j}	|	t| jjkr4|du r4|}|j	dkrI|j	dkrI|j	dkrI|j	dkrIq|	t| jj
kr]||krZ|d7 }q|}n|	t| jjkrq||krn|d7 }q|}ntd|	 i }
|jD ]	}|||
|< q}i }|jD ]	}||||< q|j||j	|
|| d	 ||d  |	t| jj
kr|d7 }q|	t| jjkr|d7 }q|  dS )
zc
        A pass to move the recv op to the beginning of
        the forward/backward phase
        r   Nr   r   r(  r   r   zUnknown op_role: r   )r   r]   r>   r   rm   rn   r!   r    ro   r   rs   r   r{   r[   r   r   r   Z	all_attrsr^   rg   )r)   r   Zforward_insert_indexZbackward_insert_indexrH   Znum_opsr   r   rI   rq   Z	op_inputsr0   Z
op_outputsr,   r,   r-   _mv_head_recv  s^   






zPipelineOptimizer._mv_head_recvc                 C   s   |  }t }t }|jD ]/}| |r(|jD ]}|j| }|jr&|| qq| |r<|j	D ]}||v r;|| q0qt
|dkrEdS td|  dS )z;
        Pipeline may need multiple forward before
        r   Na  The pipeline requires multiple forward calculations before backward, so when the persistable var is changed in the forward, it may cause errors in the backward calculation who using this persistable var. However, some backward op don't need this var(NoNeedBufferVars), there will be no error at this time.
So please check these persistable vars which changed in forward and used in backward:
)r   rX   r>   ru   r@   ri   rQ   ra   rx   r`   r]   warningswarn)r)   r   rH   Zpersist_outputZused_in_backwardrI   r   rA   r,   r,   r-   _check_pipeline_persist_var  s.   







z-PipelineOptimizer._check_pipeline_persist_varc                 C   s  |j }|| _|j}|d u rt }|j}|sJ dg d}|D ]}	|	|v s-J d|	 dq|d | _|d | _|d | _|d | _|d	 | _	|d
 | _
|d | _|d | _|dd| _| jdksdJ d| j  krq| jk stJ  J | j||||\}
}| jj| _| |\| _| _| | | |}dd }t|t|d}||ksJ d| | |j}| ||}|D ]
}| | | qtdd rt td| _| jt!|k sJ dn	|  jt!|;  _| "|| j  | #|| g }|D ]}t |$dd }t%& r|'t%(|d  q| )|| j}d|i|_|| j  }| js2| *| | jsH| +| |,  | -| |,  t%& rUt tdd}| .|| j  | /|| j  dd| jt!|| jt!||| j || j |d| j0| j1d|_|
||| j2| j3fS )NzPlease use pipeline with fleet.)
local_rankr   r   r<   rF   rf   r   r   z&Please use pipeline with fleet to use r   r.  r   r   rf   r<   rF   r   r   scale_gradientFr   r   c                 S   s@   t | dd }t |dd }||k rdS ||krdS dS )Nr   r   r   r   )rm   r   )Zdevice1Zdevice2Zdev1_idZdev2_idr,   r,   r-   
device_cmpL  s   z.PipelineOptimizer.minimize.<locals>.device_cmp)r   z`With pipeline parallelism, you must use gpu devices one after another in the order of their ids.ZPADDLE_MANUAL_PIPELINE_STAGEzTManually specified pipeline stage must be less than total number of pipeline stages.r   r   ZFLAGS_selected_gpus0ZPipelineTrainerZSectionr   )ZtrainerZdevice_workerZpipeline_stageZnum_pipeline_stagesr   Zinner_parallelismZsection_programZplaceplace_idZ
sync_stepsr*   r+   )4rH   Zorigin_main_blockr   r	   Z_pipeline_optr.  r   r   rf   r<   rF   r   r   getr/  r   minimizer   r$   r&  r'   r(   r   r   sortedr   r   r   rj   r   osgetenvrm   r]   r)  r  r   r   r   r\   Z	CUDAPlacer   r   r   rg   r  r*  r-  r   r   r%   r&   )r)   Zlossr   Zparameter_listZno_grad_setr	  r   Zpipeline_optrequired_keysr   Zoptimize_opsZparams_gradsr   r0  Zsorted_device_listr   pZ
place_listdevZ	dev_indexr   Z
real_blockr2  r,   r,   r-   r4    s   










 





	




zPipelineOptimizer.minimize)r   r   rk   )FNN)NNN).__name__
__module____qualname____doc__r.   rL   rj   rr   ru   rx   ry   rZ   r|   r   r   r   r   r   r   r   re   r   r   r   r   r   r   r   r   r   r  r  r   r  r  r  r  r  r_   r   r%  r&  r)  r*  r-  r4  r,   r,   r,   r-   r   "   sf    
;-8W&
z8  +
   a
>~,9r   )r6  r+  collectionsr   	functoolsr   r   numpyr   r   Zpaddle.baser   r   Zpaddle.base.frameworkr   r   r	   r
   __all__r   r,   r,   r,   r-   <module>   s   