o
    "j                     @   sF  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlZd dlmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZ ddlmZ d	d
lmZ d	dlmZ e jdejj Zerbd	dlm Z! nd	dlm"Z! d dl#m$Z$m%Z%m&Z& g Z'e(e jdd	Z)dddZ*G dd dZ+G dd deZ,G dd de,Z-G dd de-Z.dS )    N)defaultdict)	framework   )HybridParallelOptimizer)timer_helper)broadcast_dp_parametersbroadcast_mp_parametersbroadcast_sep_parametersbroadcast_sharding_parameters)logger   )MetaParallelBase)PipelineLayerZPADDLE_USE_FOUR_DIRECTIONS_P2P)!four_directions_p2p_communication)p2p_communication)HOOK_ACTIONFusedCommBufferassign_group_by_sizeZFLAGS_shard_use_reduceFc                 C   s   | st stjS |rtjS tjS N)g_shard_use_reducer   Z
ALL_REDUCEZREDUCE_SCATTERREDUCE)Zis_dpZshard_split_param r   y/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/meta_parallel/pipeline_parallel.py
get_action6   s
   r   c                   @   s<   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )FakeMicroDatasetc                 C   s(   || _ d| _|| _|| _|| _|| _d S Nr   )_data_index
_acc_steps_is_first_stage_is_last_stage_micro_batch_size)selfdataZis_first_stageZis_last_stage	acc_stepsmicro_batch_sizer   r   r   __init__A   s   
zFakeMicroDataset.__init__c                 C   s   | S r   r   r"   r   r   r   __iter__K   s   zFakeMicroDataset.__iter__c                 C   s>   | j | jkrt| js| jsJ | | j }|  j d7  _ |S Nr   )r   r   StopIterationr   r    _load_micro_batch)r"   micro_batch_datar   r   r   __next__N   s   zFakeMicroDataset.__next__c                 C   sj   | j }d }d }| jrt|dksJ d| |d |}| jr1t|dks)J d| |d |}||fS )Nr   zlength of input should be 2r   r   )r   r   len_load_micro_batch_implr    )r"   
micro_stepinputsr#   labelr   r   r   r+   V   s   z"FakeMicroDataset._load_micro_batchc                 C   s*  || j  }|| j  }t|trag }|D ]I}t|tr>t|| jks,J d| jt|f ||| d ur:||  nd  q|d urW| | ||||d d f   q|d  qt|S t|tr~t|| jksxJ d| j	t|f ||  S |d ur| | |||d d f  S d S )Nz)length of data should be %d, but it is %d)
r!   
isinstancetuplelistr.   r   appenddetach_check_data_vaildaccumulate_steps)r"   r1   r0   beginendoutputr#   r   r   r   r/   e   sD   




 

