o
    #jQ                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dl mZ d dlmZ d dl	m
Z
 d dlmZ g ZejdkrBejdkrBedZneZd	d
 Zdd Zdd Zdd ZG dd deZdd Zdd Zdd ZG dd dZd"ddZd#d d!ZdS )$    N)zip_longest)Queue)Thread)QUEUE_GET_TIMEOUT)      darwinforkc                    s   t |    fdd}|S )a  
    Cache the reader data into memory.

    Be careful that this method may take long time to process,
    and consume lots of memory. :code:`reader()` would only
    call once.

    Args:
        reader (generator): a reader object which yields
            data each time.

    Returns:
        generator: a decorated reader object which yields data from cached memory.

    Examples:
        .. code-block:: python

            >>> import paddle

            >>> def reader():
            ...     for i in range(3):
            ...         yield i
            ...
            >>> # All data is cached into memory
            >>> cached_reader = paddle.base.io.cache(reader)

            >>> for i in cached_reader():
            ...     print(i)
            0
            1
            2
    c                   3   s     E d H  d S N r   Zall_datar   X/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddle/reader/decorator.py__impl__P   s   zcache.<locals>.__impl__)tuple)readerr   r   r   r   cache-   s   
!r   c                        fdd}|S )a  
    Creates a data reader that outputs return value of function using
    output of each data reader as arguments.

    If input readers output the following data entries: 2 3,
    and the input func is mul(x, y),
    the output of the resulted reader will be 6.


    Args:
        func: a function to read data and compute result, the output of this function
              will be set as the output of the resulted data reader.
        readers (Reader|list of Reader): list of readers whose outputs will be used as arguments of func.

    Returns:
        the resulted data reader (Reader)

    Examples:

        .. code-block:: python

            >>> import paddle.reader
            >>> d = {"h": 0, "i": 1}
            >>> def func(x):
            ...     return d[x]
            >>> def reader():
            ...     yield "h"
            ...     yield "i"
            >>> map_reader_result = paddle.reader.map_readers(func, reader)
    c                  3   s6    g } D ]}|  |  qt g| R  E d H  d S r
   )appendmaprsrfuncreadersr   r   r   v   s
   zmap_readers.<locals>.readerr   )r   r   r   r   r   r   map_readersV       r   c                    r   )aV  
    This API creates a decorated reader that outputs the shuffled data.

    The output data from the origin reader will be saved into a buffer,
    and then shuffle the data. The size of buffer is determined by argument buf_size.

    Args:
        reader(callable): the original reader whose data will be shuffled.
        buf_size(int): the size of shuffled buffer.

    Returns:
        callable: a decorated reader.

    Examples:
        .. code-block:: python

            >>> # doctest: +SKIP('outputs are 0~4 unordered arrangement')
            >>> def reader():
            ...     for i in range(5):
            ...         yield i
            >>> shuffled_reader = paddle.reader.decorator.shuffle(reader, 3)
            >>> for e in shuffled_reader():
            ...     print(e)
            >>> # outputs are 0~4 unordered arrangement
    c                  3   st    g }  D ]}|  | t|  kr"t|  | D ]}|V  qg } qt| dkr6t|  | D ]}|V  q0d S d S )Nr   )r   lenrandomshuffle)bufebbuf_sizer   r   r   data_reader   s    



zshuffle.<locals>.data_readerr   )r   r$   r%   r   r#   r   r      s   r   c                     s    fdd}|S )aN  
    Use the input data readers to create a chained data reader. The new created reader
    chains the outputs of input readers together as its output, and it do not change
    the format of the outputs.

    **Note**:
        ``paddle.reader.chain`` is the alias of ``paddle.base.io.chain``, and
        ``paddle.base.io.chain`` is recommended to use.

    For example, if three input readers' outputs are as follows:
    [0, 0, 0],
    [10, 10, 10],
    [20, 20, 20].
    The chained reader will output:
    [0, 0, 0], [10, 10, 10], [20, 20, 20].

    Args:
        readers(list): input data readers.

    Returns:
        callable: the new chained data reader.

    Examples:
        .. code-block:: python

            >>> import paddle

            >>> def reader_creator_3(start):
            ...     def reader():
            ...         for i in range(start, start + 3):
            ...             yield [i, i, i]
            ...     return reader
            ...
            >>> c = paddle.reader.chain(reader_creator_3(0), reader_creator_3(10), reader_creator_3(20))
            >>> for e in c():
            ...     print(e)
            [0, 0, 0]
            [1, 1, 1]
            [2, 2, 2]
            [10, 10, 10]
            [11, 11, 11]
            [12, 12, 12]
            [20, 20, 20]
            [21, 21, 21]
            [22, 22, 22]

    c                  3   s0    g }  D ]}|  |  qtj|  E d H  d S r
   )r   	itertoolschainr   r   r   r   r      s
   zchain.<locals>.readerr   )r   r   r   r(   r   r'      s   1r'   c                   @      e Zd ZdS )ComposeNotAlignedN__name__
