o
    "jVM                     @   s   d dl Z d dlZd dlZd dlm  m  mZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZmZmZ d d	lmZ G d
d deZe ZG dd dejZG dd deZG dd deZdS )    N)base)CompiledProgram)Executor)Program)CheckpointSaverPaddleModel)RawProgramOptimizer)DistributedOptimizerFleetMode)ioc                       s   e Zd Z fddZdd ZdddZddd	Zd
d Zdd ZdddZ					dddZ
	dddZ			dddZ			dddZ  ZS )
Collectivec                    s<   t  tj d| _d | _d | _d | _d | _d| _	d| _
d S )Nr   Z__paddle_fleet_checkpoint__Z_paddle_fleet_param__)super__init__r   Z
COLLECTIVEZ	_local_ipstartup_program_origin_program_transpiled_programmain_programZ_checkpoint_prefixZ_param_file_nameself	__class__ m/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/distributed/fleet/collective.pyr   %   s   
zCollective.__init__c                 C      t d d S )Nz=You should not call 'init_worker' method for collective mode.loggingwarnr   r   r   r   init_worker0      zCollective.init_workerNc                 C   r   )Nz<You should not call 'run_worker' method for collective mode.r   )r   Zmain_programsZscopesr   r   r   
run_worker5   r   zCollective.run_workerc                 C   r   )Nz=You should not call 'init_server' method for collective mode.r   )r   Z	model_dirr   r   r   init_server:   r   zCollective.init_serverc                 C   r   )Nz<You should not call 'run_server' method for collective mode.r   r   r   r   r   
run_server?   r   zCollective.run_serverc                 C   r   )Nz=You should not call 'stop_worker' method for collective mode.r   r   r   r   r   stop_workerD   r   zCollective.stop_workerc                 C   s   t ||| _| jS N)CollectiveOptimizer
_optimizerr   	optimizerstrategyr   r   r   distributed_optimizerI   s   z Collective.distributed_optimizerFc                 C   sL   t |ts	J d|du r| j}t |tsJ dtj||||||d dS )z
        Prune the given `main_program` to build a new program especially for
        inference, and then save it and all related parameters to given
        `dirname` by the `executor`.
        LIn fleet.save_inference_model() function, executor must be as Executor type.NOIn fleet.save_inference_model() function, main_program must be as Program type.)programlegacy_format)
isinstancer   r   r   r   save_inference_model)r   executorZpath_prefixZ
feeded_vasZ
fetch_varsr-   r.   r   r   r   r0   M   s    
zCollective.save_inference_modelc                 C   sL   t |ts	J d|du r| j}t |tsJ dtjjj||||d dS )a  
        This function filters out all variables with `persistable==True` from
        the give `main_program` and then saves these variables to the folder
        `dirname` or file `filename`.

        The `dirname` is used to specify the folder where persistable variables
        are going to be saved. If you would like to save variables in separate
        files, set `filename` None; if you would like to save all variables in a
        single file, use `filename` to specify the file name.
        r+   Nr,   )filename)r/   r   r   r   paddledistributedr   save_persistables)r   r1   dirnamer   r2   r   r   r   r5   p   s   
zCollective.save_persistables.cacheTc	                 C   sT   |du r| j }t||}	|}
t|}|j||	|
g||d\}}|s&|| ||fS )zP
        This function save persistables and current epoch num to path.
        N)pathZslists
trainer_idlocal_cache_path)r   r   r   save_checkpointZclean_redundant_checkpoints)r   r1   r8   r9   train_statusfsr   r:   Zremain_all_checkpointmtc	real_pathZcheckpoint_nor   r   r   r;      s   


zCollective.save_checkpointc	                 C   s8   |du r| j }t||}	t|}
|
j||	|g|||dS )zR
        This function load persistables and current epoch num from path.
        N)r9   ignore_emptyr:   )r   r   r   load_checkpoint)r   r1   r8   r9   r<   r=   r   r:   rB   r>   r@   r   r   r   rC      s   
zCollective.load_checkpoint)NNr$   )NNNF)Nr7   T)__name__
__module____qualname__r   r   r    r!   r"   r#   r*   r0   r5   r;   rC   __classcell__r   r   r   r   r   $   s.    



$
%
'r   c                       s    e Zd ZdZ fddZ  ZS )DistributedStrategyz.
    Init function of DistributedStrategy
    c                    sZ   t    d| _d| _d | _d| _d | _d| _d| _g | _	d| _
d| _t | _d| _d S )NFZnccl2   i   )r   r   use_local_sgduse_dist_fcdist_fc_configmodecollective_modenccl_comm_numforward_recomputerecompute_checkpointsuse_ampamp_loss_scalingr   ZExecutionStrategyexec_strategy_ut4grad_allreducer   r   r   r   r      s   


