
    iQ                        d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	m
Z
mZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ er2ddlZddlZddlZdd
lmZmZmZmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(  ejR                  e*      Z+dZ, G d dejZ                        Z. G d d      Z/ G d de0      Z1ddddiZ2ddde3dz  fdZ4 G d d       Z5 G d! d"      Z6d#ed$e7de8fd%Z9 G d& d'      Z: G d( d)      Z;d*e<ddfd+Z=d9d,Z> G d- d.      Z? G d/ d0e
      Z@ G d1 d2e@      ZA G d3 d4e@      ZB G d5 d6      ZC G d7 d8      ZDy):z?
Shared types, constants, and utilities for the serving layer.
    N)ABCabstractmethod)Callable)Future)BytesIO)Queue)TYPE_CHECKING)logging)ContinuousBatchingConfigGenerationConfigPreTrainedModelPreTrainedTokenizerFastProcessorMixin)ContinuousBatchingManager)GenerationOutput)	Scheduler   )ModelManagerzx-request-idc                       e Zd ZdZdZdZdZy)ModalityLLMVLMSTTTTSN)__name__
__module____qualname__r   r   r   r        o/var/www/vps2.regionflexible.com/Desarrollo/venv/lib/python3.12/site-packages/transformers/cli/serving/utils.pyr   r   =   s    
C
C
C
Cr   r   c                       e Zd ZdZdefdZy)_StreamErrorz5Sentinel to signal an error from the generate thread.msgc                     || _         y N)r#   )selfr#   s     r    __init__z_StreamError.__init__G   s	    r   N)r   r   r   __doc__strr'   r   r   r    r"   r"   D   s    ?C r   r"   c                       e Zd ZdZy)_GenerationCancelledzERaised inside ``DirectStreamer.put()`` to abort ``model.generate()``.N)r   r   r   r(   r   r   r    r+   r+   K   s    Or   r+   qwenz<tool_call>z</tool_call>)startendmodelr   returnc                     | j                   j                  d   j                         }t        D ]  }||v st        |   c S  y)a.  Return the tool call token format for a model, if supported.

    Args:
        model (`PreTrainedModel`): The loaded model.

    Returns:
        `dict | None`: A dict ``{"start": str, "end": str}`` with the model's tool call
        delimiters, or ``None`` if the model family is not recognized.
    r   N)configarchitectureslower_TOOL_CALL_TOKENS)r/   architecturefamilys      r    detect_tool_formatr8   [   sH     <<--a0668L# -\!$V,,- r   c                       e Zd ZdZdefdZ e       Zdedeez  dz  fdZ	e
dedeeef   dz  fd	       Ze
dededee   dz  fd
       Zdededz  fdZy)ToolCallParseru]  Parses tool calls from model output.

    The model emits tool calls as structured text between start/end tokens
    (e.g. ``<tool_call>{"name": "fn", "arguments": {...}}</tool_call>``).

    **Streaming** (``feed``): buffers tokens between start/end markers, parses
    the complete block when the end marker is seen, returns a ``ChoiceDeltaToolCall``.

    **Non-streaming** (``parse``): extracts all tool call blocks from complete text.

    Usage::

        parser = ToolCallParser(tool_format={"start": ..., "end": ...})
        for text_chunk in streamer:
            result = parser.feed(text_chunk)
            if result is None:
                # Normal text — emit as content
            elif result is ToolCallParser.CONSUMED:
                # Buffering — skip
            else:
                # result is a ChoiceDeltaToolCall — emit it
    tool_formatc                 .    || _         d| _        d| _        y )NF )_tokens_inside_buffer)r&   r;   s     r    r'   zToolCallParser.__init__   s    "r   textr0   Nc                    |j                         | j                  d   k(  rd| _        d| _        | j                  S |j                         | j                  d   k(  rGd| _        | j                  j                         }d| _        | j                  |      xs | j                  S | j                  r!| xj                  |z  c_        | j                  S y)u  Feed a text chunk (streaming).

        Returns:
        - ``None`` — normal text, not a tool token. Emit as content.
        - ``CONSUMED`` — token consumed internally (buffering/markers). Skip.
        - A ``ChoiceDeltaToolCall`` — emit as a tool call delta.
        r-   Tr=   r.   FN)stripr>   r?   r@   CONSUMED_parse_block)r&   rA   blocks      r    feedzToolCallParser.feed   s     ::<4<<00DLDL== ::<4<<.. DLLL&&(EDL$$U+<t}}<<<LLD L== r   rF   c                     | syt        j                  |       }|j                  d      }|y|j                  di       }|t        j                  |      fS )zJExtract (name, arguments_json) from a tool call block, or None if invalid.Nname	arguments)jsonloadsgetdumps)rF   parsedrI   rJ   s       r    _extract_name_and_argsz%ToolCallParser._extract_name_and_args   sR     E"zz&!<JJ{B/	TZZ	***r   c                 ^   |d   |d   }}g }d}	 | j                  ||      }|dk  rn| j                  ||t        |      z         }|dk  rn]t        j                  | |t        |      z   | j	                               }||j                  |d   |d   d       |t        |      z   }|r|S dS )zParse tool calls from complete text.

        Returns a list of ``{"name": str, "arguments": str}`` dicts, or ``None`` if none found.
        r-   r.   r   Nr   rI   rJ   )findlenr:   rP   rC   append)	rA   r;   r-   r.   
