o
    "j                     @   s  d dl Z d dlmZ d dlmZ d dlZd dlZd dlm	Z
 d dlmZ d dlmZ d dlmZmZmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z% ddl&m'Z'm(Z( G dd de%j)Z*G dd dZ+G dd dZ,G dd dej-Z.				d.dejj/fddZ0	d/ddZdd  Z1d!d" Z2			d0dej/d#e
j3d$ed%ed&ed'ej/fd(d)Z4G d*d+ d+Z5d1d,d-Z6dS )2    N)defaultdict)Callable)nn)unique_name)EagerParamBaseVariabledefault_main_program)Enginestrategyshard_tensor)to_placements)$mark_as_sharding_propagation_skip_op)get_default_distributed_context)DistributedOperator)convert_to_dims_mappingget_dist_attr)core   )check_placements_equalget_shard_specc                   @   s$   e Zd ZdZdd Zedd ZdS )DistAttra  
    DistAttr specifies how tensors are distributed or sliced on ProcessMesh.

    Args:
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        sharding_specs(list[str|None]): The specification describing how to shard the Tensor.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]], dim_names=['x', 'y'])
            >>> dist_attr = dist.DistAttr(mesh=mesh, sharding_specs=['x', 'y'])

            >>> print(dist_attr)

    c                    s   t  tjs
tdt |tstdtdd |D s J d|| _ fdd|D }tj|   | _	|| _
| d | d	 d S )
Nz?The mesh must be an instance of paddle.distributed.ProcessMesh.z/The sharding_specs must be an instance of list.c                 s   s"    | ]}t |tp|d u V  qd S N)
isinstancestr.0Zdim_name r   e/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/auto_parallel/api.py	<genexpr>U   s
    
z$DistAttr.__init__.<locals>.<genexpr>z@The dimension name in sharding_specs must be an instance of str.c                    s$   g | ]}|d ur j |ndqS )N)Z	dim_namesindexr   meshr   r   
<listcomp>[   s    z%DistAttr.__init__.<locals>.<listcomp>process_meshdims_mapping)r   r   ProcessMesh
ValueErrorlistall_sharding_specsTensorDistAttr__init__r%   r&   mark_annotated)selfr#   sharding_specsr&   r   r"   r   r-   M   s(   



zDistAttr.__init__c                 C      | j S )zl
        Get sharding_specs of the dist_attr
        Returns:
            list[str]: sharding_specs
        )r+   r/   r   r   r   r0   h   s   zDistAttr.sharding_specsN)__name__
__module____qualname____doc__r-   propertyr0   r   r   r   r   r   8   s
    r   c                   @   s   e Zd ZdZ				d"ddZdd Zdd Zd	d
 Zdd Zd#ddZ	d#ddZ
d#ddZd#ddZdd Zdd Zdd Zd$ddZdd Zd d! ZdS )%	DistModela  
    DistModel is generated by ``paddle.distributed.to_static``. It contains the
    static graph converted from a ``paddle.nn.layer`` whose parameters are
    distributed tensors (constructed from ``paddle.distributed.shard_tensor``),
    and provides the APIs for training, evaluation and prediction with the
    static graph.

    Please first set the DistModel to "train", "eval" or "predict" mode and
    then use the __call__ method for training, evaluation and prediction
    respectively.

    In "train" mode, executing ``__call__`` will update the parameters
    of the model and return the loss. In "eval" mode, executing ``__call__``
    will return the loss. In "predict" mode, executing ``__call__`` returns a
    dict that contains the outputs of the model, where the value of "out0" is
    the first output.

    If loss and optimizer are both given, DistModel will be set to "train" mode
    in default. If loss is given but optimizer is None, DistModel will be set
    to "eval" mode in default. If loss and optimizer are both None, DistModel
    will be set to "predict" mode in default.

    For more details of the usage, please refer to the sample code in
    ``paddle.distributed.to_static``.
    Nc           
      C   s   g | _ | || _t||||| jd| _d | _i | _ |jj}| j|j	d |\}}	|d ur;|d ur;| jj
||	ddd |d urI| jj
||	ddd | jj
|d ddd |d ur`|d ur`|   n|d uri|   n|   | j|}| jj|j	d|d| _d S )	N)r
   trainF)modeZinit_parametersevalpredictT)Zreturn_list
batch_size)_feed_name_list_DistModel__convert_strategyZ_inner_strategyr	   _engine_modeZbatch_samplerr=   Z_prepare_data_specZdatasetpreparer9   r;   r<   Z_validate_batch_sizeZ_prepare_dataloaderdist_loader)
r/   layerloaderloss	optimizerr
   Zmetricsr=   Zinputs_specZlabels_specr   r   r   r-      s>   	

