o
    0jK                     @   s   d dl Z d dlZd dlZd dlZd dlmZmZmZ d dl	m
Z
mZ d dlmZ ddlmZ ddlmZ g dZG d	d
 d
e
ZdefddZdd ZG dd dZdadd Zdd Zdd Zd"defddZd#ddZedG d d! d!eZ dS )$    N)AnyDictOptional)	BaseModelmodel_validator)Literal   )logging)class_requires_deps)fastdeploy-servervllm-serversglang-servermlx-vlm-serverllama-cpp-serverc                   @   sd   e Zd ZU dZed ed< dZee ed< dZ	e
ed< dZeeeef  ed< ed	d
dd ZdS )GenAIConfignative)r   r   r   r   r   r   backendN
server_url   max_concurrencyclient_kwargsafter)modec                 C   s.   | j tv r| jd u rtdt| j  d| S )Nz(`server_url` must not be `None` for the z	 backend.)r   SERVER_BACKENDSr   
ValueErrorreprself r   f/var/www/html/Deteccion_Ine/venv/lib/python3.10/site-packages/paddlex/inference/models/common/genai.pycheck_server_url1   s
   zGenAIConfig.check_server_url)__name__
__module____qualname__r   r   __annotations__r   r   strr   intr   r   r   r   r    r   r   r   r   r   $   s   
 r   returnc                 C   s.   | du rdS t | tr| nt| }|jtv S )z:Return whether the config targets a remote server backend.NF)
isinstancer   Zmodel_validater   r   )genai_configZ	validatedr   r   r   uses_server_backend:   s   
r*   c                 C   s
   t |  S N)r*   )r)   r   r   r   need_local_modelF   s   
r,   c                   @   sT   e Zd ZdZdd Zdd Zdd Zdd	efd
dZdd Z	dd Z
edd ZdS )_AsyncThreadManageraH  
    Manages an asyncio event loop running in a dedicated background thread.

    This class provides a bridge between synchronous code and async operations,
    allowing sync code to submit coroutines to be executed in the background
    event loop.

    Thread Safety:
        - Only `run_async()` and `stop()` are designed to be called from other threads
        - All internal asyncio operations are executed within the event loop thread
        - Uses `run_coroutine_threadsafe()` and `call_soon_threadsafe()` for cross-thread
          communication as recommended by Python documentation

    Lifecycle:
        1. `start()` - Creates a daemon thread running `loop.run_forever()`
        2. `run_async(coro)` - Submits coroutines to the loop (returns Future)
        3. `stop(timeout)` - Gracefully shuts down: waits for tasks, cancels remaining,
           then cleans up resources

    Example:
        manager = _AsyncThreadManager()
        manager.start()
        future = manager.run_async(some_async_function())
        result = future.result(timeout=10)
        manager.stop()
    c                 C   s0   d | _ d | _d| _t | _t | _d| _d S )NF)loopthreadstopped	threadingEvent_event_start_event_cleanup_done_shutting_downr   r   r   r   __init__g   s   