tool_callsposseresults	            r    parsezToolCallParser.parse   s     !);u+=s
		%%A1u		#q3u:~.A1u#::4CJQR;S;Y;Y;[\F!!!6!96!9"MNc#h,C  (z1T1r   c                 @    | j                  |      }|y|d   |d   dS )zVParse a buffered tool call block. Returns ``{"name": str, "arguments": str}`` or None.Nr   r   rR   )rP   )r&   rF   rZ   s      r    rE   zToolCallParser._parse_block   s-    ,,U3>q	q	::r   )r   r   r   r(   dictr'   objectrD   r)   rG   staticmethodtuplerP   listr[   rE   r   r   r    r:   r:   l   s    .D  xH $!5 2 	+c 	+eCHo.D 	+ 	+ 2C 2d 2tDzD/@ 2 2*;# ;$+ ;r   r:   c                   p    e Zd ZdZdedefdZdededz  ddfd	Zded
ededz  ddfdZ	deddfdZ
ddZy)DownloadAggregatora	  Aggregates byte-progress across multiple concurrent download tqdm bars.

    huggingface_hub opens one tqdm bar per file shard. This class tracks them all and emits
    a single aggregate ``{"stage": "download", "progress": {...}}`` event whenever any updates.
    enqueuemodel_idc                 <    || _         || _        i | _        d | _        y r%   )rd   r/   barslast_emitted_current)r&   rd   re   s      r    r'   zDownloadAggregator.__init__   s    
79	04!r   bar_idtotalNr0   c                 F    d|f| j                   |<   | j                          y)z6Register a new download bar with its total byte count.r   Nrg   _emit)r&   ri   rj   s      r    registerzDownloadAggregator.register   s    J		&

r   currentc                 F    ||f| j                   |<   | j                          y)z>Update a bar's current byte count and emit aggregate progress.Nrl   )r&   ri   ro   rj   s       r    updatezDownloadAggregator.update   s    $e,		&

r   c                      y r%   r   )r&   ri   s     r    closezDownloadAggregator.close       r   c                 T   t        d | j                  j                         D              }|| j                  k(  ry || _        | j                  j                         D cg c]
  \  }}|	| }}}|rt        |      nd }| j	                  d| j
                  d||dd       y c c}}w )Nc              3   &   K   | ]	  \  }}|  y wr%   r   ).0c_s      r    	<genexpr>z+DownloadAggregator._emit.<locals>.<genexpr>   s     ;1!;s   loadingdownloadro   rj   statusr/   stageprogress)sumrg   valuesrh   rd   r/   )r&   agg_currentry   ttotals	agg_totals         r    rm   zDownloadAggregator._emit   s    ;		(8(8(:;;$333$/! $		 0 0 2D1am!DD#)CKt	##(3iH		
 Es   
B$*B$r0   N)r   r   r   r(   r   r)   r'   intrn   rq   rs   rm   r   r   r    rc   rc      su    5 5C 5s 3: $ 
S 3 sTz d 
C D 
r   rc   callbackre   c                 N     ddl m} t                G  fdd|      }|S )u  Create a tqdm subclass that routes progress to a callback.

    Bars with ``unit="B"`` are download bars — aggregated via ``DownloadAggregator``.
    Other bars (e.g. "Loading weights") emit ``weights`` stage events.

    Args:
        callback (`callable`): Called with a dict payload
            ``{"status": "loading", "model": ..., "stage": ..., "progress": ...}``.
        model_id (`str`): The model ID (included in progress payloads).

    Returns:
        A tqdm subclass that forwards progress to *callback*.
    r   )tqdmc                   L     e Zd Z fdZdfd	ZfdZ fdZ xZS ).make_progress_tqdm_class.<locals>.ProgressTqdmc                 
   |j                  d      xs d| _        d|d<   t        |   |i | d| _        d| _        | j                  dk(  r7t        |       | _        j                  | j                  | j                         y y )NunititTdisabler   B)