__module____qualname__r   r   r   r   r*          r*   c                     s(   | dd dd  fdd}|S )a#  
    Creates a data reader whose output is the combination of input readers.

    If input readers output following data entries:
    (1, 2)    3    (4, 5)
    The composed reader will output:
    (1, 2, 3, 4, 5)

    Args:
        readers (Reader|list of Reader): readers that will be composed together.
        check_alignment(bool, optional): Indicates whether the input readers are checked for
                              alignment. If True, whether input readers are aligned
                              correctly will be checked, else alignment will not be checkout and trailing outputs
                              will be discarded. Defaults to True.

    Returns:
        the new data reader (Reader).

    Examples:
        .. code-block:: python

            >>> def reader_creator_10(dur):
            ...     def reader():
            ...         for i in range(10):
            ...             yield i
            ...     return reader
            >>> reader = paddle.reader.decorator.compose(reader_creator_10(0), reader_creator_10(0))
    check_alignmentTc                 S   s   t | tr| S | fS r
   )
isinstancer   )xr   r   r   
make_tuple
  s   
zcompose.<locals>.make_tuplec                  3   s    g } D ]}|  |  q s$t|  D ]}ttt|dV  qd S t|  D ]}|D ]
}|d u r6tdq,ttt|dV  q(d S )Nr   z#outputs of readers are not aligned.)r   zipsumlistr   r   r*   )r   r   Zoutputsor0   r3   r   r   r   r     s"   zcompose.<locals>.reader)pop)r   kwargsr   r   r8   r   compose   s   r;   c                    s6   G dd d}|   fdd fdd}|S )ac  
    Creates a buffered data reader.

    The buffered data reader will read and save data entries into a
    buffer. Reading from the buffered data reader will proceed as long
    as the buffer is not empty.

    Args:
        reader(generator): the data reader to read from.
        size(int): max buffer size.

    Returns:
        generator: the buffered data reader.

    Examples:
        .. code-block:: python

            >>> import paddle

            >>> def reader():
            ...     for i in range(3):
            ...         yield i
            ...
            >>> # Create a buffered reader, and the buffer size is 2.
            >>> buffered_reader = paddle.reader.decorator.buffered(reader, 2)

            >>> # Output: 0 1 2
            >>> for i in buffered_reader():
            ...     print(i)
            0
            1
            2
    c                   @   r)   )zbuffered.<locals>.EndSignalNr+   r   r   r   r   	EndSignalG  r/   r<   c                    s"   | D ]}| | q|   d S r
   put)r   qdendr   r   read_workerL  s   zbuffered.<locals>.read_workerc                  3   s^     } t d}t| |fd}d|_|  | }| kr-|V  | }| ks d S d S )N)maxsizetargetargsT)r   r   daemonstartget)r   r?   tr!   rB   rC   r   sizer   r   r%   Q  s    
