def task_load_worldbank():
    """Airflow callable: run the World Bank source and print the row count.

    The FAO source is executed first because the World Bank source is
    constructed with the list of distinct values of the FAO ``indicator``
    column.
    """
    # Imported lazily so the modules are resolved when the task runs
    # (the scripts directory is put on sys.path at the top of this file).
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource, WorldBankDataSource

    bq = get_bigquery_client()

    # NOTE(review): this executes the complete FAO source again just to obtain
    # the indicator names — confirm whether that repetition is intended.
    indicator_names = FAODataSource(bq).run()['indicator'].unique().tolist()

    worldbank_df = WorldBankDataSource(bq, indicator_names).run()
    print(f"World Bank loaded: {len(worldbank_df):,} rows")
# Manually triggered DAG (schedule_interval=None, catchup disabled) that runs
# the food-security ETL: connection check -> raw loads -> staging -> cleaned.
with DAG(
    dag_id="etl_food_security_bigquery",
    start_date=datetime(2026, 3, 1),
    schedule_interval=None,
    catchup=False,
    default_args=default_args,
    tags=["food-security", "bigquery", "kimball"],
) as dag:

    verify = PythonOperator(
        task_id="verify_bigquery_connection",
        python_callable=task_verify_connection,
    )

    load_fao = PythonOperator(
        task_id="load_fao_to_bronze",
        python_callable=task_load_fao,
    )

    load_wb = PythonOperator(
        task_id="load_worldbank_to_bronze",
        python_callable=task_load_worldbank,
    )

    load_unicef = PythonOperator(
        task_id="load_unicef_to_bronze",
        python_callable=task_load_unicef,
    )

    staging = PythonOperator(
        task_id="staging_integration_to_silver",
        python_callable=task_staging_integration,
    )

    cleaned = PythonOperator(
        task_id="cleaned_layer_to_silver",
        python_callable=task_cleaned_layer,
    )

    # Explicit ordering. Without it Airflow treats the six tasks as
    # independent and may start them all at once, so staging/cleaned could run
    # before the raw loads have finished. The World Bank and UNICEF callables
    # each run the FAO source themselves, so they are chained rather than run
    # side by side to avoid overlapping reloads of the same raw table.
    verify >> load_fao >> load_wb >> load_unicef >> staging >> cleaned