rM   sse_unitsuperr'   nlast_emittedid_bar_idrn   rj   )r&   argskwargs	__class__download_aggregators      r    r'   z7make_progress_tqdm_class.<locals>.ProgressTqdm.__init__  sw    "JJv.6$DM $F9Gd-f-DF "D}}#!$x#,,T\\4::F $r   c                 X   |d}| xj                   |z  c_         | j                  dk(  r2j                  | j                  | j                   | j                         y | j                   | j
                  k7  r6| j                   | _         dd| j                   | j                  dd       y y Nr   r   r{   weightsr}   r~   )r   r   rq   r   rj   r   )r&   r   r   r   re   s     r    rq   z5make_progress_tqdm_class.<locals>.ProgressTqdm.update  s    yFFaKF}}##**4<<L4,,,$(FF!"+!)!*04$L	 -r   c           	   3     K   | j                   D ]  }| xj                  dz  c_        | j                  dk(  r2j                  | j                  | j                  | j
                         nN| j                  | j                  k7  r5| j                  | _         dd| j                  | j
                  dd       |  y wr   )iterabler   r   rq   r   rj   r   )r&   itemr   r   re   s     r    __iter__z7make_progress_tqdm_class.<locals>.ProgressTqdm.__iter__(  s      !==C''..t||TVVTZZPVVt000(,D%&/%-%.48FFTZZ(P	 
s   B;B>c                 v    | j                   dk(  rj                  | j                         t        |           y )Nr   )r   rs   r   r   )r&   r   r   s    r    rs   z4make_progress_tqdm_class.<locals>.ProgressTqdm.close9  s*    }}##))$,,7GMOr   )r   )r   r   r   r'   rq   r   rs   __classcell__)r   r   r   re   s   @r    ProgressTqdmr     s    	G	"	"	 	r   r   )	tqdm.autor   rc   )r   re   	base_tqdmr   r   s   ``  @r    make_progress_tqdm_classr      s/     ,,Xx@0 0y 0d r   c            	       l    e Zd ZdZ	 ddddej
                  dej                  defdZdd	Z	dd
Z
ddZy)DirectStreamera  Streamer for ``model.generate()`` (used by :class:`GenerateManager`).

    Implements the ``put``/``end`` protocol that ``model.generate()`` expects:
    generate calls ``put(token_tensor)`` after each decode step, and ``end()``
    when generation is complete. Tokens are decoded incrementally via the Rust
    ``DecodeStream`` (O(1) per token) and pushed as text to an asyncio.Queue.
    	tokenizertokenizers.Tokenizerloopqueueskip_special_tokensc                     ddl m} || _        || _        || _         |g |      | _        d| _        t        j                         | _	        d| _
        y)a  
        Args:
            tokenizer: The Rust tokenizer (``tokenizer._tokenizer``).
            loop (`asyncio.AbstractEventLoop`): The event loop to push decoded text to.
            queue (`asyncio.Queue`): The queue that receives decoded text chunks.
            skip_special_tokens (`bool`, *optional*, defaults to `True`):
                Whether to strip special tokens during decoding.
        r   DecodeStreamTN)tokenizers.decodersr   
_tokenizer_loop_queue_decode_stream_first	threadingEvent
_cancelledtotal_tokens)r&   r   r   r   r   r   s         r    r'   zDirectStreamer.__init__J  sI     	5#
*2/BC#//+r   Nc                 z   | j                   j                         r
t               | j                  rd| _        y|j	                         D ]p  }| xj
                  dz  c_        | j                  j                  | j                  |      }|A| j                  j                  | j                  j                  |       r y)zHCalled by ``model.generate()`` after each decode step with new token(s).FNr   )r   is_setr+   r   tolistr   r   stepr   r   call_soon_threadsafer   
put_nowait)r&   valuetoken_idrA   s       r    putzDirectStreamer.putc  s    ??!!#&((;;DK 	NH"&&++DOOXFD

//0F0FM		Nr   c                 d    | j                   j                  | j                  j                  d       y)z;Called by ``model.generate()`` when generation is complete.N)r   r   r   r   r&   s    r    r.   zDirectStreamer.endq  s     

''(>(>Er   c                 8    | j                   j                          y)zWSignal cancellation. The next ``put()`` call will raise and abort ``model.generate()``.N)r   setr   s    r    cancelzDirectStreamer.cancelu  s    r   )T)r   torch.Tensorr0   Nr   )r   r   r   r(   asyncioAbstractEventLoopr   boolr'   r   r.   r   r   r   r    r   r   A  sR     %)) '' }}	
 "2NFr   r   c            
       l    e Zd ZdZdddedddej                  dej                  f
d	ZddZ	ddZ
ddZy
)
CBStreamera  Streamer for continuous batching (used by :class:`CBGenerateManager`).

    Same ``put``/``end`` protocol as :class:`DirectStreamer`, but called manually
    by :class:`CBGenerateManager` instead of by ``model.generate()``:
    ``put(output)`` receives a CB ``GenerationOutput``, decodes new tokens, and
    pushes text to the asyncio.Queue. ``end()`` signals the stream is complete.
    
