o
    "jeG                     @   sj   d dl mZmZ d dlmZ d dlmZ G dd dZG dd dZG dd	 d	Z		
dddZ
dd ZdS )    )OP_ROLE_KEYOpRole)coreProgramc                   @   s   e Zd ZdZ									dddZdd Zd	d
 Zdd Zdd Zdd Z	de
jjfddZde
jjfddZdd ZdS )TaskNodezC
    Python side TaskNode, connection to the c++ side TaskNode
    Nr   Fc                 C   s   |du|duA sJ d|dur|rJ dt || _|| _|| _|| _|| _|| _|	| _|
| _|| _	d| _
d| _d| _g | _g | _|sv|dur_|durP|dusTJ dt|||||| _nt|j|| j|| _| jrx| j| j dS dS dS )a  
        :param rank (int): Current rank of the task node.
        :param max_run_times (int): The max run times of the task node.
        :param role (int): The role of the task node. (Will be removed in the future)
        :param node_type (str): The type of the task node.
        :param task_id (int): The id of task node.
        :param ops (list): A list of op.desc to init the task node. (Will be removed in the future)
        :param program (Program): An instance of Program to init the task node.
        :param lazy_initialize (bool): In user-defined task, the program may change adding feed/fetch op. As efficient consideration, the task node will have the C++ object later.
        :param cond_var_name (string): Indicate the cond var name of while.
        Nz7Should provide only one of ops or program to task node.z1Lazy initialization doesn't support with ops listz@If init task node with ops, should provide `role` and `task_id`.)intidrankmax_run_times	node_typeprogramlazy_initializecond_var_namevars_to_dtypevars_to_shaperun_pre_stepsrun_at_offsetnode	upstreamsdownstreamsr   r   descset_type)selfr
   r   roler   task_idopsr   r   r   r   r    r   n/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/fleet_executor_utils.py__init__   sX   
zTaskNode.__init__c                 C   s   | j rxt| jj| j| j| j| _| j	r| j
| j	 | jr%| j| j | jr/| j| j | jr9| j| j | jrC| j| j | jrM| j| j | jD ]}| j|d |d |d  qP| jD ]}| j|d |d |d  qdd| _ | jS )Nr         F)r   r   r   r   r   r
   r	   r   r   r   r   r   set_run_pre_stepsr   set_run_at_offsetr   Zset_cond_var_namer   Zset_vars_to_shaper   Zset_vars_to_dtyper   add_upstream_taskr   add_downstream_task)r   upZdownr   r   r   	task_node\   s2   

zTaskNode.task_nodec                 C   s   | j sJ d|| _d S )NzInside program is unchangable for immediate initialized task node. Set the lazy_initialize to be true if the inside program need to be update. Remember to do all your change before eval node.task_node().)r   r   )r   r   r   r   r   set_programw   s
   
zTaskNode.set_programc                 C   s   | j d us	J d| j S )Nz.The task node is not initialized using program)r   r   r   r   r   get_program}   s   zTaskNode.get_programc                 C       | j r|| _d S | j| d S N)r   r   r   r"   )r   Zstepsr   r   r   r"         
zTaskNode.set_run_pre_stepsc                 C   r+   r,   )r   r   r   r#   )r   offsetr   r   r   r#      r-   zTaskNode.set_run_at_offsetr!   c                 C   0   | j r| j|||f d S | j||| d S r,   )r   r   appendr   r$   )r   Zupstreambuffer_sizedepend_typer   r   r   r$         zTaskNode.add_upstream_taskc                 C   r/   r,   )r   r   r0   r   r%   )r   Z
downstreamr1   r2   r   r   r   r%      r3   zTaskNode.add_downstream_taskc                 C   s   | j S r,   )r	   r)   r   r   r   r      s   zTaskNode.task_id)	NNr   NNFNNN)__name__
__module____qualname____doc__r   r'   r(   r*   r"   r#   r   Z
DependTypeZNORMALr$   r%   r   r   r   r   r   r      s,    
C
	