(fs_asean_bronze) : Raw layer — data as-is dari sumber +- Silver (fs_asean_silver) : Staging layer — staging_integrated, cleaned_integrated +- Audit (fs_asean_audit) : Audit layer — etl_logs, etl_metadata +- Gold (fs_asean_gold) : DW layer — Dim & Fact tables (Kimball Star Schema) + +Kimball ETL Flow: + Source Data + ↓ + RAW (Bronze) → raw_fao, raw_worldbank, raw_unicef + ↓ + STAGING (Silver) → staging_integrated, cleaned_integrated + ↓ + DATA WAREHOUSE (Gold) → dim_*, fact_food_security, fact_food_security_eligible + + AUDIT (fs_asean_audit) → etl_logs, etl_metadata [semua layer log ke sini] +""" + +import os +from pathlib import Path +from google.cloud import bigquery +from google.oauth2 import service_account + +# BIGQUERY CONFIGURATION +CREDENTIALS_PATH = os.environ.get( + "GOOGLE_APPLICATION_CREDENTIALS", + "/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json" +) +PROJECT_ID = "food-security-asean-project" +LOCATION = "asia-southeast2" + +# DATASET IDs +# Bronze = Raw Layer | Silver = Staging Layer | Gold = DW Layer (Kimball) + +DATASET_BRONZE = "fs_asean_bronze" # Raw layer — data mentah dari sumber +DATASET_SILVER = "fs_asean_silver" # Staging layer — staging_integrated, cleaned_integrated +DATASET_AUDIT = "fs_asean_audit" # Audit layer — etl_logs, etl_metadata +DATASET_GOLD = "fs_asean_gold" # DW layer — Dim & Fact (Star Schema) + +# Mapping layer name → dataset id +LAYER_DATASET_MAP = { + "bronze" : DATASET_BRONZE, # Raw + "silver" : DATASET_SILVER, # Staging, Cleaned + "audit" : DATASET_AUDIT, # Audit/Logs + "gold" : DATASET_GOLD, # DW +} + +# Alias Kimball terminology → layer (untuk readability di file lain) +KIMBALL_LAYER_MAP = { + "raw" : "bronze", + "staging" : "silver", + "logs" : "audit", + "dw" : "gold", +} + +# SETUP BIGQUERY CLIENT + +def get_bigquery_client() -> bigquery.Client: + """ + Create BigQuery client dengan service account credentials + + Returns: + bigquery.Client: Authenticated BigQuery client + """ + credentials = 
def get_table_id(table_name: str, layer: str = "bronze") -> str:
    """Return the fully-qualified BigQuery table id ``project.dataset.table``.

    Args:
        table_name: Bare table name.
        layer: Layer name (a key of LAYER_DATASET_MAP) or Kimball alias
            (a key of KIMBALL_LAYER_MAP). Matching is case-insensitive.

    Returns:
        str: ``"{PROJECT_ID}.{dataset}.{table_name}"``.

    An unrecognised layer still resolves to the bronze dataset, as before,
    but now emits a RuntimeWarning so that a typo cannot silently redirect a
    read or a truncating load to the raw dataset.
    """
    import warnings

    key = layer.lower()
    # Translate a Kimball alias (e.g. 'raw', 'dw') to its layer name first.
    resolved = KIMBALL_LAYER_MAP.get(key, key)
    dataset = LAYER_DATASET_MAP.get(resolved)
    if dataset is None:
        warnings.warn(
            f"Unknown layer {layer!r}; falling back to dataset {DATASET_BRONZE!r}",
            RuntimeWarning,
            stacklevel=2,
        )
        dataset = DATASET_BRONZE
    return f"{PROJECT_ID}.{dataset}.{table_name}"
def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
    """Create the dataset ``PROJECT_ID.dataset_id`` unless it already exists.

    Args:
        client: BigQuery client.
        dataset_id: Dataset id without the project prefix.

    Raises:
        Any error other than "not found" raised by the lookup (for example a
        permission or network failure) now propagates instead of being
        mistaken for a missing dataset.
    """
    # Local import: only this helper needs the exception type.
    from google.cloud.exceptions import NotFound

    full_id = f"{PROJECT_ID}.{dataset_id}"
    try:
        client.get_dataset(full_id)
    except NotFound:
        ds = bigquery.Dataset(full_id)
        ds.location = LOCATION
        # exists_ok: tolerate another process creating it in the meantime.
        client.create_dataset(ds, exists_ok=True, timeout=30)
        print(f" ✓ Created : {dataset_id}")
    else:
        print(f" ✓ Exists : {dataset_id}")
Koneksi + try: + client = get_bigquery_client() + print(f"✓ Connected to BigQuery") + print(f" Project : {PROJECT_ID}") + print(f" Location : {LOCATION}") + except Exception as e: + print(f"Connection failed: {e}") + return False + + # 3. Datasets + try: + print() + create_all_datasets(client) + except Exception as e: + print(f"Dataset setup failed: {e}") + return False + + print("\n" + "=" * 60) + print("✓ SETUP SUCCESSFUL") + print(f" Raw (Bronze) : {DATASET_BRONZE}") + print(f" Staging (Silver) : {DATASET_SILVER}") + print(f" DW (Gold) : {DATASET_GOLD}") + print(f" Audit : {DATASET_AUDIT}") + print("=" * 60) + return True + +# INITIALIZE ON IMPORT + +if __name__ == "__main__": + verify_setup() +else: + print("BigQuery Config Loaded — Kimball DW Architecture") + print(f" Project : {PROJECT_ID}") + print(f" Raw (Bronze) : {DATASET_BRONZE}") + print(f" Staging (Silver) : {DATASET_SILVER}") + print(f" DW (Gold) : {DATASET_GOLD}") + print(f" Audit : {DATASET_AUDIT}") + print(f" Location : {LOCATION}") \ No newline at end of file diff --git a/scripts/bigquery_datasource.py b/scripts/bigquery_datasource.py new file mode 100644 index 0000000..859e521 --- /dev/null +++ b/scripts/bigquery_datasource.py @@ -0,0 +1,271 @@ +""" +BIGQUERY DATA SOURCE BASE CLASS +Kimball Data Warehouse Architecture + +Layer Assignment: + RAW (Bronze) → Tempat load data mentah dari sumber eksternal + STAGING (Silver) → etl_logs, etl_metadata (via helpers) + DW (Gold) → dim_*, fact_* (di file terpisah) + +Subclass yang menggunakan DataSource: + FAODataSource → load ke RAW (Bronze) : raw_fao + WorldBankDataSource → load ke RAW (Bronze) : raw_worldbank + UNICEFDataSource → load ke RAW (Bronze) : raw_unicef + +Changes from MySQL version: + 1. Replace SQLAlchemy engine → BigQuery client + 2. Replace to_sql() → load_table_from_dataframe() + 3. load_to_database() default layer = 'bronze' (RAW layer) + 4. log_update() menggunakan label 'RAW' sesuai Kimball terminology + 5. 
class DataSource(ABC):
    """
    Abstract base class for every external data source (template-method ETL).

    Pipeline executed by run():
        fetch_data()        extract raw data from the external source
        transform_data()    bring the data into the standard format
        validate_data()     compute data-quality metrics
        load_to_database()  full-refresh load into the RAW layer (bronze)
        save_metadata()     persist run metadata via save_etl_metadata()

    Subclasses must implement fetch_data() and transform_data(), and must set
    self.table_name before run() reaches the load step.
    """

    def __init__(self, client: "bigquery.Client" = None):
        """
        Initialise the data source.

        Args:
            client: BigQuery client; a new one is created when omitted.
        """
        self.client = client if client else get_bigquery_client()
        self.logger = logging.getLogger(self.__class__.__name__)
        # Records from this logger are not forwarded to ancestor loggers.
        self.logger.propagate = False

        self.data = None
        self.table_name = None
        self.target_layer = "bronze"  # RAW layer is the default target
        self.asean_countries = CONFIG['asean_countries']

        # Run metadata, filled in by run() and persisted by save_metadata().
        self.metadata = {
            'source_class': self.__class__.__name__,
            'table_name': None,
            'execution_timestamp': None,
            'duration_seconds': None,
            'rows_fetched': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'completeness_pct': 0,
            'config_snapshot': json.dumps({
                'threshold': float(CONFIG['matching']['threshold']),
                'weights': {k: float(v) for k, v in CONFIG['matching']['weights'].items()}
            }),
            'validation_metrics': '{}'
        }

    @abstractmethod
    def fetch_data(self) -> pd.DataFrame:
        """Extract the raw data from the external source (subclass hook)."""
        pass

    @abstractmethod
    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Transform raw data into the standard format (subclass hook)."""
        pass

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """
        Compute data-quality metrics for a transformed frame.

        Metrics:
            total_rows, total_columns    frame dimensions
            null_count, null_percentage  per-column missing values
            duplicate_count              fully duplicated rows
            completeness_pct             share of non-null cells overall
            memory_usage_mb              deep memory footprint
            year_range                   min/max/unique count of the first
                                         column whose name contains 'year' or
                                         'tahun' (only when one exists)

        An empty frame yields 0.0 for every percentage instead of NaN or a
        ZeroDivisionError, so the result is always JSON-serialisable.

        Returns:
            Dict: Validation metrics.
        """
        row_count = len(df)
        cell_count = int(df.size)
        nulls_per_column = df.isnull().sum()

        if row_count:
            pct_series = (nulls_per_column / row_count * 100).round(2)
            null_percentage = {k: float(v) for k, v in pct_series.to_dict().items()}
        else:
            # No rows: nothing is missing, and dividing by zero is avoided.
            null_percentage = {k: 0.0 for k in nulls_per_column.index}

        if cell_count:
            completeness = float(
                round((1 - nulls_per_column.sum() / cell_count) * 100, 2)
            )
        else:
            completeness = 0.0

        validation = {
            'total_rows': int(row_count),
            'total_columns': int(len(df.columns)),
            'null_count': {k: int(v) for k, v in nulls_per_column.to_dict().items()},
            'null_percentage': null_percentage,
            'duplicate_count': int(df.duplicated().sum()),
            'completeness_pct': completeness,
            'memory_usage_mb': float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
        }

        # Column labels are not guaranteed to be strings, hence str().
        year_cols = [
            col for col in df.columns
            if 'year' in str(col).lower() or 'tahun' in str(col).lower()
        ]
        if year_cols:
            year_col = year_cols[0]
            # Coerce so a non-numeric "year-like" column (e.g. a textual year
            # range) reports None instead of raising inside int().
            years = pd.to_numeric(df[year_col], errors='coerce')
            has_years = not years.isnull().all()
            validation['year_range'] = {
                'min': int(years.min()) if has_years else None,
                'max': int(years.max()) if has_years else None,
                'unique_years': int(df[year_col].nunique())
            }

        return validation

    def load_to_database(self, df: pd.DataFrame, table_name: str):
        """
        Load a frame into the RAW layer (bronze) with a full refresh.

        The table is replaced (WRITE_TRUNCATE) on every call. Success and
        failure are both recorded through log_update(); on failure the
        original exception is re-raised after the failure has been logged.

        Args:
            df: Transformed frame to load.
            table_name: Destination table name in the bronze dataset.
        """
        try:
            # NOTE(review): self.target_layer is not consulted here; the
            # layer is fixed to bronze.
            load_to_bigquery(
                self.client,
                df,
                table_name,
                layer='bronze',
                write_disposition="WRITE_TRUNCATE"
            )

            log_update(
                self.client,
                layer='RAW',
                table_name=table_name,
                update_method='full_refresh',
                rows_affected=len(df)
            )

        except Exception as e:
            log_update(
                self.client,
                layer='RAW',
                table_name=table_name,
                update_method='full_refresh',
                rows_affected=0,
                status='failed',
                error_msg=str(e)
            )
            raise

    def save_metadata(self):
        """
        Persist the run metadata through save_etl_metadata().

        Best effort by design: any failure is logged as a warning and
        swallowed so that metadata tracking can never abort the ETL run.
        """
        try:
            self.metadata['table_name'] = self.table_name

            # validation_metrics is stored as a JSON string.
            if isinstance(self.metadata.get('validation_metrics'), dict):
                self.metadata['validation_metrics'] = json.dumps(
                    self.metadata['validation_metrics']
                )

            save_etl_metadata(self.client, self.metadata)

        except Exception as e:
            self.logger.warning(f"Failed to save ETL metadata to STAGING: {str(e)}")

    def run(self) -> pd.DataFrame:
        """
        Run the full pipeline: extract, transform, validate, load, metadata.

        Returns:
            pd.DataFrame: The transformed data that was loaded.

        Raises:
            ValueError: If self.table_name is still unset at the load step.
            Exception: Anything raised by the individual steps propagates.
        """
        start_time = datetime.now()
        self.metadata['execution_timestamp'] = start_time

        # 1. EXTRACT
        raw_data = self.fetch_data()
        self.metadata['rows_fetched'] = len(raw_data) if hasattr(raw_data, '__len__') else 0

        # 2. TRANSFORM
        self.data = self.transform_data(raw_data)
        self.metadata['rows_transformed'] = len(self.data)

        # 3. VALIDATE
        validation = self.validate_data(self.data)
        self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
        self.metadata['validation_metrics'] = json.dumps({
            'total_rows': validation['total_rows'],
            'completeness_pct': validation['completeness_pct'],
            'duplicate_count': validation['duplicate_count']
        })

        # 4. LOAD. Fail early rather than load into a table named "None".
        if not self.table_name:
            raise ValueError(
                f"{self.__class__.__name__}.table_name must be set before run()"
            )
        self.load_to_database(self.data, self.table_name)
        self.metadata['rows_loaded'] = len(self.data)

        # 5. METADATA
        end_time = datetime.now()
        self.metadata['duration_seconds'] = (end_time - start_time).total_seconds()
        self.save_metadata()

        return self.data
def ensure_etl_logs_table(client: bigquery.Client):
    """
    Create the ``etl_logs`` table in the audit layer if it does not exist.

    Schema:
        id             STRING    unique log id
        timestamp      DATETIME  time the log row was written
        layer          STRING    layer that was processed
        table_name     STRING    table that was processed
        update_method  STRING    e.g. full_refresh / incremental
        rows_affected  INTEGER   number of rows
        status         STRING    success / failed
        error_message  STRING    error text when the step failed

    Args:
        client: BigQuery client.
    """
    if table_exists(client, 'etl_logs', layer='audit'):
        return

    table_id = get_table_id('etl_logs', layer='audit')
    schema = [
        bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("timestamp", "DATETIME", mode="REQUIRED"),
        bigquery.SchemaField("layer", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("table_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("update_method", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("rows_affected", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("error_message", "STRING", mode="NULLABLE"),
    ]
    table = bigquery.Table(table_id, schema=schema)
    # exists_ok: another process may create the table between the existence
    # check above and this call; that must not surface as an error.
    client.create_table(table, exists_ok=True)
    print(" [AUDIT] Created table: etl_logs")
def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
                     table_name: str, layer: str = "bronze",
                     write_disposition: str = "WRITE_TRUNCATE",
                     schema: list = None) -> int:
    """
    Load a DataFrame into a BigQuery table of the given layer.

    Args:
        client: BigQuery client.
        df: DataFrame to load.
        table_name: Destination table name.
        layer: Layer name or Kimball alias, resolved by get_table_id().
        write_disposition: WRITE_TRUNCATE (replace) or WRITE_APPEND (append).
        schema: Optional list of SchemaField; autodetect is enabled only when
            no schema is supplied.

    Returns:
        int: Number of rows written by this load job. Previously the table's
        total row count was returned, which differs from the rows loaded when
        appending; for WRITE_TRUNCATE the two values are the same.
    """
    table_id = get_table_id(table_name, layer)
    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
        autodetect=schema is None,
        schema=schema
    )

    job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
    job.result()  # block until the load job has finished

    # output_rows comes from the finished job's statistics, so no extra
    # get_table() round trip is needed.
    rows_loaded = job.output_rows
    if rows_loaded is None:
        rows_loaded = len(df)

    print(f" ✓ Loaded {rows_loaded:,} rows → [{layer.upper()}] {table_name}")
    return rows_loaded
def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
    """
    Remove every row from a table while keeping the table itself.

    Args:
        client: BigQuery client.
        table_name: Table name.
        layer: Layer name or Kimball alias, resolved by get_table_id().
    """
    target = get_table_id(table_name, layer)
    # A DELETE whose predicate is always true empties the table;
    # .result() waits for the statement to finish.
    client.query(f"DELETE FROM `{target}` WHERE TRUE").result()
    print(f" Truncated [{layer.upper()}] table: {table_name}")
def get_staging_schema() -> list:
    """
    Return the schema of the ``staging_integrated`` table.

    Returns:
        list: SchemaField objects, in column order.
    """
    # (column name, BigQuery type, mode), in table column order.
    column_specs = (
        ("source", "STRING", "REQUIRED"),
        ("indicator_original", "STRING", "REQUIRED"),
        ("indicator_standardized", "STRING", "REQUIRED"),
        ("country", "STRING", "REQUIRED"),
        ("year", "INTEGER", "NULLABLE"),
        ("year_range", "STRING", "NULLABLE"),
        ("value", "FLOAT", "NULLABLE"),
        ("unit", "STRING", "NULLABLE"),
    )
    return [
        bigquery.SchemaField(column, field_type, mode=mode)
        for column, field_type, mode in column_specs
    ]
# ETL METADATA — AUDIT LAYER
# created_at is preserved from the first execution of each table_name.

def save_etl_metadata(client: bigquery.Client, metadata: dict):
    """
    Append one ETL run record to the etl_metadata table.

    The table is resolved with layer='audit' and is created on first use.

    created_at vs updated_at:
        created_at : taken from the FIRST record stored for the same
                     table_name (preserved across runs, for reproducibility)
        updated_at : always the time of the current execution

    Args:
        client   : BigQuery client
        metadata : Dict describing the ETL execution:
                     table_name          (required)
                     source_class        (required)
                     execution_timestamp
                     duration_seconds
                     rows_fetched
                     rows_transformed
                     rows_loaded
                     completeness_pct
                     config_snapshot     (JSON string, or a JSON-serialisable object)
                     validation_metrics  (JSON string, or a JSON-serialisable object)

    Numeric entries that are missing or None are stored as 0. A missing or
    None execution_timestamp falls back to the current time.
    """
    import hashlib
    import json

    def _numeric(key, cast):
        # A key that is present but None must not reach int()/float().
        raw = metadata.get(key)
        return cast(0 if raw is None else raw)

    def _json_text(key):
        # The columns are STRING: serialise dict/list payloads, pass strings through.
        raw = metadata.get(key, '{}')
        if raw is None or isinstance(raw, str):
            return raw
        # default=str keeps timestamps and similar values serialisable.
        return json.dumps(raw, default=str)

    table_name = metadata.get('table_name', 'unknown')
    table_id = get_table_id('etl_metadata', layer='audit')

    # Create the table if it does not exist yet.
    if not table_exists(client, 'etl_metadata', layer='audit'):
        table = bigquery.Table(table_id, schema=get_etl_metadata_schema())
        client.create_table(table)
        print(f" [AUDIT] Created table: etl_metadata")

    # Look up the earliest created_at stored for this table_name.
    check_query = f"""
        SELECT MIN(created_at) AS first_created_at
        FROM `{table_id}`
        WHERE table_name = @table_name
    """
    job_config_q = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name)
        ]
    )

    current_time = datetime.now()

    try:
        rows = list(client.query(check_query, job_config=job_config_q).result())
    except Exception as exc:
        # Best effort: a failed lookup must not block the metadata write,
        # but it should be visible because created_at is then reset.
        print(f" [AUDIT] Could not read first created_at "
              f"({type(exc).__name__}: {exc}); treating as first run")
        rows = []

    first_created_at = rows[0]['first_created_at'] if rows else None
    is_first_run = first_created_at is None
    created_at = current_time if is_first_run else first_created_at

    # Unique record id derived from source class, table name and run time.
    record_id = hashlib.md5(
        f"{metadata.get('source_class')}_{table_name}_{current_time.isoformat()}".encode()
    ).hexdigest()

    meta_df = pd.DataFrame([{
        'id': record_id,
        'source_class': metadata.get('source_class', 'unknown'),
        'table_name': table_name,
        'execution_timestamp': metadata.get('execution_timestamp') or current_time,
        'duration_seconds': _numeric('duration_seconds', float),
        'rows_fetched': _numeric('rows_fetched', int),
        'rows_transformed': _numeric('rows_transformed', int),
        'rows_loaded': _numeric('rows_loaded', int),
        'completeness_pct': _numeric('completeness_pct', float),
        'config_snapshot': _json_text('config_snapshot'),
        'validation_metrics': _json_text('validation_metrics'),
        'created_at': created_at,    # preserved from the first run
        'updated_at': current_time,  # always the current time
    }])

    # Drop timezone information before loading.
    for col in ['execution_timestamp', 'created_at', 'updated_at']:
        meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)

    # Append the record to the etl_metadata table.
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    job = client.load_table_from_dataframe(meta_df, table_id, job_config=job_config)
    job.result()

    if is_first_run:
        print(f"etl_metadata — first run | created_at : {created_at}")
    else:
        print(f"etl_metadata — preserved | created_at : {created_at}")
    print(f"etl_metadata — updated_at : {current_time}")
# INITIALIZE

# Module-level handles created at import time.
logger = setup_logging()
client = get_bigquery_client()

# Startup banner: project id and the dataset configured for each layer.
print("\n".join([
    "BigQuery Helpers Loaded — Kimball DW Architecture",
    f"Project : {CONFIG['bigquery']['project_id']}",
    f"Raw (Bronze) : {CONFIG['bigquery']['dataset_bronze']}",
    f"Staging (Silver) : {CONFIG['bigquery']['dataset_silver']}",
    f"DW (Gold) : {CONFIG['bigquery']['dataset_gold']}",
]))
# INDICATOR MATCHER

class IndicatorMatcher:
    """
    Fuzzy matcher for indicator names coming from different data sources.

    A similarity score combines three parts, weighted by
    CONFIG['matching']['weights']: shared core keywords, character-level
    similarity (difflib.SequenceMatcher) and word overlap. The score is then
    multiplied by the penalties in CONFIG['matching']['penalties'] when the
    two texts disagree on qualifiers (at least / exclusive, severity, target
    group, service level).

    Keyword and qualifier detection is substring based on the cleaned text.
    """

    CORE_KEYWORDS = {
        'fat': ['fat', 'lipid'],
        'protein': ['protein'],
        'calorie': ['calorie', 'caloric', 'kcal', 'energy intake'],
        'energy': ['energy', 'dietary energy consumption'],
        'stunting': ['stunting', 'stunted', 'height for age'],
        'wasting': ['wasting', 'wasted', 'weight for height'],
        'underweight': ['underweight', 'weight for age'],
        'overweight': ['overweight', 'overfed'],
        'obesity': ['obesity', 'obese'],
        'anemia': ['anemia', 'anaemia', 'hemoglobin', 'haemoglobin'],
        'malnutrition': ['malnutrition', 'undernourishment', 'malnourished', 'undernourished'],
        'breastfeeding': ['breastfeeding', 'breast feeding'],
        'birthweight': ['birthweight', 'birth weight', 'low birth weight'],
        'immunization': ['immunization', 'immunisation', 'vaccination', 'vaccine'],
        'gdp': ['gdp', 'gross domestic product'],
        'poverty': ['poverty', 'poor', 'poverty line'],
        'inequality': ['inequality', 'gini'],
        'water': ['water', 'drinking water', 'clean water', 'safe water'],
        'sanitation': ['sanitation', 'toilet', 'improved sanitation'],
        'electricity': ['electricity', 'electric', 'power'],
        'healthcare': ['healthcare', 'health facility', 'hospital'],
        'governance': ['governance', 'government effectiveness'],
        'corruption': ['corruption', 'transparency'],
        'stability': ['political stability', 'stability', 'conflict'],
    }

    QUALIFIERS = {
        'at_least': ['at least', 'minimum', 'or more', 'or better'],
        'basic': ['basic'],
        'improved': ['improved'],
        'safely_managed': ['safely managed', 'safe'],
        'exclusive': ['exclusive', 'exclusively'],
        'severe': ['severe', 'severely'],
        'moderate': ['moderate'],
        'mild': ['mild'],
        'children': ['children', 'child', 'under 5', 'under five', 'u5'],
        'women': ['women', 'female', 'reproductive age'],
        'adults': ['adults', 'adult'],
        'population': ['population', 'people', 'persons'],
        'household': ['household', 'households'],
    }

    # Qualifier groups compared as sets, paired with the penalty key applied
    # when the two texts carry different members of the group.
    _QUALIFIER_GROUPS = (
        (frozenset({'severe', 'moderate', 'mild'}), 'severity_mismatch'),
        (frozenset({'children', 'women', 'adults'}), 'target_mismatch'),
        (frozenset({'basic', 'improved', 'safely_managed'}), 'service_level_mismatch'),
    )

    def __init__(self):
        matching_cfg = CONFIG['matching']
        self.threshold = matching_cfg['threshold']
        self.weights = matching_cfg['weights']
        self.penalties = matching_cfg['penalties']
        self.logger = logging.getLogger(self.__class__.__name__)

    @staticmethod
    @lru_cache(maxsize=1024)
    def clean_text(text: str) -> str:
        """Lower-case *text*, replace punctuation (except parentheses) with
        spaces and collapse whitespace. Missing values become ""."""
        if pd.isna(text):
            return ""
        text = str(text).lower()
        text = re.sub(r'[^\w\s\(\)]', ' ', text)
        return ' '.join(text.split())

    @classmethod
    @lru_cache(maxsize=512)
    def extract_keywords(cls, text: str) -> tuple:
        """Return the CORE_KEYWORDS keys with at least one variant contained
        in the cleaned text, in CORE_KEYWORDS order."""
        text_clean = cls.clean_text(text)
        return tuple(key for key, variants in cls.CORE_KEYWORDS.items()
                     if any(v in text_clean for v in variants))

    @classmethod
    @lru_cache(maxsize=512)
    def detect_qualifiers(cls, text: str) -> frozenset:
        """Return the QUALIFIERS keys with at least one variant contained in
        the cleaned text."""
        text_clean = cls.clean_text(text)
        return frozenset(q for q, variants in cls.QUALIFIERS.items()
                         if any(v in text_clean for v in variants))

    @staticmethod
    @lru_cache(maxsize=2048)
    def _string_scores(clean1: str, clean2: str) -> tuple:
        """Return (SequenceMatcher ratio, word-overlap ratio) of two cleaned texts.

        This part of the score does not depend on instance configuration, so
        it is cached here on a static function instead of on the instance
        method (a cache on a method would key on ``self`` and keep every
        instance alive).
        """
        ratio = SequenceMatcher(None, clean1, clean2).ratio()
        words1, words2 = set(clean1.split()), set(clean2.split())
        overlap = len(words1 & words2) / max(len(words1), len(words2), 1)
        return ratio, overlap

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """
        Score how likely *text1* and *text2* name the same indicator.

        Returns 1.0 for identical inputs and 0.0 when both texts have core
        keywords but share none. Otherwise returns the weighted score after
        qualifier penalties. The current ``self.weights`` and
        ``self.penalties`` are always used.
        """
        if text1 == text2:
            return 1.0

        keywords1 = set(self.extract_keywords(text1))
        keywords2 = set(self.extract_keywords(text2))
        shared = keywords1 & keywords2
        if keywords1 and keywords2 and not shared:
            return 0.0

        core_score = len(shared) / max(len(keywords1), len(keywords2), 1)
        base_score, overlap = self._string_scores(
            self.clean_text(text1), self.clean_text(text2)
        )

        w = self.weights
        final_score = (core_score * w['keyword'] +
                       base_score * w['string_similarity'] +
                       overlap * w['word_overlap'])

        quals1 = self.detect_qualifiers(text1)
        quals2 = self.detect_qualifiers(text2)
        p = self.penalties

        # Flags that must be present in both texts or in neither.
        for flag in ('at_least', 'exclusive'):
            if (flag in quals1) != (flag in quals2):
                final_score *= p['qualifier_mismatch']

        # Groups whose detected members must be the same in both texts.
        for group, penalty_key in self._QUALIFIER_GROUPS:
            if (group & quals1) != (group & quals2):
                final_score *= p[penalty_key]

        return final_score

    def match_indicators(self, source_indicators, target_indicators,
                         threshold=None, id_col='id', name_col='value', deduplicate=True):
        """
        Find the best target for each source indicator.

        Args:
            source_indicators : iterable of indicator names; missing values
                                (None/NaN) are skipped
            target_indicators : DataFrame (read via id_col/name_col) or an
                                iterable of names
            threshold         : score a match must exceed; defaults to
                                self.threshold
            id_col, name_col  : column names used when targets is a DataFrame
            deduplicate       : keep only the best source per target

        Returns:
            list of dicts with source_indicator, target_indicator,
            target_code and similarity_score (percentage, one decimal).
        """
        if threshold is None:
            threshold = self.threshold

        # Missing values cannot be ordered against strings by sorted().
        sources = sorted(s for s in source_indicators if not pd.isna(s))

        all_matches = []
        for source in sources:
            best = self._find_best_match(source, target_indicators, threshold, id_col, name_col)
            if best:
                all_matches.append({
                    'source_indicator': source,
                    'target_indicator': best['name'],
                    'target_code': best['code'],
                    'similarity_score': round(best['similarity'] * 100, 1),
                })
        if deduplicate and all_matches:
            all_matches = self._deduplicate_matches(all_matches)
        return all_matches

    def _find_best_match(self, source, targets, threshold, id_col, name_col):
        """Return the target scoring strictly above *threshold* with the
        highest score (first one wins ties), or None."""
        if isinstance(targets, pd.DataFrame):
            candidates = zip(targets[id_col], targets[name_col])
        else:
            candidates = ((None, target) for target in targets)

        best = None
        best_score = threshold
        for code, name in candidates:
            score = self.calculate_similarity(source, name)
            if score > best_score:
                best_score = score
                best = {'code': code, 'name': name}
        return None if best is None else {**best, 'similarity': best_score}

    def _deduplicate_matches(self, matches):
        """Keep the highest-scoring match per target (by code when any code
        is present, otherwise by target name)."""
        df = pd.DataFrame(matches).sort_values('similarity_score', ascending=False)
        dup_col = 'target_code' if df['target_code'].notna().any() else 'target_indicator'
        return df.drop_duplicates(subset=dup_col, keep='first').to_dict('records')
# FAO DATA SOURCE → RAW LAYER (Bronze)

class FAODataSource(DataSource):
    """
    FAO Food Security data source (BigQuery version).

    Uses the FAOSTAT bulk download because the faostat API requires
    authentication.
    """

    def __init__(self, client: bigquery.Client = None):
        super().__init__(client)
        self.table_name = 'raw_fao'
        self.domain_code = 'FS'
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False
        self.download_url = (
            "https://bulks-faostat.fao.org/production/"
            "Food_Security_Data_E_All_Data_(Normalized).zip"
        )

    def fetch_data(self) -> pd.DataFrame:
        """
        Download the bulk archive and return the rows for ASEAN countries.

        The 'Area' filter uses self.asean_countries and is applied only when
        the column is present.

        Raises:
            requests.HTTPError: when the download returns an error status.
            ValueError: when the archive contains no CSV file.
        """
        import io
        import zipfile

        import requests

        print(" Downloading FAO Food Security dataset...")
        response = requests.get(self.download_url, timeout=120)
        response.raise_for_status()

        with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
            csv_names = [n for n in archive.namelist() if n.lower().endswith('.csv')]
            if not csv_names:
                raise ValueError(
                    f"No CSV file found in FAO archive: {self.download_url}"
                )
            # Prefer the data file when the archive also ships other CSVs
            # (presumably code lists); otherwise keep the first CSV as before.
            csv_name = next((n for n in csv_names if 'All_Data' in n), csv_names[0])
            with archive.open(csv_name) as member:
                df = pd.read_csv(member, encoding='latin-1')

        if 'Area' in df.columns:
            df = df[df['Area'].isin(self.asean_countries)].copy()

        print(f" Raw rows after ASEAN filter: {len(df):,}")
        return df

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Reduce the FAO frame to country/year/indicator/value/unit.

        Keeps only rows whose 'Element' equals 'Value' (when that column
        exists), renames the FAO columns, drops every other column and sorts
        by indicator, country and year when all three are present.
        """
        if 'Element' in df.columns:
            df = df[df['Element'] == 'Value'].copy()

        column_mapping = {
            'Area': 'country',
            'Year': 'year',
            'Item': 'indicator',
            'Value': 'value',
            'Unit': 'unit',
        }
        df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns})

        keep_cols = [c for c in ['country', 'year', 'indicator', 'value', 'unit'] if c in df.columns]
        df = df[keep_cols].copy()

        sort_cols = ['indicator', 'country', 'year']
        if all(col in df.columns for col in sort_cols):
            df = df.sort_values(sort_cols).reset_index(drop=True)

        return df
