o
    "j?s                     @   s   d Z 	 	 	 ddlZddlZddlZddlmZ ddlZddlm	Z	 ddl
Z
ddl
mZ ddlmZ ddlmZ ddlmZ d	d
lmZ ddlmZ g ZG dd dZG dd dZdddZdS )zFleet Utils.    N)OrderedDict)text_format)	framework)core)framework_pb2)Program   )FS   )GraphPreviewGeneratorc                   @   s   e Zd ZdddZdS )UtilFactoryNc                 C   sF   t  }|d urd|v r||d  |d ur!d|v r!||d  |S )NZvalid_strategy
role_maker)UtilBase_set_strategy_set_role_maker)selfcontextutil r   k/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/base/util_factory.py_create_util(   s   zUtilFactory._create_utilN)__name__
__module____qualname__r   r   r   r   r   r   '   s    r   c                   @   s   e Zd Zdd Zdd Zdd Zdd Zd*ddZd+ddZd+ddZ	dd Z
dd Zdd Zdd Zdd Zd,ddZdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)S )-r   c                 C   s   d | _ d | _d S r   )r   dist_strategyr   r   r   r   __init__2   s   
zUtilBase.__init__c                 C   
   || _ d S r   )r   )r   r   r   r   r   r   6      
zUtilBase._set_strategyc                 C   r   r   )r   )r   r   r   r   r   r   9   r   zUtilBase._set_role_makerc                 C   s   t |ts	J d|| _d S )NzCfs_client must be the instance of paddle.distributed.fleet.utils.FS)
isinstancer	   	fs_client)r   r!   r   r   r   _set_file_system<   s   
zUtilBase._set_file_systemsumworkerc                 C   s"   t |tr	t|}| j|||S )a  
        All reduce `input` between specified collection. This is a distributed API.

        Args:
            input (list|tuple|numpy.array): The input variable to do all_reduce between specified collection.
            mode (str): "sum" or "min" or "max".
            comm_world (str, optional): Collection used to execute all_reduce operation. Supported collections incude `worker` , `server` and `all` . The default is `worker` .

        Returns:
            output(Numpy.array|None): A numpy array with the same shape as the `input` .

        Examples:
            .. code-block:: python

                >>> # doctest: +REQUIRES(env: DISTRIBUTED)
                >>> # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
                >>> import paddle.distributed.fleet as fleet
                >>> from paddle.distributed.fleet import PaddleCloudRoleMaker
                >>> import sys
                >>> import numpy as np
                >>> import os

                >>> os.environ["PADDLE_WITH_GLOO"] = "2"

                >>> def train():
                ...     role = PaddleCloudRoleMaker(
                ...         is_collective=False,
                ...         init_gloo=True,
                ...         path="./tmp_gloo")
                ...     fleet.init(role)
                ...
                ...     if fleet.is_server():
                ...         input = np.array([1, 2])
                ...         output = fleet.util.all_reduce(input, "sum", "server")
                ...         print(output) # [2, 4]
                ...     elif fleet.is_worker():
                ...         input = np.array([3, 4])
                ...         output = fleet.util.all_reduce(input, "sum", "worker")
                ...         print(output) # [6, 8]
                ...     output = fleet.util.all_reduce(input, "sum", "all")
                ...     print(output) # [8, 12]

                >>> if __name__ == "__main__":
                ...     train()
        )r    tuplelistr   Z_all_reduce)r   inputmode
