o
    "jAs                     @   sN  d dl Z d dl mZmZ d dlmZ d dlmZmZ d dlm	Z	 d dl
mZmZmZ d dlmZ d dlmZ d	d
lmZmZ G dd deZG dd deZd.ddZd/ddZd/ddZG dd deZejddddfddZd0ddZG dd deZ			 d1d!d"Zd2d#d$Z d%d& Z!	d/d'd(Z"	d/d)d*Z#	 	+				d3d,d-Z$dS )4    N)_C_ops_legacy_C_ops)PyLayer)check_dtypecheck_variable_and_dtype)
collective)LayerHelper_create_tensorin_dynamic_mode)Layer)dygraph_utils   )ReduceOp_get_reduce_opc                   @   $   e Zd Zedd Zedd ZdS )c_identity_eagerc              	   C   s&   || _ |r|S t|ddd|jddS Nuse_calc_streamTring_iduse_model_parallel)groupr   
c_identityid)ctxtensorr   skip_c_identity_dynamic r   k/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/distributed/fleet/layers/mpu/mp_ops.pyforward   s   zc_identity_eager.forwardc                 C   s    t tjd}| jj|| |S )N_c_identity)r   r   SUMr   process_groupall_reduce_on_calc_stream)r   dyop_typer   r   r   backward,   s   zc_identity_eager.backwardN__name__
__module____qualname__staticmethodr   r%   r   r   r   r   r      s
    
r   c                   @   r   )c_split_eagerc                 C   s,   || _ || _t|ddd|jd|d|ddS )Nr   Tr   ranknranksr   )r   r-   r   c_splitr   )r   r   r   r,   r-   r   r   r   r   4   s   zc_split_eager.forwardc                 C   s@   | j }|j}|d | j |d< tj||jd}|j|| |S )Nr   dtype)r   shaper-   paddleemptyr0   r!   Z%all_gather_into_tensor_on_calc_stream)r   r#   r   	out_shapeoutr   r   r   r%   F   s   zc_split_eager.backwardNr&   r   r   r   r   r+   3   s
    
r+   Fc                 C   s   |dur
|  s
dS |du rdn|j}t rt| ||S d}t|fi t }|j| jd}t	| dg dd |j
|d| id	|i|d
d
dd |S )a'  
    Return a copy of the tensor, mainly used with model parallel.

    Args:
        tensor (Tensor): The input Tensor. Its data type
            should be float16, float32, float64, int32 or int64.
        group (int): The id of the process group to work on.

    Returns:
        Tensor.
    Nr   r   r/   r   float16float32float64int32int64Zuint16r   XOutT)r   r   r   typeinputsZoutputsattrs)	is_memberr   r
   r   applyr   locals"create_variable_for_type_inferencer0   r   	append_op)r   r   r   r   r$   helperr5   r   r   r   r   S   s0   
r   c           	      C   s   |dur
|  s
dS |du rt n|}|j}t j}|j}|j}t r4t	| d|ddd|d|ddS d}t
|fi t }|j| jd	}t| d
g dd |j|d| id|i|dd||dd |S )a*  
    Return allgather of the tensor, mainly used with model parallel.

    Args:
        tensor (Tensor): The input Tensor. Its data type
            should be float16, float32, float64, int32 or int64.
        group (int): The id of the process group to work on.

    Returns:
        Tensor.
    Nr   r   Tr,   r-   r   c_concatr/   r   r6   	_c_concatr<   r=   )r   r   r   r-   r,   r>   )rB   r   _get_default_groupr   _get_global_envr,   r-   r
   r   rH   r   rD   rE   r0   r   rF   	r   r   r   global_rankr,   r-   r$   rG   r5   r   r   r   rI   ~   sT   
rI   c           	   
   C   s   |dur
|  s
dS |du rdn|j}t j}|du r|n||}|du r,t jn|j}t r:t	
| |||S d}t|fi t }|j| jd}t| dg dd |j|d| id	|i|d
||d
dd |S )af  
    Split tensor evenly among all members, mainly used with model parallel.

    Args:
        tensor (Tensor): The input Tensor. Its data type
            should be float16, float32, float64, int32 or int64.
        rank (int): The rank of the current process.
        group (int): The id of the process group to work on.

    Returns:
        Tensor.
    Nr   r.   r/   r   r6   _c_splitr<   r=   T)r   r   r,   r-   r   r>   )rB   r   r   rK   r,   get_group_rank
world_sizer-   r
   r+   rC   r   rD   rE   r0   r   rF   rL   r   r   r   rN      s@   

rN   c                   @   r   )mp_allreduce_eagerc                 C   sB   |j | _|| _|rt|d}|j|| |S t|d|d|j S )N_mp_allreducer   r   )r   r   r   r   r!   r"   r   Zc_allreduce_sum_)r   r   r   r   r   opr   r$   r   r   r   r      s   

