replace sklearn with pure numpy

This commit is contained in:
Debby
2026-03-15 00:15:53 +07:00
parent 4b617a1e8f
commit a4ff15677e
2 changed files with 10 additions and 861 deletions

View File

@@ -24,7 +24,6 @@ from scripts.bigquery_helpers import (
save_etl_metadata,
)
from google.cloud import bigquery
from sklearn.preprocessing import MinMaxScaler
# =============================================================================
@@ -87,12 +86,10 @@ def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.S
v_min, v_max = values.min(), values.max()
if v_min == v_max:
return pd.Series((lo + hi) / 2.0, index=series.index)
scaler = MinMaxScaler(feature_range=(lo, hi))
result = np.full(len(series), np.nan)
result = np.full(len(series), np.nan)
not_nan = series.notna()
result[not_nan.values] = scaler.fit_transform(
series[not_nan].values.reshape(-1, 1)
).flatten()
raw = series[not_nan].values
result[not_nan.values] = lo + (raw - v_min) / (v_max - v_min) * (hi - lo)
return pd.Series(result, index=series.index)
@@ -274,11 +271,13 @@ class FoodSecurityAggregator:
norm_parts.append(grp)
continue
scaler = MinMaxScaler(feature_range=(0, 1))
raw = grp.loc[valid_mask, "value"].values
v_min, v_max = raw.min(), raw.max()
normed = np.full(len(grp), np.nan)
normed[valid_mask.values] = scaler.fit_transform(
grp.loc[valid_mask, ["value"]]
).flatten()
if v_min == v_max:
normed[valid_mask.values] = 0.5
else:
normed[valid_mask.values] = (raw - v_min) / (v_max - v_min)
if do_invert:
normed = np.where(np.isnan(normed), np.nan, 1.0 - normed)
@@ -757,7 +756,7 @@ if __name__ == "__main__":
_sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace")
print("=" * 70)
print("FOOD SECURITY AGGREGATION v8.0 — 4 TABLES -> fs_asean_gold")
print("FOOD SECURITY AGGREGATION 4 TABLES -> fs_asean_gold")
print(f" NORMALIZE_FRAMEWORKS_JOINTLY = {NORMALIZE_FRAMEWORKS_JOINTLY}")
print("=" * 70)

View File

