This commit is contained in:
Debby
2026-04-01 21:30:54 +07:00
parent b7cab36bd9
commit 189e8895c9

View File

@@ -8,32 +8,27 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year
7. Verify no gaps
8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware)
→ TIDAK menghapus baris year < max_start_year
→ Semua baris tetap ada; label framework ditentukan di Step 6
6. Assign framework (MDGs/SDGs) per indicator PER ROW
→ Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu
→ Indikator DI SDG_ONLY_KEYWORDS + year >= sdg_transition_year → 'SDGs'
→ Indikator DI SDG_ONLY_KEYWORDS + year < sdg_transition_year → 'MDGs'
→ sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators
yang lolos filter (= tahun pertama data SDG-only konsisten di semua countries)
7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global)
8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global)
9. Calculate YoY per indicator per country
10. Analyze indicator availability by year
11. Save analytical table
NORMALISASI (Step 8):
- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100
- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik
- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable
- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio
FRAMEWORK LOGIC (Per-Row, bukan per indikator):
- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap
di semua fixed countries (setelah Step 3-5 filter selesai)
- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure")
Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs
- Framework di-assign PER BARIS (per year), bukan per indikator:
* row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs'
* Selain itu -> 'MDGs'
- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment)
yang datanya ada sebelum SDGs:
* row lama (year < sdg_start_year) -> 'MDGs'
* row baru (year >= sdg_start_year) -> 'SDGs'
FRAMEWORK LOGIC:
- sdg_transition_year dihitung SATU KALI dari actual_start_year SDG-only indicators
- Semua SDG-only indicators menggunakan sdg_transition_year yang SAMA
sehingga label berubah serentak di satu titik waktu
- Baris sebelum sdg_transition_year → 'MDGs' (data tetap ada, tidak dihapus)
- Baris mulai sdg_transition_year → 'SDGs'
- Indikator non-SDG-only → 'MDGs' selalu
"""
import pandas as pd
@@ -60,17 +55,14 @@ from google.cloud import bigquery
# =============================================================================
# SDG INDICATOR KEYWORDS
# Daftar nama indikator (lowercase) yang masuk SDG framework.
# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year,
# dan 'MDGs' untuk baris dengan year < sdg_start_year.
# SDG-ONLY INDICATOR KEYWORDS
# =============================================================================
# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini.
# Indikator di set ini → 'SDGs' mulai dari sdg_transition_year.
# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun.
SDG_INDICATOR_KEYWORDS = frozenset([
# TARGET 2.1.1Prevalence of undernourishment (shared: ada sebelum SDGs)
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs)
SDG_ONLY_KEYWORDS = frozenset([
# TARGET 2.1.2FIES (SDGs only)
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
@@ -83,45 +75,14 @@ SDG_INDICATOR_KEYWORDS = frozenset([
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.1Stunting (shared: ada sebelum SDGs)
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs)
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar
# baris >= sdg_start_year di-assign 'SDGs')
# TARGET 2.2.3Anaemia (SDGs only)
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# =============================================================================
# SDG ERA PROXY KEYWORDS
# HANYA indikator yang MURNI baru di era SDGs (FIES saja).
# Dipakai untuk mendeteksi sdg_start_year dari data.
#
# PENTING — Anemia/anaemia TIDAK dipakai sebagai proxy:
# Data anemia sudah ada sebelum era SDGs sehingga actual_start_year-nya
# lebih awal dari sdg_start_year. Jika dipakai sebagai proxy, sdg_start_year
# akan terdeteksi terlalu awal dan seluruh baris anemia akan menjadi 'SDGs'.
# FIES adalah satu-satunya indikator yang benar-benar murni baru di era SDGs
# dan dapat dipakai sebagai penanda tahun mulainya era SDGs.
# =============================================================================
_SDG_ERA_PROXY_KEYWORDS = frozenset([
"food insecurity",
"food insecure",
])
# =============================================================================
# THRESHOLD KONDISI (fixed absolute, skala 1-100)
# =============================================================================
# Digunakan untuk assign kondisi di analysis_layer.
# Didefinisikan di sini agar konsisten antara kedua file.
# bad : norm_value_1_100 < THRESHOLD_BAD
# good : norm_value_1_100 > THRESHOLD_GOOD
# moderate : di antara keduanya
THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0
@@ -130,8 +91,6 @@ THRESHOLD_GOOD = 60.0
def assign_condition(norm_value_1_100: float) -> str:
"""
Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware).
Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert).
Returns: 'good' / 'moderate' / 'bad'
"""
if pd.isna(norm_value_1_100):
@@ -143,44 +102,6 @@ def assign_condition(norm_value_1_100: float) -> str:
return 'moderate'
def assign_framework_per_row(
indicator_name: str,
year: int,
sdg_start_year: int,
) -> str:
"""
Tentukan framework (MDGs/SDGs) per BARIS (per row year), bukan per indikator.
Logic:
- 'SDGs' jika KEDUA kondisi terpenuhi:
1. Nama indikator ada di SDG_INDICATOR_KEYWORDS
2. year (tahun baris ini) >= sdg_start_year
- 'MDGs' untuk semua kasus lain.
Mengapa per row, bukan per indikator?
Indikator "shared" seperti anemia, stunting, wasting, undernourishment
memiliki data yang ada SEBELUM era SDGs dimulai. Jika assign dilakukan
per indikator menggunakan actual_start_year, indikator-indikator ini
akan selalu di-assign 'MDGs' karena actual_start_year < sdg_start_year.
Dengan assign per row menggunakan year baris:
- baris lama (year < sdg_start_year) -> 'MDGs' (benar: belum era SDGs)
- baris baru (year >= sdg_start_year) -> 'SDGs' (benar: sudah era SDGs)
Contoh anemia (sdg_start_year = 2016):
- row year=2013 -> 'MDGs'
- row year=2014 -> 'MDGs'
- row year=2015 -> 'MDGs'
- row year=2016 -> 'SDGs'
- row year=2017 -> 'SDGs'
- ...
"""
name_lower = str(indicator_name).lower().strip()
in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS
if in_sdg_list and int(year) >= sdg_start_year:
return 'SDGs'
return 'MDGs'
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
@@ -194,14 +115,17 @@ class AnalyticalLayerLoader:
indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name,
time_id, year, value,
norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware
norm_value_1_100,
yoy_change, yoy_pct
Catatan framework:
Framework di-assign PER BARIS (per year), sehingga indikator shared
seperti anemia dapat memiliki framework berbeda di baris yang berbeda:
- baris sebelum sdg_start_year -> 'MDGs'
- baris sejak sdg_start_year -> 'SDGs'
FRAMEWORK LOGIC:
- Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun
- Indikator DI SDG_ONLY_KEYWORDS:
year < sdg_transition_year 'MDGs' (data tetap ada, tidak dihapus)
year >= sdg_transition_year 'SDGs'
- sdg_transition_year = min(actual_start_year) dari semua SDG-only indicators
yang lolos filter Step 3-5. Semua SDG-only indicators menggunakan
sdg_transition_year yang SAMA agar label berubah serentak.
"""
def __init__(self, client: bigquery.Client):
@@ -215,12 +139,12 @@ class AnalyticalLayerLoader:
self.df_pillar = None
self.selected_country_ids = None
self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5)
self.sdg_transition_year = None # tahun SDGs mulai berlaku (dari Step 6)
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap)
self.sdg_start_year = None
self.baseline_year = 2023
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
@@ -306,15 +230,6 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80)
# Filter single years only (is_year_range == False)
if 'is_year_range' in self.df_clean.columns:
before = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy()
self.logger.info(
f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows"
)
# baseline_year = 2023 hardcode (syarat dosen: minimal 2023)
df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_baseline['indicator_id'].nunique()
@@ -479,6 +394,8 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80)
# Hitung max_start_year per indikator = max(min_year per country)
# = tahun pertama di mana SEMUA fixed countries sudah punya data
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
@@ -507,6 +424,8 @@ class AnalyticalLayerLoader:
})
continue
# Cek apakah semua tahun dari max_start s/d end_year
# hadir di SEMUA fixed countries
expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True
@@ -529,140 +448,173 @@ class AnalyticalLayerLoader:
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if removed_indicators:
self.logger.info(f"\n Removed indicators:")
for item in removed_indicators:
self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}")
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
# ----------------------------------------------------------------
# Filter hanya indikator yang valid.
# PENTING: TIDAK menghapus baris year < max_start_year.
# Semua baris tetap ada — label framework ditentukan di Step 6.
# max_start_year disimpan sebagai lookup untuk Step 6 & 7.
# ----------------------------------------------------------------
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
self.df_clean = self.df_clean.merge(
indicator_max_start[['indicator_id', 'max_start_year']],
on='indicator_id', how='left'
# Simpan max_start_year per indicator_id untuk Step 6 dan Step 7
self.indicator_max_start_map = (
indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)]
.set_index('indicator_id')['max_start_year']
.to_dict()
)
self.df_clean = self.df_clean[
self.df_clean['year'] >= self.df_clean['max_start_year']
].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
self.logger.info(f"\n Rows before : {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}")
self.logger.info(
f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. "
f"Label framework akan ditentukan di Step 6."
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW
# STEP 6: ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW")
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW")
self.logger.info("=" * 80)
self.logger.info(
" Proxy: FIES only (food insecurity/food insecure).\n"
" Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n"
" Framework di-assign PER BARIS (year), bukan per indikator."
)
# actual_start_year per indikator = max(min_year per country)
# = konsisten dengan max_start_year di Step 5
indicator_actual_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
.min().reset_index()
.groupby(['indicator_id', 'indicator_name'])['year']
.max().reset_index()
)
indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia)
proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS)
)
df_proxy = indicator_actual_start[proxy_mask]
if df_proxy.empty:
raise ValueError(
"Tidak ada indikator proxy SDGs (FIES) yang lolos filter. "
"Pastikan indikator FIES (food insecurity/food insecure) ada di data."
)
self.sdg_start_year = int(df_proxy['actual_start_year'].min())
self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}")
self.logger.info(f" Proxy indicators (FIES only):")
for _, row in df_proxy.iterrows():
self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}")
# ----------------------------------------------------------------
# Assign framework PER BARIS menggunakan year baris, bukan actual_start_year
# Sehingga indikator "shared" (anemia, stunting, dll) mendapat:
# - 'MDGs' untuk baris sebelum sdg_start_year
# - 'SDGs' untuk baris sejak sdg_start_year
# Bangun tabel actual_start_year per indikator dari
# indicator_max_start_map yang sudah ditetapkan di Step 5.
# ----------------------------------------------------------------
self.df_clean['framework'] = self.df_clean.apply(
lambda row: assign_framework_per_row(
indicator_name = row['indicator_name'],
year = int(row['year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
indicator_actual_start = pd.DataFrame([
{'indicator_id': ind_id, 'actual_start_year': int(start_yr)}
for ind_id, start_yr in self.indicator_max_start_map.items()
])
# ----------------------------------------------------------------
# Logging: ringkasan per indikator (frameworks apa yang muncul)
# ----------------------------------------------------------------
ind_fw_summary = (
self.df_clean
.groupby(['indicator_id', 'indicator_name'])['framework']
.unique()
.reset_index()
)
ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply(
lambda x: '/'.join(sorted(x))
)
ind_fw_summary = ind_fw_summary.merge(
indicator_actual_start[['indicator_id', 'actual_start_year']],
# Merge indicator_name untuk logging
indicator_actual_start = indicator_actual_start.merge(
self.df_clean[['indicator_id', 'indicator_name']].drop_duplicates(),
on='indicator_id', how='left'
)
self.logger.info(f"\n Framework assignment per indikator:")
self.logger.info(f" {'-'*85}")
self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}")
self.logger.info(f" {'-'*85}")
for _, row in ind_fw_summary.sort_values(
['frameworks', 'actual_start_year', 'indicator_name']
).iterrows():
self.logger.info(
f" {int(row['indicator_id']):<5} {row['frameworks']:<18} "
f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}"
# Tandai mana yang SDG-only
indicator_actual_start['is_sdg_only'] = (
indicator_actual_start['indicator_name']
.str.lower().str.strip()
.isin(SDG_ONLY_KEYWORDS)
)
# Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi
split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs']
if not split_inds.empty:
self.logger.info(
f"\n [INFO] {len(split_inds)} indikator memiliki framework split "
f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):"
# ----------------------------------------------------------------
# sdg_transition_year = min(actual_start_year) dari semua SDG-only
# indicators yang lolos filter.
# Ini adalah satu titik waktu di mana semua SDG-only indicators
# berubah dari 'MDGs' ke 'SDGs' secara SERENTAK.
# ----------------------------------------------------------------
sdg_only_df = indicator_actual_start[indicator_actual_start['is_sdg_only']]
if sdg_only_df.empty:
raise ValueError(
"Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. "
"Pastikan indikator FIES dan anaemia ada di data."
)
self.sdg_transition_year = int(sdg_only_df['actual_start_year'].min())
self.logger.info(f"\n SDG-only indicators dan actual_start_year masing-masing:")
self.logger.info(f" {'-'*80}")
for _, row in sdg_only_df.iterrows():
self.logger.info(
f" [SDG-only] actual_start={int(row['actual_start_year'])} | "
f"{row['indicator_name']}"
)
self.logger.info(
f"\n sdg_transition_year = {self.sdg_transition_year} "
f"(min actual_start_year dari semua SDG-only indicators)"
)
self.logger.info(f"\n Logika assign framework (PER BARIS):")
self.logger.info(f" ──────────────────────────────────────────────────────────")
self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS:")
self.logger.info(f"'MDGs' di semua tahun")
self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year < {self.sdg_transition_year}'MDGs' (data tetap ada)")
self.logger.info(f" year >= {self.sdg_transition_year}'SDGs'")
self.logger.info(f" ──────────────────────────────────────────────────────────")
# ----------------------------------------------------------------
# Assign framework dengan vectorized operation menggunakan
# sdg_transition_year (SATU nilai untuk semua SDG-only indicators)
# ----------------------------------------------------------------
# Tandai apakah setiap baris adalah SDG-only indicator
sdg_only_ids = set(
indicator_actual_start.loc[
indicator_actual_start['is_sdg_only'], 'indicator_id'
]
)
self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids)
# Assign framework:
# - Bukan SDG-only → 'MDGs'
# - SDG-only AND year >= sdg_transition_year → 'SDGs'
# - SDG-only AND year < sdg_transition_year → 'MDGs'
self.df_clean['framework'] = np.where(
self.df_clean['_is_sdg_only'] &
(self.df_clean['year'] >= self.sdg_transition_year),
'SDGs',
'MDGs'
)
# Drop kolom bantu
self.df_clean = self.df_clean.drop(columns=['_is_sdg_only'])
# ----------------------------------------------------------------
# Log verifikasi per indikator
# ----------------------------------------------------------------
self.logger.info(f"\n Verifikasi framework per indikator:")
self.logger.info(f" {'-'*110}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Data From':<12} "
f"{'MDGs rows':<12} {'SDGs rows':<12} {'Note'}"
)
self.logger.info(f" {'-'*110}")
for ind_id, grp in self.df_clean.groupby('indicator_id'):
ind_name = grp['indicator_name'].iloc[0]
mdgs_rows = (grp['framework'] == 'MDGs').sum()
sdgs_rows = (grp['framework'] == 'SDGs').sum()
is_sdg_only = ind_id in sdg_only_ids
data_from = int(grp['year'].min())
if is_sdg_only:
note = f"SDGs from {self.sdg_transition_year}, MDGs before"
else:
note = "MDGs always"
self.logger.info(
f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<12} "
f"{mdgs_rows:<12} {sdgs_rows:<12} {note}"
)
for _, row in split_inds.iterrows():
self.logger.info(f" - {row['indicator_name'][:60]}")
fw_summary = self.df_clean['framework'].value_counts()
self.logger.info(
f"\n Ringkasan rows: " +
" | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items())
)
self.logger.info(f"\n Ringkasan rows: " + " | ".join(
f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()
))
end_year_df = self.df_clean[self.df_clean['year'] == self.end_year]
fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique()
self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join(
f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items()
))
self.logger.info(
f"\n [OK] 'framework' ditambahkan per row "
f"\n [OK] 'framework' ditambahkan — "
f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
)
@@ -677,23 +629,44 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("=" * 80)
# ----------------------------------------------------------------
# Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya,
# bukan dari self.start_year global, karena tiap indikator bisa
# punya start year berbeda.
# Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek
# karena memang tidak semua country punya data di sana.
# ----------------------------------------------------------------
expected_countries = len(self.selected_country_ids)
verification = self.df_clean.groupby(
['indicator_id', 'year']
)['country_id'].nunique().reset_index()
verification.columns = ['indicator_id', 'year', 'country_count']
all_good = (verification['country_count'] == expected_countries).all()
all_good = True
bad_rows = []
for ind_id, grp in self.df_clean.groupby('indicator_id'):
actual_start = self.indicator_max_start_map.get(ind_id)
if actual_start is None:
continue
expected_years = list(range(int(actual_start), self.end_year + 1))
for year in expected_years:
country_count = grp[grp['year'] == year]['country_id'].nunique()
if country_count != expected_countries:
all_good = False
bad_rows.append({
'indicator_id' : int(ind_id),
'year' : int(year),
'country_count': int(country_count),
})
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
f" VERIFICATION PASSED — all combinations from actual_start_year "
f"have {expected_countries} countries"
)
else:
bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows():
for row in bad_rows[:10]:
self.logger.error(
f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
f"{int(row['country_count'])} countries (expected {expected_countries})"
f" Indicator {row['indicator_id']}, Year {row['year']}: "
f"{row['country_count']} countries (expected {expected_countries})"
)
raise ValueError("Gap verification failed!")
@@ -706,22 +679,7 @@ class AnalyticalLayerLoader:
def calculate_norm_value(self):
"""
Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100,
direction-aware.
CARA KERJA:
- Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable.
- lower_better diinvert: nilai tinggi selalu = kondisi lebih baik.
Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert.
- Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio.
- Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan
(persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama.
Catatan:
- Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer
yang skala 0-1 dan dipakai untuk agregasi composite score.
- norm_value_1_100 ini adalah per baris (per country per year per indicator),
untuk ditampilkan langsung di Looker Studio.
direction-aware, global per indikator (semua negara + semua tahun).
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
@@ -735,7 +693,10 @@ class AnalyticalLayerLoader:
norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}")
self.logger.info(
f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} "
f"{'Min':>10} {'Max':>10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators:
@@ -755,15 +716,11 @@ class AnalyticalLayerLoader:
normed = np.full(len(grp), np.nan)
if v_min == v_max:
# Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100)
normed[valid_mask.values] = 50.5
else:
# Min-max ke 0-1 dulu
scaled = (raw - v_min) / (v_max - v_min)
# Invert jika lower_better
if do_invert:
scaled = 1.0 - scaled
# Scale ke 1-100
normed[valid_mask.values] = 1.0 + scaled * 99.0
grp['norm_value_1_100'] = normed
@@ -776,7 +733,6 @@ class AnalyticalLayerLoader:
self.df_clean = pd.concat(norm_parts, ignore_index=True)
# Statistik ringkasan
valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
null_norm = self.df_clean['norm_value_1_100'].isna().sum()
self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
@@ -786,10 +742,14 @@ class AnalyticalLayerLoader:
f"{self.df_clean['norm_value_1_100'].max():.2f}"
)
# Log distribusi kondisi berdasarkan threshold
self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition)
self.df_clean['_condition_preview'] = (
self.df_clean['norm_value_1_100'].apply(assign_condition)
)
cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):")
self.logger.info(
f"\n Distribusi kondisi "
f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):"
)
for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
@@ -862,45 +822,39 @@ class AnalyticalLayerLoader:
'start_year', 'end_year', 'country_count'
]
# Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split)
ind_fw = (
self.df_clean
fw_at_end = (
self.df_clean[self.df_clean['year'] == self.end_year]
.groupby('indicator_id')['framework']
.unique()
.first()
.reset_index()
)
ind_fw['framework_label'] = ind_fw['framework'].apply(
lambda x: '/'.join(sorted(x))
)
indicator_details = indicator_details.merge(
ind_fw[['indicator_id', 'framework_label']],
on='indicator_id', how='left'
)
indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left')
indicator_details['framework'] = indicator_details['framework'].fillna('MDGs')
indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str)
)
indicator_details = indicator_details.sort_values(
['framework_label', 'pillar_name', 'start_year', 'indicator_name']
['framework', 'pillar_name', 'start_year', 'indicator_name']
)
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
self.logger.info(f"Framework breakdown (per indicator label):")
for fw, count in indicator_details.groupby('framework_label').size().items():
self.logger.info(f"Framework breakdown (at end_year={self.end_year}):")
for fw, count in indicator_details.groupby('framework').size().items():
self.logger.info(f" {fw}: {count} indicators")
self.logger.info(f"\n{'-'*115}")
self.logger.info(f"\n{'-'*110}")
self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}"
f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}"
)
self.logger.info(f"{'-'*115}")
self.logger.info(f"{'-'*110}")
for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} "
f"{row['pillar_name'][:13]:<15} {row['framework']:<10} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
)
@@ -963,22 +917,20 @@ class AnalyticalLayerLoader:
self.logger.info(f" Total rows: {len(analytical_df):,}")
# Framework distribution per row
fw_dist_rows = analytical_df['framework'].value_counts()
self.logger.info(f" Framework distribution (rows):")
for fw, cnt in fw_dist_rows.items():
self.logger.info(f" {fw}: {cnt:,} rows")
# Framework distribution per indikator (label)
ind_fw_label = (
analytical_df
.groupby('indicator_id')['framework']
.unique()
.apply(lambda x: '/'.join(sorted(x)))
fw_dist_ind = (
analytical_df[analytical_df['year'] == self.end_year]
.drop_duplicates('indicator_id')['framework']
.value_counts()
)
self.logger.info(f" Framework distribution (per indicator label):")
for fw, cnt in ind_fw_label.items():
self.logger.info(
f" Framework distribution (indicators at year={self.end_year}):"
)
for fw, cnt in fw_dist_ind.items():
self.logger.info(f" {fw}: {cnt} indicators")
self.logger.info(
@@ -1025,11 +977,16 @@ class AnalyticalLayerLoader:
'start_year' : self.start_year,
'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_start_year' : self.sdg_start_year,
'sdg_transition_year' : self.sdg_transition_year,
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_assignment' : 'per-row by year (not per-indicator)',
'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS),
'framework_logic' : (
'sdg_transition_year = min(actual_start_year) dari SDG-only indicators; '
'SDG-only year >= sdg_transition_year → SDGs; '
'SDG-only year < sdg_transition_year → MDGs (data tetap ada); '
'non-SDG-only → MDGs selalu'
),
'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : {
'bad' : f'< {THRESHOLD_BAD}',
'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
@@ -1039,9 +996,8 @@ class AnalyticalLayerLoader:
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators' : int(self.df_clean['indicator_id'].nunique()),
'sdg_start_year' : self.sdg_start_year,
'framework_dist_rows' : fw_dist_rows.to_dict(),
'framework_dist_inds' : ind_fw_label.to_dict(),
'sdg_transition_year': self.sdg_transition_year,
'framework_dist_rows': fw_dist_rows.to_dict(),
})
}
save_etl_metadata(self.client, metadata)
@@ -1064,9 +1020,11 @@ class AnalyticalLayerLoader:
self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)")
self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info(
"Framework: SDG-only indicators → SDGs mulai sdg_transition_year, "
"MDGs sebelumnya (data tetap ada). Non-SDG-only → MDGs selalu."
)
self.logger.info("=" * 80)
self.load_source_data()
@@ -1074,10 +1032,10 @@ class AnalyticalLayerLoader:
self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries()
self.determine_sdg_start_year() # Step 6: per-row framework assignment
self.determine_sdg_start_year()
self.verify_no_gaps()
self.calculate_norm_value() # Step 8: norm_value_1_100
self.calculate_yoy() # Step 9: yoy_change, yoy_pct
self.calculate_norm_value()
self.calculate_yoy()
self.analyze_indicator_availability_by_year()
self.save_analytical_table()
@@ -1089,7 +1047,7 @@ class AnalyticalLayerLoader:
self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
self.logger.info(f" SDG Transition Year: {self.sdg_transition_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
@@ -1116,8 +1074,11 @@ if __name__ == "__main__":
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Framework: per-row by year | SDG Proxy: FIES only")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print(
"Framework: SDG-only → SDGs mulai sdg_transition_year, MDGs sebelumnya. "
"Non-SDG-only → MDGs selalu."
)
print("=" * 80)
logger = setup_logging()
@@ -1127,6 +1088,6 @@ if __name__ == "__main__":
print("\n" + "=" * 80)
print("[OK] COMPLETED")
print(f" SDG Start Year : {loader.sdg_start_year}")
print(f" SDG Transition Year : {loader.sdg_transition_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80)