o
    "jC:                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ ddlmZ ddgZed	d Zd
d Zdd Zdd ZdddZdd Zdd Zdd ZdS )    N)dygraph_only)fleet)logger   )save_for_auto_inferencesaver   c           
   	   K   sD  | dd}t dks|du rt|}tj| |fi |S | dd}t|ts-J d|dv s5J dt	| d|du rOt
d	| d
| d| d t }| dkr_| dksmJ d|  d|  | }| }|dkr|jdkrt| |sJ d|jdkrt| |sJ dt|}tj| |fi |S |jdkrt|}tj| |fi |S t| |rtj| |fi |S t|tttfsJ t|tr|g}| dd}z9t
dt|    t| |||d}	t
dt|    t |v rt|}tj|	|fi | W dS W dS    td| d)a:  
    Save a state dict to the specified path in both distributed and single-card environment.

    Note:
        Now supports saving ``state_dict`` of Layer/Optimizer, Tensor and nested structure containing Tensor, Program.

    Note:
        Different from ``paddle.jit.save``, since the save result of ``paddle.save`` is a single file,
        there is no need to distinguish multiple saved files by adding a suffix. The argument ``path``
        of ``paddle.save`` will be directly used as the saved file name instead of a prefix.
        In order to unify the saved file name format, we recommend using the paddle standard suffix:
        1. for ``Layer.state_dict`` , recommend to use ``.pdparams`` ;
        2. for ``Optimizer.state_dict`` , recommend to use ``.pdopt`` .
        For specific examples, please refer to API code examples.

    Args:
        obj(Object) : The object to be saved.
        path(str|BytesIO) : The path/buffer of the object to be saved.
            If saved in the current directory, the input path string will be used as the file name.
        protocol(int, optional): The protocol version of pickle module must be greater than 1 and less than 5.
            Default: 4.
        **configs(dict, optional): optional keyword arguments. The following options are currently supported:

            1. use_binary_format(bool):
                To be used in paddle.save. When the saved object is static graph variable, you can specify ``use_binary_for_var``.
                If True, save the file in the c++ binary format when saving a single static graph variable; otherwise, save it in pickle format.
                Default: False.
            2. gather_to(int|list|tuple|None):
                To specify which global rank to save in.Defalut is None.
                None value means distributed saving with no gathering to a single card.
            3. state_type(str):
                Value can be 'params' or 'opt', specifying to save parametres or optimizer state.
            4. max_grouped_size(str|int):
                To limit the max size(how many bits) a object group to be transfered a time.
                If str, the format must be as num+'G/M/K', for example, 3G, 2K, 10M, etc. Default is 3G.

    Returns:
        None

    Examples:

        .. code-block:: python

            >>> # doctest: +SKIP('TODO: the error will be fix in the feature')
            >>> import paddle
            >>> paddle.distributed.init_process_group(backend='nccl')
            >>> paddle.distributed.fleet.init(is_collective=True)

            >>> model = build_model()
            >>> optimizer = build_optimizer(model)

            >>> dist_optimizer = paddle.distributed_optimizer(optimizer)
            >>> dist_model = paddle.distributed_optimizer(model)

            >>> # gather params to rank 0 and then save
            >>> paddle.incubate.distributed.utils.io.save(model.state_dict(), path="path/to/save.pdparams", gather_to=[0], state_type="params")

            >>> # save whoe params on all ranks
            >>> paddle.incubate.distributed.utils.io.save(model.state_dict(), path="path/to/save.pdparams", gather_to=[0,1], state_type="params")

            >>> # save optimizer state dict on rank 0
            >>> paddle.incubate.distributed.utils.io.save(optimizer.state_dict(), path="path/to/save.pdopt", gather=0, state_type="opt")

    	gather_toNr   
state_typez|must pass an arg state_type='params' or state_type='opt' to specify whether to save model state_dict or optimizer state_dict)paramsoptz8must pass an arg state_type='params' or state_type='opt'$zYou are saving z, while the path(z does not end with )z;Only DP and Sharding is supported now. However, current MP=z , PP=r
   z0only sharding stage 1/2 and DP are supported nowZmax_grouped_size3Gzstate_dict_keys:)max_sizezgathered_state_dict_keys:zSaving failed. Follwing are some suggestions:
    1) pass the param max_grouped_size to turn the grouped size smaller (current value of max_grouped_size is zr)
    2) if sharding stage is 1, use paddle.save rather than paddle.distributed.save
    3) Concat the developers
)getdistget_world_size_remove_not_supported_confpaddler   
isinstancestrresearchr   warningr   Zget_hybrid_communicate_groupZget_model_parallel_world_sizeZget_pipe_parallel_world_sizeZget_sharding_parallel_groupZget_data_parallel_groupZnranks
_same_keyslisttupleintinfokeys_gather_state_dictget_rankRuntimeError)

state_dictpathconfigsr   r	   ZhcgZsharding_groupZdp_groupr   Zgathered_state_dict r&   o/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/distributed/utils/io/dist_save.pyr      s   C





c           
      c   s   d}|   D ]\}}|t|t| k r!t|t| }qt||}td|  i }t|  }d}d}|t|k rt| ||  t||  }	||	 |kr^|V  i }d}| ||  ||| < |d7 }||	7 }|t|kr}|dkr}|V  |t|k sAdS dS )zw
    Description:
        Generator of state dict groups to transfer.the size of each group is less than max_size.
    r   zmax tensor size: r   N)	itemssys	getsizeofmaxr   debugr   r   len)
