o
    "jm                     @   s   d dl Z d dlmZ d dlmZ d dlmZ d dlmZm	Z	m
Z
mZ d dlmZ ddlmZmZ dd	lmZmZ ejjjejjjejjjejjjejjjgZd
d ZedG dd deZdS )    N)core)Program)remove_process_group)is_backward_opis_forward_opis_lr_sched_opis_optimize_op)TaskNode   )PassBaseregister_pass)_create_program_insert_sync_for_fthenb_1f1bc                 C   s   |  dod| dv S )Nop_namescope/auto_parallel/reshard)has_attrattr)op r   q/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/passes/auto_parallel_pipeline.pyis_reshard_op*   s   r   Zauto_parallel_pipelinec                       sT   e Zd Z fddZdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
  ZS )PipelinePassc                    s   t    | dd  d S )Ndist_context)super__init__Zset_attrself	__class__r   r   r   2   s   
zPipelinePass.__init__c                 C   s   |  dd u r	dS dS )Nr   FT)get_attrr   r   r   r   _check_self6   s   zPipelinePass._check_selfc                 C   s   dS )NTr   )r   Z
other_passr   r   r   _check_conflict;   s   zPipelinePass._check_conflictc                 C   s   |  d| _|  d| _|  d| _|  d| _|| _ttdd| _	tdd
d	}t|| _t| jj| _| | j	| _| jd
krPt| j |   d S | jdkrYtd| jdkrh|   |   d S td| j d)Nr   Zaccumulate_stepsZschedule_modeZgeneration_batch_sizeZPADDLE_TRAINER_IDr   ZPADDLE_TRAINER_ENDPOINTS ,Z1F1BzF-Then-Bz!F-Then-B has not been implementedstreamzJNow only 'F-then-B', '1F1B' and 'stream' are supported.The given value is .)r   _dist_context
_acc_steps_mode_gen_bsz_programintosgetenv	_cur_ranksplitlen_nrankprocess_meshes
_pp_stages_get_pp_stage_cur_pp_stager   
_task_1f1bNotImplementedError_insert_sync_ops_for_stream_task_stream
ValueError)r   Zmain_programZstartup_programcontextZtrainer_endpointsr   r   r   _apply_single_impl>   s.   




zPipelinePass._apply_single_implc           
   	   C   s   | j jD ]s}d}g }tt|jD ]F\}}|jdv r |dd |jdkrW|dd |d}|jd }|	|}|j
|| dd	|gid
|gid|id |d7 }|| q|D ]}|jdd}	|	jd	|g |	jd
|g qZ|  qd S )Nr   send_v2recv_v2Zdynamic_shapeFr>   Zuse_calc_streamop_rolec_sync_calc_streamXZOut)indextypeZinputsZoutputsattrsr
   nop)rD   )r*   blocks	enumeratelistopsrD   Z	_set_attrr   input_arg_namesvarZ_insert_op_without_syncappendZ	append_opdescZ	set_inputZ
set_output_sync_with_cpp)
r   blockoffsetZ	send_varsrC   r   r@   var_namerL   Znop_opr   r   r   r8   [   s6   






z(PipelinePass._insert_sync_ops_for_streamc                 C   s2   d }t | jjD ]\}}||jv r|} |S q|S )N)rH   r&   r2   Zprocess_ids)r   rankZpp_idxidxZprocess_meshr   r   r   r4   }   s   
zPipelinePass._get_pp_stagec           '   
   C   s  d}t  }t  }t  }t  }t| jjD ]\}}|dkr1|d}|d}	|d}
