raw and staging data

This commit is contained in:
Debby
2026-03-12 14:57:30 +07:00
parent 847a6a9859
commit 0235dfbc75
5 changed files with 30 additions and 219 deletions

View File

@@ -1,17 +0,0 @@
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from datetime import datetime
-from scripts.test_data import run_fao_test
-
-with DAG(
-    dag_id="etl_fao_bigquery",
-    start_date=datetime(2026, 3, 3),
-    schedule_interval="@daily",
-    catchup=False
-) as dag:
-    task_load_fao = PythonOperator(
-        task_id="load_fao_to_bigquery",
-        python_callable=run_fao_test
-    )
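With this standalone FAO test DAG deleted, scheduling presumably moves to the pipeline's own Airflow task functions (the last file in this commit ends with run_verify_connection under "AIRFLOW TASK FUNCTIONS"). A minimal replacement sketch, assuming run_verify_connection is importable; the module path and DAG id below are hypothetical:

# Hypothetical replacement DAG; only run_verify_connection is known to exist in this commit.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

from scripts.test_data import run_verify_connection  # module path is an assumption

with DAG(
    dag_id="etl_pipeline_bigquery",     # hypothetical DAG id
    start_date=datetime(2026, 3, 3),
    schedule_interval="@daily",
    catchup=False
) as dag:
    verify = PythonOperator(
        task_id="verify_bigquery_connection",
        python_callable=run_verify_connection
    )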

View File: scripts/bigquery_config.py

@@ -21,6 +21,7 @@ Kimball ETL Flow:
 """
 import os
+import json
 from pathlib import Path
 from google.cloud import bigquery
 from google.oauth2 import service_account
@@ -88,25 +89,6 @@ KIMBALL_LAYER_MAP = {
     "dw"      : "gold",
 }
-# SETUP BIGQUERY CLIENT
-def get_bigquery_client() -> bigquery.Client:
-    """
-    Create BigQuery client with service account credentials
-    Returns:
-        bigquery.Client: Authenticated BigQuery client
-    """
-    credentials = service_account.Credentials.from_service_account_file(
-        CREDENTIALS_PATH,
-        scopes=["https://www.googleapis.com/auth/cloud-platform"]
-    )
-    return bigquery.Client(
-        credentials=credentials,
-        project=PROJECT_ID,
-        location=LOCATION
-    )
 # MATCHING CONFIGURATION
 CONFIG = {
@@ -166,7 +148,6 @@ for directory in [EXPORTS_DIR, LOGS_DIR]:
 # HELPER FUNCTIONS
 def get_table_id(table_name: str, layer: str = "bronze") -> str:
     # Resolve Kimball alias to layer name
     resolved = KIMBALL_LAYER_MAP.get(layer.lower(), layer.lower())
     dataset = LAYER_DATASET_MAP.get(resolved, DATASET_BRONZE)
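The alias map above lets callers address a layer by either its Kimball name or its medallion name. A self-contained sketch of the resolution logic; PROJECT_ID and the bronze/gold dataset names are placeholders (only fs_asean_silver appears in this commit):

PROJECT_ID = "my-project"
LAYER_DATASET_MAP = {"bronze": "fs_asean_bronze",
                     "silver": "fs_asean_silver",
                     "gold"  : "fs_asean_gold"}
KIMBALL_LAYER_MAP = {"raw": "bronze", "staging": "silver", "dw": "gold"}

def get_table_id(table_name: str, layer: str = "bronze") -> str:
    # Kimball aliases ('raw', 'staging', 'dw') resolve to medallion names first
    resolved = KIMBALL_LAYER_MAP.get(layer.lower(), layer.lower())
    dataset = LAYER_DATASET_MAP.get(resolved, LAYER_DATASET_MAP["bronze"])
    return f"{PROJECT_ID}.{dataset}.{table_name}"

assert get_table_id("raw_fao", "raw") == get_table_id("raw_fao", "bronze")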
@@ -174,17 +155,6 @@ def get_table_id(table_name: str, layer: str = "bronze") -> str:
 def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze") -> bool:
-    """
-    Check whether a table exists in BigQuery
-    Args:
-        client     : BigQuery client
-        table_name : Table name
-        layer      : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-    Returns:
-        bool: True if the table exists
-    """
     try:
         client.get_table(get_table_id(table_name, layer))
         return True
@@ -193,14 +163,6 @@ def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze"
 def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
-    """
-    Delete a table if it exists
-    Args:
-        client     : BigQuery client
-        table_name : Table name
-        layer      : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-    """
     table_id = get_table_id(table_name, layer)
     try:
         client.delete_table(table_id, not_found_ok=True)
@@ -210,13 +172,6 @@ def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"
 def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
-    """
-    Create a dataset if it does not exist yet
-    Args:
-        client     : BigQuery client
-        dataset_id : Dataset ID string
-    """
     full_id = f"{PROJECT_ID}.{dataset_id}"
     try:
         client.get_dataset(full_id)
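For reference, the get_dataset/create_dataset dance above can also be collapsed into a single call, since the client's create_dataset accepts exists_ok=True. A sketch, not the repo's helper:

from google.cloud import bigquery

def create_dataset_if_missing(client: bigquery.Client, full_id: str, location: str):
    # exists_ok=True returns the existing dataset instead of raising Conflict
    dataset = bigquery.Dataset(full_id)
    dataset.location = location
    return client.create_dataset(dataset, exists_ok=True)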
@@ -229,7 +184,6 @@ def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
 def create_all_datasets(client: bigquery.Client):
-    """Create all 3 datasets (Raw/Staging/DW) if they do not exist yet"""
     print("Setting up BigQuery Datasets (Kimball DW)...")
     for layer, dataset_id in LAYER_DATASET_MAP.items():
         create_dataset_if_not_exists(client, dataset_id)
@@ -238,21 +192,14 @@ def create_all_datasets(client: bigquery.Client):
 # VERIFICATION
 def verify_setup() -> bool:
-    """
-    Verify the BigQuery setup for all 3 layers (Raw / Staging / DW)
-    Checks:
-        1. Credentials file exists
-        2. Connection to BigQuery succeeds
-        3. All datasets exist or are created successfully
-    """
     print("=" * 60)
     print("BIGQUERY SETUP VERIFICATION")
     print("Kimball DW Architecture")
     print("=" * 60)
     # 1. Credentials
-    if not os.path.exists(CREDENTIALS_PATH):
+    credentials_json = os.environ.get("GOOGLE_CREDENTIALS_JSON")
+    if not credentials_json and not os.path.exists(CREDENTIALS_PATH):
         print(f"Credentials not found : {CREDENTIALS_PATH}")
         return False
     print(f"✓ Credentials found")
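Given that verify_setup now accepts GOOGLE_CREDENTIALS_JSON, the file-only get_bigquery_client removed earlier in this diff presumably gained an environment-variable branch. A minimal sketch, assuming the variable holds the service-account JSON itself; the constants are placeholders standing in for this module's values:

import json
import os
from google.cloud import bigquery
from google.oauth2 import service_account

PROJECT_ID = "my-project"                          # placeholder
LOCATION = "asia-southeast2"                       # placeholder
CREDENTIALS_PATH = "credentials/service_account.json"  # placeholder
SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

def get_bigquery_client() -> bigquery.Client:
    # Prefer inline JSON from the environment (e.g. CI); fall back to the key file
    credentials_json = os.environ.get("GOOGLE_CREDENTIALS_JSON")
    if credentials_json:
        credentials = service_account.Credentials.from_service_account_info(
            json.loads(credentials_json), scopes=SCOPES)
    else:
        credentials = service_account.Credentials.from_service_account_file(
            CREDENTIALS_PATH, scopes=SCOPES)
    return bigquery.Client(credentials=credentials, project=PROJECT_ID,
                           location=LOCATION)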
@@ -284,15 +231,16 @@ def verify_setup() -> bool:
     print("=" * 60)
     return True
+# INITIALIZE ON IMPORT
 if __name__ == "__main__":
     verify_setup()
 else:
     print("BigQuery Config Loaded — Kimball DW Architecture")
     print(f"   Project          : {PROJECT_ID}")
     print(f"   Raw (Bronze)     : {DATASET_BRONZE}")
     print(f"   Staging (Silver) : {DATASET_SILVER}")
     print(f"   DW (Gold)        : {DATASET_GOLD}")
     print(f"   Audit            : {DATASET_AUDIT}")
     print(f"   Location         : {LOCATION}")

View File: scripts/bigquery_datasource.py

@@ -11,13 +11,6 @@ Subclasses that use DataSource:
     FAODataSource       → loads to RAW (Bronze) : raw_fao
     WorldBankDataSource → loads to RAW (Bronze) : raw_worldbank
     UNICEFDataSource    → loads to RAW (Bronze) : raw_unicef
-Changes from MySQL version:
-    1. Replace SQLAlchemy engine → BigQuery client
-    2. Replace to_sql() → load_table_from_dataframe()
-    3. load_to_database() default layer = 'bronze' (RAW layer)
-    4. log_update() uses the 'RAW' label per Kimball terminology
-    5. save_metadata() → save_etl_metadata() to STAGING layer (Silver)
 """
 from abc import ABC, abstractmethod
@@ -27,8 +20,8 @@ from datetime import datetime
 from typing import Dict
 import json
-from bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
-from bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
+from scripts.bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
+from scripts.bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
 from google.cloud import bigquery
@@ -42,7 +35,7 @@ class DataSource(ABC):
     transform_data()    → Transform to standard format
     validate_data()     → Check data quality
     load_to_database()  → Load to RAW layer (Bronze)
-    save_metadata()     → Save metadata to STAGING layer (Silver)
+    save_metadata()     → Save metadata to AUDIT layer
 Subclasses must implement:
     fetch_data()
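A minimal concrete subclass satisfying the contract above could look like the following sketch; the table name and sample rows are illustrative only, not part of the repo:

import pandas as pd

class DemoDataSource(DataSource):
    """Illustrative subclass showing the two required overrides."""
    def __init__(self, client=None):
        super().__init__(client)
        self.table_name = "raw_demo"   # hypothetical RAW (Bronze) table

    def fetch_data(self) -> pd.DataFrame:
        # EXTRACT: normally an API call or bulk download
        return pd.DataFrame({"country": ["Indonesia"], "year": [2024], "value": [41.5]})

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        # TRANSFORM: standardize column names before the RAW load
        return df.rename(columns=str.lower)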
@@ -50,22 +43,15 @@ class DataSource(ABC):
     """
     def __init__(self, client: bigquery.Client = None):
-        """
-        Initialize DataSource with a BigQuery client.
-        Args:
-            client: BigQuery client (if None, a new one is created)
-        """
         self.client = client if client else get_bigquery_client()
         self.logger = logging.getLogger(self.__class__.__name__)
         self.logger.propagate = False
         self.data = None
         self.table_name = None
-        self.target_layer = "bronze"  # RAW layer — default for all data sources
+        self.target_layer = "bronze"
         self.asean_countries = CONFIG['asean_countries']
-        # Metadata for tracking reproducibility (stored in STAGING/Silver)
         self.metadata = {
             'source_class' : self.__class__.__name__,
             'table_name'   : None,
@@ -84,35 +70,13 @@ class DataSource(ABC):
     @abstractmethod
     def fetch_data(self) -> pd.DataFrame:
-        """
-        Extract raw data from the external source.
-        MUST be implemented by the subclass.
-        """
         pass
     @abstractmethod
     def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
-        """
-        Transform data to the standard format before loading to the RAW layer.
-        MUST be implemented by the subclass.
-        """
         pass
     def validate_data(self, df: pd.DataFrame) -> Dict:
-        """
-        Validate the quality of transformed data before loading to the RAW layer.
-        Metrics computed:
-            total_rows, total_columns — data dimensions
-            null_count, null_percentage — completeness per column
-            duplicate_count — duplicated rows
-            completeness_pct — overall completeness percentage
-            memory_usage_mb — data size in memory
-            year_range — year range (if a year column exists)
-        Returns:
-            Dict: Validation metrics
-        """
         validation = {
             'total_rows'    : int(len(df)),
             'total_columns' : int(len(df.columns)),
@@ -126,7 +90,6 @@ class DataSource(ABC):
             'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
         }
-        # Detect year columns for year-range info
         year_cols = [col for col in df.columns if 'year' in col.lower() or 'tahun' in col.lower()]
         if year_cols:
             year_col = year_cols[0]
@@ -139,35 +102,18 @@ class DataSource(ABC):
         return validation
     def load_to_database(self, df: pd.DataFrame, table_name: str):
-        """
-        Load data to the RAW layer (Bronze) with a full-refresh strategy.
-        Kimball context:
-            The RAW layer is the first landing zone for raw source data.
-            WRITE_TRUNCATE (full refresh) is used because source data
-            can change every time the pipeline runs.
-        Args:
-            df         : Transformed DataFrame
-            table_name : Target table name in the RAW layer (Bronze)
-        Audit:
-            Every load is recorded to etl_logs in the STAGING layer (Silver)
-        """
         try:
-            # Load to RAW layer (Bronze) — full refresh
             load_to_bigquery(
                 self.client,
                 df,
                 table_name,
-                layer='bronze',                     # RAW layer
-                write_disposition="WRITE_TRUNCATE"  # Full refresh
+                layer='bronze',
+                write_disposition="WRITE_TRUNCATE"
             )
-            # Audit log to STAGING layer (Silver)
             log_update(
                 self.client,
-                layer='RAW',  # Kimball label
+                layer='RAW',
                 table_name=table_name,
                 update_method='full_refresh',
                 rows_affected=len(df)
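For context, a full-refresh load with the BigQuery client boils down to load_table_from_dataframe plus a WRITE_TRUNCATE job config. A sketch of what load_to_bigquery presumably wraps, using standard google-cloud-bigquery calls rather than the repo's helper:

from google.cloud import bigquery
import pandas as pd

def full_refresh_load(client: bigquery.Client, df: pd.DataFrame, table_id: str) -> int:
    # WRITE_TRUNCATE replaces the table's contents on every run
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # block until the load job finishes
    return client.get_table(table_id).num_rows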
@@ -186,49 +132,20 @@ class DataSource(ABC):
             raise
     def save_metadata(self):
-        """
-        Save ETL execution metadata to the STAGING layer (Silver).
-        Kimball context:
-            ETL metadata (execution time, row counts, completeness, etc.)
-            is stored in the Staging layer as an operational/audit table,
-            not as part of the Star Schema in the DW layer.
-        Metadata saved:
-            source_class, table_name, execution_timestamp,
-            duration_seconds, rows_fetched/transformed/loaded,
-            completeness_pct, config_snapshot, validation_metrics
-        """
         try:
             self.metadata['table_name'] = self.table_name
-            # Ensure validation_metrics is a JSON string
             if isinstance(self.metadata.get('validation_metrics'), dict):
                 self.metadata['validation_metrics'] = json.dumps(
                     self.metadata['validation_metrics']
                 )
-            # Save to STAGING layer (Silver) via helper
             save_etl_metadata(self.client, self.metadata)
         except Exception as e:
-            # Silent fail — metadata tracking must not stop the ETL process
-            self.logger.warning(f"Failed to save ETL metadata to STAGING: {str(e)}")
+            self.logger.warning(f"Failed to save ETL metadata to AUDIT: {str(e)}")
     def run(self) -> pd.DataFrame:
-        """
-        Run the full ETL pipeline: Extract → Transform → Validate → Load → Metadata.
-        Kimball ETL steps:
-            1. EXTRACT   — fetch_data()       : Pull from the external source
-            2. TRANSFORM — transform_data()   : Standardize format
-            3. VALIDATE  — validate_data()    : Check quality
-            4. LOAD      — load_to_database() : Load to RAW layer (Bronze)
-            5. METADATA  — save_metadata()    : Save to STAGING layer (Silver)
-        Returns:
-            pd.DataFrame: The data loaded to the RAW layer
-        """
         start_time = datetime.now()
         self.metadata['execution_timestamp'] = start_time
@@ -254,7 +171,7 @@ class DataSource(ABC):
         self.load_to_database(self.data, self.table_name)
         self.metadata['rows_loaded'] = len(self.data)
-        # 5. METADATA → STAGING layer (Silver)
+        # 5. METADATA → AUDIT layer
         end_time = datetime.now()
         self.metadata['duration_seconds'] = (end_time - start_time).total_seconds()
         self.save_metadata()
@@ -267,5 +184,5 @@ class DataSource(ABC):
     print("DataSource base class loaded — Kimball DW Architecture")
     print("   Default target layer : RAW (Bronze)")
-    print("   Audit logs           : STAGING (Silver) via etl_logs")
-    print("   ETL metadata         : STAGING (Silver) via etl_metadata")
+    print("   Audit logs           : AUDIT via etl_logs")
+    print("   ETL metadata         : AUDIT via etl_metadata")

View File: scripts/bigquery_helpers.py

@@ -77,7 +77,7 @@ def log_update(client: bigquery.Client, layer: str, table_name: str,
         job.result()
     except Exception as e:
-        print(f"   Warning: Failed to write etl_logs [STAGING]: {e}")
+        print(f"   Warning: Failed to write etl_logs [AUDIT]: {e}")
 def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
@@ -223,4 +223,4 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
         print(f"etl_metadata — first run | created_at : {created_at}")
     else:
         print(f"etl_metadata — preserved | created_at : {created_at}")
     print(f"etl_metadata — updated_at : {current_time}")

View File

@@ -33,8 +33,8 @@ from pathlib import Path
 from difflib import SequenceMatcher
 import re
-from bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
-from bigquery_helpers import (
+from scripts.bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
+from scripts.bigquery_helpers import (
     log_update,
     load_to_bigquery,
     read_from_bigquery,
@@ -42,9 +42,10 @@ from bigquery_helpers import (
     save_etl_metadata,
     get_staging_schema
 )
-from bigquery_datasource import DataSource
+from scripts.bigquery_datasource import DataSource
 from google.cloud import bigquery
+
 # INDICATOR MATCHER
 class IndicatorMatcher:
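IndicatorMatcher's imports (difflib.SequenceMatcher, re) suggest ratio-based fuzzy matching of indicator names. A minimal sketch of that technique under this assumption; the threshold value is arbitrary:

from difflib import SequenceMatcher

def best_match(name, candidates, threshold=0.6):
    # Pick the candidate with the highest similarity ratio, if it clears the threshold
    score, match = max((SequenceMatcher(None, name.lower(), c.lower()).ratio(), c)
                       for c in candidates)
    return match if score >= threshold else None

print(best_match("prevalence of undernourishment",
                 ["Prevalence of undernourishment (%)", "GDP per capita (US$)"]))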
@@ -200,7 +201,7 @@
 class FAODataSource(DataSource):
     """
     FAO Food Security Data Source (BigQuery version)
-    FIXED: Uses bulk download because the faostat API requires authentication
+    Uses bulk download because the faostat API requires authentication
     """
     def __init__(self, client: bigquery.Client = None):
@@ -447,28 +448,22 @@ class StagingDataIntegration:
         }
     def load_raw_data(self) -> Dict[str, pd.DataFrame]:
-        """Load data from all RAW layer (Bronze) tables"""
         raw_data = {}
         try:
             raw_data['fao'] = read_from_bigquery(self.client, 'raw_fao', layer='bronze')
         except Exception:
             raw_data['fao'] = pd.DataFrame()
         try:
             raw_data['worldbank'] = read_from_bigquery(self.client, 'raw_worldbank', layer='bronze')
         except Exception:
             raw_data['worldbank'] = pd.DataFrame()
         try:
             raw_data['unicef'] = read_from_bigquery(self.client, 'raw_unicef', layer='bronze')
         except Exception:
             raw_data['unicef'] = pd.DataFrame()
         return raw_data
     def clean_value(self, value):
-        """Clean and convert a value to float"""
         if pd.isna(value):
             return None
         value_str = str(value).strip().replace('<', '').replace('>', '').strip()
@@ -478,18 +473,9 @@
             return None
     def process_year_range(self, year_value):
-        """
-        Process a year range and return (year_int, year_range_str)
-        Examples:
-            "2020"      → (2020, "2020")
-            "2020-2021" → (2020, "2020-2021")
-            "20192021"  → (2020, "2019-2021")
-        """
         if pd.isna(year_value):
             return None, None
         year_str = str(year_value).strip().replace('–', '-').replace('—', '-')
         if '-' in year_str:
             try:
                 parts = year_str.split('-')
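The docstring removed above documented the expected behaviour of process_year_range; kept here as a usage reference, assuming staging is the StagingDataIntegration instance created in __main__:

# Expected behaviour, per the removed docstring:
assert staging.process_year_range("2020")      == (2020, "2020")
assert staging.process_year_range("2020-2021") == (2020, "2020-2021")
assert staging.process_year_range("20192021")  == (2020, "2019-2021")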
@@ -509,7 +495,6 @@
             return None, year_str
     def truncate_string(self, value, max_length: int) -> str:
-        """Truncate a string to its varchar constraint"""
         if pd.isna(value):
             return ''
         s = str(value).strip()
@@ -519,7 +504,6 @@
                               indicator_orig_col: str, indicator_std_col: str,
                               country_col: str, year_col: str, value_col: str,
                               unit_col: str = None) -> pd.DataFrame:
-        """Standardize a dataframe to the staging_integrated schema"""
         if df.empty:
             return pd.DataFrame()
@@ -543,10 +527,9 @@
         })
     def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
-        """Standardize the schema from all data sources"""
         integrated_data = []
-        # FAO — detect columns (original names or already renamed)
+        # FAO
         if not raw_data['fao'].empty:
             df = raw_data['fao'].copy()
             integrated_data.append(self.standardize_dataframe(
@@ -590,11 +573,9 @@
         df_integrated = pd.concat(integrated_data, ignore_index=True)
-        # Final type conversion
         df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
         df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')
-        # Enforce varchar constraints
         for col, max_len in [('source', 20), ('country', 100), ('indicator_original', 255),
                              ('indicator_standardized', 255), ('year_range', 20), ('unit', 20)]:
             df_integrated[col] = df_integrated[col].astype(str).apply(
@@ -606,7 +587,6 @@
         ).reset_index(drop=True)
     def validate_data(self, df: pd.DataFrame) -> Dict:
-        """Validate data and return metrics"""
         validation = {
             'total_rows'    : int(len(df)),
             'total_columns' : int(len(df.columns)),
@@ -621,15 +601,12 @@
             'max'         : int(df['year'].max()) if not df['year'].isnull().all() else None,
             'unique_years': int(df['year'].nunique())
         }
-
         if 'source' in df.columns:
             validation['source_breakdown'] = {
                 str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
             }
-
         if 'indicator_standardized' in df.columns:
             validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
-
         if 'country' in df.columns:
             validation['unique_countries'] = int(df['country'].nunique())
@@ -645,21 +622,15 @@
         return validation
     def save_to_staging(self, df: pd.DataFrame):
-        """Save data to the staging_integrated table in the STAGING layer (Silver)"""
         try:
             schema = get_staging_schema()
             load_to_bigquery(
-                self.client,
-                df,
-                self.staging_table,
-                layer='silver',  # → fs_asean_silver
+                self.client, df, self.staging_table,
+                layer='silver',
                 write_disposition="WRITE_TRUNCATE",
                 schema=schema
             )
             log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))
         except Exception as e:
             print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
             log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
@@ -667,7 +638,6 @@
             raise
     def run(self) -> pd.DataFrame:
-        """Run staging integration process"""
         self.metadata['start_time'] = datetime.now()
         try:
@@ -703,7 +673,6 @@
             save_etl_metadata(self.client, self.metadata)
-            # Summary
             print(f"   ✓ Staging Integration completed: {len(df_integrated):,} rows")
             print(f"     Duration : {self.metadata['duration_seconds']:.2f}s")
             if 'source_breakdown' in validation:
@@ -725,7 +694,6 @@
             print(f"     - country max length    : {schema_val['country_max_length']}/100")
             print(f"     - year_range max length : {schema_val['year_range_max_length']}/20")
             print(f"     - unit max length       : {schema_val['unit_max_length']}/20")
-
             print(f"\n   Metadata → [AUDIT] etl_metadata")
             return df_integrated
@@ -734,6 +702,7 @@
         self.logger.error(f"Staging integration failed: {str(e)}")
         raise
+
 # MAIN EXECUTION
 if __name__ == "__main__":
@@ -745,11 +714,9 @@ if __name__ == "__main__":
     logger = setup_logging()
     client = get_bigquery_client()
-    # ── FAO ──────────────────────────────────────────────────────────────────
     print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
     fao_source = FAODataSource(client)
     df_fao = fao_source.run()
     print(f"   ✓ raw_fao: {len(df_fao):,} rows")
     print(f"     Indicators : {df_fao['indicator'].nunique()}")
     print(f"     Countries  : {df_fao['country'].nunique()}")
@@ -757,28 +724,23 @@
     fao_indicators = df_fao['indicator'].unique()
-    # ── World Bank ────────────────────────────────────────────────────────────
     print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
     wb_source = WorldBankDataSource(client, list(fao_indicators))
     df_wb = wb_source.run()
     print(f"   ✓ raw_worldbank: {len(df_wb):,} rows")
     print(f"     Matched indicators : {df_wb['indicator_fao'].nunique()}")
     print(f"     Countries          : {df_wb['country'].nunique()}")
     if len(df_wb) > 0:
         print(f"     Year range         : {df_wb['year'].min()}–{df_wb['year'].max()}")
-    # ── UNICEF ────────────────────────────────────────────────────────────────
     print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
     unicef_source = UNICEFDataSource(client, list(fao_indicators))
     df_unicef = unicef_source.run()
     print(f"   ✓ raw_unicef: {len(df_unicef):,} rows")
     if len(df_unicef) > 0:
         print(f"     Matched indicators : {df_unicef['indicator_fao'].nunique()}")
         print(f"     Countries          : {df_unicef['country'].nunique()}")
-    # ── Staging Integration ───────────────────────────────────────────────────
     print("\n[4/4] Staging Integration → STAGING (Silver)...")
     staging = StagingDataIntegration(client)
     df_staging = staging.run()
@@ -789,7 +751,8 @@
     print(f"STAGING (Silver) : staging_integrated")
     print(f"AUDIT            : etl_logs, etl_metadata")
     print("=" * 60)
+
 # AIRFLOW TASK FUNCTIONS
 def run_verify_connection():