o
    "j$?                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	 d dl
mZ d dlmZ d dlmZ dd	lmZmZmZmZ dd
lmZ dd ZdddZdddZG dd deZG dd deZG dd deZG dd deZdd ZG dd dejZ dS )     N)nn)PyLayer)global_gatherglobal_scatter)check_nccl_version_for_p2p)in_dynamic_mode)recompute_hybrid   )BaseGate
GShardGate	NaiveGate
SwitchGate)count_by_gatec                 C   s<   |j dgkrt| |d}|S tjd| j d g| jd}|S )Nr   r	   dtype)shapepaddleZindex_selectemptyr   )inpposinp_buf r   q/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/distributed/models/moe/moe_layer.py_local_scatter&   s
   r   Tc                 C   sv   |j dgkr,| j}tj| dd} tjtj|| j d gdd|| dd}tj||d}|S tj|| j d g| jd}|S )Nr   Zfloat32r   )r   r   T)	overwrite)r   r   r   castZscatterZzeros)r   r   out_batch_sizemaybe_overlapZorigin_dtyper   r   r   r   _local_gather.   s   r   c              	   C   s   |d ur
|  s
d S t r;|d u rtjj n|}t| j}|d  |j9  < t	|| j
}|j| |}|  |S |d u rAdn|j}|d u rOtjj jn|j}tj| d|d|d|S )Nr   use_calc_streamring_idnranks)Z	is_memberr   r   distributedZ
collectiveZ_get_default_grouplistr   r"   r   r   Zprocess_group
all_gatherwaitidZ_get_global_groupZ_legacy_C_opsZc_allgather)Ztensorgroupr    Ztensor_shapeouttaskr!   r"   r   r   r   _all_gather@   s6   
r+   c                   @   ,   e Zd ZdZe	dddZedd ZdS )
MoEScatterz
    Scatter input samples from [batch x sequences] to contiguous alone experts.
    If `world_size` is greater than 1, the samples will first be locally
    scattered, and then exchanged across workers.
    Nc                 C   sR   t ||}|dkrt||||d}	n|}	|jd ||f| _|||f}
| j|
  |	S )Nr	   r(   r   )r   r   r   moe_argssave_for_backward)ctxr   r   local_expert_countglobal_expert_countfwd_batch_size
world_sizer(   Zlocal_input_bufZglobal_input_buf	variablesr   r   r   forwardj   s   


zMoEScatter.forwardc           
      C   sP   |   \}}}| j\}}}|dkrt||||d}n|}t|||}	|	d d d fS Nr	   r.   )saved_tensorr/   r   r   )
r1   Zgradr   r2   r3   Zinp_batch_sizer5   r(   Zlocal_grad_inZgrad_inr   r   r   backward   s   zMoEScatter.backwardN__name__
__module____qualname____doc__staticmethodr7   r:   r   r   r   r   r-   c   s    	r-   c                   @   r,   )	MoEGatherz
    Gather output samples from contiguous alone experts back to [batch x
    sequences]. Works symmetrically with MoEScatter.
    Nc                 C   sX   |dkrt ||||d}n|}t|||dd}	|jd ||f| _|||f}
| j|
  |	S )Nr	   r.   F)r   r   )r   r   r   r/   r0   )r1   Zglobal_output_bufr   r2   r3   local_batch_sizer5   r(   Zlocal_output_bufoutputr6   r   r   r   r7      s   

zMoEGather.forwardc           
      C   sN   |   \}}}| j\}}}t||}|dkrt||||d}	n|}	|	d d d fS r8   )r9   r/   r   r   )
r1   grad_outr   r2   r3   r4   r5   r(   Zgrad_out_bufZglobal_grad_out_bufr   r   r   r:      s   
zMoEGather.backwardr;   r<   r   r   r   r   rB      s    	rB   c                   @   (   e Zd ZdZedd Zedd ZdS )	AllGatherzP
    A wrapper for the All-Gather function to support auto-differentiation.
    c                 C   s8   g }t jj|||d t j|dd}||jd f| _|S Nr.   r   Zaxis)r   r#   r%   concatr   args)r1   r   rankr5   r(   Ztensor_listrD   r   r   r   r7      s
   zAllGather.forwardc                 C   s.   | j \}}tj|dg|| g|d | gdS )Nr   r	   ZaxesZstartsZends)rK   r   slice)r1   rE   rL   Zdim0r   r   r   r:      s   
