Files
airflow-coolify/scripts/bigquery_dimesional_model.py
2026-03-14 23:29:45 +07:00

850 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
BIGQUERY DIMENSIONAL MODEL LOAD
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
Input : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver)
Output : DW layer (Gold) — dim_*, fact_* (fs_asean_gold)
Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit)
Classes:
DimensionalModelLoader — Build Star Schema & load ke Gold layer
Usage:
python bigquery_dimensional_model.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
# Force UTF-8 on stdout so the Unicode arrows/checkmarks in log output don't
# crash on consoles with a legacy default encoding (e.g. cp1252 on Windows).
# TextIOWrapper.reconfigure exists on Python 3.7+, hence the hasattr guard.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')
# =============================================================================
# DIMENSIONAL MODEL LOADER
# =============================================================================
class DimensionalModelLoader:
    """
    Load the Kimball dimensional model (star schema) into the DW layer
    (Gold) — fs_asean_gold.

    Kimball context:
        Input  : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
        Output : dim_* + fact_*     -> DW (Gold)        — fs_asean_gold
        Audit  : etl_logs, etl_metadata -> AUDIT        — fs_asean_audit

    Pipeline steps (see run()):
        1. Load dim_country
        2. Load dim_indicator
        3. Load dim_time
        4. Load dim_source
        5. Load dim_pillar
        6. Load fact_food_security (resolve FKs from the Gold dims)
        7. Validate constraints & data load

    Every table is loaded with WRITE_TRUNCATE (full refresh) and surrogate
    keys are regenerated as sequential integers on each run.
    """