# WORLD BANK DATA SOURCE → RAW LAYER (Bronze)

import wbgapi as wb

class WorldBankDataSource(DataSource):
    """
    World Bank data source: downloads the World Bank series whose names
    match the given FAO indicators, for the economies listed in
    CONFIG['asean_iso_codes'].
    """

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_worldbank'
        self.fao_indicators = fao_indicators
        self.asean_iso = CONFIG['asean_iso_codes']
        self.matching_results = []
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False

    def fetch_data(self) -> Dict:
        """
        Match FAO indicators against the World Bank series list and download
        every matched series.

        Returns:
            Dict mapping FAO indicator name -> wide DataFrame returned by
            wbgapi. Series whose download fails are logged and left out.
        """
        wb_indicators = pd.DataFrame(list(wb.series.list()))
        matches = self.matcher.match_indicators(
            self.fao_indicators, wb_indicators,
            threshold=CONFIG['matching']['threshold'],
            id_col='id', name_col='value', deduplicate=True
        )
        self.matching_results = [{
            'indikator_fao': m['source_indicator'],
            'indikator_wb': m['target_indicator'],
            'kode_wb': m['target_code'],
            'similarity_persen': m['similarity_score'],
        } for m in matches]

        wb_data_dict = {}
        for item in self.matching_results:
            try:
                data = wb.data.DataFrame(item['kode_wb'], self.asean_iso, numericTimeKeys=True)
            except Exception as exc:
                # Best effort per series: keep going, but leave a trace of
                # what was skipped instead of failing silently.
                self.logger.warning(
                    "World Bank series %s skipped: %s", item['kode_wb'], exc
                )
                continue
            wb_data_dict[item['indikator_fao']] = data
        return wb_data_dict

    def transform_data(self, wb_data_dict: Dict) -> pd.DataFrame:
        """
        Reshape the downloaded wide frames into one long frame.

        Returns:
            DataFrame with columns indicator_wb_original, indicator_fao,
            wb_code, country, year, value, sorted by indicator_fao, country
            and year; an empty DataFrame when nothing was downloaded.
        """
        # First match per FAO indicator, looked up once instead of per frame.
        match_by_fao = {}
        for item in self.matching_results:
            match_by_fao.setdefault(item['indikator_fao'], item)

        all_data = []
        for fao_indicator, df_wide in wb_data_dict.items():
            info = match_by_fao[fao_indicator]
            temp = df_wide.reset_index()
            temp.insert(0, 'indicator_wb_original', info['indikator_wb'])
            temp.insert(1, 'indicator_fao', fao_indicator)
            temp.insert(2, 'wb_code', info['kode_wb'])
            all_data.append(temp)

        if not all_data:
            return pd.DataFrame()

        df_combined = pd.concat(all_data, ignore_index=True)
        id_vars = ['indicator_wb_original', 'indicator_fao', 'wb_code', 'economy']
        # Every remaining column is a year column.
        value_vars = [c for c in df_combined.columns if c not in id_vars]

        df_long = df_combined.melt(
            id_vars=id_vars, value_vars=value_vars,
            var_name='year', value_name='value'
        ).rename(columns={'economy': 'country'})

        df_long['year'] = df_long['year'].astype(int)
        df_long = df_long[['indicator_wb_original', 'indicator_fao', 'wb_code',
                           'country', 'year', 'value']]
        return df_long.sort_values(['indicator_fao', 'country', 'year']).reset_index(drop=True)
