This commit is contained in:
Debby
2026-04-01 08:29:18 +07:00
parent db60e6e414
commit 0f93ff6ecd

View File

@@ -8,27 +8,32 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET) 4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries 5. Filter indicators with consistent presence across FIXED countries
→ TIDAK menghapus baris year < max_start_year 6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year
→ Semua baris tetap ada; label framework ditentukan di Step 6 7. Verify no gaps
6. Assign framework (MDGs/SDGs) per indicator PER ROW 8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware)
→ Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu
→ Indikator DI SDG_ONLY_KEYWORDS + year >= sdg_transition_year → 'SDGs'
→ Indikator DI SDG_ONLY_KEYWORDS + year < sdg_transition_year → 'MDGs'
→ sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators
yang lolos filter (= tahun pertama data SDG-only konsisten di semua countries)
7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global)
8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global)
9. Calculate YoY per indicator per country 9. Calculate YoY per indicator per country
10. Analyze indicator availability by year 10. Analyze indicator availability by year
11. Save analytical table 11. Save analytical table
FRAMEWORK LOGIC: NORMALISASI (Step 8):
- sdg_transition_year dihitung SATU KALI dari actual_start_year SDG-only indicators - norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100
- Semua SDG-only indicators menggunakan sdg_transition_year yang SAMA - Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik
sehingga label berubah serentak di satu titik waktu - Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus)
- Baris sebelum sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus) sehingga nilai antar negara dan antar tahun tetap comparable
- Baris mulai sdg_transition_year → 'SDGs' - Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio
- Indikator non-SDG-only → 'MDGs' selalu
FRAMEWORK LOGIC (Per-Row, bukan per indikator):
- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap
di semua fixed countries (setelah Step 3-5 filter selesai)
- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure")
Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs
- Framework di-assign PER BARIS (per year), bukan per indikator:
* row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs'
* Selain itu -> 'MDGs'
- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment)
yang datanya ada sebelum SDGs:
* row lama (year < sdg_start_year) -> 'MDGs'
* row baru (year >= sdg_start_year) -> 'SDGs'
""" """
import pandas as pd import pandas as pd
@@ -55,14 +60,17 @@ from google.cloud import bigquery
# ============================================================================= # =============================================================================
# SDG-ONLY INDICATOR KEYWORDS # SDG INDICATOR KEYWORDS
# Daftar nama indikator (lowercase) yang masuk SDG framework.
# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year,
# dan 'MDGs' untuk baris dengan year < sdg_start_year.
# ============================================================================= # =============================================================================
# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini.
# Indikator di set ini → 'SDGs' mulai dari sdg_transition_year.
# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun.
SDG_ONLY_KEYWORDS = frozenset([ SDG_INDICATOR_KEYWORDS = frozenset([
# TARGET 2.1.2FIES (SDGs only) # TARGET 2.1.1Prevalence of undernourishment (shared: ada sebelum SDGs)
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs)
"prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)", "prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
@@ -75,14 +83,45 @@ SDG_ONLY_KEYWORDS = frozenset([
"number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.3Anaemia (SDGs only) # TARGET 2.2.1Stunting (shared: ada sebelum SDGs)
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs)
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar
# baris >= sdg_start_year di-assign 'SDGs')
"prevalence of anemia among women of reproductive age (15-49 years) (percent)", "prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)", "number of women of reproductive age (15-49 years) affected by anemia (million)",
]) ])
# =============================================================================
# SDG ERA PROXY KEYWORDS
# Substrings identifying indicators that are genuinely new in the SDG era
# (FIES only). They are matched against lowercase indicator names in order to
# detect sdg_start_year from the data.
#
# IMPORTANT — anemia/anaemia is deliberately NOT used as a proxy:
#   Anemia data already exists before the SDG era, so its actual_start_year
#   is earlier than sdg_start_year. If it were used as a proxy, sdg_start_year
#   would be detected too early and every anemia row would become 'SDGs'.
#   FIES is the only indicator family that is purely new in the SDG era, so
#   it alone is used as the marker for the year the SDG era begins.
# =============================================================================
_SDG_ERA_PROXY_KEYWORDS = frozenset((
    "food insecurity",
    "food insecure",
))
# ============================================================================= # =============================================================================
# THRESHOLD KONDISI (fixed absolute, skala 1-100) # THRESHOLD KONDISI (fixed absolute, skala 1-100)
# ============================================================================= # =============================================================================
# Digunakan untuk assign kondisi di analysis_layer.
# Didefinisikan di sini agar konsisten antara kedua file.
# bad : norm_value_1_100 < THRESHOLD_BAD
# good : norm_value_1_100 > THRESHOLD_GOOD
# moderate : di antara keduanya
THRESHOLD_BAD = 40.0 THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0 THRESHOLD_GOOD = 60.0
@@ -91,6 +130,8 @@ THRESHOLD_GOOD = 60.0
def assign_condition(norm_value_1_100: float) -> str: def assign_condition(norm_value_1_100: float) -> str:
""" """
Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware). Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware).
Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert).
Returns: 'good' / 'moderate' / 'bad' Returns: 'good' / 'moderate' / 'bad'
""" """
if pd.isna(norm_value_1_100): if pd.isna(norm_value_1_100):
@@ -102,6 +143,44 @@ def assign_condition(norm_value_1_100: float) -> str:
return 'moderate' return 'moderate'
def assign_framework_per_row(
    indicator_name: str,
    year: int,
    sdg_start_year: int,
) -> str:
    """
    Return the framework label ('MDGs' or 'SDGs') for a single data row.

    The label is decided per ROW (per year), not per indicator.

    Rules:
        - 'SDGs' only when BOTH conditions hold:
            1. the lowercased, stripped indicator name is a member of
               SDG_INDICATOR_KEYWORDS (exact match, not substring), and
            2. the row's year is >= sdg_start_year.
        - 'MDGs' in every other case.

    Why per row rather than per indicator?
        An indicator listed in SDG_INDICATOR_KEYWORDS may have rows dated
        before sdg_start_year (the "shared" indicators such as anemia,
        stunting, wasting, undernourishment). Labelling by row year lets
        one indicator carry both labels:
            - rows with year <  sdg_start_year -> 'MDGs'
            - rows with year >= sdg_start_year -> 'SDGs'

    Example for a listed indicator with sdg_start_year = 2016:
        year=2013 -> 'MDGs', year=2015 -> 'MDGs',
        year=2016 -> 'SDGs', year=2017 -> 'SDGs'

    Args:
        indicator_name: Indicator name; coerced with str(), lowercased and
            stripped before the membership test.
        year: Year of the row; coerced with int() before comparison.
        sdg_start_year: First year whose rows are labelled 'SDGs'.

    Returns:
        'SDGs' or 'MDGs'.
    """
    normalized_name = str(indicator_name).lower().strip()

    # Indicators outside the SDG list are always 'MDGs'; returning early
    # means the year is never converted for them.
    if normalized_name not in SDG_INDICATOR_KEYWORDS:
        return 'MDGs'

    return 'SDGs' if int(year) >= sdg_start_year else 'MDGs'