z'FakeMicroDataset._load_micro_batch_implc                 C   s4   |j d }| j| j |ksJ d|| j| jf d S )Nr   zbatch_size needs to be divisible by micro_batch_size. Currently, batch_size = %d, micro_batch_size = %d, accumulate_steps = %d.)shaper!   r   )r"   r#   Z
batch_sizer   r   r   r8      s   
z"FakeMicroDataset._check_data_vaildN)	__name__
__module____qualname__r&   r(   r-   r+   r/   r8   r   r   r   r   r   @   s    
(r   c                       s   e Zd Z fddZd2ddZd2ddZdd	 Z	
d3ddZdd Z	
d3ddZ	dd Z
dd Zdd Z	d4ddZdd Zdd Zdd Zd5d d!Zd2d"d#Zd6d$d%Zd&d' Zd(d) Zd*d+ Zd,d- Zd.d/ Zd0d1 Z  ZS )7PipelineParallelc              	      s\  t |ts	tdt ||| | j dk| _| j dk| _	| j
 dk| _| j dk| _d | _| jjd | _| jjd | _| jjd | _| jjd | _| j | _| j | _| j | _| j | _| j | _| jrw| j | _| j  | _!d | _"d | _#| j| _$| j| _%| jj&d j'| _(| jj&d j)| _*| jj&d j+| _,| jj&d j-| _.| jj&d j/| _0t12d	| j* d
| j, d| j0 d | jj&d j3| _4g | _5dt6| jd  d | _7d| _8d| _9| j4rt12d | j*r| jr| jdksJ | j,r| jr| jdksJ | j*r| j,rJ dt:t;| _<| j*p| j,| _=| j.r2t>? s-t>@  t>A | _BtCD|| j| j. tCE| j| _F| j | _d| _Gd| _Ht12d| j d| j  | j	rkt12d tI| jJ| j | jr{t12d tK| jJ| j | jrt12d tL| jJ| j | jrt12d tM| jJ| j | j*r| N| jJ| j| jd d S d S )Nz5The Layer should be a derived class of PipelineLayer.r   r%   r9   Zenable_partial_send_recvZp2p_cache_shapeZ
pp_configsZsharding_configszdp_comm_overlap z$;             sharding_comm_overlap z#;             sharding_split_param ;zG"name": "{}{}", "cat": "pipeline timeline", "ph": {}, "pid": 0, "tid": , "ts": {}, "cname": "{}"thread_state_running	rail_idleaP  If enable pp profiling, the max training steps should be restricted to a reasonable value (such as 5) to avoid generating large profile files. The profiler will generate a profile file 'profile_record_tmp_file_for_rank_*' for each rank. Users should gather all profile files for one entire pipeline to one node (rank 0 is recommended) to get the full view of the pipeline profile. [DONT CHANGE THE NAME OF THE PROFILE FILES!]. Then get the profile parser from this url: https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/distributed/fleet/meta_parallel/pp_utils/profiler_helper.py and save the script to the same directory of all profile files.Parse those files by this command: `python profiler_helper.py`. After parsing, a new file 'pipeline_profile.json' will be generated. Users can inspect this file by chrome://tracing website.zBCannot use dp pp overlap and sharding pp overlap at the same time.r   TzPipeline Info -- num_stages: z, stage_id: zstart broadcast mp parameterszstart broadcast sep parametersz#start broadcast sharding parameterszstart broadcast dp parameters)Or3   r   	TypeErrorsuperr&   _hcgZget_data_parallel_world_sizeZuse_data_parallelZget_model_parallel_world_sizeZuse_model_parallelZget_sep_parallel_world_sizeZuse_sep_parallelZ get_sharding_parallel_world_sizeZuse_sharding_parallel
total_lossZ	_strategyZpipeline_configsr%   r9   Z_enable_partial_send_recv_using_cacheZget_pipe_parallel_world_size
num_stagesZget_stage_idstage_idZget_global_rankglobal_rankZget_pipe_parallel_grouppp_groupZget_data_parallel_groupZdp_groupZget_dp_sep_parallel_groupZget_sharding_parallel_groupsharding_group_virtual_pp_world_size_virtual_pp_rank_real_pp_world_size_real_pp_rankZhybrid_configsZdelay_scale_loss_delay_scale_lossZdp_comm_overlapZ_dp_comm_overlapZsharding_comm_overlap_sharding_comm_overlapZenable_timer_enable_timerZsplit_param_sharding_split_paramr   infoZ	profiling
_profiling_recordsstr_record_format_forward_color_backward_colorr   r5   _chunk_2_comm_buffers_comm_overlaptimerZis_timer_initializedZ
set_timersZ
get_timerstimersp2pZinitialize_p2p_groupsZ	P2pHelper_p2p_helpermicro_batch_id_compute_lossr   _layersr	   r
   r   register_allreduce_overlap_hookr"   layershcgstrategy	__class__r   r   r&      s   








zPipelineParallel.__init__Fc                 C   sB   |s| j d ur| jd usJ | jdkrdS | jd usJ | jdkS )Nr   F)rP   rQ   rS   r"   ignore_virtualr   r   r   is_pipeline_first_stage2  s   


z(PipelineParallel.is_pipeline_first_stagec                 C   s\   |s| j d ur| jd usJ | j| j d krdS | jd usJ | jd us&J | j| jd kS )Nr   F)rP   rQ   rS   rR   ro   r   r   r   is_pipeline_last_stage;  s   
z'PipelineParallel.is_pipeline_last_stagec                 C   s
   || _ d S r   )rQ   )r"   rankr   r   r   set_virtual_pipeline_rankE  s   