zmp_allreduce_eager.forwardc              	   C   s"   | j r|S t|ddd| jddS r   )r   r   r   r   )r   r#   r   r   r   r%     s   zmp_allreduce_eager.backwardNr&   r   r   r   r   rQ      s
    
rQ   Tc           
      C   s   |dur
|  s
dS t r.|du rt n|}|tjks$J d| dt| |||||S |du r4dn|j}d}t	|fi t
 }|j| jd}	t| dg d| |j|d	| id
|	i||dd |	S )z`[it is same as allreduce above, but it supports model parallel. And it support inplace startegy]NzUnknown parameter: .r   mp_allreduce_sumr/   r   )r7   r8   r9   r:   Zint64uint16r<   r=   r   r   r>   )rB   r
   r   rJ   r   r    rQ   rC   r   r   rD   rE   r0   r   rF   )
r   rS   r   r   r   r   r   r$   rG   r5   r   r   r   rR     s@   			rR   c           	      C   sz   t  rt| |||S d}t|fi t }|jdd}t|dddg| ||}|jd|| dd|i||d	d
 |S )aX  
    Lookup table according to index.

    Args:
        table (Tensor): The input Tensor. Its data type
            should be float16, float32, float64.
        index (Tensor): The index to lookup table.
        start_index (int): The initial index for table range.
        name (string): The name of the api

    Returns:
        Tensor.
    c_embeddingtable)Zinput_param_nameinputr:   r;   )ZIdsWr=   )start_index
vocab_sizer>   )	r
   r   rX   r   rD   Zinput_dtyper   rE   rF   )	rY   indexr\   r]   namer$   rG   r0   tmpr   r   r   _c_lookup_tableM  s   
ra   c                       s8   e Zd ZdZ			d	 fdd	Zdd Zdd Z  ZS )
_Linearz
    Linear
    Nc                    sb   t    | j | _|| _|| _| j||g| j| jdd| _| j|g| j| jdd| _	|| _
d S )NF)r1   attrr0   is_biasT)super__init__Z_helperget_default_dtype_dtypeZ_weight_attr
_bias_attrcreate_parameterweightbiasr_   )selfZin_featuresZout_featuresweight_attr	bias_attrr_   	__class__r   r   rf   q  s"   

z_Linear.__init__c                 C   s   t || j| j| jd}|S )N)xrk   rl   r_   )_linearrk   rl   r_   )rm   rZ   r5   r   r   r   r     s   z_Linear.forwardc                 C   s8   | j r	d| j  nd}d| jjd | jjd | j|S )Nz, name= z+in_features={}, out_features={}, dtype={}{}r      )r_   formatrk   r1   rh   )rm   Zname_strr   r   r   
extra_repr  s   z_Linear.extra_repr)NNN)r'   r(   r)   __doc__rf   r   rw   __classcell__r   r   rp   r   rb   l  s    rb   c                 C   s\  |d ur
|  s
d S |d u rdn|j}t j}|d u r|n||}|d u r,t jn|j}tt	| j
}	tt	|j
}
|	d |
krR|	|
krRtd|	 d|
 d|	d |
kr_tj|dd}t rzt| |d|d	|d
|d|
\}}|sv|S ||fS ||||d}tdi t }|j| jd}|j| jd}|jd| |d||d|d |r||fS |S )Nr   ru   z\Expected input_dims - 1 = label_dims or input_dims == label_dims             (got input_dimsz, label_dims)rW   axisr   r,   r-   ignore_index)r   r,   r-   r~   c_softmax_with_cross_entropyr/   )ZLogitsLabel)ZSoftmaxZLossr>   )r   )rB   r   r   rK   r,   rO   rP   r-   lenlistr1   
ValueErrorr2   Z	unsqueezer
   r   r   r   rD   rE   r0   rF   )Zlogitslabelr   Zreturn_softmaxr~   r   rM   r,   r-   Z
input_dimsZ
label_dimsZsoftmaxZlossrA   rG   r   r   r   _c_softmax_with_cross_entropy  sl   

r   c                 C   s  t  r#t| jd}t| ||dddddd	 tj||t| jd dS t	di t
 }| j}t| jd	k s9J d
t| dg dd t|dg dd | g|gd}dddd}||}	|jd|d|	i|d |dur||}
|jd|	g|gdd|
gidt| jd id |
S |	}
|
S )z
    Fuction Linear
    r/   transpose_XFtranspose_Yalpharu   r|   linearr   z/X latitude is not supported greater than 3 now.rr   )r7   r8   r9   r0   )r<   Y)r   r   r   Z	matmul_v2r=   r>   NZelementwise_addr}   )r   )r
   r	   r0   r   matmulr   Z_append_bias_in_dygraphr   r1   r   rD   r   r   rE   rF   )rr   rk   rl   r_   Zpre_biasrG   r0   r@   rA   r`   resr   r   r   rs     sZ   

