add etl raw and staging dag and scripts

This commit is contained in:
Debby
2026-03-07 13:57:42 +07:00
parent 7b45a58505
commit 728f8ca7c0
7 changed files with 1897 additions and 5 deletions

6
.gitignore vendored
View File

@@ -1,5 +1,5 @@
# CREDENTIALS - JANGAN DI-PUSH! # CREDENTIALS
credentials/ secrets/
*.json *.json
# ENVIRONMENT VARIABLES # ENVIRONMENT VARIABLES
@@ -24,4 +24,4 @@ standalone_admin_password.txt
# SYSTEM FILES # SYSTEM FILES
.DS_Store .DS_Store
Thumbs.db Thumbs.db

127
dags/etl_food_security.py Normal file
View File

@@ -0,0 +1,127 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import sys
sys.path.insert(0, '/opt/airflow/scripts')
default_args = {
'owner' : 'airflow',
'retries' : 1,
'email_on_failure': False,
}
def task_verify_connection():
from bigquery_config import verify_setup
result = verify_setup()
if not result:
raise Exception("BigQuery connection failed!")
print("BigQuery connection OK")
def task_load_fao():
from bigquery_config import get_bigquery_client
from bigquery_raw_layer import FAODataSource
client = get_bigquery_client()
source = FAODataSource(client)
df = source.run()
print(f"FAO loaded: {len(df):,} rows")
def task_load_worldbank():
from bigquery_config import get_bigquery_client
from bigquery_raw_layer import FAODataSource, WorldBankDataSource
client = get_bigquery_client()
fao_source = FAODataSource(client)
df_fao = fao_source.run()
fao_indicators = df_fao['indicator'].unique().tolist()
wb_source = WorldBankDataSource(client, fao_indicators)
df = wb_source.run()
print(f"World Bank loaded: {len(df):,} rows")
def task_load_unicef():
from bigquery_config import get_bigquery_client
from bigquery_raw_layer import FAODataSource, UNICEFDataSource
client = get_bigquery_client()
fao_source = FAODataSource(client)
df_fao = fao_source.run()
fao_indicators = df_fao['indicator'].unique().tolist()
unicef_source = UNICEFDataSource(client, fao_indicators)
df = unicef_source.run()
print(f"UNICEF loaded: {len(df):,} rows")
def task_staging_integration():
from bigquery_config import get_bigquery_client
from bigquery_raw_layer import StagingDataIntegration
client = get_bigquery_client()
staging = StagingDataIntegration(client)
df = staging.run()
print(f"Staging integrated: {len(df):,} rows")
def task_cleaned_layer():
from bigquery_config import get_bigquery_client
from bigquery_cleaned_layer import (
load_staging_data,
standardize_country_names_asean,
assign_pillar,
assign_direction,
CleanedDataLoader
)
import pandas as pd
client = get_bigquery_client()
df_staging = load_staging_data(client)
df_staging, _ = standardize_country_names_asean(df_staging, country_column='country')
critical_columns = list(df_staging.columns)
df_no_missing = df_staging.dropna(subset=critical_columns)
df_cleaned = df_no_missing.drop_duplicates(
subset=['indicator_standardized', 'country', 'year'],
keep='first'
)
df_cleaned['pillar'] = df_cleaned['indicator_standardized'].apply(assign_pillar)
df_cleaned['direction'] = df_cleaned['indicator_standardized'].apply(assign_direction)
loader = CleanedDataLoader(client, load_mode='full_refresh')
final_count = loader.run(df_cleaned)
print(f"Cleaned loaded: {final_count:,} rows")
with DAG(
dag_id = "etl_food_security_bigquery",
start_date = datetime(2026, 3, 1),
schedule_interval= None,
catchup = False,
default_args = default_args,
tags = ["food-security", "bigquery", "kimball"]
) as dag:
verify = PythonOperator(
task_id = "verify_bigquery_connection",
python_callable= task_verify_connection
)
load_fao = PythonOperator(
task_id = "load_fao_to_bronze",
python_callable= task_load_fao
)
load_wb = PythonOperator(
task_id = "load_worldbank_to_bronze",
python_callable= task_load_worldbank
)
load_unicef = PythonOperator(
task_id = "load_unicef_to_bronze",
python_callable= task_load_unicef
)
staging = PythonOperator(
task_id = "staging_integration_to_silver",
python_callable= task_staging_integration
)
cleaned = PythonOperator(
task_id = "cleaned_layer_to_silver",
python_callable= task_cleaned_layer
)

View File

@@ -15,15 +15,17 @@ services:
depends_on: depends_on:
- postgres - postgres
environment: environment:
- PYTHONPATH=/opt/airflow # Kunci agar folder scripts terbaca - PYTHONPATH=/opt/airflow
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=False - AIRFLOW__CORE__LOAD_EXAMPLES=False
- GOOGLE_APPLICATION_CREDENTIALS=/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json
volumes: volumes:
- airflow_dags:/opt/airflow/dags - airflow_dags:/opt/airflow/dags
- airflow_logs:/opt/airflow/logs - airflow_logs:/opt/airflow/logs
- airflow_plugins:/opt/airflow/plugins - airflow_plugins:/opt/airflow/plugins
- airflow_scripts:/opt/airflow/scripts - airflow_scripts:/opt/airflow/scripts
- ./secrets:/opt/airflow/secrets:ro
ports: ports:
- "8081:8080" - "8081:8080"
command: bash -c "airflow db init && airflow webserver" command: bash -c "airflow db init && airflow webserver"
@@ -37,11 +39,13 @@ services:
- PYTHONPATH=/opt/airflow - PYTHONPATH=/opt/airflow
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor - AIRFLOW__CORE__EXECUTOR=LocalExecutor
- GOOGLE_APPLICATION_CREDENTIALS=/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json
volumes: volumes:
- airflow_dags:/opt/airflow/dags - airflow_dags:/opt/airflow/dags
- airflow_logs:/opt/airflow/logs - airflow_logs:/opt/airflow/logs
- airflow_plugins:/opt/airflow/plugins - airflow_plugins:/opt/airflow/secrets:ro
- airflow_scripts:/opt/airflow/scripts - airflow_scripts:/opt/airflow/scripts
- ./secrets:/opt/airflow/secrets:ro
command: scheduler command: scheduler
volumes: volumes:

267
scripts/bigquery_config.py Normal file
View File

