SDGS MDGS indicator

This commit is contained in:
Debby
2026-03-28 19:15:24 +07:00
parent 0ffdf40430
commit dc981aacab
4 changed files with 812 additions and 329 deletions

View File

@@ -46,13 +46,13 @@ class DimensionalModelLoader:
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
Kimball context:
Input : cleaned_integrated STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata AUDIT — fs_asean_audit
Input : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* -> DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit
Pipeline steps:
1. Load dim_country
2. Load dim_indicator
2. Load dim_indicator (+ kolom framework dari cleaned_integrated)
3. Load dim_time
4. Load dim_source
5. Load dim_pillar
@@ -117,7 +117,7 @@ class DimensionalModelLoader:
"""
try:
self.client.query(query).result()
self.logger.info(f" [OK] FK: {table_name}.{fk_column} {ref_table}.{ref_column}")
self.logger.info(f" [OK] FK: {table_name}.{fk_column} -> {ref_table}.{ref_column}")
except Exception as e:
if "already exists" in str(e).lower():
self.logger.info(f" [INFO] FK already exists: {constraint_name}")
@@ -129,7 +129,7 @@ class DimensionalModelLoader:
# ------------------------------------------------------------------
def _save_table_metadata(self, table_name: str):
meta = self.load_metadata[table_name]
meta = self.load_metadata[table_name]
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
@@ -145,7 +145,7 @@ class DimensionalModelLoader:
}
try:
save_etl_metadata(self.client, metadata)
self.logger.info(f" Metadata [AUDIT] etl_metadata")
self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
except Exception as e:
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
@@ -156,13 +156,13 @@ class DimensionalModelLoader:
def load_dim_time(self):
table_name = 'dim_time'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_time [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_time -> [DW/Gold] fs_asean_gold...")
try:
if 'year_range' in self.df_clean.columns:
dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
else:
dim_time = self.df_clean[['year']].drop_duplicates().copy()
dim_time = self.df_clean[['year']].drop_duplicates().copy()
dim_time['year_range'] = None
dim_time['year'] = dim_time['year'].astype(int)
@@ -194,10 +194,10 @@ class DimensionalModelLoader:
pass
return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})
parsed = dim_time.apply(parse_year_range, axis=1)
dim_time['year'] = parsed['year'].astype(int)
dim_time['start_year'] = parsed['start_year'].astype(int)
dim_time['end_year'] = parsed['end_year'].astype(int)
parsed = dim_time.apply(parse_year_range, axis=1)
dim_time['year'] = parsed['year'].astype(int)
dim_time['start_year'] = parsed['start_year'].astype(int)
dim_time['end_year'] = parsed['end_year'].astype(int)
dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
dim_time['decade'] = (dim_time['year'] // 10) * 10
dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
@@ -229,7 +229,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_time: {rows_loaded} rows\n")
self.logger.info(f" dim_time: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -240,11 +240,11 @@ class DimensionalModelLoader:
def load_dim_country(self):
table_name = 'dim_country'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_country [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_country -> [DW/Gold] fs_asean_gold...")
try:
dim_country = self.df_clean[['country']].drop_duplicates().copy()
dim_country.columns = ['country_name']
dim_country = self.df_clean[['country']].drop_duplicates().copy()
dim_country.columns = ['country_name']
region_mapping = {
'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
@@ -270,7 +270,9 @@ class DimensionalModelLoader:
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy()
dim_country_final = dim_country[
['country_name', 'region', 'subregion', 'iso_code']
].copy()
dim_country_final = dim_country_final.reset_index(drop=True)
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
@@ -293,7 +295,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_country: {rows_loaded} rows\n")
self.logger.info(f" dim_country: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -302,60 +304,106 @@ class DimensionalModelLoader:
raise
def load_dim_indicator(self):
"""
Load dim_indicator ke Gold layer.
Kolom yang dimuat:
indicator_id — surrogate key
indicator_name — nama standar indikator
indicator_category — kategori (Health & Nutrition, dll.)
unit — satuan ukuran
direction — higher_better / lower_better
framework — MDGs / SDGs <-- BARU: dibaca dari cleaned_integrated
"""
table_name = 'dim_indicator'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_indicator [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_indicator -> [DW/Gold] fs_asean_gold...")
try:
has_direction = 'direction' in self.df_clean.columns
has_unit = 'unit' in self.df_clean.columns
has_category = 'indicator_category' in self.df_clean.columns
has_framework = 'framework' in self.df_clean.columns
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name']
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name']
# Unit
if has_unit:
unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
unit_map.columns = ['indicator_name', 'unit']
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
unit_map.columns = ['indicator_name', 'unit']
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
else:
dim_indicator['unit'] = None
# Direction
if has_direction:
dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
dir_map.columns = ['indicator_name', 'direction']
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
dir_map.columns = ['indicator_name', 'direction']
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
self.logger.info(" [OK] direction column from cleaned_integrated")
else:
dim_indicator['direction'] = 'higher_better'
self.logger.warning(" [WARN] direction not found, default: higher_better")
# Indicator category
if has_category:
cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
cat_map.columns = ['indicator_name', 'indicator_category']
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
cat_map = self.df_clean[
['indicator_standardized', 'indicator_category']
].drop_duplicates()
cat_map.columns = ['indicator_name', 'indicator_category']
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
else:
def categorize_indicator(name):
n = str(name).lower()
if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
if any(w in n for w in [
'undernourishment', 'malnutrition', 'stunting',
'wasting', 'anemia', 'anaemia', 'food security',
'food insecure', 'hunger'
]):
return 'Health & Nutrition'
elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
'import dependency', 'share of dietary']):
elif any(w in n for w in [
'production', 'yield', 'cereal', 'crop',
'import dependency', 'share of dietary'
]):
return 'Agricultural Production'
elif any(w in n for w in ['import', 'export', 'trade']):
return 'Trade'
elif any(w in n for w in ['gdp', 'income', 'economic']):
return 'Economic'
elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
elif any(w in n for w in [
'water', 'sanitation', 'infrastructure', 'rail'
]):
return 'Infrastructure'
else:
return 'Other'
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(
categorize_indicator
)
# Framework — KOLOM BARU
# Dibaca dari cleaned_integrated yang sudah menjalankan assign_framework().
# Jika kolom belum ada (misal pipeline lama), fallback ke 'MDGs' dengan warning.
if has_framework:
fw_map = self.df_clean[
['indicator_standardized', 'framework']
].drop_duplicates()
fw_map.columns = ['indicator_name', 'framework']
dim_indicator = dim_indicator.merge(fw_map, on='indicator_name', how='left')
# Pastikan tidak ada NULL setelah merge
dim_indicator['framework'] = dim_indicator['framework'].fillna('MDGs')
self.logger.info(" [OK] framework column from cleaned_integrated")
else:
dim_indicator['framework'] = 'MDGs'
self.logger.warning(
" [WARN] framework column not found in cleaned_integrated. "
"Default: MDGs. Jalankan bigquery_cleaned_layer.py terlebih dahulu."
)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
dim_indicator_final = dim_indicator[
['indicator_name', 'indicator_category', 'unit', 'direction']
['indicator_name', 'indicator_category', 'unit', 'direction', 'framework']
].copy()
dim_indicator_final = dim_indicator_final.reset_index(drop=True)
dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1))
@@ -366,6 +414,7 @@ class DimensionalModelLoader:
bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"),
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
]
rows_loaded = load_to_bigquery(
@@ -374,17 +423,23 @@ class DimensionalModelLoader:
)
self._add_primary_key(table_name, 'indicator_id')
for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
# Log distribusi
for label, col in [
('Categories', 'indicator_category'),
('Direction', 'direction'),
('Framework', 'framework'),
]:
self.logger.info(f" {label}:")
for val, cnt in dim_indicator_final[col].value_counts().items():
self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
pct = cnt / len(dim_indicator_final) * 100
self.logger.info(f" - {val}: {cnt} ({pct:.1f}%)")
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_indicator: {rows_loaded} rows\n")
self.logger.info(f" dim_indicator: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -395,7 +450,7 @@ class DimensionalModelLoader:
def load_dim_source(self):
table_name = 'dim_source'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_source [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_source -> [DW/Gold] fs_asean_gold...")
try:
source_details = {
@@ -455,7 +510,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_source: {rows_loaded} rows\n")
self.logger.info(f" dim_source: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -466,7 +521,7 @@ class DimensionalModelLoader:
def load_dim_pillar(self):
table_name = 'dim_pillar'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_pillar [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_pillar -> [DW/Gold] fs_asean_gold...")
try:
pillar_codes = {
@@ -501,7 +556,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_pillar: {rows_loaded} rows\n")
self.logger.info(f" dim_pillar: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -516,10 +571,9 @@ class DimensionalModelLoader:
def load_fact_food_security(self):
table_name = 'fact_food_security'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading fact_food_security [DW/Gold] fs_asean_gold...")
self.logger.info("Loading fact_food_security -> [DW/Gold] fs_asean_gold...")
try:
# Load dims dari Gold untuk FK resolution
dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
@@ -561,9 +615,9 @@ class DimensionalModelLoader:
fact_table['start_year'] = fact_table['year'].astype(int)
fact_table['end_year'] = fact_table['year'].astype(int)
# Resolve FKs
fact_table = fact_table.merge(
dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
dim_country[['country_id', 'country_name']].rename(
columns={'country_name': 'country'}),
on='country', how='left'
)
fact_table = fact_table.merge(
@@ -576,15 +630,16 @@ class DimensionalModelLoader:
on=['start_year', 'end_year'], how='left'
)
fact_table = fact_table.merge(
dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
dim_source[['source_id', 'source_name']].rename(
columns={'source_name': 'source'}),
on='source', how='left'
)
fact_table = fact_table.merge(
dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
dim_pillar[['pillar_id', 'pillar_name']].rename(
columns={'pillar_name': 'pillar'}),
on='pillar', how='left'
)
# Filter hanya row dengan FK lengkap
fact_table = fact_table[
fact_table['country_id'].notna() &
fact_table['indicator_id'].notna() &
@@ -621,7 +676,6 @@ class DimensionalModelLoader:
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
# Add PK + FKs
self._add_primary_key(table_name, 'fact_id')
self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
@@ -634,7 +688,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n")
self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n")
return rows_loaded
except Exception as e:
@@ -712,16 +766,36 @@ class DimensionalModelLoader:
self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}")
self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}")
# Validasi distribusi framework di dim_indicator
query_fw = f"""
SELECT framework, COUNT(*) AS count
FROM `{get_table_id('dim_indicator', layer='gold')}`
GROUP BY framework ORDER BY framework
"""
df_fw = self.client.query(query_fw).result().to_dataframe(
create_bqstorage_client=False
)
if len(df_fw) > 0:
self.logger.info(f"\n Framework Distribution (dim_indicator):")
for _, row in df_fw.iterrows():
self.logger.info(
f" {row['framework']:10s}: {int(row['count']):>5,} indicators"
)
query_dir = f"""
SELECT direction, COUNT(*) AS count
FROM `{get_table_id('dim_indicator', layer='gold')}`
GROUP BY direction ORDER BY direction
"""
df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
df_dir = self.client.query(query_dir).result().to_dataframe(
create_bqstorage_client=False
)
if len(df_dir) > 0:
self.logger.info(f"\n Direction Distribution:")
for _, row in df_dir.iterrows():
self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
self.logger.info(
f" {row['direction']:15s}: {int(row['count']):>5,} indicators"
)
self.logger.info("\n [OK] Validation completed")
except Exception as e:
@@ -738,22 +812,19 @@ class DimensionalModelLoader:
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) fs_asean_gold")
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) -> fs_asean_gold")
self.logger.info("=" * 60)
# Dimensions
self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
self.logger.info("\nLOADING DIMENSION TABLES -> fs_asean_gold")
self.load_dim_country()
self.load_dim_indicator()
self.load_dim_time()
self.load_dim_source()
self.load_dim_pillar()
# Fact
self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
self.logger.info("\nLOADING FACT TABLE -> fs_asean_gold")
self.load_fact_food_security()
# Validate
self.validate_constraints()
self.validate_data_load()
@@ -762,22 +833,23 @@ class DimensionalModelLoader:
total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
self.pipeline_metadata.update({
'end_time' : pipeline_end,
'duration_seconds' : duration,
'rows_transformed' : total_loaded,
'rows_loaded' : total_loaded,
'end_time' : pipeline_end,
'duration_seconds' : duration,
'rows_transformed' : total_loaded,
'rows_loaded' : total_loaded,
'execution_timestamp': self.pipeline_metadata['start_time'],
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
'table_name' : 'dimensional_model_pipeline',
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics' : json.dumps(
{t: m['status'] for t, m in self.load_metadata.items()}
),
'table_name' : 'dimensional_model_pipeline',
})
try:
save_etl_metadata(self.client, self.pipeline_metadata)
except Exception as e:
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
# Summary
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
self.logger.info("=" * 60)
@@ -785,20 +857,19 @@ class DimensionalModelLoader:
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Tables :")
for tbl, meta in self.load_metadata.items():
icon = "" if meta['status'] == 'success' else ""
self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata [AUDIT] etl_metadata")
icon = "OK" if meta['status'] == 'success' else "FAIL"
self.logger.info(f" [{icon}] {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata -> [AUDIT] etl_metadata")
self.logger.info("=" * 60)
# =============================================================================
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer
# AIRFLOW TASK FUNCTIONS
# =============================================================================
def run_dimensional_model():
"""
Airflow task: Load dimensional model dari cleaned_integrated.
Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai.
"""
from scripts.bigquery_config import get_bigquery_client
@@ -817,9 +888,9 @@ if __name__ == "__main__":
print("=" * 60)
print("BIGQUERY DIMENSIONAL MODEL LOAD")
print("Kimball DW Architecture")
print(" Input : STAGING (Silver) cleaned_integrated (fs_asean_silver)")
print(" Output : DW (Gold) dim_*, fact_* (fs_asean_gold)")
print(" Audit : AUDIT etl_logs, etl_metadata (fs_asean_audit)")
print(" Input : STAGING (Silver) -> cleaned_integrated (fs_asean_silver)")
print(" Output : DW (Gold) -> dim_*, fact_* (fs_asean_gold)")
print(" Audit : AUDIT -> etl_logs, etl_metadata (fs_asean_audit)")
print("=" * 60)
logger = setup_logging()
@@ -827,24 +898,26 @@ if __name__ == "__main__":
print("\nLoading cleaned_integrated (fs_asean_silver)...")
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
print(f" Loaded : {len(df_clean):,} rows")
print(f" Loaded : {len(df_clean):,} rows")
print(f" Columns : {len(df_clean.columns)}")
print(f" Sources : {df_clean['source'].nunique()}")
print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
print(f" Countries : {df_clean['country'].nunique()}")
print(f" Year range : {int(df_clean['year'].min())}{int(df_clean['year'].max())}")
print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}")
if 'direction' in df_clean.columns:
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
if 'framework' in df_clean.columns:
print(f" Framework : {df_clean['framework'].value_counts().to_dict()}")
else:
print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
print(" [WARN] framework column not found — run bigquery_cleaned_layer.py first")
print("\n[1/1] Dimensional Model Load DW (Gold)...")
print("\n[1/1] Dimensional Model Load -> DW (Gold)...")
loader = DimensionalModelLoader(client, df_clean)
loader.run()
print("\n" + "=" * 60)
print(" DIMENSIONAL MODEL ETL COMPLETED")
print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
print(" dim_source, dim_pillar, fact_food_security")
print(" 📋 AUDIT : etl_logs, etl_metadata")
print("[OK] DIMENSIONAL MODEL ETL COMPLETED")
print(" DW (Gold) : dim_country, dim_indicator (+ framework),")
print(" dim_time, dim_source, dim_pillar, fact_food_security")
print(" AUDIT : etl_logs, etl_metadata")
print("=" * 60)