rs   c                 C   sN   | d u rd S d| _ tj  }tj  }d|| j_ d|| j_ d S )NT)is_distributedr2   staticdefault_startup_programcurrent_blockdefault_main_programZ_find_var_recursiver_   )varstartup_block
main_blockr   r   r   _set_var_distributed  s   r   c              
   C   s|  |dur
|  s
dS |du rdn|j}|dkr |	rt| |d} nt| |d} tjj|||||
d}tjjj}|| |j	|dkr@dn|j
|j}t|j	 |dkrY|jdurYt|j
 |s]|S t|j}|d  |dkrldn|9  < tj  }|j||j|j|jdd|j d}|dkr|jdd	|id
|i|ddd |j
dur||j
 }|S |jdd	|id
|i|||dddd |S )z
    Parallel Linear

    axis the dimension of the parameter of linear layer.
    axis = 0: the row dimension
    axis = 1: the col dimension

    Nr   r   )rn   ro   r_   ru   F)r1   r0   r?   	lod_levelZpersistableZis_dataneed_check_feedrU   r<   r=   TrV   r>   rH   )r,   r   r-   r   r   )rB   r   rN   r   r2   nnZLinear
functionalr   rk   rl   r_   r   ri   r   r1   r   r   r   Z
create_varr0   r?   r   descr   rF   )rr   Znum_rowsZnum_colsr}   
param_attrro   
gather_out
inner_rankr-   Zsplit_tensorr_   r   r   r   Zlinear_function
linear_outr4   r   r5   r   r   r   _parallel_linear   s~   
	


	
	
r   c                 C   s   |dur
|  s
dS |du rdn|j}tdi t }	|}
|}||
 }|	 }|
|d g}|	j|||dd}|dkrFtjjj	| |dd|dS tj
  }tj
  }d|j|j _d|j|j _t|| ||d |d	}t||ddd
}|S )z
    Parallel Embedding
    Nr   _parallel_embeddingru   F)rc   r1   r0   rd   )rk   Zpadding_idxsparser_   T)r\   r]   r_   )r   r   r   )r   )rB   r   r   rD   rg   rj   r2   r   r   	embeddingr   r   Zglobal_blockr   varsr_   r   ra   rR   )rr   Zper_part_embeddingsZorigin_sizer   r   num_partitionsr_   r   r   rG   per_part_sizer,   Zvocab_start_indexr0   sizerk   r   r   Zoutput_parallelr5   r   r   r   r     sD   
r   ru   c	                 C   s  t |ttfsJ dt|dksJ dt |tsJ dddg}	||	v s.J d|	 dt r5td	d
dlm}
 |
j	sBJ d|

 }|
 }|| }|dkr|d
ksZJ d|d
 | d
ksnJ d|d
  d| |d
 | }t| ||||||dd}|S d}|d
kr|d
 | d
ksJ d|d
  d| d|d
 | }||d f}| jd |d
 krd}n.|dkr|d | d