z_AsyncThreadManager.__init__c                    s`      rdS d _d _ j   j   fdd}tj|dd _ j	   j
  dS )a9  
        Start the background event loop thread.

        This method is idempotent - calling it multiple times has no effect
        if the loop is already running.

        The method blocks until the event loop is fully initialized and ready
        to accept tasks (synchronized via threading.Event).
        NFc                      sd   t   _t  j  j  z j  W     j  d _	d S     j  d _	w )NT)
asyncioZnew_event_loopr.   Zset_event_loopr3   setZrun_forever_cleanup_loop_internalr4   r0   r   r   r   r   	_run_loop   s   




z,_AsyncThreadManager.start.<locals>._run_loopT)targetdaemon)
is_runningr5   r0   r3   clearr4   r1   Threadr/   startwait)r   r:   r   r   r   r@   q   s   



z_AsyncThreadManager.startc              
   C   s  | j du rdS zwzBt| j }|r1tdt| d |D ]}|  q| j tj|ddi | j | j 	  t
| j drI| j | j   W n tyd } ztd|  W Y d}~nd}~ww W | j   td dS W | j   td dS | j   td w )	a#  
        Perform cleanup operations within the event loop thread.

        IMPORTANT: This method MUST be called from the event loop thread
        (i.e., in the finally block of _run_loop) because asyncio operations
        like `all_tasks()`, `task.cancel()` are NOT thread-safe.

        Cleanup sequence:
        1. Cancel all remaining tasks
        2. Wait for cancellation to complete
        3. Shutdown async generators (prevents ResourceWarning)
        4. Shutdown default executor (Python 3.9+)
        5. Close the event loop
        NzCancelling z pending tasks during cleanupreturn_exceptionsTshutdown_default_executorz!Error during event loop cleanup: zEvent loop closed successfully)r.   r7   	all_tasksr	   debuglencancelZrun_until_completegatherZshutdown_asyncgenshasattrrC   	Exceptionwarningclose)r   pendingtasker   r   r   r9      s6   




z*_AsyncThreadManager._cleanup_loop_internal      @timeoutc              
      s$     sdS d _ fdd}zt|  j}|jd d W n/ tjjy8   t	
dd  d Y n tyQ } zt	
d	|  W Y d}~nd}~ww z
 j jj W n	 tye   Y nw  jjd
d}|stt	
d  jdur jjdd  j rt	
d d _d _dS )a  
        Gracefully stop the event loop.

        This method performs a graceful shutdown sequence:
        1. Sets shutting_down flag to reject new tasks
        2. Schedules a graceful shutdown coroutine in the event loop that:
           - Waits for pending tasks to complete (with timeout)
           - Cancels tasks that don't complete in time
        3. Signals the loop to stop
        4. Waits for cleanup to complete in the loop thread
        5. Joins the background thread

        Args:
            timeout: Maximum seconds to wait for pending tasks to complete.
                    Tasks not completed within this time will be cancelled.
                    Default is 5.0 seconds.

        Thread Safety:
            This method is safe to call from any thread. It uses only thread-safe
            mechanisms (run_coroutine_threadsafe, call_soon_threadsafe, Events)
            to communicate with the event loop thread.
        NTc                     s   t    fddt jD } | std dS tdt|  d d t j| t jdI dH \}}|r[t	d	t| d
 d |D ]}|
  qIt j|ddiI dH  td dS )z
            Graceful shutdown coroutine that runs inside the event loop.

            All asyncio operations here are thread-safe because this coroutine
            executes in the event loop thread.
            c                    s    g | ]}| ur|  s|qS r   )done).0tcurrent_taskr   r   
<listcomp>   s
    zH_AsyncThreadManager.stop.<locals>._graceful_shutdown.<locals>.<listcomp>z,No pending tasks to wait for during shutdownNzGraceful shutdown: waiting for z pending tasks (timeout=zs))rQ   Zreturn_whenzGraceful shutdown: cancelling z$ tasks that did not complete within srB   Tz%Graceful shutdown coroutine completed)r7   rV   rD   r.   r	   rE   rF   rA   ZALL_COMPLETEDrK   rG   rH   )rM   rR   Zstill_pendingrN   r   rQ   rU   r   _graceful_shutdown   s4   



z4_AsyncThreadManager.stop.<locals>._graceful_shutdowng       @rQ   z"Graceful shutdown timed out after zs, forcing loop stopz Error during graceful shutdown: rP   z-Event loop cleanup did not complete within 5szYBackground thread did not terminate in time. Some resources may not be properly released.)r=   r5   r7   run_coroutine_threadsafer.   result
concurrentfuturesTimeoutErrorr	   rK   rJ   Zcall_soon_threadsafestopRuntimeErrorr4   rA   r/   joinis_alive)r   rQ   rZ   futurerO   Zcleanup_completedr   rY   r   ra      s>   ,



z_AsyncThreadManager.stopc                 C   s0   |   std| jrtdt|| j}|S )a  
        Submit a coroutine to be executed in the background event loop.

        This is the primary method for bridging sync and async code.
        The coroutine will be scheduled to run in the background thread's
        event loop.

        Args:
            coro: A coroutine object to be executed

        Returns:
            concurrent.futures.Future: A future that can be used to:
                - Wait for the result: future.result(timeout=...)
                - Check completion: future.done()
                - Cancel the task: future.cancel()

        Raises:
            RuntimeError: If the event loop is not running or is shutting down

        Thread Safety:
            This method is safe to call from any thread. It uses
            `asyncio.run_coroutine_threadsafe()` which is explicitly
            documented as thread-safe.

        Example:
            future = manager.run_async(fetch_data())
            result = future.result(timeout=30)
        zEvent loop is not runningzxEvent loop is shutting down, cannot accept new tasks. Please ensure all async operations complete before calling stop().)r=   rb   r5   r7   r\   r.   )r   corore   r   r   r   	run_async=  s   z_AsyncThreadManager.run_asyncc                 C   s   | j duo| j   o| j S )z
        Check if the event loop is currently running and accepting tasks.

        Returns:
            bool: True if the loop is running and not closed/stopped
        N)r.   	is_closedr0   r   r   r   r   r=   g  s   z_AsyncThreadManager.is_runningc                 C      | j S )a   
        Check if the event loop is in the process of shutting down.

        During shutdown, new tasks will be rejected but existing tasks
        are still being processed.

        Returns:
            bool: True if shutdown has been initiated
        )r5   r   r   r   r   is_shutting_downp  s   z$_AsyncThreadManager.is_shutting_downNrP   )r!   r"   r#   __doc__r6   r@   r9   floatra   rg   r=   propertyrj   r   r   r   r   r-   K   s    
)2q*	r-   c                   C   s   t d u rt a t S r+   )_async_thread_managerr-   r   r   r   r   get_async_manager  s   rp   c                  C   s   t  } |  o
| j S )z
    Check if the async event loop is ready to accept tasks.

    Returns:
        bool: True if the event loop is running and not shutting down
    )rp   r=   rj   managerr   r   r   is_aio_loop_ready  s   rs   c                  C   s*   t  } |  s|   t| j dS dS )aF  
    Start the global async event loop if not already running.

    This function also registers an atexit handler to ensure graceful
    shutdown when the program exits.

    Note:
        The atexit handler calls stop() which performs graceful shutdown,
        waiting for pending tasks to complete before terminating.
    N)rp   r=   r@   atexitregisterra   rq   r   r   r   start_aio_loop  s
   rv   rP   rQ   c                 C   s"   t  }| r|j| d dS dS )a  
    Gracefully close the global async event loop.

    This function initiates a graceful shutdown sequence that:
    1. Waits for pending tasks to complete (up to timeout)
    2. Cancels any remaining tasks
    3. Cleans up resources (async generators, executor)
    4. Closes the event loop

    Args:
        timeout: Maximum seconds to wait for pending tasks to complete.
                Default is 5.0 seconds.
    r[   N)rp   r=   ra   )rQ   rr   r   r   r   close_aio_loop  s   rw   Fc              
   C   s   t  }| s
t  | std|jrtd|| }|r"|S z|j|dW S  tjj	y<   t
d| d   tyP } z	t
d|   d}~ww )ao  
    Execute a coroutine in the background event loop.

    This is the main entry point for running async code from sync contexts.
    It automatically starts the event loop if not already running.

    Args:
        coro: The coroutine to execute
        return_future: If True, return a Future immediately without waiting.
                      If False (default), block until the coroutine completes.
        timeout: Maximum seconds to wait for completion (only used when
                return_future=False). None means wait indefinitely.

    Returns:
        If return_future=True: concurrent.futures.Future
        If return_future=False: The result of the coroutine

    Raises:
        RuntimeError: If the event loop fails to start or is shutting down
        concurrent.futures.TimeoutError: If timeout is exceeded
        Exception: Any exception raised by the coroutine

    Example:
        # Blocking call
        result = run_async(fetch_data(), timeout=30)

        # Non-blocking call
        future = run_async(fetch_data(), return_future=True)
        # ... do other work ...
        result = future.result()
    zFailed to start event loopz4Event loop is shutting down, cannot accept new tasksr[   zTask timed out after z secondszTask failed with error: N)rp   r=   rv   rb   rj   rg   r]   r^   r_   r`   r	   rK   rJ   error)rf   return_futurerQ   rr   re   rO   r   r   r   rg     s(    
rg   openaic                       sJ   e Zd Z	d fdd	Zedd Zddd	d
Zdd Zdd Z  Z	S )GenAIClientr   Nc                    sv   ddl m} t   || _|| _d|vrd|d< |dd|i|| _|d u r/t|  dd}|| _	t
| j| _d S )	Nr   )AsyncOpenAIZapi_keynullbase_url
   r[   r   )rz   r|   superr6   r   Z_max_concurrency_clientrg   _get_model_name_model_namer7   	Semaphore