zDistributedStrategy.__init__)rD   rE   rF   __doc__r   rG   r   r   r   r   rH      s    rH   c                       s<   e Zd ZdZd	 fdd	Z				d
ddZdd Z  ZS )CollectiveOpBasedOptimizerzg
    Collective Operator Base Class For Distributed Optimizer
    The class is invisible to a user
    Nc                    s$   t |ts	J dt || d S )Nz$strategy must be DistributedStrategy)r/   rH   r   r   r'   r   r   r   r      s   z#CollectiveOpBasedOptimizer.__init__c                 C      | j |||||S r$   r&   backwardr   lossr   parameter_listno_grad_set	callbacksr   r   r   rZ         
z#CollectiveOpBasedOptimizer.backwardc                 C      | j |S r$   r&   apply_gradientsr   Zparams_gradsr   r   r   rc         z*CollectiveOpBasedOptimizer.apply_gradientsr$   NNNN)rD   rE   rF   rV   r   rZ   rc   rG   r   r   r   r   rW      s    	
rW   c                       s   e Zd ZdZe f fdd	Z				dddZdd Zd	d
 Zdd Z	dd Z
dd Zdd Zdd Zdd Z	dddZ  ZS )r%   a  
    DistributedOptimizer is a wrapper for paddle.base.optimizer
    A user should pass a paddle.base.optimizer to DistributedOptimizer
    minimize() function is implemented.
    DistributedOptimizer is the starting point for a user who wants to
    run distributed training. The optimized information will be stored in
    Fleet() instance who holds the global information about current distributed
    training.
    c                    sZ   |d u rt  }t || |j| _t|jtstd|j| _	|j
