from collections import defaultdict

import numpy as np

from paddle.base import core
from paddle.distributed import fleet
from paddle.framework import Block, Program, in_dynamic_mode


class HybridParallelInferenceHelper:
    """
    A helper class to split program for inference with hybrid parallelism.

    Args:
        startup_program (Program): the startup program.
        main_program (Program): the main program.
        num_mp (int): the degree of model parallelism. Default ``1``.
        num_pp (int): the degree of pipeline parallelism. Default ``1``.
        micro_batch_size (int): the micro batch size. Default ``1``.
        beam_size (int): the beam search width. Default ``1``.
        init_comm (bool): whether to initialize the communication group. Default ``True``.
        role_maker (RoleMakerBase or subclass): a user-defined RoleMakerBase subclass.
            If ``role_maker`` is ``None``, PaddleCloudRoleMaker is used. Default ``None``.

    Returns:
        None.

    Write Paradigm:

        .. code-block:: text
            :name: text-example1

            >>> # doctest: +REQUIRES(env:DISTRIBUTED, env:GPU)
            >>> import paddle
            >>> # while op pattern
            >>> with paddle.base.device_guard(f'{device}:all'):
            ...     # init global cond
            ...     max_len = paddle.full(shape=[1], dtype="int64", fill_value=10)
            ...     step_idx = paddle.full(shape=[1], dtype="int64", fill_value=0)
            ...     cond_int = paddle.full(shape=[1], dtype="int64", fill_value=0, name="cond_int")
            ...     cond = paddle.cast(step_idx < max_len, dtype="bool")
            ...     while_op = paddle.static.nn.control_flow.While(cond, is_test=True)

            ...     # init global lod_tensor_array for generation task
            ...     arr = paddle.tensor.array_write(data, step_idx)

            >>> with while_op.block():
            ...     with paddle.base.device_guard(f'{device}:all'):
            ...         # read data from global lod_tensor_array
            ...         element_in_arr = paddle.tensor.array_read(array=arr, i=step_idx)
            ...         # write placeholder data to the global lod_tensor_array,
            ...         # which send_v2 needs for lod_tensor_array
            ...         paddle.increment(x=step_idx, value=1.0)
            ...         paddle.tensor.array_write(element_in_arr, i=step_idx, array=arr)
            ...     with paddle.base.device_guard(f'{device}:0'):
            ...         pass # some code
            ...     with paddle.base.device_guard(f'{device}:1'):
            ...         pass # some code
            ...     with paddle.base.device_guard(f'{device}:{num_pp-1}'):
            ...         # generate some data in the while block and write it to the global
            ...         # lod_tensor_array so that it can be read in the next while step;
            ...         # send_v2 will be used to send the global lod_tensor_array to the other pipeline stages and sync
            ...         paddle.tensor.array_write(other_var, i=step_idx, array=arr)
            ...         # update cond and assign to cond_int; cond_int will be synced across stages
            ...         paddle.assign(paddle.cast(cond, dtype="int32"), cond_int)
            ...     with paddle.base.device_guard(f'{device}:all'):
            ...         # the code below must be at the end of the while block and run on device:all
            ...         paddle.assign(paddle.cast(cond_int, dtype='bool'), cond)

            >>> with paddle.base.device_guard(f'{device}:all'):
            ...     # use an empty lod_tensor_array to clear the lod_tensor_array
            ...     paddle.assign(paddle.tensor.create_array(data.dtype), arr)

    Examples:

        .. code-block:: python
            :name: code-example1

            >>> # doctest: +REQUIRES(env:DISTRIBUTED, env:GPU)
            >>> import os
            >>> import numpy as np
            >>> import paddle
            >>> import paddle.distributed.fleet as fleet
            >>> from paddle.distributed.fleet.utils import hybrid_parallel_inference
            >>> paddle.enable_static()
            >>> nranks = int(os.getenv("PADDLE_TRAINERS_NUM", 1))
            >>> rank = int(os.getenv("PADDLE_TRAINER_ID", 0))
            >>> dev_id = int(os.getenv("FLAGS_selected_gpus", 0))
            >>> main_program = paddle.static.Program()
            >>> startup_program = paddle.static.Program()
            >>> if nranks > 1:
            ...     dist_strategy = fleet.DistributedStrategy()
            ...     dist_strategy.without_graph_optimization = True
            ...     fleet.init(is_collective=True, strategy=dist_strategy)
            >>> device = "gpu"
            >>> with paddle.static.program_guard(main_program, startup_program):
            ...     with paddle.base.device_guard(f'{device}:0'):
            ...         X = paddle.static.data(name='X', shape=[None, 2], dtype='float32')
            ...     with paddle.base.device_guard(f'{device}:all'):
            ...         max_len = paddle.full(
            ...             shape=[1], dtype="int64", fill_value=5, name="n")
            ...         step_idx = paddle.full(
            ...             shape=[1], dtype="int64", fill_value=0, name="i")
            ...         data = paddle.tensor.array_write(X, step_idx)
            ...         cond_int = paddle.full(shape=[1], dtype="int64", fill_value=0, name="cond_int")
            ...         cond = paddle.less_than(x=step_idx, y=max_len)
            ...         while_op = paddle.static.nn.control_flow.While(cond, is_test=True)
            ...     with while_op.block():
            ...         with paddle.base.device_guard(f'{device}:all'):
            ...             input = paddle.tensor.array_read(array=data, i=step_idx)
            ...             paddle.increment(x=step_idx, value=1.0)
            ...             paddle.tensor.array_write(input, i=step_idx, array=data)
            ...         with paddle.base.device_guard(f'{device}:0'):
            ...             param_attr = paddle.ParamAttr(initializer=paddle.nn.initializer.Constant(1.0))
            ...             weight1 = paddle.static.create_parameter(
            ...                 shape=[2, 5], dtype='float32', attr=param_attr, is_bias=False)
            ...             hidden1 = paddle.matmul(input, weight1)
            ...         with paddle.base.device_guard(f'{device}:1'):
            ...             param_attr = paddle.ParamAttr(initializer=paddle.nn.initializer.Constant(2.0))
            ...             weight2 = paddle.static.create_parameter(
            ...                 shape=[5, 2], dtype='float32', attr=param_attr, is_bias=False)
            ...             hidden2 = paddle.matmul(hidden1, weight2)
            ...             paddle.tensor.array_write(hidden2, i=step_idx, array=data)
            ...             # update cond and assign to cond_int; cond_int will be synced across stages
            ...             paddle.assign(paddle.less_than(x=step_idx, y=max_len), cond)
            ...             paddle.assign(paddle.cast(cond, dtype="int32"), cond_int)
            ...         with paddle.base.device_guard(f'{device}:all'):
            ...             # the code below must be at the end of the while block and run on device:all
            ...             paddle.assign(paddle.cast(cond_int, dtype='bool'), cond)
            ...     with paddle.base.device_guard(f'{device}:all'):
            ...         out = paddle.tensor.create_array(data.dtype)
            ...         paddle.assign(data, out)
            ...     with paddle.base.device_guard(f'{device}:all'):
            ...         # use an empty lod_tensor_array to clear the lod_tensor_array
            ...         paddle.assign(paddle.tensor.create_array(data.dtype), data)
            >>> helper = hybrid_parallel_inference.HybridParallelInferenceHelper(startup_program, main_program, micro_batch_size=2, num_pp=2, init_comm=nranks>1)
            >>> helper.gen_infer_program(['array_write_0.out'], ['cond_int.tmp_0'])
            >>> exe = paddle.static.Executor(paddle.CUDAPlace(dev_id))
            >>> exe.run(startup_program)
            >>> np.random.seed(2333)
            >>> for step in range(5):
            ...     init_data = np.random.uniform(low=0.0, high=1.0, size=[2, 2]).astype('float32')
            ...     [res] = exe.run(main_program, feed={"X": init_data}, fetch_list=[out])
            ...     print('-------- step', step, ' --------')
            ...     print(res)
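
        To launch the example with two pipeline stages, run it with the
        distributed launcher, which sets the ``PADDLE_TRAINERS_NUM``,
        ``PADDLE_TRAINER_ID`` and ``FLAGS_selected_gpus`` environment variables
        read above (the script name ``infer_demo.py`` is illustrative):

        .. code-block:: shell

            python -m paddle.distributed.launch --gpus 0,1 infer_demo.py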

    """

    def __init__(
        self,
        startup_program,
        main_program,
        num_mp=1,
        num_pp=1,
        micro_batch_size=1,
        beam_size=1,
        init_comm=True,
        role_maker=None,
    ):
        assert isinstance(startup_program, Program)
        assert isinstance(main_program, Program)

        self._device = None
        if core.is_compiled_with_cuda():
            self._device = "gpu"
        assert self._device, "Only gpu are supported."
        assert not in_dynamic_mode(), "Only static graph mode is supported."

        op_maker = core.op_proto_and_checker_maker
        self._op_role = op_maker.OpRole
        self._op_role_key = op_maker.kOpRoleAttrName()
        self._op_device_key = op_maker.kOpDeviceAttrName()

        # map from parameter name to the pipeline stage it is placed on
        self._param_device_map = {}
        # (prev_stage, cur_stage) pairs that need a point-to-point ring
        self._pipeline_pair = []
        self._pipeline_pair_in_while = []
        # map from pair key (prev_stage * 1000 + cur_stage) to ring id.
        # ring ids 0 and 1 are reserved for the mp ring and the global ring,
        # so rings for pipeline pairs are allocated starting from 2.
        self._pp_ring_map = {}
        self.ring_id = 2

        self.micro_batch_size = micro_batch_size
        self.beam_size = beam_size
        self.init_comm = init_comm

        self._output_var_to_op = None
        self._input_var_to_op = None
        self._main_program = main_program
        self._startup_program = startup_program

        if role_maker is None:
            self.role_maker = fleet.base.role_maker.PaddleCloudRoleMaker(
                is_collective=True
            )
        else:
            if isinstance(role_maker, fleet.base.role_maker.RoleMakerBase):
                assert role_maker._is_collective
                self.role_maker = role_maker

        self.mp_ring_id = 0
        self.global_ring_id = 1

        self.endpoints = self.role_maker._get_trainer_endpoints()
        self.current_endpoint = self.endpoints[self.role_maker._worker_index()]
        self.rank = self.role_maker._worker_index()
        self.nranks = self.role_maker._worker_num()
        assert num_mp * num_pp == self.nranks
        self.num_pp = num_pp
        self.num_mp = num_mp

        # global ring info
        self.global_endpoints = self.endpoints
        self.global_rank = self.rank
        self.global_nranks = self.nranks

        # rank layout: row ipp is a pipeline stage, column imp is an mp index
        arr = np.arange(0, self.num_pp * self.num_mp).reshape(
            [self.num_pp, self.num_mp]
        )
        ipp, imp = np.where(arr == self.rank)
        ipp = ipp[0]
        imp = imp[0]
        self.mp_group = arr[ipp, :]
        self.pp_group = arr[:, imp]

        self._stage = ipp

    def _init_communication_group(self):
        dev_ids = []
        for pair in self._pipeline_pair:
            prev_id, cur_id = pair
            if prev_id not in dev_ids:
                dev_ids.append(prev_id)
            if cur_id not in dev_ids:
                dev_ids.append(cur_id)
        num_pp = len(dev_ids)
        num_pp = max(1, num_pp)
        assert (
            num_pp == self.num_pp
        ), f'num_pp: {num_pp}, self.num_pp: {self.num_pp}'

        collective_helper = fleet.meta_optimizers.common.CollectiveHelper(
            self.role_maker, wait_port=False
        )

        # create the global ring
        collective_helper._init_communicator(
            self._startup_program,
            self.current_endpoint,
            self.global_endpoints,
            self.global_rank,
            self.global_ring_id,
            True,
            self.global_ring_id,
            True,
        )

        # create the model-parallel ring
        if self.num_mp > 1:
            mp_endpoints = [self.endpoints[mp_idx] for mp_idx in self.mp_group]
            mp_rank = [
                idx
                for idx, mp_idx in enumerate(self.mp_group)
                if mp_idx == self.rank
            ][0]
            collective_helper._init_communicator(
                self._startup_program,
                self.current_endpoint,
                mp_endpoints,
                mp_rank,
                self.mp_ring_id,
                True,
                self.global_ring_id,
                True,
            )

        # create a two-rank ring for every pipeline pair
        if self.num_pp > 1:
            for pair in self._pipeline_pair:
                pair_key = pair[0] * 1000 + pair[1]
                ring_id = self._pp_ring_map[pair_key]

                first_node = self.pp_group[pair[0]]
                second_node = self.pp_group[pair[1]]
                if self.rank != first_node and self.rank != second_node:
                    # ranks outside this pair still take part in communicator
                    # creation to stay in sync
                    collective_helper._init_communicator(
                        self._startup_program,
                        None,
                        None,
                        None,
                        None,
                        False,
                        self.global_ring_id,
                        True,
                    )
                    continue

                pipeline_endpoints = [
                    self.endpoints[first_node],
                    self.endpoints[second_node],
                ]
                pipeline_rank = 0 if self.rank == first_node else 1
                collective_helper._init_communicator(
                    self._startup_program,
                    self.current_endpoint,
                    pipeline_endpoints,
                    pipeline_rank,
                    ring_id,
                    False,
                    self.global_ring_id,
                    True,
                )

    def _get_input_output_info(self, block):
        """
        Get info of op input and output.
        """
        # a map from an output var name to the ops that generate it
        output_var_to_op = defaultdict(list)
        # a map from a var name to the ops that take it as input
        input_var_to_op = defaultdict(list)

        for index, op in enumerate(block.ops):
            for var_name in op.input_arg_names:
                input_var_to_op[var_name].append([op, index])
            for var_name in op.output_arg_names:
                output_var_to_op[var_name].append([op, index])
        return output_var_to_op, input_var_to_op

    def _update_param_device_map(self):
        """
        Get the device info for parameters.
        """
        params = [param.name for param in self._main_program.all_parameters()]
        for each_block in self._main_program.blocks:
            for op in each_block.ops:
                for var_name in op.input_arg_names:
                    if (
                        var_name not in params
                        or var_name in self._param_device_map
                    ):
                        continue
                    device = op.attr(self._op_device_key)
                    self._param_device_map[var_name] = device

    def _split_program(self, program, stage, block_idx):
        """
        Split a program and get the one with the given pipeline stage.

        Args:
            stage (int): pipeline stage
            block_idx (int): block index

        Returns:
            used_var_names (set): used var names in block_idx block
        """
        used_var_names = set()
        block = program.block(block_idx)
        op_idx = 0
        for op in list(block.ops):
            op_stage = op.attr(self._op_device_key).split(':')[1]
            # ops whose op_device is set to "gpu:all" are kept in all stages
            if op_stage == "all" or int(op_stage) == stage:
                op_idx += 1
                if op.type == "while":
                    sub_block_id = int(op.attr('sub_block').id)
                    sub_used_var_names = self._split_program(
                        program, stage, sub_block_id
                    )
                    used_var_names.update(sub_used_var_names)

                    # drop while-op inputs that are unused in the sub-block
                    input_idxs = []
                    input_arg_names = op.input("X")
                    for i, name in enumerate(input_arg_names):
                        if name not in sub_used_var_names:
                            input_idxs.append(i)
                    if len(input_idxs) > 0:
                        for i in reversed(input_idxs):
                            input_arg_names.pop(i)
                        op.desc.set_input("X", input_arg_names)

                    # drop while-op outputs that are unused in the sub-block
                    output_idxs = []
                    output_arg_names = op.output("Out")
                    for i, name in enumerate(output_arg_names):
                        if name not in sub_used_var_names:
                            output_idxs.append(i)
                    if len(output_idxs) > 0:
                        for i in reversed(output_idxs):
                            output_arg_names.pop(i)
                        op.desc.set_output("Out", output_arg_names)

                for var_name in op.input_arg_names + op.output_arg_names:
                    used_var_names.add(var_name)
            else:
                block._remove_op(op_idx)

        for var_name in list(block.vars.keys()):
            if var_name not in used_var_names:
                block._remove_var(var_name)

        return used_var_names

    def _find_prev_op(self, index, var_name):
        """
        Find the previous op of op with index that outputs
        variable named var_name.
        """
        prev_ops = self._output_var_to_op.get(var_name)
        if prev_ops is None:
            return None
        result_op = None
        for prev_op, prev_idx in reversed(prev_ops):
            if prev_idx < index:
                result_op = prev_op
                break
        return result_op

    def _add_op_device_attr(self, block):
        """
        Add op_device attribute for ops in block that have
        not that attribute set.

        Args:
            block (Block): the block to process.
        """
        assert isinstance(block, Block)
        # ops that must be copied to all pipeline stages
        device_all_ops = [
            "create_py_reader",
            "read",
            "create_double_buffer_reader",
            "while",
        ]
        for op in block.ops:
            if op.type in device_all_ops:
                # "gpu:all" is a hint that an op should be placed on all
                # pipeline stages, e.g. read ops
                op._set_attr(self._op_device_key, self._device + ":all")
            if op.type == "while":
                sub_block_id = op.attr('sub_block').id
                sub_block = block.program.block(sub_block_id)
                self._add_op_device_attr(sub_block)

    def _check_validation(self, block):
        """
        Check whether ops in a block have both the op_device and the
        op_role attributes set.
        """
        assert isinstance(block, Block)
        pre_stage_id = None
        for op in block.ops:
            assert op.has_attr(
                self._op_role_key
            ), f"{op.type} has no {self._op_role_key} set."
            op_role = op.attr(self._op_role_key)
            assert op_role == int(
                self._op_role.Forward
            ), "Only forward is supported for inference."
            if not op._has_kernel(op.type):
                assert op.type in [
                    "while",
                    "conditional_block",
                ], "The only supported op without kernel is while."
                sub_block_id = op.attr('sub_block').id
                sub_block = block.program.block(sub_block_id)
                self._check_validation(sub_block)
            assert op.has_attr(
                self._op_device_key
            ), f"{op.type} has no {self._op_device_key} set."
            device = op.attr(self._op_device_key)
            assert device, f"{op.type} has no {self._op_device_key} set."
            if device.split(':')[1] == "all":
                continue
            dev_type = device.split(':')[0]
            assert dev_type == self._device
            stage_id = int(device.split(':')[1])
            pre_stage_id = stage_id

    def _insert_sendrecv_ops_for_boundaries(self, block, is_while_block):
        """
        Insert a pair of send and recv ops for every two
        consecutive ops on different devices.
        """
        extra_index_info = {'index': 0}
        # map from var name to the device pairs that already have send/recv
        input_var_to_device = dict()

        def _insert_send_recv(cur_id, prev_id):
            assert cur_id != prev_id
            cur_dev = device_type + str(cur_id)
            prev_dev = device_type + str(prev_id)
            if (cur_dev, prev_dev) in input_var_to_device[var_name]:
                return
            if cur_id - prev_id > 1:
                # route through the intermediate stages so that every
                # send/recv pair only crosses one stage boundary
                _insert_send_recv(cur_id - 1, prev_id)
                _insert_send_recv(cur_id, cur_id - 1)
                input_var_to_device[var_name].append((cur_dev, prev_dev))
                return
            assert cur_id - prev_id == 1
            input_var_to_device[var_name].append((cur_dev, prev_dev))

            op_role = op.attr(self._op_role_key)
            var = block.var(var_name)
            pair = (prev_id, cur_id)
            if is_while_block and pair not in self._pipeline_pair_in_while:
                self._pipeline_pair_in_while.append(pair)

            # 1000 is a magic number that packs both stage ids into one key
            pair_key = prev_id * 1000 + cur_id
            if pair not in self._pipeline_pair:
                self._pipeline_pair.append(pair)
                self._pp_ring_map[pair_key] = self.ring_id
                ring_id = self.ring_id
                self.ring_id += 1
            else:
                ring_id = self._pp_ring_map[pair_key]

            # the sender is rank 0 of the two-rank ring, the receiver rank 1
            block._insert_op_without_sync(
                index=index + extra_index_info['index'],
                type='send_v2',
                inputs={'X': var},
                attrs={
                    self._op_device_key: prev_dev,
                    self._op_role_key: op_role,
                    'use_calc_stream': True,
                    'ring_id': ring_id,
                    'peer': 1,
                },
            )
            extra_index_info['index'] += 1

            var_shape = list(var.shape)
            if var_shape[0] < 0:
                if is_while_block:
                    var_shape[0] = self.micro_batch_size * self.beam_size
                else:
                    var_shape[0] = self.micro_batch_size
            block._insert_op_without_sync(
                index=index + extra_index_info['index'],
                type='recv_v2',
                outputs={'Out': [var]},
                attrs={
                    'out_shape': var_shape,
                    'dtype': var.dtype,
                    self._op_device_key: cur_dev,
                    self._op_role_key: op_role,
                    'use_calc_stream': True,
                    'ring_id': ring_id,
                    'peer': 0,
                },
            )
            extra_index_info['index'] += 1

        for index, op in enumerate(list(block.ops)):
            cur_device = op.attr(self._op_device_key)
            if cur_device.split(':')[-1] == "all":
                continue
            for var_name in op.input_arg_names:
                # skip vars that live in an ancestor block
                if not block.has_var(var_name) and block._find_var_recursive(
                    var_name
                ):
                    continue
                var = block.var(var_name)
                if var.is_data:
                    continue

                prev_device = None
                generate_ops = self._output_var_to_op.get(var_name)
                if generate_ops is None:
                    if var_name not in self._param_device_map:
                        continue
                    prev_device = self._param_device_map[var_name]

                prev_op = self._find_prev_op(index, var_name)
                if not prev_device:
                    prev_device = (
                        prev_op.attr(self._op_device_key) if prev_op else None
                    )
                if prev_device is None or prev_device.split(':')[-1] == "all":
                    continue
                if prev_device == cur_device:
                    continue

                if var_name not in input_var_to_device:
                    input_var_to_device[var_name] = []
                if (cur_device, prev_device) in input_var_to_device[var_name]:
                    continue

                device_type = cur_device.split(':')[0] + ':'
                assert (
                    prev_device.split(':')[0] == cur_device.split(':')[0]
                ), "More than one device type found."
                _insert_send_recv(
                    int(cur_device.split(':')[1]),
                    int(prev_device.split(':')[1]),
                )
        block._sync_with_cpp()

    def _insert_sendrecv_ops_in_while_block(
        self,
        block,
        sync_in_while_lastpp2firstpp_var_names,
        sync_in_while_var_names,
        stage,
    ):
        dev_ids = []
        for pair in self._pipeline_pair_in_while:
            prev_id, cur_id = pair
            if prev_id not in dev_ids:
                dev_ids.append(prev_id)
            if cur_id not in dev_ids:
                dev_ids.append(cur_id)
        if len(dev_ids) == 0:
            return

        first_id = min(dev_ids)
        last_id = max(dev_ids)
        assert len(block.ops) > 2, (
            "It must have more than 2 ops in while sub block, "
            "layers.assign(layers.cast(cond_int, dtype='bool'), cond) "
            "must be at the end of the while block, "
            "because nccl cannot send bool dtype var"
        )
        # insert before the last op, which assigns cond_int back to cond
        index = len(block.ops) - 1

        for dev_id in dev_ids:
            if dev_id == first_id:
                continue
            assert dev_id > first_id
            pair = (dev_id, first_id)
            pair_key = pair[0] * 1000 + pair[1]
            if pair not in self._pipeline_pair:
                self._pipeline_pair.append(pair)
                self._pp_ring_map[pair_key] = self.ring_id
                ring_id = self.ring_id
                self.ring_id += 1
            else:
                ring_id = self._pp_ring_map[pair_key]

            # the last stage also sends the generation results back to the
            # first stage; every stage syncs the vars in
            # sync_in_while_var_names, e.g. cond_int
            if dev_id == last_id:
                var_names = (
                    sync_in_while_lastpp2firstpp_var_names
                    + sync_in_while_var_names
                )
            else:
                var_names = sync_in_while_var_names

            for var_name in var_names:
                var = block._var_recursive(var_name)
                if stage == dev_id:
                    block._insert_op_without_sync(
                        index=index,
                        type='send_v2',
                        inputs={'X': var},
                        attrs={
                            self._op_device_key: self._device
                            + ':'
                            + str(pair[0]),
                            self._op_role_key: int(self._op_role.Forward),
                            'use_calc_stream': True,
                            'ring_id': ring_id,
                            'peer': 1,
                        },
                    )
                else:
                    var_shape = list(var.shape)
                    var_shape[0] = (
                        self.micro_batch_size
                        if var_shape[0] < 0
                        else var_shape[0]
                    )
                    block._insert_op_without_sync(
                        index=index,
                        type='recv_v2',
                        outputs={'Out': [var]},
                        attrs={
                            'out_shape': var_shape,
                            'dtype': var.dtype,
                            self._op_device_key: self._device
                            + ':'
                            + str(pair[1]),
                            self._op_role_key: int(self._op_role.Forward),
                            'use_calc_stream': True,
                            'ring_id': ring_id,
                            'peer': 0,
                        },
                    )
                index += 1
        block._sync_with_cpp()

    def _get_while_block(self):
        """
        Get the while sub-block.
        """
        main_block = self._main_program.global_block()
        num_while = 0
        sub_block_id = None
        while_op = None
        for op in main_block.ops:
            assert num_while < 2, 'More than one while op found.'
            if op.type == 'while':
                sub_block_id = op.attr('sub_block').id
                while_op = op
                num_while += 1
        if sub_block_id:
            return while_op, self._main_program.block(sub_block_id)
        return None, None

    def gen_infer_program(
        self,
        sync_in_while_lastpp2firstpp_var_names=None,
        sync_in_while_var_names=None,
        debug=False,
    ):
        """
        Generate the inference program.

        Args:
            sync_in_while_lastpp2firstpp_var_names (list(str)): the vars in the
                last pipeline stage that need to be sent to the first pipeline
                stage, excluding bool dtype vars
            sync_in_while_var_names (list(str)): the vars that are synced among
                all pipeline stages in the while block, e.g. cond. Note that
                cond cannot be bool dtype.
            debug (bool): whether to dump the programs to text files
        """
        main_block = self._main_program.global_block()
        startup_block = self._startup_program.global_block()

        if debug:
            with open('main_program.txt', 'w') as f:
                f.write(str(self._main_program))
            with open('startup_program.txt', 'w') as f:
                f.write(str(self._startup_program))

        # step 1: add op_device attributes and validate them
        self._add_op_device_attr(startup_block)
        self._add_op_device_attr(main_block)
        self._check_validation(startup_block)
        self._check_validation(main_block)
        self._update_param_device_map()

        out_var_to_op, in_var_to_op = self._get_input_output_info(main_block)
        self._output_var_to_op = out_var_to_op
        self._input_var_to_op = in_var_to_op

        # step 2: insert send/recv ops at stage boundaries of the main block
        self._insert_sendrecv_ops_for_boundaries(main_block, False)

        # step 3: process the while sub-block, if there is one
        while_op, while_block = self._get_while_block()
        if while_block:
            out_var_to_op, in_var_to_op = self._get_input_output_info(
                while_block
            )
            self._output_var_to_op = out_var_to_op
            self._input_var_to_op = in_var_to_op

            self._insert_sendrecv_ops_for_boundaries(while_block, True)

            self._insert_sendrecv_ops_in_while_block(
                while_block,
                sync_in_while_lastpp2firstpp_var_names,
                sync_in_while_var_names,
                self._stage,
            )

        # step 4: keep only the ops of the current pipeline stage
        self._split_program(self._main_program, self._stage, 0)
        self._split_program(self._startup_program, self._stage, 0)

        self._main_program._sync_with_cpp()
        self._startup_program._sync_with_cpp()

        if debug:
            with open(f'main_program.txt.{self.rank}', 'w') as f:
                f.write(str(self._main_program))
            with open(f'startup_program.txt.{self.rank}', 'w') as f:
                f.write(str(self._startup_program))

        # step 5: initialize the communication groups
        if self.init_comm:
            self._init_communication_group()