@@ -0,0 +1,267 @@
"""
BIGQUERY CONFIGURATION FOR FOOD SECURITY DATA INTEGRATION
Kimball Data Warehouse Architecture
Dataset Naming:
- Bronze (fs_asean_bronze) : Raw layer — data as-is dari sumber
- Silver (fs_asean_silver) : Staging layer — staging_integrated, cleaned_integrated
- Audit (fs_asean_audit) : Audit layer — etl_logs, etl_metadata
- Gold (fs_asean_gold) : DW layer — Dim & Fact tables (Kimball Star Schema)
Kimball ETL Flow:
Source Data
RAW (Bronze) → raw_fao, raw_worldbank, raw_unicef
STAGING (Silver) → staging_integrated, cleaned_integrated
DATA WAREHOUSE (Gold) → dim_*, fact_food_security, fact_food_security_eligible
AUDIT (fs_asean_audit) → etl_logs, etl_metadata [semua layer log ke sini]
"""
import os
from pathlib import Path
from google.cloud import bigquery
from google.oauth2 import service_account
# BIGQUERY CONFIGURATION
CREDENTIALS_PATH = os.environ.get(
"GOOGLE_APPLICATION_CREDENTIALS",
"/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json"
)
PROJECT_ID = "food-security-asean-project"
LOCATION = "asia-southeast2"
# DATASET IDs
# Bronze = Raw Layer | Silver = Staging Layer | Gold = DW Layer (Kimball)
DATASET_BRONZE = "fs_asean_bronze" # Raw layer — data mentah dari sumber
DATASET_SILVER = "fs_asean_silver" # Staging layer — staging_integrated, cleaned_integrated
DATASET_AUDIT = "fs_asean_audit" # Audit layer — etl_logs, etl_metadata
DATASET_GOLD = "fs_asean_gold" # DW layer — Dim & Fact (Star Schema)
# Mapping layer name → dataset id
LAYER_DATASET_MAP = {
"bronze" : DATASET_BRONZE, # Raw
"silver" : DATASET_SILVER, # Staging, Cleaned
"audit" : DATASET_AUDIT, # Audit/Logs
"gold" : DATASET_GOLD, # DW
}
# Alias Kimball terminology → layer (untuk readability di file lain)
KIMBALL_LAYER_MAP = {
"raw" : "bronze",
"staging" : "silver",
"logs" : "audit",
"dw" : "gold",
}
# SETUP BIGQUERY CLIENT
def get_bigquery_client() -> bigquery.Client:
"""
Create BigQuery client dengan service account credentials
Returns:
bigquery.Client: Authenticated BigQuery client
"""
credentials = service_account.Credentials.from_service_account_file(
CREDENTIALS_PATH,
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
return bigquery.Client(
credentials=credentials,
project=PROJECT_ID,
location=LOCATION
)
# MATCHING CONFIGURATION
CONFIG = {
"bigquery": {
"project_id" : PROJECT_ID,
"dataset_bronze" : DATASET_BRONZE,
"dataset_silver" : DATASET_SILVER,
"dataset_audit" : DATASET_AUDIT,
"dataset_gold" : DATASET_GOLD,
"location" : LOCATION,
"credentials_path": CREDENTIALS_PATH
},
"matching": {
"threshold": 0.70,
"weights": {
"keyword" : 0.50,
"string_similarity" : 0.30,
"word_overlap" : 0.20
},
"penalties": {
"qualifier_mismatch" : 0.85,
"severity_mismatch" : 0.80,
"target_mismatch" : 0.90,
"service_level_mismatch": 0.88
}
},
"asean_countries": [
"Brunei Darussalam",
"Cambodia",
"Indonesia",
"Lao People's Democratic Republic",
"Malaysia",
"Myanmar",
"Philippines",
"Singapore",
"Thailand",
"Viet Nam"
],
"asean_iso_codes": ["BRN", "KHM", "IDN", "LAO", "MYS", "MMR", "PHL", "SGP", "THA", "VNM"],
"unicef_datasets": {
"WASH_HOUSEHOLDS": "Water, Sanitation & Hygiene",
"NUTRITION" : "Child Nutrition",
"EDUCATION" : "Education",
"HIV_AIDS" : "HIV/AIDS"
}
}
# DIRECTORY SETUP
BASE_DIR = Path.cwd()
EXPORTS_DIR = BASE_DIR / 'exports'
LOGS_DIR = BASE_DIR / 'logs'
for directory in [EXPORTS_DIR, LOGS_DIR]:
directory.mkdir(exist_ok=True)
# HELPER FUNCTIONS
def get_table_id(table_name: str, layer: str = "bronze") -> str:
# Resolve Kimball alias ke layer name
resolved = KIMBALL_LAYER_MAP.get(layer.lower(), layer.lower())
dataset = LAYER_DATASET_MAP.get(resolved, DATASET_BRONZE)
return f"{PROJECT_ID}.{dataset}.{table_name}"
def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze") -> bool:
"""
Check apakah table ada di BigQuery
Args:
client : BigQuery client
table_name : Nama table
layer : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
Returns:
bool: True jika table ada
"""
try:
client.get_table(get_table_id(table_name, layer))
return True
except Exception:
return False
def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
"""
Delete table jika ada
Args:
client : BigQuery client
table_name : Nama table
layer : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
"""
table_id = get_table_id(table_name, layer)
try:
client.delete_table(table_id, not_found_ok=True)
print(f" Deleted [{layer.upper()}] table: {table_name}")
except Exception as e:
print(f" Error deleting [{layer.upper()}] table {table_name}: {e}")
def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
"""
Create dataset jika belum ada
Args:
client : BigQuery client
dataset_id : Dataset ID string
"""
full_id = f"{PROJECT_ID}.{dataset_id}"
try:
client.get_dataset(full_id)
print(f" ✓ Exists : {dataset_id}")
except Exception:
ds = bigquery.Dataset(full_id)
ds.location = LOCATION
client.create_dataset(ds, timeout=30)
print(f" ✓ Created : {dataset_id}")
def create_all_datasets(client: bigquery.Client):
"""Create semua 3 dataset (Raw/Staging/DW) jika belum ada"""
print("Setting up BigQuery Datasets (Kimball DW)...")
for layer, dataset_id in LAYER_DATASET_MAP.items():
create_dataset_if_not_exists(client, dataset_id)
# VERIFICATION
def verify_setup() -> bool:
"""
Verify BigQuery setup untuk semua 3 layer (Raw / Staging / DW)
Checks:
1. Credentials file exists
2. Koneksi ke BigQuery berhasil
3. Semua dataset ada atau berhasil dibuat
"""
print("=" * 60)
print("BIGQUERY SETUP VERIFICATION")
print("Kimball DW Architecture")
print("=" * 60)
# 1. Credentials
if not os.path.exists(CREDENTIALS_PATH):
print(f"Credentials not found : {CREDENTIALS_PATH}")
return False
print(f"✓ Credentials found")
# 2. Koneksi
try:
client = get_bigquery_client()
print(f"✓ Connected to BigQuery")
print(f" Project : {PROJECT_ID}")
print(f" Location : {LOCATION}")
except Exception as e:
print(f"Connection failed: {e}")
return False
# 3. Datasets
try:
print()
create_all_datasets(client)
except Exception as e:
print(f"Dataset setup failed: {e}")
return False
print("\n" + "=" * 60)
print("✓ SETUP SUCCESSFUL")
print(f" Raw (Bronze) : {DATASET_BRONZE}")
print(f" Staging (Silver) : {DATASET_SILVER}")
print(f" DW (Gold) : {DATASET_GOLD}")
print(f" Audit : {DATASET_AUDIT}")
print("=" * 60)
return True
# INITIALIZE ON IMPORT
if __name__ == "__main__":
verify_setup()
else:
print("BigQuery Config Loaded — Kimball DW Architecture")
print(f" Project : {PROJECT_ID}")
print(f" Raw (Bronze) : {DATASET_BRONZE}")
print(f" Staging (Silver) : {DATASET_SILVER}")
print(f" DW (Gold) : {DATASET_GOLD}")
print(f" Audit : {DATASET_AUDIT}")
print(f" Location : {LOCATION}")

View File

@@ -0,0 +1,271 @@
"""
BIGQUERY DATA SOURCE BASE CLASS
Kimball Data Warehouse Architecture
Layer Assignment:
RAW (Bronze) → Tempat load data mentah dari sumber eksternal
STAGING (Silver) → etl_logs, etl_metadata (via helpers)
DW (Gold) → dim_*, fact_* (di file terpisah)
Subclass yang menggunakan DataSource:
FAODataSource → load ke RAW (Bronze) : raw_fao
WorldBankDataSource → load ke RAW (Bronze) : raw_worldbank
UNICEFDataSource → load ke RAW (Bronze) : raw_unicef
Changes from MySQL version:
1. Replace SQLAlchemy engine → BigQuery client
2. Replace to_sql() → load_table_from_dataframe()
3. load_to_database() default layer = 'bronze' (RAW layer)
4. log_update() menggunakan label 'RAW' sesuai Kimball terminology
5. save_metadata() → save_etl_metadata() ke STAGING layer (Silver)
"""
from abc import ABC, abstractmethod
import pandas as pd
import logging
from datetime import datetime
from typing import Dict
import json
from bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
from bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
from google.cloud import bigquery
class DataSource(ABC):
"""
Abstract base class untuk semua sumber data dengan template ETL pattern.
Menggunakan Kimball DW methodology.
Kimball Flow untuk setiap DataSource:
fetch_data() → Extract dari sumber eksternal (FAO/WB/UNICEF)
transform_data() → Transform ke format standar
validate_data() → Cek kualitas data
load_to_database() → Load ke RAW layer (Bronze)
save_metadata() → Simpan metadata ke STAGING layer (Silver)
Subclass wajib implement:
fetch_data()
transform_data()
"""
def __init__(self, client: bigquery.Client = None):
"""
Initialize DataSource dengan BigQuery client.
Args:
client: BigQuery client (jika None, akan dibuat baru)
"""
self.client = client if client else get_bigquery_client()
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.data = None
self.table_name = None
self.target_layer = "bronze" # RAW layer — default untuk semua data sources
self.asean_countries = CONFIG['asean_countries']
# Metadata untuk tracking reproducibility (disimpan ke STAGING/Silver)
self.metadata = {
'source_class' : self.__class__.__name__,
'table_name' : None,
'execution_timestamp': None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'completeness_pct' : 0,
'config_snapshot' : json.dumps({
'threshold': float(CONFIG['matching']['threshold']),
'weights' : {k: float(v) for k, v in CONFIG['matching']['weights'].items()}
}),
'validation_metrics' : '{}'
}
@abstractmethod
def fetch_data(self) -> pd.DataFrame:
"""
Extract data mentah dari sumber eksternal.
WAJIB diimplementasikan oleh subclass.
"""
pass
@abstractmethod
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Transform data ke format standar sebelum load ke RAW layer.
WAJIB diimplementasikan oleh subclass.
"""
pass
def validate_data(self, df: pd.DataFrame) -> Dict:
"""
Validasi kualitas data hasil transform sebelum load ke RAW layer.
Metrics yang dihitung:
total_rows, total_columns — dimensi data
null_count, null_percentage — kelengkapan per kolom
duplicate_count — duplikasi data
completeness_pct — persentase kelengkapan keseluruhan
memory_usage_mb — ukuran data di memori
year_range — rentang tahun (jika ada kolom year)
Returns:
Dict: Validation metrics
"""
validation = {
'total_rows' : int(len(df)),
'total_columns' : int(len(df.columns)),
'null_count' : {k: int(v) for k, v in df.isnull().sum().to_dict().items()},
'null_percentage' : {
k: float(v)
for k, v in (df.isnull().sum() / len(df) * 100).round(2).to_dict().items()
},
'duplicate_count' : int(df.duplicated().sum()),
'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
}
# Deteksi kolom year untuk year range info
year_cols = [col for col in df.columns if 'year' in col.lower() or 'tahun' in col.lower()]
if year_cols:
year_col = year_cols[0]
validation['year_range'] = {
'min' : int(df[year_col].min()) if not df[year_col].isnull().all() else None,
'max' : int(df[year_col].max()) if not df[year_col].isnull().all() else None,
'unique_years': int(df[year_col].nunique())
}
return validation
def load_to_database(self, df: pd.DataFrame, table_name: str):
"""
Load data ke RAW layer (Bronze) dengan full refresh strategy.
Kimball context:
RAW layer adalah landing zone pertama untuk data mentah dari sumber.
Menggunakan WRITE_TRUNCATE (full refresh) karena data sumber
bisa berubah setiap kali pipeline dijalankan.
Args:
df : DataFrame hasil transform
table_name : Nama table tujuan di RAW layer (Bronze)
Audit:
Setiap load dicatat ke etl_logs di STAGING layer (Silver)
"""
try:
# Load ke RAW layer (Bronze) — full refresh
load_to_bigquery(
self.client,
df,
table_name,
layer='bronze', # RAW layer
write_disposition="WRITE_TRUNCATE" # Full refresh
)
# Audit log ke STAGING layer (Silver)
log_update(
self.client,
layer='RAW', # Label Kimball
table_name=table_name,
update_method='full_refresh',
rows_affected=len(df)
)
except Exception as e:
log_update(
self.client,
layer='RAW',
table_name=table_name,
update_method='full_refresh',
rows_affected=0,
status='failed',
error_msg=str(e)
)
raise
def save_metadata(self):
"""
Simpan metadata eksekusi ETL ke STAGING layer (Silver).
Kimball context:
ETL metadata (execution time, row counts, completeness, dll.)
disimpan di Staging layer sebagai operational/audit table,
bukan bagian dari Star Schema di DW layer.
Metadata yang disimpan:
source_class, table_name, execution_timestamp,
duration_seconds, rows_fetched/transformed/loaded,
completeness_pct, config_snapshot, validation_metrics
"""
try:
self.metadata['table_name'] = self.table_name
# Pastikan validation_metrics dalam format JSON string
if isinstance(self.metadata.get('validation_metrics'), dict):
self.metadata['validation_metrics'] = json.dumps(
self.metadata['validation_metrics']
)
# Save ke STAGING layer (Silver) via helper
save_etl_metadata(self.client, self.metadata)
except Exception as e:
# Silent fail — metadata tracking tidak boleh menghentikan proses ETL
self.logger.warning(f"Failed to save ETL metadata to STAGING: {str(e)}")
def run(self) -> pd.DataFrame:
"""
Jalankan full ETL pipeline: Extract → Transform → Validate → Load → Metadata.
Kimball ETL steps:
1. EXTRACT — fetch_data() : Ambil dari sumber eksternal
2. TRANSFORM — transform_data() : Standardize format
3. VALIDATE — validate_data() : Cek kualitas
4. LOAD — load_to_database() : Load ke RAW layer (Bronze)
5. METADATA — save_metadata() : Simpan ke STAGING layer (Silver)
Returns:
pd.DataFrame: Data yang sudah di-load ke RAW layer
"""
start_time = datetime.now()
self.metadata['execution_timestamp'] = start_time
try:
# 1. EXTRACT
raw_data = self.fetch_data()
self.metadata['rows_fetched'] = len(raw_data) if hasattr(raw_data, '__len__') else 0
# 2. TRANSFORM
self.data = self.transform_data(raw_data)
self.metadata['rows_transformed'] = len(self.data)
# 3. VALIDATE
validation = self.validate_data(self.data)
self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
self.metadata['validation_metrics'] = json.dumps({
'total_rows' : validation['total_rows'],
'completeness_pct': validation['completeness_pct'],
'duplicate_count' : validation['duplicate_count']
})
# 4. LOAD → RAW layer (Bronze)
self.load_to_database(self.data, self.table_name)
self.metadata['rows_loaded'] = len(self.data)
# 5. METADATA → STAGING layer (Silver)
end_time = datetime.now()
self.metadata['duration_seconds'] = (end_time - start_time).total_seconds()
self.save_metadata()
return self.data
except Exception as e:
raise
print("DataSource base class loaded — Kimball DW Architecture")
print(" Default target layer : RAW (Bronze)")
print(" Audit logs : STAGING (Silver) via etl_logs")
print(" ETL metadata : STAGING (Silver) via etl_metadata")

422
scripts/bigquery_helpers.py Normal file
View File

@@ -0,0 +1,422 @@
"""
BIGQUERY HELPER FUNCTIONS
Kimball Data Warehouse Architecture
Layer Assignment (Kimball terminology):
RAW (Bronze) → raw_fao, raw_worldbank, raw_unicef
STAGING (Silver) → staging_integrated, cleaned_integrated
AUDIT (Audit) → etl_logs, etl_metadata
DW (Gold) → dim_*, fact_food_security, fact_food_security_eligible
Functions:
setup_logging() — Setup file & console logging
log_update() — Audit log ETL ke staging (Silver)
save_etl_metadata() — Save ETL metadata ke staging (Silver), preserve created_at
load_to_bigquery() — Load DataFrame ke layer tertentu
read_from_bigquery() — Read dari layer tertentu
truncate_table() — Hapus semua rows dari table
drop_table() — Drop table dari layer tertentu
get_staging_schema() — Schema staging_integrated
get_etl_metadata_schema() — Schema etl_metadata
"""
import pandas as pd
import logging
from datetime import datetime
import pytz
from google.cloud import bigquery
from bigquery_config import (
get_bigquery_client,
get_table_id,
table_exists,
CONFIG
)
import json
# LOGGING SETUP
def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
"""
Setup logging system untuk tracking eksekusi ETL
Args:
log_file: Path to log file
Returns:
logging.Logger: Configured logger
"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
# ETL AUDIT LOG — STAGING LAYER (Silver)
def ensure_etl_logs_table(client: bigquery.Client):
"""
Buat table etl_logs di STAGING layer (Silver) jika belum ada.
Kimball context:
etl_logs adalah operational/audit table, bukan bagian dari Star Schema.
Disimpan di Staging layer karena merupakan output proses ETL,
bukan data warehouse final.
Schema:
id STRING — unique log ID
timestamp DATETIME — waktu log dibuat
layer STRING — layer yang diproses (RAW/STAGING/DW)
table_name STRING — nama table yang diproses
update_method STRING — full_refresh / incremental
rows_affected INTEGER — jumlah rows
status STRING — success / failed
error_message STRING — pesan error jika gagal
"""
if not table_exists(client, 'etl_logs', layer='audit'):
table_id = get_table_id('etl_logs', layer='audit')
schema = [
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("timestamp", "DATETIME", mode="REQUIRED"),
bigquery.SchemaField("layer", "STRING", mode="REQUIRED"),
bigquery.SchemaField("table_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("update_method", "STRING", mode="REQUIRED"),
bigquery.SchemaField("rows_affected", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
bigquery.SchemaField("error_message", "STRING", mode="NULLABLE"),
]
table = bigquery.Table(table_id, schema=schema)
client.create_table(table)
print(f" [AUDIT] Created table: etl_logs")
def log_update(client: bigquery.Client, layer: str, table_name: str,
update_method: str, rows_affected: int,
status: str = 'success', error_msg: str = None):
"""
Catat aktivitas ETL ke etl_logs (STAGING/Silver) untuk audit trail.
Args:
client : BigQuery client
layer : Layer yang diproses — 'RAW', 'STAGING', atau 'DW'
table_name : Nama table yang diproses
update_method : 'full_refresh' atau 'incremental'
rows_affected : Jumlah rows yang diproses
status : 'success' atau 'failed'
error_msg : Pesan error jika status='failed'
Examples:
# Log saat load raw data
log_update(client, 'RAW', 'raw_fao', 'full_refresh', 5000)
# Log saat proses staging
log_update(client, 'STAGING', 'staging_integrated', 'full_refresh', 12000)
# Log saat load ke DW
log_update(client, 'DW', 'fact_food_security', 'full_refresh', 8000)
"""
try:
ensure_etl_logs_table(client)
log_data = pd.DataFrame([{
'id' : str(pd.util.hash_pandas_object(
pd.Series([datetime.now().isoformat()])).values[0]),
'timestamp' : datetime.now(pytz.timezone('Asia/Jakarta')),
'layer' : layer.upper(),
'table_name' : table_name,
'update_method': update_method,
'rows_affected': rows_affected,
'status' : status,
'error_message': error_msg
}])
# Hapus timezone untuk BigQuery DATETIME
log_data['timestamp'] = pd.to_datetime(log_data['timestamp']).dt.tz_localize(None)
log_data['id'] = log_data['id'].astype(str)
table_id = get_table_id('etl_logs', layer='audit')
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
job = client.load_table_from_dataframe(log_data, table_id, job_config=job_config)
job.result()
except Exception as e:
print(f" Warning: Failed to write etl_logs [STAGING]: {e}")
# DATA LOADING TO BIGQUERY
def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
table_name: str, layer: str = "bronze",
write_disposition: str = "WRITE_TRUNCATE",
schema: list = None) -> int:
"""
Load DataFrame ke BigQuery table pada layer tertentu.
Args:
client : BigQuery client
df : DataFrame yang akan di-load
table_name : Nama table tujuan
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
write_disposition : WRITE_TRUNCATE (replace) atau WRITE_APPEND (append)
schema : Optional schema (list of SchemaField)
Returns:
int: Jumlah rows yang berhasil di-load
Examples (Kimball flow):
# RAW layer — data mentah dari sumber
load_to_bigquery(client, df_fao, 'raw_fao', layer='bronze')
load_to_bigquery(client, df_wb, 'raw_worldbank', layer='bronze')
load_to_bigquery(client, df_unicef, 'raw_unicef', layer='bronze')
# STAGING layer — cleaned & integrated
load_to_bigquery(client, df_staging, 'staging_integrated', layer='silver')
# DW layer — Kimball Star Schema
load_to_bigquery(client, df_dim, 'dim_country', layer='gold')
load_to_bigquery(client, df_fact, 'fact_food_security', layer='gold')
load_to_bigquery(client, df_elig, 'fact_food_security_eligible', layer='gold')
"""
table_id = get_table_id(table_name, layer)
job_config = bigquery.LoadJobConfig(
write_disposition=write_disposition,
autodetect=True if schema is None else False,
schema=schema
)
job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
job.result()
table = client.get_table(table_id)
print(f" ✓ Loaded {table.num_rows:,} rows → [{layer.upper()}] {table_name}")
return table.num_rows
# DATA READING FROM BIGQUERY
def read_from_bigquery(client: bigquery.Client,
table_name: str = None,
layer: str = "bronze",
query: str = None) -> pd.DataFrame:
"""
Read data dari BigQuery table atau jalankan custom query.
Args:
client : BigQuery client
table_name : Nama table yang akan dibaca
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
query : Custom SQL query (jika diisi, table_name diabaikan)
Returns:
pd.DataFrame: Hasil query
Examples (Kimball flow):
# Baca dari RAW layer
df = read_from_bigquery(client, 'raw_fao', layer='bronze')
# Baca dari STAGING layer
df = read_from_bigquery(client, 'staging_integrated', layer='silver')
# Baca dari DW layer
df = read_from_bigquery(client, 'fact_food_security', layer='gold')
df = read_from_bigquery(client, 'fact_food_security_eligible', layer='gold')
df = read_from_bigquery(client, 'dim_country', layer='gold')
"""
if query:
return client.query(query).result().to_dataframe(create_bqstorage_client=False)
elif table_name:
table_id = get_table_id(table_name, layer)
return client.query(f"SELECT * FROM `{table_id}`").result().to_dataframe(create_bqstorage_client=False)
else:
raise ValueError("Either table_name or query must be provided")
# TABLE MANAGEMENT
def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
"""
Hapus semua rows dari table (kosongkan table, struktur tetap ada).
Args:
client : BigQuery client
table_name : Nama table
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
"""
table_id = get_table_id(table_name, layer)
job = client.query(f"DELETE FROM `{table_id}` WHERE TRUE")
job.result()
print(f" Truncated [{layer.upper()}] table: {table_name}")
def drop_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
"""
Drop table dari BigQuery jika ada.
Args:
client : BigQuery client
table_name : Nama table
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
"""
table_id = get_table_id(table_name, layer)
client.delete_table(table_id, not_found_ok=True)
print(f" Dropped [{layer.upper()}] table: {table_name}")
# SCHEMA DEFINITIONS — STAGING LAYER (Silver)
def get_staging_schema() -> list:
"""
Schema untuk staging_integrated table (STAGING/Silver layer).
Staging table adalah area integrasi data dari semua sumber (FAO, WB, UNICEF)
sebelum di-load ke DW layer sebagai Dim & Fact tables.
Returns:
list: List of SchemaField objects
"""
return [
bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"),
bigquery.SchemaField("country", "STRING", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"),
bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
]
def get_etl_metadata_schema() -> list:
"""
Schema untuk etl_metadata table (STAGING/Silver layer).
ETL metadata disimpan di Staging layer karena merupakan operational table
untuk reproducibility & tracking, bukan bagian Star Schema DW.
Returns:
list: List of SchemaField objects
"""
return [
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("source_class", "STRING", mode="REQUIRED"),
bigquery.SchemaField("table_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("execution_timestamp", "DATETIME", mode="REQUIRED"),
bigquery.SchemaField("duration_seconds", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("rows_fetched", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("rows_transformed", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("rows_loaded", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("completeness_pct", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("config_snapshot", "STRING", mode="NULLABLE"),
bigquery.SchemaField("validation_metrics", "STRING", mode="NULLABLE"),
bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("updated_at", "TIMESTAMP", mode="REQUIRED"),
]
# ETL METADATA — STAGING LAYER (Silver)
# FIXED: Preserve created_at dari eksekusi pertama
def save_etl_metadata(client: bigquery.Client, metadata: dict):
"""
Save ETL metadata ke etl_metadata table (STAGING/Silver layer).
Logic created_at vs updated_at:
created_at : diambil dari record PERTAMA untuk table_name yang sama
(preserved across runs — untuk reproducibility)
updated_at : selalu diperbarui ke waktu eksekusi sekarang
Args:
client : BigQuery client
metadata : Dict berisi informasi eksekusi ETL:
table_name (required)
source_class (required)
execution_timestamp
duration_seconds
rows_fetched
rows_transformed
rows_loaded
completeness_pct
config_snapshot (JSON string)
validation_metrics (JSON string)
"""
table_name = metadata.get('table_name', 'unknown')
table_id = get_table_id('etl_metadata', layer='audit')
# Buat table jika belum ada
if not table_exists(client, 'etl_metadata', layer='audit'):
schema = get_etl_metadata_schema()
table = bigquery.Table(table_id, schema=schema)
client.create_table(table)
print(f" [AUDIT] Created table: etl_metadata")
# Ambil created_at pertama untuk table ini (preserve across runs)
check_query = f"""
SELECT MIN(created_at) AS first_created_at
FROM `{table_id}`
WHERE table_name = @table_name
"""
job_config_q = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("table_name", "STRING", table_name)
]
)
try:
rows = list(client.query(check_query, job_config=job_config_q).result())
is_first_run = True
if rows and rows[0]['first_created_at'] is not None:
created_at = rows[0]['first_created_at']
is_first_run = False
else:
created_at = datetime.now()
except Exception:
created_at = datetime.now()
is_first_run = True
current_time = datetime.now()
# Generate unique ID
import hashlib
record_id = hashlib.md5(
f"{metadata.get('source_class')}_{table_name}_{current_time.isoformat()}".encode()
).hexdigest()
meta_df = pd.DataFrame([{
'id' : record_id,
'source_class' : metadata.get('source_class', 'unknown'),
'table_name' : table_name,
'execution_timestamp': metadata.get('execution_timestamp', current_time),
'duration_seconds' : float(metadata.get('duration_seconds', 0)),
'rows_fetched' : int(metadata.get('rows_fetched', 0)),
'rows_transformed' : int(metadata.get('rows_transformed', 0)),
'rows_loaded' : int(metadata.get('rows_loaded', 0)),
'completeness_pct' : float(metadata.get('completeness_pct', 0)),
'config_snapshot' : metadata.get('config_snapshot', '{}'),
'validation_metrics' : metadata.get('validation_metrics', '{}'),
'created_at' : created_at, # PRESERVED dari run pertama
'updated_at' : current_time # SELALU waktu sekarang
}])
# Hapus timezone untuk BigQuery
for col in ['execution_timestamp', 'created_at', 'updated_at']:
meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)
# APPEND ke STAGING layer (Silver)
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
job = client.load_table_from_dataframe(meta_df, table_id, job_config=job_config)
job.result()
if is_first_run:
print(f"etl_metadata — first run | created_at : {created_at}")
else:
print(f"etl_metadata — preserved | created_at : {created_at}")
print(f"etl_metadata — updated_at : {current_time}")
# INITIALIZE
logger = setup_logging()
client = get_bigquery_client()
print("BigQuery Helpers Loaded — Kimball DW Architecture")
print(f"Project : {CONFIG['bigquery']['project_id']}")
print(f"Raw (Bronze) : {CONFIG['bigquery']['dataset_bronze']}")
print(f"Staging (Silver) : {CONFIG['bigquery']['dataset_silver']}")
print(f"DW (Gold) : {CONFIG['bigquery']['dataset_gold']}")

View File

@@ -0,0 +1,801 @@
"""
BIGQUERY RAW LAYER ETL
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
FAODataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_fao
WorldBankDataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_worldbank
UNICEFDataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_unicef
StagingDataIntegration:
- READ dari RAW layer (Bronze) : raw_fao, raw_worldbank, raw_unicef
- LOAD ke STAGING layer (Silver) : staging_integrated
- LOG ke AUDIT layer (Audit) : etl_logs, etl_metadata
Classes:
IndicatorMatcher — Fuzzy matching indikator antar sumber data
FAODataSource — ETL data FAO Food Security
WorldBankDataSource — ETL data World Bank
UNICEFDataSource — ETL data UNICEF
StagingDataIntegration — Integrasi & standardisasi ke Staging layer
Usage:
python bigquery_raw_layer.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import List, Dict, Optional, Union
import json
from functools import lru_cache
from pathlib import Path
from difflib import SequenceMatcher
import re
from bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
from bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
save_etl_metadata,
get_staging_schema
)
from bigquery_datasource import DataSource
from google.cloud import bigquery
# INDICATOR MATCHER
class IndicatorMatcher:
CORE_KEYWORDS = {
'fat' : ['fat', 'lipid'],
'protein' : ['protein'],
'calorie' : ['calorie', 'caloric', 'kcal', 'energy intake'],
'energy' : ['energy', 'dietary energy consumption'],
'stunting' : ['stunting', 'stunted', 'height for age'],
'wasting' : ['wasting', 'wasted', 'weight for height'],
'underweight' : ['underweight', 'weight for age'],
'overweight' : ['overweight', 'overfed'],
'obesity' : ['obesity', 'obese'],
'anemia' : ['anemia', 'anaemia', 'hemoglobin', 'haemoglobin'],
'malnutrition' : ['malnutrition', 'undernourishment', 'malnourished', 'undernourished'],
'breastfeeding': ['breastfeeding', 'breast feeding'],
'birthweight' : ['birthweight', 'birth weight', 'low birth weight'],
'immunization' : ['immunization', 'immunisation', 'vaccination', 'vaccine'],
'gdp' : ['gdp', 'gross domestic product'],
'poverty' : ['poverty', 'poor', 'poverty line'],
'inequality' : ['inequality', 'gini'],
'water' : ['water', 'drinking water', 'clean water', 'safe water'],
'sanitation' : ['sanitation', 'toilet', 'improved sanitation'],
'electricity' : ['electricity', 'electric', 'power'],
'healthcare' : ['healthcare', 'health facility', 'hospital'],
'governance' : ['governance', 'government effectiveness'],
'corruption' : ['corruption', 'transparency'],
'stability' : ['political stability', 'stability', 'conflict']
}
QUALIFIERS = {
'at_least' : ['at least', 'minimum', 'or more', 'or better'],
'basic' : ['basic'],
'improved' : ['improved'],
'safely_managed': ['safely managed', 'safe'],
'exclusive' : ['exclusive', 'exclusively'],
'severe' : ['severe', 'severely'],
'moderate' : ['moderate'],
'mild' : ['mild'],
'children' : ['children', 'child', 'under 5', 'under five', 'u5'],
'women' : ['women', 'female', 'reproductive age'],
'adults' : ['adults', 'adult'],
'population' : ['population', 'people', 'persons'],
'household' : ['household', 'households']
}
def __init__(self):
self.threshold = CONFIG['matching']['threshold']
self.weights = CONFIG['matching']['weights']
self.penalties = CONFIG['matching']['penalties']
self.logger = logging.getLogger(self.__class__.__name__)
@staticmethod
@lru_cache(maxsize=1024)
def clean_text(text: str) -> str:
if pd.isna(text):
return ""
text = str(text).lower()
text = re.sub(r'[^\w\s\(\)]', ' ', text)
return ' '.join(text.split())
@classmethod
@lru_cache(maxsize=512)
def extract_keywords(cls, text: str) -> tuple:
text_clean = cls.clean_text(text)
return tuple(key for key, variants in cls.CORE_KEYWORDS.items()
if any(v in text_clean for v in variants))
@classmethod
@lru_cache(maxsize=512)
def detect_qualifiers(cls, text: str) -> frozenset:
text_clean = cls.clean_text(text)
return frozenset(q for q, variants in cls.QUALIFIERS.items()
if any(v in text_clean for v in variants))
@lru_cache(maxsize=2048)
def calculate_similarity(self, text1: str, text2: str) -> float:
if text1 == text2:
return 1.0
clean1 = self.clean_text(text1)
clean2 = self.clean_text(text2)
keywords1 = set(self.extract_keywords(text1))
keywords2 = set(self.extract_keywords(text2))
if keywords1 and keywords2 and not (keywords1 & keywords2):
return 0.0
core_score = len(keywords1 & keywords2) / max(len(keywords1), len(keywords2), 1)
base_score = SequenceMatcher(None, clean1, clean2).ratio()
words1, words2 = set(clean1.split()), set(clean2.split())
overlap = len(words1 & words2) / max(len(words1), len(words2), 1)
w = self.weights
final_score = (core_score * w['keyword'] +
base_score * w['string_similarity'] +
overlap * w['word_overlap'])
quals1 = self.detect_qualifiers(text1)
quals2 = self.detect_qualifiers(text2)
p = self.penalties
if ('at_least' in quals1) != ('at_least' in quals2): final_score *= p['qualifier_mismatch']
if ('exclusive' in quals1) != ('exclusive' in quals2): final_score *= p['qualifier_mismatch']
sev1 = {'severe', 'moderate', 'mild'} & quals1
sev2 = {'severe', 'moderate', 'mild'} & quals2
if sev1 != sev2 and (sev1 or sev2): final_score *= p['severity_mismatch']
tgt1 = {'children', 'women', 'adults'} & quals1
tgt2 = {'children', 'women', 'adults'} & quals2
if tgt1 != tgt2 and (tgt1 or tgt2): final_score *= p['target_mismatch']
lvl1 = {'basic', 'improved', 'safely_managed'} & quals1
lvl2 = {'basic', 'improved', 'safely_managed'} & quals2
if lvl1 != lvl2 and (lvl1 or lvl2): final_score *= p['service_level_mismatch']
return final_score
def match_indicators(self, source_indicators, target_indicators,
threshold=None, id_col='id', name_col='value', deduplicate=True):
if threshold is None:
threshold = self.threshold
all_matches = []
for source in sorted(source_indicators):
best = self._find_best_match(source, target_indicators, threshold, id_col, name_col)
if best:
all_matches.append({
'source_indicator': source,
'target_indicator': best['name'],
'target_code' : best['code'],
'similarity_score': round(best['similarity'] * 100, 1)
})
if deduplicate and all_matches:
all_matches = self._deduplicate_matches(all_matches)
return all_matches
def _find_best_match(self, source, targets, threshold, id_col, name_col):
best = None
best_score = threshold
if isinstance(targets, pd.DataFrame):
for _, row in targets.iterrows():
score = self.calculate_similarity(source, row[name_col])
if score > best_score:
best_score = score
best = {'code': row[id_col], 'name': row[name_col]}
else:
for target in targets:
score = self.calculate_similarity(source, target)
if score > best_score:
best_score = score
best = {'code': None, 'name': target}
return None if best is None else {**best, 'similarity': best_score}
def _deduplicate_matches(self, matches):
df = pd.DataFrame(matches).sort_values('similarity_score', ascending=False)
dup_col = 'target_code' if df['target_code'].notna().any() else 'target_indicator'
return df.drop_duplicates(subset=dup_col, keep='first').to_dict('records')
# FAO DATA SOURCE → RAW LAYER (Bronze)
class FAODataSource(DataSource):
"""
FAO Food Security Data Source (BigQuery version)
FIXED: Menggunakan bulk download karena faostat API butuh autentikasi
"""
def __init__(self, client: bigquery.Client = None):
super().__init__(client)
self.table_name = 'raw_fao'
self.domain_code = 'FS'
self.matcher = IndicatorMatcher()
self.logger.propagate = False
self.download_url = (
"https://bulks-faostat.fao.org/production/"
"Food_Security_Data_E_All_Data_(Normalized).zip"
)
def fetch_data(self) -> pd.DataFrame:
import requests
import zipfile
import io
print(" Downloading FAO Food Security dataset...")
response = requests.get(self.download_url, timeout=120)
response.raise_for_status()
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
csv_name = [f for f in z.namelist() if f.endswith('.csv')][0]
df = pd.read_csv(z.open(csv_name), encoding='latin-1')
if 'Area' in df.columns:
df = df[df['Area'].isin(self.asean_countries)].copy()
print(f" Raw rows after ASEAN filter: {len(df):,}")
return df
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
if 'Element' in df.columns:
df = df[df['Element'] == 'Value'].copy()
column_mapping = {
'Area' : 'country',
'Year' : 'year',
'Item' : 'indicator',
'Value': 'value',
'Unit' : 'unit'
}
df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns})
keep_cols = [c for c in ['country', 'year', 'indicator', 'value', 'unit'] if c in df.columns]
df = df[keep_cols].copy()
if all(col in df.columns for col in ['indicator', 'country', 'year']):
df = df.sort_values(['indicator', 'country', 'year']).reset_index(drop=True)
return df
# WORLD BANK DATA SOURCE → RAW LAYER (Bronze)
import wbgapi as wb
class WorldBankDataSource(DataSource):
def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
super().__init__(client)
self.table_name = 'raw_worldbank'
self.fao_indicators = fao_indicators
self.asean_iso = CONFIG['asean_iso_codes']
self.matching_results = []
self.matcher = IndicatorMatcher()
self.logger.propagate = False
def fetch_data(self) -> Dict:
wb_indicators = pd.DataFrame(list(wb.series.list()))
matches = self.matcher.match_indicators(
self.fao_indicators, wb_indicators,
threshold=CONFIG['matching']['threshold'],
id_col='id', name_col='value', deduplicate=True
)
self.matching_results = [{
'indikator_fao' : m['source_indicator'],
'indikator_wb' : m['target_indicator'],
'kode_wb' : m['target_code'],
'similarity_persen': m['similarity_score']
} for m in matches]
wb_data_dict = {}
for item in self.matching_results:
try:
data = wb.data.DataFrame(item['kode_wb'], self.asean_iso, numericTimeKeys=True)
wb_data_dict[item['indikator_fao']] = data
except Exception:
pass
return wb_data_dict
def transform_data(self, wb_data_dict: Dict) -> pd.DataFrame:
all_data = []
for fao_indicator, df_wide in wb_data_dict.items():
info = next(i for i in self.matching_results if i['indikator_fao'] == fao_indicator)
temp = df_wide.reset_index()
temp.insert(0, 'indicator_wb_original', info['indikator_wb'])
temp.insert(1, 'indicator_fao', fao_indicator)
temp.insert(2, 'wb_code', info['kode_wb'])
all_data.append(temp)
if not all_data:
return pd.DataFrame()
df_combined = pd.concat(all_data, ignore_index=True)
id_vars = ['indicator_wb_original', 'indicator_fao', 'wb_code', 'economy']
value_vars = [c for c in df_combined.columns if c not in id_vars]
df_long = df_combined.melt(
id_vars=id_vars, value_vars=value_vars,
var_name='year', value_name='value'
).rename(columns={'economy': 'country'})
df_long['year'] = df_long['year'].astype(int)
df_long = df_long[['indicator_wb_original', 'indicator_fao', 'wb_code',
'country', 'year', 'value']]
return df_long.sort_values(['indicator_fao', 'country', 'year']).reset_index(drop=True)
# UNICEF DATA SOURCE → RAW LAYER (Bronze)
import requests
import time
class UNICEFDataSource(DataSource):
def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
super().__init__(client)
self.table_name = 'raw_unicef'
self.fao_indicators = fao_indicators
self.base_url = "https://sdmx.data.unicef.org/ws/public/sdmxapi/rest"
self.datasets = CONFIG['unicef_datasets']
self.asean_keywords = ['Indonesia', 'Malaysia', 'Thailand', 'Vietnam', 'Viet Nam',
'Philippines', 'Singapore', 'Myanmar', 'Cambodia', 'Lao', 'Brunei']
self.matching_results = []
self.matcher = IndicatorMatcher()
self.logger.propagate = False
def fetch_data(self) -> pd.DataFrame:
all_data = []
for dataset_code, dataset_name in self.datasets.items():
try:
url = f"{self.base_url}/data/{dataset_code}/all/?format=sdmx-json"
response = requests.get(url, timeout=30)
response.raise_for_status()
data_json = response.json()
series_data = data_json['data']['dataSets'][0]['series']
dimensions = data_json['data']['structure']['dimensions']
data_list = []
for series_key, series_value in series_data.items():
indices = series_key.split(':')
row_data = {'dataset': dataset_code}
for i, dim in enumerate(dimensions['series']):
row_data[dim['id']] = dim['values'][int(indices[i])]['name']
for obs_key, obs_value in series_value.get('observations', {}).items():
obs_row = row_data.copy()
for i, dim in enumerate(dimensions['observation']):
obs_row[dim['id']] = dim['values'][int(obs_key.split(':')[i])]['id']
obs_row['value'] = obs_value[0]
data_list.append(obs_row)
df_temp = pd.DataFrame(data_list)
if 'REF_AREA' in df_temp.columns:
asean_found = [c for c in df_temp['REF_AREA'].unique()
if any(k.lower() in c.lower() for k in self.asean_keywords)]
df_temp = df_temp[df_temp['REF_AREA'].isin(asean_found)]
if len(df_temp) > 0:
all_data.append(df_temp)
time.sleep(0.5)
except Exception:
pass
return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
if df.empty:
return df
indicator_col = next((col for col in df.columns if 'indicator' in col.lower()), None)
if not indicator_col:
return pd.DataFrame()
unicef_indicators = df[indicator_col].unique()
matches = self.matcher.match_indicators(
self.fao_indicators, list(unicef_indicators),
threshold=CONFIG['matching']['threshold'], deduplicate=True
)
self.matching_results = []
for match in matches:
matched_rows = df[df[indicator_col] == match['target_indicator']]
if len(matched_rows) > 0:
self.matching_results.append({
'indikator_fao' : match['source_indicator'],
'indikator_unicef': match['target_indicator'],
'unicef_dataset' : matched_rows['dataset'].iloc[0],
'similarity_persen': match['similarity_score']
})
if not self.matching_results:
return pd.DataFrame()
unicef_matched = [i['indikator_unicef'] for i in self.matching_results]
df_filtered = df[df[indicator_col].isin(unicef_matched)].copy()
df_filtered = df_filtered.rename(columns={
indicator_col: 'indicator_unicef_original',
'REF_AREA' : 'country',
'TIME_PERIOD': 'year',
'value' : 'value'
})
unicef_to_fao = {i['indikator_unicef']: i['indikator_fao'] for i in self.matching_results}
df_filtered['indicator_fao'] = df_filtered['indicator_unicef_original'].map(unicef_to_fao)
return df_filtered
# STAGING DATA INTEGRATION → STAGING LAYER (Silver)
class StagingDataIntegration:
"""
Staging Data Integration (BigQuery version)
Input : RAW layer (Bronze) — raw_fao, raw_worldbank, raw_unicef
Output : STAGING layer (Silver) — staging_integrated
Audit : etl_logs, etl_metadata (Audit → fs_asean_audit)
Schema staging_integrated:
source varchar(20)
indicator_original varchar(255)
indicator_standardized varchar(255)
country varchar(100)
year int
year_range varchar(20)
value float
unit varchar(20)
"""
def __init__(self, client: bigquery.Client):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.staging_table = 'staging_integrated'
self.metadata = {
'source_class' : self.__class__.__name__,
'table_name' : self.staging_table,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
def load_raw_data(self) -> Dict[str, pd.DataFrame]:
"""Load data dari semua tabel RAW layer (Bronze)"""
raw_data = {}
try:
raw_data['fao'] = read_from_bigquery(self.client, 'raw_fao', layer='bronze')
except Exception:
raw_data['fao'] = pd.DataFrame()
try:
raw_data['worldbank'] = read_from_bigquery(self.client, 'raw_worldbank', layer='bronze')
except Exception:
raw_data['worldbank'] = pd.DataFrame()
try:
raw_data['unicef'] = read_from_bigquery(self.client, 'raw_unicef', layer='bronze')
except Exception:
raw_data['unicef'] = pd.DataFrame()
return raw_data
def clean_value(self, value):
"""Clean dan convert value ke float"""
if pd.isna(value):
return None
value_str = str(value).strip().replace('<', '').replace('>', '').strip()
try:
return float(value_str)
except:
return None
def process_year_range(self, year_value):
"""
Process year range dan return (year_int, year_range_str)
Examples:
"2020" → (2020, "2020")
"2020-2021" → (2020, "2020-2021")
"20192021" → (2020, "2019-2021")
"""
if pd.isna(year_value):
return None, None
year_str = str(year_value).strip().replace('', '-').replace('', '-')
if '-' in year_str:
try:
parts = year_str.split('-')
if len(parts) == 2:
start_year = int(parts[0].strip())
end_year = int(parts[1].strip())
return (start_year + end_year) // 2, year_str
else:
return int(float(year_str)), year_str
except:
return None, year_str
else:
try:
single_year = int(float(year_str))
return single_year, str(single_year)
except:
return None, year_str
def truncate_string(self, value, max_length: int) -> str:
"""Truncate string sesuai varchar constraint"""
if pd.isna(value):
return ''
s = str(value).strip()
return s[:max_length] if len(s) > max_length else s
def standardize_dataframe(self, df: pd.DataFrame, source: str,
indicator_orig_col: str, indicator_std_col: str,
country_col: str, year_col: str, value_col: str,
unit_col: str = None) -> pd.DataFrame:
"""Standardize dataframe ke schema staging_integrated"""
if df.empty:
return pd.DataFrame()
df_clean = df.copy().dropna(subset=[indicator_orig_col, country_col, year_col])
year_data = df_clean[year_col].apply(self.process_year_range)
units = df_clean[unit_col].fillna('') if (unit_col and unit_col in df_clean.columns) else ''
return pd.DataFrame({
'source' : [self.truncate_string(source, 20)] * len(df_clean),
'indicator_original' : df_clean[indicator_orig_col].apply(lambda x: self.truncate_string(x, 255)),
'indicator_standardized': df_clean[indicator_std_col].apply(lambda x: self.truncate_string(x, 255)),
'country' : df_clean[country_col].apply(lambda x: self.truncate_string(x, 100)),
'year' : [y[0] for y in year_data],
'year_range' : [self.truncate_string(y[1], 20) for y in year_data],
'value' : df_clean[value_col].apply(self.clean_value),
'unit' : [
self.truncate_string(u, 20)
for u in (units if isinstance(units, pd.Series)
else [units] * len(df_clean))
]
})
def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""Standardize schema dari semua sumber data"""
integrated_data = []
# FAO — deteksi kolom (nama asli atau sudah di-rename)
if not raw_data['fao'].empty:
df = raw_data['fao'].copy()
integrated_data.append(self.standardize_dataframe(
df, 'FAO',
indicator_orig_col='Item' if 'Item' in df.columns else 'indicator',
indicator_std_col ='Item' if 'Item' in df.columns else 'indicator',
country_col ='Area' if 'Area' in df.columns else 'country',
year_col ='Year' if 'Year' in df.columns else 'year',
value_col ='Value' if 'Value' in df.columns else 'value',
unit_col ='Unit' if 'Unit' in df.columns else ('unit' if 'unit' in df.columns else None)
))
# World Bank
if not raw_data['worldbank'].empty:
df = raw_data['worldbank'].copy()
integrated_data.append(self.standardize_dataframe(
df, 'World Bank',
indicator_orig_col='indicator_wb_original',
indicator_std_col ='indicator_fao',
country_col ='country',
year_col ='year',
value_col ='value',
unit_col ='unit' if 'unit' in df.columns else None
))
# UNICEF
if not raw_data['unicef'].empty:
df = raw_data['unicef'].copy()
integrated_data.append(self.standardize_dataframe(
df, 'UNICEF',
indicator_orig_col='indicator_unicef_original',
indicator_std_col ='indicator_fao',
country_col ='country',
year_col ='year',
value_col ='value',
unit_col ='unit' if 'unit' in df.columns else None
))
if not integrated_data:
return pd.DataFrame()
df_integrated = pd.concat(integrated_data, ignore_index=True)
# Final type conversion
df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')
# Enforce varchar constraints
for col, max_len in [('source', 20), ('country', 100), ('indicator_original', 255),
('indicator_standardized', 255), ('year_range', 20), ('unit', 20)]:
df_integrated[col] = df_integrated[col].astype(str).apply(
lambda x: self.truncate_string(x, max_len)
)
return df_integrated.sort_values(
['source', 'indicator_standardized', 'country', 'year']
).reset_index(drop=True)
def validate_data(self, df: pd.DataFrame) -> Dict:
"""Validate data dan return metrics"""
validation = {
'total_rows' : int(len(df)),
'total_columns' : int(len(df.columns)),
'duplicate_count' : int(df.duplicated().sum()),
'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
}
if 'year' in df.columns:
validation['year_range'] = {
'min' : int(df['year'].min()) if not df['year'].isnull().all() else None,
'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
'unique_years': int(df['year'].nunique())
}
if 'source' in df.columns:
validation['source_breakdown'] = {
str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
}
if 'indicator_standardized' in df.columns:
validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
if 'country' in df.columns:
validation['unique_countries'] = int(df['country'].nunique())
validation['schema_validation'] = {
'source_max_length' : int(df['source'].str.len().max()) if 'source' in df.columns else 0,
'indicator_original_max_length' : int(df['indicator_original'].str.len().max()) if 'indicator_original' in df.columns else 0,
'indicator_standardized_max_length': int(df['indicator_standardized'].str.len().max()) if 'indicator_standardized' in df.columns else 0,
'country_max_length' : int(df['country'].str.len().max()) if 'country' in df.columns else 0,
'year_range_max_length' : int(df['year_range'].str.len().max()) if 'year_range' in df.columns else 0,
'unit_max_length' : int(df['unit'].str.len().max()) if 'unit' in df.columns else 0
}
return validation
def save_to_staging(self, df: pd.DataFrame):
"""Save data ke staging_integrated table di STAGING layer (Silver)"""
try:
schema = get_staging_schema()
load_to_bigquery(
self.client,
df,
self.staging_table,
layer='silver', # → fs_asean_silver
write_disposition="WRITE_TRUNCATE",
schema=schema
)
log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))
except Exception as e:
print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
status='failed', error_msg=str(e))
raise
def run(self) -> pd.DataFrame:
"""Run staging integration process"""
self.metadata['start_time'] = datetime.now()
try:
print("Integrating data from all sources...")
raw_data = self.load_raw_data()
total_fetched = sum(len(df) for df in raw_data.values())
self.metadata['rows_fetched'] = total_fetched
print(f" Total rows fetched: {total_fetched:,}")
df_integrated = self.standardize_schema(raw_data)
if df_integrated.empty:
print("No data to integrate")
return df_integrated
self.metadata['rows_transformed'] = len(df_integrated)
validation = self.validate_data(df_integrated)
self.metadata['validation_metrics'] = validation
self.save_to_staging(df_integrated)
self.metadata['rows_loaded'] = len(df_integrated)
self.metadata['end_time'] = datetime.now()
self.metadata['duration_seconds'] = (
self.metadata['end_time'] - self.metadata['start_time']
).total_seconds()
self.metadata['execution_timestamp'] = self.metadata['start_time']
self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
self.metadata['config_snapshot'] = json.dumps(CONFIG['matching'])
self.metadata['validation_metrics'] = json.dumps(validation)
save_etl_metadata(self.client, self.metadata)
# Summary
print(f" ✓ Staging Integration completed: {len(df_integrated):,} rows")
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
if 'source_breakdown' in validation:
for src, cnt in validation['source_breakdown'].items():
print(f" - {src}: {cnt:,} rows")
print(f" Indicators : {validation.get('unique_indicators', '-')}")
print(f" Countries : {validation.get('unique_countries', '-')}")
if 'year_range' in validation:
yr = validation['year_range']
if yr['min'] and yr['max']:
print(f" Year range : {yr['min']}{yr['max']}")
print(f" Completeness: {validation['completeness_pct']:.2f}%")
schema_val = validation['schema_validation']
print(f"\n Schema Validation:")
print(f" - source max length : {schema_val['source_max_length']}/20")
print(f" - indicator_original max length : {schema_val['indicator_original_max_length']}/255")
print(f" - indicator_std max length : {schema_val['indicator_standardized_max_length']}/255")
print(f" - country max length : {schema_val['country_max_length']}/100")
print(f" - year_range max length : {schema_val['year_range_max_length']}/20")
print(f" - unit max length : {schema_val['unit_max_length']}/20")
print(f"\n Metadata → [AUDIT] etl_metadata")
return df_integrated
except Exception as e:
self.logger.error(f"Staging integration failed: {str(e)}")
raise
# MAIN EXECUTION
if __name__ == "__main__":
print("=" * 60)
print("BIGQUERY RAW LAYER ETL")
print("Kimball DW Architecture")
print("=" * 60)
logger = setup_logging()
client = get_bigquery_client()
# ── FAO ──────────────────────────────────────────────────────────────────
print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
fao_source = FAODataSource(client)
df_fao = fao_source.run()
print(f" ✓ raw_fao: {len(df_fao):,} rows")
print(f" Indicators : {df_fao['indicator'].nunique()}")
print(f" Countries : {df_fao['country'].nunique()}")
print(f" Year range : {df_fao['year'].min()}{df_fao['year'].max()}")
fao_indicators = df_fao['indicator'].unique()
# ── World Bank ────────────────────────────────────────────────────────────
print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
wb_source = WorldBankDataSource(client, list(fao_indicators))
df_wb = wb_source.run()
print(f" ✓ raw_worldbank: {len(df_wb):,} rows")
print(f" Matched indicators : {df_wb['indicator_fao'].nunique()}")
print(f" Countries : {df_wb['country'].nunique()}")
if len(df_wb) > 0:
print(f" Year range : {df_wb['year'].min()}{df_wb['year'].max()}")
# ── UNICEF ────────────────────────────────────────────────────────────────
print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
unicef_source = UNICEFDataSource(client, list(fao_indicators))
df_unicef = unicef_source.run()
print(f" ✓ raw_unicef: {len(df_unicef):,} rows")
if len(df_unicef) > 0:
print(f" Matched indicators : {df_unicef['indicator_fao'].nunique()}")
print(f" Countries : {df_unicef['country'].nunique()}")
# ── Staging Integration ───────────────────────────────────────────────────
print("\n[4/4] Staging Integration → STAGING (Silver)...")
staging = StagingDataIntegration(client)
df_staging = staging.run()
print("\n" + "=" * 60)
print("✓ ETL COMPLETED")
print(f"RAW (Bronze) : raw_fao, raw_worldbank, raw_unicef")
print(f"STAGING (Silver) : staging_integrated")
print(f"AUDIT : etl_logs, etl_metadata")
print("=" * 60)