r#   r   Zmax_tensor_sizekvZstate_groupZk_listindexbitsZbsizer&   r&   r'   _state_dict_groups   s4   
r2   c                 C   s    | D ]}t |dkr dS qdS )z&
    Check if all items are empty
    r   FTr-   )Z	dict_listr/   r&   r&   r'   	all_empty   s
   r4   c                 C   s   t | ttfs	J t | tr?td| sJ d|  t| dd }| d dkr/|d } | S | d dkr;|d } | S |d	 } | S )
z
    Parse an integer or a mem size str to an integer
    convert xxxG to xxx * 1024^3
    convert xxxM to xxx * 1024^2
    convert xxxK to xxx * 1024^1
    z^[0-9]*[GMK]$zeWrong max_size 's format, the format ust be like 10K, 9M, 200G , etc, or an integer. However this is NGi   @Mi   i   )r   r   r   r   r   )r   numr&   r&   r'   _parse_mem_size_to_bits   s    
r9   r   c                 C   s   t |tttfsJ dt |tr|g}t|}|t| }td t		| }d}d}d}d|v r;|
dd}d}d|v rG|
dd}d}t||||}	t |tr[t||||}
n|du shJ dt| |rn|
|	d< |rt||	d< |	S )	aq  
    Description:
        Gather state dicts across all group ranks to dst, Depiring the same elements. including LR_Scheduler.
    Args:
        state_dict(dict):
            local state dict
        dst(int|list|tuple):
            ranks the state dicts are gathered to
        group(ProcessGroup):
            group across which the state dicts are gathered
        max_size(int|str):
            The max limitation of the gathered tensor group size transformered a time. Default is 3G bits.
            Each rank 's max tensor group before gathering is max_size // group.size
    Returns:
        Gathered state dict
    z,dst' type must be one of int, list and tuplezlen state_dict: len(state_dict)NFZmaster_weightsTZLR_Schedulerz%Wrong type of master weights . type: )r   r   r   r   r9   r   r   r   r,   copypop_grouped_gather_data_dictdicttype)r#   dstgroupr   Zstate_dict_mwZhas_mwZhas_lrlroutputZmastersr&   r&   r'   r       s:   




r    c              
   C   s  i }t dt|   |  D ]\}}z| ||< W q   tdt| d| dd}i }t d t||D ]W}	g }
|t|	7 }t d| dt|  t	
|
|	| t	 |v r~|
D ]}| D ]\}}t d	| d
|j  qg|| qat dtdd |
D  dt|  q;	 g }
i }	t d t	
|
|	| t|
rn:t	 |v r|
D ]}| D ]\}}t d	| d
|j  q|| qt dtdd |
D  dt|  qt d t	 |v rt }| D ]}tj|| |d||< ||| _q|S i S )a  
    Description:
        Gather state data dict by groups.
    Args:
        state__data_dict(dict):
            local dict to transfer.The state_data_dict only contains the mapping: str->paddle.Tensor
        dst(int|list|tuple):
            ranks the state dicts are gathered to
        group(ProcessGroup):
            group across which the state dicts are gathered
        max_size(int|str):
            The max limitation of the gathered tensor group size transformered a time. Default is 3G bits.
            Each rank 's max tensor group before gathering is max_size // group.size
    Returns:
        Gatherd state_data_dict

    zlen state_tict_ : zthe object (type of z) of 'z!' is neither tensor nor parameterr   zstart all gather ...zgen to gather: z / z
gathered: z, zs list size: c                 s       | ]}t |V  qd S Nr3   .0sr&   r&   r'   	<genexpr>W      z,_grouped_gather_data_dict.<locals>.<genexpr>z	 output: Tz
while Truec                 s   rD   rE   r3   rF   r&   r&   r'   rI   i  rJ   zall gathered ...)place)r   r,   r-   r(   numpy	TypeErrorr>   r   r2   r   all_gather_objectr!   shapeupdatesumr4   r   ZCPUPlacer   Z	to_tensorname)Zstate_data_dictr?   r@   r   Z
numpy_dictr.   r/   totalZoutput_statestateZs_listrH   rK   r&   r&   r'   r<   )  sb   
 
 
r<   c                 C   sF   t |  }g }t| tj|||d |D ]	}||ks  dS qdS )z
    Check whther all keys in each dict in the group are the same.
    Used in sharding strategy to determine whether a dict needs to be gathered.
    )r@   FT)r   r   r   r   r   rN   )r#   r@   r   Zkey_listr.   r&   r&   r'   r   x  s   
r   c                 C   s6   dg}t  | }|  D ]}||vr||d q|S )z?
    Remove the config values not supported by paddle.save
    Zuse_binary_formatN)r:   r   r;   )r%   Z__supported_by_save__Zconfigs_r.   r&   r&   r'   r     s   
r   )r   )r:   r   r)   r   Zpaddle.distributeddistributedr   Zpaddle.base.frameworkr   r   Z'paddle.distributed.fleet.utils.log_utilr   Zsave_for_autor   __all__r   r2   r4   r9   r    r<   r   r   r&   r&   r&   r'   <module>   s(   
 	'

;O