188 lines
6.6 KiB
Python
188 lines
6.6 KiB
Python
"""
|
|
BIGQUERY DATA SOURCE BASE CLASS
|
|
Kimball Data Warehouse Architecture
|
|
|
|
Layer Assignment:
|
|
RAW (Bronze) → Tempat load data mentah dari sumber eksternal
|
|
STAGING (Silver) → etl_logs, etl_metadata (via helpers)
|
|
DW (Gold) → dim_*, fact_* (di file terpisah)
|
|
|
|
Subclass yang menggunakan DataSource:
|
|
FAODataSource → load ke RAW (Bronze) : raw_fao
|
|
WorldBankDataSource → load ke RAW (Bronze) : raw_worldbank
|
|
UNICEFDataSource → load ke RAW (Bronze) : raw_unicef
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
import pandas as pd
|
|
import logging
|
|
from datetime import datetime
|
|
from typing import Dict
|
|
import json
|
|
|
|
from scripts.bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
|
|
from scripts.bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
|
|
from google.cloud import bigquery
|
|
|
|
|
|
class DataSource(ABC):
    """Abstract base class for all data sources, implementing a template ETL
    pattern following the Kimball DW methodology.

    Kimball flow for every DataSource:
        fetch_data()       -> Extract from the external source (FAO/WB/UNICEF)
        transform_data()   -> Transform into the standard format
        validate_data()    -> Data-quality checks
        load_to_database() -> Load into the RAW layer (Bronze)
        save_metadata()    -> Persist run metadata to the AUDIT layer

    Subclasses must implement:
        fetch_data()
        transform_data()
    """

    def __init__(self, client: "bigquery.Client" = None):
        """Initialize the data source.

        Args:
            client: Optional pre-built BigQuery client. When omitted, a
                default client is created via get_bigquery_client().
        """
        self.client = client if client else get_bigquery_client()
        self.logger = logging.getLogger(self.__class__.__name__)
        # NOTE(review): propagate=False with no handler attached here means
        # records fall through to logging's last-resort stderr handler unless
        # a handler is configured elsewhere — confirm that is intended.
        self.logger.propagate = False

        self.data = None              # transformed DataFrame, populated by run()
        self.table_name = None        # target RAW table name; set by subclasses
        self.target_layer = "bronze"  # RAW (Bronze) is the default landing layer
        self.asean_countries = CONFIG['asean_countries']

        # Run-level metadata, persisted to the AUDIT layer by save_metadata().
        self.metadata = {
            'source_class'       : self.__class__.__name__,
            'table_name'         : None,
            'execution_timestamp': None,
            'duration_seconds'   : None,
            'rows_fetched'       : 0,
            'rows_transformed'   : 0,
            'rows_loaded'        : 0,
            'completeness_pct'   : 0,
            # Snapshot of the matching config so every run is auditable and
            # reproducible against the settings that produced it.
            'config_snapshot'    : json.dumps({
                'threshold': float(CONFIG['matching']['threshold']),
                'weights'  : {k: float(v) for k, v in CONFIG['matching']['weights'].items()}
            }),
            'validation_metrics' : '{}'
        }

    @abstractmethod
    def fetch_data(self) -> pd.DataFrame:
        """Extract raw data from the external source. Must be implemented by subclasses."""

    @abstractmethod
    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform raw data into the standard format. Must be implemented by subclasses."""

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """Compute data-quality metrics for *df*.

        Returns:
            Dict with row/column counts, per-column null statistics,
            duplicate count, overall completeness percentage, memory usage,
            and — when a year-like column is present — the year range.
        """
        # Guard: an empty frame would otherwise divide by zero / emit NaNs below.
        if df.size == 0:
            return {
                'total_rows'      : int(len(df)),
                'total_columns'   : int(len(df.columns)),
                'null_count'      : {},
                'null_percentage' : {},
                'duplicate_count' : 0,
                'completeness_pct': 0.0,
                'memory_usage_mb' : 0.0,
            }

        validation = {
            'total_rows'      : int(len(df)),
            'total_columns'   : int(len(df.columns)),
            'null_count'      : {k: int(v) for k, v in df.isnull().sum().to_dict().items()},
            'null_percentage' : {
                k: float(v)
                for k, v in (df.isnull().sum() / len(df) * 100).round(2).to_dict().items()
            },
            'duplicate_count' : int(df.duplicated().sum()),
            'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
            'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
        }

        # Report the temporal coverage of the first year-like column, if any.
        year_cols = [col for col in df.columns if 'year' in col.lower() or 'tahun' in col.lower()]
        if year_cols:
            year_col = year_cols[0]
            validation['year_range'] = {
                'min'         : int(df[year_col].min()) if not df[year_col].isnull().all() else None,
                'max'         : int(df[year_col].max()) if not df[year_col].isnull().all() else None,
                'unique_years': int(df[year_col].nunique())
            }

        return validation

    def load_to_database(self, df: pd.DataFrame, table_name: str):
        """Load *df* into the target layer with a full refresh, recording the
        outcome in the AUDIT etl_logs table.

        Args:
            df: Transformed data to load.
            table_name: Destination table name within the target layer.

        Raises:
            Re-raises any exception from the load after writing a 'failed'
            audit entry.
        """
        try:
            load_to_bigquery(
                self.client,
                df,
                table_name,
                # Fix: honor the instance's configured layer instead of the
                # previously hard-coded 'bronze' (identical by default).
                layer=self.target_layer,
                write_disposition="WRITE_TRUNCATE"
            )

            log_update(
                self.client,
                layer='RAW',
                table_name=table_name,
                update_method='full_refresh',
                rows_affected=len(df)
            )

        except Exception as e:
            # Best-effort failure audit entry, then propagate to the caller.
            log_update(
                self.client,
                layer='RAW',
                table_name=table_name,
                update_method='full_refresh',
                rows_affected=0,
                status='failed',
                error_msg=str(e)
            )
            raise

    def save_metadata(self):
        """Persist the run metadata to the AUDIT layer (etl_metadata).

        Failures are logged as warnings and swallowed deliberately: a missing
        metadata row must not fail an otherwise-successful ETL run.
        """
        try:
            self.metadata['table_name'] = self.table_name

            # etl_metadata stores validation metrics as a JSON string.
            if isinstance(self.metadata.get('validation_metrics'), dict):
                self.metadata['validation_metrics'] = json.dumps(
                    self.metadata['validation_metrics']
                )

            save_etl_metadata(self.client, self.metadata)

        except Exception as e:
            self.logger.warning(f"Failed to save ETL metadata to AUDIT: {str(e)}")

    def run(self) -> pd.DataFrame:
        """Execute the full ETL template: extract, transform, validate, load,
        then persist run metadata.

        Returns:
            The transformed DataFrame that was loaded.

        Raises:
            Re-raises any exception from a pipeline step after logging it.
        """
        start_time = datetime.now()
        self.metadata['execution_timestamp'] = start_time

        try:
            # 1. EXTRACT
            raw_data = self.fetch_data()
            self.metadata['rows_fetched'] = len(raw_data) if hasattr(raw_data, '__len__') else 0

            # 2. TRANSFORM
            self.data = self.transform_data(raw_data)
            self.metadata['rows_transformed'] = len(self.data)

            # 3. VALIDATE
            validation = self.validate_data(self.data)
            self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
            self.metadata['validation_metrics'] = json.dumps({
                'total_rows'      : validation['total_rows'],
                'completeness_pct': validation['completeness_pct'],
                'duplicate_count' : validation['duplicate_count']
            })

            # 4. LOAD -> RAW layer (Bronze)
            self.load_to_database(self.data, self.table_name)
            self.metadata['rows_loaded'] = len(self.data)

            # 5. METADATA -> AUDIT layer
            end_time = datetime.now()
            self.metadata['duration_seconds'] = (end_time - start_time).total_seconds()
            self.save_metadata()

            return self.data

        except Exception:
            # Fix: the original bound the exception and silently re-raised;
            # record it (with traceback) before propagating.
            self.logger.exception("ETL run failed for %s", self.__class__.__name__)
            raise
|
|
|
|
|
|
print("DataSource base class loaded — Kimball DW Architecture")
|
|
print(" Default target layer : RAW (Bronze)")
|
|
print(" Audit logs : AUDIT via etl_logs")
|
|
print(" ETL metadata : AUDIT via etl_metadata") |