| _|j| _d| _d S )Nz2DistStrategy.recompute_checkpoints shouldbe a ListF)rH   r   r   rP   _forward_recomputer/   rQ   list
ValueError_recompute_checkpointsrR   _use_amprS   _amp_loss_scalingprint_configr'   r   r   r   r     s   
zCollectiveOptimizer.__init__Nc                 C   rX   r$   rY   r[   r   r   r   rZ     r`   zCollectiveOptimizer.backwardc                 C   ra   r$   rb   rd   r   r   r   rc   (  re   z#CollectiveOptimizer.apply_gradientsc                 K   s4   |  D ]\}}|du rtd| d| dqd S )NTzyou can't use z and z	 together)itemsAssertionError)r   namekwargskvr   r   r   _check_condition+  s
   z$CollectiveOptimizer._check_conditionc                 C   s   |j rd|_d|_| jd|j|j|jd |jr-| jd|j|j |jd |jdus-J d|jr@d|_d	|_| jd
|j|jd | j	jdksL| j	jd	krX| j	jdksVJ ddS dS )z0
        Check the conflict conditions.
        
collectiveZ	local_sgdrJ   )use_dgcrK   use_lambrK   )rv   rJ   rw   Nz0DistributedStrategy.dist_fc_config should be setZgrad_allreducerU   )rv   rw   z>local_sgd and grad_allreduce can be used under collective mode)
rJ   rM   rN   rt   Z_enable_dgcrK   Z	_use_lambrL   rU   	_strategy)r   r   r(   r)   r   r   r   _check_collective_mode0  sB   z*CollectiveOptimizer._check_collective_modec           
      C   s   t  }t  }t  | }d|}t  }| jr$td|||| t	 }| j
j|_| j
j|_| j
j|_| j
j|_| j
j|_tj|d}	|	j|||||d dS )zX
        Transpile the programs to distributed programs. And add the variables.
        ,zWworker_endpoints:{} trainers_num:{} current_endpoint:{}                   trainer_id:{})config)r9   Ztrainersr   r-   current_endpointN)fleetworker_endpointsworker_indexjoin
worker_numrm   printformatdist_transpilerZDistributeTranspilerConfigrx   rM   rN   rO   use_hierarchical_allreduceZ#hierarchical_allreduce_inter_nranksZDistributeTranspilerZ	transpile)
r   r   r   r~   r9   r|   worker_endpoints_envZtrainers_numr{   r?   r   r   r   
_transpileZ  s8   




zCollectiveOptimizer._transpilec                 C   sH   t  }g }|D ]}|dd  }||vr!|| || qq|S )N:r   )setsplitstripaddappend)r   	endpointsssipsepipr   r   r   _get_node_ips_from_endpoints  s   
z0CollectiveOptimizer._get_node_ips_from_endpointsc                 C   sJ   t  }t  t   }d|}| |}|dd  }t|}|S )Nrz   r   r   )r}   r~   r   r   r   r   r   len)r   r~   r|   r   Znode_ipsZnode_ipnode_numr   r   r   	_node_num  s   

zCollectiveOptimizer._node_numc                 C   s  |   }|dksJ d| | jj}|dkr2| jjdkr!td d| j_| jjr.td d| j_td}|d u s?|dkr]| jjd |_	| jjrSd| jj d |_	|j	d	kr]td
 | jj
}|d ury|du ryd| j_d| j_d|_	td | jrtd|d|j	d| jjd| jjd|
 | || | jjdkr|S t | j_t | j_t | j_d| j_t| j}d|_d|_| jj|_|jt  |_t |_t |_||_|jdkr| | j! t"|j| jd| _#| j#S )NrI   z nccl2 node_num must >= 1, now:{}z/set nccl_comm_num=1 since you only have 1 node.z@set use_hierarchical_allreduce=False since you only have 1 node.FZFLAGS_sync_nccl_allreduce1      zjif you use use_hierarchical_allreduce or with multi nccl comm, please export FLAGS_sync_nccl_allreduce = 0Tzuse sync_batch_norm will hang when set num_threads > 1, so set num_threads=1, nccl_comm_num=1, use_hierarchical_allreduce=False.z	node_num:znum_threads:zuse_hierarchical_allreduce:znccl_comm_num:zFLAGS_sync_nccl_allreduce:ru   )Zbuild_strategy)$r   rx   rT   rO   r   r   r   osgetenvZnum_threadssync_batch_normrm   r   r   rM   r}   r   Znum_trainersr   r9   r~   Ztrainers_endpointsZ!enable_backward_optimizer_op_depsr   r&   Zfuse_all_reduce_opsZfuse_grad_size_in_numr   r|   rankZnranksr   Z_transpile_main_program_lossr   Z_compiled_program)r   r   r   r   rT   Zsync_allreducer   Zcomm_optr   r   r   _try_to_compile  s~   







z#CollectiveOptimizer._try_to_compilec                 C   s   t d| d| d)Nzcan not use z when you set DistStrategy.z as True)ri   )r   Zstrategy_nameZoptimize_namer   r   r   raiseOptimizeError  s   z&CollectiveOptimizer.raiseOptimizeErrorc                 C   s  | j r,| jg krtd| jjjdv r| d| jjj tjj	
| j| _| j| j | jrL| jjjdv r?| d| jjj tjjj| j| jdd| _|jj}|du rXt }|t_|| _| || j| j | jj||||d	\}}|jd
dt_|t_| ||t_||fS )a  
        minimize a program through loss
        Args:
            loss (Variable|Variable List): loss variable or loss variable list to run optimization.
            startup_program (Program): startup_program for initializing parameters
                in `parameter_list`.
            parameter_list (list): list of Variables to update.
            no_grad_set (set|None): set of Variables should be ignored.
        Returns:
            tuple: (optimize_ops, params_grads) which are, list of operators appended;
            and list of (param, grad) Variables pair for optimization.
        Note that in parameter server mode, a worker will not get anything about optimize_os
        Because optimizer algorithms run on pserver side. We will make this usable in pserver
        process, but currently the optimization part is written into Fleet(). A user does not
        need to care about how to startup a pserver node.
        zTplease set strategy.recompute_checkpointswhen set strategy.forward_recompute as True)RecomputeOptimizerOptimizerWithMixedPrecisionrP   )r   ZDGCMomentumOptimizerZmixed_precisionT)Zinit_loss_scalingZuse_dynamic_loss_scalingN)r^   F)Zfor_test) rg   rj   ri   r&   r   rD   r   r3   Zincubater(   r   Z_set_checkpointsrk   ZstaticampZdecoraterl   blockr-   r   Zdefault_startup_programr}   r   r   ry   rx   minimizecloner   r   r   r   )r   r\   r   r]   r^   r   Zoptimize_opsZparam_gradsr   r   r   r     sL   




zCollectiveOptimizer.minimizerf   )NNN)rD   rE   rF   rV   rH   r   rZ   rc   rt   ry   r   r   r   r   r   r   rG   r   r   r   r   r%     s$    

*(Pr%   ) r   r   r3   Z3paddle.distributed.transpiler.distribute_transpilerr4   Z
transpilerZdistribute_transpilerr   r   Zpaddle.base.compilerr   Zpaddle.base.executorr   Zpaddle.base.frameworkr   Z0paddle.base.incubate.checkpoint.checkpoint_saverr   r   Z(paddle.distributed.fleet.meta_optimizersr   Z&paddle.incubate.distributed.fleet.baser	   r
   r   Zpaddle.staticr   r   r}   ZBuildStrategyrH   rW   r%   r   r   r   r   <module>   s$    (