""" BIGQUERY DATA SOURCE BASE CLASS Kimball Data Warehouse Architecture Layer Assignment: RAW (Bronze) → Tempat load data mentah dari sumber eksternal STAGING (Silver) → etl_logs, etl_metadata (via helpers) DW (Gold) → dim_*, fact_* (di file terpisah) Subclass yang menggunakan DataSource: FAODataSource → load ke RAW (Bronze) : raw_fao WorldBankDataSource → load ke RAW (Bronze) : raw_worldbank UNICEFDataSource → load ke RAW (Bronze) : raw_unicef """ from abc import ABC, abstractmethod import pandas as pd import logging from datetime import datetime from typing import Dict import json from scripts.bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG from scripts.bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata from google.cloud import bigquery class DataSource(ABC): """ Abstract base class untuk semua sumber data dengan template ETL pattern. Menggunakan Kimball DW methodology. Kimball Flow untuk setiap DataSource: fetch_data() → Extract dari sumber eksternal (FAO/WB/UNICEF) transform_data() → Transform ke format standar validate_data() → Cek kualitas data load_to_database() → Load ke RAW layer (Bronze) save_metadata() → Simpan metadata ke AUDIT layer Subclass wajib implement: fetch_data() transform_data() """ def __init__(self, client: bigquery.Client = None): self.client = client if client else get_bigquery_client() self.logger = logging.getLogger(self.__class__.__name__) self.logger.propagate = False self.data = None self.table_name = None self.target_layer = "bronze" self.asean_countries = CONFIG['asean_countries'] self.metadata = { 'source_class' : self.__class__.__name__, 'table_name' : None, 'execution_timestamp': None, 'duration_seconds' : None, 'rows_fetched' : 0, 'rows_transformed' : 0, 'rows_loaded' : 0, 'completeness_pct' : 0, 'config_snapshot' : json.dumps({ 'threshold': float(CONFIG['matching']['threshold']), 'weights' : {k: float(v) for k, v in CONFIG['matching']['weights'].items()} }), 'validation_metrics' : '{}' } @abstractmethod def fetch_data(self) -> pd.DataFrame: pass @abstractmethod def transform_data(self, df: pd.DataFrame) -> pd.DataFrame: pass def validate_data(self, df: pd.DataFrame) -> Dict: validation = { 'total_rows' : int(len(df)), 'total_columns' : int(len(df.columns)), 'null_count' : {k: int(v) for k, v in df.isnull().sum().to_dict().items()}, 'null_percentage' : { k: float(v) for k, v in (df.isnull().sum() / len(df) * 100).round(2).to_dict().items() }, 'duplicate_count' : int(df.duplicated().sum()), 'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)), 'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2)) } year_cols = [col for col in df.columns if 'year' in col.lower() or 'tahun' in col.lower()] if year_cols: year_col = year_cols[0] validation['year_range'] = { 'min' : int(df[year_col].min()) if not df[year_col].isnull().all() else None, 'max' : int(df[year_col].max()) if not df[year_col].isnull().all() else None, 'unique_years': int(df[year_col].nunique()) } return validation def load_to_database(self, df: pd.DataFrame, table_name: str): try: load_to_bigquery( self.client, df, table_name, layer='bronze', write_disposition="WRITE_TRUNCATE" ) log_update( self.client, layer='RAW', table_name=table_name, update_method='full_refresh', rows_affected=len(df) ) except Exception as e: log_update( self.client, layer='RAW', table_name=table_name, update_method='full_refresh', rows_affected=0, status='failed', error_msg=str(e) ) raise def 
save_metadata(self): try: self.metadata['table_name'] = self.table_name if isinstance(self.metadata.get('validation_metrics'), dict): self.metadata['validation_metrics'] = json.dumps( self.metadata['validation_metrics'] ) save_etl_metadata(self.client, self.metadata) except Exception as e: self.logger.warning(f"Failed to save ETL metadata to AUDIT: {str(e)}") def run(self) -> pd.DataFrame: start_time = datetime.now() self.metadata['execution_timestamp'] = start_time try: # 1. EXTRACT raw_data = self.fetch_data() self.metadata['rows_fetched'] = len(raw_data) if hasattr(raw_data, '__len__') else 0 # 2. TRANSFORM self.data = self.transform_data(raw_data) self.metadata['rows_transformed'] = len(self.data) # 3. VALIDATE validation = self.validate_data(self.data) self.metadata['completeness_pct'] = validation.get('completeness_pct', 0) self.metadata['validation_metrics'] = json.dumps({ 'total_rows' : validation['total_rows'], 'completeness_pct': validation['completeness_pct'], 'duplicate_count' : validation['duplicate_count'] }) # 4. LOAD → RAW layer (Bronze) self.load_to_database(self.data, self.table_name) self.metadata['rows_loaded'] = len(self.data) # 5. METADATA → AUDIT layer end_time = datetime.now() self.metadata['duration_seconds'] = (end_time - start_time).total_seconds() self.save_metadata() return self.data except Exception as e: raise print("DataSource base class loaded — Kimball DW Architecture") print(" Default target layer : RAW (Bronze)") print(" Audit logs : AUDIT via etl_logs") print(" ETL metadata : AUDIT via etl_metadata")
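

# ---------------------------------------------------------------------------
# Minimal subclass sketch (illustrative only, not part of the pipeline).
# `DemoDataSource`, its column names, and the 'raw_demo' table are
# hypothetical; actually executing run() also assumes valid BigQuery
# credentials and the project CONFIG. The sketch only shows the two methods
# a subclass must implement and how run() drives the template flow
# (extract → transform → validate → load → metadata).
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class DemoDataSource(DataSource):
        """Hypothetical source that 'fetches' a small in-memory frame."""

        def __init__(self, client: bigquery.Client = None):
            super().__init__(client)
            self.table_name = 'raw_demo'  # hypothetical RAW (Bronze) table

        def fetch_data(self) -> pd.DataFrame:
            # EXTRACT: a real subclass would call an external API here
            return pd.DataFrame({
                'country': ['Indonesia', 'Thailand'],
                'year'   : [2020, 2020],
                'value'  : [1.0, 2.0],
            })

        def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
            # TRANSFORM: normalize column names and drop fully empty rows
            return df.rename(columns=str.lower).dropna(how='all')

    # Runs the full template and loads to BigQuery (requires credentials).
    result = DemoDataSource().run()
    print(f"Demo ETL finished with {len(result)} rows")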