cb_managerr   
request_idr   r   r   r   c                     ddl m} || _        || _        || _        || _        || _         |g d      | _        d| _        d| _	        y)a  
        Args:
            cb_manager (`ContinuousBatchingManager`): The CB manager instance.
            request_id (`str`): The request ID to track in the CB scheduler.
            tokenizer: The Rust tokenizer (``tokenizer._tokenizer``).
            loop (`asyncio.AbstractEventLoop`): The event loop to push decoded text to.
            queue (`asyncio.Queue`): The queue that receives decoded text chunks.
        r   r   TN)
r   r   _cb_request_idr   r   r   r   	_prev_lenr   )r&   r   r   r   r   r   r   s          r    r'   zCBStreamer.__init__  sI      	5%
#*2t4r   Nc                 *   |j                   | j                  d }t        |j                         | _        |D ][  }| xj                  dz  c_        | j                  j                  | j                  |      }|A| j                  j                  |       ] y)zLDecode new tokens from a CB ``GenerationOutput`` and push text to the queue.Nr   )	generated_tokensr   rT   r   r   r   r   r   r   )r&   output
new_tokensr   rA   s        r    r   zCBStreamer.put  s    ,,T^^-=>
V445" 	-H"&&++DOOXFD&&t,		-r   c                 :    | j                   j                  d       y)zSignal end of stream.N)r   r   r   s    r    r.   zCBStreamer.end  s    t$r   c                 N    | j                   j                  | j                         y)zCancel the CB request.N)r   cancel_requestr   r   s    r    r   zCBStreamer.cancel  s     0 01r   )r   r   r0   Nr   )r   r   r   r(   r)   r   r   r   r'   r   r.   r   r   r   r    r   r   z  sU    /  *	
 '' }}6-%2r   r   seedc                 0    ddl } |j                  |        y)z8Set the PyTorch random seed for reproducible generation.r   N)torchmanual_seed)r   r   s     r    set_torch_seedr     s    Edr   c                  v    ddl } | j                  j                         r| j                  j                          yy)z+Empty the CUDA cache if a GPU is available.r   N)r   cudais_availableempty_cache)r   s    r    reset_torch_cacher     s*    zz 

  !r   c                   J    e Zd ZdZd ZddZdefdZdej                  fdZ	y)	InferenceThreadzPersistent thread for ``model.generate()`` calls.

    ``torch.compile`` with CUDA graphs stores state in thread-local storage.
    All inference must run on the same thread to avoid corrupted graph state.
    c                     t               | _        t        j                  | j                  d      | _        | j
                  j                          y )NT)targetdaemon)r   r   r   Thread_run_threadr-   r   s    r    r'   zInferenceThread.__init__  s3    "W ''tyyFr   r0   Nc                 D   	 | j                   j                         \  }}}}}	  ||i |}||j                  |j                  |       n|j                  |       Z# t        $ r:}||j                  |j
                  |       n|j                  |       Y d }~?d }~ww xY wr%   )r   rM   r   