zbuffered.<locals>.data_readerr   )r   rM   r<   r%   r   rL   r   buffered$  s
   #rN   c                    r   )a  

    This API creates a decorated reader, and limits the max number of
    samples that reader could return.

    Args:
        reader(callable): the input reader.
        n(int): the max number of samples in the reader.

    Returns:
        callable: the decorated reader.

    Examples:
        .. code-block:: python

            >>> def reader():
            ...     for i in range(100):
            ...         yield i
            >>> firstn_reader = paddle.reader.decorator.firstn(reader, 5)
            >>> for e in firstn_reader():
            ...     print(e)
            0
            1
            2
            3
            4
    c                  3   s.    t  D ]\} }|  kr d S |V  qd S r
   )	enumerate)iitemnr   r   r   firstn_reader  s   zfirstn.<locals>.firstn_readerr   )r   rS   rT   r   rR   r   firstne  r   rU   c                   @   r)   )XmapEndSignalNr+   r   r   r   r   rV     r/   rV   Fc              	      sV   t  fddfddfddfdd 	f	d	d
}|S )a  
    Use multi-threads to map samples from reader by a mapper defined by user.

    Args:
        mapper (callable): a function to map the data from reader.
        reader (callable): a data reader which yields the data.
        process_num (int): thread number to handle original sample.
        buffer_size (int): size of the queue to read data in.
        order (bool): whether to keep the data order from original reader.
            Default False.

    Returns:
        callable: a decorated reader with data mapping.
    c                    s$   |  D ]}| | q|   d S r
   r=   )r   in_queuerP   rA   r   r   rC     s   
z!xmap_readers.<locals>.read_workerc                    s4   d}|  D ]}| ||f |d7 }q|   d S Nr      r=   )r   rW   Zin_orderrP   rA   r   r   order_read_worker  s
   

z'xmap_readers.<locals>.order_read_workerc                    sN   |   }t|ts||}|| |   }t|tr	|   |  d S r
   rJ   r1   rV   r>   )rW   	out_queuemappersampler   rA   r   r   handle_worker  s   



z#xmap_readers.<locals>.handle_workerc                    s   |   }t|ts4|\}}||}||d kr	 ||d ks|| |d  d7  < |   }t|tr	|   |  d S rX   r[   )rW   r\   r]   	out_orderZinsorderr^   r   rA   r   r   order_handle_worker  s   



