Workflow: Pipeline de Coleta e Enriquecimento¶
Pipeline completo de coleta de notícias (scraper) e enriquecimento event-driven.
Arquitetura Atualizada (27/02/2026)
Pipeline migrado para event-driven com AWS Bedrock. Cogfy e GitHub Actions batch descontinuados.
Visão Geral¶
O pipeline é baseado em arquitetura event-driven com Cloud Pub/Sub:
- Scraping (repo
scraper): Via Airflow DAGs, a cada 15 minutos → publica eventos - Enrichment (repo
data-science): Via Enrichment Worker (Cloud Run) → event-driven - Embeddings (repo
embeddings): Via Embeddings Worker (Cloud Run) → event-driven - Indexação: Via Typesense Sync Worker (Cloud Run) → event-driven
flowchart LR
subgraph "Scraping (Airflow, a cada 15min)"
DAG[~158 DAGs] -->|HTTP POST| API[Scraper API<br/>Cloud Run]
API -->|INSERT + publish| PG[(PostgreSQL)]
API -->|publish| PS1{{dgb.news.scraped}}
end
subgraph "Event-Driven Processing (~15s total)"
PS1 -->|push| EW[Enrichment Worker]
EW -->|AWS Bedrock| EW
EW -->|UPDATE + publish| PG
EW -->|publish| PS2{{dgb.news.enriched}}
PS2 -->|push| EAPI[Embeddings Worker]
EAPI -->|UPDATE + publish| PG
EAPI -->|publish| PS3{{dgb.news.embedded}}
PS2 -->|push| TSW[Typesense Sync]
PS3 -->|push| TSW
TSW -->|upsert| TS[Typesense]
end
style PS1 fill:#f3e5f5
style PS2 fill:#f3e5f5
style PS3 fill:#f3e5f5
Estágio 1: Scraping (Airflow)¶
Repositório: destaquesgovbr/scraper
Como funciona¶
- ~158 DAGs dinâmicas (1 por agência gov.br) + 1 DAG EBC
- Cada DAG roda a cada 15 minutos
- A DAG faz HTTP POST para a Scraper API no Cloud Run
- A API raspa o site, parseia HTML → Markdown, e insere no PostgreSQL
DAGs¶
| DAG | Schedule | Descrição |
|---|---|---|
scrape_{agency_key} (~158) |
*/15 * * * * |
Raspa 1 agência gov.br |
scrape_ebc |
*/15 * * * * |
Raspa sites EBC |
API Endpoints¶
| Método | Endpoint | Descrição |
|---|---|---|
POST |
/scrape/agencies |
Raspa sites gov.br |
POST |
/scrape/ebc |
Raspa sites EBC |
GET |
/health |
Health check |
Deploy¶
| Componente | Destino | Workflow |
|---|---|---|
| API | Cloud Run | scraper-api-deploy.yaml |
| DAGs | Composer bucket {bucket}/scraper/ |
composer-deploy-dags.yaml |
Config Sync: site_urls.yaml (Dual YAML)¶
Problema: O arquivo site_urls.yaml existe em dois locais que devem ser mantidos sincronizados:
src/govbr_scraper/scrapers/config/site_urls.yaml(fonte) - Usado pela API Cloud Rundags/config/site_urls.yaml(cópia) - Usado pelas DAGs Airflow
Por que dois arquivos?
- DAGs do Airflow são autocontidas e não importam código Python da API
- API Cloud Run empacota o arquivo de src/ na imagem Docker
- Mantém separação de responsabilidades entre orquestração (DAGs) e execução (API)
Como modificar configuração de agências:
# 1. Sempre edite o arquivo fonte
vim src/govbr_scraper/scrapers/config/site_urls.yaml
# 2. Copie manualmente para o arquivo das DAGs
cp src/govbr_scraper/scrapers/config/site_urls.yaml dags/config/site_urls.yaml
# 3. Commit ambos os arquivos
git add src/govbr_scraper/scrapers/config/site_urls.yaml dags/config/site_urls.yaml
git commit -m "feat: adiciona nova agência XYZ"
# 4. Abra PR - CI validará sincronização
Validação CI:
O teste test_config_sync.py valida que os arquivos estão idênticos:
def test_site_urls_yaml_files_are_in_sync():
"""Valida que os dois arquivos site_urls.yaml estão sincronizados."""
source = "src/govbr_scraper/scrapers/config/site_urls.yaml"
copy = "dags/config/site_urls.yaml"
with open(source) as f1, open(copy) as f2:
source_content = f1.read()
copy_content = f2.read()
assert source_content == copy_content, \
f"Files out of sync! Run: cp {source} {copy}"
PRs com arquivos dessincronizados são bloqueados pelo CI.
Estrutura do YAML:
agencies:
mec:
url: https://www.gov.br/mec/pt-br/assuntos/noticias
active: true
saude:
url: https://www.gov.br/saude/pt-br/assuntos/noticias
active: true
disabled_agency:
url: https://www.example.gov.br/noticias
active: false
disabled_reason: "Site descontinuado"
disabled_date: "2025-12-01"
Campos:
- url (obrigatório): URL da página de notícias
- active (opcional, default: true): Se false, não gera DAG no Airflow
- disabled_reason (opcional): Motivo da desativação
- disabled_date (opcional): Data da desativação
Migração futura: Esta configuração pode migrar para PostgreSQL, eliminando a necessidade de sincronização manual.
→ Documentação completa: scraper/dags/config/README.md
Estágio 2: Enrichment Event-Driven (Cloud Run Workers)¶
Repositórios: - destaquesgovbr/data-science - Enrichment Worker - destaquesgovbr/embeddings - Embeddings Worker - destaquesgovbr/data-platform - Typesense Sync Worker
Trigger¶
Event-driven: Push subscriptions do Cloud Pub/Sub chamam endpoints HTTP dos workers
Workers¶
| # | Worker | Trigger | Descrição | Latência |
|---|---|---|---|---|
| 1 | Enrichment Worker | Topic: dgb.news.scraped |
Classifica via AWS Bedrock (Claude 3 Haiku): temas L1/L2/L3 + resumo + sentiment + entities | ~5s |
| 2 | Embeddings Worker | Topic: dgb.news.enriched |
Gera vetores 768-dim via modelo local paraphrase-multilingual-mpnet-base-v2 |
~5s |
| 3 | Typesense Sync Worker | Topics: dgb.news.enriched + dgb.news.embedded |
Sincroniza documentos enriquecidos para Typesense (upsert idempotente) | ~5s |
Latência total: ~15 segundos (scraping → indexação)
Diagrama de Sequência¶
sequenceDiagram
participant AF as Airflow DAGs
participant API as Scraper API (Cloud Run)
participant GOV as Sites gov.br / EBC
participant PG as PostgreSQL
participant PS1 as Pub/Sub: scraped
participant EW as Enrichment Worker
participant Bedrock as AWS Bedrock (Claude 3 Haiku)
participant PS2 as Pub/Sub: enriched
participant EAPI as Embeddings Worker
participant PS3 as Pub/Sub: embedded
participant TSW as Typesense Sync Worker
participant TS as Typesense
Note over AF: A cada 15 min
AF->>API: POST /scrape/agencies
API->>GOV: Fetch sites
GOV-->>API: HTML pages
API->>API: Parse → Markdown
API->>PG: INSERT articles
API->>PS1: publish scraped event
Note over PS1: Event-driven processing (~15s total)
PS1->>EW: push notification (unique_id)
EW->>PG: fetch article
EW->>Bedrock: classify (themes + summary + sentiment + entities)
Bedrock-->>EW: JSON response
EW->>PG: UPDATE themes + features
EW->>PS2: publish enriched event
PS2->>EAPI: push notification (unique_id)
EAPI->>PG: fetch title + summary
EAPI->>EAPI: generate embedding 768-dim
EAPI->>PG: UPDATE content_embedding
EAPI->>PS3: publish embedded event
PS2->>TSW: push notification (enriched)
PS3->>TSW: push notification (embedded)
TSW->>PG: fetch full document
TSW->>TS: upsert document
Note over API,TS: Latência total: ~15 segundos
Secrets Necessárias¶
Repo data-science (Enrichment Worker)¶
| Secret | Descrição |
|---|---|
DATABASE_URL |
Connection string PostgreSQL |
AWS_ACCESS_KEY_ID |
AWS credentials para Bedrock |
AWS_SECRET_ACCESS_KEY |
AWS credentials para Bedrock |
AWS_REGION |
Região AWS (us-east-1) |
BEDROCK_MODEL_ID |
ID do modelo (anthropic.claude-3-haiku-20240307-v1:0) |
GCP_PROJECT_ID |
Projeto GCP para Pub/Sub |
PUBSUB_TOPIC_ENRICHED |
Nome do topic enriched |
Repo embeddings (Embeddings Worker)¶
| Secret | Descrição |
|---|---|
DATABASE_URL |
Connection string PostgreSQL |
GCP_PROJECT_ID |
Projeto GCP para Pub/Sub |
PUBSUB_TOPIC_EMBEDDED |
Nome do topic embedded |
Repo data-platform (Typesense Sync Worker)¶
| Secret | Descrição |
|---|---|
DATABASE_URL |
Connection string PostgreSQL |
TYPESENSE_HOST |
Host do Typesense |
TYPESENSE_API_KEY |
API Key do Typesense |
Repo scraper¶
| Secret | Descrição |
|---|---|
DATABASE_URL |
Connection string PostgreSQL |
GCP_PROJECT_ID |
Projeto GCP para Pub/Sub |
PUBSUB_TOPIC_SCRAPED |
Nome do topic scraped |
| GCP SA credentials | Para deploy no Cloud Run e Composer |
Monitoramento¶
Scraping (Airflow)¶
# Acessar Web UI do Airflow
gcloud composer environments describe destaquesgovbr-composer \
--location us-central1 \
--format="value(config.airflowUri)"
Workers Event-Driven (Cloud Run + Pub/Sub)¶
# Logs do Enrichment Worker
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=enrichment-worker" --limit 50
# Logs do Embeddings Worker
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=embeddings-worker" --limit 50
# Logs do Typesense Sync Worker
gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=typesense-sync-worker" --limit 50
# Métricas de Pub/Sub (backlog de mensagens)
gcloud monitoring dashboards list --filter="displayName:Pub/Sub"
# Dead-Letter Queue (mensagens falhadas)
gcloud pubsub subscriptions pull dgb.news.scraped-dlq --limit=10
gcloud pubsub subscriptions pull dgb.news.enriched-dlq --limit=10
gcloud pubsub subscriptions pull dgb.news.embedded-dlq --limit=10
Sync HuggingFace (Separado)¶
O sync para o HuggingFace é feito via DAG no Cloud Composer (repo data-platform), não faz parte dos pipelines acima.
→ Veja Airflow DAGs para detalhes.
Links Relacionados¶
- Módulo Scraper - Detalhes do scraper standalone
- Data Platform - Repositório de enrichment
- PostgreSQL - Fonte de verdade
- Airflow DAGs - DAGs de sync e scraping
- Docker Builds - Build das imagens