z*PipelineParallel.set_virtual_pipeline_rank   c              
   C   sT  |  dkr| }n|g}t|| j}|tjkr,t| ds J t| jds(J | jj}t	|D ]v\}	}i }
dd |
 D }t|dk rH d S |tjkrp|D ]}|j|v sXJ ||j }||
v ri|
| | qO|g|
|< qOn||
d< |
D ]/}|
| }|tjkr|j| }t||}| D ]\}}t||||||}| j|	 | qqvq0| jS )Nr   	optimizer_param2rankc                 S      g | ]}|j s|qS r   stop_gradient).0pr   r   r   
<listcomp>\  s
    z3PipelineParallel.fused_gradient.<locals>.<listcomp>)get_num_virtual_stagesget_model_chunksr   rW   r   r   hasattrrv   rw   	enumerate
parametersr.   namer6   Zranksr   itemsr   r_   )r"   model
comm_groupr$   dp
group_sizemodelsZactrw   	chunk_idxZfused_parameter_groupZparameter_listr|   Zdst_rankdstZ
var_groupsZ	group_idxr   bufferr   r   r   fused_gradientH  sJ   






zPipelineParallel.fused_gradientc                       t j  fdd}|S )Nc                     s      d S r   Zadd_grad_r   paramr   r   fused_allreduce~  s   z6PipelineParallel.bw_hook_func.<locals>.fused_allreducepaddleautogradZno_gradr"   r   r   r   r   r   r   bw_hook_func}  s   zPipelineParallel.bw_hook_funcc           
   	   C   sR   |  ||||| | j D ]\}}|D ]}|jD ]}	|	| ||	 qqqd S r   )r   r_   r   _paramsZ_register_backward_hookr   )
r"   r   r   r$   r   r   r   buffersr   r   r   r   r   rh     s   

z0PipelineParallel.register_allreduce_overlap_hookc                 C   s&   | j sd S | jj }| j| d S r   )rV   rb   keyslog)r"   Zall_flag_namesr   r   r   timer_printer  s   zPipelineParallel.timer_printerc                 C   sH   | j r"tj  | jd| j|||tt		 d | d  d S d S )N{  })
rY   r   devicesynchronizerZ   r6   r\   formatinttime)r"   r   stepphasecolorr   r   r   _record_stamp  s    
zPipelineParallel._record_stampc                 C   s`   | j r.td| j d}| jD ]	}||d  qW d    n1 s$w   Y  g | _d S d S Nz#./profile_record_tmp_file_for_rank_za+
)rY   openrM   rZ   writer"   frecordr   r   r   _flush_records  s   


zPipelineParallel._flush_recordsNc                 C   sB  |r| j r	J d|d urtd td d}|| _d | _d| _| j| j	 d }t
|| j}| j| }g }g }| |}	t|D ]Q}
|rW|d|
 d7 }td	|
  qB| j|  }| d
|
d| j | ||	}| d
|
d| j | j||   || || |  s| | qB|dkr|s| j|  }t|D ]}|r|d||  d7 }|d| d7 }td	||   td|  q||d k}| d
|| d| j | ||	}| d
|| d| j | j||  }|| || |  s| | |d|d}}| d|d| j | |||}| d|d| j |rAd }| j||   q| j||  }qt|D ]T}|rj|d||  d7 }td||   qO|d}|d}| j|  }| d|| d| j | |||}| d|| d| j | j||   qO|r|S |   | j rt!| j"dksJ d| j"# D ]\}}|D ]}|$  qȐq| j%r| &d'  | j()  | j%r| &d*  | &d'  t+j,j-dd | . }W d    n	1 sw   Y  | j%r| &d*  | /  |S )N3While _profiling, static scheduler is not availableIStatic scheduler run won't real run the model, but data has been providedGenable static_scheduler will return the pp schedule instead of the loss r   r   r   rB   zforward step for micro step F"B""E"bzbackward step for micro step Bzcomm buffers should be created!allreduce_shared_weight_gradientsbroadcast_final_lossFenable)0rY   warningswarnr   rX   scalerrI   re   rK   rL   minr9   
_wrap_datarangerd   recv_forwardrq   r   r]   _forward_stepsend_forwardrr   r6   _release_outputZsend_forward_recv_backwardpopr^   _backward_stepZsend_backwardZsend_backward_recv_forwardrecv_backwardr   r`   r.   r_   r   scale_and_split_gradsrV   rb   startrg   r   stopr   amp	auto_cast_broadcast_final_lossr   )r"   r#   r   static_schedulerschedulestartup_stepssteady_stepsinput_buffersoutput_buffersmicro_datasetstep_idinput_tensoroutput_tensori	last_iteroutput_tensor_gradinput_tensor_gradr   r   r   
train_lossr   r   r   forward_backward_pipeline  s  