|d}n4|j|jd}|j|jd}	|j|jd}
|j|jd}||j |	|j |
|j ||j |j	D ]9}t
|rtt||| t|rt||	| qht|rt||
| qht|rt||| qhtdt|d d q|  |  |  |  |  |  |  |  t| j| j|t| j| d ddd	}|| j t| j| j|t| j| d
 ddd	}t| j| j|t| j| d ddd	}t| j| j|t| j| d ddd	}|| j || jd
  ||||d}| jj}|| j}|| j}t| D ]\}\}}t| j| | }g }g }t| j| j  }|d
 }|d
 }|dkrx|dkro|nd}|!||f |dkr|dkr|nd}|!||f |D ]/}t|| | } |dkr|dkr|!| df q|dkr|dkr|!| df q|D ]/}!t|!| | }"|dkr|!dkr|!|"df q|dkr|!dkr|!|"df q|D ]}#t"d|d|#d d|#d
  |#|#d |#d
  q|D ]}$t"d|d|$d d|$d
  |$|$d |$d
  qq@i }%t%| j&D ]}t%|D ]}&||%t|| |& < q:q4i | j_'t(|) |%| jd| jj'd< d S )N   r   )
parent_idxzThe op role: r@   z6 isn't one of LRSched, Forward, Backward or Optimizer.Z	AmplifierT)rS   max_run_timesprogramtask_id	node_typelazy_initializer
   Compute      )lrfwdbwdoptr_   ra   rb   r`   zTask:z's upstream includes:, buffer size is:z's downstream includes:)taskstask_id_to_ranknum_micro_batches	fleet_opt)*r   rH   r*   rG   rP   Z_create_blockrV   Z_set_forward_block_idxZforward_block_idxrJ   r   r   r   r   r   r:   strr   rO   Z	_rollbackr	   r.   r'   r+   Zset_run_pre_stepsZset_run_at_offsetr&   up_down_streamsupsdownsitemsr3   r5   rM   printadd_upstream_taskadd_downstream_taskranger1   _pipeline_optrI   values)'r   num_of_functionalityZlr_progZfwd_progZbwd_progZopt_progrT   	src_blockZlr_blockZ	fwd_blockZ	bwd_blockZ	opt_blockr   Zlr_task_nodeZfwd_task_nodeZbwd_task_nodeZopt_task_nodeZ
task_nodesrj   pp_upstream_rankspp_downstream_ranksiZ	task_roleZ	task_nodeZcur_idrk   rl   pp_buff_sizeZprev_idZnext_idbuf_sizeZupstreamZupstream_idZ
downstreamZdownstream_idupZdownrf   jr   r   r   r6      s>  














zPipelinePass._task_1f1bc           3      C   s  d}t  }t  }t  }t  }t  }d }t }i }	t| jjD ]y\}
}|
dkrg|d}|d}d}|jD ]/}|jdkrRt|	ddksHJ |	dd }d}q6|s]t
|||dd q6t
|||dd q6q|
dkr|d}|d}d}d}t|jD ]\}}|jd	kr|sd}|r|s|jd
krd}|r|s| j| jd kr|jdkr|j|d  jd	krq|jdkrq|jdvr|drd|dv rt|j dkrd|j d vr||j d  |jd	krt|d q|jd	krt|d qt
|||dd q|r|r|drsd|dv rs|jdv r1t|d |j d }|d}|dkrI|d | }n|}||	|< ||sr||}|j|j||j|j|j|j|j|j|j|jd
 q|j D ]}||	v r|j ||	|  qxt
|||dd qqqt!d|"  |"  |"  |"  |d usJ i }i }i }i }t#|D ]#}|$ j%| }t&|j}||dtd d  ||< |j||< qt#t|	' D ]#}|$ j%| }t&|j}||dtd d  ||< |j||< qg } g }!t|dkr$t|dks J |} |}!t|dkr8t|dks4J |} |}!t(| j)| j*dt+| j)| d |dd}"t(| j)| j*dt+| j)| d ||dd}#t(| j)| j*dt+| j)| d |dd}$t(| j)| j*dt+| j)| d |d| |!d}%t(| j)| j*dt+| j)| d |dd}&d }'t+| j| j }(|",|#- | j. t/d!|"- d"|#- d#| j. |#0|"- | j. t/d!|#- d$|"- d#| j. |#,|$- |' t/d!|#- d"|$- d#|' |$0|#- |' t/d!|$- d$|#- d#|' |$,|%- |( t/d!|$- d"|%- d#|( |%0|$- |( t/d!|%- d$|$- d#|( |%,|#- |'t1j2j3 t/d!|%- d"|#- d#|' |#0|%- |'t1j2j3 t/d!|#- d$|%- d#|' |#,|&- |'t1j2j4 t/d!|#- d"|&- d#|' |&0|#- |'t1j2j4 t/d!|&- d$|#- d#|' | j5j6})|)7| j)}*|)8| j)}+|*D ]B},| 9|,}-|-| jd k rt+|,| d }.|$0|. t/d!|$- d$|.d#d qt+|,| d }.|%0|. t/d!|%- d$|.d#d q|+D ]>}/| j| jd k rt+|/| d }0|$,|0 t/d!|$- d"|0d#d qt+|/| d }0|%,|0 t/d!|%- d"|0d#d qi }1t:| j;D ]}t:|D ]}2||1t+|| |2 < q)q#d%|"|#|$|%|&g|1| j*dd&i| j_<d S )'N   r   Fwhile	Conditionr
   T)Zforce_creater>   r?   rA   rF   )Zrecv_2ZassignZc_allgatherr   r   z@RESHARDZring_idr=   @)
rD   nameshapedtype	lod_levelpersistable
error_clipstop_gradientis_databelong_to_optimizerz"Only support generation condition.zpaddle.ZStart)rS   rW   rZ   rY   rX   r[   ZCond)rS   rW   rZ   rY   rX   cond_var_namer[   r\   r]   r^   )rS   rW   rZ   rY   rX   r[   vars_to_dtypevars_to_shaperU   rc   zTask z's downstream is:rd   z's upstream is:rh   )re   rf   rg   Zinference_generation)=r   setrH   r*   rG   rP   rJ   rD   r0   inputr   r5   r3   r   r   rN   rK   addr   Zoutput_arg_namesfindZ_find_var_recursiveZ_var_recursiveZ
create_varr   r   r   r   r   r   r   r   Z_rename_input	ExceptionrO   rI   Zglobal_blockvarsri   rs   r	   r.   r'   r+   rp   rY   r)   rn   ro   r   Z
DependTypeZLOOPZ	STOP_LOOPr&   rj   rk   rl   r4   rq   r1   rr   )3r   rt   Z
start_progZ	cond_progZend_progZ	send_progZ	recv_progr   Zsend_vars_nameZrecv_vars_nameZibru   Zstrat_blockZ	end_blockZis_after_while_opr   Z
send_blockZ
recv_blockZis_after_send_opZis_after_recv_oprx   rR   rC   Zold_var_nameZsrc_varZin_nameZsend_task_node_var_dtypeZsend_task_node_var_shapeZrecv_task_node_var_dtypeZrecv_task_node_var_shaperL   r   r   r   Zstart_task_nodeZcond_task_nodeZsend_task_nodeZrecv_task_nodeZend_task_nodeinfry   rj   rv   rw   Zupstream_rankZupstream_pp_stageZupstream_task_idZdownstream_rankZdownstream_task_idrf   r|   r   r   r   r9   5  s  















[

	





	


	

	zPipelinePass._task_stream)__name__
__module____qualname__r   r    r!   r<   r8   r4   r6   r9   __classcell__r   r   r   r   r   0   s    " 1r   )r,   Zpaddle.baser   Zpaddle.base.frameworkr   Z5paddle.distributed.auto_parallel.static.process_groupr   Z-paddle.distributed.auto_parallel.static.utilsr   r   r   r   Z-paddle.distributed.fleet.fleet_executor_utilsr	   Z	pass_baser   r   Z
pass_utilsr   r   ZVarDescZVarTypeZREADERZSTEP_SCOPESZLOD_TENSOR_ARRAYZFEED_MINIBATCHZ
FETCH_LISTZ__not_shape_var_type__r   r   r   r   r   r   <module>   s"   	