r   c                   @   s0   e Zd ZdZdd Zdd Zdd Zdd	 Zd
S )CoordSysz[
    This class is used to mapping rank to (mp rank, sharding rank, pp rank, dp rank).
    c                 C   s<   | dd| _| dd| _| dd| _| dd| _d S )N	dp_degreer    	pp_degreesharding_degree	mp_degree)getr9   r:   r;   r<   )r   dist_optr   r   r   r      s   zCoordSys.__init__c                 C   sh   |d dk p3|d | j kp3|d dk p3|d | jkp3|d dk p3|d | jkp3|d dk p3|d | jkS )z
        Test the input coord is valid or not.
        :param coord: The coord to be tested
        :return: False if valid, True if invalid.
        mp_idxr   sharding_idxpp_idxdp_idx)r<   r;   r:   r9   r   coordr   r   r   _invalide_coord   s   


zCoordSys._invalide_coordc                 C   sV   |  |rdS t|d | j | j | j |d | j | j  |d | j  |d  S )z
        Map the input coord to it's corresponding rank.
        :param coord:  The coord to be converted
        :return: The rank corresponding with the coord
        rB   rA   r@   r?   )rE   r   r:   r;   r<   rC   r   r   r   coord_to_rank   s"   
zCoordSys.coord_to_rankc                 C   sd   || j  }|| j  }|| j }|| j }|| j }|| j }|| j }t|t|t|t|dS )z
        Map the input rank to it's corresponding coord
        :param rank: The rank to be converted
        :return: The coord corresponding with the rank
        )r?   r@   rA   rB   )r<   r;   r:   r9   r   )r   r
   r?   r@   rA   rB   r   r   r   rank_to_coord   s   






zCoordSys.rank_to_coordN)r4   r5   r6   r7   r   rE   rG   rH   r   r   r   r   r8      s    r8   c                   @   sh   e Zd Z	dddZdd Zdd Zdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd Zdd ZdS )FleetExecutorUtilsNc                 C   s`   || _ || _|| _|| _|d u rdnd| _d| _d | _d | _|r.t|| _| j	|| _d S d S )NTF   )
dist_strategyr
   nrankr   is_auto_parallelnum_of_functionality	coord_sysrD   r8   rH   )r   rK   r
   rL   r   r   r   r   r      s   
zFleetExecutorUtils.__init__c                 C   s   |t tjkS r,   )r   r   Optimizer   op_roler   r   r   is_optimizer_op   s   z"FleetExecutorUtils.is_optimizer_opc                 C   s   |t tjjkS r,   )r   r   rP   LRSchedrQ   r   r   r   is_lr_sched_op   s   z!FleetExecutorUtils.is_lr_sched_opc                 C   &   |t tjkp|t tjt tjB kS r,   )r   r   ForwardLossrQ   r   r   r   is_forward_op      z FleetExecutorUtils.is_forward_opc                 C   rV   r,   )r   r   BackwardrX   rQ   r   r   r   is_backward_op  rZ   z!FleetExecutorUtils.is_backward_opc                 C   s   g g g g d}| djD ]E}t| t }| |r$|d | q| |r1|d | q| |r>|d | q| 	|rK|d | qdt
| d |S )	Nlrfwdbwdoptr   r^   r_   r`   ra   zThe op role: z6 isn't one of LRSched, Forward, Backward or Optimizer.)blockr   r   Z	all_attrsr   rU   r0   rY   r\   rS   str)r   r   op_list_mapoprR   r   r   r   split_program_to_op_list  s"   



z+FleetExecutorUtils.split_program_to_op_listc                 C   s   t  t  t  t  d}|S )Nr]   r   )r   Zop_listZcomplete_programprogram_mapr   r   r   convert_op_list_to_program  s   z-FleetExecutorUtils.convert_op_list_to_programc                 C   sd  | j rJ d| j| j }t| jd | jd  }|d |d  |d | |d |d | |d |d | |d |d	  |d
 |d  | j | j }}|d d |d< |d d |d< | j	