zDistModel.__init__c                 C   *   | j jd s
tdd| _| j d dS )z7
        Set the mode of DistModel to "train".
        r9   z-The model for training has not been prepared.Nr@   Z_has_preparedRuntimeErrorrA   Zto_moder2   r   r   r   r9      s   zDistModel.trainc                 C   rH   )z6
        Set the mode of DistModel to "eval".
        r;   z/The model for evaluation has not been prepared.NrI   r2   r   r   r   r;         zDistModel.evalc                 C   rH   )z9
        Set the mode of DistModel to "predict".
        r<   z/The model for prediction has not been prepared.NrI   r2   r   r   r   r<      rK   zDistModel.predictc                 C   s<   |d u r| j d u rtd|d u r| j }|dvrtd|S )Nz;Please set the mode or call train()/eval()/predict() first.)r9   r;   r<   z.mode can only be 'train', 'eval' or 'predict'.)rA   r(   r/   r:   r   r   r   Z__validate_mode   s   zDistModel.__validate_modec                 C      |  |}| j|S )z
        Get the distributed main program of ``mode``. The ``mode``
        arg can be "train", "eval" or "predict", if it is not set,
        ``self._mode`` will be used.
        )_DistModel__validate_moder@   Zget_dist_main_programrL   r   r   r   dist_main_program      
zDistModel.dist_main_programc                 C   rM   )z
        Get the distributed startup program of ``mode``. The ``mode``
        arg can be "train", "eval" or "predict", if it is not set,
        ``self._mode`` will be used.
        )rN   r@   Zget_dist_startup_programrL   r   r   r   dist_startup_program   rP   zDistModel.dist_startup_programc                 C   rM   )z:
        Get the serial main program of ``mode``.
        )rN   r@   Zget_serial_main_programrL   r   r   r   serial_main_program      
zDistModel.serial_main_programc                 C   rM   )z=
        Get the serial startup program of ``mode``.
        )rN   r@   Zget_serial_startup_programrL   r   r   r   serial_startup_program  rS   z DistModel.serial_startup_programc                 C   sv   | j | jvs| j| j  g kr| j }dd |D | j| j < | j| j  }t|t|kr4tdt| tt||S )Nc                 S   s   g | ]}|j qS r   )name)r   varr   r   r   r$     s    z)DistModel._make_feeds.<locals>.<listcomp>zKThe input data and feed_list are not consistent.The model takes %s as input)	rA   r>   r@   Zget_feed_listlenr(   r   dictzip)r/   Z	data_listZ	feed_listZfeed_name_listr   r   r   _make_feeds  s   
zDistModel._make_feedsc                 C   s   dd l }|d u r
d S t }|jj|j_|jjdu r!|jjd |jjdu r.|jjd |	|j
|_
|	|j|_|	|j|_|S )Nr   TZfused_gemm_epilogue_passZfused_dropout_add_pass)copyauto_strategyStrategyfused_passesenablegemm_epilogueZfused_passes_listappenddropout_adddeepcopyshardinggradient_mergepipeline)r/   r
   r[   Zinner_strategyr   r   r   Z__convert_strategy  s"   zDistModel.__convert_strategyc                 G   s   | j d u r	td| j dkr| jjd u s| jjd u rtd| j dkr-| jjd u r-td| t|}| j|}| j dkrId|v rG|d S d S d|v rQ|d S d S )	Nz+Please call train()/eval()/predict() first.r9   z7Please set optimizer and loss function before training.r;   z+Please set loss function before evaluation.r<   outputsrF   )rA   r(   r@   Z
_optimizerZ_lossrZ   r)   run)r/   argsZfeedsZoutsr   r   r   __call__/  s&   



zDistModel.__call__r*   c                 C   s$   | j | jjd|}| |}|S )a  
        Get the state dict of model and optimizer.

        Args:
            mode (str): Can be ['opt', 'param', 'all'],
                'opt' :  The return value only contains the variable in the optimizer.
                'param' : The return value only contains the variable in the network, not the variable in the optimizer.
                'all' : The return value contains the variable in the network and optimizer.
                Default: 'all'
        r:   )rO   r@   rA   
state_dict_build_distributed_state_dict)r/   r:   local_state_dictZdist_state_dictr   r   r   rl   H  s   
zDistModel.state_dictc           	      C   s   | j | jjd}| jj| j }t||}dd }i }tjj * |	 D ]\}}||v s8J d| d| d|||| ||< q%W d   |S 1 sMw   Y  |S )zo
        Args:
            local_state_dict(Dict[str, libpaddle.Tensor]): The state dict from program.
        rk   c           	      S   s  t | tjtjtjjfsJ t | tjst| } t | tjs,J d|  dt|  dt| jt|d ksEJ d| j d|d  d| j}t	|d D ]5\}}|dkr]|t| jk seJ d	| d
