From a4ff15677e884d0cccd49db51d18c6b266293fe8 Mon Sep 17 00:00:00 2001 From: Debby Date: Sun, 15 Mar 2026 00:15:53 +0700 Subject: [PATCH] replace sklearn with pure numpy --- scripts/bigquery_aggregate_layer.py | 21 +- scripts/bigquery_dimesional_model.py | 850 --------------------------- 2 files changed, 10 insertions(+), 861 deletions(-) delete mode 100644 scripts/bigquery_dimesional_model.py diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index 7f7c6b7..15ed0a0 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -24,7 +24,6 @@ from scripts.bigquery_helpers import ( save_etl_metadata, ) from google.cloud import bigquery -from sklearn.preprocessing import MinMaxScaler # ============================================================================= @@ -87,12 +86,10 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S v_min, v_max = values.min(), values.max() if v_min == v_max: return pd.Series((lo + hi) / 2.0, index=series.index) - scaler = MinMaxScaler(feature_range=(lo, hi)) - result = np.full(len(series), np.nan) + result = np.full(len(series), np.nan) not_nan = series.notna() - result[not_nan.values] = scaler.fit_transform( - series[not_nan].values.reshape(-1, 1) - ).flatten() + raw = series[not_nan].values + result[not_nan.values] = lo + (raw - v_min) / (v_max - v_min) * (hi - lo) return pd.Series(result, index=series.index) @@ -274,11 +271,13 @@ class FoodSecurityAggregator: norm_parts.append(grp) continue - scaler = MinMaxScaler(feature_range=(0, 1)) + raw = grp.loc[valid_mask, "value"].values + v_min, v_max = raw.min(), raw.max() normed = np.full(len(grp), np.nan) - normed[valid_mask.values] = scaler.fit_transform( - grp.loc[valid_mask, ["value"]] - ).flatten() + if v_min == v_max: + normed[valid_mask.values] = 0.5 + else: + normed[valid_mask.values] = (raw - v_min) / (v_max - v_min) if do_invert: normed = np.where(np.isnan(normed), np.nan, 1.0 - normed) @@ -757,7 +756,7 @@ if __name__ == "__main__": _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") print("=" * 70) - print("FOOD SECURITY AGGREGATION v8.0 — 4 TABLES -> fs_asean_gold") + print("FOOD SECURITY AGGREGATION 4 TABLES -> fs_asean_gold") print(f" NORMALIZE_FRAMEWORKS_JOINTLY = {NORMALIZE_FRAMEWORKS_JOINTLY}") print("=" * 70) diff --git a/scripts/bigquery_dimesional_model.py b/scripts/bigquery_dimesional_model.py deleted file mode 100644 index a5e665c..0000000 --- a/scripts/bigquery_dimesional_model.py +++ /dev/null @@ -1,850 +0,0 @@ -""" -BIGQUERY DIMENSIONAL MODEL LOAD -Kimball Data Warehouse Architecture - -Kimball ETL Flow yang dijalankan file ini: - Input : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver) - Output : DW layer (Gold) — dim_*, fact_* (fs_asean_gold) - Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit) - -Classes: - DimensionalModelLoader — Build Star Schema & load ke Gold layer - -Usage: - python bigquery_dimensional_model.py -""" - -import pandas as pd -import numpy as np -from datetime import datetime -import logging -from typing import Dict, List -import json -import sys - -from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id -from scripts.bigquery_helpers import ( - log_update, - load_to_bigquery, - read_from_bigquery, - setup_logging, - truncate_table, - save_etl_metadata, -) -from google.cloud import bigquery - -if hasattr(sys.stdout, 'reconfigure'): - sys.stdout.reconfigure(encoding='utf-8') - - -# 
============================================================================= -# DIMENSIONAL MODEL LOADER -# ============================================================================= - -class DimensionalModelLoader: - """ - Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold. - - Kimball context: - Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver - Output : dim_* + fact_* → DW (Gold) — fs_asean_gold - Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit - - Pipeline steps: - 1. Load dim_country - 2. Load dim_indicator - 3. Load dim_time - 4. Load dim_source - 5. Load dim_pillar - 6. Load fact_food_security (resolve FK dari Gold dims) - 7. Validate constraints & data load - """ - - def __init__(self, client: bigquery.Client, df_clean: pd.DataFrame): - self.client = client - self.df_clean = df_clean - self.logger = logging.getLogger(self.__class__.__name__) - self.logger.propagate = False - self.target_layer = 'gold' - - self.load_metadata = { - 'dim_country' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, - 'dim_indicator' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, - 'dim_time' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, - 'dim_source' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, - 'dim_pillar' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, - 'fact_food_security': {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'}, - } - - self.pipeline_metadata = { - 'source_class' : self.__class__.__name__, - 'start_time' : None, - 'end_time' : None, - 'duration_seconds' : None, - 'rows_fetched' : 0, - 'rows_transformed' : 0, - 'rows_loaded' : 0, - 'validation_metrics': {} - } - - # ------------------------------------------------------------------ - # CONSTRAINT HELPERS - # ------------------------------------------------------------------ - - def _add_primary_key(self, table_name: str, column_name: str): - table_id = get_table_id(table_name, layer='gold') - query = f"ALTER TABLE `{table_id}` ADD PRIMARY KEY ({column_name}) NOT ENFORCED" - try: - self.client.query(query).result() - self.logger.info(f" [OK] PRIMARY KEY: {table_name}({column_name})") - except Exception as e: - if "already exists" in str(e).lower(): - self.logger.info(f" [INFO] PRIMARY KEY already exists: {table_name}({column_name})") - else: - self.logger.warning(f" [WARN] Could not add PRIMARY KEY to {table_name}.{column_name}: {e}") - - def _add_foreign_key(self, table_name: str, fk_column: str, - ref_table: str, ref_column: str): - table_id = get_table_id(table_name, layer='gold') - ref_table_id = get_table_id(ref_table, layer='gold') - constraint_name = f"fk_{table_name}_{fk_column}" - query = f""" - ALTER TABLE `{table_id}` - ADD CONSTRAINT {constraint_name} - FOREIGN KEY ({fk_column}) - REFERENCES `{ref_table_id}`({ref_column}) - NOT ENFORCED - """ - try: - self.client.query(query).result() - self.logger.info(f" [OK] FK: {table_name}.{fk_column} → {ref_table}.{ref_column}") - except Exception as e: - if "already exists" in str(e).lower(): - self.logger.info(f" [INFO] FK already exists: {constraint_name}") - else: - self.logger.warning(f" [WARN] Could not add FK {constraint_name}: {e}") - - # ------------------------------------------------------------------ - # METADATA HELPER - # ------------------------------------------------------------------ - - def _save_table_metadata(self, table_name: 
str): - meta = self.load_metadata[table_name] - metadata = { - 'source_class' : self.__class__.__name__, - 'table_name' : table_name, - 'execution_timestamp': meta['start_time'], - 'duration_seconds' : (meta['end_time'] - meta['start_time']).total_seconds() - if meta['end_time'] else 0, - 'rows_fetched' : 0, - 'rows_transformed' : meta['rows_loaded'], - 'rows_loaded' : meta['rows_loaded'], - 'completeness_pct' : 100.0 if meta['status'] == 'success' else 0.0, - 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), - 'validation_metrics' : json.dumps({'status': meta['status'], 'rows': meta['rows_loaded']}) - } - try: - save_etl_metadata(self.client, metadata) - self.logger.info(f" Metadata → [AUDIT] etl_metadata") - except Exception as e: - self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}") - - # ------------------------------------------------------------------ - # DIMENSION LOADERS - # ------------------------------------------------------------------ - - def load_dim_time(self): - table_name = 'dim_time' - self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...") - - try: - if 'year_range' in self.df_clean.columns: - dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy() - else: - dim_time = self.df_clean[['year']].drop_duplicates().copy() - dim_time['year_range'] = None - - dim_time['year'] = dim_time['year'].astype(int) - - def parse_year_range(row): - year = row['year'] - year_range = row.get('year_range') - start_year = year - end_year = year - if pd.notna(year_range) and year_range is not None: - yr_str = str(year_range).strip() - if yr_str and yr_str != 'nan': - if '-' in yr_str: - parts = yr_str.split('-') - if len(parts) == 2: - try: - start_year = int(parts[0].strip()) - end_year = int(parts[1].strip()) - year = (start_year + end_year) // 2 - except Exception: - pass - else: - try: - single = int(yr_str) - start_year = single - end_year = single - year = single - except Exception: - pass - return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year}) - - parsed = dim_time.apply(parse_year_range, axis=1) - dim_time['year'] = parsed['year'].astype(int) - dim_time['start_year'] = parsed['start_year'].astype(int) - dim_time['end_year'] = parsed['end_year'].astype(int) - dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year']) - dim_time['decade'] = (dim_time['year'] // 10) * 10 - dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int) - dim_time = dim_time.sort_values(['is_range', 'start_year'], ascending=[True, True]) - dim_time = dim_time.drop(['is_range', 'year_range'], axis=1, errors='ignore') - dim_time = dim_time.drop_duplicates(subset=['start_year', 'end_year'], keep='first') - - dim_time_final = dim_time[['year', 'start_year', 'end_year', 'decade', 'is_year_range']].copy() - dim_time_final = dim_time_final.reset_index(drop=True) - dim_time_final.insert(0, 'time_id', range(1, len(dim_time_final) + 1)) - - schema = [ - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("start_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("end_year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("decade", "INTEGER", mode="NULLABLE"), - bigquery.SchemaField("is_year_range", "BOOLEAN", mode="NULLABLE"), - ] - - rows_loaded = load_to_bigquery( - self.client, dim_time_final, 
table_name, - layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._add_primary_key(table_name, 'time_id') - - self.load_metadata[table_name].update( - {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} - ) - log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) - self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n") - return rows_loaded - - except Exception as e: - self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) - log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) - raise - - def load_dim_country(self): - table_name = 'dim_country' - self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...") - - try: - dim_country = self.df_clean[['country']].drop_duplicates().copy() - dim_country.columns = ['country_name'] - - region_mapping = { - 'Brunei Darussalam': ('Southeast Asia', 'ASEAN'), - 'Cambodia' : ('Southeast Asia', 'ASEAN'), - 'Indonesia' : ('Southeast Asia', 'ASEAN'), - 'Laos' : ('Southeast Asia', 'ASEAN'), - 'Malaysia' : ('Southeast Asia', 'ASEAN'), - 'Myanmar' : ('Southeast Asia', 'ASEAN'), - 'Philippines' : ('Southeast Asia', 'ASEAN'), - 'Singapore' : ('Southeast Asia', 'ASEAN'), - 'Thailand' : ('Southeast Asia', 'ASEAN'), - 'Vietnam' : ('Southeast Asia', 'ASEAN'), - } - iso_mapping = { - 'Brunei Darussalam': 'BRN', 'Cambodia': 'KHM', 'Indonesia': 'IDN', - 'Laos': 'LAO', 'Malaysia': 'MYS', 'Myanmar': 'MMR', - 'Philippines': 'PHL', 'Singapore': 'SGP', 'Thailand': 'THA', 'Vietnam': 'VNM', - } - - dim_country['region'] = dim_country['country_name'].map( - lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[0]) - dim_country['subregion'] = dim_country['country_name'].map( - lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1]) - dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping) - - dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy() - dim_country_final = dim_country_final.reset_index(drop=True) - dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1)) - - schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("region", "STRING", mode="NULLABLE"), - bigquery.SchemaField("subregion", "STRING", mode="NULLABLE"), - bigquery.SchemaField("iso_code", "STRING", mode="NULLABLE"), - ] - - rows_loaded = load_to_bigquery( - self.client, dim_country_final, table_name, - layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._add_primary_key(table_name, 'country_id') - - self.load_metadata[table_name].update( - {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} - ) - log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) - self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n") - return rows_loaded - - except Exception as e: - self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) - log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) - raise - - def load_dim_indicator(self): - table_name = 'dim_indicator' - self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...") - - try: - has_direction = 'direction' in self.df_clean.columns - 
has_unit = 'unit' in self.df_clean.columns - has_category = 'indicator_category' in self.df_clean.columns - - dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() - dim_indicator.columns = ['indicator_name'] - - if has_unit: - unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates() - unit_map.columns = ['indicator_name', 'unit'] - dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left') - else: - dim_indicator['unit'] = None - - if has_direction: - dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates() - dir_map.columns = ['indicator_name', 'direction'] - dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left') - self.logger.info(" [OK] direction column from cleaned_integrated") - else: - dim_indicator['direction'] = 'higher_better' - self.logger.warning(" [WARN] direction not found, default: higher_better") - - if has_category: - cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates() - cat_map.columns = ['indicator_name', 'indicator_category'] - dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') - else: - def categorize_indicator(name): - n = str(name).lower() - if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting', - 'wasting', 'anemia', 'food security', 'food insecure', 'hunger']): - return 'Health & Nutrition' - elif any(w in n for w in ['production', 'yield', 'cereal', 'crop', - 'import dependency', 'share of dietary']): - return 'Agricultural Production' - elif any(w in n for w in ['import', 'export', 'trade']): - return 'Trade' - elif any(w in n for w in ['gdp', 'income', 'economic']): - return 'Economic' - elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): - return 'Infrastructure' - else: - return 'Other' - dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) - - dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') - dim_indicator_final = dim_indicator[ - ['indicator_name', 'indicator_category', 'unit', 'direction'] - ].copy() - dim_indicator_final = dim_indicator_final.reset_index(drop=True) - dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1)) - - schema = [ - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"), - bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - ] - - rows_loaded = load_to_bigquery( - self.client, dim_indicator_final, table_name, - layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._add_primary_key(table_name, 'indicator_id') - - for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]: - self.logger.info(f" {label}:") - for val, cnt in dim_indicator_final[col].value_counts().items(): - self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)") - - self.load_metadata[table_name].update( - {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} - ) - log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) - self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n") - return rows_loaded - - except Exception as e: - 
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) - log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) - raise - - def load_dim_source(self): - table_name = 'dim_source' - self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...") - - try: - source_details = { - 'FAO': { - 'source_type' : 'International Organization', - 'organization' : 'Food and Agriculture Organization', - 'access_method': 'Python Library (faostat)', - 'api_endpoint' : None, - }, - 'World Bank': { - 'source_type' : 'International Organization', - 'organization' : 'The World Bank', - 'access_method': 'Python Library (wbgapi)', - 'api_endpoint' : None, - }, - 'UNICEF': { - 'source_type' : 'International Organization', - 'organization' : "United Nations Children's Fund", - 'access_method': 'SDMX API', - 'api_endpoint' : 'https://sdmx.data.unicef.org/ws/public/sdmxapi/rest', - }, - } - - sources_data = [] - for source in self.df_clean['source'].unique(): - detail = source_details.get(source, { - 'source_type' : 'International Organization', - 'organization' : source, - 'access_method': 'Unknown', - 'api_endpoint' : None, - }) - sources_data.append({'source_name': source, **detail}) - - dim_source_final = pd.DataFrame(sources_data)[ - ['source_name', 'source_type', 'organization', 'access_method', 'api_endpoint'] - ].copy() - dim_source_final = dim_source_final.reset_index(drop=True) - dim_source_final.insert(0, 'source_id', range(1, len(dim_source_final) + 1)) - - schema = [ - bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("source_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("source_type", "STRING", mode="NULLABLE"), - bigquery.SchemaField("organization", "STRING", mode="NULLABLE"), - bigquery.SchemaField("access_method", "STRING", mode="NULLABLE"), - bigquery.SchemaField("api_endpoint", "STRING", mode="NULLABLE"), - ] - - rows_loaded = load_to_bigquery( - self.client, dim_source_final, table_name, - layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._add_primary_key(table_name, 'source_id') - - self.load_metadata[table_name].update( - {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} - ) - log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) - self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n") - return rows_loaded - - except Exception as e: - self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) - log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) - raise - - def load_dim_pillar(self): - table_name = 'dim_pillar' - self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...") - - try: - pillar_codes = { - 'Availability': 'AVL', 'Access' : 'ACC', - 'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH', - } - pillars_data = [ - {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')} - for p in self.df_clean['pillar'].unique() - ] - - dim_pillar_final = pd.DataFrame(pillars_data).sort_values('pillar_name')[ - ['pillar_name', 'pillar_code'] - ].copy() - dim_pillar_final = dim_pillar_final.reset_index(drop=True) - dim_pillar_final.insert(0, 'pillar_id', range(1, len(dim_pillar_final) + 1)) - - schema = [ - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - 
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_code", "STRING", mode="NULLABLE"), - ] - - rows_loaded = load_to_bigquery( - self.client, dim_pillar_final, table_name, - layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema - ) - self._add_primary_key(table_name, 'pillar_id') - - self.load_metadata[table_name].update( - {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} - ) - log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) - self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n") - return rows_loaded - - except Exception as e: - self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) - log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) - raise - - # ------------------------------------------------------------------ - # FACT LOADER - # ------------------------------------------------------------------ - - def load_fact_food_security(self): - table_name = 'fact_food_security' - self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...") - - try: - # Load dims dari Gold untuk FK resolution - dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold') - dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') - dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold') - dim_source = read_from_bigquery(self.client, 'dim_source', layer='gold') - dim_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold') - - fact_table = self.df_clean.copy() - - def parse_year_range_for_merge(row): - year = row['year'] - year_range = row.get('year_range') - start_year = year - end_year = year - if pd.notna(year_range) and year_range is not None: - yr_str = str(year_range).strip() - if yr_str and yr_str != 'nan': - if '-' in yr_str: - parts = yr_str.split('-') - if len(parts) == 2: - try: - start_year = int(parts[0].strip()) - end_year = int(parts[1].strip()) - except Exception: - pass - else: - try: - single = int(yr_str) - start_year = single - end_year = single - except Exception: - pass - return pd.Series({'start_year': start_year, 'end_year': end_year}) - - if 'year_range' in fact_table.columns: - parsed = fact_table.apply(parse_year_range_for_merge, axis=1) - fact_table['start_year'] = parsed['start_year'].astype(int) - fact_table['end_year'] = parsed['end_year'].astype(int) - else: - fact_table['start_year'] = fact_table['year'].astype(int) - fact_table['end_year'] = fact_table['year'].astype(int) - - # Resolve FKs - fact_table = fact_table.merge( - dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}), - on='country', how='left' - ) - fact_table = fact_table.merge( - dim_indicator[['indicator_id', 'indicator_name']].rename( - columns={'indicator_name': 'indicator_standardized'}), - on='indicator_standardized', how='left' - ) - fact_table = fact_table.merge( - dim_time[['time_id', 'start_year', 'end_year']], - on=['start_year', 'end_year'], how='left' - ) - fact_table = fact_table.merge( - dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}), - on='source', how='left' - ) - fact_table = fact_table.merge( - dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}), - on='pillar', how='left' - ) - - # Filter hanya row dengan FK lengkap - fact_table = fact_table[ - 
fact_table['country_id'].notna() & - fact_table['indicator_id'].notna() & - fact_table['time_id'].notna() & - fact_table['source_id'].notna() & - fact_table['pillar_id'].notna() - ] - - fact_final = fact_table[ - ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id', 'value'] - ].copy() - fact_final['data_quality_score'] = 0.95 - - for col in ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id']: - fact_final[col] = fact_final[col].astype(int) - fact_final['value'] = fact_final['value'].astype(float) - - fact_final = fact_final.reset_index(drop=True) - fact_final.insert(0, 'fact_id', range(1, len(fact_final) + 1)) - - schema = [ - bigquery.SchemaField("fact_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("data_quality_score", "FLOAT", mode="NULLABLE"), - ] - - rows_loaded = load_to_bigquery( - self.client, fact_final, table_name, - layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema - ) - - # Add PK + FKs - self._add_primary_key(table_name, 'fact_id') - self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id') - self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id') - self._add_foreign_key(table_name, 'time_id', 'dim_time', 'time_id') - self._add_foreign_key(table_name, 'source_id', 'dim_source', 'source_id') - self._add_foreign_key(table_name, 'pillar_id', 'dim_pillar', 'pillar_id') - - self.load_metadata[table_name].update( - {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} - ) - log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) - self._save_table_metadata(table_name) - self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n") - return rows_loaded - - except Exception as e: - self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()}) - log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e)) - raise - - # ------------------------------------------------------------------ - # VALIDATION - # ------------------------------------------------------------------ - - def validate_constraints(self): - self.logger.info("\n" + "=" * 60) - self.logger.info("CONSTRAINT VALIDATION — fs_asean_gold") - self.logger.info("=" * 60) - try: - gold_dataset = CONFIG['bigquery']['dataset_gold'] - query = f""" - SELECT table_name, constraint_name, constraint_type - FROM `{CONFIG['bigquery']['project_id']}.{gold_dataset}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS` - WHERE table_name IN ( - 'dim_country', 'dim_indicator', 'dim_time', - 'dim_source', 'dim_pillar', 'fact_food_security' - ) - ORDER BY - CASE WHEN table_name LIKE 'dim_%' THEN 1 ELSE 2 END, - table_name, constraint_type - """ - df = self.client.query(query).result().to_dataframe(create_bqstorage_client=False) - if len(df) > 0: - for _, row in df.iterrows(): - icon = "[PK]" if row['constraint_type'] == "PRIMARY KEY" else "[FK]" - self.logger.info( - f" {icon} {row['table_name']:25s} | " - f"{row['constraint_type']:15s} | {row['constraint_name']}" - ) - pk_count = len(df[df['constraint_type'] == 'PRIMARY KEY']) - fk_count = len(df[df['constraint_type'] == 'FOREIGN KEY']) - 
self.logger.info(f"\n Primary Keys : {pk_count}") - self.logger.info(f" Foreign Keys : {fk_count}") - self.logger.info(f" Total : {len(df)}") - else: - self.logger.warning(" [WARN] No constraints found!") - except Exception as e: - self.logger.error(f"Error validating constraints: {e}") - - def validate_data_load(self): - self.logger.info("\n" + "=" * 60) - self.logger.info("DATA LOAD VALIDATION — fs_asean_gold") - self.logger.info("=" * 60) - try: - for table in ['dim_country', 'dim_indicator', 'dim_time', - 'dim_source', 'dim_pillar', 'fact_food_security']: - df = read_from_bigquery(self.client, table, layer='gold') - self.logger.info(f" {table:25s}: {len(df):>10,} rows") - - query = f""" - SELECT - COUNT(*) AS total_facts, - COUNT(DISTINCT country_id) AS unique_countries, - COUNT(DISTINCT indicator_id) AS unique_indicators, - COUNT(DISTINCT time_id) AS unique_years, - COUNT(DISTINCT source_id) AS unique_sources, - COUNT(DISTINCT pillar_id) AS unique_pillars - FROM `{get_table_id('fact_food_security', layer='gold')}` - """ - stats = self.client.query(query).result().to_dataframe( - create_bqstorage_client=False - ).iloc[0] - self.logger.info(f"\n Fact Table Summary:") - self.logger.info(f" Total Facts : {int(stats['total_facts']):>10,}") - self.logger.info(f" Unique Countries : {int(stats['unique_countries']):>10,}") - self.logger.info(f" Unique Indicators : {int(stats['unique_indicators']):>10,}") - self.logger.info(f" Unique Years : {int(stats['unique_years']):>10,}") - self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}") - self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}") - - query_dir = f""" - SELECT direction, COUNT(*) AS count - FROM `{get_table_id('dim_indicator', layer='gold')}` - GROUP BY direction ORDER BY direction - """ - df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False) - if len(df_dir) > 0: - self.logger.info(f"\n Direction Distribution:") - for _, row in df_dir.iterrows(): - self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators") - - self.logger.info("\n [OK] Validation completed") - except Exception as e: - self.logger.error(f"Error during validation: {e}") - raise - - # ------------------------------------------------------------------ - # RUN - # ------------------------------------------------------------------ - - def run(self): - """Execute full dimensional model load ke DW layer (Gold).""" - self.pipeline_metadata['start_time'] = datetime.now() - self.pipeline_metadata['rows_fetched'] = len(self.df_clean) - - self.logger.info("\n" + "=" * 60) - self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold") - self.logger.info("=" * 60) - - # Dimensions - self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold") - self.load_dim_country() - self.load_dim_indicator() - self.load_dim_time() - self.load_dim_source() - self.load_dim_pillar() - - # Fact - self.logger.info("\nLOADING FACT TABLE → fs_asean_gold") - self.load_fact_food_security() - - # Validate - self.validate_constraints() - self.validate_data_load() - - pipeline_end = datetime.now() - duration = (pipeline_end - self.pipeline_metadata['start_time']).total_seconds() - total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values()) - - self.pipeline_metadata.update({ - 'end_time' : pipeline_end, - 'duration_seconds' : duration, - 'rows_transformed' : total_loaded, - 'rows_loaded' : total_loaded, - 'execution_timestamp': self.pipeline_metadata['start_time'], - 'completeness_pct' 
: 100.0, - 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), - 'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}), - 'table_name' : 'dimensional_model_pipeline', - }) - try: - save_etl_metadata(self.client, self.pipeline_metadata) - except Exception as e: - self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}") - - # Summary - self.logger.info("\n" + "=" * 60) - self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED") - self.logger.info("=" * 60) - self.logger.info(f" Dataset : fs_asean_gold") - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Tables :") - for tbl, meta in self.load_metadata.items(): - icon = "✓" if meta['status'] == 'success' else "✗" - self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows") - self.logger.info(f"\n Metadata → [AUDIT] etl_metadata") - self.logger.info("=" * 60) - - -# ============================================================================= -# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer -# ============================================================================= - -def run_dimensional_model(): - """ - Airflow task: Load dimensional model dari cleaned_integrated. - - Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai. - """ - from scripts.bigquery_config import get_bigquery_client - client = get_bigquery_client() - df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') - loader = DimensionalModelLoader(client, df_clean) - loader.run() - print(f"Dimensional model loaded: {len(df_clean):,} source rows processed") - - -# ============================================================================= -# MAIN EXECUTION -# ============================================================================= - -if __name__ == "__main__": - print("=" * 60) - print("BIGQUERY DIMENSIONAL MODEL LOAD") - print("Kimball DW Architecture") - print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)") - print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)") - print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)") - print("=" * 60) - - logger = setup_logging() - client = get_bigquery_client() - - print("\nLoading cleaned_integrated (fs_asean_silver)...") - df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') - print(f" ✓ Loaded : {len(df_clean):,} rows") - print(f" Columns : {len(df_clean.columns)}") - print(f" Sources : {df_clean['source'].nunique()}") - print(f" Indicators : {df_clean['indicator_standardized'].nunique()}") - print(f" Countries : {df_clean['country'].nunique()}") - print(f" Year range : {int(df_clean['year'].min())}–{int(df_clean['year'].max())}") - if 'direction' in df_clean.columns: - print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") - else: - print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first") - - print("\n[1/1] Dimensional Model Load → DW (Gold)...") - loader = DimensionalModelLoader(client, df_clean) - loader.run() - - print("\n" + "=" * 60) - print("✓ DIMENSIONAL MODEL ETL COMPLETED") - print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,") - print(" dim_source, dim_pillar, fact_food_security") - print(" 📋 AUDIT : etl_logs, etl_metadata") - print("=" * 60) \ No newline at end of file