comm_worldr   r   r   
all_reduceB   s   
.zUtilBase.all_reducec                 C   s   | j | dS )a}  
        Barrier between specified collection.

        Args:
            comm_world (str, optional): Collection used to execute barrier operation. Supported collections incude `worker` , `server` and `all` . The default is `worker` .

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env: DISTRIBUTED)
                >>> # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
                >>> import paddle.distributed.fleet as fleet
                >>> from paddle.distributed.fleet import PaddleCloudRoleMaker
                >>> import sys
                >>> import os

                >>> os.environ["PADDLE_WITH_GLOO"] = "2"

                >>> def train():
                ...     role = PaddleCloudRoleMaker(
                ...         is_collective=False,
                ...         init_gloo=True,
                ...         path="./tmp_gloo")
                ...     fleet.init(role)
                ...
                ...     if fleet.is_server():
                ...         fleet.util.barrier("server")
                ...         print("all server arrive here") # all server arrive here
                ...     elif fleet.is_worker():
                ...         fleet.util.barrier("worker")
                ...         print("all server arrive here") # all server arrive here
                ...     fleet.util.barrier("all")
                ...     print("all servers and workers arrive here") #all servers and workers arrive here

                >>> if __name__ == "__main__":
                ...     train()
        N)r   Z_barrier)r   r)   r   r   r   barriert   s   'zUtilBase.barrierc                 C   s   | j ||S )af  
        All gather `input` between specified collection.

        Args:
            input (Int|Float): The input variable to do all_gather between specified collection.
            comm_world (str, optional): Collection used to execute all_reduce operation. Supported collections incude `worker` , `server` and `all` . The default is `worker` .

        Returns:
            output (List): A list of gathered values.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env: DISTRIBUTED)
                >>> # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
                >>> import paddle.distributed.fleet as fleet
                >>> from paddle.distributed.fleet import PaddleCloudRoleMaker
                >>> import sys
                >>> import os

                >>> os.environ["PADDLE_WITH_GLOO"] = "2"

                >>> def train():
                ...     role = PaddleCloudRoleMaker(
                ...         is_collective=False,
                ...         init_gloo=True,
                ...         path="./tmp_gloo")
                ...     fleet.init(role)
                ...
                ...     if fleet.is_server():
                ...         input = fleet.server_index()
                ...         output = fleet.util.all_gather(input, "server")
                ...         print(output) # [0, 1]
                ...     elif fleet.is_worker():
                ...         input = fleet.worker_index()
                ...         output = fleet.util.all_gather(input, "worker")
                ...         print(output) # [0, 1]
                ...     output = fleet.util.all_gather(input, "all")
                ...     print(output) # [0, 1, 0, 1]

                >>> if __name__ == "__main__":
                ...     train()
        )r   Z_all_gather)r   r'   r)   r   r   r   
all_gather   s   .zUtilBase.all_gatherc                 C      d S r   r   r   r   r   r   
_broadcast      zUtilBase._broadcastc                 C   r-   r   r   r   r   r   r   _scatter   r/   zUtilBase._scatterc           
      C   s   t |ts	td| j }| j | }t|| }tt|| }|g| }t|D ]
}||  d7  < q,g g| }d}	t|D ]}||	|	||   ||< |	|| 7 }	qB|| S )N/files should be a list of file need to be read.r
   r   )	r    r&   	TypeErrorr   _worker_num_worker_indexlenintrange)
r   filestrainers
trainer_id	remainder	blocksizeblocksitrainer_filesbeginr   r   r   get_heter_file_shard   s   



zUtilBase.get_heter_file_shardc           
      C   s   t |ts	td| j }| j }t|| }tt|| }|g| }t|D ]
}||  d7  < q*g g| }d}	t|D ]}||	|	||   ||< |	|| 7 }	q@|| S )am  
        Split files before distributed training, and return filelist assigned to the current trainer.

        .. code-block:: text

            example 1: files is [a, b, c ,d, e]  and trainer_num = 2, then trainer
                    0 gets [a, b, c] and trainer 1 gets [d, e].
            example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
                    [a], trainer 1 gets [b],  trainer 2 gets []

        Args:
            files(list): File list need to be read.

        Returns:
            List: Files belong to this worker.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env: DISTRIBUTED)
                >>> import paddle.distributed.fleet as fleet
                >>> from paddle.distributed.fleet import UserDefinedRoleMaker

                >>> role = UserDefinedRoleMaker(
                ...     is_collective=False,
                ...     init_gloo=False,
                ...     current_id=0,
                ...     role=fleet.Role.WORKER,
                ...     worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
                ...     server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
                >>> fleet.init(role)

                >>> files = fleet.util.get_file_shard(["file1", "file2", "file3"])
                >>> print(files)
                ["file1", "file2"]
        r1   r
   r   )	r    r&   r2   r   r4   r3   r5   r6   r7   )
