sdgs era v5

This commit is contained in:
Debby
2026-04-01 08:04:19 +07:00
parent 236d4b4dc8
commit db60e6e414

View File

@@ -8,31 +8,27 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
→ TIDAK menghapus baris year < max_start_year
→ Semua baris tetap ada; label framework ditentukan di Step 6
6. Assign framework (MDGs/SDGs) per indicator PER ROW
7. Verify no gaps
8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware)
→ Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu
→ Indikator DI SDG_ONLY_KEYWORDS + year >= sdg_transition_year → 'SDGs'
→ Indikator DI SDG_ONLY_KEYWORDS + year < sdg_transition_year → 'MDGs'
→ sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators
yang lolos filter (= tahun pertama data SDG-only konsisten di semua countries)
7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global)
8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global)
9. Calculate YoY per indicator per country
10. Analyze indicator availability by year
11. Save analytical table
NORMALISASI (Step 8):
- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100
- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik
- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable
- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio
FRAMEWORK LOGIC (FIX - Per Indicator, Per Row):
- Framework di-assign PER BARIS dengan mempertimbangkan actual_start_year MASING-MASING
indikator, bukan satu sdg_start_year global.
- Logika:
* Jika nama indikator TIDAK ada di SDG_ONLY_KEYWORDS → selalu 'MDGs' (semua tahun)
* Jika nama indikator ADA di SDG_ONLY_KEYWORDS:
- row['year'] >= actual_start_year[indicator] → 'SDGs'
- row['year'] < actual_start_year[indicator] → 'MDGs'
- Baris dengan year < actual_start_year TETAP ADA di data (tidak dihapus di Step 5),
hanya mendapat label 'MDGs'.
- actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter
FRAMEWORK LOGIC:
- sdg_transition_year dihitung SATU KALI dari actual_start_year SDG-only indicators
- Semua SDG-only indicators menggunakan sdg_transition_year yang SAMA
sehingga label berubah serentak di satu titik waktu
- Baris sebelum sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus)
- Baris mulai sdg_transition_year → 'SDGs'
- Indikator non-SDG-only → 'MDGs' selalu
"""
import pandas as pd
@@ -62,7 +58,7 @@ from google.cloud import bigquery
# SDG-ONLY INDICATOR KEYWORDS
# =============================================================================
# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini.
# Indikator di set ini → 'SDGs' mulai dari actual_start_year indikator tersebut.
# Indikator di set ini → 'SDGs' mulai dari sdg_transition_year.
# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun.
SDG_ONLY_KEYWORDS = frozenset([
@@ -122,13 +118,14 @@ class AnalyticalLayerLoader:
norm_value_1_100,
yoy_change, yoy_pct
FRAMEWORK LOGIC (FIX):
FRAMEWORK LOGIC:
- Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun
- Indikator DI SDG_ONLY_KEYWORDS:
year >= actual_start_year[indikator]'SDGs'
year < actual_start_year[indikator]'MDGs'
- actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter
- Baris year < actual_start_year TETAP ADA, hanya berlabel 'MDGs'
year < sdg_transition_year'MDGs' (data tetap ada, tidak dihapus)
year >= sdg_transition_year'SDGs'
- sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators
yang lolos filter Step 3-5. Semua SDG-only indicators menggunakan
sdg_transition_year yang SAMA agar label berubah serentak.
"""
def __init__(self, client: bigquery.Client):
@@ -143,13 +140,12 @@ class AnalyticalLayerLoader:
self.selected_country_ids = None
self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5)
self.sdg_transition_year = None # tahun SDGs mulai berlaku (dari Step 6)
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023
self.sdg_start_year = None # disimpan untuk metadata/logging saja
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
@@ -398,6 +394,8 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80)
# Hitung max_start_year per indikator = max(min_year per country)
# = tahun pertama di mana SEMUA fixed countries sudah punya data
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
@@ -426,6 +424,8 @@ class AnalyticalLayerLoader:
})
continue
# Cek apakah semua tahun dari max_start s/d end_year
# hadir di SEMUA fixed countries
expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True
@@ -452,50 +452,53 @@ class AnalyticalLayerLoader:
raise ValueError("No valid indicators found after filtering!")
# ----------------------------------------------------------------
# Filter hanya indikator yang valid
# TIDAK menghapus baris year < max_start_year
# semua baris tetap ada, label framework ditentukan di Step 6
# Filter hanya indikator yang valid.
# PENTING: TIDAK menghapus baris year < max_start_year.
# Semua baris tetap ada label framework ditentukan di Step 6.
# max_start_year disimpan sebagai lookup untuk Step 6 & 7.
# ----------------------------------------------------------------
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
# Simpan max_start_year sebagai lookup untuk Step 6
# Simpan max_start_year per indicator_id untuk Step 6 dan Step 7
self.indicator_max_start_map = (
indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)]
.set_index('indicator_id')['max_start_year']
.to_dict()
)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
self.logger.info(f"\n Rows before : {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}")
self.logger.info(
f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. "
f"Label framework akan ditentukan di Step 6."
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year)
# STEP 6: ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year)")
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW")
self.logger.info("=" * 80)
# ----------------------------------------------------------------
# Hitung actual_start_year PER INDIKATOR dari indicator_max_start_map
# yang sudah dihitung di Step 5.
# actual_start_year = max(min_year per country) per indikator
# = tahun di mana semua fixed countries sudah punya data
# Bangun tabel actual_start_year per indikator dari
# indicator_max_start_map yang sudah ditetapkan di Step 5.
# ----------------------------------------------------------------
indicator_actual_start = pd.DataFrame([
{'indicator_id': ind_id, 'actual_start_year': start_yr}
{'indicator_id': ind_id, 'actual_start_year': int(start_yr)}
for ind_id, start_yr in self.indicator_max_start_map.items()
])
# Merge indicator_name untuk keperluan logging
# Merge indicator_name untuk logging
indicator_actual_start = indicator_actual_start.merge(
self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(),
on='indicator_id', how='left'
@@ -508,91 +511,95 @@ class AnalyticalLayerLoader:
.isin(SDG_ONLY_KEYWORDS)
)
# sdg_start_year global = min(actual_start_year dari SDG-only indicators)
# Disimpan hanya untuk metadata/logging
# ----------------------------------------------------------------
# sdg_transition_year = min(actual_start_year) dari semua SDG-only
# indicators yang lolos filter.
# Ini adalah satu titik waktu di mana semua SDG-only indicators
# berubah dari 'MDGs' ke 'SDGs' secara SERENTAK.
# ----------------------------------------------------------------
sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']]
if sdg_only_df.empty:
raise ValueError(
"Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. "
"Pastikan indikator FIES dan anaemia ada di data."
)
self.sdg_start_year = int(sdg_only_df['actual_start_year'].min())
self.sdg_transition_year = int(sdg_only_df['actual_start_year'].min())
self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:")
self.logger.info(f" {'-'*80}")
for _, row in indicator_actual_start[indicator_actual_start['is_sdg_only']].iterrows():
for _, row in sdg_only_df.iterrows():
self.logger.info(
f" [SDG-only] start={int(row['actual_start_year'])} | {row['indicator_name']}"
f" [SDG-only] actual_start={int(row['actual_start_year'])} | "
f"{row['indicator_name']}"
)
self.logger.info(
f"\n sdg_start_year (earliest SDG-only, for metadata): {self.sdg_start_year}"
f"\n sdg_transition_year = {self.sdg_transition_year} "
f"(min actual_start_year dari semua SDG-only indicators)"
)
# Lookup: indicator_id → actual_start_year (hanya SDG-only, untuk logging)
sdg_only_start_map = (
indicator_actual_start[indicator_actual_start['is_sdg_only']]
.set_index('indicator_id')['actual_start_year']
.to_dict()
)
self.logger.info(f"\n Logika assign framework (PER BARIS, PER INDIKATOR):")
self.logger.info(f" ─────────────────────────────────────────────────────")
self.logger.info(f" Jika indikator TIDAK di SDG_ONLY_KEYWORDS:")
self.logger.info(f"'MDGs' di semua tahun (shared indicators)")
self.logger.info(f" Jika indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year >= actual_start_year[indikator] → 'SDGs'")
self.logger.info(f" year < actual_start_year[indikator] → 'MDGs'")
self.logger.info(f" ─────────────────────────────────────────────────────")
self.logger.info(f"\n Logika assign framework (PER BARIS):")
self.logger.info(f" ──────────────────────────────────────────────────────────")
self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS:")
self.logger.info(f"'MDGs' di semua tahun")
self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year < {self.sdg_transition_year}'MDGs' (data tetap ada)")
self.logger.info(f" year >= {self.sdg_transition_year}'SDGs'")
self.logger.info(f" ──────────────────────────────────────────────────────────")
# ----------------------------------------------------------------
# Assign framework dengan vectorized merge
# Assign framework dengan vectorized operation menggunakan
# sdg_transition_year (SATU nilai untuk semua SDG-only indicators)
# ----------------------------------------------------------------
self.df_clean = self.df_clean.merge(
indicator_actual_start[['indicator_id', 'is_sdg_only', 'actual_start_year']],
on='indicator_id',
how='left'
# Tandai apakah setiap baris adalah SDG-only indicator
sdg_only_ids = set(
indicator_actual_start.loc[
indicator_actual_start['is_sdg_only'], 'indicator_id'
]
)
self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids)
# Assign framework:
# - Jika bukan SDG-only → 'MDGs'
# - Jika SDG-only AND year >= actual_start_year → 'SDGs'
# - Jika SDG-only AND year < actual_start_year → 'MDGs'
# - Bukan SDG-only → 'MDGs'
# - SDG-only AND year >= sdg_transition_year → 'SDGs'
# - SDG-only AND year < sdg_transition_year → 'MDGs'
self.df_clean['framework'] = np.where(
self.df_clean['is_sdg_only'] & (self.df_clean['year'] >= self.df_clean['actual_start_year']),
self.df_clean['_is_sdg_only'] &
(self.df_clean['year'] >= self.sdg_transition_year),
'SDGs',
'MDGs'
)
# Drop kolom bantu
self.df_clean = self.df_clean.drop(columns=['is_sdg_only', 'actual_start_year'])
self.df_clean = self.df_clean.drop(columns=['_is_sdg_only'])
# ----------------------------------------------------------------
# Log verifikasi per indikator
# ----------------------------------------------------------------
self.logger.info(f"\n Verifikasi framework per indikator:")
self.logger.info(f" {'-'*105}")
self.logger.info(f" {'-'*110}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Start':<8} "
f"{'MDGs rows':<12} {'SDGs rows':<12} {'Expected'}"
f" {'ID':<5} {'Indicator Name':<52} {'Data From':<12} "
f"{'MDGs rows':<12} {'SDGs rows':<12} {'Note'}"
)
self.logger.info(f" {'-'*105}")
self.logger.info(f" {'-'*110}")
for ind_id, grp in self.df_clean.groupby('indicator_id'):
ind_name = grp['indicator_name'].iloc[0]
mdgs_rows = (grp['framework'] == 'MDGs').sum()
sdgs_rows = (grp['framework'] == 'SDGs').sum()
is_sdg_only = ind_name.lower().strip() in SDG_ONLY_KEYWORDS
start_yr = int(grp['year'].min())
is_sdg_only = ind_id in sdg_only_ids
data_from = int(grp['year'].min())
if is_sdg_only:
ind_start = sdg_only_start_map.get(ind_id, '?')
expected = f"SDGs from {ind_start}, MDGs before"
note = f"SDGs from {self.sdg_transition_year}, MDGs before"
else:
expected = "MDGs always"
note = "MDGs always"
self.logger.info(
f" {int(ind_id):<5} {ind_name[:50]:<52} {start_yr:<8} "
f"{mdgs_rows:<12} {sdgs_rows:<12} {expected}"
f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<12} "
f"{mdgs_rows:<12} {sdgs_rows:<12} {note}"
)
fw_summary = self.df_clean['framework'].value_counts()
@@ -626,6 +633,8 @@ class AnalyticalLayerLoader:
# Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya,
# bukan dari self.start_year global, karena tiap indikator bisa
# punya start year berbeda.
# Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek
# karena memang tidak semua country punya data di sana.
# ----------------------------------------------------------------
expected_countries = len(self.selected_country_ids)
all_good = True
@@ -650,7 +659,8 @@ class AnalyticalLayerLoader:
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
f" VERIFICATION PASSED — all combinations from actual_start_year "
f"have {expected_countries} countries"
)
else:
for row in bad_rows[:10]:
@@ -683,7 +693,10 @@ class AnalyticalLayerLoader:
norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}")
self.logger.info(
f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} "
f"{'Min':>10} {'Max':>10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators:
@@ -729,9 +742,14 @@ class AnalyticalLayerLoader:
f"{self.df_clean['norm_value_1_100'].max():.2f}"
)
self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition)
self.df_clean['_condition_preview'] = (
self.df_clean['norm_value_1_100'].apply(assign_condition)
)
cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):")
self.logger.info(
f"\n Distribusi kondisi "
f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):"
)
for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
@@ -909,7 +927,9 @@ class AnalyticalLayerLoader:
.drop_duplicates('indicator_id')['framework']
.value_counts()
)
self.logger.info(f" Framework distribution (indicators at year={self.end_year}):")
self.logger.info(
f" Framework distribution (indicators at year={self.end_year}):"
)
for fw, cnt in fw_dist_ind.items():
self.logger.info(f" {fw}: {cnt} indicators")
@@ -954,28 +974,29 @@ class AnalyticalLayerLoader:
'rows_loaded' : rows_loaded,
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({
'start_year' : self.start_year,
'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_logic' : (
'per-indicator actual_start_year: '
'SDG-only indicator → SDGs from its own actual_start_year, MDGs before; '
'shared/other indicators → MDGs always'
'start_year' : self.start_year,
'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_transition_year' : self.sdg_transition_year,
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_logic' : (
'sdg_transition_year = min(actual_start_year) dari SDG-only indicators; '
'SDG-only year >= sdg_transition_year → SDGs; '
'SDG-only year < sdg_transition_year → MDGs (data tetap ada); '
'non-SDG-only → MDGs selalu'
),
'sdg_only_keywords_count' : len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : {
'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : {
'bad' : f'< {THRESHOLD_BAD}',
'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
'good' : f'> {THRESHOLD_GOOD}',
},
}),
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique()),
'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids),
'total_indicators' : int(self.df_clean['indicator_id'].nunique()),
'sdg_transition_year': self.sdg_transition_year,
'framework_dist_rows': fw_dist_rows.to_dict(),
})
}
@@ -1000,7 +1021,10 @@ class AnalyticalLayerLoader:
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)")
self.logger.info(
"Framework: SDG-only indicators → SDGs mulai sdg_transition_year, "
"MDGs sebelumnya (data tetap ada). Non-SDG-only → MDGs selalu."
)
self.logger.info("=" * 80)
self.load_source_data()
@@ -1021,12 +1045,12 @@ class AnalyticalLayerLoader:
self.logger.info("\n" + "=" * 80)
self.logger.info("COMPLETED")
self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Transition Year: {self.sdg_transition_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
@@ -1051,7 +1075,10 @@ if __name__ == "__main__":
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)")
print(
"Framework: SDG-only → SDGs mulai sdg_transition_year, MDGs sebelumnya. "
"Non-SDG-only → MDGs selalu."
)
print("=" * 80)
logger = setup_logging()
@@ -1061,6 +1088,6 @@ if __name__ == "__main__":
print("\n" + "=" * 80)
print("[OK] COMPLETED")
print(f" SDG Start Year : {loader.sdg_start_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print(f" SDG Transition Year : {loader.sdg_transition_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80)