o
    "j                     @   sb   d dl Z ddlmZ ddlmZmZ G dd de jjjj	Z
G dd dZdd
dZdddZdS )    N   )Variable)LayerHelpercorec                       s,   e Zd ZdZ fddZ fddZ  ZS )BlockGuardServzl
    BlockGuardServ class.

    BlockGuardServ class is used to create an op with a block in a program.
    c                    s,   t |ts	tdt |jj || _d S )Nz$BlockGuardServ takes a ListenAndServ)
isinstanceListenAndServ	TypeErrorsuper__init__helpermain_programserver)selfr   	__class__ \/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/incubate/nn/layer/io.pyr      s   

zBlockGuardServ.__init__c                    s&   |d urdS | j   t |||S )NF)r   complete_opr
   __exit__)r   exc_typeexc_valexc_tbr   r   r   r   !   s   
zBlockGuardServ.__exit__)__name__
__module____qualname____doc__r   r   __classcell__r   r   r   r   r      s    r   c                   @   s:   e Zd ZdZdddZdd Zdd	 Zd
d Zdd ZdS )r   a  
    **ListenAndServ Layer**

    ListenAndServ is used to create a rpc server bind and listen
    on specific TCP port, this server will run the sub-block when
    received variables from clients.

    Args:
        endpoint(string): IP:port string which the server will listen on.
        inputs(list): a list of variables that the server will get from clients.
        fan_in(int): how many client are expected to report to this server, default: 1.
        optimizer_mode(bool): whether to run the server as a parameter server, default: True.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env:DISTRIBUTED)
            >>> from paddle.incubate.nn.layer.io import ListenAndServ
            >>> import paddle
            >>> paddle.enable_static()
            >>> place = paddle.CPUPlace()
            >>> main = paddle.static.Program()
            >>> with paddle.static.program_guard(main):
            ...     serv = ListenAndServ(
            ...         "127.0.0.1:6170", ["X"], optimizer_mode=False)
            ...     with serv.do():
            ...         x = paddle.static.data(
            ...             shape=[32, 32],
            ...             dtype='float32',
            ...             name="X")
            ...         paddle.nn.initializer.Constant(value=1.0)(x, main.global_block())
            ...         paddle.scale(x=x, scale=10.0)

            >>> exe = paddle.static.Executor(place)
            >>> exe.run(main)
       Tc                 C   s,   t d| _|| _g | _|| _|| _|| _d S )Nlisten_and_serv)r   r   inputsoutputsendpointfan_inoptimizer_mode)r   r"   r    r#   r$   r   r   r   r   O   s   

zListenAndServ.__init__c                 C   s   t | S )N)r   )r   r   r   r   doY   s   zListenAndServ.doc           	      C   s   | j j}| }|  }g }g }|jD ]@}| jr5d|jv r4d|jv r4||jd j ||jd j q|j	D ]}|
|D ]}||| ||| q?q8q||fS )NZGradParam)r   r   current_blockparent_blockopsr$   r    appendnameZinput_namesinputvar)	r   r   r'   r(   paramsZgradsopZinameZin_var_namer   r   r   get_params_and_grads\   s$   

z"ListenAndServ.get_params_and_gradsc                 C   s,   | j j}| j}|dksJ ||}|S )Nr   )r   r   r'   
parent_idxblock)r   progr1   r(   r   r   r   r(   r   s
   

zListenAndServ.parent_blockc              
   C   sV   ddl m} | jj}| }|  }|jdd| jii | j| j	|g|j
dgdd d S )Nr   )DistributedModer   X )r"   ZFaninZoptimize_blocksZdistributed_modeZgrad_to_block_idtyper    r!   attrs)Z7paddle.incubate.distributed.fleet.parameter_server.moder4   r   r   r'   r(   	append_opr    r"   r#   ZSYNC)r   r4   r   r'   r(   r   r   r   r   y   s    
zListenAndServ.complete_opN)r   T)	r   r   r   r   r   r%   r0   r(   r   r   r   r   r   r   )   s    
%
r   Tc              
   C   s   t |tksJ |du rg }nt|tr|g}t |tksJ | d}tt|} tdi t }tj	
 }|jdd|id|id| d||tj	jjid	 |r`|jd
d|idg id| id	 dS dS )ak  
    Send variables to the server side, and get vars from server
    side when server have finished running server side program.

    Args:
        endpoints (str): comma separated IP:PORT pairs in the order
                   of send_vars to send
        send_vars (list): variables to send to server
        sync (bool): whether to wait the request finish

    N,Sendsendr5   Out	endpointsepmapr7   Zsend_barrier)r<   )r8   listr   r   splitsetr   localsr   Zop_proto_and_checker_makerZkOpRoleAttrNamer:   ZOpRoleZRPC)r?   Z	send_varsZdummy_outputsyncr@   r   Zrpc_op_role_namer   r   r   r<      s6   





r<   c                 C   s   t |tksJ |du rg }nt|tr|g}t |tksJ | d}tt|} tdi t }|jdd|id|i| |dd |rP|jd	d|id
| id |S )aY  
    Receive variables from server side

    Args:
        endpoints (str): comma separated IP:PORT pairs in the order
                   of send_vars to send
        get_vars (list): vars to get from server after send completes.
        sync (bool): whether to wait the request finish

    Returns:
        list: list of received variables
    Nr;   Recvrecvr5   r>   )r?   r@   r7   Zfetch_barrierr?   )r8   r!   r9   )rF   )	r8   rA   r   r   rB   rC   r   rD   r:   )r?   Zget_varsZdummy_inputrE   r@   r   r   r   r   rF      s,   

rF   )NT)ZpaddleZbase.frameworkr   Z	frameworkr   r   ZstaticnnZcontrol_flowZ
BlockGuardr   r   r<   rF   r   r   r   r   <module>   s   
i.