r   r8   r:   r9   r;   r<   r=   r>   r?   r@   r   r   r   get_file_shard   s   
&



zUtilBase.get_file_shardc                 C   s   | j  |kr	dS t| dS )a  
        Woker of rank `rank_id` print some message.

        Args:
            message(str): Log to be printed.
            rank_id(int): trainer id.

        Examples:

            .. code-block:: python

                >>> # doctest: +REQUIRES(env: DISTRIBUTED)
                >>> import paddle.distributed.fleet as fleet
                >>> from paddle.distributed.fleet import UserDefinedRoleMaker

                >>> role = UserDefinedRoleMaker(
                ...     is_collective=False,
                ...     init_gloo=False,
                ...     current_id=0,
                ...     role=fleet.Role.WORKER,
                ...     worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
                ...     server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
                >>> fleet.init(role)

                >>> fleet.util.print_on_rank("I'm worker 0", 0)
                I'm worker 0
        N)r   r4   print)r   messageZrank_idr   r   r   print_on_rank"  s   zUtilBase.print_on_rank	__model__Fc                 C   s   |r!t |d}|t| W d    d S 1 sw   Y  d S t |d}||j  W d    d S 1 s:w   Y  d S )Nwwb)openwritestrdescserialize_to_string)r   programmodel_filenameis_textfr   r   r   _save_programB  s   ""zUtilBase._save_programc                 C   s$   dd }dd }|r||S ||S )Nc                 S   s<   t | d}| }W d   n1 sw   Y  t|S )z$load program from binary string filerbN)rI   readr   parse_from_string)pathrQ   Zprogram_desc_strr   r   r   load_program_binaryK  s   

z3UtilBase._load_program.<locals>.load_program_binaryc                 S   sT   t | d}| }W d   n1 sw   Y  t }t|| t| S )z*load program from human-readable text filerN)	rI   rT   r   ZProgramDescr   ZMerger   rU   ZSerializeToString)rV   rQ   Zprogram_desc_textZ	prog_descr   r   r   load_program_textQ  s   
z1UtilBase._load_program.<locals>.load_program_textr   )r   rV   rP   rW   rY   r   r   r   _load_programJ  s
   	zUtilBase._load_programc                 C   sJ   |  tj|||}|r|d n|d }| |tj||d|  |S )Nz.binz.pbtxtr
   )rZ   osrV   joinrR   )r   Zprog_dirZprog_fnrP   progZprog_out_fnr   r   r   _program_type_trans_  s   zUtilBase._program_type_transc           	      C   sj   |  }tj||d }tj||d }t||d dd|d|g}tj|tjtjtjd}|  d S )Nz.dotz.pdf)rV   dotz-Tpdfz-o)stdinstdoutstderr)	global_blockr[   rV   r\   draw_block_graphviz
subprocessPopenPIPEwait)	r   rN   
output_dirZoutput_filenameblockZdot_pathZpdf_pathcmdpr   r   r   _visualize_graphvizg  s   zUtilBase._visualize_graphvizc                 C   s  |  |j|j}|  |j|j}d}dd | D }t|}t|}td|  t	j
jjt	j
jjg}|D ]O}|| }	|	j|v rD |S z	| |}
W n tyg } ztd|  d}W Y d }~q6d }~ww |	j|
jkst|	j|
jkrtd||	j|	j|
j|
j d}q6|S )NTc                 S   s$   g | ]}t jj|r|j|fqS r   )paddlestaticiois_persistablename.0vr   r   r   
<listcomp>  s    z)UtilBase._proto_check.<locals>.<listcomp>z$persistable vars in pruned program: z>Not find variable '%s' in train program. please check pruning.Fzbvariable: {} not match. in pruned program shape: {} dtype:{}, in train program shape: {} dtype: {})rZ   Ztrain_prog_pathZis_text_train_programZpruned_prog_pathZis_text_pruned_program	list_varsr   r&   rC   r   ZVarDescZVarTypeZFEED_MINIBATCHZ
FETCH_LISTtyperc   var
ValueErrorshapedtypeformat)r   configZ
train_progZpruned_progZis_matchZpruned_varsZpruned_vars_nameZfeed_fetch_type_listvar_namery   Ztrain_prog_varer   r   r   _proto_checku  s^   
	zUtilBase._proto_checkc           &         s  dd }|  tj|j|j|j}|jr| |j|j|j}dd | D }t	ddd |D   dd }||d	g}t