ksJ d|d  d| d|d | }|d
 |f}ntd| dt| |d
 |d ||||||||dd}|S )aH  

    Split the weight of the specified operation into multiple devices
    and do the computation in parallel.

    Now the following three cases are supported.

    Case 1: Parallel Embedding
        The weight of the embedding operation is a NxM matrix with N rows and M columns.
        With parallel embedding, the weight is split into num_partitions partitions, each
        of which is a matrix with (N/num_partitions + 1) rows and M column where the last
        row as the padding idx.

        Suppose we split the NxM weight into two partitons on device_0 and device_1
        respectively. Then, one each device, the final weight has (N/2 + 1) rows with the
        index range from 0 to N/2. On device_0, all values in the input within [0, N/2 -1]
        keep unchanged and all other values are changed to N/2 which is the padding index and
        are mapped to all zeros after embedding. In the same way, on device_1, the value V in the
        input within [N/2, N-1] will be changed to (V - N/2), and all other values are changed
        to N/2 and are mapped to all zeros after embedding. Finally, the results on the two
        devices are sum-reduced.

        The Embedding put on single card is as shown below:

        .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_embedding_single.png
            :width: 800
            :height: 350
            :alt: single_embedding
            :align: center

        Parallel Embedding is shown as below:

        .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_embedding_split.png
            :width: 800
            :alt: split_embedding
            :align: center

    Case 2: Row Parallel Linear
        The weight of the linear operation is a NxM matrix with N rows and M columns.
        With row parallel linear, the weight is split into num_partitions partitions, each
        of which is a matrix with N/num_partitions rows and M column.

        The linear layer put on single card is shown as below, the input variable is represented by X,
        the weight matrix is represented by W and the output vaiable is O. The linear layer on single card is
        simple matrix multiplication operation, O = X * W.

        .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_single.png
            :width: 800
            :alt: single_linear
            :align: center

        Row Parallel Linear is shown as below. As the name suggests, Row Parallel Linear splits the weight matrix W into
        [[W_row1], [W_row2]] along the row. And accordingly the input is splitted along the column into [X_col1, X_col2] and multiply their
        respective weight matrices. Finally apply AllReduce on the output from each card to get the final output.

        .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_row.png
            :width: 800
            :alt: split_row
            :align: center

    Case 3: Column Parallel Linear
        The weight of the linear operation is a NxM matrix with N rows and M columns.
        With column parallel linear, the weight is split into num_paratitions partitions, each
        of which is a matrix with N rows and M/num_partitions column.

        The linear layer put on single card has been illustrated on case 2 and Column Parallel Linear
        is shown as below. The Column Parallel Linear splits the weight matrix W into [W_col1, W_col2] along the column and
        these splitted matrices respectively multiply the input. Finally apply AllGather on the output from each card to get the final output.

        .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_col.png
            :width: 800
            :alt: split_col
            :align: center

    As observed, the column parallel linear and row parallel linear can be combined to skip one ALLGATHER communication
    operator. Furthermore the Attention and MLP can be combined to imporve the performance as shown below.

    .. image:: https://githubraw.cdn.bcebos.com/PaddlePaddle/docs/develop/docs/api/paddle/distributed/img/split_col_row.png
            :width: 800
            :alt: split_col_row
            :align: center

    Args:
        x (Tensor): Input tensor. It's data type should be float16, float32, float64, int32 or int64.
        size (list|tuple): A list or tuple with two elements indicating the shape of the weight.
        operation (str): The name of the operation. The supported operations are 'linear' and 'embedding'.
        axis (int, Optional): Indicate along which axis to split the weight. Default: 0.
        num_partitions (int, Optional): How many parts the weight is partitioned. Default: 1.
        gather_out (bool, Optional): Whether to gather the output after computation. By default, the output
            on each partitions will be gathered after computation. Default: True.
        weight_attr (ParamAttr, Optional): The parameter attribute for the learnable
            weights(Parameter) of the specified operation. Default: None.
        bias_attr (ParamAttr, Optional): The parameter attribute for the bias
            of the specified operation. Default: None.
        name (str, Optional): The default value is None. Normally there is no need for user to set this
            property. Default: None. For more information, please refer to :ref:`api_guide_Name`.

    Returns:
        Tensor.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> import paddle
            >>> import paddle.distributed.fleet as fleet

            >>> paddle.enable_static()
            >>> paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id)
            >>> fleet.init(is_collective=True)
            >>> data = paddle.randint(0, 8, shape=[10,4])
            >>> emb_out = paddle.distributed.split(
            ...     data,
            ...     (8, 8),
            ...     operation="embedding",
            ...     num_partitions=2)

    zDThe type of size for paddle.distributed.split must be list or tuple.   zCNumber of elements in size of paddle.distributed.split must be two.z?The type of operation for paddle.distributed.split must be str.r   r   z:The operation for paddle.distributed.split must be one of rT   zpaddle.distributed.split cannot be used in dynamic graph mode, plese use ParallelEmbedding, ParallelRowLinear, ParallelColumnLinear instead.r   )fleetzDTo use paddle.distributed.split, you must call fleet.init() firstly.zJWe only support to split the weight of embedding along the first axis now.zYThe length of the vocabulary must be divisible by num_partitions but received vocabulary=z num_partitions=Nr   Fz)Number of rows of the weight for linear (z') must be divisible by num_partitions (r{   ru   rW   Tz+Number of column of the weight for linear (z9The value of axis must be 0 or 1, but the value given is )r_   r   )
isinstancer   tupler   strr
   r   Zpaddle.distributed.fleetr   Z_role_makerZworker_indexZ
worker_numr   r1   r   )rr   r   Z	operationr}   r   r   rn   ro   r_   Zsupported_operationsr   r,   r-   r   r   Zemb_outZshould_splitZlinear_sizer   r   r   r   split  s    


r   )NF)N)r   rW   N)NFrz   )NN)r   ru   TNNN)%r2   r   r   Zpaddle.autogradr   Zpaddle.base.data_feederr   r   Zpaddle.distributedr   Zpaddle.frameworkr   r	   r
   Z	paddle.nnr   Zpaddle.nn.utilsr   Zcommunication.reducer   r   r   r+   r   rI   rN   rQ   r    rR   ra   rb   r   rs   r   r   r   r   r   r   r   r   <module>   sP   
 
+
>6-

0/

D7
l
<