|dkrjqN|dkr||d | | j|  ||< qNt
d	| dtj|| jd}tt|d |d }t|d |}t|||}| j| jksJ d| j d| j t| |  |S )Nzlocal tensor:z type z is not paddle.Tensor.r&   zlocal tensor shape z! not equal to dims_mapping shape .r    zdim z out of range.r   Zprocess_shapez is not supported.dtypeZprocess_groupz! not equal to local_tensor.shape:)r   paddleTensornpZndarraybasetyperW   shape	enumerater(   Zzerosrq   distr'   arrayZreshaper   r   _local_valueassign)	Zlocal_tensor	dist_attrZglobal_shapeidimZglobal_tensorr#   
placementsdist_tensorr   r   r   build_distributed_tensorc  sR   


zIDistModel._build_distributed_state_dict.<locals>.build_distributed_tensorzvar z not in dist attrs:ro   N)
rO   r@   rA   Z_dist_contextsr   rr   ru   Zdygraphguarditems)	r/   rn   rO   Zdist_contextZ
dist_attrsr   Zglobal_state_dictvar_nametensorr   r   r   rm   Y  s$   
(


z'DistModel._build_distributed_state_dictc                 C   s   i }| j | jjd}|  }| D ]E\}}| s%J d| d| d||v rQ|| }|j|| jksQt|j|jsQJ d|j d|j d|j d|j d	|	 ||< q|
| d S )	Nrk   zkey z value:z is not a dist tensor.zprocess_mesh:z != z or placements:z
 not match)rO   r@   rA   rl   r   is_distr%   r   r   r{   set_state_dict)r/   rl   rn   rO   Zcur_state_dictkvZcur_vr   r   r   r     s(   $zDistModel.set_state_dictNNNNr   )r*   )r3   r4   r5   r6   r-   r9   r;   r<   rN   rO   rQ   rR   rT   rZ   r?   rj   rl   rm   r   r   r   r   r   r8   r   s*    
4	

	
	