zAllGather.backwardNr<   r   r   r   r   rG      s    
rG   c                   @   rF   )SlicezK
    A wrapper for the Slice function to support auto-differentiation.
    c           	      C   sN   |j d }|| }|| }t|| |}tj|dg|g|gd}||f| _|S )Nr   rM   )r   minr   rN   rK   )	r1   r   rL   r5   r(   BrC   Zbatch_startZ	batch_endr   r   r   r7      s   

zSlice.forwardc                 C   s   | j \}}t||dS )Nr.   )rK   r+   )r1   rE   r5   r(   r   r   r   r:      s   
zSlice.backwardNr<   r   r   r   r   rO      s    
rO   c           	      C   sr   t | |||d\}}}t  |||gjdd}t|  }W d    n1 s-w   Y  |||||fS rH   )r   r   Zno_gradreshape_sumintitem)	gate
num_expertr5   	moe_groupr   r2   r3   fwd_expert_countr4   r   r   r   prepare_forward   s"   
rZ   c                       s4   e Zd ZdZ					d fdd	Zdd Z  ZS )	MoELayera
  MoE Layer
    Args:
        d_model (int): Model dimention.
        experts (nn.LayerList): Expert networks list.
        gate (dict|NaiveGate|SwitchGate|NaiveGate):

            - If gate is a dict:
              gate is a gate network config, containing 2 keys:
              `type` (str) value can be: "naive", "gshard", "switch" or None, default is "gshard".
              `top_k` (int) Default value is 2.
            else gate is an instance of NaiveGate|SwitchGate|NaiveGate:

        moe_group: moe group for experts communication.
        mp_group: mp group for mp communication.
        recompute_interval (int, optional): Whether to use recompute, default 0, means to disable recompute.
        recompute_ctx (dict, optional): The context for recompute, if recompute_interval > 1, recompute_ctx must be given.

    Examples:

        .. code-block:: python

            >>> # doctest: +SKIP('Until Distributed move successfully, just skip it')
            >>> from paddle.nn import layer, LayerList
            >>> from paddle.distributed.moe import MoElayer
            >>> from paddle.distributed.collective import Group
            >>> from paddle.distributed import fleet

            >>> moe_group = Group(fleet.worker_index(),
            ...                   0,
            ...                   list(range(fleet.worker_num())))
            >>> mp_group = None

            >>> num_experts=8
            >>> dim_feedforward=512
            >>> d_model=8
            >>> top_k=2

            >>> class ExpertLayer(Layer):
            ...     def __init__(self, d_model, d_hidden, name=None,rank=0, windex = 0, num_expert=1):
            ...         super().__init__()
            ...         self.htoh4 = nn.Linear(d_model, d_hidden)
            ...         self.h4toh = nn.Linear(d_hidden, d_model)

            ...     def forward(self, x):
            ...         x = self.htoh4(x)
            ...         x = self.h4toh(x)
            ...         return x

            >>> gate_config = {
            ...         "type": "gshard",
            ...         "top_k": top_k,
            ... }

            >>> experts_list = LayerList()
            >>> for expi in range(num_experts):
            ...     exp_layer = ExpertLayer(d_model, dim_feedforward // top_k, windex=expi, num_expert=num_experts)
            ...     experts_list.append(exp_layer)

            >>> moeLayer = MoELayer(d_model = d_model,
            ...                     experts=experts_list,
            ...                     gate=gate_config,
            ...                     moe_group=moe_group,
            ...                     mp_group=mp_group,
            ...                     recompute_interval=0)

    Nr   c                    s  t    || _|d u ri }t|ttfsJ d|| _d| _| jd ur)| jj| _t	|| _
|| _|d us7J || _| jdkrJtdd dkrJt  || _|| _t|tr|dd| _|dd}|d	ksj|d u rxt| jt	|| j| jd
}nK|dkrt| jt	|| j| j| jd}n7|dkrt| jt	|| j| j| jd}n#tdt|t|tr|j| _nt|trtdt|td|| _d S )Nz9gate config' type must be dict or an instance of BaseGater	   ZPADDLE_DISTRI_BACKENDZxccltop_k   typeZgshardZnaive)rW   r5   topk)rW   r5   r_   r(   switchzWe only support naive gate,                                 gshard gate and switch gate,                                 but you choose {} gate.zUnimplemented gate type: z/gate's type must be either dict or moe.BaseGate)super__init__recompute_ctx
isinstancedictr
   r(   r5   r"   lenrW   recompute_intervalexpertsosgetenvr   mp_groupd_modelgetr\   r   r   r   AssertionErrorformatstr	TypeErrorr^   rV   )selfrl   rh   rV   rX   rk   rg   rc   	__class__r   r   rb   K  sv   










zMoELayer.__init__c              	   C   s  t |jdks	J |j}|d|d g}d}d}| jd ur&| jj}| jj}|dkr3t|||| j}| |\}}t	|| j
| j| j\}}}	}
}d}t |jdkrW|jd }|jdgkrb|| }n|}|| jkskJ t||||	|| j| j}| j}dd }| jdks|jd dkr|||
 | j}nt| j|||
 | j}|jd }t |jdkr||jd 9 }t||||	|| j| j}|d| j|g}||jd d| jg}t||d|g}|dkrt|||| j}t||}|S )N   r   r]   r   r	   c                 S   s   | j d dkr	| S g }d}t|tjsJ t|t|ksJ t|D ]\}}|dkr,q#||| | |||   || }q#tj|ddS )Nr   rI   )	r   rd   npZndarrayrf   	enumerateappendr   rJ   )xrY   rh   yZ
last_indexidxZexpert_countr   r   r   experts_fwd  s   
z%MoELayer.forward.<locals>.experts_fwd)rf   r   rR   rk   rL   r"   rO   applyrV   rZ   rW   r5   r(   r\   r-   rl   rg   numpyrh   r   rc   rB   Zreshaper   ZbmmrG   )rr   r   Zorigin_shapeZmp_rankZmp_sizevaluerV   r   r2   r3   rY   r4   r_   Ztemp_posry   rl   r|   r   r   r   r   r7     s   





zMoELayer.forward)NNNr   N)r=   r>   r?   r@   rb   r7   __classcell__r   r   rs   r   r[     s    GOr[   )T)NT)!ri   r~   rv   r   r   Zpaddle.autogradr   Z"paddle.distributed.utils.moe_utilsr   r   Z#paddle.distributed.utils.nccl_utilsr   Zpaddle.frameworkr   Z!paddle.incubate.distributed.fleetr   rV   r
   r   r   r   utilsr   r   r   r+   r-   rB   rG   rO   rZ   ZLayerr[   r   r   r   r   <module>   s(   

#24