From 453fb3ef52b09320debc8c6030a8a44215dca953 Mon Sep 17 00:00:00 2001 From: Debby Date: Sat, 14 Mar 2026 23:29:45 +0700 Subject: [PATCH] create dimensional model --- dags/etl_food_security.py | 10 +- scripts/bigquery_dimesional_model.py | 850 +++++++++++++++++++++++++++ 2 files changed, 859 insertions(+), 1 deletion(-) create mode 100644 scripts/bigquery_dimesional_model.py diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py index d5010c7..c031db5 100644 --- a/dags/etl_food_security.py +++ b/dags/etl_food_security.py @@ -15,6 +15,10 @@ from scripts.bigquery_cleaned_layer import ( run_cleaned_integration, ) +from scripts.bigquery_dimesional_model import ( + run_dimensional_model, +) + with DAG( dag_id = "etl_food_security_bigquery", @@ -54,6 +58,10 @@ with DAG( python_callable = run_cleaned_integration ) + task_dimensional = PythonOperator( + task_id = "dimensional_model_to_gold", + python_callable = run_dimensional_model + ) - task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned \ No newline at end of file + task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional \ No newline at end of file diff --git a/scripts/bigquery_dimesional_model.py b/scripts/bigquery_dimesional_model.py new file mode 100644 index 0000000..a5e665c --- /dev/null +++ b/scripts/bigquery_dimesional_model.py @@ -0,0 +1,850 @@ +""" +BIGQUERY DIMENSIONAL MODEL LOAD +Kimball Data Warehouse Architecture + +Kimball ETL Flow yang dijalankan file ini: + Input : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver) + Output : DW layer (Gold) — dim_*, fact_* (fs_asean_gold) + Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit) + +Classes: + DimensionalModelLoader — Build Star Schema & load ke Gold layer + +Usage: + python bigquery_dimensional_model.py +""" + +import pandas as pd +import numpy as np +from datetime import datetime +import logging +from typing import Dict, List +import json +import sys + +from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id +from scripts.bigquery_helpers import ( + log_update, + load_to_bigquery, + read_from_bigquery, + setup_logging, + truncate_table, + save_etl_metadata, +) +from google.cloud import bigquery + +if hasattr(sys.stdout, 'reconfigure'): + sys.stdout.reconfigure(encoding='utf-8') + + +# ============================================================================= +# DIMENSIONAL MODEL LOADER +# ============================================================================= + +class DimensionalModelLoader: + """ + Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold. + + Kimball context: + Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver + Output : dim_* + fact_* → DW (Gold) — fs_asean_gold + Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit + + Pipeline steps: + 1. Load dim_country + 2. Load dim_indicator + 3. Load dim_time + 4. Load dim_source + 5. Load dim_pillar + 6. Load fact_food_security (resolve FK dari Gold dims) + 7. Validate constraints & data load + """ + + def __init__(self, client: bigquery.Client, df_clean: pd.DataFrame): + self.client = client + self.df_clean = df_clean + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.propagate = False + self.target_layer = 'gold' + + self.load_metadata = { + 'dim_country' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, + 'dim_indicator' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, + 'dim_time' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, + 'dim_source' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, + 'dim_pillar' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, + 'fact_food_security': {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, + } + + self.pipeline_metadata = { + 'source_class' : self.__class__.__name__, + 'start_time' : None, + 'end_time' : None, + 'duration_seconds' : None, + 'rows_fetched' : 0, + 'rows_transformed' : 0, + 'rows_loaded' : 0, + 'validation_metrics': {} + } + + # ------------------------------------------------------------------ + # CONSTRAINT HELPERS + # ------------------------------------------------------------------ + + def _add_primary_key(self, table_name: str, column_name: str): + table_id = get_table_id(table_name, layer='gold') + query = f"ALTER TABLE `{table_id}` ADD PRIMARY KEY ({column_name}) NOT ENFORCED" + try: + self.client.query(query).result() + self.logger.info(f" [OK] PRIMARY KEY: {table_name}({column_name})") + except Exception as e: + if "already exists" in str(e).lower(): + self.logger.info(f" [INFO] PRIMARY KEY already exists: {table_name}({column_name})") + else: + self.logger.warning(f" [WARN] Could not add PRIMARY KEY to {table_name}.{column_name}: {e}") + + def _add_foreign_key(self, table_name: str, fk_column: str, + ref_table: str, ref_column: str): + table_id = get_table_id(table_name, layer='gold') + ref_table_id = get_table_id(ref_table, layer='gold') + constraint_name = f"fk_{table_name}_{fk_column}" + query = f""" + ALTER TABLE `{table_id}` + ADD CONSTRAINT {constraint_name} + FOREIGN KEY ({fk_column}) + REFERENCES `{ref_table_id}`({ref_column}) + NOT ENFORCED + """ + try: + self.client.query(query).result() + self.logger.info(f" [OK] FK: {table_name}.{fk_column} → {ref_table}.{ref_column}") + except Exception as e: + if "already exists" in str(e).lower(): + self.logger.info(f" [INFO] FK already exists: {constraint_name}") + else: + self.logger.warning(f" [WARN] Could not add FK {constraint_name}: {e}") + + # ------------------------------------------------------------------ + # METADATA HELPER + # ------------------------------------------------------------------ + + def _save_table_metadata(self, table_name: str): + meta = self.load_metadata[table_name] + metadata = { + 'source_class' : self.__class__.__name__, + 'table_name' : table_name, + 'execution_timestamp': meta['start_time'], + 'duration_seconds' : (meta['end_time'] - meta['start_time']).total_seconds() + if meta['end_time'] else 0, + 'rows_fetched' : 0, + 'rows_transformed' : meta['rows_loaded'], + 'rows_loaded' : meta['rows_loaded'], + 'completeness_pct' : 100.0 if meta['status'] == 'success' else 0.0, + 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), + 'validation_metrics' : json.dumps({'status': meta['status'], 'rows': meta['rows_loaded']}) + } + try: + save_etl_metadata(self.client, metadata) + self.logger.info(f" Metadata → [AUDIT] etl_metadata") + except Exception as e: + self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}") + + # ------------------------------------------------------------------ + # DIMENSION LOADERS + # ------------------------------------------------------------------ + + def load_dim_time(self): + table_name = 'dim_time' + self.load_metadata[table_name]['start_time'] = datetime.now() + self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...") + + try: + if 'year_range' in self.df_clean.columns: + dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy() + else: + dim_time = self.df_clean[['year']].drop_duplicates().copy() + dim_time['year_range'] = None + + dim_time['year'] = dim_time['year'].astype(int) + + def parse_year_range(row): + year = row['year'] + year_range = row.get('year_range') + start_year = year + end_year = year + if pd.notna(year_range) and year_range is not None: + yr_str = str(year_range).strip() + if yr_str and yr_str != 'nan': + if '-' in yr_str: + parts = yr_str.split('-') + if len(parts) == 2: + try: + start_year = int(parts[0].strip()) + end_year = int(parts[1].strip()) + year = (start_year + end_year) // 2 + except Exception: + pass + else: + try: + single = int(yr_str) + start_year = single + end_year = single + year = single + except Exception: + pass + return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year}) + + parsed = dim_time.apply(parse_year_range, axis=1) + dim_time['year'] = parsed['year'].astype(int) + dim_time['start_year'] = parsed['start_year'].astype(int) + dim_time['end_year'] = parsed['end_year'].astype(int) + dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year']) + dim_time['decade'] = (dim_time['year'] // 10) * 10 + dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int) + dim_time = dim_time.sort_values(['is_range', 'start_year'], ascending=[True, True]) + dim_time = dim_time.drop(['is_range', 'year_range'], axis=1, errors='ignore') + dim_time = dim_time.drop_duplicates(subset=['start_year', 'end_year'], keep='first') + + dim_time_final = dim_time[['year', 'start_year', 'end_year', 'decade', 'is_year_range']].copy() + dim_time_final = dim_time_final.reset_index(drop=True) + dim_time_final.insert(0, 'time_id', range(1, len(dim_time_final) + 1)) + + schema = [ + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("start_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("end_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("decade", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("is_year_range", "BOOLEAN", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, dim_time_final, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._add_primary_key(table_name, 'time_id') + + self.load_metadata[table_name].update( + {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} + ) + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + self._save_table_metadata(table_name) + self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n") + return rows_loaded + + except Exception as e: + self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) + log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) + raise + + def load_dim_country(self): + table_name = 'dim_country' + self.load_metadata[table_name]['start_time'] = datetime.now() + self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...") + + try: + dim_country = self.df_clean[['country']].drop_duplicates().copy() + dim_country.columns = ['country_name'] + + region_mapping = { + 'Brunei Darussalam': ('Southeast Asia', 'ASEAN'), + 'Cambodia' : ('Southeast Asia', 'ASEAN'), + 'Indonesia' : ('Southeast Asia', 'ASEAN'), + 'Laos' : ('Southeast Asia', 'ASEAN'), + 'Malaysia' : ('Southeast Asia', 'ASEAN'), + 'Myanmar' : ('Southeast Asia', 'ASEAN'), + 'Philippines' : ('Southeast Asia', 'ASEAN'), + 'Singapore' : ('Southeast Asia', 'ASEAN'), + 'Thailand' : ('Southeast Asia', 'ASEAN'), + 'Vietnam' : ('Southeast Asia', 'ASEAN'), + } + iso_mapping = { + 'Brunei Darussalam': 'BRN', 'Cambodia': 'KHM', 'Indonesia': 'IDN', + 'Laos': 'LAO', 'Malaysia': 'MYS', 'Myanmar': 'MMR', + 'Philippines': 'PHL', 'Singapore': 'SGP', 'Thailand': 'THA', 'Vietnam': 'VNM', + } + + dim_country['region'] = dim_country['country_name'].map( + lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[0]) + dim_country['subregion'] = dim_country['country_name'].map( + lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1]) + dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping) + + dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy() + dim_country_final = dim_country_final.reset_index(drop=True) + dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1)) + + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("region", "STRING", mode="NULLABLE"), + bigquery.SchemaField("subregion", "STRING", mode="NULLABLE"), + bigquery.SchemaField("iso_code", "STRING", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, dim_country_final, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._add_primary_key(table_name, 'country_id') + + self.load_metadata[table_name].update( + {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} + ) + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + self._save_table_metadata(table_name) + self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n") + return rows_loaded + + except Exception as e: + self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) + log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) + raise + + def load_dim_indicator(self): + table_name = 'dim_indicator' + self.load_metadata[table_name]['start_time'] = datetime.now() + self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...") + + try: + has_direction = 'direction' in self.df_clean.columns + has_unit = 'unit' in self.df_clean.columns + has_category = 'indicator_category' in self.df_clean.columns + + dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() + dim_indicator.columns = ['indicator_name'] + + if has_unit: + unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates() + unit_map.columns = ['indicator_name', 'unit'] + dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left') + else: + dim_indicator['unit'] = None + + if has_direction: + dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates() + dir_map.columns = ['indicator_name', 'direction'] + dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left') + self.logger.info(" [OK] direction column from cleaned_integrated") + else: + dim_indicator['direction'] = 'higher_better' + self.logger.warning(" [WARN] direction not found, default: higher_better") + + if has_category: + cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates() + cat_map.columns = ['indicator_name', 'indicator_category'] + dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') + else: + def categorize_indicator(name): + n = str(name).lower() + if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting', + 'wasting', 'anemia', 'food security', 'food insecure', 'hunger']): + return 'Health & Nutrition' + elif any(w in n for w in ['production', 'yield', 'cereal', 'crop', + 'import dependency', 'share of dietary']): + return 'Agricultural Production' + elif any(w in n for w in ['import', 'export', 'trade']): + return 'Trade' + elif any(w in n for w in ['gdp', 'income', 'economic']): + return 'Economic' + elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): + return 'Infrastructure' + else: + return 'Other' + dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) + + dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') + dim_indicator_final = dim_indicator[ + ['indicator_name', 'indicator_category', 'unit', 'direction'] + ].copy() + dim_indicator_final = dim_indicator_final.reset_index(drop=True) + dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1)) + + schema = [ + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"), + bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + ] + + rows_loaded = load_to_bigquery( + self.client, dim_indicator_final, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._add_primary_key(table_name, 'indicator_id') + + for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]: + self.logger.info(f" {label}:") + for val, cnt in dim_indicator_final[col].value_counts().items(): + self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)") + + self.load_metadata[table_name].update( + {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} + ) + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + self._save_table_metadata(table_name) + self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n") + return rows_loaded + + except Exception as e: + self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) + log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) + raise + + def load_dim_source(self): + table_name = 'dim_source' + self.load_metadata[table_name]['start_time'] = datetime.now() + self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...") + + try: + source_details = { + 'FAO': { + 'source_type' : 'International Organization', + 'organization' : 'Food and Agriculture Organization', + 'access_method': 'Python Library (faostat)', + 'api_endpoint' : None, + }, + 'World Bank': { + 'source_type' : 'International Organization', + 'organization' : 'The World Bank', + 'access_method': 'Python Library (wbgapi)', + 'api_endpoint' : None, + }, + 'UNICEF': { + 'source_type' : 'International Organization', + 'organization' : "United Nations Children's Fund", + 'access_method': 'SDMX API', + 'api_endpoint' : 'https://sdmx.data.unicef.org/ws/public/sdmxapi/rest', + }, + } + + sources_data = [] + for source in self.df_clean['source'].unique(): + detail = source_details.get(source, { + 'source_type' : 'International Organization', + 'organization' : source, + 'access_method': 'Unknown', + 'api_endpoint' : None, + }) + sources_data.append({'source_name': source, **detail}) + + dim_source_final = pd.DataFrame(sources_data)[ + ['source_name', 'source_type', 'organization', 'access_method', 'api_endpoint'] + ].copy() + dim_source_final = dim_source_final.reset_index(drop=True) + dim_source_final.insert(0, 'source_id', range(1, len(dim_source_final) + 1)) + + schema = [ + bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("source_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("source_type", "STRING", mode="NULLABLE"), + bigquery.SchemaField("organization", "STRING", mode="NULLABLE"), + bigquery.SchemaField("access_method", "STRING", mode="NULLABLE"), + bigquery.SchemaField("api_endpoint", "STRING", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, dim_source_final, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._add_primary_key(table_name, 'source_id') + + self.load_metadata[table_name].update( + {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} + ) + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + self._save_table_metadata(table_name) + self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n") + return rows_loaded + + except Exception as e: + self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) + log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) + raise + + def load_dim_pillar(self): + table_name = 'dim_pillar' + self.load_metadata[table_name]['start_time'] = datetime.now() + self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...") + + try: + pillar_codes = { + 'Availability': 'AVL', 'Access' : 'ACC', + 'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH', + } + pillars_data = [ + {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')} + for p in self.df_clean['pillar'].unique() + ] + + dim_pillar_final = pd.DataFrame(pillars_data).sort_values('pillar_name')[ + ['pillar_name', 'pillar_code'] + ].copy() + dim_pillar_final = dim_pillar_final.reset_index(drop=True) + dim_pillar_final.insert(0, 'pillar_id', range(1, len(dim_pillar_final) + 1)) + + schema = [ + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_code", "STRING", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, dim_pillar_final, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + self._add_primary_key(table_name, 'pillar_id') + + self.load_metadata[table_name].update( + {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} + ) + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + self._save_table_metadata(table_name) + self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n") + return rows_loaded + + except Exception as e: + self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) + log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) + raise + + # ------------------------------------------------------------------ + # FACT LOADER + # ------------------------------------------------------------------ + + def load_fact_food_security(self): + table_name = 'fact_food_security' + self.load_metadata[table_name]['start_time'] = datetime.now() + self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...") + + try: + # Load dims dari Gold untuk FK resolution + dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold') + dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') + dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold') + dim_source = read_from_bigquery(self.client, 'dim_source', layer='gold') + dim_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold') + + fact_table = self.df_clean.copy() + + def parse_year_range_for_merge(row): + year = row['year'] + year_range = row.get('year_range') + start_year = year + end_year = year + if pd.notna(year_range) and year_range is not None: + yr_str = str(year_range).strip() + if yr_str and yr_str != 'nan': + if '-' in yr_str: + parts = yr_str.split('-') + if len(parts) == 2: + try: + start_year = int(parts[0].strip()) + end_year = int(parts[1].strip()) + except Exception: + pass + else: + try: + single = int(yr_str) + start_year = single + end_year = single + except Exception: + pass + return pd.Series({'start_year': start_year, 'end_year': end_year}) + + if 'year_range' in fact_table.columns: + parsed = fact_table.apply(parse_year_range_for_merge, axis=1) + fact_table['start_year'] = parsed['start_year'].astype(int) + fact_table['end_year'] = parsed['end_year'].astype(int) + else: + fact_table['start_year'] = fact_table['year'].astype(int) + fact_table['end_year'] = fact_table['year'].astype(int) + + # Resolve FKs + fact_table = fact_table.merge( + dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}), + on='country', how='left' + ) + fact_table = fact_table.merge( + dim_indicator[['indicator_id', 'indicator_name']].rename( + columns={'indicator_name': 'indicator_standardized'}), + on='indicator_standardized', how='left' + ) + fact_table = fact_table.merge( + dim_time[['time_id', 'start_year', 'end_year']], + on=['start_year', 'end_year'], how='left' + ) + fact_table = fact_table.merge( + dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}), + on='source', how='left' + ) + fact_table = fact_table.merge( + dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}), + on='pillar', how='left' + ) + + # Filter hanya row dengan FK lengkap + fact_table = fact_table[ + fact_table['country_id'].notna() & + fact_table['indicator_id'].notna() & + fact_table['time_id'].notna() & + fact_table['source_id'].notna() & + fact_table['pillar_id'].notna() + ] + + fact_final = fact_table[ + ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id', 'value'] + ].copy() + fact_final['data_quality_score'] = 0.95 + + for col in ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id']: + fact_final[col] = fact_final[col].astype(int) + fact_final['value'] = fact_final['value'].astype(float) + + fact_final = fact_final.reset_index(drop=True) + fact_final.insert(0, 'fact_id', range(1, len(fact_final) + 1)) + + schema = [ + bigquery.SchemaField("fact_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("data_quality_score", "FLOAT", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, fact_final, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + + # Add PK + FKs + self._add_primary_key(table_name, 'fact_id') + self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id') + self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id') + self._add_foreign_key(table_name, 'time_id', 'dim_time', 'time_id') + self._add_foreign_key(table_name, 'source_id', 'dim_source', 'source_id') + self._add_foreign_key(table_name, 'pillar_id', 'dim_pillar', 'pillar_id') + + self.load_metadata[table_name].update( + {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} + ) + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + self._save_table_metadata(table_name) + self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n") + return rows_loaded + + except Exception as e: + self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) + log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) + raise + + # ------------------------------------------------------------------ + # VALIDATION + # ------------------------------------------------------------------ + + def validate_constraints(self): + self.logger.info("\n" + "=" * 60) + self.logger.info("CONSTRAINT VALIDATION — fs_asean_gold") + self.logger.info("=" * 60) + try: + gold_dataset = CONFIG['bigquery']['dataset_gold'] + query = f""" + SELECT table_name, constraint_name, constraint_type + FROM `{CONFIG['bigquery']['project_id']}.{gold_dataset}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS` + WHERE table_name IN ( + 'dim_country', 'dim_indicator', 'dim_time', + 'dim_source', 'dim_pillar', 'fact_food_security' + ) + ORDER BY + CASE WHEN table_name LIKE 'dim_%' THEN 1 ELSE 2 END, + table_name, constraint_type + """ + df = self.client.query(query).result().to_dataframe(create_bqstorage_client=False) + if len(df) > 0: + for _, row in df.iterrows(): + icon = "[PK]" if row['constraint_type'] == "PRIMARY KEY" else "[FK]" + self.logger.info( + f" {icon} {row['table_name']:25s} | " + f"{row['constraint_type']:15s} | {row['constraint_name']}" + ) + pk_count = len(df[df['constraint_type'] == 'PRIMARY KEY']) + fk_count = len(df[df['constraint_type'] == 'FOREIGN KEY']) + self.logger.info(f"\n Primary Keys : {pk_count}") + self.logger.info(f" Foreign Keys : {fk_count}") + self.logger.info(f" Total : {len(df)}") + else: + self.logger.warning(" [WARN] No constraints found!") + except Exception as e: + self.logger.error(f"Error validating constraints: {e}") + + def validate_data_load(self): + self.logger.info("\n" + "=" * 60) + self.logger.info("DATA LOAD VALIDATION — fs_asean_gold") + self.logger.info("=" * 60) + try: + for table in ['dim_country', 'dim_indicator', 'dim_time', + 'dim_source', 'dim_pillar', 'fact_food_security']: + df = read_from_bigquery(self.client, table, layer='gold') + self.logger.info(f" {table:25s}: {len(df):>10,} rows") + + query = f""" + SELECT + COUNT(*) AS total_facts, + COUNT(DISTINCT country_id) AS unique_countries, + COUNT(DISTINCT indicator_id) AS unique_indicators, + COUNT(DISTINCT time_id) AS unique_years, + COUNT(DISTINCT source_id) AS unique_sources, + COUNT(DISTINCT pillar_id) AS unique_pillars + FROM `{get_table_id('fact_food_security', layer='gold')}` + """ + stats = self.client.query(query).result().to_dataframe( + create_bqstorage_client=False + ).iloc[0] + self.logger.info(f"\n Fact Table Summary:") + self.logger.info(f" Total Facts : {int(stats['total_facts']):>10,}") + self.logger.info(f" Unique Countries : {int(stats['unique_countries']):>10,}") + self.logger.info(f" Unique Indicators : {int(stats['unique_indicators']):>10,}") + self.logger.info(f" Unique Years : {int(stats['unique_years']):>10,}") + self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}") + self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}") + + query_dir = f""" + SELECT direction, COUNT(*) AS count + FROM `{get_table_id('dim_indicator', layer='gold')}` + GROUP BY direction ORDER BY direction + """ + df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False) + if len(df_dir) > 0: + self.logger.info(f"\n Direction Distribution:") + for _, row in df_dir.iterrows(): + self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators") + + self.logger.info("\n [OK] Validation completed") + except Exception as e: + self.logger.error(f"Error during validation: {e}") + raise + + # ------------------------------------------------------------------ + # RUN + # ------------------------------------------------------------------ + + def run(self): + """Execute full dimensional model load ke DW layer (Gold).""" + self.pipeline_metadata['start_time'] = datetime.now() + self.pipeline_metadata['rows_fetched'] = len(self.df_clean) + + self.logger.info("\n" + "=" * 60) + self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold") + self.logger.info("=" * 60) + + # Dimensions + self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold") + self.load_dim_country() + self.load_dim_indicator() + self.load_dim_time() + self.load_dim_source() + self.load_dim_pillar() + + # Fact + self.logger.info("\nLOADING FACT TABLE → fs_asean_gold") + self.load_fact_food_security() + + # Validate + self.validate_constraints() + self.validate_data_load() + + pipeline_end = datetime.now() + duration = (pipeline_end - self.pipeline_metadata['start_time']).total_seconds() + total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values()) + + self.pipeline_metadata.update({ + 'end_time' : pipeline_end, + 'duration_seconds' : duration, + 'rows_transformed' : total_loaded, + 'rows_loaded' : total_loaded, + 'execution_timestamp': self.pipeline_metadata['start_time'], + 'completeness_pct' : 100.0, + 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), + 'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}), + 'table_name' : 'dimensional_model_pipeline', + }) + try: + save_etl_metadata(self.client, self.pipeline_metadata) + except Exception as e: + self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}") + + # Summary + self.logger.info("\n" + "=" * 60) + self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED") + self.logger.info("=" * 60) + self.logger.info(f" Dataset : fs_asean_gold") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Tables :") + for tbl, meta in self.load_metadata.items(): + icon = "✓" if meta['status'] == 'success' else "✗" + self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows") + self.logger.info(f"\n Metadata → [AUDIT] etl_metadata") + self.logger.info("=" * 60) + + +# ============================================================================= +# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer +# ============================================================================= + +def run_dimensional_model(): + """ + Airflow task: Load dimensional model dari cleaned_integrated. + + Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai. + """ + from scripts.bigquery_config import get_bigquery_client + client = get_bigquery_client() + df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') + loader = DimensionalModelLoader(client, df_clean) + loader.run() + print(f"Dimensional model loaded: {len(df_clean):,} source rows processed") + + +# ============================================================================= +# MAIN EXECUTION +# ============================================================================= + +if __name__ == "__main__": + print("=" * 60) + print("BIGQUERY DIMENSIONAL MODEL LOAD") + print("Kimball DW Architecture") + print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)") + print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)") + print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)") + print("=" * 60) + + logger = setup_logging() + client = get_bigquery_client() + + print("\nLoading cleaned_integrated (fs_asean_silver)...") + df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') + print(f" ✓ Loaded : {len(df_clean):,} rows") + print(f" Columns : {len(df_clean.columns)}") + print(f" Sources : {df_clean['source'].nunique()}") + print(f" Indicators : {df_clean['indicator_standardized'].nunique()}") + print(f" Countries : {df_clean['country'].nunique()}") + print(f" Year range : {int(df_clean['year'].min())}–{int(df_clean['year'].max())}") + if 'direction' in df_clean.columns: + print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") + else: + print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first") + + print("\n[1/1] Dimensional Model Load → DW (Gold)...") + loader = DimensionalModelLoader(client, df_clean) + loader.run() + + print("\n" + "=" * 60) + print("✓ DIMENSIONAL MODEL ETL COMPLETED") + print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,") + print(" dim_source, dim_pillar, fact_food_security") + print(" 📋 AUDIT : etl_logs, etl_metadata") + print("=" * 60) \ No newline at end of file