sdgs year v4

This commit is contained in:
Debby
2026-04-01 07:43:31 +07:00
parent 64e3095e7a
commit 236d4b4dc8

View File

@@ -8,7 +8,7 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Determine SDG start year & assign framework (MDGs/SDGs) per indicator PER ROW
6. Assign framework (MDGs/SDGs) per indicator PER ROW
7. Verify no gaps
8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware)
9. Calculate YoY per indicator per country
@@ -22,16 +22,17 @@ NORMALISASI (Step 8):
sehingga nilai antar negara dan antar tahun tetap comparable
- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio
FRAMEWORK LOGIC (Row-Level Assignment):
- SDG start year dideteksi dari data: tahun pertama indikator FIES/anaemia lengkap
di semua fixed countries (setelah Step 3-5 filter selesai)
- Framework di-assign PER BARIS (per tahun):
* year < sdg_start_year → selalu 'MDGs' (semua indikator)
* year >= sdg_start_year + nama di SDG_ONLY_KEYWORDS'SDGs'
* selain itu (implisit) 'MDGs'
- Hanya FIES dan anaemia yang masuk SDG_ONLY_KEYWORDS karena murni baru di era SDGs.
- Shared indicators (stunting, wasting, overweight, undernourishment) tidak terdaftar
di SDG_ONLY_KEYWORDS sehingga secara implisit selalu berlabel 'MDGs' di semua tahun.
FRAMEWORK LOGIC (FIX - Per Indicator, Per Row):
- Framework di-assign PER BARIS dengan mempertimbangkan actual_start_year MASING-MASING
indikator, bukan satu sdg_start_year global.
- Logika:
* Jika nama indikator TIDAK ada di SDG_ONLY_KEYWORDS → selalu 'MDGs' (semua tahun)
* Jika nama indikator ADA di SDG_ONLY_KEYWORDS:
- row['year'] >= actual_start_year[indicator]'SDGs'
- row['year'] < actual_start_year[indicator] → 'MDGs'
- Baris dengan year < actual_start_year TETAP ADA di data (tidak dihapus di Step 5),
hanya mendapat label 'MDGs'.
- actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter
"""
import pandas as pd
@@ -61,8 +62,8 @@ from google.cloud import bigquery
# SDG-ONLY INDICATOR KEYWORDS
# =============================================================================
# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini.
# Baris dengan year >= sdg_start_year + nama ada di set ini → 'SDGs'.
# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' secara implisit.
# Indikator di set ini → 'SDGs' mulai dari actual_start_year indikator tersebut.
# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun.
SDG_ONLY_KEYWORDS = frozenset([
# TARGET 2.1.2 — FIES (SDGs only)
@@ -83,19 +84,9 @@ SDG_ONLY_KEYWORDS = frozenset([
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# Proxy keywords untuk deteksi era SDGs dari data (indikator murni baru di SDGs)
_SDG_ERA_PROXY_KEYWORDS = frozenset([
"food insecurity",
"anemia",
"anaemia",
])
# =============================================================================
# THRESHOLD KONDISI (fixed absolute, skala 1-100)
# =============================================================================
# bad : norm_value_1_100 < THRESHOLD_BAD
# good : norm_value_1_100 > THRESHOLD_GOOD
# moderate : di antara keduanya
THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0
@@ -104,8 +95,6 @@ THRESHOLD_GOOD = 60.0
def assign_condition(norm_value_1_100: float) -> str:
"""
Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware).
Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert).
Returns: 'good' / 'moderate' / 'bad'
"""
if pd.isna(norm_value_1_100):
@@ -117,38 +106,6 @@ def assign_condition(norm_value_1_100: float) -> str:
return 'moderate'
def assign_framework_for_row(
indicator_name: str,
row_year: int,
sdg_start_year: int,
) -> str:
"""
Tentukan framework (MDGs/SDGs) PER BARIS (per tahun).
Logic:
─────────────────────────────────────────────────────────────────────────
RULE 1: row_year < sdg_start_year
→ selalu 'MDGs', tanpa kecuali.
RULE 2: row_year >= sdg_start_year AND nama ada di SDG_ONLY_KEYWORDS
'SDGs'
RULE 3 (implisit): semua kondisi lain
'MDGs'
Ini mencakup shared indicators (stunting, wasting, overweight,
undernourishment) yang tidak terdaftar di SDG_ONLY_KEYWORDS,
sehingga tidak perlu di-list secara eksplisit.
─────────────────────────────────────────────────────────────────────────
"""
if row_year < sdg_start_year:
return 'MDGs'
if str(indicator_name).lower().strip() in SDG_ONLY_KEYWORDS:
return 'SDGs'
return 'MDGs'
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
@@ -162,13 +119,16 @@ class AnalyticalLayerLoader:
indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name,
time_id, year, value,
norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware
norm_value_1_100,
yoy_change, yoy_pct
FRAMEWORK LOGIC:
- year < sdg_start_year 'MDGs' (semua indikator)
- year >= sdg_start_year + nama di SDG_ONLY_KEYWORDS → 'SDGs' (FIES + anaemia)
- selain itu (implisit) 'MDGs'
FRAMEWORK LOGIC (FIX):
- Indikator TIDAK di SDG_ONLY_KEYWORDS'MDGs' di SEMUA tahun
- Indikator DI SDG_ONLY_KEYWORDS:
year >= actual_start_year[indikator]'SDGs'
year < actual_start_year[indikator] → 'MDGs'
- actual_start_year per indikator = max(min_year per country) setelah Step 3-4 filter
- Baris year < actual_start_year TETAP ADA, hanya berlabel 'MDGs'
"""
def __init__(self, client: bigquery.Client):
@@ -182,12 +142,13 @@ class AnalyticalLayerLoader:
self.df_pillar = None
self.selected_country_ids = None
self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5)
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap)
self.baseline_year = 2023
self.sdg_start_year = None
self.sdg_start_year = None # disimpan untuk metadata/logging saja
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
@@ -490,19 +451,22 @@ class AnalyticalLayerLoader:
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
# ----------------------------------------------------------------
# Filter hanya indikator yang valid
# TIDAK menghapus baris year < max_start_year —
# semua baris tetap ada, label framework ditentukan di Step 6
# ----------------------------------------------------------------
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
self.df_clean = self.df_clean.merge(
indicator_max_start[['indicator_id', 'max_start_year']],
on='indicator_id', how='left'
# Simpan max_start_year sebagai lookup untuk Step 6
self.indicator_max_start_map = (
indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)]
.set_index('indicator_id')['max_start_year']
.to_dict()
)
self.df_clean = self.df_clean[
self.df_clean['year'] >= self.df_clean['max_start_year']
].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
@@ -512,74 +476,123 @@ class AnalyticalLayerLoader:
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL)
# STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year)
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK (ROW-LEVEL)")
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW (per-indicator actual_start_year)")
self.logger.info("=" * 80)
indicator_actual_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
.min().reset_index()
.groupby(['indicator_id', 'indicator_name'])['year']
.max().reset_index()
)
indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# ----------------------------------------------------------------
# Hitung actual_start_year PER INDIKATOR dari indicator_max_start_map
# yang sudah dihitung di Step 5.
# actual_start_year = max(min_year per country) per indikator
# = tahun di mana semua fixed countries sudah punya data
# ----------------------------------------------------------------
indicator_actual_start = pd.DataFrame([
{'indicator_id': ind_id, 'actual_start_year': start_yr}
for ind_id, start_yr in self.indicator_max_start_map.items()
])
# Deteksi sdg_start_year dari proxy SDGs-only (FIES & anaemia)
proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS)
# Merge indicator_name untuk keperluan logging
indicator_actual_start = indicator_actual_start.merge(
self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(),
on='indicator_id', how='left'
)
df_proxy = indicator_actual_start[proxy_mask]
if df_proxy.empty:
# Tandai mana yang SDG-only
indicator_actual_start['is_sdg_only'] = (
indicator_actual_start['indicator_name']
.str.lower().str.strip()
.isin(SDG_ONLY_KEYWORDS)
)
# sdg_start_year global = min(actual_start_year dari SDG-only indicators)
# Disimpan hanya untuk metadata/logging
sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']]
if sdg_only_df.empty:
raise ValueError(
"Tidak ada indikator proxy SDGs (FIES/anaemia) yang lolos filter. "
"Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. "
"Pastikan indikator FIES dan anaemia ada di data."
)
self.sdg_start_year = int(sdg_only_df['actual_start_year'].min())
self.sdg_start_year = int(df_proxy['actual_start_year'].min())
self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}")
self.logger.info(f" Proxy indicators (penentu sdg_start_year):")
for _, row in df_proxy.iterrows():
self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}")
self.logger.info(f"\n Assigning framework PER ROW...")
self.logger.info(f" year < {self.sdg_start_year} → MDGs (semua indikator)")
self.logger.info(f" year >= {self.sdg_start_year} + nama in SDG_ONLY_KEYWORDS → SDGs")
self.logger.info(f" selain itu (implisit) → MDGs")
self.df_clean['framework'] = self.df_clean.apply(
lambda row: assign_framework_for_row(
indicator_name = row['indicator_name'],
row_year = int(row['year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
# Log ringkasan per indikator untuk verifikasi
self.logger.info(f"\n {'Framework Assignment per Indicator':}")
self.logger.info(f" {'-'*100}")
self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:")
self.logger.info(f" {'-'*80}")
for _, row in indicator_actual_start[indicator_actual_start['is_sdg_only']].iterrows():
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} "
f"{'Pre-SDG':<10} {'MDGs':<10} {'SDGs':<10} {'SDG-Only?'}"
f" [SDG-only] start={int(row['actual_start_year'])} | {row['indicator_name']}"
)
self.logger.info(f" {'-'*100}")
self.logger.info(
f"\n sdg_start_year (earliest SDG-only, for metadata): {self.sdg_start_year}"
)
# Lookup: indicator_id → actual_start_year (hanya SDG-only, untuk logging)
sdg_only_start_map = (
indicator_actual_start[indicator_actual_start['is_sdg_only']]
.set_index('indicator_id')['actual_start_year']
.to_dict()
)
self.logger.info(f"\n Logika assign framework (PER BARIS, PER INDIKATOR):")
self.logger.info(f" ─────────────────────────────────────────────────────")
self.logger.info(f" Jika indikator TIDAK di SDG_ONLY_KEYWORDS:")
self.logger.info(f"'MDGs' di semua tahun (shared indicators)")
self.logger.info(f" Jika indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year >= actual_start_year[indikator] → 'SDGs'")
self.logger.info(f" year < actual_start_year[indikator] → 'MDGs'")
self.logger.info(f" ─────────────────────────────────────────────────────")
# ----------------------------------------------------------------
# Assign framework dengan vectorized merge
# ----------------------------------------------------------------
self.df_clean = self.df_clean.merge(
indicator_actual_start[['indicator_id', 'is_sdg_only', 'actual_start_year']],
on='indicator_id',
how='left'
)
# Assign framework:
# - Jika bukan SDG-only → 'MDGs'
# - Jika SDG-only AND year >= actual_start_year → 'SDGs'
# - Jika SDG-only AND year < actual_start_year → 'MDGs'
self.df_clean['framework'] = np.where(
self.df_clean['is_sdg_only'] & (self.df_clean['year'] >= self.df_clean['actual_start_year']),
'SDGs',
'MDGs'
)
# Drop kolom bantu
self.df_clean = self.df_clean.drop(columns=['is_sdg_only', 'actual_start_year'])
# ----------------------------------------------------------------
# Log verifikasi per indikator
# ----------------------------------------------------------------
self.logger.info(f"\n Verifikasi framework per indikator:")
self.logger.info(f" {'-'*105}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Start':<8} "
f"{'MDGs rows':<12} {'SDGs rows':<12} {'Expected'}"
)
self.logger.info(f" {'-'*105}")
for ind_id, grp in self.df_clean.groupby('indicator_id'):
ind_name = grp['indicator_name'].iloc[0]
pre_sdg = (grp['year'] < self.sdg_start_year).sum()
mdgs_rows = (grp['framework'] == 'MDGs').sum()
sdgs_rows = (grp['framework'] == 'SDGs').sum()
is_sdg_only = ind_name.lower().strip() in SDG_ONLY_KEYWORDS
start_yr = int(grp['year'].min())
if is_sdg_only:
ind_start = sdg_only_start_map.get(ind_id, '?')
expected = f"SDGs from {ind_start}, MDGs before"
else:
expected = "MDGs always"
self.logger.info(
f" {int(ind_id):<5} {ind_name[:50]:<52} "
f"{pre_sdg:<10} {mdgs_rows:<10} {sdgs_rows:<10} "
f"{'YES' if is_sdg_only else 'no'}"
f" {int(ind_id):<5} {ind_name[:50]:<52} {start_yr:<8} "
f"{mdgs_rows:<12} {sdgs_rows:<12} {expected}"
)
fw_summary = self.df_clean['framework'].value_counts()
@@ -609,23 +622,41 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("=" * 80)
# ----------------------------------------------------------------
# Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya,
# bukan dari self.start_year global, karena tiap indikator bisa
# punya start year berbeda.
# ----------------------------------------------------------------
expected_countries = len(self.selected_country_ids)
verification = self.df_clean.groupby(
['indicator_id', 'year']
)['country_id'].nunique().reset_index()
verification.columns = ['indicator_id', 'year', 'country_count']
all_good = (verification['country_count'] == expected_countries).all()
all_good = True
bad_rows = []
for ind_id, grp in self.df_clean.groupby('indicator_id'):
actual_start = self.indicator_max_start_map.get(ind_id)
if actual_start is None:
continue
expected_years = list(range(int(actual_start), self.end_year + 1))
for year in expected_years:
country_count = grp[grp['year'] == year]['country_id'].nunique()
if country_count != expected_countries:
all_good = False
bad_rows.append({
'indicator_id' : int(ind_id),
'year' : int(year),
'country_count': int(country_count),
})
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
)
else:
bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows():
for row in bad_rows[:10]:
self.logger.error(
f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
f"{int(row['country_count'])} countries (expected {expected_countries})"
f" Indicator {row['indicator_id']}, Year {row['year']}: "
f"{row['country_count']} countries (expected {expected_countries})"
)
raise ValueError("Gap verification failed!")
@@ -638,13 +669,7 @@ class AnalyticalLayerLoader:
def calculate_norm_value(self):
"""
Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100,
direction-aware.
CARA KERJA:
- Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable.
- lower_better diinvert: nilai tinggi selalu = kondisi lebih baik.
- Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio.
direction-aware, global per indikator (semua negara + semua tahun).
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
@@ -936,9 +961,9 @@ class AnalyticalLayerLoader:
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_logic' : (
'row-level: year < sdg_start_year → MDGs always; '
'year >= sdg_start_year + SDG_ONLY_KEYWORDS → SDGs; '
'else (implicit) → MDGs'
'per-indicator actual_start_year: '
'SDG-only indicator → SDGs from its own actual_start_year, MDGs before; '
'shared/other indicators → MDGs always'
),
'sdg_only_keywords_count' : len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : {
@@ -975,7 +1000,7 @@ class AnalyticalLayerLoader:
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info("Framework: year < sdg_start_year → MDGs | SDG_ONLY → SDGs | else → MDGs (implicit)")
self.logger.info("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)")
self.logger.info("=" * 80)
self.load_source_data()
@@ -1026,7 +1051,7 @@ if __name__ == "__main__":
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print(f"Framework: year < sdg_start_year → MDGs | SDG_ONLY → SDGs | else → MDGs (implicit)")
print("Framework: per-indicator actual_start_year (baris year < actual_start_year tetap ada, berlabel MDGs)")
print("=" * 80)
logger = setup_logging()