|d
krOt	dt| dS t }tj|}	tj }
tj|
a tjjj|j|	||jd\}}dd |D }|D ]>}tj |j}|d usJ d|j t| j}|j|v sJ |jd ||j}||krt d||j|q|j! |j"}dd |D }|st	d |st	d |}|} j#d ur$| j#kr$t	d| j#  j#}$ }g }t%|j&D ]\}}|j'(d |j)dkr|*| q|d d d D ]}|+| q|j,d uru||j,krut	d||j, fdd|j,D }$ }g }t%|j&D ]\}}|j'(d |j)dkrc|*| qN|d d d D ]}|+| qlt-dd |D }g t
 j#t
 j.  krt
 j/ksJ  J t0t
 j#D ]C}$ 1 j#| }t2 j.| tt3fs j.| f}nt3 j.| }| j.|< |jdd  }||krt d j#| ||q j4skt	d  t0t
 j#D ]d}$ 1 j#| }|j5d
kr#*tjtj66t3|j7gt j.|   j/| d! q|j5dkrTtjtj66t3|j7gt j.|   j/| d!}*tj89|dg|j7 g| qt d"|	j:fd#dt%|D ||d$} n8t	d% j4 d&  fd'dt0t
 j#D }!tj8j;|!|d(}"||j7 j. j4}#|#g}$|	j:|"<|$||d$} t%|D ]\}}%t	d)|%j  t	d*| |   q| W  d    S 1 sw   Y  d S )+Nc                 S   s8   dd }g }t |D ]\}}||| |||  q
|S )Nc           
      S   s   g }t |ttfrt|}d}|D ]}|| }q|}n|g}| g| }||  }t|dD ]3}| d}dd |D }t||kr_|d | }	||d  }|t	|	
| t||ksBq,|S )Nr
   rX    c                 S   s   g | ]}t |qS r   )float)rt   dr   r   r   rv     s    zLUtilBase._params_check.<locals>.feed_gen.<locals>.reader.<locals>.<listcomp>)r    r&   r%   rI   stripsplitr5   appendnparrayZreshape)

batch_sizefndimdatar{   _tempxlinefieldstmpr   r   r   reader  s(   

z8UtilBase._params_check.<locals>.feed_gen.<locals>.reader)	enumerater   )r   feeded_vars_dimsfeeded_vars_filelistr   
batch_feedr>   r   r   r   r   feed_gen  s
   z(UtilBase._params_check.<locals>.feed_genc                 S   s   g | ]}t jj|r|qS r   )rn   ro   rp   rq   rs   r   r   r   rv     s
    z*UtilBase._params_check.<locals>.<listcomp>z"persistable vars in dump program: c                 S      g | ]}|j qS r   rr   rs   r   r   r   rv         c                 S   s:   t  }|  jD ]}|j|v r|j|vr||j q|S r   )setrc   opsrx   add)r]   not_expected_op_typesZop_types_setopr   r   r   check_not_expected_ops  s   

z6UtilBase._params_check.<locals>.check_not_expected_opsZlookup_tabler   zPfind op type '{}' in program, please check if your program is pruned correctly !F)rO   Zparams_filenamec                 S   s   i | ]}|j t|j qS r   )rr   r%   rL   r{   )rt   each_varr   r   r   
<dictcomp>  s    z*UtilBase._params_check.<locals>.<dictcomp>zcan't not find var: zMUST in var listzShape not matching: the Program requires a parameter with a shape of ({}), while the loaded parameter (namely [ {} ]) has a shape of  ({}).c                 S   r   r   r   rs   r   r   r   rv     r   z$warning! no feed targets in program.z%warning! no fetch targets in program.zZwarning! feed vars in program and config are diff: feed in program: {}. feed in config {}.feedz]warning! fetch vars in program and config are diff: fetch in program: {}. fetch in config {}.c                    s   g | ]	}   |qS r   )rc   ry   rt   r>   )inference_programr   r   rv   ;  s    fetchc                 s   s    | ]}|j d kV  qdS )r   N)	lod_levelrs   r   r   r   	<genexpr>J  s    z)UtilBase._params_check.<locals>.<genexpr>r
   zSfeed variable '{}' shape not match. infer program  shape: {}. feed tensor shape: {}zgenerate random feed vars.)r|   zOvars with lod_level >= 2 is not supported now in this infer program check tool.c                    s   i | ]	\}}| | qS r   r   )rt   r>   rr   )feed_tensorsr   r   r     s    )r   
