"""
BIGQUERY DATA SOURCE BASE CLASS

Kimball Data Warehouse Architecture

Layer assignment:
    RAW (Bronze)      → landing area for raw data loaded from external sources
    STAGING (Silver)  → etl_logs, etl_metadata (via helpers)
    DW (Gold)         → dim_*, fact_* (in separate files)

Subclasses that use DataSource:
    FAODataSource       → loads into RAW (Bronze) : raw_fao
    WorldBankDataSource → loads into RAW (Bronze) : raw_worldbank
    UNICEFDataSource    → loads into RAW (Bronze) : raw_unicef

Changes from the MySQL version:
    1. SQLAlchemy engine → BigQuery client
    2. to_sql()          → load_table_from_dataframe()
    3. load_to_database() default layer = 'bronze' (RAW layer)
    4. log_update() uses the label 'RAW', following Kimball terminology
    5. save_metadata() → save_etl_metadata() into the STAGING layer (Silver)
"""

import json
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Dict

import pandas as pd

from bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
from bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
from google.cloud import bigquery


class DataSource(ABC):
    """
    Abstract base class for all data sources, implementing a template ETL
    pattern that follows the Kimball DW methodology.

    Kimball flow for every DataSource:
        fetch_data()       → Extract from the external source (FAO/WB/UNICEF)
        transform_data()   → Transform into the standard format
        validate_data()    → Check data quality
        load_to_database() → Load into the RAW layer (Bronze)
        save_metadata()    → Store metadata in the STAGING layer (Silver)

    Subclasses must implement:
        fetch_data()
        transform_data()
    """

    def __init__(self, client: bigquery.Client = None):
        """
        Initialize the DataSource with a BigQuery client.

        Args:
            client: BigQuery client. When falsy (e.g. None), a new one is
                created via get_bigquery_client().
        """
        self.client = client if client else get_bigquery_client()
        self.logger = logging.getLogger(self.__class__.__name__)
        # Records are not forwarded to ancestor loggers' handlers.
        self.logger.propagate = False
        self.data = None
        # Target RAW table; run() passes this to load_to_database(), so it must
        # be set (e.g. by the subclass) before the load step.
        self.table_name = None
        self.target_layer = "bronze"  # RAW layer — default for all data sources
        self.asean_countries = CONFIG['asean_countries']

        # Metadata for reproducibility tracking (persisted to STAGING/Silver).
        self.metadata = {
            'source_class': self.__class__.__name__,
            'table_name': None,
            'execution_timestamp': None,
            'duration_seconds': None,
            'rows_fetched': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'completeness_pct': 0,
            # Snapshot of the matching configuration in effect for this run.
            'config_snapshot': json.dumps({
                'threshold': float(CONFIG['matching']['threshold']),
                'weights': {k: float(v) for k, v in CONFIG['matching']['weights'].items()},
            }),
            'validation_metrics': '{}',
        }

    @abstractmethod
    def fetch_data(self) -> pd.DataFrame:
        """
        Extract raw data from the external source.

        MUST be implemented by subclasses.
        """
        pass

    @abstractmethod
    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Transform data into the standard format before loading into the RAW layer.

        MUST be implemented by subclasses.
        """
        pass

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """
        Compute data-quality metrics on transformed data before the RAW load.

        Metrics:
            total_rows, total_columns — data dimensions
            null_count, null_percentage — per-column completeness
            duplicate_count           — number of fully duplicated rows
            completeness_pct          — overall percentage of non-null cells
            memory_usage_mb           — in-memory size of the DataFrame
            year_range                — min/max/unique count of the first column
                                        whose name contains 'year' or 'tahun'
                                        (only present when such a column exists)

        An empty DataFrame yields 0.0 for null_percentage entries and for
        completeness_pct instead of NaN, so the metrics stay JSON-serializable.

        Args:
            df: DataFrame to validate.

        Returns:
            Dict: Validation metrics (plain Python ints/floats/None).
        """
        n_rows = len(df)
        null_counts = df.isnull().sum()          # per-column null counts
        total_nulls = null_counts.sum()          # all null cells in the frame

        if n_rows:
            null_pct = (null_counts / n_rows * 100).round(2)
        else:
            # Avoid 0/0 → NaN; json.dumps would emit NaN, which is not valid JSON.
            null_pct = null_counts.astype(float)

        if df.size:
            completeness = round((1 - total_nulls / df.size) * 100, 2)
        else:
            completeness = 0.0

        validation = {
            'total_rows': int(n_rows),
            'total_columns': int(len(df.columns)),
            'null_count': {k: int(v) for k, v in null_counts.to_dict().items()},
            'null_percentage': {k: float(v) for k, v in null_pct.to_dict().items()},
            'duplicate_count': int(df.duplicated().sum()),
            'completeness_pct': float(completeness),
            'memory_usage_mb': float(round(df.memory_usage(deep=True).sum() / 1024**2, 2)),
        }

        # Detect a year column ('tahun' is Indonesian for 'year'). str() guards
        # against non-string column labels such as integers.
        year_cols = [
            col for col in df.columns
            if 'year' in str(col).lower() or 'tahun' in str(col).lower()
        ]
        if year_cols:
            years = df[year_cols[0]]
            has_years = not years.isnull().all()
            validation['year_range'] = {
                'min': int(years.min()) if has_years else None,
                'max': int(years.max()) if has_years else None,
                'unique_years': int(years.nunique()),
            }

        return validation

    def load_to_database(self, df: pd.DataFrame, table_name: str) -> None:
        """
        Load data into the RAW layer (Bronze) using a full-refresh strategy.

        Kimball context:
            The RAW layer is the first landing zone for raw source data.
            WRITE_TRUNCATE (full refresh) is used because source data may
            change every time the pipeline runs.

        Args:
            df         : Transformed DataFrame.
            table_name : Target table name in the RAW layer (Bronze).

        Audit:
            Every load is recorded via log_update() (etl_logs in the STAGING
            layer, Silver): a success record after the load, or a 'failed'
            record when the load raises.

        Raises:
            Exception: Any error from load_to_bigquery() is re-raised after the
                failure audit is attempted. An error while writing the success
                audit record also propagates.
        """
        try:
            # Load into the RAW layer (Bronze) — full refresh.
            load_to_bigquery(
                self.client,
                df,
                table_name,
                layer='bronze',                      # RAW layer
                write_disposition="WRITE_TRUNCATE",  # Full refresh
            )
        except Exception as e:
            # Record the failed load, but never let an audit-write error mask
            # the original load exception.
            try:
                log_update(
                    self.client,
                    layer='RAW',
                    table_name=table_name,
                    update_method='full_refresh',
                    rows_affected=0,
                    status='failed',
                    error_msg=str(e),
                )
            except Exception as log_err:
                self.logger.warning(
                    "Failed to write failure audit log for %s: %s", table_name, log_err
                )
            raise

        # Success audit log to the STAGING layer (Silver). Kept outside the try
        # above so an audit error after a successful load is not recorded as a
        # failed load.
        log_update(
            self.client,
            layer='RAW',  # Kimball label
            table_name=table_name,
            update_method='full_refresh',
            rows_affected=len(df),
        )

    def save_metadata(self) -> None:
        """
        Persist ETL execution metadata to the STAGING layer (Silver).

        Kimball context:
            ETL metadata (execution time, row counts, completeness, etc.) is
            kept in the Staging layer as an operational/audit table, not as
            part of the Star Schema in the DW layer.

        Stored fields:
            source_class, table_name, execution_timestamp, duration_seconds,
            rows_fetched/transformed/loaded, completeness_pct,
            config_snapshot, validation_metrics

        Errors are logged as warnings and swallowed: metadata tracking must
        not stop the ETL process.
        """
        try:
            self.metadata['table_name'] = self.table_name

            # Ensure validation_metrics is stored as a JSON string.
            if isinstance(self.metadata.get('validation_metrics'), dict):
                self.metadata['validation_metrics'] = json.dumps(
                    self.metadata['validation_metrics']
                )

            # Save to the STAGING layer (Silver) via helper.
            save_etl_metadata(self.client, self.metadata)

        except Exception as e:
            # Deliberate best-effort: metadata tracking must not halt the ETL process.
            self.logger.warning("Failed to save ETL metadata to STAGING: %s", e)

    def run(self) -> pd.DataFrame:
        """
        Run the full ETL pipeline: Extract → Transform → Validate → Load → Metadata.

        Kimball ETL steps:
            1. EXTRACT   — fetch_data()       : Pull from the external source
            2. TRANSFORM — transform_data()   : Standardize the format
            3. VALIDATE  — validate_data()    : Check quality
            4. LOAD      — load_to_database() : Load into the RAW layer (Bronze)
            5. METADATA  — save_metadata()    : Store in the STAGING layer (Silver)

        Exceptions from steps 1-4 propagate to the caller unchanged. Metadata
        is only saved after a successful load.

        Returns:
            pd.DataFrame: The data that was loaded into the RAW layer.
        """
        start_time = datetime.now()
        self.metadata['execution_timestamp'] = start_time

        # 1. EXTRACT
        raw_data = self.fetch_data()
        self.metadata['rows_fetched'] = len(raw_data) if hasattr(raw_data, '__len__') else 0

        # 2. TRANSFORM
        self.data = self.transform_data(raw_data)
        self.metadata['rows_transformed'] = len(self.data)

        # 3. VALIDATE — only a compact subset of the report is persisted.
        validation = self.validate_data(self.data)
        self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
        self.metadata['validation_metrics'] = json.dumps({
            'total_rows': validation['total_rows'],
            'completeness_pct': validation['completeness_pct'],
            'duplicate_count': validation['duplicate_count'],
        })

        # 4. LOAD → RAW layer (Bronze)
        self.load_to_database(self.data, self.table_name)
        self.metadata['rows_loaded'] = len(self.data)

        # 5. METADATA → STAGING layer (Silver)
        self.metadata['duration_seconds'] = (datetime.now() - start_time).total_seconds()
        self.save_metadata()

        return self.data


# NOTE: these banners are printed at import time.
print("DataSource base class loaded — Kimball DW Architecture")
print(" Default target layer : RAW (Bronze)")
print(" Audit logs : STAGING (Silver) via etl_logs")
print(" ETL metadata : STAGING (Silver) via etl_metadata")