Pular para conteúdo

Metricas de Qualidade do Pipeline

Metricas avancadas para monitorar cobertura de classificacao, consistencia tematica, qualidade de resumos e deteccao de drift.


Parte 1: Cobertura de Classificacao

# metrics/classification_coverage.py
import pandas as pd
from typing import Dict, Any
from datetime import datetime, timedelta

class ClassificationMetrics:
    """Metricas de cobertura e qualidade de classificacao."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def coverage_rate(self) -> float:
        """Percentual de documentos com tema atribuido."""
        if 'themes' not in self.df.columns:
            return 0.0

        has_theme = self.df['themes'].apply(
            lambda x: bool(x) if isinstance(x, list) else False
        )
        return has_theme.mean()

    def cogfy_classification_rate(self) -> float:
        """Percentual processado pelo Cogfy."""
        if 'cogfy_classification' not in self.df.columns:
            return 0.0

        return self.df['cogfy_classification'].notna().mean()

    def summary_rate(self) -> float:
        """Percentual com resumo gerado."""
        if 'cogfy_summary' not in self.df.columns:
            return 0.0

        return self.df['cogfy_summary'].notna().mean()

    def coverage_by_agency(self) -> pd.DataFrame:
        """Cobertura de classificacao por orgao."""
        if 'themes' not in self.df.columns:
            return pd.DataFrame()

        def has_theme(x):
            return bool(x) if isinstance(x, list) else False

        return self.df.groupby('agency').agg({
            'unique_id': 'count',
            'themes': lambda x: x.apply(has_theme).mean(),
            'cogfy_classification': lambda x: x.notna().mean() if 'cogfy_classification' in self.df.columns else 0,
        }).rename(columns={
            'unique_id': 'total_docs',
            'themes': 'theme_coverage',
            'cogfy_classification': 'cogfy_coverage',
        }).sort_values('total_docs', ascending=False)

    def coverage_trend(self, days: int = 30) -> pd.DataFrame:
        """Tendencia de cobertura nos ultimos N dias."""
        cutoff = datetime.now() - timedelta(days=days)
        recent = self.df[self.df['scraped_at'] >= cutoff].copy()

        recent['date'] = recent['scraped_at'].dt.date

        def has_theme(x):
            return bool(x) if isinstance(x, list) else False

        return recent.groupby('date').agg({
            'unique_id': 'count',
            'themes': lambda x: x.apply(has_theme).mean(),
        }).rename(columns={
            'unique_id': 'daily_docs',
            'themes': 'daily_coverage',
        })

    def get_report(self) -> Dict[str, Any]:
        """Relatorio completo de cobertura."""
        return {
            'overall_theme_coverage': self.coverage_rate(),
            'cogfy_classification_rate': self.cogfy_classification_rate(),
            'summary_rate': self.summary_rate(),
            'coverage_by_agency': self.coverage_by_agency().to_dict(),
            'timestamp': datetime.now().isoformat(),
        }

# Uso
df = pd.read_parquet("documents.parquet")
metrics = ClassificationMetrics(df)
print(f"Cobertura de temas: {metrics.coverage_rate():.1%}")
print(f"Taxa Cogfy: {metrics.cogfy_classification_rate():.1%}")

Parte 2: Consistencia Tematica

flowchart TD
    subgraph "Hierarquia Tematica"
        A[Governo] --> B[Educacao]
        A --> C[Saude]
        A --> D[Economia]

        B --> B1[Ensino Superior]
        B --> B2[Educacao Basica]
        B --> B3[Bolsas]

        C --> C1[SUS]
        C --> C2[Vigilancia]
        C --> C3[Vacinacao]
    end
# metrics/thematic_consistency.py
from typing import Dict, List, Set, Any
import pandas as pd

class ThematicConsistency:
    """Verifica consistencia da hierarquia tematica."""

    # Arvore tematica do DestaquesGovBr
    THEME_HIERARCHY = {
        'governo': {
            'educacao': ['ensino-superior', 'educacao-basica', 'bolsas', 'enem', 'prouni'],
            'saude': ['sus', 'vigilancia', 'vacinacao', 'hospitais', 'medicamentos'],
            'economia': ['orcamento', 'impostos', 'investimentos', 'comercio'],
            'infraestrutura': ['transportes', 'energia', 'saneamento', 'telecomunicacoes'],
            'meio-ambiente': ['preservacao', 'clima', 'fauna', 'flora', 'licenciamento'],
            'seguranca': ['policia-federal', 'fronteiras', 'prisional', 'defesa'],
            'social': ['assistencia', 'previdencia', 'trabalho', 'cultura'],
        }
    }

    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.valid_themes = self._build_valid_themes()

    def _build_valid_themes(self) -> Set[str]:
        """Constroi set de todos os temas validos."""
        valid = set()
        for category, subcategories in self.THEME_HIERARCHY.get('governo', {}).items():
            valid.add(category)
            valid.update(subcategories)
        return valid

    def check_theme_validity(self) -> Dict[str, Any]:
        """Verifica se temas atribuidos sao validos."""
        invalid_themes = {}

        for idx, row in self.df.iterrows():
            themes = row.get('themes', [])
            if not themes:
                continue

            for theme in themes:
                if theme.lower() not in self.valid_themes:
                    if theme not in invalid_themes:
                        invalid_themes[theme] = 0
                    invalid_themes[theme] += 1

        return {
            'total_invalid_themes': len(invalid_themes),
            'invalid_theme_counts': invalid_themes,
        }

    def check_hierarchy_consistency(self) -> List[Dict]:
        """Verifica se temas respeitam hierarquia (pai + filho)."""
        inconsistencies = []

        for idx, row in self.df.iterrows():
            themes = row.get('themes', [])
            if not themes or len(themes) < 2:
                continue

            themes_set = set(t.lower() for t in themes)

            # Verifica se subcategoria esta presente sem categoria pai
            for category, subcategories in self.THEME_HIERARCHY.get('governo', {}).items():
                for sub in subcategories:
                    if sub in themes_set and category not in themes_set:
                        inconsistencies.append({
                            'unique_id': row['unique_id'],
                            'themes': themes,
                            'issue': f"Subcategoria '{sub}' sem categoria pai '{category}'",
                        })
                        break

        return inconsistencies

    def theme_distribution(self) -> pd.DataFrame:
        """Distribuicao de temas no dataset."""
        theme_counts = {}

        for themes in self.df['themes'].dropna():
            if isinstance(themes, list):
                for theme in themes:
                    theme_counts[theme] = theme_counts.get(theme, 0) + 1

        return pd.DataFrame([
            {'theme': k, 'count': v}
            for k, v in sorted(theme_counts.items(), key=lambda x: -x[1])
        ])

# Uso
df = pd.read_parquet("documents.parquet")
consistency = ThematicConsistency(df)

validity = consistency.check_theme_validity()
print(f"Temas invalidos: {validity['total_invalid_themes']}")

inconsistencies = consistency.check_hierarchy_consistency()
print(f"Inconsistencias hierarquicas: {len(inconsistencies)}")

Parte 3: Qualidade dos Resumos

# metrics/summary_quality.py
from typing import Dict, Any, List
import pandas as pd
import numpy as np

# Para metricas ROUGE
try:
    from rouge_score import rouge_scorer
    ROUGE_AVAILABLE = True
except ImportError:
    ROUGE_AVAILABLE = False
    print("Instale rouge-score: pip install rouge-score")

# Para BERTScore
try:
    from bert_score import score as bert_score
    BERT_AVAILABLE = True
except ImportError:
    BERT_AVAILABLE = False
    print("Instale bert-score: pip install bert-score")

class SummaryQualityMetrics:
    """Metricas de qualidade para resumos gerados."""

    def __init__(self):
        if ROUGE_AVAILABLE:
            self.rouge_scorer = rouge_scorer.RougeScorer(
                ['rouge1', 'rouge2', 'rougeL'],
                use_stemmer=True
            )

    def compute_rouge(
        self,
        summaries: List[str],
        references: List[str]
    ) -> Dict[str, float]:
        """
        Calcula metricas ROUGE.

        Args:
            summaries: Resumos gerados pelo modelo
            references: Textos originais (body)

        Returns:
            Dict com scores ROUGE-1, ROUGE-2, ROUGE-L
        """
        if not ROUGE_AVAILABLE:
            return {'error': 'rouge-score nao instalado'}

        scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}

        for summary, reference in zip(summaries, references):
            if not summary or not reference:
                continue

            result = self.rouge_scorer.score(reference, summary)
            for key in scores:
                scores[key].append(result[key].fmeasure)

        return {
            'rouge1_mean': np.mean(scores['rouge1']),
            'rouge2_mean': np.mean(scores['rouge2']),
            'rougeL_mean': np.mean(scores['rougeL']),
            'sample_size': len(scores['rouge1']),
        }

    def compute_bertscore(
        self,
        summaries: List[str],
        references: List[str],
        lang: str = 'pt'
    ) -> Dict[str, float]:
        """
        Calcula BERTScore para qualidade semantica.

        Nota: Requer GPU para performance.
        """
        if not BERT_AVAILABLE:
            return {'error': 'bert-score nao instalado'}

        # Filtrar pares validos
        valid_pairs = [
            (s, r) for s, r in zip(summaries, references)
            if s and r
        ]

        if not valid_pairs:
            return {'error': 'Nenhum par valido para avaliacao'}

        summaries_clean, references_clean = zip(*valid_pairs)

        P, R, F1 = bert_score(
            list(summaries_clean),
            list(references_clean),
            lang=lang,
            verbose=False
        )

        return {
            'bertscore_precision': P.mean().item(),
            'bertscore_recall': R.mean().item(),
            'bertscore_f1': F1.mean().item(),
            'sample_size': len(valid_pairs),
        }

    def length_analysis(
        self,
        summaries: List[str],
        references: List[str]
    ) -> Dict[str, Any]:
        """Analisa proporcao de tamanho resumo/original."""
        ratios = []

        for summary, reference in zip(summaries, references):
            if summary and reference:
                ratio = len(summary) / len(reference)
                ratios.append(ratio)

        if not ratios:
            return {}

        return {
            'mean_compression_ratio': np.mean(ratios),
            'median_compression_ratio': np.median(ratios),
            'min_ratio': np.min(ratios),
            'max_ratio': np.max(ratios),
            'ideal_range': '0.1-0.3 (10-30% do original)',
        }

    def evaluate_sample(
        self,
        df: pd.DataFrame,
        summary_col: str = 'cogfy_summary',
        reference_col: str = 'body',
        sample_size: int = 100
    ) -> Dict[str, Any]:
        """
        Avalia uma amostra do dataset.

        Args:
            df: DataFrame com documentos
            summary_col: Coluna com resumos
            reference_col: Coluna com texto original
            sample_size: Tamanho da amostra
        """
        # Filtrar documentos com resumo e body
        valid = df[
            (df[summary_col].notna()) &
            (df[reference_col].notna())
        ]

        if len(valid) == 0:
            return {'error': 'Nenhum documento com resumo e body'}

        # Amostra
        sample = valid.sample(min(sample_size, len(valid)))

        summaries = sample[summary_col].tolist()
        references = sample[reference_col].tolist()

        return {
            'sample_size': len(sample),
            'rouge_scores': self.compute_rouge(summaries, references),
            'length_analysis': self.length_analysis(summaries, references),
            # BERTScore e lento - usar apenas quando necessario
            # 'bertscore': self.compute_bertscore(summaries, references),
        }

# Uso
df = pd.read_parquet("documents.parquet")
quality = SummaryQualityMetrics()
report = quality.evaluate_sample(df, sample_size=50)
print(f"ROUGE-L medio: {report['rouge_scores'].get('rougeL_mean', 'N/A'):.3f}")

Parte 4: Monitoramento de Drift

# metrics/drift_detector.py
import pandas as pd
import numpy as np
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from scipy import stats

class DriftDetector:
    """Detecta mudancas na distribuicao de dados ao longo do tempo."""

    def __init__(self, df: pd.DataFrame, date_column: str = 'scraped_at'):
        self.df = df
        self.date_column = date_column

    def detect_volume_drift(
        self,
        window_days: int = 7,
        baseline_days: int = 30
    ) -> Dict[str, Any]:
        """
        Compara volume recente com baseline historico.
        """
        now = datetime.now()

        # Periodo recente
        recent_start = now - timedelta(days=window_days)
        recent = self.df[self.df[self.date_column] >= recent_start]

        # Baseline (periodo anterior)
        baseline_end = recent_start
        baseline_start = baseline_end - timedelta(days=baseline_days)
        baseline = self.df[
            (self.df[self.date_column] >= baseline_start) &
            (self.df[self.date_column] < baseline_end)
        ]

        # Volumes diarios
        recent_daily = recent.groupby(
            recent[self.date_column].dt.date
        ).size().mean()

        baseline_daily = baseline.groupby(
            baseline[self.date_column].dt.date
        ).size().mean()

        # Variacao percentual
        if baseline_daily > 0:
            pct_change = (recent_daily - baseline_daily) / baseline_daily
        else:
            pct_change = 0

        return {
            'recent_daily_avg': recent_daily,
            'baseline_daily_avg': baseline_daily,
            'pct_change': pct_change,
            'drift_detected': abs(pct_change) > 0.3,  # >30% mudanca
            'direction': 'increase' if pct_change > 0 else 'decrease',
        }

    def detect_agency_drift(
        self,
        window_days: int = 7,
        baseline_days: int = 30
    ) -> Dict[str, Any]:
        """
        Detecta mudancas na distribuicao de orgaos.
        """
        now = datetime.now()

        # Periodos
        recent = self.df[self.df[self.date_column] >= now - timedelta(days=window_days)]
        baseline = self.df[
            (self.df[self.date_column] >= now - timedelta(days=baseline_days)) &
            (self.df[self.date_column] < now - timedelta(days=window_days))
        ]

        # Distribuicoes
        recent_dist = recent['agency'].value_counts(normalize=True)
        baseline_dist = baseline['agency'].value_counts(normalize=True)

        # Alinhar indices
        all_agencies = set(recent_dist.index) | set(baseline_dist.index)
        recent_aligned = recent_dist.reindex(all_agencies, fill_value=0)
        baseline_aligned = baseline_dist.reindex(all_agencies, fill_value=0)

        # Teste Chi-quadrado
        try:
            chi2, p_value = stats.chisquare(
                recent_aligned.values,
                baseline_aligned.values + 1e-10  # Evitar divisao por zero
            )
        except:
            chi2, p_value = 0, 1.0

        # Maiores diferencas
        diff = (recent_aligned - baseline_aligned).abs().sort_values(ascending=False)

        return {
            'chi2_statistic': chi2,
            'p_value': p_value,
            'drift_detected': p_value < 0.05,
            'top_changes': diff.head(5).to_dict(),
        }

    def detect_theme_drift(
        self,
        window_days: int = 7,
        baseline_days: int = 30
    ) -> Dict[str, Any]:
        """
        Detecta mudancas na distribuicao de temas.
        """
        now = datetime.now()

        def extract_themes(series):
            themes = {}
            for theme_list in series.dropna():
                if isinstance(theme_list, list):
                    for theme in theme_list:
                        themes[theme] = themes.get(theme, 0) + 1
            total = sum(themes.values())
            return {k: v/total for k, v in themes.items()} if total > 0 else {}

        recent = self.df[self.df[self.date_column] >= now - timedelta(days=window_days)]
        baseline = self.df[
            (self.df[self.date_column] >= now - timedelta(days=baseline_days)) &
            (self.df[self.date_column] < now - timedelta(days=window_days))
        ]

        recent_themes = extract_themes(recent.get('themes', pd.Series()))
        baseline_themes = extract_themes(baseline.get('themes', pd.Series()))

        # Novos temas
        new_themes = set(recent_themes.keys()) - set(baseline_themes.keys())

        # Temas que desapareceram
        missing_themes = set(baseline_themes.keys()) - set(recent_themes.keys())

        return {
            'new_themes': list(new_themes),
            'missing_themes': list(missing_themes),
            'theme_count_recent': len(recent_themes),
            'theme_count_baseline': len(baseline_themes),
        }

    def get_drift_report(self) -> Dict[str, Any]:
        """Relatorio completo de drift."""
        return {
            'timestamp': datetime.now().isoformat(),
            'volume_drift': self.detect_volume_drift(),
            'agency_drift': self.detect_agency_drift(),
            'theme_drift': self.detect_theme_drift(),
        }

# Uso
df = pd.read_parquet("documents.parquet")
detector = DriftDetector(df)
report = detector.get_drift_report()

if report['volume_drift']['drift_detected']:
    print(f"ALERTA: Drift de volume detectado ({report['volume_drift']['pct_change']:.1%})")

Parte 5: Metricas de Scraping

# metrics/scraping_metrics.py
import pandas as pd
from typing import Dict, Any, List
from datetime import datetime, timedelta
from dataclasses import dataclass

@dataclass
class ScrapingRun:
    """Registro de uma execucao de scraping."""
    run_id: str
    start_time: datetime
    end_time: datetime
    agency: str
    documents_scraped: int
    documents_new: int
    documents_updated: int
    errors: List[Dict[str, Any]]
    status: str  # 'success', 'partial', 'failed'

class ScrapingMetrics:
    """Metricas de performance do scraping."""

    def __init__(self, runs: List[ScrapingRun]):
        self.runs = runs

    def success_rate(self, days: int = 7) -> Dict[str, float]:
        """Taxa de sucesso por orgao nos ultimos N dias."""
        cutoff = datetime.now() - timedelta(days=days)
        recent_runs = [r for r in self.runs if r.start_time >= cutoff]

        rates = {}
        for agency in set(r.agency for r in recent_runs):
            agency_runs = [r for r in recent_runs if r.agency == agency]
            success_count = len([r for r in agency_runs if r.status == 'success'])
            rates[agency] = success_count / len(agency_runs) if agency_runs else 0

        return rates

    def error_analysis(self, days: int = 7) -> Dict[str, Any]:
        """Analise de erros recentes."""
        cutoff = datetime.now() - timedelta(days=days)
        recent_runs = [r for r in self.runs if r.start_time >= cutoff]

        error_types = {}
        error_by_agency = {}

        for run in recent_runs:
            for error in run.errors:
                error_type = error.get('type', 'unknown')
                error_types[error_type] = error_types.get(error_type, 0) + 1

                if run.agency not in error_by_agency:
                    error_by_agency[run.agency] = 0
                error_by_agency[run.agency] += 1

        return {
            'total_errors': sum(error_types.values()),
            'error_types': error_types,
            'error_by_agency': error_by_agency,
        }

    def throughput(self, days: int = 7) -> Dict[str, float]:
        """Documentos processados por hora."""
        cutoff = datetime.now() - timedelta(days=days)
        recent_runs = [r for r in self.runs if r.start_time >= cutoff]

        total_docs = sum(r.documents_scraped for r in recent_runs)
        total_hours = sum(
            (r.end_time - r.start_time).total_seconds() / 3600
            for r in recent_runs
        )

        return {
            'docs_per_hour': total_docs / total_hours if total_hours > 0 else 0,
            'total_documents': total_docs,
            'total_hours': total_hours,
        }

    def latency_stats(self) -> Dict[str, float]:
        """Estatisticas de latencia de scraping."""
        durations = [
            (r.end_time - r.start_time).total_seconds() / 60  # minutos
            for r in self.runs
        ]

        if not durations:
            return {}

        return {
            'mean_duration_min': sum(durations) / len(durations),
            'min_duration_min': min(durations),
            'max_duration_min': max(durations),
            'p95_duration_min': sorted(durations)[int(len(durations) * 0.95)],
        }

    def get_dashboard_metrics(self) -> Dict[str, Any]:
        """Metricas para dashboard."""
        return {
            'timestamp': datetime.now().isoformat(),
            'success_rates': self.success_rate(),
            'error_analysis': self.error_analysis(),
            'throughput': self.throughput(),
            'latency': self.latency_stats(),
        }



Voltar para Introducao e Validacao