def __init__(self, client: bigquery.Client, df_clean: pd.DataFrame):
self.client = client
self.df_clean = df_clean
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.target_layer = 'gold'
self.load_metadata = {
'dim_country' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_indicator' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_time' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_source' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'dim_pillar' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
'fact_food_security': {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
}
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
# ------------------------------------------------------------------
# CONSTRAINT HELPERS
# ------------------------------------------------------------------
def _add_primary_key(self, table_name: str, column_name: str):
table_id = get_table_id(table_name, layer='gold')
query = f"ALTER TABLE `{table_id}` ADD PRIMARY KEY ({column_name}) NOT ENFORCED"
try:
self.client.query(query).result()
self.logger.info(f" [OK] PRIMARY KEY: {table_name}({column_name})")
except Exception as e:
if "already exists" in str(e).lower():
self.logger.info(f" [INFO] PRIMARY KEY already exists: {table_name}({column_name})")
else:
self.logger.warning(f" [WARN] Could not add PRIMARY KEY to {table_name}.{column_name}: {e}")
def _add_foreign_key(self, table_name: str, fk_column: str,
ref_table: str, ref_column: str):
table_id = get_table_id(table_name, layer='gold')
ref_table_id = get_table_id(ref_table, layer='gold')
constraint_name = f"fk_{table_name}_{fk_column}"
query = f"""
ALTER TABLE `{table_id}`
ADD CONSTRAINT {constraint_name}
FOREIGN KEY ({fk_column})
REFERENCES `{ref_table_id}`({ref_column})
NOT ENFORCED
"""
try:
self.client.query(query).result()
self.logger.info(f" [OK] FK: {table_name}.{fk_column}{ref_table}.{ref_column}")
except Exception as e:
if "already exists" in str(e).lower():
self.logger.info(f" [INFO] FK already exists: {constraint_name}")
else:
self.logger.warning(f" [WARN] Could not add FK {constraint_name}: {e}")
# ------------------------------------------------------------------
# METADATA HELPER
# ------------------------------------------------------------------
def _save_table_metadata(self, table_name: str):
meta = self.load_metadata[table_name]
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
'execution_timestamp': meta['start_time'],
'duration_seconds' : (meta['end_time'] - meta['start_time']).total_seconds()
if meta['end_time'] else 0,
'rows_fetched' : 0,
'rows_transformed' : meta['rows_loaded'],
'rows_loaded' : meta['rows_loaded'],
'completeness_pct' : 100.0 if meta['status'] == 'success' else 0.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics' : json.dumps({'status': meta['status'], 'rows': meta['rows_loaded']})
}
try:
save_etl_metadata(self.client, metadata)
self.logger.info(f" Metadata → [AUDIT] etl_metadata")
except Exception as e:
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
# ------------------------------------------------------------------
# DIMENSION LOADERS
# ------------------------------------------------------------------
def load_dim_time(self):
table_name = 'dim_time'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...")
try:
if 'year_range' in self.df_clean.columns:
dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
else:
dim_time = self.df_clean[['year']].drop_duplicates().copy()
dim_time['year_range'] = None
dim_time['year'] = dim_time['year'].astype(int)
def parse_year_range(row):
year = row['year']
year_range = row.get('year_range')
start_year = year
end_year = year
if pd.notna(year_range) and year_range is not None:
yr_str = str(year_range).strip()
if yr_str and yr_str != 'nan':
if '-' in yr_str:
parts = yr_str.split('-')
if len(parts) == 2:
try:
start_year = int(parts[0].strip())
end_year = int(parts[1].strip())
year = (start_year + end_year) // 2
except Exception:
pass
else:
try:
single = int(yr_str)
start_year = single
end_year = single
year = single
except Exception:
pass
return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})
parsed = dim_time.apply(parse_year_range, axis=1)
dim_time['year'] = parsed['year'].astype(int)
dim_time['start_year'] = parsed['start_year'].astype(int)
dim_time['end_year'] = parsed['end_year'].astype(int)
dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
dim_time['decade'] = (dim_time['year'] // 10) * 10
dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
dim_time = dim_time.sort_values(['is_range', 'start_year'], ascending=[True, True])
dim_time = dim_time.drop(['is_range', 'year_range'], axis=1, errors='ignore')
dim_time = dim_time.drop_duplicates(subset=['start_year', 'end_year'], keep='first')
dim_time_final = dim_time[['year', 'start_year', 'end_year', 'decade', 'is_year_range']].copy()
dim_time_final = dim_time_final.reset_index(drop=True)
dim_time_final.insert(0, 'time_id', range(1, len(dim_time_final) + 1))
schema = [
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("start_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("end_year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("decade", "INTEGER", mode="NULLABLE"),
bigquery.SchemaField("is_year_range", "BOOLEAN", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_time_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'time_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
def load_dim_country(self):
table_name = 'dim_country'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...")
try:
dim_country = self.df_clean[['country']].drop_duplicates().copy()
dim_country.columns = ['country_name']
region_mapping = {
'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
'Cambodia' : ('Southeast Asia', 'ASEAN'),
'Indonesia' : ('Southeast Asia', 'ASEAN'),
'Laos' : ('Southeast Asia', 'ASEAN'),
'Malaysia' : ('Southeast Asia', 'ASEAN'),
'Myanmar' : ('Southeast Asia', 'ASEAN'),
'Philippines' : ('Southeast Asia', 'ASEAN'),
'Singapore' : ('Southeast Asia', 'ASEAN'),
'Thailand' : ('Southeast Asia', 'ASEAN'),
'Vietnam' : ('Southeast Asia', 'ASEAN'),
}
iso_mapping = {
'Brunei Darussalam': 'BRN', 'Cambodia': 'KHM', 'Indonesia': 'IDN',
'Laos': 'LAO', 'Malaysia': 'MYS', 'Myanmar': 'MMR',
'Philippines': 'PHL', 'Singapore': 'SGP', 'Thailand': 'THA', 'Vietnam': 'VNM',
}
dim_country['region'] = dim_country['country_name'].map(
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[0])
dim_country['subregion'] = dim_country['country_name'].map(
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy()
dim_country_final = dim_country_final.reset_index(drop=True)
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("region", "STRING", mode="NULLABLE"),
bigquery.SchemaField("subregion", "STRING", mode="NULLABLE"),
bigquery.SchemaField("iso_code", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_country_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'country_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
    def load_dim_indicator(self):
        """
        Build and load dim_indicator into the Gold layer (full refresh).

        Attributes are sourced from cleaned_integrated when present:
          - unit       : NULL when the column is missing
          - direction  : defaults to 'higher_better' when missing
          - category   : derived from keywords in the indicator name when missing

        Returns:
            int: number of rows loaded.
        """
        table_name = 'dim_indicator'
        self.load_metadata[table_name]['start_time'] = datetime.now()
        self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...")
        try:
            # Optional attribute columns in the Silver source frame.
            has_direction = 'direction' in self.df_clean.columns
            has_unit = 'unit' in self.df_clean.columns
            has_category = 'indicator_category' in self.df_clean.columns
            dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
            dim_indicator.columns = ['indicator_name']
            if has_unit:
                # NOTE(review): if one indicator maps to several units this
                # left-merge fans rows out; the drop_duplicates further down
                # keeps only the first occurrence per indicator.
                unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
                unit_map.columns = ['indicator_name', 'unit']
                dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
            else:
                dim_indicator['unit'] = None
            if has_direction:
                dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
                dir_map.columns = ['indicator_name', 'direction']
                dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
                self.logger.info(" [OK] direction column from cleaned_integrated")
            else:
                dim_indicator['direction'] = 'higher_better'
                self.logger.warning(" [WARN] direction not found, default: higher_better")
            if has_category:
                cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
                cat_map.columns = ['indicator_name', 'indicator_category']
                dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
            else:
                # Keyword-based fallback categorization; first matching
                # branch wins, anything unmatched lands in 'Other'.
                def categorize_indicator(name):
                    n = str(name).lower()
                    if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
                                            'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
                        return 'Health & Nutrition'
                    elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
                                              'import dependency', 'share of dietary']):
                        return 'Agricultural Production'
                    elif any(w in n for w in ['import', 'export', 'trade']):
                        return 'Trade'
                    elif any(w in n for w in ['gdp', 'income', 'economic']):
                        return 'Economic'
                    elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
                        return 'Infrastructure'
                    else:
                        return 'Other'
                dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
            # Collapse any fan-out introduced by the merges: one row per indicator.
            dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
            dim_indicator_final = dim_indicator[
                ['indicator_name', 'indicator_category', 'unit', 'direction']
            ].copy()
            dim_indicator_final = dim_indicator_final.reset_index(drop=True)
            # Surrogate key: sequential 1..N, regenerated on every full load.
            dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1))
            # NOTE(review): indicator_category/direction are REQUIRED here but
            # a left-merge above could leave NaN — confirm the Silver source
            # guarantees non-null values for these columns.
            schema = [
                bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
                bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
            ]
            rows_loaded = load_to_bigquery(
                self.client, dim_indicator_final, table_name,
                layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
            )
            self._add_primary_key(table_name, 'indicator_id')
            # Log category/direction distributions for quick sanity checking.
            for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
                self.logger.info(f" {label}:")
                for val, cnt in dim_indicator_final[col].value_counts().items():
                    self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
            self.load_metadata[table_name].update(
                {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
            )
            log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
            self._save_table_metadata(table_name)
            self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n")
            return rows_loaded
        except Exception as e:
            # Record failure in audit + in-memory metadata, then propagate.
            self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
            log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
            raise
def load_dim_source(self):
table_name = 'dim_source'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...")
try:
source_details = {
'FAO': {
'source_type' : 'International Organization',
'organization' : 'Food and Agriculture Organization',
'access_method': 'Python Library (faostat)',
'api_endpoint' : None,
},
'World Bank': {
'source_type' : 'International Organization',
'organization' : 'The World Bank',
'access_method': 'Python Library (wbgapi)',
'api_endpoint' : None,
},
'UNICEF': {
'source_type' : 'International Organization',
'organization' : "United Nations Children's Fund",
'access_method': 'SDMX API',
'api_endpoint' : 'https://sdmx.data.unicef.org/ws/public/sdmxapi/rest',
},
}
sources_data = []
for source in self.df_clean['source'].unique():
detail = source_details.get(source, {
'source_type' : 'International Organization',
'organization' : source,
'access_method': 'Unknown',
'api_endpoint' : None,
})
sources_data.append({'source_name': source, **detail})
dim_source_final = pd.DataFrame(sources_data)[
['source_name', 'source_type', 'organization', 'access_method', 'api_endpoint']
].copy()
dim_source_final = dim_source_final.reset_index(drop=True)
dim_source_final.insert(0, 'source_id', range(1, len(dim_source_final) + 1))
schema = [
bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("source_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("source_type", "STRING", mode="NULLABLE"),
bigquery.SchemaField("organization", "STRING", mode="NULLABLE"),
bigquery.SchemaField("access_method", "STRING", mode="NULLABLE"),
bigquery.SchemaField("api_endpoint", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_source_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'source_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
def load_dim_pillar(self):
table_name = 'dim_pillar'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...")
try:
pillar_codes = {
'Availability': 'AVL', 'Access' : 'ACC',
'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH',
}
pillars_data = [
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')}
for p in self.df_clean['pillar'].unique()
]
dim_pillar_final = pd.DataFrame(pillars_data).sort_values('pillar_name')[
['pillar_name', 'pillar_code']
].copy()
dim_pillar_final = dim_pillar_final.reset_index(drop=True)
dim_pillar_final.insert(0, 'pillar_id', range(1, len(dim_pillar_final) + 1))
schema = [
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_code", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, dim_pillar_final, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self._add_primary_key(table_name, 'pillar_id')
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
raise
# ------------------------------------------------------------------
# FACT LOADER
# ------------------------------------------------------------------
    def load_fact_food_security(self):
        """
        Build and load fact_food_security into the Gold layer (full refresh).

        Reads the five freshly-loaded Gold dimensions back from BigQuery,
        resolves every foreign key by natural-key merge, drops any row
        missing one or more FKs, and loads the surviving rows with a
        sequential fact_id. Must run AFTER all load_dim_* methods.

        Returns:
            int: number of fact rows loaded.
        """
        table_name = 'fact_food_security'
        self.load_metadata[table_name]['start_time'] = datetime.now()
        self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...")
        try:
            # Load dims from Gold for FK resolution — surrogate keys must
            # match what was just written, so re-read rather than reuse
            # in-memory frames.
            dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
            dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
            dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
            dim_source = read_from_bigquery(self.client, 'dim_source', layer='gold')
            dim_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
            fact_table = self.df_clean.copy()

            def parse_year_range_for_merge(row):
                # Same "YYYY-YYYY" parsing as in load_dim_time, minus the
                # midpoint calculation — only start/end are needed to join
                # against dim_time's natural key.
                year = row['year']
                year_range = row.get('year_range')
                start_year = year
                end_year = year
                if pd.notna(year_range) and year_range is not None:
                    yr_str = str(year_range).strip()
                    if yr_str and yr_str != 'nan':
                        if '-' in yr_str:
                            parts = yr_str.split('-')
                            if len(parts) == 2:
                                try:
                                    start_year = int(parts[0].strip())
                                    end_year = int(parts[1].strip())
                                except Exception:
                                    pass
                        else:
                            try:
                                single = int(yr_str)
                                start_year = single
                                end_year = single
                            except Exception:
                                pass
                return pd.Series({'start_year': start_year, 'end_year': end_year})

            if 'year_range' in fact_table.columns:
                parsed = fact_table.apply(parse_year_range_for_merge, axis=1)
                fact_table['start_year'] = parsed['start_year'].astype(int)
                fact_table['end_year'] = parsed['end_year'].astype(int)
            else:
                fact_table['start_year'] = fact_table['year'].astype(int)
                fact_table['end_year'] = fact_table['year'].astype(int)
            # Resolve FKs: left-merge each dim on its natural key, renamed to
            # match the fact frame's column names.
            fact_table = fact_table.merge(
                dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
                on='country', how='left'
            )
            fact_table = fact_table.merge(
                dim_indicator[['indicator_id', 'indicator_name']].rename(
                    columns={'indicator_name': 'indicator_standardized'}),
                on='indicator_standardized', how='left'
            )
            fact_table = fact_table.merge(
                dim_time[['time_id', 'start_year', 'end_year']],
                on=['start_year', 'end_year'], how='left'
            )
            fact_table = fact_table.merge(
                dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
                on='source', how='left'
            )
            fact_table = fact_table.merge(
                dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
                on='pillar', how='left'
            )
            # Keep only rows where all five FKs resolved; orphans are
            # silently dropped from the fact table.
            fact_table = fact_table[
                fact_table['country_id'].notna() &
                fact_table['indicator_id'].notna() &
                fact_table['time_id'].notna() &
                fact_table['source_id'].notna() &
                fact_table['pillar_id'].notna()
            ]
            fact_final = fact_table[
                ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id', 'value']
            ].copy()
            # NOTE(review): flat placeholder quality score for all rows —
            # presumably to be replaced by a computed per-row score later.
            fact_final['data_quality_score'] = 0.95
            # Merges produce float FK columns (NaN-capable); cast back to int
            # now that nulls are filtered out.
            for col in ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id']:
                fact_final[col] = fact_final[col].astype(int)
            fact_final['value'] = fact_final['value'].astype(float)
            fact_final = fact_final.reset_index(drop=True)
            # Surrogate key: sequential 1..N, regenerated on every full load.
            fact_final.insert(0, 'fact_id', range(1, len(fact_final) + 1))
            schema = [
                bigquery.SchemaField("fact_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
                bigquery.SchemaField("data_quality_score", "FLOAT", mode="NULLABLE"),
            ]
            rows_loaded = load_to_bigquery(
                self.client, fact_final, table_name,
                layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
            )
            # Add PK + FKs (advisory NOT ENFORCED constraints).
            self._add_primary_key(table_name, 'fact_id')
            self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
            self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
            self._add_foreign_key(table_name, 'time_id', 'dim_time', 'time_id')
            self._add_foreign_key(table_name, 'source_id', 'dim_source', 'source_id')
            self._add_foreign_key(table_name, 'pillar_id', 'dim_pillar', 'pillar_id')
            self.load_metadata[table_name].update(
                {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
            )
            log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
            self._save_table_metadata(table_name)
            self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n")
            return rows_loaded
        except Exception as e:
            # Record failure in audit + in-memory metadata, then propagate.
            self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
            log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
            raise
# ------------------------------------------------------------------
# VALIDATION
# ------------------------------------------------------------------
def validate_constraints(self):
self.logger.info("\n" + "=" * 60)
self.logger.info("CONSTRAINT VALIDATION — fs_asean_gold")
self.logger.info("=" * 60)
try:
gold_dataset = CONFIG['bigquery']['dataset_gold']
query = f"""
SELECT table_name, constraint_name, constraint_type
FROM `{CONFIG['bigquery']['project_id']}.{gold_dataset}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS`
WHERE table_name IN (
'dim_country', 'dim_indicator', 'dim_time',
'dim_source', 'dim_pillar', 'fact_food_security'
)
ORDER BY
CASE WHEN table_name LIKE 'dim_%' THEN 1 ELSE 2 END,
table_name, constraint_type
"""
df = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
if len(df) > 0:
for _, row in df.iterrows():
icon = "[PK]" if row['constraint_type'] == "PRIMARY KEY" else "[FK]"
self.logger.info(
f" {icon} {row['table_name']:25s} | "
f"{row['constraint_type']:15s} | {row['constraint_name']}"
)
pk_count = len(df[df['constraint_type'] == 'PRIMARY KEY'])
fk_count = len(df[df['constraint_type'] == 'FOREIGN KEY'])
self.logger.info(f"\n Primary Keys : {pk_count}")
self.logger.info(f" Foreign Keys : {fk_count}")
self.logger.info(f" Total : {len(df)}")
else:
self.logger.warning(" [WARN] No constraints found!")
except Exception as e:
self.logger.error(f"Error validating constraints: {e}")
    def validate_data_load(self):
        """Sanity-check the loaded Gold tables and log summary statistics.

        Logs row counts for every dim/fact table, distinct-key counts over
        the fact table, and the direction distribution of dim_indicator.

        Raises:
            Re-raises any query/read error after logging it (unlike
            validate_constraints, failures here abort the pipeline).
        """
        self.logger.info("\n" + "=" * 60)
        self.logger.info("DATA LOAD VALIDATION — fs_asean_gold")
        self.logger.info("=" * 60)
        try:
            # Row counts per table (full reads; acceptable at this data size).
            for table in ['dim_country', 'dim_indicator', 'dim_time',
                          'dim_source', 'dim_pillar', 'fact_food_security']:
                df = read_from_bigquery(self.client, table, layer='gold')
                self.logger.info(f" {table:25s}: {len(df):>10,} rows")
            # Distinct FK counts give a quick referential-coverage picture.
            query = f"""
            SELECT
                COUNT(*) AS total_facts,
                COUNT(DISTINCT country_id) AS unique_countries,
                COUNT(DISTINCT indicator_id) AS unique_indicators,
                COUNT(DISTINCT time_id) AS unique_years,
                COUNT(DISTINCT source_id) AS unique_sources,
                COUNT(DISTINCT pillar_id) AS unique_pillars
            FROM `{get_table_id('fact_food_security', layer='gold')}`
            """
            stats = self.client.query(query).result().to_dataframe(
                create_bqstorage_client=False
            ).iloc[0]
            self.logger.info(f"\n Fact Table Summary:")
            self.logger.info(f" Total Facts : {int(stats['total_facts']):>10,}")
            self.logger.info(f" Unique Countries : {int(stats['unique_countries']):>10,}")
            self.logger.info(f" Unique Indicators : {int(stats['unique_indicators']):>10,}")
            self.logger.info(f" Unique Years : {int(stats['unique_years']):>10,}")
            self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}")
            self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}")
            # Direction distribution of indicators (higher_better vs lower_better).
            query_dir = f"""
            SELECT direction, COUNT(*) AS count
            FROM `{get_table_id('dim_indicator', layer='gold')}`
            GROUP BY direction ORDER BY direction
            """
            df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
            if len(df_dir) > 0:
                self.logger.info(f"\n Direction Distribution:")
                for _, row in df_dir.iterrows():
                    self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
            self.logger.info("\n [OK] Validation completed")
        except Exception as e:
            self.logger.error(f"Error during validation: {e}")
            raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
"""Execute full dimensional model load ke DW layer (Gold)."""
self.pipeline_metadata['start_time'] = datetime.now()
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold")
self.logger.info("=" * 60)
# Dimensions
self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
self.load_dim_country()
self.load_dim_indicator()
self.load_dim_time()
self.load_dim_source()
self.load_dim_pillar()
# Fact
self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
self.load_fact_food_security()
# Validate
self.validate_constraints()
self.validate_data_load()
pipeline_end = datetime.now()
duration = (pipeline_end - self.pipeline_metadata['start_time']).total_seconds()
total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
self.pipeline_metadata.update({
'end_time' : pipeline_end,
'duration_seconds' : duration,
'rows_transformed' : total_loaded,
'rows_loaded' : total_loaded,
'execution_timestamp': self.pipeline_metadata['start_time'],
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
'table_name' : 'dimensional_model_pipeline',
})
try:
save_etl_metadata(self.client, self.pipeline_metadata)
except Exception as e:
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
# Summary
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
self.logger.info("=" * 60)
self.logger.info(f" Dataset : fs_asean_gold")
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Tables :")
for tbl, meta in self.load_metadata.items():
icon = "" if meta['status'] == 'success' else ""
self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata → [AUDIT] etl_metadata")
self.logger.info("=" * 60)
# =============================================================================
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer
# =============================================================================
def run_dimensional_model():
    """
    Airflow task: load the dimensional model from cleaned_integrated.

    Called by the DAG after the cleaned_integration_to_silver task finishes.
    Reads the Silver layer, builds all dim_*/fact_* Gold tables via
    DimensionalModelLoader, and prints a short completion summary.
    """
    # CLEANUP: removed the redundant function-local re-import of
    # get_bigquery_client — it is already imported at module level.
    client = get_bigquery_client()
    df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
    loader = DimensionalModelLoader(client, df_clean)
    loader.run()
    print(f"Dimensional model loaded: {len(df_clean):,} source rows processed")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
    # Standalone entry point: same pipeline as run_dimensional_model(), with
    # a verbose console preamble describing the Kimball layers involved.
    print("=" * 60)
    print("BIGQUERY DIMENSIONAL MODEL LOAD")
    print("Kimball DW Architecture")
    print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)")
    print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)")
    print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)")
    print("=" * 60)
    logger = setup_logging()
    client = get_bigquery_client()
    print("\nLoading cleaned_integrated (fs_asean_silver)...")
    df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
    print(f" ✓ Loaded : {len(df_clean):,} rows")
    print(f" Columns : {len(df_clean.columns)}")
    print(f" Sources : {df_clean['source'].nunique()}")
    print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
    print(f" Countries : {df_clean['country'].nunique()}")
    # BUGFIX: min and max year were printed with no separator between them
    # (the dash glyph was lost), e.g. "20002023" instead of "2000-2023".
    print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}")
    if 'direction' in df_clean.columns:
        print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
    else:
        # Plain string: no interpolation needed here.
        print(" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
    print("\n[1/1] Dimensional Model Load → DW (Gold)...")
    loader = DimensionalModelLoader(client, df_clean)
    loader.run()
    print("\n" + "=" * 60)
    print("✓ DIMENSIONAL MODEL ETL COMPLETED")
    print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
    print(" dim_source, dim_pillar, fact_food_security")
    print(" 📋 AUDIT : etl_logs, etl_metadata")
    print("=" * 60)