set_result	Exceptionset_exception)r&   fnr   r   futurer   rZ   rY   s           r    r   zInferenceThread._run  s    -1[[__->*Bffd
,T,V,#--f.?.?H%%f-   ,#--f.B.BAF((+	,s   8A 	B%0BBc                 Z    t               }| j                  j                  ||||df       |S )ESubmit a callable to the inference thread. Returns a blocking Future.N)r   r   r   )r&   r   r   r   r   s        r    submitzInferenceThread.submit  s)    T66489r   c                     t        j                         }|j                         }| j                  j	                  |||||f       |S zOSubmit a callable to the inference thread. Returns an awaitable asyncio.Future.)r   get_running_loopcreate_futurer   r   )r&   r   r   r   r   r   s         r    async_submitzInferenceThread.async_submit  s>    '')##%T66489r   r   )
r   r   r   r(   r'   r   r   r   r   r   r   r   r    r   r     s-    
,V 7>> r   r   c                       e Zd ZdZedddddeddd	ed
eej                  df   fd       Z
edddddeddd	ed
eeeee   f   fd       Zedd       Zy)BaseGenerateManageru   Base class for generation managers.

    Subclasses:
    - :class:`GenerateManager` — sequential ``model.generate()`` on a persistent thread.
    - :class:`CBGenerateManager` — continuous batching with paged attention.
    r/   r   	processor(ProcessorMixin | PreTrainedTokenizerFastinputs
gen_configr   r   r0   zDirectStreamer | CBStreamerc                      y)ax  Start streaming generation.

        Args:
            model (`PreTrainedModel`): The loaded model.
            processor: The processor or tokenizer for decoding.
            inputs (`dict`): Tokenized inputs (tensors for sequential, lists for CB).
            gen_config (`GenerationConfig`): Generation parameters.
            request_id (`str`): Unique request identifier.

        Returns:
            `tuple[asyncio.Queue, DirectStreamer | CBStreamer]`: A ``(queue, streamer)`` pair
            where *queue* yields ``str | _StreamError | None`` and *streamer* exposes
            ``.total_tokens`` and ``.cancel()``.
        Nr   r&   r/   r  r  r  r   s         r    generate_streamingz&BaseGenerateManager.generate_streaming      r   c                      y)a  Run generation to completion.

        Args:
            model (`PreTrainedModel`): The loaded model.
            processor: The processor or tokenizer for decoding.
            inputs (`dict`): Tokenized inputs (tensors for sequential, lists for CB).
            gen_config (`GenerationConfig`): Generation parameters.
            request_id (`str`): Unique request identifier.

        Returns:
            `tuple[str, int, list[int]]`: ``(text, input_len, generated_ids)``.
        Nr   r  s         r    generate_non_streamingz*BaseGenerateManager.generate_non_streaming	  r
  r   Nc                      y)z/Stop the generation manager and free resources.Nr   r   s    r    stopzBaseGenerateManager.stop  r
  r   r   )r   r   r   r(   r   r]   r)   r`   r   r   r	  r   ra   r  r  r   r   r    r  r    s       > 	
 '  
w}};;	< .   > 	
 '  
sCc"	# * > >r   r  c                       e Zd ZdZd Zdddddedd	d
edeej                  e
f   fdZdddddedd	d
edeeedf   fdZdedefdZdedej                  fdZddZy)GenerateManagerzFSequential generation via ``model.generate()`` on a persistent thread.c                 "    t               | _        y r%   )r   r   r   s    r    r'   zGenerateManager.__init__'  s    &(r   r/   r   r  r  r  r  r   r   r0   c                    	
 t        j                         	t        j                         
t        |j                  	
d      }i ||||dd	
fd}| j                  |       
|fS )zLStart streaming generation via ``model.generate()`` on the inference thread.Tr   )streamergeneration_configr   c            	          	  j                   di  y # t        $ r j                  j                  d        Y y t        $ r8} j                  j                  t        t        |                    Y d } ~ y d } ~ ww xY w)Nr   )generater+   r   r   r   r"   r)   )rY   
gen_kwargsr   r/   r   s    r    r   z0GenerateManager.generate_streaming.<locals>._run8  sm    R,,' B))%*:*:DA R))%*:*:LQ<PQQRs    %A=A=.A88A=r   )r   r   r   r   r   r   )r&   r/   r  r  r  r   r  r   r  r   r   s    `      @@@r    r	  z"GenerateManager.generate_streaming*  sm     '')&}}!)"6"6eY]^nnH:dmn
	R 	R 	Dhr   r   c                    K    | j                   |j                  fi |||d d{   }|d   j                  d   }|d|df   }|j                  |d      }	|	||fS 7 7w)zNRun generation to completion via ``model.generate()`` on the inference thread.)r  r   N	input_idsr   r   Tr  )r   r  shapedecode)
r&   r/   r  r  r  r   	sequences	input_lengenerated_idsrA   s
             r    r  z&GenerateManager.generate_non_streamingC  s      ,$++NN
$
8Bi
 
	 ;'--b1	!!YZ-04HY--
s   &A"A 8A"r   c                 B     | j                   j                  |g|i |S )r   )r   r   r&   r   r   r   s       r    r   zGenerateManager.submitT  s#    "t||""27777r   c                 B     | j                   j                  |g|i |S r   )r   r   r!  s       r    r   zGenerateManager.async_submitX  s#    (t||((=d=f==r   Nc                      y r%   r   r   s    r    r  zGenerateManager.stop\  rt   r   r   )r   r   r   r(   r'   r]   r)   r`   r   r   r   r	  r   r  r   r   r   r   r  r   r   r    r  r  $  s    P)  > 	
 '  
w}}n,	-2. . >. 	.
 '. . 
sC'	(."8 8v 8>x >W^^ >r   r  c                       e Zd ZdZdddZdd	Zddd
ddedddedee	j                  ef   fdZddd
ddedddedeeeee   f   fdZedd       ZddZy)CBGenerateManagerau  Continuous batching generation via paged attention.

    Translates between the handler's text-level asyncio.Queue and CB's
    token-level interface. Per-request: ``max_new_tokens``, ``eos_token_id``.

    The CB manager is initialized lazily on the first request via
    :meth:`ensure_initialized`, using that request's ``gen_config`` for shared
    sampling params (temperature, top_p, do_sample).

    .. todo:: Remove :meth:`init_cb` when CB supports per-request
       generation config. At that point, ``gen_config`` can be passed directly
       to ``add_request`` and the CB manager no longer needs a shared config.
    Nc                      d | _         || _        y r%   )r   