# UNICEF DATA SOURCE → RAW LAYER (Bronze)

import requests
import time

class UNICEFDataSource(DataSource):
    """
    UNICEF data source: downloads the SDMX datasets listed in
    CONFIG['unicef_datasets'], keeps ASEAN areas and the indicators that
    match the given FAO indicators.
    """

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_unicef'
        self.fao_indicators = fao_indicators
        self.base_url = "https://sdmx.data.unicef.org/ws/public/sdmxapi/rest"
        self.datasets = CONFIG['unicef_datasets']
        self.asean_keywords = ['Indonesia', 'Malaysia', 'Thailand', 'Vietnam', 'Viet Nam',
                               'Philippines', 'Singapore', 'Myanmar', 'Cambodia', 'Lao', 'Brunei']
        self.matching_results = []
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False

    @staticmethod
    def _flatten_sdmx(dataset_code, data_json) -> list:
        """Flatten one SDMX-JSON payload into one dict per observation.

        Series dimensions are stored by value name, observation dimensions by
        value id; the first element of each observation is stored as 'value'.
        """
        series_data = data_json['data']['dataSets'][0]['series']
        dimensions = data_json['data']['structure']['dimensions']

        records = []
        for series_key, series_value in series_data.items():
            indices = series_key.split(':')
            series_fields = {'dataset': dataset_code}
            for i, dim in enumerate(dimensions['series']):
                series_fields[dim['id']] = dim['values'][int(indices[i])]['name']

            for obs_key, obs_value in series_value.get('observations', {}).items():
                obs_indices = obs_key.split(':')
                record = series_fields.copy()
                for i, dim in enumerate(dimensions['observation']):
                    record[dim['id']] = dim['values'][int(obs_indices[i])]['id']
                record['value'] = obs_value[0]
                records.append(record)
        return records

    def fetch_data(self) -> pd.DataFrame:
        """
        Download every configured dataset and return the ASEAN rows.

        Datasets that fail to download or parse are logged and skipped.

        Returns:
            Concatenated DataFrame, or an empty DataFrame when no dataset
            produced rows.
        """
        keywords = [k.lower() for k in self.asean_keywords]

        all_data = []
        for dataset_code in self.datasets:
            try:
                url = f"{self.base_url}/data/{dataset_code}/all/?format=sdmx-json"
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                df_temp = pd.DataFrame(self._flatten_sdmx(dataset_code, response.json()))

                if 'REF_AREA' in df_temp.columns:
                    asean_found = [c for c in df_temp['REF_AREA'].unique()
                                   if any(k in c.lower() for k in keywords)]
                    df_temp = df_temp[df_temp['REF_AREA'].isin(asean_found)]
                if len(df_temp) > 0:
                    all_data.append(df_temp)
                # Short pause between successful requests.
                time.sleep(0.5)
            except Exception as exc:
                # Best effort per dataset: keep going, but leave a trace of
                # what was skipped instead of failing silently.
                self.logger.warning("UNICEF dataset %s skipped: %s", dataset_code, exc)

        return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Keep the rows whose indicator matches an FAO indicator.

        The indicator column is the first column whose name contains
        'indicator' (case-insensitive). Matches are recorded in
        self.matching_results.

        Returns:
            Filtered DataFrame with the indicator column renamed to
            indicator_unicef_original, REF_AREA to country, TIME_PERIOD to
            year, plus an indicator_fao column; an empty DataFrame when there
            is no indicator column or no match.
        """
        if df.empty:
            return df
        indicator_col = next((col for col in df.columns if 'indicator' in col.lower()), None)
        if not indicator_col:
            return pd.DataFrame()

        unicef_indicators = df[indicator_col].unique()
        matches = self.matcher.match_indicators(
            self.fao_indicators, list(unicef_indicators),
            threshold=CONFIG['matching']['threshold'], deduplicate=True
        )

        self.matching_results = []
        for match in matches:
            matched_rows = df[df[indicator_col] == match['target_indicator']]
            if len(matched_rows) > 0:
                self.matching_results.append({
                    'indikator_fao': match['source_indicator'],
                    'indikator_unicef': match['target_indicator'],
                    'unicef_dataset': matched_rows['dataset'].iloc[0],
                    'similarity_persen': match['similarity_score'],
                })

        if not self.matching_results:
            return pd.DataFrame()

        unicef_to_fao = {i['indikator_unicef']: i['indikator_fao'] for i in self.matching_results}
        df_filtered = df[df[indicator_col].isin(list(unicef_to_fao))].copy()
        df_filtered = df_filtered.rename(columns={
            indicator_col: 'indicator_unicef_original',
            'REF_AREA': 'country',
            'TIME_PERIOD': 'year',
        })
        df_filtered['indicator_fao'] = df_filtered['indicator_unicef_original'].map(unicef_to_fao)
        return df_filtered
# STAGING DATA INTEGRATION → STAGING LAYER (Silver)

class StagingDataIntegration:
    """
    Staging Data Integration (BigQuery version)

    Input  : RAW layer (Bronze)     — raw_fao, raw_worldbank, raw_unicef
    Output : STAGING layer (Silver) — staging_integrated
    Audit  : etl_logs, etl_metadata (Audit → fs_asean_audit)

    Schema staging_integrated:
        source                  varchar(20)
        indicator_original      varchar(255)
        indicator_standardized  varchar(255)
        country                 varchar(100)
        year                    int
        year_range              varchar(20)
        value                   float
        unit                    varchar(20)
    """

    # Short source key -> raw table read from the bronze layer.
    RAW_TABLES = {
        'fao': 'raw_fao',
        'worldbank': 'raw_worldbank',
        'unicef': 'raw_unicef',
    }

    # Maximum length enforced on each string column of staging_integrated.
    STRING_LIMITS = (
        ('source', 20), ('country', 100), ('indicator_original', 255),
        ('indicator_standardized', 255), ('year_range', 20), ('unit', 20),
    )

    def __init__(self, client: "bigquery.Client"):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False
        self.staging_table = 'staging_integrated'

        self.metadata = {
            'source_class': self.__class__.__name__,
            'table_name': self.staging_table,
            'start_time': None,
            'end_time': None,
            'duration_seconds': None,
            'rows_fetched': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'validation_metrics': {},
        }

    def load_raw_data(self) -> Dict[str, pd.DataFrame]:
        """Load every RAW layer (Bronze) table.

        A table that cannot be read is logged and replaced by an empty
        DataFrame so the remaining sources can still be integrated.
        """
        raw_data = {}
        for key, table in self.RAW_TABLES.items():
            try:
                raw_data[key] = read_from_bigquery(self.client, table, layer='bronze')
            except Exception as exc:
                self.logger.warning("Could not read %s from RAW layer: %s", table, exc)
                raw_data[key] = pd.DataFrame()
        return raw_data

    def clean_value(self, value):
        """Convert *value* to float, ignoring '<' and '>' markers.

        Returns None for missing or non-numeric input.
        """
        if pd.isna(value):
            return None
        value_str = str(value).strip().replace('<', '').replace('>', '').strip()
        try:
            return float(value_str)
        except ValueError:
            return None

    def process_year_range(self, year_value):
        """
        Split a period into (year_int, year_range_str).

        A two-part range maps to the midpoint year. When the second part has
        at most two digits (e.g. a month suffix or an abbreviated end year)
        it cannot be averaged with a four-digit year, so the start year is
        returned.

        Examples:
            "2020"      → (2020, "2020")
            "2020-2021" → (2020, "2020-2021")
            "2019–2021" → (2020, "2019-2021")
            "2020-05"   → (2020, "2020-05")
            "n/a"       → (None, "n/a")
            missing     → (None, None)
        """
        if pd.isna(year_value):
            return None, None

        # Normalise en/em dashes to a plain hyphen.
        year_str = str(year_value).strip().replace('–', '-').replace('—', '-')

        if '-' not in year_str:
            try:
                single_year = int(float(year_str))
            except (ValueError, OverflowError):
                return None, year_str
            return single_year, str(single_year)

        parts = year_str.split('-')
        try:
            if len(parts) != 2:
                return int(float(year_str)), year_str
            start_year = int(parts[0].strip())
            end_token = parts[1].strip()
            end_year = int(end_token)
        except (ValueError, OverflowError):
            return None, year_str

        if len(end_token) <= 2:
            return start_year, year_str
        return (start_year + end_year) // 2, year_str

    def truncate_string(self, value, max_length: int) -> str:
        """Return *value* as a stripped string cut to *max_length* characters
        ('' for missing values)."""
        if pd.isna(value):
            return ''
        return str(value).strip()[:max_length]

    def standardize_dataframe(self, df: pd.DataFrame, source: str,
                              indicator_orig_col: str, indicator_std_col: str,
                              country_col: str, year_col: str, value_col: str,
                              unit_col: str = None) -> pd.DataFrame:
        """
        Map one source frame onto the staging_integrated columns.

        Rows missing the original indicator, country or year are dropped.
        When *unit_col* is None or absent from the frame, unit is ''.

        Returns:
            DataFrame with the staging_integrated columns; an empty
            DataFrame when *df* is empty.
        """
        if df.empty:
            return pd.DataFrame()

        df_clean = df.copy().dropna(subset=[indicator_orig_col, country_col, year_col])
        n_rows = len(df_clean)
        year_data = df_clean[year_col].apply(self.process_year_range)

        if unit_col and unit_col in df_clean.columns:
            unit_values = df_clean[unit_col].fillna('')
        else:
            unit_values = [''] * n_rows

        return pd.DataFrame({
            'source': [self.truncate_string(source, 20)] * n_rows,
            'indicator_original': df_clean[indicator_orig_col].apply(
                lambda x: self.truncate_string(x, 255)),
            'indicator_standardized': df_clean[indicator_std_col].apply(
                lambda x: self.truncate_string(x, 255)),
            'country': df_clean[country_col].apply(lambda x: self.truncate_string(x, 100)),
            'year': [y[0] for y in year_data],
            'year_range': [self.truncate_string(y[1], 20) for y in year_data],
            'value': df_clean[value_col].apply(self.clean_value),
            'unit': [self.truncate_string(u, 20) for u in unit_values],
        })

    def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """
        Standardize every source frame and concatenate the results.

        Args:
            raw_data: frames keyed by 'fao', 'worldbank', 'unicef'; a missing
                      key is treated like an empty frame.

        Returns:
            Integrated DataFrame sorted by source, indicator_standardized,
            country and year; an empty DataFrame when every source is empty.
        """
        integrated_data = []

        # FAO — columns may carry the original FAO names or the renamed ones.
        fao = raw_data.get('fao', pd.DataFrame())
        if not fao.empty:
            def pick(original, renamed):
                return original if original in fao.columns else renamed

            if 'Unit' in fao.columns:
                fao_unit = 'Unit'
            elif 'unit' in fao.columns:
                fao_unit = 'unit'
            else:
                fao_unit = None

            integrated_data.append(self.standardize_dataframe(
                fao.copy(), 'FAO',
                indicator_orig_col=pick('Item', 'indicator'),
                indicator_std_col=pick('Item', 'indicator'),
                country_col=pick('Area', 'country'),
                year_col=pick('Year', 'year'),
                value_col=pick('Value', 'value'),
                unit_col=fao_unit,
            ))

        # World Bank and UNICEF share the layout, apart from the name of the
        # column holding the source's own indicator name.
        for key, label, original_col in (
            ('worldbank', 'World Bank', 'indicator_wb_original'),
            ('unicef', 'UNICEF', 'indicator_unicef_original'),
        ):
            df = raw_data.get(key, pd.DataFrame())
            if df.empty:
                continue
            integrated_data.append(self.standardize_dataframe(
                df.copy(), label,
                indicator_orig_col=original_col,
                indicator_std_col='indicator_fao',
                country_col='country',
                year_col='year',
                value_col='value',
                unit_col='unit' if 'unit' in df.columns else None,
            ))

        if not integrated_data:
            return pd.DataFrame()

        df_integrated = pd.concat(integrated_data, ignore_index=True)

        # Final type conversion
        df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
        df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')

        # Enforce varchar constraints
        for col, max_len in self.STRING_LIMITS:
            df_integrated[col] = df_integrated[col].astype(str).apply(
                lambda x, limit=max_len: self.truncate_string(x, limit)
            )

        return df_integrated.sort_values(
            ['source', 'indicator_standardized', 'country', 'year']
        ).reset_index(drop=True)

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """
        Compute validation metrics for *df*.

        Returns a dict with row/column counts, duplicate count, completeness
        percentage (0.0 for a frame without cells), memory usage, and — when
        the columns exist — year range, per-source counts, unique
        indicators/countries and the longest value of each string column.
        """
        total_cells = df.size
        if total_cells:
            completeness = float(round((1 - df.isnull().sum().sum() / total_cells) * 100, 2))
        else:
            # No cells: avoid 0/0.
            completeness = 0.0

        validation = {
            'total_rows': int(len(df)),
            'total_columns': int(len(df.columns)),
            'duplicate_count': int(df.duplicated().sum()) if len(df.columns) else 0,
            'completeness_pct': completeness,
            'memory_usage_mb': float(round(df.memory_usage(deep=True).sum() / 1024**2, 2)),
        }

        if 'year' in df.columns:
            has_years = not df['year'].isnull().all()
            validation['year_range'] = {
                'min': int(df['year'].min()) if has_years else None,
                'max': int(df['year'].max()) if has_years else None,
                'unique_years': int(df['year'].nunique()),
            }

        if 'source' in df.columns:
            validation['source_breakdown'] = {
                str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
            }

        if 'indicator_standardized' in df.columns:
            validation['unique_indicators'] = int(df['indicator_standardized'].nunique())

        if 'country' in df.columns:
            validation['unique_countries'] = int(df['country'].nunique())

        def longest(col):
            # Length of the longest string in *col*; 0 when the column is
            # absent or has no measurable value.
            if col not in df.columns:
                return 0
            max_len = df[col].str.len().max()
            return 0 if pd.isna(max_len) else int(max_len)

        validation['schema_validation'] = {
            'source_max_length': longest('source'),
            'indicator_original_max_length': longest('indicator_original'),
            'indicator_standardized_max_length': longest('indicator_standardized'),
            'country_max_length': longest('country'),
            'year_range_max_length': longest('year_range'),
            'unit_max_length': longest('unit'),
        }

        return validation

    def save_to_staging(self, df: pd.DataFrame):
        """Replace staging_integrated in the STAGING layer (Silver) with *df*.

        The outcome is recorded through log_update; failures are logged with
        status 'failed' and re-raised.
        """
        try:
            load_to_bigquery(
                self.client,
                df,
                self.staging_table,
                layer='silver',
                write_disposition="WRITE_TRUNCATE",
                schema=get_staging_schema()
            )
            log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))

        except Exception as e:
            print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
            log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
                       status='failed', error_msg=str(e))
            raise

    def _print_summary(self, row_count: int, validation: Dict):
        """Print the end-of-run summary for a completed integration."""
        print(f" ✓ Staging Integration completed: {row_count:,} rows")
        print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
        if 'source_breakdown' in validation:
            for src, cnt in validation['source_breakdown'].items():
                print(f" - {src}: {cnt:,} rows")
        print(f" Indicators : {validation.get('unique_indicators', '-')}")
        print(f" Countries : {validation.get('unique_countries', '-')}")
        if 'year_range' in validation:
            yr = validation['year_range']
            if yr['min'] and yr['max']:
                print(f" Year range : {yr['min']}–{yr['max']}")
        print(f" Completeness: {validation['completeness_pct']:.2f}%")

        schema_val = validation['schema_validation']
        print(f"\n Schema Validation:")
        print(f" - source max length : {schema_val['source_max_length']}/20")
        print(f" - indicator_original max length : {schema_val['indicator_original_max_length']}/255")
        print(f" - indicator_std max length : {schema_val['indicator_standardized_max_length']}/255")
        print(f" - country max length : {schema_val['country_max_length']}/100")
        print(f" - year_range max length : {schema_val['year_range_max_length']}/20")
        print(f" - unit max length : {schema_val['unit_max_length']}/20")

        print(f"\n Metadata → [AUDIT] etl_metadata")

    def run(self) -> pd.DataFrame:
        """
        Run the staging integration: load raw tables, standardize, validate,
        save to staging and record ETL metadata.

        Returns:
            The integrated DataFrame (empty when there was nothing to
            integrate, in which case nothing is saved).

        Raises:
            Exception: any failure is logged and re-raised.
        """
        self.metadata['start_time'] = datetime.now()

        try:
            print("Integrating data from all sources...")

            raw_data = self.load_raw_data()
            total_fetched = sum(len(df) for df in raw_data.values())
            self.metadata['rows_fetched'] = total_fetched
            print(f" Total rows fetched: {total_fetched:,}")

            df_integrated = self.standardize_schema(raw_data)

            if df_integrated.empty:
                print("No data to integrate")
                return df_integrated

            self.metadata['rows_transformed'] = len(df_integrated)

            validation = self.validate_data(df_integrated)
            self.metadata['validation_metrics'] = validation

            self.save_to_staging(df_integrated)
            self.metadata['rows_loaded'] = len(df_integrated)

            self.metadata['end_time'] = datetime.now()
            self.metadata['duration_seconds'] = (
                self.metadata['end_time'] - self.metadata['start_time']
            ).total_seconds()
            self.metadata['execution_timestamp'] = self.metadata['start_time']
            self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
            self.metadata['config_snapshot'] = json.dumps(CONFIG['matching'])
            self.metadata['validation_metrics'] = json.dumps(validation)

            save_etl_metadata(self.client, self.metadata)

            self._print_summary(len(df_integrated), validation)

            return df_integrated

        except Exception as e:
            self.logger.error(f"Staging integration failed: {str(e)}")
            raise
# MAIN EXECUTION

if __name__ == "__main__":
    print("=" * 60)
    print("BIGQUERY RAW LAYER ETL")
    print("Kimball DW Architecture")
    print("=" * 60)

    logger = setup_logging()
    client = get_bigquery_client()

    # ── FAO ──────────────────────────────────────────────────────────────────
    print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
    fao_source = FAODataSource(client)
    df_fao = fao_source.run()

    print(f" ✓ raw_fao: {len(df_fao):,} rows")
    # An empty result may have no columns at all, so only summarise (and
    # derive the indicator list from) a non-empty frame.
    if len(df_fao) > 0:
        print(f" Indicators : {df_fao['indicator'].nunique()}")
        print(f" Countries : {df_fao['country'].nunique()}")
        print(f" Year range : {df_fao['year'].min()}–{df_fao['year'].max()}")
        fao_indicators = list(df_fao['indicator'].unique())
    else:
        fao_indicators = []

    # ── World Bank ────────────────────────────────────────────────────────────
    print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
    wb_source = WorldBankDataSource(client, fao_indicators)
    df_wb = wb_source.run()

    print(f" ✓ raw_worldbank: {len(df_wb):,} rows")
    if len(df_wb) > 0:
        print(f" Matched indicators : {df_wb['indicator_fao'].nunique()}")
        print(f" Countries : {df_wb['country'].nunique()}")
        print(f" Year range : {df_wb['year'].min()}–{df_wb['year'].max()}")

    # ── UNICEF ────────────────────────────────────────────────────────────────
    print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
    unicef_source = UNICEFDataSource(client, fao_indicators)
    df_unicef = unicef_source.run()

    print(f" ✓ raw_unicef: {len(df_unicef):,} rows")
    if len(df_unicef) > 0:
        print(f" Matched indicators : {df_unicef['indicator_fao'].nunique()}")
        print(f" Countries : {df_unicef['country'].nunique()}")

    # ── Staging Integration ───────────────────────────────────────────────────
    print("\n[4/4] Staging Integration → STAGING (Silver)...")
    staging = StagingDataIntegration(client)
    df_staging = staging.run()

    print("\n" + "=" * 60)
    print("✓ ETL COMPLETED")
    print(f"RAW (Bronze) : raw_fao, raw_worldbank, raw_unicef")
    print(f"STAGING (Silver) : staging_integrated")
    print(f"AUDIT : etl_logs, etl_metadata")
    print("=" * 60)