z*PipelineParallel.forward_backward_pipelinec                 C   sN   t |ts	J d|| _| jr#t| jdkr%| | j| j| j	d dS dS dS )z0for delayed hook register until we get optimizer5optimizer should be HybridParallelOptimizer subclass.r   FN)
r3   r   rv   rU   r.   r_   rh   rg   rO   r9   )r"   rv   r   r   r   #register_sharding_comm_overlap_hookQ  s   z4PipelineParallel.register_sharding_comm_overlap_hookc                 C   s   |  d t|tsJ dt jsJ d| jdds#| jddr,|d us+J dnd }|| _|| _	| j
  | | |S )Nr   r   z*Please enable the generation of gradients.Trp   z7For the first and the last stage, the data must be set.)rt   r3   r   r   Z_dygraph_tracerZ	_has_gradrq   rr   rv   lr_schedulerrg   trainr   )r"   r#   rv   r   r   r   r   _prepare_training\  s0   




z"PipelineParallel._prepare_trainingc                 C   s@   t |tst |ts|S t|| jdd| jdd| j| j}|S )zn
        for backward compatibilty, wrap data to Fake FakeMicroDataset if it is of type list or tuple
        Tr   )r3   r4   r5   r   rq   rr   r9   r%   )r"   r#   r   r   r   r   r   x  s   

zPipelineParallel._wrap_datac                 C   V   |  |||}| ||}tjjdd |   W d    |S 1 s$w   Y  |S NFr   r   r   r   r   r   _optimizer_stepr"   r#   rv   r   r   r   r   r   r   train_batch     

zPipelineParallel.train_batchc                 C   sH  |  d | j  || _d| _d | _| j| j d }t|| j	}| j	| }g }g }| 
|}t|D ]#}| j|  }	| |	|}
| j|
|   ||	 ||
 q3|dkrc| j|  }	t|D ]+}||d k}| |	|}
| j|
|   ||	 ||
 |s| j|  }	qg| jr|  | _| jS || _| jS )Nr   r   )rt   rg   evalrf   re   rI   rK   rL   r   r9   r   r   rd   r   rq   r   r   rr   r6   r   r   )r"   r#   compute_lossr   r   r   r   r   r   r   r   r   r   r   r   r   
eval_batch  sV   







zPipelineParallel.eval_batchc                 C   sb  | j r
| d  |  rt|d }| | |d u s$t|ts$J | jj	||d}| 
 r| jr| jjd us=J dt|d }| | | j||}t|tjtjjjfs^J dtjjdd) | jdkrs| jss|| j }| jd u r~t|| _|  j| 7  _W d    n1 sw   Y  |  s| 
 r|  jd7  _| j r| d  |S )	NZforward_stepr   )chunk_idz*loss function should exist to compute lossr   z4Currently, loss_fn should obtain Paddle.Tensor dtypeFr   )rV   rb   r   rq   next_check_micro_batch_data_validr3   r   rg   forwardrr   rf   Z_loss_fnr   Tensorr   coreeagerr   r   r9   rT   rI   Z
zeros_liker7   re   r   )r"   r   r   r   r   labelsr   r   r   r     s@   



zPipelineParallel._forward_stepc                 C   s"  | j r
| d  tjjddv |  r1|d u sJ | jr*tj	| j
| n2tj	| n+t|trRdd |D }t|t|ksGJ tjj	|t|d n
tjj	|g|gd d }|d urtt|trqtdd |D }n|j}| j r~| d  |W  d    S 1 sw   Y  d S )NZbackward_stepFr   c                 S   rx   r   ry   r{   tr   r   r   r}     s    z3PipelineParallel._backward_step.<locals>.<listcomp>)ZtensorsZgrad_tensorsc                 S   s   g | ]}|j s|jqS r   )rz   gradr   r   r   r   r}     s    )rV   rb   r   r   r   r   rr   r   r   Zbackwardscaler3   r4   r.   r5   r   r   )r"   r   r   r   Zoutputsr   r   r   r   r     s<   