_semaphore)r   r   r~   r   Z
model_namekwargsr|   	__class__r   r   r6     s   
zGenAIClient.__init__c                 C   ri   r+   )r   r   r   r   r   openai_client  s   zGenAIClient.openai_clientFry   c                   s*    fdd}t |d j|d||dS )Nc               	      s\    j 4 I d H   jjjj| i |I d H W  d   I d H  S 1 I d H s'w   Y  d S r+   )r   r   ZchatZcompletionscreate)argsr   r   r   r   &_create_chat_completion_with_semaphore  s   

0zRGenAIClient.create_chat_completion.<locals>._create_chat_completion_with_semaphore)modelmessagesr   r   )rg   r   )r   r   ry   r   r   r   r   r   create_chat_completion  s   z"GenAIClient.create_chat_completionc                 C   s   t | j dd d S )N   r[   )rg   r   rL   r   r   r   r   rL   #  s   zGenAIClient.closec              
      sN   z| j j I d H }W n ty  } ztd| |d }~ww |jd jS )Nz@Failed to get the model list from the OpenAI-compatible server: r   )r   modelslistrJ   rb   dataid)r   r   rO   r   r   r   r   &  s   zGenAIClient._get_model_name)r   N)
r!   r"   r#   r6   rn   r   r   rL   r   __classcell__r   r   r   r   r{     s    
r{   rk   )FN)!r7   rt   concurrent.futuresr^   r1   typingr   r   r   Zpydanticr   r   Ztyping_extensionsr   utilsr	   Z
utils.depsr
   r   r   boolr*   r,   r-   ro   rp   rs   rv   rm   rw   rg   objectr{   r   r   r   r   <module>   s0   	  5
>