=r8   c                   @   s   e Zd ZdZdddZdS )
FusePassesz@
    A helper class for users to configure the fuse passes.
    Nc                 C   sX   d| _ d| _d| _|d ur(| D ]\}}t| |r!t| || qtd| d S d S )NFzUnknown fuse pass )r_   r`   rb   r   hasattrsetattrr(   )r/   config_dictkeyvaluer   r   r   r-     s   
zFusePasses.__init__r   )r3   r4   r5   r6   r-   r   r   r   r   r     s    r   c                       sR   e Zd ZdZd fdd	Zedd Zedd Zed	d
 Zedd Z	  Z
S )r]   a  
    The `Strategy` object is used to configure the parallelization
    and optimization strategies for static graph. Currently contains
    configuring ``sharding``, ``fused_passes``, ``gradient_merge``
    and ``pipline``. More strategies will be supported in the future.

    ``sharding`` is used to cnofigure the sharding states of the optimizer,
    for saving the GPU memory.

    ``fused_passes`` is used to configure the fusion of the computation in
    the model.

    ``gradient_merge`` is used to configure the gradient merge strategy in
    training.

    ``pipeline`` is used to configure the pipeline parallelism strategy.

    Args:
        config (dict|None, optional): If ``config`` is None, the default
        configurations will be set. If it is a dict, the itmes inside
        the dict will be used to set the configurations, the others remain
        the default values.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> strategy = dist.Strategy()

            >>> strategy.sharding.enable = True
            >>> strategy.sharding.stage = 2
            >>> strategy.sharding.degree = 2

            >>> strategy.gradient_merge.enable = True
            >>> strategy.gradient_merge.k_steps = 2
            >>> strategy.gradient_merge.avg = False

            >>> strategy.pipeline.enable = True
            >>> strategy.pipeline.schedule_mode = "1F1B" # default is "1F1B"
            >>> strategy.pipeline.micro_batch_size = 2
    Nc                    s   |d urt |trt|| _n
td| i | _tjj}t	 
|| j | jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _| jtjjd }t|| _d S )Nz%Expected a dictionary. But received: )r   rX   r[   rc   Z_config_dictr(   r\   	constantsZBASEsuperr-   getZSHARDINGZShardingConfig	_shardingZGRADIENT_MERGEZGradientMergeConfig_gradient_mergeZPIPELINEZPipelineConfig	_pipelineZFUSED_PASSESr   _fused_passes)r/   configcategoryr   	__class__r   r   r-     s2   
zStrategy.__init__c                 C   r1   )a{  
        ``sharding`` is used to cnofigure the sharding states of the optimizer,
        containing following configs:

            ``enable`` (bool): whether to enable sharding. Default: False.

            ``stage`` (int): can be set to 1, 2 or 3. 1 indicates the optimizer states segmentation,
            2 indicates optimizer states and gradient segmentation, 3 indicates the segmentation
            of optimizer states, gradient and parameters. Default: 1.

            ``degree`` (int): the number of segmentation pieces. Default: 8.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.sharding.enable = True
                >>> strategy.sharding.stage = 2
                >>> strategy.sharding.degree = 2
        )r   r2   r   r   r   rd   
  s   zStrategy.shardingc                 C   r1   )a  
        ``gradient_merge`` is used to configure the gradient merge strategy in
        training, containing following configs:

            ``enable`` (bool): whether to enable gradient merge. Default: False.

            ``k_steps`` (int): the number of steps for merging gradients. Default: 1.

            ``avg`` (bool): whether to average the gradients of each step. Default: True.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.gradient_merge.enable = True
                >>> strategy.gradient_merge.k_steps = 2
                >>> strategy.gradient_merge.avg = True
        )r   r2   r   r   r   re   &  s   zStrategy.gradient_mergec                 C   r1   )aC  
        ``fused_passes`` is used to configure the fusion of the computation in
        the model, containing following configs:

            ``enable`` (bool): whether to enable fused passes. Default: False.

            ``gemm_epilogue`` (bool): whether to fuse ``matmul`` and ``add`` computation
            in the ``Linear`` layer. Default: False

            "dropout_add" (bool): whether to fuse ``dropout`` and ``add`` computation. Default: False.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.fused_passes.enable = True
                >>> strategy.fused_passes.gemm_spilogue = True
                >>> strategy.fused_passes.dropout_add = True
        )r   r2   r   r   r   r^   @     zStrategy.fused_passesc                 C   r1   )a(  
        ``pipeline`` is used to configure the pipeline parallelism in training,
        containing following configs:

            ``enable`` (bool): whether to enable pipeline parallelism. Default: False.

            ``schedule_mode`` (str): the scheduling mode of pipeline parallelism. Default: "1F1B".

            ``micro_batch_size`` (int): the size of each micro-batch inside a mini-batch. Default: 1.

            ``accumulate_steps`` (int): number of steps for accumulating. Default: 1.

        Examples:
            .. code-block:: python

                >>> import paddle
                >>> import paddle.distributed as dist

                >>> strategy = dist.Strategy()

                >>> strategy.pipeline.enable = True
                >>> strategy.pipeline.micro_batch_size = 2
        )r   r2   r   r   r   rf   [  r   zStrategy.pipeliner   )r3   r4   r5   r6   r-   r7   rd   re   r^   rf   __classcell__r   r   r   r   r]     s    ,"


r]   rD   c                 C   s   t | ||||}|j}||fS )a_  
    Converts the ``layer`` with distributed tensor (constructed from
    ``paddle.distributed.shard_tensor``) to a static graph. to_static
    returns a DistModel instance containing the static graph for
    distributed training, evaluation and prediction, and an object of
    DistributedDataLoader to generate data.

    Args:
        layer(paddle.nn.Layer): The layer in dygraph mode, the parameters
            or its inputs can be distributed tensors.
        loader(paddle.io.DataLoader): The data loader used in dygraph mode,
            used to generate DistributedDataloader.
        loss(Loss|Callable|None, optional): The loss function for training
            or evaluating the model. Can be a `paddle.nn.Layer` instance or
            any callable function. Default: None.
        optimizer(paddle.optimizer.Optimizer|None, optional): The optimizer
            for training. Default: None.
        strategy(paddle.distributed.Strategy|None, optional): Configs for
            parallel strategies and optimization settings (e.g. sharding,
            pipeline parallelism). Default: None.

    Returns:
        DistModel: A DistModel tha contains corresponding computational graph
            for the input ``layer`` and provides APIs for training, evaluation
            and prediction.
        DistributedDataLoader: An optimized data loader that can be used
            to generate data.

    Examples:
        .. code-block:: python

            >>> import numpy as np
            >>> import paddle
            >>> import paddle.distributed as dist
            >>> from paddle import nn
            >>> from paddle.distributed import Replicate, Shard

            >>> BATCH_SIZE = 4
            >>> BATCH_NUM = 4
            >>> IMAGE_SIZE = 16
            >>> CLASS_NUM = 8
            >>> class RandomDataset(paddle.io.Dataset):
            ...     def __init__(self, images, labels, num_samples):
            ...         self.images = images
            ...         self.labels = labels
            ...         self.num_samples = num_samples
            ...     def __getitem__(self, idx):
            ...         return self.images[idx], self.labels[idx]
            ...     def __len__(self):
            ...         return self.num_samples

            >>> class DemoNet(nn.Layer):
            ...     def __init__(self, mesh):
            ...         super().__init__()
            ...         self._mesh = mesh
            ...         self.linear_0 = nn.Linear(IMAGE_SIZE, IMAGE_SIZE)
            ...         self.linear_1 = nn.Linear(IMAGE_SIZE, CLASS_NUM)
            ...         self.relu = nn.ReLU()
            ...         # shard the weights of this layer
            ...         self.linear_0.weight = dist.shard_tensor(
            ...             self.linear_0.weight,
            ...             self._mesh,
            ...             [Shard(1)],
            ...             stop_gradient=False,
            ...         )
            ...         self.linear_1.weight = dist.shard_tensor(
            ...             self.linear_1.weight,
            ...             self._mesh,
            ...             [Shard(0)],
            ...             stop_gradient=False,
            ...         )
            ...     def forward(self, x):
            ...         out = self.linear_0(x)
            ...         out = self.relu(out)
            ...         out = self.linear_1(out)
            ...         return out

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> images = np.random.rand(BATCH_SIZE, IMAGE_SIZE).astype('float32')
            >>> labels = np.random.rand(BATCH_SIZE, CLASS_NUM).astype('float32')
            >>> dataset = RandomDataset(images, labels, BATCH_SIZE)
            >>> loader = paddle.io.DataLoader(dataset, batch_size=BATCH_SIZE)

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> layer = DemoNet(mesh)
            >>> opt = paddle.optimizer.SGD(
            ...     learning_rate=0.1, parameters=layer.parameters()
            ... )
            >>> loss_fn = nn.MSELoss()

            >>> dist_model, dist_loader = dist.to_static(
            ...     layer, loader, loss_fn, opt
            ... )

            >>> # training
            >>> dist_model.train()
            >>> for batch_id, (image, label) in enumerate(dist_loader()):
            ...     # in train mode, executing the __call__ method will
            ...     # update the parameters of the model and return the
            ...     # loss
            ...     loss = dist_model(image, label)

            >>> # evaluation
            >>> dist_model.eval()
            >>> for batch_id, (image, label) in enumerate(dist_loader()):
            ...     # in eval mode, executing the __call__ method will
            ...     # return the loss
            ...     loss = dist_model(image, label)

            >>> # prediction
            >>> dist_model.predict()
            >>> for batch_id, (image, label) in enumerate(dist_loader()):
            ...     # in predict mode, executing the __call__ method will
            ...     # return a dict that contains the outputs of the model,
            ...     # where the value of "out0" is the first output.
            ...     outs = dist_model(image)

            >>> # This case need to be excuted in multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1
            >>> # python -m paddle.distributed.launch {test_case}.py
    )r8   rC   )rD   rE   rF   rG   r
   Z
dist_modelrC   r   r   r   	to_staticw  s    r   Tc                 C   s   |du r	t j }t j|}t j| |||d}t  r7t| tr.tj|f||d|j	S t j
||||dS t|||j}t|||S )aD  
    Constructs a ``paddle.Tensor`` with distributed attributes from ``data``,
    which can scalar, tuple, list, numpy.ndarray, paddle.Tensor.

    If the ``data`` is already a Tensor, transform it to a Distributed Tensor.

    Args:
        data(scalar|tuple|list|ndarray|Tensor): Initial data for the tensor.
            Can be a scalar, list, tuple, numpy.ndarray, paddle.Tensor.
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        placements(list[paddle.distributed.Placement]): the placements describe how to place the tensor on ProcessMesh, it can
            be Shard, Replicate and Partial.
        dtype(str|np.dtype, optional): The desired data type of returned tensor. Can be 'bool' , 'float16' ,
            'float32' , 'float64' , 'int8' , 'int16' , 'int32' , 'int64' , 'uint8',
            'complex64' , 'complex128'. Default: None, infers dtype from ``data``
            except for python float number which gets dtype from ``get_default_type`` .
        place(CPUPlace|CUDAPinnedPlace|CUDAPlace|str, optional): The place to allocate Tensor. Can be
            CPUPlace, CUDAPinnedPlace, CUDAPlace. Default: None, means global place. If ``place`` is
            string, It can be ``cpu``, ``gpu:x`` and ``gpu_pinned``, where ``x`` is the index of the GPUs.
        stop_gradient(bool, optional): Whether to block the gradient propagation of Autograd. Default: True.

    Returns:
        Tensor: A Tensor constructed from ``data`` with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([[2, 4, 5], [0, 1, 3]], dim_names=['x', 'y'])

            >>> # dense tensor
            >>> a = paddle.to_tensor([[1,2,3],
            ...                       [5,6,7]])

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> # distributed tensor
            >>> d_tensor = dist.shard_tensor(a, mesh, [dist.Shard(0), dist.Shard(1)])

            >>> print(d_tensor)

    N)rq   placestop_gradient)r%   r   )r%   r   r   )rr   	frameworkZ_current_expected_placeZ_get_paddle_placeZ	to_tensorin_dynamic_moder   r   Zfrom_tensor__dict__rs   r   ndimshard_tensor_static)datar#   r   rq   r   r   r   r0   r   r   r   r     s*   .

r   c                 O   s   | |i |}t |||S )a  
    Construct a Distributed Tensor from a function of arguments.

    Args:
        fn (callable): A callable function that takes arguments of Distributed Tensor and returns tensor.
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        placements(list[paddle.distributed.Placement]): the placements describe how to place the tensor on ProcessMesh, it can
            be Shard, Replicate and Partial.
        *args (tuple): A tuple of arguments to be passed to the ``fn`` function.
        **kwargs (dict): A dict of arguments to be passed to the ``fn`` function.

    Retruns:
        Tensor: A Tensor constructed from ``fn`` with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> # Create a distributed attribute
            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> # Call the function dtensor_from_fn with dist_attr parameter
            >>> d_tensor = dist.dtensor_from_fn(paddle.ones, mesh, [dist.Replicate()], shape=[1])
            >>> print(d_tensor)

    r   )fnr#   r   ri   kwargsr   r   r   r   dtensor_from_fnH  s   r   c                 C   sz  t j r:t||| j}t||}g }t|D ]\}}t|tj	r&|
| qt|dkr2|| t jj| |S t| tsFJ d| t||| j}t }t }	| jtdddg| j| j| j| j| jd}
t||}| jdd| gid	|
gid
}t |}||j!_"|j!#d d|j!_$|j!%| j&}||_'|#d |j!(|
j&}||_'|#d |	)| t*| |
S )aN  
    Reshard a distributed ``paddle.Tensor`` with given distributed attributes.

    Args:
        dist_tensor(Tensor): the distributed tensor to be resharded.
        mesh(paddle.distributed.ProcessMesh): The `ProcessMesh` object describes the Cartesian topology of the used processes.
        placements(list[paddle.distributed.Placement]): the placements describe how to place the tensor on ProcessMesh, it can
            be Shard, Replicate and Partial.

    Returns:
        Tensor: A Distributed Tensor reshared with distributed attributes.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> # dense tensor
            >>> a = paddle.ones([10, 20])

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> # distributed tensor
            >>> d_tensor = dist.shard_tensor(a, mesh, [dist.Partial()])

            >>> out_d_tensor = dist.reshard(d_tensor, mesh, [dist.Replicate()])

            >>> print(out_d_tensor)

    r   zCin dy2static mode, reshard's input should be Variable, but got [{}]ro   Zreshard_apitmp)rU   rq   rw   rv   persistabler   r|   XZOut)rv   inputsrg   r%   r&   )+rr   r   r   r   r   r   rx   r   ry   ZPartialra   rW   Z_set_partial_dimsru   r   reshardr   formatr   r   Zcurrent_blockZ
create_varr   Zgenerate_with_ignorable_keyjoinrq   rw   rv   r   r   r   Z	append_opr   r}   r%   r.   Zchunk_idZget_input_dist_attrrU   r&   Zget_output_dist_attrZadd_dist_op_for_programr   )r   r#   r   r0   r}   Zpartial_dimsr~   pZmain_programZdefault_dist_ctxZout_varZtarget_dims_mappingZtrans_opZdist_opZinput_dist_attrZoutput_dist_attrr   r   r   r   j  sh   
"






r   r%   shard_fninput_fn	output_fnreturnc                    s   du rt dttjst ddtjdtjddfdd}t rl|du r8| jd	d
D ]	\}}|| q-n| jd	d
D ]\}}||| || q> dur\| 	 fdd durj| 
fdd | S td)a  
    Converts all layer's parameters to DistTensor parameters according to
    the `shard_fn` specified. It could also control the conversion of input
    or output of the layer by specifying the `input_fn` and `output_fn`.
    (i.e. convert the input to `paddle.Tensor` with DistTensor, convert output
    back to `paddle.Tensor` with DenseTensor.)

    The `shard_fn` should have the following signature:

        def shard_fn(layer_name, layer, process_mesh) -> None

    The `input_fn` should have the following signature:

        def input_fn(inputs, process_mesh) -> list(paddle.Tensor)

    In general, the type of `input_fn` return value is paddle.Tensor with DistTensor.

    The `output_fn` should have the following signature:

        def output_fn(outputs, process_mesh) -> list(paddle.Tensor)

    In general, the type of `output_fn` return value is paddle.Tensor with DenseTensor.

    Args:
        layer (paddle.nn.Layer): The Layer object to be shard.
        process_mesh (paddle.distributed.ProcessMesh): The `ProcessMesh` information
            to be place the input `layer`.
        shard_fn (Callable): The function to shard layer parameters across
            the `process_mesh`. If not specified, by default we replicate
            all parameters of the layer across the `process_mesh`.
        input_fn (Callable): Specify how the input of the layer is sharded.
            The `input_fn` will be registered for the Layer as a `forward pre-hook`.
            By default we do not shard the input.
        output_fn (Callable): Specify how the output of the layer is sharded or
            convert it back to `paddle.Tensor` with DenseTensor.
            The `output_fn` will be registered for the Layer as `forward post-hook`.
            By default we do not shard or convert the output.
    Returns:
        Layer: A layer that contains parameters/buffers
            that are all `paddle.Tensor` with DistTensor

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist

            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])

            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))

            >>> def shard_fn(layer_name, layer, process_mesh):
            ...     if layer_name == 'fc1':
            ...         layer.weight = dist.shard_tensor(layer.weight, process_mesh, [dist.Shard(0)])

            >>> layer = MLP()
            >>> layer = dist.shard_layer(layer, mesh, shard_fn)
            >>> print(layer)

            >>> # This case need to be excuted in multi-card environment
            >>> # export CUDA_VISIBLE_DEVICES=0,1
            >>> # python -m paddle.distributed.launch {test_case}.py
    Nz,The argument `process_mesh` cannot be empty.z;The argument `process_mesh` is not `dist.ProcessMesh` type.rD   r#   r   c                 S   s   | j  D ]$\}}|d ur(| s(dd tt|jD }| |t||| q	 q| j D ]$\}}|d urR| sRdd tt|jD }| 	|t||| q/	 q/d S )Nc                 S      g | ]}t j qS r   rr   distributed	Replicater   _r   r   r   r$   )      zKshard_layer.<locals>.replicate_layer_params_and_buffers.<locals>.<listcomp>c                 S   r   r   r   r   r   r   r   r$   6  r   )
_parametersr   r   rangerW   rw   Zadd_parameterr   _buffersZregister_buffer)rD   r#   r   paramr   bufferr   r   r   "replicate_layer_params_and_buffers$  s*   

z7shard_layer.<locals>.replicate_layer_params_and_buffersT)Zinclude_selfc                    s
    |S r   r   )r   r   )r   r%   r   r   <lambda>T     
 zshard_layer.<locals>.<lambda>c                    s
    |S r   r   )r   r   rg   )r   r%   r   r   r   Y  r   zx`paddle.distributed.shard_layer` only supports dynamic graph mode now. It will be supported for static graph mode later.)r(   r   ry   r'   r   Layerrr   r   Znamed_sublayersZregister_forward_pre_hookZregister_forward_post_hookNotImplementedError)rD   r%   r   r   r   r   rU   Z	sublayersr   )r   r   r%   r   shard_layer  sB   N
r   c                   @   s6   e Zd ZdddZdd Zdd Zdd	 Zd
d ZdS )_ShardOptimizerNc                 C   sd   |d usJ dt |tjjtjjfsJ dtjj  | _	tjj
|jj|_|| _|| _d S )Nz)The argument `optimizer` cannot be empty.zR`paddle.distributed.ShardOptimizer` only supports AdamW and SGD optimizer for now.)r   rr   rG   ZAdamWZSGDru   r   r   Zglobal_blocktarget_blockZlayer_helperZLayerHelperr   r3   helper
_inner_opt	_shard_fn)r/   rG   r   r   r   r   r-   f  s   

z_ShardOptimizer.__init__c                 C   s   | j | j|g |j}|j| j j v r| j j|j j}| j j D ]I}| j j| | }| r2q#| jd urE| |||| j j| |< q#| rld|vrQ|j	}ndd t
t|jjD }t||j|d| j j| |< q#d S )Nbetac                 S   s   g | ]}t  qS r   )ry   r   r   r   r   r   r$     s    z6_ShardOptimizer._shard_accumulator.<locals>.<listcomp>)r#   r   )r   Z_create_accumulatorsr   rU   Z_master_weightskeysZ_accumulatorsr   r   r   r   rW   r%   rw   r   )r/   r   target_namer   Zaccumulatorr   r   r   r   _shard_accumulatorw  s:   


z"_ShardOptimizer._shard_accumulatorc                 C   s  t | jjd ts>g }| jjD ]}|jrq| d ur&| }|||f q|D ]	\}}| | q)| jjd d |d d S | jj	D ]I}t
dd }|d D ]}|jrTqN| d urg| }|d ||f qN|dd | D  |d D ]	\}}| | qx| jjd d |d qBd S )Nr   )rF   Zstartup_programparams_gradsc                   S   s   g S r   r   r   r   r   r   r     s    z&_ShardOptimizer.step.<locals>.<lambda>paramsc                 S   s   i | ]\}}|d kr||qS )r   r   r   r   r   r   r   r   
<dictcomp>  s    z(_ShardOptimizer.step.<locals>.<dictcomp>)r   r   _parameter_listrX   r   Z
_grad_ivarra   r   Z_apply_optimize_param_groupsr   updater   )r/   r   r   Zgrad_varr   gparam_groupr   r   r   step  s@   
z_ShardOptimizer.stepc                 C   s  | j  }g }t| j jd tr| j jD ]}||d 7 }qn| j j}|D ]}|jr*q$t|dr9|jdur8|  S q$|jdurB|  S q$t	dd |
 D rP|S t| j jd ts| j jD ]8}|jrcq]t|dr|jdurutd|j tj|tjd|_q]|jdurtd|j tj||jd|_q]nD| j jD ]?}|d D ]8}|jrqt|dr|jdurtd|j tj|tjd|_q|jdurtd|j tj||jd|_qq|   | j jd	d
 | j  S )z
        Create and shard the optimizer states e.g., acumulators and master_weights before load_state_dict.
        If training has already started or the optimizer states are already created and sharded, do nothing.
        r   r   	main_gradNc                 s   s$    | ]\}}|d vr|  V  qdS ))Zmaster_weightsZLR_SchedulerN)r   r   r   r   r   r     s    z-_ShardOptimizer.state_dict.<locals>.<genexpr>z gradient should be None, but is rp   F)Zset_to_zero)r   rl   r   r   rX   r   r   r   Zgradanyr   r(   rr   Z
zeros_likeZfloat32rq   r   r   Z
clear_grad)r/   rl   Z
param_listr   r   r   r   r   rl     sx   
















z_ShardOptimizer.state_dictc                 C   s   t | j|S r   )getattrr   )r/   itemr   r   r   __getattr__	  s   z_ShardOptimizer.__getattr__r   )r3   r4   r5   r-   r   r   rl   r   r   r   r   r   r   e  s    
% Mr   c                 C   s
   t | |S )aJ  

    Warp the global view optimizer to distributed view.

    Note:
        The `shard_fn` should have the following signature:
            def shard_fn(accumulator_name, param, accumulator) -> sharded_accumulator

    Args:
        optimizer (paddle.optimizer.Optimizer): The optimizer to be sharded.
        shard_fn (Callable, optional): The function to shard accumulators. If not specified,
           we simply pass down the dist attr of the params.

    Returns:
        An optimizer with distributed view.

    Examples:
        .. code-block:: python

            >>> import paddle
            >>> import paddle.distributed as dist
            >>> mesh = dist.ProcessMesh([0, 1], dim_names=["x"])
            >>> class MLP(paddle.nn.Layer):
            ...     def __init__(self):
            ...         super().__init__()
            ...         self.fc1 = paddle.nn.Linear(8, 8)
            ...         self.fc2 = paddle.nn.Linear(8, 8)
            ...
            ...     def forward(self, input):
            ...         return self.fc2(self.fc1(input))
            >>> layer = MLP()
            >>> batch = paddle.rand(shape=[8, 8])
            >>> opt = paddle.optimizer.AdamW(parameters=layer.parameters())
            >>> opt = dist.shard_optimizer(opt)
            >>> for _ in range(5):
            >>>     loss = layer(batch)
            >>>     loss.backward()
            >>>     opt.step()
            >>>     opt.clear_grad()
            >>> # This case need to be executed in multi-card environment
            >>> # python -m paddle.distributed.launch --gpus=0,1 {test_case}.py

    )r   )rG   r   r   r   r   shard_optimizer  s   
,r   r   )NNT)NNNr   )7r[   collectionsr   typingr   numpyrt   rr   Zpaddle.distributedr   ry   r   Zpaddle.baser   Zpaddle.base.frameworkr   r   r   Z paddle.distributed.auto_parallelr	   r
   r\   Z*paddle.distributed.auto_parallel.interfacer   r   Z/paddle.distributed.auto_parallel.placement_typer   Z2paddle.distributed.auto_parallel.static.completionr   Z4paddle.distributed.auto_parallel.static.dist_contextr   Z/paddle.distributed.auto_parallel.static.dist_opr   Z-paddle.distributed.auto_parallel.static.utilsr   r   Zpaddle.frameworkr   Zplacement_typer   r   r,   r   r8   r   Z
BaseConfigr]   r   r   r   r   r'   r   r   r   r   r   r   r   <module>   sr   	:  : ?
 
K"f
  )