$zPipelineParallel._backward_stepc                 C   sF   t |ttfr|D ]}| | q	d S |d urt |tjs!J d S d S r   )r3   r4   r5   r   r   r   )r"   r,   r#   r   r   r   r     s   z.PipelineParallel._check_micro_batch_data_validc                 C   s  | j ddrK| jd usJ d| js| j n| j| j }|jtjkr*tg ddntg dd}tj	j
|| jd| jd tj	j
|| jd| jd |S tg dd}tj	j
|| j| jd d| jd | rptjdgdd	ntjdgd
d	}tj	j
|| j| jd d| jd |S )NTr   z4train_batch() in last stage should obtain vaild lossr   Zint64r   )srcZsync_opgroupfloat32)r=   dtypeZfloat16)rr   rI   rT   r7   r9   r  r   r  fulldistributed	broadcastrM   rN   rH   Zget_rank_from_stagerK   itemZzeros)r"   ZlossZis_fp32r   r   r   r   &  sJ   
z&PipelineParallel._broadcast_final_lossc                 C   s   | j r6| j D ]-}t|dr&|jd ur&|jd u sJ |jd| j |_q|jd ur5|jd| j |_q| jrF| j	| j
 | j  n| j
	  | j
  | jrZ| j	  d S d S )N	main_gradg      ?)rT   rg   r   r   r  r   r   r9   r   r   rv   updateZ
clear_gradr   )r"   r|   r   r   r   r   R  s    


z PipelineParallel._optimizer_stepc                 C   sL   dd }t |ttfr|D ]
}||r|  qd S ||r$|  d S d S )Nc                 S   s&   | d uot | tjo|  o| jdkS r   )r3   r   r   Z_is_initializedZinplace_version)r   r   r   r   can_freef  s   
z2PipelineParallel._release_output.<locals>.can_free)r3   r4   r5   Z_clear_dataptr)r"   r<   r
  r   r   r   r   r   e  s   z PipelineParallel._release_outputc                 C   s   | j d ddS )NT)r#   r   r   r'   r   r   r   get_static_schedulerv  s   z%PipelineParallel.get_static_schedulerF)ru   )NFNNr   )r>   r?   r@   r&   rq   rr   rt   r   r   rh   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  __classcell__r   r   rm   r   rA      s:     

	

5

 #


@($,rA   c                       s   e Zd Z fddZdd Zdd Zd$dd	Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd Z fddZ			d%ddZd&ddZd'd d!Zd"d# Z  ZS )(PipelineParallelWithInterleavec                    s   t  j|||d dt| jd  d | _ddg| _ddg| _i | _i | _|	 dks-J | 
  |	 | _| | _| jd usBJ t| j| jksLJ | j| _d	| _|   d S )
Nrj   rk   rl   zT"name": "{}{}_VP{}", "cat": "virtual pipeline timeline", "ph": {}, "pid": 0, "tid": r   rC   rD   Zthread_state_unknownZ	rail_loadrE   r   )rG   r&   r[   rL   r\   _forward_colors_backward_colors_forward_micro_step_counter_backward_micro_step_counterr   _check_sanitynum_model_chunksr   Zmodel_chunksr.   rP   rQ   _reset_counterri   rm   r   r   r&   }  s0   