_cb_config)r&   	cb_configs     r    r'   zCBGenerateManager.__init__o  s    #r   r/   r   r  r   r0   c                     | j                   y|j                  || j                        | _         | j                   j                          y)at  Initialize the CB manager on first call with the request's generation config.

        .. todo:: Remove when CB supports per-request generation config.

        Args:
            model (`PreTrainedModel`): The loaded model (must support ``init_continuous_batching``).
            gen_config (`GenerationConfig`): Generation config used for shared sampling params.
        N)r  continuous_batching_config)r   init_continuous_batchingr'  r-   )r&   r/   r  s      r    init_cbzCBGenerateManager.init_cbs  sA     8811(T__ 2 
 	r   r  r  r  r   c                 Z  	
 t        j                         }t        j                         
|d   }| j                  j	                  ||d|j
                  |j                        }t        | j                  ||j                  |
      		
fd}| j                  j                  ||       
	fS )zFStart streaming CB generation. Registers a per-request output handler.r  T)r   	streamingmax_new_tokenseos_token_idc                     	 j                  |        | j                         rj                          y y # t        $ r-}j	                  t        t        |                   Y d }~y d }~ww xY wr%   )r   is_finishedr.   r   r   r"   r)   )r   rY   r  
text_queues     r    
_on_outputz8CBGenerateManager.generate_streaming.<locals>._on_output  sX    <V$%%'LLN ( <%%l3q6&:;;<s   16 	A,#A''A,)
r   r   r   r   add_requestr/  r0  r   r   register_result_handler)r&   r/   r  r  r  r   r   r  r4  r  r3  s            @@r    r	  z$CBGenerateManager.generate_streaming  s     '')$+MMO
;'	XX))!%44#00 * 

 dhh
I4H4H$PZ[	< 	((Z@8##r   c                   K   |d   }t        |      }t        j                         }|j                         fd}	| j                  j                  ||	       | j                  j                  |||j                  d|j                          d{   }
|
t        d|       |
j                  }|j                  |d      }|||fS 7 8w)	zcRun non-streaming CB generation. Registers a handler that resolves an asyncio.Future on completion.r  c                 J    j                         sj                  |        y y r%   )doner   )rZ   r   s    r    
_on_resultz<CBGenerateManager.generate_non_streaming.<locals>._on_result  s    ;;=!!&) !r   F)r   r/  r.  r0  Nz1CB manager stopped before producing a result for Tr  )rT   r   r   r   r   r6  r5  r/  r0  RuntimeErrorr   r  )r&   r/   r  r  r  r   r  r  r   r:  rZ   r  rA   r   s                @r    r  z(CBGenerateManager.generate_non_streaming  s      ;'		N	 '')##%	* 	((Z@!%44#00 	 	
 >!RS]R^_``//4HY-- s   BCC9Cc                 B    | j                   j                  j                  S )z*The CB scheduler (for testing/monitoring).)r   batch_processor	schedulerr   s    r    r>  zCBGenerateManager.scheduler  s     xx''111r   c                 X    | j                   | j                   j                  dd       y y )NT   )rF   timeout)r   r  r   s    r    r  zCBGenerateManager.stop  s%    88HHMMaM0  r   r%   )r(  ContinuousBatchingConfig | None)r/   r   r  r   r0   N)r0   r   r   )r   r   r   r(   r'   r,  r]   r)   r`   r   r   r   r	  r   ra   r  propertyr>  r  r   r   r    r%  r%  `  s    $"!$ !$ >!$ 	!$
 '!$ !$ 
w}}j(	)!$F". ". >". 	".
 '". ". 
sCc"	#".H 2 21r   r%  c                   ^    e Zd ZdZ	 	 	 ddededdfdZdd	d
edefdZddedede	fdZ
ddZy)GenerationStatea'  Shared generation state across all handlers.

    Manages per-model :class:`GenerateManager` instances (each with its own
    :class:`InferenceThread` so different models can run concurrently while
    ``torch.compile`` / CUDA graphs require same-model-same-thread) and a
    single :class:`CBGenerateManager` for continuous batching.

    Args:
        continuous_batching (`bool`, *optional*, defaults to `False`):
            Whether to use continuous batching with paged attention instead of
            sequential ``model.generate()`` calls.
    Ncontinuous_batchingcompiler(  rB  c                 X    || _         || _        || _        i | _        d | _        d | _        y r%   )_continuous_batching_compiler'  _generate_managers_cb_manager_cb_model_id)r&   rF  rG  r(  s       r    r'   zGenerationState.__init__  s2     %8!#>@59(,r   r/   r   modalityr0   c                     | j                   syt        |d      xr |t        j                  k(  }|s,t        j                  |j                  j                   d       |S )aW  Check if continuous batching can be used for this model and modality.

        Args:
            model (`PreTrainedModel`): The loaded model.
            modality (`Modality`): The detected model modality (LLM, VLM, etc.).

        Returns:
            `bool`: ``True`` if CB is enabled and the model supports it, ``False`` otherwise.
        Fr+  zM does not support continuous batching. Falling back to sequential generation.)rI  hasattrr   r   loggerwarning_oncer   r   )r&   r/   rN  cans       r    use_continuous_batchingz'GenerationState.use_continuous_batching  s]     ((e78UX=U??++, -9 9 
