News Enrichment Worker¶
Worker de enriquecimento de notícias via AWS Bedrock - Core do sistema event-driven do DestaquesGovbr.
Visão Geral¶
O News Enrichment Worker é um serviço Cloud Run que processa notícias de forma assíncrona via Google Cloud Pub/Sub, enriquecendo-as com classificação temática hierárquica, resumos e metadados semânticos usando AWS Bedrock (Claude 3 Haiku).
Características Principais¶
- ✅ Event-Driven: Recebe eventos Pub/Sub do tópico
dgb.news.scraped - ✅ Alta Performance: Latência P95 < 15s (99.97% redução vs batch anterior de 45min)
- ✅ Idempotente: Verifica campos
most_specific_theme_idesummaryantes de processar - ✅ Resiliência: Dead-Letter Queue + retry exponencial (10s → 600s)
- ✅ Custo Otimizado: $0.00074/documento (40% redução vs Cogfy)
- ✅ Observabilidade: Trace ID tracking, logging estruturado, métricas Cloud Monitoring
Arquitetura¶
Fluxo de Dados¶
graph LR
A[Scraper] -->|Pub/Sub| B[Topic: dgb.news.scraped]
B -->|Push Subscription| C[Enrichment Worker]
C -->|AWS Bedrock| D[Claude 3 Haiku]
D -->|Classificação| C
C -->|UPDATE PostgreSQL| E[Silver Layer]
E -->|Pub/Sub| F[Topic: dgb.news.enriched]
F --> G[Embeddings Worker]
C -.->|5xx Error| H[Dead-Letter Queue]
H -.->|Retry exponencial| B
Stack Tecnológico¶
| Componente | Tecnologia | Versão |
|---|---|---|
| Runtime | Python | 3.11 |
| Framework | FastAPI | 0.109+ |
| LLM Provider | AWS Bedrock | Claude 3 Haiku |
| Message Queue | Google Cloud Pub/Sub | - |
| Database | PostgreSQL | 15 (Cloud SQL) |
| Deploy | Cloud Run Gen2 | - |
| Observability | Cloud Logging + Monitoring | - |
Componentes¶
1. FastAPI Application (worker/app.py)¶
from fastapi import FastAPI, Request, HTTPException
import base64
import json
app = FastAPI(title="News Enrichment Worker")
@app.post("/")
async def handle_pubsub_push(request: Request):
"""
Endpoint que recebe mensagens Pub/Sub push.
Payload Format:
{
"message": {
"data": "base64-encoded-json",
"attributes": {
"trace_id": "...",
"news_id": "..."
}
},
"subscription": "projects/.../subscriptions/..."
}
"""
envelope = await request.json()
# Decode base64 message
message_data = base64.b64decode(envelope['message']['data']).decode('utf-8')
news_data = json.loads(message_data)
# Process news
result = await process_news(news_data)
# Return 200 to ACK, 4xx to NACK permanently, 5xx to retry
return {"status": "success", "result": result}
2. LLM Client (llm_client.py)¶
class BedrockLLMClient:
"""Cliente otimizado para AWS Bedrock."""
def __init__(
self,
model_id: str = "anthropic.claude-3-haiku-20240307-v1:0",
region: str = "us-east-1",
batch_size: int = 8,
max_workers: int = 4
):
self.bedrock = boto3.client('bedrock-runtime', region_name=region)
self.model_id = model_id
self.batch_size = batch_size
self.executor = ThreadPoolExecutor(max_workers=max_workers)
def enrich_news_batch(self, news_list: List[Dict]) -> List[Dict]:
"""
Processa batch de notícias em paralelo.
Retry Policy:
- 3 tentativas com backoff exponencial (1s, 2s, 4s)
- ThrottlingException: espera e retenta
- ValidationException: retorna erro (não retenta)
"""
with self.executor as executor:
futures = [
executor.submit(self._enrich_single, news)
for news in news_list
]
results = [f.result() for f in futures]
return results
3. Taxonomy Loader (taxonomy.py)¶
from functools import lru_cache
@lru_cache(maxsize=1)
def load_taxonomy_from_postgres(conn_string: str) -> Dict:
"""
Carrega taxonomia hierárquica do PostgreSQL.
Cache: @lru_cache garante que taxonomia é carregada 1x na inicialização.
Returns:
{
"themes": [...], # 543 temas hierárquicos
"code_to_id": {...}, # Mapeamento código → ID
"taxonomy_text": "..." # Prompt para LLM
}
"""
# Query PostgreSQL
themes = fetch_themes_from_db(conn_string)
# Build taxonomy tree
taxonomy_tree = build_hierarchy(themes)
return {
"themes": themes,
"code_to_id": {t['code']: t['id'] for t in themes},
"taxonomy_text": format_taxonomy_for_prompt(taxonomy_tree)
}
4. Pub/Sub Handler (worker/handler.py)¶
def parse_pubsub_envelope(envelope: Dict) -> Tuple[Dict, str]:
"""
Parseia envelope Pub/Sub e extrai trace_id.
Args:
envelope: Payload Pub/Sub push
Returns:
(news_data, trace_id)
"""
message = envelope.get('message', {})
attributes = message.get('attributes', {})
# Decode base64 data
data_b64 = message.get('data', '')
data_json = base64.b64decode(data_b64).decode('utf-8')
news_data = json.loads(data_json)
# Extract trace_id for logging
trace_id = attributes.get('trace_id', 'unknown')
return news_data, trace_id
5. Idempotency Check (enrichment_job.py)¶
def fetch_unenriched_news(conn, limit: int = 100) -> List[Dict]:
"""
Busca notícias que ainda não foram enriquecidas.
Idempotency: Verifica se most_specific_theme_id ou summary IS NULL.
"""
query = """
SELECT unique_id, title, content, publication_date
FROM news
WHERE most_specific_theme_id IS NULL
OR summary IS NULL
ORDER BY publication_date DESC
LIMIT %s
"""
return execute_query(conn, query, (limit,))
Configuração¶
Variáveis de Ambiente¶
# AWS Bedrock
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=...
AWS_DEFAULT_REGION=us-east-1
# PostgreSQL
POSTGRES_HOST=10.x.x.x
POSTGRES_PORT=5432
POSTGRES_DB=destaquesgovbr
POSTGRES_USER=enrichment_worker
POSTGRES_PASSWORD=...
# Worker Config
WORKER_BATCH_SIZE=8
WORKER_MAX_WORKERS=4
WORKER_RETRY_ATTEMPTS=3
WORKER_LOG_LEVEL=INFO
# Pub/Sub
PUBSUB_PROJECT_ID=destaques-govbr
PUBSUB_TOPIC_ENRICHED=dgb.news.enriched
Credenciais AWS (Airflow Connection URI)¶
Para integração com Airflow, o worker suporta parsing de connection URIs:
# Formato: aws://ACCESS_KEY:SECRET_KEY@/?region_name=REGION
connection_uri = "aws://AKIA...:SECRET@/?region_name=us-east-1"
# Parser automático em worker/handler.py (linhas 37-61)
parsed = parse_aws_connection_uri(connection_uri)
# → {"access_key_id": "AKIA...", "secret_access_key": "SECRET", "region": "us-east-1"}
Deployment¶
Cloud Run Configuration¶
# service.yaml
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: enrichment-worker
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/minScale: "1" # Min 1 instância (evitar cold starts)
autoscaling.knative.dev/maxScale: "10"
spec:
containers:
- image: us-east1-docker.pkg.dev/destaques-govbr/workers/enrichment:latest
resources:
limits:
cpu: "1"
memory: "2Gi"
env:
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: aws-credentials
key: access_key_id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: aws-credentials
key: secret_access_key
- name: POSTGRES_HOST
valueFrom:
secretKeyRef:
name: postgres-credentials
key: host
Pub/Sub Subscription¶
# Criar subscription com push para Cloud Run
gcloud pubsub subscriptions create enrichment-worker-sub \
--topic=dgb.news.scraped \
--push-endpoint=https://enrichment-worker-xxx.a.run.app \
--ack-deadline=600 \
--retry-policy-minimum-backoff=10s \
--retry-policy-maximum-backoff=600s \
--dead-letter-topic=dgb.news.scraped-dlq \
--max-delivery-attempts=5
Docker Build¶
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY pyproject.toml poetry.lock ./
RUN pip install poetry && \
poetry config virtualenvs.create false && \
poetry install --no-dev --no-root
# Copy application
COPY src/ ./src/
# Expose port
EXPOSE 8080
# Run
CMD ["uvicorn", "src.news_enrichment.worker.app:app", "--host", "0.0.0.0", "--port", "8080"]
Monitoramento¶
Métricas Cloud Monitoring¶
| Métrica | Descrição | Alertas |
|---|---|---|
enrichment_latency_p95 |
Latência P95 end-to-end | > 20s |
enrichment_error_rate |
Taxa de erros (%) | > 1% |
enrichment_success_rate |
Taxa de sucesso (%) | < 95% |
bedrock_throttling_rate |
Throttling da AWS | > 5% |
pubsub_ack_latency |
Tempo até ACK | > 30s |
dlq_message_count |
Mensagens na DLQ | > 10 |
Logs Estruturados¶
import logging
import json
logger = logging.getLogger(__name__)
def log_enrichment_event(trace_id: str, news_id: str, status: str, latency_ms: int):
"""Log estruturado para Cloud Logging."""
log_entry = {
"trace_id": trace_id,
"news_id": news_id,
"status": status,
"latency_ms": latency_ms,
"timestamp": datetime.utcnow().isoformat(),
"service": "enrichment-worker"
}
logger.info(json.dumps(log_entry))
Debug Endpoint¶
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {
"status": "healthy",
"version": "1.0.0",
"bedrock_region": "us-east-1",
"taxonomy_loaded": taxonomy is not None
}
@app.get("/debug")
async def debug_info():
"""Debug endpoint (apenas em dev/staging)."""
return {
"taxonomy_themes_count": len(taxonomy['themes']),
"batch_size": llm_client.batch_size,
"max_workers": llm_client.max_workers,
"postgres_connected": test_postgres_connection()
}
Troubleshooting¶
Problema: Latência alta (>20s)¶
Causas: - Batch size muito grande - Throttling AWS Bedrock - PostgreSQL connection pool esgotado
Solução:
# Reduzir batch size
export WORKER_BATCH_SIZE=4
# Aumentar connection pool
export POSTGRES_POOL_SIZE=20
# Verificar throttling
aws cloudwatch get-metric-statistics \
--namespace AWS/Bedrock \
--metric-name ThrottledRequests \
--dimensions Name=ModelId,Value=anthropic.claude-3-haiku-20240307-v1:0 \
--start-time 2026-05-05T00:00:00Z \
--end-time 2026-05-05T23:59:59Z \
--period 3600 \
--statistics Sum
Problema: Mensagens na DLQ¶
Causas: - 5xx errors persistentes - Timeout PostgreSQL - Bedrock ValidationException
Solução:
# Inspecionar mensagens DLQ
gcloud pubsub subscriptions pull dgb.news.scraped-dlq --limit=10
# Replay manual após correção
gcloud pubsub topics publish dgb.news.scraped \
--message='{"unique_id": "...", "title": "...", "content": "..."}'
Problema: Taxa de sucesso < 95%¶
Causas: - Taxonomia não carregada - Credenciais AWS inválidas - Notícias com conteúdo vazio
Solução:
# Verificar taxonomia
curl https://enrichment-worker-xxx.a.run.app/debug | jq '.taxonomy_themes_count'
# Esperado: 543
# Verificar credenciais AWS
aws sts get-caller-identity --region us-east-1
# Adicionar validação de conteúdo
if not news_data.get('content') or len(news_data['content']) < 100:
logger.warning(f"Conteúdo vazio ou muito curto: {news_data['unique_id']}")
return None # ACK sem processar
Performance¶
Benchmarks (Claude 3 Haiku)¶
| Métrica | Valor | Notas |
|---|---|---|
| Latência P50 | 8s | Mediana |
| Latência P95 | 15s | Target SLO |
| Latência P99 | 22s | Casos complexos |
| Throughput | ~240 docs/min | Com batch_size=8, max_workers=4 |
| Custo | $0.00074/doc | 40% redução vs Cogfy ($0.00124) |
| Taxa de sucesso | 97% | Target SLO: >95% |
| Error rate | 0.3% | Target SLO: <1% |
Otimizações Aplicadas¶
- Batch Processing: ThreadPoolExecutor com max_workers=4
- Taxonomy Caching:
@lru_cachepara evitar queries repetidas - Connection Pooling: PostgreSQL pool com 10-20 conexões
- Retry Exponencial: 1s → 2s → 4s para throttling
- Min Instances: 1 instância sempre ativa (evitar cold starts)
Roadmap¶
Melhorias Planejadas (Q2 2026)¶
- [ ] Sentiment Analysis: Adicionar análise de sentimento além de temas
- [ ] Entity Extraction: Extrair entidades nomeadas (pessoas, locais, organizações)
- [ ] Multi-Region: Deploy em us-east-1 e sa-east-1 para redundância
- [ ] Caching Redis: Cache de resultados para notícias similares
- [ ] Streaming Mode: Suporte a processamento streaming (vs batch)
Melhorias Futuras (Q3 2026)¶
- [ ] Modelo Próprio: Fine-tuning de modelo específico para gov.br
- [ ] A/B Testing: Comparação Claude vs modelos alternativos
- [ ] Auto-Scaling Agressivo: Scale to zero em períodos de baixo volume
Referências¶
Código-Fonte¶
- Worker:
C:\Users\joserm\Documents\Projetos\Inspire\Meta-7\Git\data-science\src\news_enrichment\worker\ - LLM Client:
C:\Users\joserm\Documents\Projetos\Inspire\Meta-7\Git\data-science\src\news_enrichment\llm_client.py - Taxonomy:
C:\Users\joserm\Documents\Projetos\Inspire\Meta-7\Git\data-science\src\news_enrichment\taxonomy.py
Documentação Relacionada¶
Links Externos¶
Última atualização: 05/05/2026
Responsável: Equipe Data Science - DestaquesGovbr
Status: ✅ Em Produção (desde 27/02/2026)