z'PipelineParallelWithInterleave.__init__c                 C   s>   t  sJ d| jdksJ d| j| j dksJ dd S )NFvirtual pipeline stage with interleave only support eager dygraph moder   -virtual pipeline must run under pp degree > 2r   zVaccumulate_steps should be evenly divisible by num_stages for pipeline with interleave)r   in_dynamic_moderK   r9   r'   r   r   r   r    s   z,PipelineParallelWithInterleave._check_sanityc                 C   s(   t | jD ]}d| j|< d| j|< qd S r   )r   r  r  r  )r"   r   r   r   r   r    s   
z-PipelineParallelWithInterleave._reset_counterTc           	      C   s   | j r_tj  | j||d}|d }|r-| j| }| j| }|dkr,| j|  d7  < n| j| }| j| }|dkrD| j|  d7  < | j	
d| j||||tt d | d  d S d S )Nr   r   r   r   r   r   r   )rY   r   r   r   _get_virtual_pp_rankr  r  r  r  rZ   r6   r\   r   r   r   )	r"   r   r   r   r   virtual_pp_rankZ	color_idxr   r0   r   r   r   r     s:   




	z,PipelineParallelWithInterleave._record_stampc                 C   sh   | j r2td| j d}| jD ]	}||d  qW d    n1 s$w   Y  g | _|   d S d S r   )rY   r   rM   rZ   r   r  r   r   r   r   r     s   

z-PipelineParallelWithInterleave._flush_recordsc                 C   0   || j | j  }|| j  }|s| j| d }|S r)   )rK   r  r"   r0   r   Zvirtual_pp_stager   r   r   r    s   

z3PipelineParallelWithInterleave._get_virtual_pp_rankc                 C   s   | j |dd}| | t| dsJ t| dsJ | js$t| ds$J t| j| t| j| d ks6J | j| d }| |||}| j| | | jr]| j| 	  | j| 	  |S )NTr  input_tensorsoutput_tensorsoutput_tensor_gradsr   r~   )
r  rt   r   _forward_onlyr.   r!  r"  r   r6   r   )r"   r   r0   r  r   r   r   r   r   _forward_step_helper  s$   
z3PipelineParallelWithInterleave._forward_step_helperc                 C   s   | j rI|  jd7  _| j| j }|dkr/|| j dkr/| j|| j  }| j| D ]}|  q(| jdkrK| j| j| j krM| jd D ]}|  qBd S d S d S d S Nr   r   )r`   _backward_step_countrL   rK   rP   r_   
comm_gradsr  r"   Z	sync_stepr   r   r   r   r   _overlap_comm_grads  s&   




z2PipelineParallelWithInterleave._overlap_comm_gradsc                 C   sd   | j r.| j| j| j ksJ d| j d| j| j  | j D ]\}}|D ]}|  q&q d S d S )NzYbackward step count should be equal to accumulate steps * virtual pp world size, but get z, excepted result is )r`   r'  rK   r  r_   r   r   )r"   r   r   r   r   r   r   _sync_overlap_grads  s    


	z2PipelineParallelWithInterleave._sync_overlap_gradsc                 C   s   | j |dd}| | t| dsJ t| dsJ t| ds!J t| j| dks1J d| t| j| dks<J t| j| dksGJ | j| d}| j| d}| j| d}| |||}| 	  |S )	NFr  r!  r"  r#  r   z1output_tensor_grads is empty for virtual_pp_rank r   )
r  rt   r   r.   r#  r!  r"  r   r   r*  )r"   r0   r  r   r   r   r   r   r   r   _backward_step_helper  s$   
z4PipelineParallelWithInterleave._backward_step_helperc                    r   )Nc                     s    j dd d S )NF)Zuse_commr   r   r   r   r   r   8  s   zDPipelineParallelWithInterleave.bw_hook_func.<locals>.fused_allreducer   r   r   r   r   r   4  s   z+PipelineParallelWithInterleave.bw_hook_funcc                    s   t  j||||tjd d S )N)r   )rG   rh   sysmaxsize)r"   r   r   r$   r   rm   r   r   rh   >  s   
z>PipelineParallelWithInterleave.register_allreduce_overlap_hookFc                 C   sd  |s|rJ d|r'|rJ d| j rJ d|d ur td td d}| js.J d|| _d | _d| _|| _	| j
| j dksLJ d	| j
| j| j
| j }|d
  | j | j | _dd t| jD | _dd t| jD | _dd t| jD | _| |}| j
| j }	|r|	}
n| j| j d
 d }
|
| jd
 | j 7 }
t|
|	}
|	|
 }| d |s| jd | jj|  dd t|