# ============================================================================= # =============================================================================
# ANALYTICAL LAYER CLASS # ANALYTICAL LAYER CLASS
# ============================================================================= # =============================================================================
@@ -115,17 +194,14 @@ class AnalyticalLayerLoader:
indicator_id, indicator_name, direction, framework, indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name, pillar_id, pillar_name,
time_id, year, value, time_id, year, value,
norm_value_1_100, norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware
yoy_change, yoy_pct yoy_change, yoy_pct
FRAMEWORK LOGIC: Catatan framework:
- Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun Framework di-assign PER BARIS (per year), sehingga indikator shared
- Indikator DI SDG_ONLY_KEYWORDS: seperti anemia dapat memiliki framework berbeda di baris yang berbeda:
year < sdg_transition_year 'MDGs' (data tetap ada, tidak dihapus) - baris sebelum sdg_start_year -> 'MDGs'
year >= sdg_transition_year 'SDGs' - baris sejak sdg_start_year -> 'SDGs'
- sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators
yang lolos filter Step 3-5. Semua SDG-only indicators menggunakan
sdg_transition_year yang SAMA agar label berubah serentak.
""" """
def __init__(self, client: bigquery.Client): def __init__(self, client: bigquery.Client):
@@ -139,12 +215,12 @@ class AnalyticalLayerLoader:
self.df_pillar = None self.df_pillar = None
self.selected_country_ids = None self.selected_country_ids = None
self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5)
self.sdg_transition_year = None # tahun SDGs mulai berlaku (dari Step 6)
self.start_year = 2013 self.start_year = 2013
self.end_year = None self.end_year = None
self.baseline_year = 2023 self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap)
self.sdg_start_year = None
self.pipeline_metadata = { self.pipeline_metadata = {
'source_class' : self.__class__.__name__, 'source_class' : self.__class__.__name__,
@@ -230,6 +306,15 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80) self.logger.info("=" * 80)
# Filter single years only (is_year_range == False)
if 'is_year_range' in self.df_clean.columns:
before = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy()
self.logger.info(
f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows"
)
# baseline_year = 2023 hardcode (syarat dosen: minimal 2023)
df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_baseline['indicator_id'].nunique() baseline_indicator_count = df_baseline['indicator_id'].nunique()
@@ -394,8 +479,6 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80) self.logger.info("=" * 80)
# Hitung max_start_year per indikator = max(min_year per country)
# = tahun pertama di mana SEMUA fixed countries sudah punya data
indicator_country_start = self.df_clean.groupby([ indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id' 'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index() ])['year'].min().reset_index()
@@ -424,8 +507,6 @@ class AnalyticalLayerLoader:
}) })
continue continue
# Cek apakah semua tahun dari max_start s/d end_year
# hadir di SEMUA fixed countries
expected_years = list(range(max_start, self.end_year + 1)) expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id] ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True all_years_complete = True
@@ -448,173 +529,140 @@ class AnalyticalLayerLoader:
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if removed_indicators:
self.logger.info(f"\n Removed indicators:")
for item in removed_indicators:
self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}")
if not valid_indicators: if not valid_indicators:
raise ValueError("No valid indicators found after filtering!") raise ValueError("No valid indicators found after filtering!")
# ----------------------------------------------------------------
# Filter hanya indikator yang valid.
# PENTING: TIDAK menghapus baris year < max_start_year.
# Semua baris tetap ada — label framework ditentukan di Step 6.
# max_start_year disimpan sebagai lookup untuk Step 6 & 7.
# ----------------------------------------------------------------
original_count = len(self.df_clean) original_count = len(self.df_clean)
self.df_clean = self.df_clean[ self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators) self.df_clean['indicator_id'].isin(valid_indicators)
].copy() ].copy()
# Simpan max_start_year per indicator_id untuk Step 6 dan Step 7 self.df_clean = self.df_clean.merge(
self.indicator_max_start_map = ( indicator_max_start[['indicator_id', 'max_start_year']],
indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] on='indicator_id', how='left'
.set_index('indicator_id')['max_start_year']
.to_dict()
) )
self.df_clean = self.df_clean[
self.df_clean['year'] >= self.df_clean['max_start_year']
].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}") self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
self.logger.info(
f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. "
f"Label framework akan ditentukan di Step 6."
)
return self.df_clean return self.df_clean
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# STEP 6: ASSIGN FRAMEWORK PER ROW # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def determine_sdg_start_year(self): def determine_sdg_start_year(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW") self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.logger.info(
" Proxy: FIES only (food insecurity/food insecure).\n"
" Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n"
" Framework di-assign PER BARIS (year), bukan per indikator."
)
# actual_start_year per indikator = max(min_year per country)
# = konsisten dengan max_start_year di Step 5
indicator_actual_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
.min().reset_index()
.groupby(['indicator_id', 'indicator_name'])['year']
.max().reset_index()
)
indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia)
proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS)
)
df_proxy = indicator_actual_start[proxy_mask]
if df_proxy.empty:
raise ValueError(
"Tidak ada indikator proxy SDGs (FIES) yang lolos filter. "
"Pastikan indikator FIES (food insecurity/food insecure) ada di data."
)
self.sdg_start_year = int(df_proxy['actual_start_year'].min())
self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}")
self.logger.info(f" Proxy indicators (FIES only):")
for _, row in df_proxy.iterrows():
self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}")
# ---------------------------------------------------------------- # ----------------------------------------------------------------
# Bangun tabel actual_start_year per indikator dari # Assign framework PER BARIS menggunakan year baris, bukan actual_start_year
# indicator_max_start_map yang sudah ditetapkan di Step 5. # Sehingga indikator "shared" (anemia, stunting, dll) mendapat:
# - 'MDGs' untuk baris sebelum sdg_start_year
# - 'SDGs' untuk baris sejak sdg_start_year
# ---------------------------------------------------------------- # ----------------------------------------------------------------
indicator_actual_start = pd.DataFrame([ self.df_clean['framework'] = self.df_clean.apply(
{'indicator_id': ind_id, 'actual_start_year': int(start_yr)} lambda row: assign_framework_per_row(
for ind_id, start_yr in self.indicator_max_start_map.items() indicator_name = row['indicator_name'],
]) year = int(row['year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
# Merge indicator_name untuk logging # ----------------------------------------------------------------
indicator_actual_start = indicator_actual_start.merge( # Logging: ringkasan per indikator (frameworks apa yang muncul)
self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(), # ----------------------------------------------------------------
ind_fw_summary = (
self.df_clean
.groupby(['indicator_id', 'indicator_name'])['framework']
.unique()
.reset_index()
)
ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply(
lambda x: '/'.join(sorted(x))
)
ind_fw_summary = ind_fw_summary.merge(
indicator_actual_start[['indicator_id', 'actual_start_year']],
on='indicator_id', how='left' on='indicator_id', how='left'
) )
# Tandai mana yang SDG-only self.logger.info(f"\n Framework assignment per indikator:")
indicator_actual_start['is_sdg_only'] = ( self.logger.info(f" {'-'*85}")
indicator_actual_start['indicator_name'] self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}")
.str.lower().str.strip() self.logger.info(f" {'-'*85}")
.isin(SDG_ONLY_KEYWORDS) for _, row in ind_fw_summary.sort_values(
) ['frameworks', 'actual_start_year', 'indicator_name']
).iterrows():
# ----------------------------------------------------------------
# sdg_transition_year = min(actual_start_year) dari semua SDG-only
# indicators yang lolos filter.
# Ini adalah satu titik waktu di mana semua SDG-only indicators
# berubah dari 'MDGs' ke 'SDGs' secara SERENTAK.
# ----------------------------------------------------------------
sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']]
if sdg_only_df.empty:
raise ValueError(
"Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. "
"Pastikan indikator FIES dan anaemia ada di data."
)
self.sdg_transition_year = int(sdg_only_df['actual_start_year'].min())
self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:")
self.logger.info(f" {'-'*80}")
for _, row in sdg_only_df.iterrows():
self.logger.info( self.logger.info(
f" [SDG-only] actual_start={int(row['actual_start_year'])} | " f" {int(row['indicator_id']):<5} {row['frameworks']:<18} "
f"{row['indicator_name']}" f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}"
) )
# Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi
split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs']
if not split_inds.empty:
self.logger.info( self.logger.info(
f"\n sdg_transition_year = {self.sdg_transition_year} " f"\n [INFO] {len(split_inds)} indikator memiliki framework split "
f"(min actual_start_year dari semua SDG-only indicators)" f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):"
)
self.logger.info(f"\n Logika assign framework (PER BARIS):")
self.logger.info(f" ──────────────────────────────────────────────────────────")
self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS:")
self.logger.info(f"'MDGs' di semua tahun")
self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year < {self.sdg_transition_year}'MDGs' (data tetap ada)")
self.logger.info(f" year >= {self.sdg_transition_year}'SDGs'")
self.logger.info(f" ──────────────────────────────────────────────────────────")
# ----------------------------------------------------------------
# Assign framework dengan vectorized operation menggunakan
# sdg_transition_year (SATU nilai untuk semua SDG-only indicators)
# ----------------------------------------------------------------
# Tandai apakah setiap baris adalah SDG-only indicator
sdg_only_ids = set(
indicator_actual_start.loc[
indicator_actual_start['is_sdg_only'], 'indicator_id'
]
)
self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids)
# Assign framework:
# - Bukan SDG-only → 'MDGs'
# - SDG-only AND year >= sdg_transition_year → 'SDGs'
# - SDG-only AND year < sdg_transition_year → 'MDGs'
self.df_clean['framework'] = np.where(
self.df_clean['_is_sdg_only'] &
(self.df_clean['year'] >= self.sdg_transition_year),
'SDGs',
'MDGs'
)
# Drop kolom bantu
self.df_clean = self.df_clean.drop(columns=['_is_sdg_only'])
# ----------------------------------------------------------------
# Log verifikasi per indikator
# ----------------------------------------------------------------
self.logger.info(f"\n Verifikasi framework per indikator:")
self.logger.info(f" {'-'*110}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Data From':<12} "
f"{'MDGs rows':<12} {'SDGs rows':<12} {'Note'}"
)
self.logger.info(f" {'-'*110}")
for ind_id, grp in self.df_clean.groupby('indicator_id'):
ind_name = grp['indicator_name'].iloc[0]
mdgs_rows = (grp['framework'] == 'MDGs').sum()
sdgs_rows = (grp['framework'] == 'SDGs').sum()
is_sdg_only = ind_id in sdg_only_ids
data_from = int(grp['year'].min())
if is_sdg_only:
note = f"SDGs from {self.sdg_transition_year}, MDGs before"
else:
note = "MDGs always"
self.logger.info(
f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<12} "
f"{mdgs_rows:<12} {sdgs_rows:<12} {note}"
) )
for _, row in split_inds.iterrows():
self.logger.info(f" - {row['indicator_name'][:60]}")
fw_summary = self.df_clean['framework'].value_counts() fw_summary = self.df_clean['framework'].value_counts()
self.logger.info(f"\n Ringkasan rows: " + " | ".join( self.logger.info(
f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items() f"\n Ringkasan rows: " +
)) " | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items())
)
end_year_df = self.df_clean[self.df_clean['year'] == self.end_year]
fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique()
self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join(
f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items()
))
self.logger.info( self.logger.info(
f"\n [OK] 'framework' ditambahkan — " f"\n [OK] 'framework' ditambahkan per row "
f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
) )
@@ -629,44 +677,23 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 7: VERIFY NO GAPS") self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("=" * 80) self.logger.info("=" * 80)
# ----------------------------------------------------------------
# Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya,
# bukan dari self.start_year global, karena tiap indikator bisa
# punya start year berbeda.
# Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek
# karena memang tidak semua country punya data di sana.
# ----------------------------------------------------------------
expected_countries = len(self.selected_country_ids) expected_countries = len(self.selected_country_ids)
all_good = True verification = self.df_clean.groupby(
bad_rows = [] ['indicator_id', 'year']
)['country_id'].nunique().reset_index()
for ind_id, grp in self.df_clean.groupby('indicator_id'): verification.columns = ['indicator_id', 'year', 'country_count']
actual_start = self.indicator_max_start_map.get(ind_id) all_good = (verification['country_count'] == expected_countries).all()
if actual_start is None:
continue
expected_years = list(range(int(actual_start), self.end_year + 1))
for year in expected_years:
country_count = grp[grp['year'] == year]['country_id'].nunique()
if country_count != expected_countries:
all_good = False
bad_rows.append({
'indicator_id' : int(ind_id),
'year' : int(year),
'country_count': int(country_count),
})
if all_good: if all_good:
self.logger.info( self.logger.info(
f" VERIFICATION PASSED — all combinations from actual_start_year " f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
f"have {expected_countries} countries"
) )
else: else:
for row in bad_rows[:10]: bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows():
self.logger.error( self.logger.error(
f" Indicator {row['indicator_id']}, Year {row['year']}: " f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
f"{row['country_count']} countries (expected {expected_countries})" f"{int(row['country_count'])} countries (expected {expected_countries})"
) )
raise ValueError("Gap verification failed!") raise ValueError("Gap verification failed!")
@@ -679,7 +706,22 @@ class AnalyticalLayerLoader:
def calculate_norm_value(self): def calculate_norm_value(self):
""" """
Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100, Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100,
direction-aware, global per indikator (semua negara + semua tahun). direction-aware.
CARA KERJA:
- Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable.
- lower_better diinvert: nilai tinggi selalu = kondisi lebih baik.
Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert.
- Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio.
- Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan
(persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama.
Catatan:
- Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer
yang skala 0-1 dan dipakai untuk agregasi composite score.
- norm_value_1_100 ini adalah per baris (per country per year per indicator),
untuk ditampilkan langsung di Looker Studio.
""" """
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
@@ -693,10 +735,7 @@ class AnalyticalLayerLoader:
norm_parts = [] norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info( self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}")
f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} "
f"{'Min':>10} {'Max':>10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*90}") self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators: for (ind_id, ind_name, direction), grp in indicators:
@@ -716,11 +755,15 @@ class AnalyticalLayerLoader:
normed = np.full(len(grp), np.nan) normed = np.full(len(grp), np.nan)
if v_min == v_max: if v_min == v_max:
# Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100)
normed[valid_mask.values] = 50.5 normed[valid_mask.values] = 50.5
else: else:
# Min-max ke 0-1 dulu
scaled = (raw - v_min) / (v_max - v_min) scaled = (raw - v_min) / (v_max - v_min)
# Invert jika lower_better
if do_invert: if do_invert:
scaled = 1.0 - scaled scaled = 1.0 - scaled
# Scale ke 1-100
normed[valid_mask.values] = 1.0 + scaled * 99.0 normed[valid_mask.values] = 1.0 + scaled * 99.0
grp['norm_value_1_100'] = normed grp['norm_value_1_100'] = normed
@@ -733,6 +776,7 @@ class AnalyticalLayerLoader:
self.df_clean = pd.concat(norm_parts, ignore_index=True) self.df_clean = pd.concat(norm_parts, ignore_index=True)
# Statistik ringkasan
valid_norm = self.df_clean['norm_value_1_100'].notna().sum() valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
null_norm = self.df_clean['norm_value_1_100'].isna().sum() null_norm = self.df_clean['norm_value_1_100'].isna().sum()
self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}") self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
@@ -742,14 +786,10 @@ class AnalyticalLayerLoader:
f"{self.df_clean['norm_value_1_100'].max():.2f}" f"{self.df_clean['norm_value_1_100'].max():.2f}"
) )
self.df_clean['_condition_preview'] = ( # Log distribusi kondisi berdasarkan threshold
self.df_clean['norm_value_1_100'].apply(assign_condition) self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition)
)
cond_dist = self.df_clean['_condition_preview'].value_counts() cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info( self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):")
f"\n Distribusi kondisi "
f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):"
)
for cond, cnt in cond_dist.items(): for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows") self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview']) self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
@@ -822,39 +862,45 @@ class AnalyticalLayerLoader:
'start_year', 'end_year', 'country_count' 'start_year', 'end_year', 'country_count'
] ]
fw_at_end = ( # Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split)
self.df_clean[self.df_clean['year'] == self.end_year] ind_fw = (
self.df_clean
.groupby('indicator_id')['framework'] .groupby('indicator_id')['framework']
.first() .unique()
.reset_index() .reset_index()
) )
indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left') ind_fw['framework_label'] = ind_fw['framework'].apply(
indicator_details['framework'] = indicator_details['framework'].fillna('MDGs') lambda x: '/'.join(sorted(x))
)
indicator_details = indicator_details.merge(
ind_fw[['indicator_id', 'framework_label']],
on='indicator_id', how='left'
)
indicator_details['year_range'] = ( indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str) indicator_details['end_year'].astype(int).astype(str)
) )
indicator_details = indicator_details.sort_values( indicator_details = indicator_details.sort_values(
['framework', 'pillar_name', 'start_year', 'indicator_name'] ['framework_label', 'pillar_name', 'start_year', 'indicator_name']
) )
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
self.logger.info(f"Framework breakdown (at end_year={self.end_year}):") self.logger.info(f"Framework breakdown (per indicator label):")
for fw, count in indicator_details.groupby('framework').size().items(): for fw, count in indicator_details.groupby('framework_label').size().items():
self.logger.info(f" {fw}: {count} indicators") self.logger.info(f" {fw}: {count} indicators")
self.logger.info(f"\n{'-'*110}") self.logger.info(f"\n{'-'*115}")
self.logger.info( self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}"
) )
self.logger.info(f"{'-'*110}") self.logger.info(f"{'-'*115}")
for _, row in indicator_details.iterrows(): for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info( self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
) )
@@ -917,20 +963,22 @@ class AnalyticalLayerLoader:
self.logger.info(f" Total rows: {len(analytical_df):,}") self.logger.info(f" Total rows: {len(analytical_df):,}")
# Framework distribution per row
fw_dist_rows = analytical_df['framework'].value_counts() fw_dist_rows = analytical_df['framework'].value_counts()
self.logger.info(f" Framework distribution (rows):") self.logger.info(f" Framework distribution (rows):")
for fw, cnt in fw_dist_rows.items(): for fw, cnt in fw_dist_rows.items():
self.logger.info(f" {fw}: {cnt:,} rows") self.logger.info(f" {fw}: {cnt:,} rows")
fw_dist_ind = ( # Framework distribution per indikator (label)
analytical_df[analytical_df['year'] == self.end_year] ind_fw_label = (
.drop_duplicates('indicator_id')['framework'] analytical_df
.groupby('indicator_id')['framework']
.unique()
.apply(lambda x: '/'.join(sorted(x)))
.value_counts() .value_counts()
) )
self.logger.info( self.logger.info(f" Framework distribution (per indicator label):")
f" Framework distribution (indicators at year={self.end_year}):" for fw, cnt in ind_fw_label.items():
)
for fw, cnt in fw_dist_ind.items():
self.logger.info(f" {fw}: {cnt} indicators") self.logger.info(f" {fw}: {cnt} indicators")
self.logger.info( self.logger.info(
@@ -977,16 +1025,11 @@ class AnalyticalLayerLoader:
'start_year' : self.start_year, 'start_year' : self.start_year,
'end_year' : self.end_year, 'end_year' : self.end_year,
'baseline_year' : self.baseline_year, 'baseline_year' : self.baseline_year,
'sdg_transition_year' : self.sdg_transition_year, 'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware', 'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_logic' : ( 'framework_assignment' : 'per-row by year (not per-indicator)',
'sdg_transition_year = min(actual_start_year) dari SDG-only indicators; ' 'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS),
'SDG-only year >= sdg_transition_year → SDGs; '
'SDG-only year < sdg_transition_year → MDGs (data tetap ada); '
'non-SDG-only → MDGs selalu'
),
'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : { 'condition_thresholds' : {
'bad' : f'< {THRESHOLD_BAD}', 'bad' : f'< {THRESHOLD_BAD}',
'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
@@ -996,8 +1039,9 @@ class AnalyticalLayerLoader:
'validation_metrics' : json.dumps({ 'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
'total_indicators' : int(self.df_clean['indicator_id'].nunique()), 'total_indicators' : int(self.df_clean['indicator_id'].nunique()),
'sdg_transition_year': self.sdg_transition_year, 'sdg_start_year' : self.sdg_start_year,
'framework_dist_rows' : fw_dist_rows.to_dict(), 'framework_dist_rows' : fw_dist_rows.to_dict(),
'framework_dist_inds' : ind_fw_label.to_dict(),
}) })
} }
save_etl_metadata(self.client, metadata) save_etl_metadata(self.client, metadata)
@@ -1020,11 +1064,9 @@ class AnalyticalLayerLoader:
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)")
self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info(
"Framework: SDG-only indicators → SDGs mulai sdg_transition_year, "
"MDGs sebelumnya (data tetap ada). Non-SDG-only → MDGs selalu."
)
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_source_data() self.load_source_data()
@@ -1032,10 +1074,10 @@ class AnalyticalLayerLoader:
self.filter_complete_indicators_per_country() self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars() self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries() self.filter_indicators_consistent_across_fixed_countries()
self.determine_sdg_start_year() self.determine_sdg_start_year() # Step 6: per-row framework assignment
self.verify_no_gaps() self.verify_no_gaps()
self.calculate_norm_value() self.calculate_norm_value() # Step 8: norm_value_1_100
self.calculate_yoy() self.calculate_yoy() # Step 9: yoy_change, yoy_pct
self.analyze_indicator_availability_by_year() self.analyze_indicator_availability_by_year()
self.save_analytical_table() self.save_analytical_table()
@@ -1047,7 +1089,7 @@ class AnalyticalLayerLoader:
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Transition Year: {self.sdg_transition_year}") self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}") self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
@@ -1074,11 +1116,8 @@ if __name__ == "__main__":
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware") print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Framework: per-row by year | SDG Proxy: FIES only")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print(
"Framework: SDG-only → SDGs mulai sdg_transition_year, MDGs sebelumnya. "
"Non-SDG-only → MDGs selalu."
)
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()
@@ -1088,6 +1127,6 @@ if __name__ == "__main__":
print("\n" + "=" * 80) print("\n" + "=" * 80)
print("[OK] COMPLETED") print("[OK] COMPLETED")
print(f" SDG Transition Year : {loader.sdg_transition_year}") print(f" SDG Start Year : {loader.sdg_start_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80) print("=" * 80)