@@ -1,850 +0,0 @@
"""
BIGQUERY DIMENSIONAL MODEL LOAD
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
Input : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver)
Output : DW layer (Gold) — dim_*, fact_* (fs_asean_gold)
Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit)
Classes:
DimensionalModelLoader — Build Star Schema & load ke Gold layer
Usage:
python bigquery_dimensional_model.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
# =============================================================================
# DIMENSIONAL MODEL LOADER
# =============================================================================
class DimensionalModelLoader:
"""
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
Kimball context:
Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* → DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
Pipeline steps:
1. Load dim_country
2. Load dim_indicator
3. Load dim_time
4. Load dim_source
5. Load dim_pillar
6. Load fact_food_security (resolve FK dari Gold dims)
7. Validate constraints & data load
"""
def __init__(self, client: bigquery.Client, df_clean: pd.DataFrame):
self.client = client
self.df_clean = df_clean
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.target_layer = 'gold'
self.load_metadata = {
'dim_country' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_indicator' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_time' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_source' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_pillar' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'fact_food_security': {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
}
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
# ------------------------------------------------------------------
# CONSTRAINT HELPERS
# ------------------------------------------------------------------
def _add_primary_key(self, table_name: str, column_name: str):
table_id = get_table_id(table_name, layer='gold')
query = f"ALTER TABLE `{table_id}` ADD PRIMARY KEY ({column_name}) NOT ENFORCED"
try:
self.client.query(query).result()
self.logger.info(f" [OK] PRIMARY KEY: {table_name}({column_name})")
except Exception as e:
if "already exists" in str(e).lower():
self.logger.info(f" [INFO] PRIMARY KEY already exists: {table_name}({column_name})")
else:
self.logger.warning(f" [WARN] Could not add PRIMARY KEY to {table_name}.{column_name}: {e}")
def _add_foreign_key(self, table_name: str, fk_column: str,
ref_table: str, ref_column: str):
table_id = get_table_id(table_name, layer='gold')
ref_table_id = get_table_id(ref_table, layer='gold')
constraint_name = f"fk_{table_name}_{fk_column}"
query = f"""
ALTER TABLE `{table_id}`
ADD CONSTRAINT {constraint_name}
FOREIGN KEY ({fk_column})
REFERENCES `{ref_table_id}`({ref_column})
NOT ENFORCED
"""
try:
self.client.query(query).result()
self.logger.info(f" [OK] FK: {table_name}.{fk_column}{ref_table}.{ref_column}")
except Exception as e:
if "already exists" in str(e).lower():
self.logger.info(f" [INFO] FK already exists: {constraint_name}")
else:
self.logger.warning(f" [WARN] Could not add FK {constraint_name}: {e}")
# ------------------------------------------------------------------
# METADATA HELPER
# ------------------------------------------------------------------
def _save_table_metadata(self, table_name: str):
meta = self.load_metadata[table_name]
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
'execution_timestamp': meta['start_time'],
'duration_seconds' : (meta['end_time'] - meta['start_time']).total_seconds()
if meta['end_time'] else 0,
'rows_fetched' : 0,
'rows_transformed' : meta['rows_loaded'],
'rows_loaded' : meta['rows_loaded'],
'completeness_pct' : 100.0 if meta['status'] == 'success' else 0.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics' : json.dumps({'status': meta['status'], 'rows': meta['rows_loaded']})
}
try:
save_etl_metadata(self.client, metadata)
self.logger.info(f" Metadata → [AUDIT] etl_metadata")
except Exception as e:
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
# ------------------------------------------------------------------
# DIMENSION LOADERS
# ------------------------------------------------------------------
def load_dim_time(self):
table_name = 'dim_time'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...")
try:
if 'year_range' in self.df_clean.columns:
dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
else:
dim_time = self.df_clean[['year']].drop_duplicates().copy()
dim_time['year_range'] = None
dim_time['year'] = dim_time['year'].astype(int)
def parse_year_range(row):
year = row['year']
year_range = row.get('year_range')
start_year = year
end_year = year
if pd.notna(year_range) and year_range is not None:
yr_str = str(year_range).strip()
if yr_str and yr_str != 'nan':
if '-' in yr_str:
parts = yr_str.split('-')
if len(parts) == 2:
try:
start_year = int(parts[0].strip())
end_year = int(parts[1].strip())
year = (start_year + end_year) // 2
except Exception:
pass
else:
try:
single = int(yr_str)
start_year = single
end_year = single
year = single
except Exception:
pass
return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})
parsed = dim_time.apply(parse_year_range, axis=1)
dim_time['year'] = parsed['year'].astype(int)
dim_time['start_year'] = parsed['start_year'].astype(int)
dim_time['end_year'] = parsed['end_year'].astype(int)
dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
dim_time['decade'] = (dim_time['year'] // 10) * 10
dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
dim_time = dim_time.sort_values(['is_range', 'start_year'], ascending=[True, True])
dim_time = dim_time.drop(['is_range', 'year_range'], axis=1, errors='ignore')
dim_time = dim_time.drop_duplicates(subset=['start_year', 'end_year'], keep='first')
dim_time_final = dim_time[['year', 'start_year', 'end_year', 'decade', 'is_year_range']].copy()
dim_time_final = dim_time_final.reset_index(drop=True)
dim_time_final.insert(0, 'time_id', range(1, len(dim_time_final) + 1))
schema = [
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("start_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("end_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("decade", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("is_year_range", "BOOLEAN", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_time_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'time_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
def load_dim_country(self):
table_name = 'dim_country'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...")
try:
dim_country = self.df_clean[['country']].drop_duplicates().copy()
dim_country.columns = ['country_name']
region_mapping = {
'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
'Cambodia' : ('Southeast Asia', 'ASEAN'),
'Indonesia' : ('Southeast Asia', 'ASEAN'),
'Laos' : ('Southeast Asia', 'ASEAN'),
'Malaysia' : ('Southeast Asia', 'ASEAN'),
'Myanmar' : ('Southeast Asia', 'ASEAN'),
'Philippines' : ('Southeast Asia', 'ASEAN'),
'Singapore' : ('Southeast Asia', 'ASEAN'),
'Thailand' : ('Southeast Asia', 'ASEAN'),
'Vietnam' : ('Southeast Asia', 'ASEAN'),
}
iso_mapping = {
'Brunei Darussalam': 'BRN', 'Cambodia': 'KHM', 'Indonesia': 'IDN',
'Laos': 'LAO', 'Malaysia': 'MYS', 'Myanmar': 'MMR',
'Philippines': 'PHL', 'Singapore': 'SGP', 'Thailand': 'THA', 'Vietnam': 'VNM',
}
dim_country['region'] = dim_country['country_name'].map(
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[0])
dim_country['subregion'] = dim_country['country_name'].map(
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy()
dim_country_final = dim_country_final.reset_index(drop=True)
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("region", "STRING", mode="NULLABLE"),
bigquery.SchemaField("subregion", "STRING", mode="NULLABLE"),
bigquery.SchemaField("iso_code", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_country_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'country_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
def load_dim_indicator(self):
table_name = 'dim_indicator'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...")
try:
has_direction = 'direction' in self.df_clean.columns
has_unit = 'unit' in self.df_clean.columns
has_category = 'indicator_category' in self.df_clean.columns
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name']
if has_unit:
unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
unit_map.columns = ['indicator_name', 'unit']
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
else:
dim_indicator['unit'] = None
if has_direction:
dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
dir_map.columns = ['indicator_name', 'direction']
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
self.logger.info(" [OK] direction column from cleaned_integrated")
else:
dim_indicator['direction'] = 'higher_better'
self.logger.warning(" [WARN] direction not found, default: higher_better")
if has_category:
cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
cat_map.columns = ['indicator_name', 'indicator_category']
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
else:
def categorize_indicator(name):
n = str(name).lower()
if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
return 'Health & Nutrition'
elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
'import dependency', 'share of dietary']):
return 'Agricultural Production'
elif any(w in n for w in ['import', 'export', 'trade']):
return 'Trade'
elif any(w in n for w in ['gdp', 'income', 'economic']):
return 'Economic'
elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
return 'Infrastructure'
else:
return 'Other'
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
dim_indicator_final = dim_indicator[
['indicator_name', 'indicator_category', 'unit', 'direction']
].copy()
dim_indicator_final = dim_indicator_final.reset_index(drop=True)
dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1))
schema = [
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"),
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
]
rows_loaded = load_to_bigquery(
self.client, dim_indicator_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'indicator_id')
for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
self.logger.info(f" {label}:")
for val, cnt in dim_indicator_final[col].value_counts().items():
self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
def load_dim_source(self):
table_name = 'dim_source'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...")
try:
source_details = {
'FAO': {
'source_type' : 'International Organization',
'organization' : 'Food and Agriculture Organization',
'access_method': 'Python Library (faostat)',
'api_endpoint' : None,
},
'World Bank': {
'source_type' : 'International Organization',
'organization' : 'The World Bank',
'access_method': 'Python Library (wbgapi)',
'api_endpoint' : None,
},
'UNICEF': {
'source_type' : 'International Organization',
'organization' : "United Nations Children's Fund",
'access_method': 'SDMX API',
'api_endpoint' : 'https://sdmx.data.unicef.org/ws/public/sdmxapi/rest',
},
}
sources_data = []
for source in self.df_clean['source'].unique():
detail = source_details.get(source, {
'source_type' : 'International Organization',
'organization' : source,
'access_method': 'Unknown',
'api_endpoint' : None,
})
sources_data.append({'source_name': source, **detail})
dim_source_final = pd.DataFrame(sources_data)[
['source_name', 'source_type', 'organization', 'access_method', 'api_endpoint']
].copy()
dim_source_final = dim_source_final.reset_index(drop=True)
dim_source_final.insert(0, 'source_id', range(1, len(dim_source_final) + 1))
schema = [
bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("source_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("source_type", "STRING", mode="NULLABLE"),
bigquery.SchemaField("organization", "STRING", mode="NULLABLE"),
bigquery.SchemaField("access_method", "STRING", mode="NULLABLE"),
bigquery.SchemaField("api_endpoint", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_source_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'source_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
def load_dim_pillar(self):
table_name = 'dim_pillar'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...")
try:
pillar_codes = {
'Availability': 'AVL', 'Access' : 'ACC',
'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH',
}
pillars_data = [
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')}
for p in self.df_clean['pillar'].unique()
]
dim_pillar_final = pd.DataFrame(pillars_data).sort_values('pillar_name')[
['pillar_name', 'pillar_code']
].copy()
dim_pillar_final = dim_pillar_final.reset_index(drop=True)
dim_pillar_final.insert(0, 'pillar_id', range(1, len(dim_pillar_final) + 1))
schema = [
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_code", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_pillar_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'pillar_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
# ------------------------------------------------------------------
# FACT LOADER
# ------------------------------------------------------------------
def load_fact_food_security(self):
table_name = 'fact_food_security'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...")
try:
# Load dims dari Gold untuk FK resolution
dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
dim_source = read_from_bigquery(self.client, 'dim_source', layer='gold')
dim_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
fact_table = self.df_clean.copy()
def parse_year_range_for_merge(row):
year = row['year']
year_range = row.get('year_range')
start_year = year
end_year = year
if pd.notna(year_range) and year_range is not None:
yr_str = str(year_range).strip()
if yr_str and yr_str != 'nan':
if '-' in yr_str:
parts = yr_str.split('-')
if len(parts) == 2:
try:
start_year = int(parts[0].strip())
end_year = int(parts[1].strip())
except Exception:
pass
else:
try:
single = int(yr_str)
start_year = single
end_year = single
except Exception:
pass
return pd.Series({'start_year': start_year, 'end_year': end_year})
if 'year_range' in fact_table.columns:
parsed = fact_table.apply(parse_year_range_for_merge, axis=1)
fact_table['start_year'] = parsed['start_year'].astype(int)
fact_table['end_year'] = parsed['end_year'].astype(int)
else:
fact_table['start_year'] = fact_table['year'].astype(int)
fact_table['end_year'] = fact_table['year'].astype(int)
# Resolve FKs
fact_table = fact_table.merge(
dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
on='country', how='left'
)
fact_table = fact_table.merge(
dim_indicator[['indicator_id', 'indicator_name']].rename(
columns={'indicator_name': 'indicator_standardized'}),
on='indicator_standardized', how='left'
)
fact_table = fact_table.merge(
dim_time[['time_id', 'start_year', 'end_year']],
on=['start_year', 'end_year'], how='left'
)
fact_table = fact_table.merge(
dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
on='source', how='left'
)
fact_table = fact_table.merge(
dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
on='pillar', how='left'
)
# Filter hanya row dengan FK lengkap
fact_table = fact_table[
fact_table['country_id'].notna() &
fact_table['indicator_id'].notna() &
fact_table['time_id'].notna() &
fact_table['source_id'].notna() &
fact_table['pillar_id'].notna()
]
fact_final = fact_table[
['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id', 'value']
].copy()
fact_final['data_quality_score'] = 0.95
for col in ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id']:
fact_final[col] = fact_final[col].astype(int)
fact_final['value'] = fact_final['value'].astype(float)
fact_final = fact_final.reset_index(drop=True)
fact_final.insert(0, 'fact_id', range(1, len(fact_final) + 1))
schema = [
bigquery.SchemaField("fact_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("data_quality_score", "FLOAT", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, fact_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
# Add PK + FKs
self._add_primary_key(table_name, 'fact_id')
self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
self._add_foreign_key(table_name, 'time_id', 'dim_time', 'time_id')
self._add_foreign_key(table_name, 'source_id', 'dim_source', 'source_id')
self._add_foreign_key(table_name, 'pillar_id', 'dim_pillar', 'pillar_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
# ------------------------------------------------------------------
# VALIDATION
# ------------------------------------------------------------------
def validate_constraints(self):
self.logger.info("\n" + "=" * 60)
self.logger.info("CONSTRAINT VALIDATION — fs_asean_gold")
self.logger.info("=" * 60)
try:
gold_dataset = CONFIG['bigquery']['dataset_gold']
query = f"""
SELECT table_name, constraint_name, constraint_type
FROM `{CONFIG['bigquery']['project_id']}.{gold_dataset}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS`
WHERE table_name IN (
'dim_country', 'dim_indicator', 'dim_time',
'dim_source', 'dim_pillar', 'fact_food_security'
)
ORDER BY
CASE WHEN table_name LIKE 'dim_%' THEN 1 ELSE 2 END,
table_name, constraint_type
"""
df = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
if len(df) > 0:
for _, row in df.iterrows():
icon = "[PK]" if row['constraint_type'] == "PRIMARY KEY" else "[FK]"
self.logger.info(
f" {icon} {row['table_name']:25s} | "
f"{row['constraint_type']:15s} | {row['constraint_name']}"
)
pk_count = len(df[df['constraint_type'] == 'PRIMARY KEY'])
fk_count = len(df[df['constraint_type'] == 'FOREIGN KEY'])
self.logger.info(f"\n Primary Keys : {pk_count}")
self.logger.info(f" Foreign Keys : {fk_count}")
self.logger.info(f" Total : {len(df)}")
else:
self.logger.warning(" [WARN] No constraints found!")
except Exception as e:
self.logger.error(f"Error validating constraints: {e}")
def validate_data_load(self):
self.logger.info("\n" + "=" * 60)
self.logger.info("DATA LOAD VALIDATION — fs_asean_gold")
self.logger.info("=" * 60)
try:
for table in ['dim_country', 'dim_indicator', 'dim_time',
'dim_source', 'dim_pillar', 'fact_food_security']:
df = read_from_bigquery(self.client, table, layer='gold')
self.logger.info(f" {table:25s}: {len(df):>10,} rows")
query = f"""
SELECT
COUNT(*) AS total_facts,
COUNT(DISTINCT country_id) AS unique_countries,
COUNT(DISTINCT indicator_id) AS unique_indicators,
COUNT(DISTINCT time_id) AS unique_years,
COUNT(DISTINCT source_id) AS unique_sources,
COUNT(DISTINCT pillar_id) AS unique_pillars
FROM `{get_table_id('fact_food_security', layer='gold')}`
"""
stats = self.client.query(query).result().to_dataframe(
create_bqstorage_client=False
).iloc[0]
self.logger.info(f"\n Fact Table Summary:")
self.logger.info(f" Total Facts : {int(stats['total_facts']):>10,}")
self.logger.info(f" Unique Countries : {int(stats['unique_countries']):>10,}")
self.logger.info(f" Unique Indicators : {int(stats['unique_indicators']):>10,}")
self.logger.info(f" Unique Years : {int(stats['unique_years']):>10,}")
self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}")
self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}")
query_dir = f"""
SELECT direction, COUNT(*) AS count
FROM `{get_table_id('dim_indicator', layer='gold')}`
GROUP BY direction ORDER BY direction
"""
df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
if len(df_dir) > 0:
self.logger.info(f"\n Direction Distribution:")
for _, row in df_dir.iterrows():
self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
self.logger.info("\n [OK] Validation completed")
except Exception as e:
self.logger.error(f"Error during validation: {e}")
raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
"""Execute full dimensional model load ke DW layer (Gold)."""
self.pipeline_metadata['start_time'] = datetime.now()
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold")
self.logger.info("=" * 60)
# Dimensions
self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
self.load_dim_country()
self.load_dim_indicator()
self.load_dim_time()
self.load_dim_source()
self.load_dim_pillar()
# Fact
self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
self.load_fact_food_security()
# Validate
self.validate_constraints()
self.validate_data_load()
pipeline_end = datetime.now()
duration = (pipeline_end - self.pipeline_metadata['start_time']).total_seconds()
total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
self.pipeline_metadata.update({
'end_time' : pipeline_end,
'duration_seconds' : duration,
'rows_transformed' : total_loaded,
'rows_loaded' : total_loaded,
'execution_timestamp': self.pipeline_metadata['start_time'],
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
'table_name' : 'dimensional_model_pipeline',
})
try:
save_etl_metadata(self.client, self.pipeline_metadata)
except Exception as e:
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
# Summary
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
self.logger.info("=" * 60)
self.logger.info(f" Dataset : fs_asean_gold")
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Tables :")
for tbl, meta in self.load_metadata.items():
icon = "" if meta['status'] == 'success' else ""
self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata → [AUDIT] etl_metadata")
self.logger.info("=" * 60)
# =============================================================================
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer
# =============================================================================
def run_dimensional_model():
"""
Airflow task: Load dimensional model dari cleaned_integrated.
Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai.
"""
from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client()
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
loader = DimensionalModelLoader(client, df_clean)
loader.run()
print(f"Dimensional model loaded: {len(df_clean):,} source rows processed")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
print("=" * 60)
print("BIGQUERY DIMENSIONAL MODEL LOAD")
print("Kimball DW Architecture")
print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)")
print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)")
print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)")
print("=" * 60)
logger = setup_logging()
client = get_bigquery_client()
print("\nLoading cleaned_integrated (fs_asean_silver)...")
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
print(f" ✓ Loaded : {len(df_clean):,} rows")
print(f" Columns : {len(df_clean.columns)}")
print(f" Sources : {df_clean['source'].nunique()}")
print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
print(f" Countries : {df_clean['country'].nunique()}")
print(f" Year range : {int(df_clean['year'].min())}{int(df_clean['year'].max())}")
if 'direction' in df_clean.columns:
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
else:
print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
print("\n[1/1] Dimensional Model Load → DW (Gold)...")
loader = DimensionalModelLoader(client, df_clean)
loader.run()
print("\n" + "=" * 60)
print("✓ DIMENSIONAL MODEL ETL COMPLETED")
print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
print(" dim_source, dim_pillar, fact_food_security")
print(" 📋 AUDIT : etl_logs, etl_metadata")
print("=" * 60)