D ]}|r| j|dd}| j| }| j|  d
7  < |d| d| d7 }td| d|  q| jd|ddd | ||}| jd|ddd | j|d
 dd}d}| jddr(|dkr(d}||	d
 kr1d}|  r8d }||
d
 krj|sj|rjd }d}| jddrRd}| jj ||||d\}}| j| jd
  | n| jj!||d}| j| | | "| qt|D ]}|r||
 }| j|dd}|}| j|dd}| j| }| j|  d
7  < | j#| }| j#|  d
7  < |d| d| d7 }|d| d| d7 }td| d|  td | d|  q||
 }| jd|ddd | ||}| jd|ddd |}| jd!|ddd | $|}| jd!|ddd | j|dd}| | |  r0d }| j|dd}| | |  rCd }d}| j|d
 dd}| jddr\|dkr\d}||d
 kred}d}| j|d
 dd}| jddr|| jd
 krd}| jj ||||d\}}| j| | | j| | | "| q|s| "| |so|st%&|  }| j| jd
  | t||	D ]}}|r| j|dd}| j#| }| j#|  d
7  < |d| d| d7 }td | d|  q| jd!|ddd | $|}| jd!|ddd | j|d
 dd}d}| jddr.|| jd
 kr.d}||	d
 kr7d}| j| | jj'||d" q| (  |rT| )  |S | j*r_| +d#,  | j-.  | j*ro| +d#/  | 0  |r| j*r| +d$,  t1j2j3dd% | 4 }W d    n	1 sw   Y  | j*r| +d$/  n| j}| 5  |S )&NFcompute_loss can only be set to False when forward_only is set to Truez/static_scheduler only for training not for evalr   r   r   r   4cache should be enabled for pipeline with interleaver   z^accumulate_steps({}) should be evenly divisible by num_stages({}) for pipeline with interleaver   c                 S      g | ]}g qS r   r   r{   r   r   r   r   r}   {      zLPipelineParallelWithInterleave.forward_backward_pipeline.<locals>.<listcomp>c                 S   r1  r   r   r2  r   r   r   r}   |  r3  c                 S   r1  r   r   r2  r   r   r   r}   }  r3  r   FZ	sync_recvTr  r   Z_vprB   zforward step for z with virtual pp rank r   r   r   r   )	recv_prev	recv_nextr5  r   zbackward step for r   r6  r   r   r   )6rY   r   r   r   rX   rJ   r   rI   re   r$  r9   rK   r   r  r'  r   r!  r"  r#  r   rL   r   rt   r6   rd   r   rq   r  r  r   r%  rr   Z+send_forward_backward_recv_forward_backwardsend_forward_recv_forwardr   r  r,  rc   r   send_backward_recv_backwardr+  r  rV   rb   r   rg   r   r   r   r   r   r   r   r   )r"   r#   r   forward_onlyr   r   r   Zper_stage_accumulate_stepsr   	num_stepsr   r   r0   r  Zreal_micro_stepr   next_virtual_pp_rankr5  r   r6  r   r   Zforward_micro_step_idZforward_virtual_pp_rankZbackward_micro_step_idZbackward_virtual_pp_rankZreal_forward_micro_stepZreal_backward_micro_stepZnext_forward_virtual_pp_ranknext_backward_virtual_pp_rankr   r   r   r   r   C  s  





	











z8PipelineParallelWithInterleave.forward_backward_pipelineNc                 C   r   r   r   r   r   r   r   r     r   z*PipelineParallelWithInterleave.train_batchc                 C   s*   |  d | j  || _| j|d ddS )Nr   T)r;  )rt   rg   r   rf   r   )r"   r#   r   r   r   r   r     s   

z)PipelineParallelWithInterleave.eval_batchc                 C   s   | j d d ddS )NT)r#   r   r   r  r'   r   r   r   r    s   z3PipelineParallelWithInterleave.get_static_scheduler)T)FTFr  r  )r>   r?   r@   r&   r  r  r   r   r  r%  r*  r+  r,  r   rh   r   r   r   r  r  r   r   rm   r   r  z  s,    
	
	
  