|}| j	
|}|dk}|dk}	|| j }
|| j }|s|d |
d  |d |
d  |	s|d |d  |d |d  |S )NzAHandly add dependency should not be invoked in auto parallel moder:   rA   r^   r    r_   r!   r`      ra   rF   )rM   r
   rN   r   rK   rD   r%   r$   copyrO   rG   )r   task_node_mapcur_start_idZpp_buff_sizeZupstream_coordZdownstream_coordZpp_upstreamZpp_downstreamZfirst_stageZ
last_stageZprev_pp_start_idZnext_pp_start_idr   r   r   build_1f1b_dependency#  s<   

z(FleetExecutorUtils.build_1f1b_dependencyc                 C   s   t | j| j }t| j| j|d |d}t| j| j|d |d d}t| j| j|d |d d}t| j| j|d |d d}||||d	S )
Nr^   )r
   r   r   r   r_   r    r`   r!   ra   ri   r]   )r   r
   rN   r   r   )r   rg   rl   lr_task_nodefwd_task_nodebwd_task_nodeopt_task_noder   r   r   construct_task_nodes_1f1bK  s<   z,FleetExecutorUtils.construct_task_nodes_1f1bc                 C   s>   i }t | jD ]}t | jD ]}||t|| j | < qq|S r,   )rangerL   rN   r   )r   task_id_to_rankijr   r   r   rt   l  s   z"FleetExecutorUtils.task_id_to_rankc                 C   s   t | j| j }t| j| jt tjj|d |dd}|| j t| j| jt tj	|d |d dd}t| j| jt tj
|d |d dd}t| j| jt tj|d	 |d
 dd}|| j || jd  ||||dS )Nr^   Z	Amplifier)r
   r   r   r   r   r   r_   r    Computer`   r!   ra   ri   r]   )r   r
   rN   r   r   r   rP   rT   r"   rW   r[   r#   )r   rd   rl   rn   ro   rp   rq   r   r   r   !construct_task_nodes_1f1b_op_lists  sR   
z4FleetExecutorUtils.construct_task_nodes_1f1b_op_list)NNNN)r4   r5   r6   r   rS   rU   rY   r\   rf   rh   rm   rr   rt   rx   r   r   r   r   rI      s    

(!rI   Fc                    s   t d t||||d}|| }d |r!||| }|| n g g g g d}	|D ]}
||
 D ]
}|	|
 |j q0q*||	 |  |	 } fdd D }||fS )aI  
    Split the program to support 1f1b pipeline scheduler.
    This funct will split the program based on the op_role.
    The program will be split into four parts: lr_sched, fwd, bwd, opt.
    And will create task nodes based on the four parts of the program.
    :param program: The origin program.
    :param rank: Current rank (can be got from fleet.worker_index()).
    :param max_run_times: Max run times for a micro batch. AKA number of micro steps.
    :param dist_opt: The fleet_opt configured by user.
    :param nrank: Number of workers (can be got from fleet.worker_num()).
    :param with_standalone_executor: Experiment feature, use fleet executor with standalone executor.
    :return:
        task_nodes (list): four task nodes for current rank
        task_id_to_rank (dict): task nodes' ids to it's corresponding rank
    z3fleet executor will use python side 1f1b scheduler.)rK   r
   rL   r   Nr]   c                    s   g | ]} |   qS r   )r'   ).0keyrk   r   r   
<listcomp>  s    zrun1f1b.<locals>.<listcomp>)
printrI   rf   rh   rr   r0   r   rx   rm   rt   )r   r
   r   r>   rL   Zwith_standalone_executorZfleet_executor_utilsrd   rg   Zop_desc_list_maprz   re   rt   Ztask_node_listr   r{   r   run1f1b  s8   

r~   c                 C   s2   t d t| |ddd}| |i}| g|fS )ai  
    Origin scheduler for fleet executor, supports non-pp mode
    :param program: The origin program.
    :param rank: Current rank (can be got from fleet.worker_index()).
    :return:
        task_nodes (list): four task nodes for current rank
        task_id_to_rank (dict): a fake dict, since there is no upstream or downstream, this dict won't be used
    z5fleet executor will use python side origin scheduler.rw   r    )r   r
   r   r   )r}   r   r   r'   )r   r
   r'   rt   r   r   r   origin  s   	r   N)F)Z/paddle.distributed.fleet.meta_optimizers.commonr   r   Zpaddle.frameworkr   Zpaddle.staticr   r   r8   rI   r~   r   r   r   r   r   <module>   s    C A
5