r   re   use_cbc                 Z   |rv| j                   |k7  r-| j                  !| j                  j                          d| _        | j                  "t        | j                        | _        || _         | j                  S || j
                  vrt               | j
                  |<   | j
                  |   S )af  Return a per-model generation manager, lazily created on first request.

        Args:
            model_id (`str`): The model ID in ``'model_id@revision'`` format.
            use_cb (`bool`): Whether to return a CB manager or a sequential one.

        Returns:
            `BaseGenerateManager`: Either a `GenerateManager` or `CBGenerateManager`.
        N)r(  )rM  rL  r  r%  r'  rK  r  )r&   re   rU  s      r    get_managerzGenerationState.get_manager  s       H,##/$$))+'+D$'#4t#O $,!###42220?0AD##H-&&x00r   c                 `    | j                   "| j                   j                          d| _         yy)z$Stop any active generation managers.N)rL  r  r   s    r    shutdownzGenerationState.shutdown  s-    '!!##D (r   )FFNFr   )r   r   r   r(   r   r'   r   rT  r)   r  rW  rY  r   r   r    rE  rE    so     %*7;	-!- - 5	--> ( W[ (1C 1 1BU 1.$r   rE  c            	           e Zd ZU dZdZedz  ed<    e       Zee	   ed<   ddde
fdZd	ed
dfdZeddd
e	fd       Zd	ed
ee	ddf   fdZ	 dd	eddded
dfdZedee   ded
ee   fd       Zy)BaseHandlera  Shared logic for chat completion and responses handlers.

    Provides model resolution, generation config building, and SSE formatting.
    Generation is delegated to the shared :class:`GenerationState`.

    Args:
        model_manager (`ModelManager`):
            Handles model loading, caching, and lifecycle.
        generation_state (`GenerationState`):
            Shared state managing per-model generation managers.
    N_valid_params_class_unused_fieldsmodel_managerr   generation_statec                      || _         || _        y r%   )r_  r`  )r&   r_  r`  s      r    r'   zBaseHandler.__init__2  s    
 + 0r   bodyr0   c                     ddl m} t        |j                               }| j                  (|| j                  j
                  z
  }|r |dd|       || j                  z  }|rt        j                  d|        yy)zMValidate request fields against the handler's params class and unused fields.r   )HTTPExceptionNi  z"Unexpected fields in the request: )status_codedetailz,Ignoring unsupported fields in the request: )	fastapird  r   keysr]  __mutable_keys__r^  rQ  rR  )r&   rb  rd  
input_keys
unexpectedunuseds         r    _validate_requestzBaseHandler._validate_request:  s    )%
##/#d&>&>&O&OOJ#>`ak`l<mnnd111"Nvh WX r   chunkzstr | pydantic.BaseModelc                     t        | t              r| j                  d      r| S d|  dS d| j                  d       dS )z;Format a pydantic model or string as an SSE ``data:`` line.zdata: z