f
	r  c                       sH   e Zd Z fddZdd Zdd Zdd Zd	d
 Z	dddZ  Z	S )$PipelineParallelWithInterleaveFthenBc                    s   t  j|||d d S )Nr  )rG   r&   ri   rm   r   r   r&     s   z-PipelineParallelWithInterleaveFthenB.__init__c                 C   s&   t  sJ d| jdksJ dd S )Nr  r   r  )r   r  rK   r'   r   r   r   r    s   z2PipelineParallelWithInterleaveFthenB._check_sanityc                 C   r  r)   )r9   r  r   r   r   r   r    s   

z9PipelineParallelWithInterleaveFthenB._get_virtual_pp_rankc                 C   s   | j sd S |  jd7  _| j| j }|dkr1|| j dkr1| j|| j  }| j| D ]}|  q*| jdkr8d S | j| j| j krM| jd D ]}|  qFd S d S r&  )r`   r'  rL   r9   rP   r_   r(  r)  r   r   r   r*    s&   



z8PipelineParallelWithInterleaveFthenB._overlap_comm_gradsc                 C   s\   | j sd S | j| j }| j|ksJ d| j d| | j D ]}|D ]}|  q$q d S )NzYbackward step count should be equal to accumulate steps * virtual pp world size, but got z, expected result is )r`   r9   rP   r'  r_   valuesr   )r"   Zexpected_countr   r   r   r   r   r+    s   
z8PipelineParallelWithInterleaveFthenB._sync_overlap_gradsFTc                 C   s  |s|rJ d| j sJ d|| _d | _d| _|| _| j| jks+J d| j| j| jd| j k s=J d| j| jd| _| j| j }t	
 }dd t| jD | _d	d t| jD | _d
d t| jD | _| |}| j| j }| d | jd | jj|  dd t|D ]a}	| ||	}
| j|	d dd}d}| jddr|dkrd}|	|d krd}| jddr|  s||
 |	|k s|  r|	| j |krd }
n| }
| jj|
|d}| j| | | |
 q| sJ d|s| j| jd  | jj|  dd t|D ]e}	| |	}| j|	d dd}d}| jddr6|| jd kr6d}|	|d kr?d}| jddri|  sP|| |	|k sb|  re|	| j |kred }n| }| j| | jj ||d q| sJ d| !  | j"r| #d$  | j%&  | j"r| #d'  |r| j"r| #d$  t(j)j*dd | + }W d    n	1 sw   Y  | j"r| #d'  n| j}| ,  |S )Nr/  r0  r   zVaccumulate_steps({}) should be larger than num_stages({}) for pipeline with interleaver   z[accumulate_steps({}) should be smaller than 2 * num_stages({}) for pipeline with interleavec                 S   r1  r   r   r2  r   r   r   r}      r3  zRPipelineParallelWithInterleaveFthenB.forward_backward_pipeline.<locals>.<listcomp>c                 S   r1  r   r   r2  r   r   r   r}   !  r3  c                 S   r1  r   r   r2  r   r   r   r}   "  r3  Fr4  r   Tr  r   r7  z send_recv buffer should be emptyr8  r   r   r   )-rJ   r   rI   re   r$  r9   rK   r   r'  queueQueuer   r  r!  r"  r#  r   rt   r6   rd   r   rq   r%  r  rr   putgetr9  r   emptyr   r,  r:  r+  rV   rb   r   rg   r   r   r   r   r   r   r   )r"   r#   r   r;  r   Z
skip_stepsZsend_recv_buffer_queuer   r<  r0   r   r=  r5  r   r   r>  r6  r   r   r   r   r     s   










z>PipelineParallelWithInterleaveFthenB.forward_backward_pipeline)FT)
r>   r?   r@   r&   r  r  r*  r+  r   r  r   r   rm   r   r?    s    	
r?  r  )/osrA  r-  r   r   collectionsr   r   r   Z!meta_optimizers.dygraph_optimizerr   utilsr   ra   Zutils.hybrid_parallel_utilr   r   r	   r
   Zutils.log_utilr   Zmeta_parallel_baser   Zparallel_layers.pp_layersr   environrD  baser   Zis_compiled_with_xpuZ_use_four_directionsZpp_utilsr   rc   r   Z3paddle.distributed.fleet.utils.tensor_fusion_helperr   r   r   __all__r   r   r   r   rA   r  r?  r   r   r   r   <module>   sJ   

V     i    K