Bronze Layer - Google Cloud Storage¶
Camada de dados brutos (Raw Layer) da arquitetura Medallion no DestaquesGovbr.
Visão Geral¶
A Bronze Layer armazena dados brutos em formato Parquet no Google Cloud Storage (GCS), servindo como repositório imutável de todos os eventos do sistema antes de qualquer transformação.
Características¶
- ✅ Formato: Apache Parquet (colunar, comprimido)
- ✅ Storage: Google Cloud Storage (GCS)
- ✅ Imutabilidade: Dados nunca são modificados após escrita
- ✅ Particionamento: Por data (
year=YYYY/month=MM/day=DD) - ✅ Retenção: 90 dias (lifecycle policy)
- ✅ Acesso: BigQuery External Tables para queries SQL
- ✅ Custo: ~$0.02/GB/mês (GCS Standard)
Arquitetura¶
Fluxo de Dados¶
graph LR
A[Scraper Worker] -->|Pub/Sub| B[Topic: dgb.news.scraped]
B -->|Push| C[Enrichment Worker]
C -->|Pub/Sub| D[Topic: dgb.news.enriched]
D -->|Push| E[Embeddings Worker]
B -->|Cloud Function| F[Bronze Writer]
D -->|Cloud Function| F
F -->|Write Parquet| G[GCS Bronze Bucket]
G -->|External Table| H[BigQuery]
G -->|Lifecycle 90d| I[Delete]
style G fill:#d4a373
style F fill:#f9a825
Camada Bronze no Medallion¶
graph TB
subgraph Bronze Layer
A[GCS Bucket<br/>destaques-govbr-bronze]
A -->|Raw JSON → Parquet| B[news_scraped/]
A -->|Raw JSON → Parquet| C[news_enriched/]
A -->|Raw JSON → Parquet| D[news_embedded/]
end
subgraph Silver Layer
E[(PostgreSQL)]
end
subgraph Gold Layer
F[(Typesense)]
G[(HuggingFace)]
end
B --> E
C --> E
D --> E
E --> F
E --> G
style A fill:#d4a373
style E fill:#c0c0c0
style F fill:#ffd700
style G fill:#ffd700
Estrutura do Bucket GCS¶
Hierarquia¶
gs://destaques-govbr-bronze/
├── news_scraped/
│ ├── year=2026/
│ │ ├── month=01/
│ │ │ ├── day=01/
│ │ │ │ ├── part-00000.parquet
│ │ │ │ ├── part-00001.parquet
│ │ │ │ └── ...
│ │ │ ├── day=02/
│ │ │ └── ...
│ │ ├── month=02/
│ │ └── ...
│ └── ...
├── news_enriched/
│ └── year=2026/month=MM/day=DD/part-XXXXX.parquet
└── news_embedded/
└── year=2026/month=MM/day=DD/part-XXXXX.parquet
Schema Parquet¶
news_scraped/¶
| Campo | Tipo | Descrição |
|---|---|---|
unique_id |
STRING | ID único da notícia |
title |
STRING | Título original |
content |
STRING | Conteúdo completo |
url |
STRING | URL fonte |
published_at |
TIMESTAMP | Data de publicação |
agency_id |
STRING | ID do órgão (ex: "mgi") |
scraped_at |
TIMESTAMP | Data de coleta |
trace_id |
STRING | Rastreamento do evento |
news_enriched/¶
Herda todos os campos de news_scraped/ e adiciona:
| Campo | Tipo | Descrição |
|---|---|---|
theme_l1_code |
STRING | Código do tema L1 (ex: "01") |
theme_l2_code |
STRING | Código do tema L2 (ex: "01.01") |
theme_l3_code |
STRING | Código do tema L3 (ex: "01.01.01") |
summary |
STRING | Resumo LLM-gerado |
enriched_at |
TIMESTAMP | Data de enriquecimento |
model_id |
STRING | Modelo AWS Bedrock usado |
news_embedded/¶
Herda todos os campos de news_enriched/ e adiciona:
| Campo | Tipo | Descrição |
|---|---|---|
content_embedding |
ARRAY |
Vetor 768-dim |
embedded_at |
TIMESTAMP | Data de embedding |
embedding_model |
STRING | Modelo usado (mpnet-base-v2) |
Cloud Function: Bronze Writer¶
Função¶
A Bronze Writer é uma Cloud Function acionada por Pub/Sub que escreve eventos em formato Parquet no GCS.
Código Exemplo¶
# functions/bronze_writer/main.py
import base64
import json
import io
from datetime import datetime
from google.cloud import storage
import pyarrow as pa
import pyarrow.parquet as pq
def write_to_bronze(event, context):
"""
Escreve evento Pub/Sub na Bronze Layer.
Acionador: Pub/Sub topics (dgb.news.scraped, dgb.news.enriched)
"""
# 1. Decodificar mensagem Pub/Sub
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
data = json.loads(pubsub_message)
# 2. Determinar prefixo baseado no topic
topic_name = context.resource['name']
if 'scraped' in topic_name:
prefix = 'news_scraped'
elif 'enriched' in topic_name:
prefix = 'news_enriched'
elif 'embedded' in topic_name:
prefix = 'news_embedded'
else:
raise ValueError(f"Unknown topic: {topic_name}")
# 3. Extrair partição da data
published_at = datetime.fromisoformat(data['published_at'])
year = published_at.year
month = published_at.month
day = published_at.day
# 4. Construir path do arquivo
partition_path = f"{prefix}/year={year}/month={month:02d}/day={day:02d}"
filename = f"part-{context.event_id[:8]}.parquet"
blob_path = f"{partition_path}/{filename}"
# 5. Converter para Parquet
table = pa.Table.from_pylist([data])
buffer = io.BytesIO()
pq.write_table(table, buffer)
# 6. Escrever no GCS
storage_client = storage.Client()
bucket = storage_client.bucket('destaques-govbr-bronze')
blob = bucket.blob(blob_path)
blob.upload_from_file(buffer, rewind=True, content_type='application/octet-stream')
print(f"Written to gs://destaques-govbr-bronze/{blob_path}")
return {'status': 'success', 'path': blob_path}
Deployment¶
gcloud functions deploy bronze-writer \
--runtime=python311 \
--trigger-topic=dgb.news.scraped \
--entry-point=write_to_bronze \
--memory=512MB \
--timeout=60s \
--region=southamerica-east1 \
--set-env-vars=GCS_BUCKET=destaques-govbr-bronze
Lifecycle Policy¶
Configuração¶
{
"lifecycle": {
"rule": [
{
"action": {
"type": "Delete"
},
"condition": {
"age": 90,
"matchesPrefix": ["news_scraped/", "news_enriched/", "news_embedded/"]
}
}
]
}
}
Aplicar Policy¶
# Criar arquivo de configuração
cat > lifecycle.json <<EOF
{
"lifecycle": {
"rule": [
{
"action": {"type": "Delete"},
"condition": {
"age": 90,
"matchesPrefix": ["news_scraped/", "news_enriched/", "news_embedded/"]
}
}
]
}
}
EOF
# Aplicar ao bucket
gsutil lifecycle set lifecycle.json gs://destaques-govbr-bronze
Verificar Policy¶
gsutil lifecycle get gs://destaques-govbr-bronze
Integração com BigQuery¶
Criar External Tables¶
-- Tabela externa para news_scraped
CREATE OR REPLACE EXTERNAL TABLE `destaques-govbr.bronze.news_scraped`
OPTIONS (
format = 'PARQUET',
uris = ['gs://destaques-govbr-bronze/news_scraped/*/*/*/*.parquet'],
hive_partition_uri_prefix = 'gs://destaques-govbr-bronze/news_scraped',
require_hive_partition_filter = FALSE
);
-- Tabela externa para news_enriched
CREATE OR REPLACE EXTERNAL TABLE `destaques-govbr.bronze.news_enriched`
OPTIONS (
format = 'PARQUET',
uris = ['gs://destaques-govbr-bronze/news_enriched/*/*/*/*.parquet'],
hive_partition_uri_prefix = 'gs://destaques-govbr-bronze/news_enriched',
require_hive_partition_filter = FALSE
);
-- Tabela externa para news_embedded
CREATE OR REPLACE EXTERNAL TABLE `destaques-govbr.bronze.news_embedded`
OPTIONS (
format = 'PARQUET',
uris = ['gs://destaques-govbr-bronze/news_embedded/*/*/*/*.parquet'],
hive_partition_uri_prefix = 'gs://destaques-govbr-bronze/news_embedded',
require_hive_partition_filter = FALSE
);
Queries de Exemplo¶
-- Contar eventos por dia
SELECT
DATE(published_at) as date,
COUNT(*) as events
FROM `destaques-govbr.bronze.news_scraped`
WHERE DATE(published_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY date
ORDER BY date DESC;
-- Analisar latência de enriquecimento
SELECT
AVG(TIMESTAMP_DIFF(enriched_at, scraped_at, SECOND)) as avg_latency_seconds,
MAX(TIMESTAMP_DIFF(enriched_at, scraped_at, SECOND)) as max_latency_seconds
FROM `destaques-govbr.bronze.news_enriched`
WHERE DATE(enriched_at) = CURRENT_DATE();
-- Distribuição de temas (L1)
SELECT
theme_l1_code,
COUNT(*) as count
FROM `destaques-govbr.bronze.news_enriched`
WHERE DATE(enriched_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY theme_l1_code
ORDER BY count DESC
LIMIT 10;
Imutabilidade e Auditoria¶
Garantias¶
- Write-Once: Arquivos nunca são modificados após escrita
- Append-Only: Novos eventos sempre adicionam novos arquivos
- Particionamento Temporal: Cada dia tem seus próprios arquivos
- Trace ID: Cada evento tem
trace_idúnico para rastreamento
Auditoria¶
# Listar arquivos de uma partição
gsutil ls gs://destaques-govbr-bronze/news_scraped/year=2026/month=05/day=05/
# Ver metadados de um arquivo
gsutil stat gs://destaques-govbr-bronze/news_scraped/year=2026/month=05/day=05/part-abc12345.parquet
# Contar total de arquivos
gsutil du -sh gs://destaques-govbr-bronze/news_scraped/
Recuperação de Desastres¶
Cenário 1: Perda de dados na Silver Layer (PostgreSQL)¶
Solução: Reprocessar eventos da Bronze Layer.
# scripts/replay_from_bronze.py
from google.cloud import bigquery
client = bigquery.Client()
# Buscar eventos de um período
query = """
SELECT *
FROM `destaques-govbr.bronze.news_scraped`
WHERE DATE(scraped_at) BETWEEN '2026-05-01' AND '2026-05-05'
ORDER BY scraped_at
"""
for row in client.query(query):
# Republicar no Pub/Sub
publish_to_topic('dgb.news.scraped', row)
Cenário 2: Corrupção de dados¶
Solução: GCS mantém versões de objetos (se habilitado).
# Habilitar versionamento
gsutil versioning set on gs://destaques-govbr-bronze
# Listar versões de um arquivo
gsutil ls -a gs://destaques-govbr-bronze/news_scraped/year=2026/month=05/day=05/part-abc12345.parquet
# Restaurar versão anterior
gsutil cp gs://destaques-govbr-bronze/news_scraped/.../part-abc12345.parquet#1234567890 \
gs://destaques-govbr-bronze/news_scraped/.../part-abc12345-restored.parquet
Custos¶
Cálculo¶
| Item | Valor | Custo Mensal |
|---|---|---|
| Storage (90 dias) | ~50 GB | $1.00 |
| Class A Operations | ~10k/mês | $0.05 |
| BigQuery Queries | ~100 GB/mês | $5.00 |
| Data Transfer | ~10 GB egress | $1.20 |
| Total | - | ~$7.25/mês |
Assumptions: - 5.000 notícias/dia × 0.5 KB/notícia (Parquet) = 2.5 MB/dia - 90 dias × 2.5 MB = 225 MB × 3 tópicos = 675 MB ≈ 1 GB - Escala conservadora: 50 GB para 90 dias (inclui todos os campos)
Monitoramento¶
Métricas Cloud Monitoring¶
# Alertas recomendados
- name: bronze_layer_write_errors
metric: cloud.googleapis.com/functions/execution/error_count
filter: resource.function_name="bronze-writer"
threshold: 10 errors/hour
- name: bronze_layer_storage_growth
metric: storage.googleapis.com/storage/total_bytes
filter: resource.bucket_name="destaques-govbr-bronze"
threshold: > 100 GB (revisar lifecycle)
- name: bronze_layer_write_latency
metric: cloud.googleapis.com/functions/execution/duration
filter: resource.function_name="bronze-writer"
threshold: P95 > 30s
Dashboard Grafana¶
# Query para monitorar escritas
- expr: |
rate(gcp_cloudfunctions_function_invocations_total{function_name="bronze-writer"}[5m])
legendFormat: "Bronze Writes/sec"
- expr: |
histogram_quantile(0.95,
rate(gcp_cloudfunctions_function_execution_duration_seconds_bucket[5m]))
legendFormat: "P95 Latency"
Troubleshooting¶
Problema: Bronze Writer retorna erro "quota exceeded"¶
Causa: Rate limit de escritas no GCS excedido.
Solução:
# Verificar quota atual
gcloud compute project-info describe --project=destaques-govbr
# Solicitar aumento de quota
# Console GCP → IAM & Admin → Quotas → Cloud Storage API → Request Increase
Problema: Arquivos Parquet corrompidos¶
Causa: Falha na escrita (ex: OOM na Cloud Function).
Solução:
# Validar arquivo Parquet
import pyarrow.parquet as pq
try:
table = pq.read_table('gs://destaques-govbr-bronze/news_scraped/.../part-abc.parquet')
print(f"Valid file: {table.num_rows} rows")
except Exception as e:
print(f"Corrupted file: {e}")
# Deletar e reprocessar
Problema: BigQuery External Table não encontra arquivos¶
Causa: Padrão de URI incorreto ou partições não atualizadas.
Solução:
-- Recriar tabela com padrão correto
DROP EXTERNAL TABLE IF EXISTS `destaques-govbr.bronze.news_scraped`;
CREATE EXTERNAL TABLE `destaques-govbr.bronze.news_scraped`
OPTIONS (
format = 'PARQUET',
uris = ['gs://destaques-govbr-bronze/news_scraped/*/*/*/*.parquet']
);
-- Verificar
SELECT COUNT(*) FROM `destaques-govbr.bronze.news_scraped` LIMIT 10;
Best Practices¶
1. Particionamento¶
Recomendação: Particionar por published_at (não scraped_at).
Motivo: Queries de análise geralmente filtram por data de publicação da notícia, não por quando foi coletada.
2. Compressão¶
Recomendação: Usar compressão SNAPPY (padrão Parquet).
Motivo: Melhor custo-benefício entre taxa de compressão (~2-3x) e velocidade de leitura.
3. Tamanho de Arquivos¶
Recomendação: Arquivos de 64-128 MB (uncompressed).
Motivo: Otimiza leitura paralela no BigQuery e evita small files problem.
4. Schema Evolution¶
Recomendação: Adicionar campos novos no final do schema.
Motivo: Compatibilidade com arquivos existentes (Parquet suporta schema evolution).
Referências¶
Interna¶
Externa¶
- Apache Parquet Documentation
- GCS Lifecycle Management
- BigQuery External Tables
- Cloud Functions Triggers
Última atualização: 06/05/2026
Responsável: Equipe Data Platform - DestaquesGovbr