Pular para conteúdo

Subscriptions & SSE

A única subscription do schema é generateRecortes — o agente de IA que gera recortes de clipping em tempo real, transmitindo o raciocínio do LLM conforme ele acontece.

type Subscription {
  """Gera recortes em tempo real via agente LLM. Passthrough do SSE do
     worker clipping. Requer autenticacao."""
  generateRecortes(prompt: String!): AgentEvent!
}

Por que SSE e não WebSocket

Subscriptions em GraphQL costumam usar WebSocket (graphql-ws). Aqui usamos Server-Sent Events (graphql-sse) por dois motivos:

  • O fluxo é unidirecional (servidor → cliente) e de vida curta (a duração de uma geração). SSE é mais simples e atravessa proxies/Cloud Run melhor que WS.
  • O agente real roda no clipping worker (outro serviço). Este serviço só faz passthrough: abre o SSE do worker e repassa os eventos. SSE encaixa naturalmente nesse padrão de proxy de stream.

O endpoint /graphql/stream

Subscriptions vão para /graphql/stream, não /graphql

/graphql serve só queries e mutations (HTTP POST). As subscriptions têm um endpoint dedicado /graphql/stream (text/event-stream). Um cliente que aponte o SSE para /graphql recebe 404 / "Failed to fetch". Foi um dos bugs da R1 — o portal derivava a URL do SSE errada. O cliente deve transformar …/graphql…/graphql/stream.

O endpoint responde com headers de streaming:

StreamingResponse(
    generator,
    media_type="text/event-stream",
    headers={
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",  # desabilita buffering em Nginx/Cloud Run
    },
)

Fluxo do passthrough

sequenceDiagram
    participant P as Portal (browser)
    participant A as graphql-api
    participant W as clipping worker
    P->>A: POST /graphql/stream<br/>generateRecortes(prompt) + Bearer JWT
    A->>A: valida JWT (IsAuthenticated)
    A->>A: obtém OIDC token (audience = worker)
    A->>W: abre SSE /agent/... + Bearer OIDC
    loop streaming
        W-->>A: evento (thinking / tool / sample / done)
        A-->>P: AgentEvent (SSE)
    end

A autenticação tem dois saltos: usuário → graphql-api (JWT Keycloak) e graphql-api → worker (OIDC do Google, via lib/oidc.py). Ver auth.

O tipo AgentEvent (union)

Cada evento do stream é um membro de uma union — o cliente faz pattern-match no __typename:

Membro Significado
AgentEventThinking raciocínio em andamento (message)
AgentEventToolCall o agente chamou uma ferramenta (tool, argsJson)
AgentEventToolResult resultado de uma ferramenta (tool, resultJson)
AgentEventSampleResult amostra de artigos para um recorte (payloadJson)
AgentEventAdjusting ajustando a estratégia (message)
AgentEventDone concluído: recortes, explanation, suggestedName, iterations, converged
AgentEventError erro (message)
subscription Generate($prompt: String!) {
  generateRecortes(prompt: $prompt) {
    __typename
    ... on AgentEventThinking { message }
    ... on AgentEventDone {
      suggestedName
      recortes { title themes agencies keywords }
    }
    ... on AgentEventError { message }
  }
}

Dev local vs Cloud Run

Em Cloud Run o OIDC sai do metadata server e o worker aceita. Em dev local, o fallback ADC só funciona se a sua identidade tiver roles/run.invoker no worker; sem isso, o stream do agente falha no salto OIDC (o resto do schema funciona normalmente).