T)exclude_none)
isinstancer)   
startswithmodel_dump_json)rn  s    r    chunk_to_ssezBaseHandler.chunk_to_sseG  sM     eS!!,,X65PfUG4<PP--4-@AFFr   r   r  c                     | j                   j                  | j                   j                  |d<   | j                   j                  |d         }| j                   j                  |      \  }}|||fS )zfApply force_model, load model + processor.

        Returns ``(model_id, model, processor)``.
        r/   )r_  force_modelprocess_model_nameload_model_and_processor)r&   rb  re   r/   r  s        r    _resolve_modelzBaseHandler._resolve_modelN  sm    
 ))5 ..::DM%%88gG--FFxPy	))r   model_generation_configr   rU  c                 B   ddl m} |j                  d       |di t        j                  |d         }n7t        j                  |      }|j                  |j                  dk  rd|_        |j                  d      +t        |d         |_	        t        |d         dk(  rd|_
        |j                  d      t        |d         |_        |j                  d	      t        |d	          | j                  j                  r|j                  d
|_        |rd|_        |S )a'  Build a GenerationConfig from shared params (temperature, top_p, seed, generation_config JSON).

        Subclasses should call ``super()._build_generation_config(...)`` then apply
        endpoint-specific params (``max_tokens``, ``max_output_tokens``, etc.).

        Args:
            body (`dict`):
                The raw request body.
            model_generation_config (`GenerationConfig`):
                The model's default generation config (will be deep-copied).
            use_cb (`bool`, *optional*, defaults to `False`):
                Whether continuous batching is active. If ``True``, disables the model's
                internal KV cache (CB manages its own paged cache).

        Returns:
            `GenerationConfig`: A new config with request-specific overrides applied.
        r   )r   r  i   temperatureg        Ftop_pr   staticr   )transformersr   rM   rK   rL   copydeepcopyr/  floatr|  	do_sampler}  r   r`  rJ  cache_implementation	use_cache)r&   rb  rz  rU  r   r  s         r    _build_generation_configz$BaseHandler._build_generation_config[  s   ( 	288'(4 0 Y4::dCV>W3X Y $.E F //7;L;[;[^b;b37!088M".,1$}2E,F)T-()S0.3!+88G(&+DM&:#88F'4<(   )).?.T.T.\5=2 */' ! r   messagesrN  c           	         g }| D ]  }|d   g d}|t         j                  k(  ret        |d   t              r
|d   |d<   net        |d   t              rQ|d   D cg c]  }|d   dk(  s|d    }}dj                  |      |d<   n|t         j                  k(  r	t        |d   t              r|d   j                  d|d   d       n|d   D ]  }|d   dk(  r|d   j                  |        |d   dk(  s)d	d
lm	} |d   d   }	d|	v rt        j                  dd|	      }
|j                  t        t        j                  |
                  }t!        j"                  dd      }|j%                  |j&                         |j&                  }	|d   j                  d|	d        |j                  |        |S c c}w )a  Convert OpenAI-format messages to the format expected by HF processors.

        For LLMs, collapses list content blocks into plain text. For VLMs, converts
        ``image_url`` content parts (including base64) into ``{"type": "image", "url": ...}``
        entries that HF processors understand.

        Args:
            messages (`list[dict]`): OpenAI-format chat messages.
            modality (`Modality`): Whether the model is an LLM or VLM.

        Returns:
            `list[dict]`: Processor-compatible messages.
        role)r  contentr  typerA    )r  rA   	image_urlr   )Imageurlbase64z^data:image/.+;base64,r=   z.pngF)suffixdeleteimage)r  r  )r   r   rq  r)   ra   joinr   rU   PILr  resubopenr   r  	b64decodetempfileNamedTemporaryFilesaverI   )r  rN  processor_inputsmessagerO   rx   textsr  r  r  
image_datar  files                r    "get_processor_inputs_from_messagesz.BaseHandler.get_processor_inputs_from_messages  s     	,G%fo"=F8<<'gi0#6(/	(:F9%	 2D907	0BZ1aiSYFYQvYZEZ(+F9%X\\)gi0#69%,,fgiFX-YZ#*9#5 T"6?f4"9-44W=$V_;1")+"6u"=C'3-/VV4LbRU-V
(-

76;K;KJ;W3X(Y'/'B'B&Y^'_ %

499 5&*ii"9-44gc5RST ##F+;	,<  / [s   F<*F<rZ  )r   r   r   r(   r]  r  __annotations__r   r^  r)   rE  r'   r]   rm  r_   rt  r`   ry  r   r  ra   r   r  r   r   r    r\  r\  "  s    
 (,+"uNCH$1%1 *1Yd Yt Y G6 G3 G G*4 *E#7HJt2t,u * W\0!0!3E0!OS0!	0!d . T$Z . 8 . X\]aXb .  . r   r\  r   )Er(   r   r  r  enumrK   r  r  r   abcr   r   collections.abcr   concurrent.futuresr   ior   r   r   typingr	   transformers.utilsr
   pydantic
tokenizersr   r  r   r   r   r   r   :transformers.generation.continuous_batching.continuous_apir   4transformers.generation.continuous_batching.requestsr   5transformers.generation.continuous_batching.schedulerr   r_  r   
get_loggerr   rQ  X_REQUEST_IDEnumr   r"   r   r+   r5   r]   r8   r:   rc   r)   r  r   r   r   r   r   r   r   r  r  r%  rE  r\  r   r   r    <module>r     s        	   # $ %     &   eUO+ 
		H	% tyy  P9 P  / D4K "`; `;F(
 (
VDx D3 D4 DN6 6r42 42n  !& &R8># 8>v9) 9xr1+ r1jJ$ J$ZZ  Z r   