fetch_listreturn_numpyzload feed vars from files: .c                    s    g | ]}   j| qS r   )rc   ry   feeded_vars_namesr   )feed_configr   r   r   rv     s    )Z	feed_listplacezfetch_targets name: %szfetch_targets: )=rZ   r[   rV   r\   Zdump_model_dirZdump_program_filenameZis_text_dump_programr^   rw   rC   r5   r}   r&   r   ZCPUPlacern   ro   ZExecutorZScopeZscope_guarddistributedrp   Z load_inference_model_distributedZsave_params_filenameZglobal_scopeZfind_varrr   r   r   Z
get_tensorr{   getRuntimeErrorr   fetch_configr   rc   r   r   rL   Zset_is_targetrx   r   Z
_remove_opZfetch_vars_namesallr   Zfeeded_vars_typesr7   ry   r    r%   r   r   randomr   baseZcreate_lod_tensorrunZ
DataFeederr   )&r   r~   r   r]   rO   Zsaved_paramsr   r   r   exescopeZfeed_target_namesZfetch_targetsZorig_para_shaper   Zvar_tempZ	new_shapeZ
orig_shaper   Zfetch_targets_namesr   Zfeed_name_listrc   Zneed_to_remove_op_indexr>   r   indexr   ry   Ztensor_shapeZ	var_shapetresultsZ	feed_varsZfeederr   slotsru   r   )r   r   r   r   _params_check  s  

	
	





		

 &zUtilBase._params_checkN)r#   r$   )r$   )rF   F)r   r   r   r   r   r   r"   r*   r+   r,   r.   r0   rA   rB   rE   rR   rZ   r^   rm   r   r   r   r   r   r   r   1   s&    

2
)0;
 7r   
./temp.dotc           
         s  t d | j }tjt|}fddi |jD ]*}|jr6 j	|j
t|jddd|j
d}n j|j
|j
d}||j
< qd fd	d
	}|jD ]%} j|j|jd}	|jD ]}||	|d qc|jD ]}||	|d qoqS |dd dS )zT
    Generate a debug graph for block.
    Args:
        block(Block): a block.
    z
some graphc                    s<    d u rdS  D ]}t |tu sJ t|| r dS qdS )NFT)rx   rK   rematch)rr   pattern)
highlightsr   r   need_highlight  s   z+draw_block_graphviz.<locals>.need_highlight
z<br />r
   	highlightFc                    sp   |j D ]2}|vr j||d|< | }| jp!|j}|r- j| ||d q j|| |d qd S )Nr   )	argumentsadd_argdescriptionZadd_edge)r   ry   Zop2varargvarnr   )graphr   varsr   r   add_op_link_var  s   
z,draw_block_graphviz.<locals>.add_op_link_varT)showN)F)r   rL   rM   r   Z	BlockDescZ
FromStringbytesr   ZpersistableZ	add_paramrr   rK   rx   replacer   r   Zadd_opZinputsZoutputs)
rj   r   rV   ZprotostrrL   ry   r   r   r   Zopnr   )r   r   r   r   r   rd     s.   





rd   )Nr   )__doc__r[   r   re   collectionsr   numpyr   Zgoogle.protobufr   rn   r   Zpaddle.baser   Zpaddle.base.protor   Zpaddle.staticr   Zutils.fsr	   graphvizr   __all__r   r   rd   r   r   r   r   <module>   s4   
     