z)xmap_readers.<locals>.order_handle_workerc                  3   s   t  } t  }dg}rn}t|| fd}d|_|  r%n}r/| ||fn| |f}g }tD ]}t||d}d|_|| q:|D ]}	|	  qM| }
t|
tsi|
V  | }
t|
tr]d}|k r| }
t|
tr}|d7 }n|
V  |k sod S d S )Nr   rE   TrY   )	r   r   rH   rI   ranger   rJ   r1   rV   )rW   r\   r`   rF   rK   rG   workersrP   Zworkerwr^   finish)	buffer_sizer_   r]   ra   rb   rZ   process_numrC   r   r   r   xreader  s@   




zxmap_readers.<locals>.xreader)rV   )r]   r   rh   rg   ra   ri   r   )
rg   rB   r_   r]   ra   rb   rZ   rh   rC   r   r   xmap_readers  s   	$rj   T  c              
      s   t jdkr	tdzddlW n ty* } ztd ddlW Y d}~nd}~ww tt	t
fr8tdks<J ddd fd	d
}fdd  fdd}|rZ|S |S )a=  
    This API use python ``multiprocessing`` to read data from ``readers`` parallelly,
    and then ``multiprocess.Queue`` or ``multiprocess.Pipe`` is used to merge
    these data. A separate process will be created for each reader in the
    ``readers`` list, please guarantee every reader can work independently
    to avoid conflicts in parallel environment.


    ``Multiprocess.Queue`` require the rw access right to /dev/shm, and it's not supported
    in some platforms.

    Parameters:
       readers (list( ``generator`` ) | tuple( ``generator`` )): a python ``generator`` list
           used to read input data
       use_pipe (bool, optional): control the inner API used to implement the multi-processing,
           default True - use ``multiprocess.Pipe`` which is recommended
       queue_size (int, optional): only useful when ``use_pipe`` is False - ``multiprocess.Queue``
           is used, default 1000. Increase this value can speed up the data reading, and more memory
           will be consumed.

    Returns:
        ``generator``: a new reader which can be run parallelly


    Example:

        .. code-block:: python

            >>> import paddle
            >>> import numpy as np

            >>> sample_files = ['sample_file_1', 'sample_file_2']

            >>> def fake_input_files():
            ...     with open(sample_files[0], 'wb') as f:
            ...         np.savez(f, a=np.array([1, 2]), b=np.array([3, 4]), c=np.array([5, 6]), d=np.array([7, 8]))
            ...     with open(sample_files[1], 'wb') as f:
            ...         np.savez(f, a=np.array([9, 10]), b=np.array([11, 12]), c=np.array([13, 14]))
            ...
            ...
            >>> def generate_reader(file_name):
            ...     # load data file
            ...     def _impl():
            ...         data = np.load(file_name)
            ...         for item in sorted(data.files):
            ...             yield data[item],
            ...     return _impl
            ...
            >>> if __name__ == '__main__':
            ...     # generate sample input files
            ...     fake_input_files()
            ...
            ...     with base.program_guard(base.Program(), base.Program()):
            ...         place = base.CPUPlace()
            ...         # the 1st 2 is batch size
            ...
            ...         image = paddle.static.data(name='image', dtype='int64', shape=[2, 1, 2])
            ...         paddle.static.Print(image)
            ...         # print detailed tensor info of image variable
            ...
            ...         reader = base.io.PyReader(feed_list=[image], capacity=2)
            ...
            ...         decorated_reader = paddle.reader.multiprocess_reader(
            ...             [generate_reader(sample_files[0]), generate_reader(sample_files[1])], False)
            ...
            ...         reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])
            ...
            ...         exe = base.Executor(place)
            ...         exe.run(base.default_startup_program())
            ...
            ...         for data in reader():
            ...             res = exe.run(feed=data, fetch_list=[image])
            ...             print(res[0])
            [[[1 2]], [[3 4]]]
            [[[5 6]], [[7 8]]]
            [[[9 10]], [[11 12]]]
    win32z;The multiprocess_reader method is not supported on windows.r   NzThe `ujson` module is not found, use the `json` module, `ujson` encodes and decodes faster, you can install `ujson` through `pip install ujson`.z `readers` must be list or tuple.c              
   S   s^   z|  D ]}|d u rt d|| q|d  W d S  ty. } z|d |d }~ww )Nzsample has None )
ValueErrorr>   	Exception)r   queuer^   r!   r   r   r   _read_into_queueT  s   

z-multiprocess_reader.<locals>._read_into_queuec               
   3   s    t } D ]}t j || fd}|  qt}d}||k rXz| jtd}W n ty= } zt	d |d }~ww |d u rG|d7 }n|dkrOt
d|V  ||k s"d S d S )NrE   r   )timeoutzFmultiprocess_reader failed to get data from the multiprocessing.Queue.rY   rm   zFmultiprocess_reader failed to put data into the multiprocessing.Queue.)fork_contextr   ProcessrI   r   rJ   r   ro   loggingerrorrn   )rp   r   p
reader_num
finish_numr^   r!   )rq   
queue_sizer   r   r   queue_reader_  s6   


z)multiprocess_reader.<locals>.queue_readerc              
      s   z%|  D ]}|d u rt d| | q| d  |  W d S  ty? } z| d |  |d }~ww )Nzsample has None!rm   )rn   senddumpsclosero   )r   connr^   r!   )jsonr   r   _read_into_pipe{  s   
z,multiprocess_reader.<locals>._read_into_pipec            
      3   s    g } D ]}t  \}}| | t j ||fd}|  qt}d}g }||k rp|D ]}| | q.g }| D ]/}| }	|	d u rU|d7 }|	  || q:|	dkrf|	  || t
d|	V  q:||k s,d S d S )NrE   r   rY   rm   zFmultiprocess_reader failed to send data into the multiprocessing.Pipe.)rs   ZPiper   rt   rI   r   removeloadsrecvr~   rn   )
Zconnsr   Zparent_connZ
child_connrw   rx   ry   Zconn_to_remover   r^   )r   r   r   r   r   pipe_reader  s<   


z(multiprocess_reader.<locals>.pipe_reader)sysplatformNotImplementedErrorZujsonro   warningswarnr   r1   r6   r   r   )r   Zuse_piperz   r!   r{   r   r   )r   rq   r   rz   r   r   multiprocess_reader  s2   
O r   )F)Trk   )r&   ru   multiprocessingr   r   r   r   rp   r   	threadingr   Zpaddle.base.readerr   __all__version_infor   Zget_contextrs   r   r   r   r'   rn   r*   r;   rN   rU   rV   rj   r   r   r   r   r   <module>   s2   ))-;9A)
`