o
    "j-                     @   sX   d dl Z d dlmZ ddlmZmZmZmZm	Z	m
Z
 ddlmZ g ZG dd deZdS )    N)PipelineOptimizer   )OP_ROLE_KEYOP_ROLE_VAR_KEYCollectiveHelperOpRoleis_backward_opis_loss_grad_op)MetaOptimizerBasec                       s|   e Zd Z fddZ fddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
	dddZdd Zdd Zdd Z  ZS )r   c                    s8   t  | || _ddg| _g | _d| _d| _d| _d S )NZRecomputeOptimizerZAMPOptimizerr         )super__init__	inner_optZmeta_optimizers_white_listZmeta_optimizers_black_listglobal_ring_id
dp_ring_idstart_pipeline_ring_id)selfZ	optimizer	__class__ |/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/meta_optimizers/pipeline_optimizer.pyr      s   
zPipelineOptimizer.__init__c                    sB   t  |||| |jd | _|jd | _|jd | _|j| _d S )Nmicro_batch_sizeaccumulate_stepsschedule_mode)r   _set_basic_infopipeline_configsr   num_microbatchesr   Zshardinguse_sharding)r   loss
role_makerZuser_defined_optimizeruser_defined_strategyr   r   r   r   +   s   z!PipelineOptimizer._set_basic_infoc                 C   s&   | j jsdS | jrdS | jjrdS dS )NFT)r    Z_is_collectiver   r!   pipeliner   r   r   r   
_can_apply<   s   zPipelineOptimizer._can_applyc                 C      d|_ dddd|_d S )NFr   1F1Br   r   r   r"   r   )r   dist_strategyr   r   r   _disable_strategyH   
   z#PipelineOptimizer._disable_strategyc                 C   r%   )NTr   r&   r'   r(   )r   r)   contextr   r   r   _enable_strategyP   r+   z"PipelineOptimizer._enable_strategyc                 C   s|   | j  }d }| D ]}|jrq|jdd|id|id|ddttjid q|s*d S |jdd|id|id|ttjid d S )	NZc_broadcastXOutring_idrootr   typeZinputsZoutputsattrsZc_sync_comm_stream)startup_programglobal_blockZiter_parametersis_distributedZ	append_opr   r   ZForward)r   r0   blockparamr   r   r   _broadcast_paramsX   s,   

z#PipelineOptimizer._broadcast_paramsc                    sn    j  _ j _ j _ jdkr5 j j  _ j j  _	 j j  fddt
 jD  _d S d S )Nr   c                    s    g | ]} j | j   qS r   )	endpointsinner_parallelism).0ir   start_indexr   r   
<listcomp>~   s    z=PipelineOptimizer._get_process_group_info.<locals>.<listcomp>)r;   global_endpointsrankglobal_ranknranksZglobal_nrankspipeline_numr<   dp_rankZ	dp_nranksrangedp_endpointsr#   r   r?   r   _get_process_group_infos   s   
z)PipelineOptimizer._get_process_group_infoc                 C   sb  |    t| jdd}|| j| j| j| j| jd| jd | j	dkr| j
| j	 }|| j	 }|D ]`}|d d |d  }|| }|| jksFJ |d | }	|d | }
| j
|	krk| j
|
krk|| jd d d d d| jd q/| j|	 | j|
 g}| j
|	kr|dnd}d}|| j| j|||d| jd q/| jdkr|| j| j| j| j| jd| jd | | j d S d S )NF)Z	wait_portTr   r   i  r   )rJ   r   r    Z_init_communicatorr5   current_endpointrB   rD   r   r<   rC   r   r;   rF   rI   rG   r   r:   )r   Zpipeline_pairZpipeline_ring_mapZcollective_helperZpipeline_idr@   pairZpair_keyr0   Z
first_nodeZsecond_nodeZpipeline_endpointsZpipeline_rankZpipeline_nranksr   r   r   _init_process_group   sz   




z%PipelineOptimizer._init_process_groupNc                 C   s\  | j  | _| j| j   | _| j  | _| j  | _t| j	| j
d| _|r(|ntj }|j}|j}i |_| j|jd< | j|jd< | j|jd< | j|jd< | j|jd< d|jd< d	|jd
< d|jd< | j||||\}}	}
}}|jd | _|jd | _| j| j dksJ |
sJ t| j| j | _| || |
| _|| _| jd	kr| | ||	fS )N)r   Z
local_rankr   r0   r   r   Fr   r   Z	mp_degreer   Zmp_rankr5   r<   )r    Z_get_trainer_endpointsr;   Z_worker_indexrK   rC   Z_worker_numrE   POr   r   Zwrapped_optpaddleZstaticZdefault_startup_programr8   program_pipeline_optr   r   r   r   Zminimizer5   r<   lenrF   rM   main_program_listmain_program_transpile_main_program)r   r   r5   Zparameter_listZno_grad_setZorig_startup_programr8   rP   Zoptimize_opsZparams_gradsZ	prog_listZpp_pairZring_mapr   r   r   minimize_impl   sX   




zPipelineOptimizer.minimize_implc                 C   s   |  || j | | j d S )N)_insert_loss_grad_opsrF   _insert_allreduce_opsr   )r   r   r   r   r   rU      s   z)PipelineOptimizer._transpile_main_programc              
   C   st   | j d  }ttt|jD ]'\}}t|r7|j|jd  }|j	|d dd|id|idd| t
tjid qd	S )
z
        In order to keep the learning rate consistent in different numbers of
        training workers, we scale the loss grad by the number of workers
        r   r   scaler.   r/   g      ?r2   N)rS   r6   reversedlist	enumerateopsr	   varsZoutput_arg_names
_insert_opr   r   ZBackward)r   r   rF   r8   idxopZloss_grad_varr   r   r   rW      s   z'PipelineOptimizer._insert_loss_grad_opsc                 C   s^  | j jd  }| j  }d }t }d }ttt|jD ]\}}t|r5|s5|d }|t	|jkr5 d S t|rt
|jv r| t
 }	t	|	dkrKqt	|	d dksUJ d}
tdt	|	dD ]L}|	| }|j|	|  }||v rqq_|| |	|d  }d|vr|d7 }|j| }|j|	|  }|jrq_|j||
 dd|id	|id
|ddttjid q_qd S )NZsection_programr   r   r   ZMERGEDz@MERGEDZc_allreduce_sumr.   r/   r0   Zuse_calc_streamTr2   )rT   rQ   r6   setr[   r\   r]   r^   r   rR   r   Z
attr_namesZ	all_attrsrH   r_   addr7   r`   r   r   ZOptimize)r   r0   r8   Zorigin_blockZgradZprocessed_param_nameZfirst_optimize_op_idxra   rb   Zop_role_varoffsetr>   
param_namer9   Z	grad_nameZorigin_paramr   r   r   rX     sZ   


z'PipelineOptimizer._insert_allreduce_ops)NNN)__name__
__module____qualname__r   r   r$   r*   r-   r:   rJ   rM   rV   rU   rW   rX   __classcell__r   r   r   r   r      s    D
5r   )rO   Zpaddle.incubate.optimizerr   rN   commonr   r   r   r   r   r	   Zmeta_optimizer_baser
   __all__r   r   r   r   <module>   s    