sdg start year and label condition

Debby
2026-03-31 15:42:11 +07:00
parent ddc9fb3b48
commit beb494f89c
2 changed files with 513 additions and 690 deletions

File diff suppressed because it is too large

@@ -4,24 +4,31 @@ fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order: Filtering Order:
1. Load data (single years only) 1. Load data (single years only)
2. Determine year boundaries (2013 - auto-detected end year) 2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per the lecturer's requirement)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET) 4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries 5. Filter indicators with consistent presence across FIXED countries
6. Determine SDGs start year & assign framework (MDGs/SDGs) per indicator 6. Determine SDG start year & assign framework (MDGs/SDGs) per indicator
7. Calculate YoY per indicator per country 7. Verify no gaps
8. Analyze indicator availability by year 8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware)
9. Save analytical table (with full names/labels + framework column + YoY for Looker Studio) 9. Calculate YoY per indicator per country
10. Analyze indicator availability by year
11. Save analytical table
NORMALIZATION (Step 8):
- norm_value_1_100 = min-max normalization of the raw values per indicator, scale 1-100
- Direction-aware: lower_better is inverted so that a higher value always = better
- Normalization is done GLOBALLY per indicator (all countries, all years at once)
so that values stay comparable across countries and across years
- This column allows indicators with different units to be compared in Looker Studio
FRAMEWORK LOGIC: FRAMEWORK LOGIC:
- SDG_START_YEAR = 2016 (default; auto-detected if an SDG indicator first appears earlier/later) - SDG start year is detected from the data: the first year the FIES indicators are complete
in all fixed countries (after the Step 3-5 filters have finished)
- Indicators whose name is in SDG_INDICATOR_KEYWORDS: - Indicators whose name is in SDG_INDICATOR_KEYWORDS:
* If the data starts >= SDG_START_YEAR -> 'SDGs' * If actual_start_year >= sdg_start_year -> 'SDGs'
* If the data starts < SDG_START_YEAR -> 'MDGs' * If actual_start_year < sdg_start_year -> 'MDGs'
(meaning the indicator already existed before the SDGs, e.g. undernourishment)
- Indicators whose name is NOT in SDG_INDICATOR_KEYWORDS -> 'MDGs' - Indicators whose name is NOT in SDG_INDICATOR_KEYWORDS -> 'MDGs'
- The framework is determined AFTER filtering is done (data is clean & the range is fixed)
so the per-indicator start_year used is the ACTUAL start_year in this dataset.
""" """
import pandas as pd import pandas as pd
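The direction-aware normalization described in the docstring above can be sketched as follows. This is a minimal illustration under stated assumptions, not the commit's exact code: the helper name `norm_1_100` and the sample values are hypothetical, while the set of direction labels mirrors the `DIRECTION_INVERT` set that appears later in this diff.

```python
import numpy as np
import pandas as pd

# Direction labels treated as "lower is better" (copied from DIRECTION_INVERT in the diff).
INVERT_DIRECTIONS = frozenset(
    {"negative", "lower_better", "lower_is_better", "inverse", "neg"}
)


def norm_1_100(values: pd.Series, direction: str) -> pd.Series:
    """Min-max scale one indicator to 1-100; invert when lower is better.

    Hypothetical helper: `values` holds every country-year value of ONE indicator.
    """
    out = pd.Series(np.nan, index=values.index, dtype=float)
    valid = values.dropna()
    if len(valid) < 2:
        return out  # not enough data to define a range
    v_min, v_max = valid.min(), valid.max()
    if v_min == v_max:
        out.loc[valid.index] = 50.5  # constant series: midpoint of the 1-100 scale
        return out
    scaled = (valid - v_min) / (v_max - v_min)  # 0-1
    if str(direction).lower().strip() in INVERT_DIRECTIONS:
        scaled = 1.0 - scaled  # a high score must always mean "better"
    out.loc[valid.index] = 1.0 + scaled * 99.0  # 1-100
    return out


# Hypothetical undernourishment-style values (lower is better), one missing.
sample = pd.Series([5.0, 10.0, 15.0, np.nan])
result = norm_1_100(sample, "lower_better")
```

With these sample values the lowest raw value receives the highest score, and the missing value stays missing.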
@@ -50,15 +57,6 @@ from google.cloud import bigquery
# ============================================================================= # =============================================================================
# SDG INDICATOR KEYWORDS # SDG INDICATOR KEYWORDS
# ============================================================================= # =============================================================================
# List of indicator names (lowercase) that belong to SDG Goal 2.
# Matching is done with `kw in indicator_name.lower()` so that
# partial matches remain valid (handles variations in name format).
#
# Framework logic:
# - Name is in this set + start_year >= SDG_START_YEAR -> 'SDGs'
# - Name is in this set + start_year < SDG_START_YEAR -> 'MDGs'
# (the indicator already existed before the SDGs, e.g. prevalence of undernourishment)
# - Name is NOT in this set -> 'MDGs'
SDG_INDICATOR_KEYWORDS = frozenset([ SDG_INDICATOR_KEYWORDS = frozenset([
# TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs) # TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs)
@@ -90,34 +88,55 @@ SDG_INDICATOR_KEYWORDS = frozenset([
"number of women of reproductive age (15-49 years) affected by anemia (million)", "number of women of reproductive age (15-49 years) affected by anemia (million)",
]) ])
# Official year the SDGs took effect (2030 Agenda adopted September 2015, # Proxy keywords for detecting the SDG era from the data (indicators that are purely new in the SDGs)
# data reporting starts in 2016). Used as the default if auto-detection fails. _SDG_ERA_PROXY_KEYWORDS = frozenset([
SDG_START_YEAR_DEFAULT = 2016 "food insecurity",
"anemia",
"anaemia",
])
# =============================================================================
# CONDITION THRESHOLDS (fixed absolute, scale 1-100)
# =============================================================================
# Used to assign the condition in analysis_layer.
# Defined here so that both files stay consistent.
# bad : norm_value_1_100 < THRESHOLD_BAD
# good : norm_value_1_100 > THRESHOLD_GOOD
# moderate : anything in between (both threshold values included)
THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0
def assign_framework_dynamic( def assign_condition(norm_value_1_100: float) -> str:
"""
Assign a condition based on norm_value_1_100 (scale 1-100, already direction-aware).
A higher value always means better (lower_better is already inverted).
Returns: 'good' / 'moderate' / 'bad', or None when the value is missing
"""
if pd.isna(norm_value_1_100):
return None
if norm_value_1_100 > THRESHOLD_GOOD:
return 'good'
if norm_value_1_100 < THRESHOLD_BAD:
return 'bad'
return 'moderate'
def assign_framework(
indicator_name: str, indicator_name: str,
indicator_start_year: int, actual_start_year: int,
sdg_start_year: int, sdg_start_year: int,
) -> str: ) -> str:
""" """
Determine the framework (MDGs/SDGs) based on: Determine the framework (MDGs/SDGs) per indicator.
1. Is the indicator name in SDG_INDICATOR_KEYWORDS? 'SDGs' if the name is in SDG_INDICATOR_KEYWORDS AND actual_start_year >= sdg_start_year.
2. Does this indicator's data start in a year >= sdg_start_year? 'MDGs' for all other cases.
Args:
indicator_name : Indicator name (lowercased for matching)
indicator_start_year : First year this indicator's data is available in the dataset
sdg_start_year : SDG start year (from auto-detection or the default)
Returns:
'SDGs' if the indicator is in the SDG list AND starts >= sdg_start_year
'MDGs' for all other cases
""" """
ind_lower = str(indicator_name).lower().strip() name_lower = str(indicator_name).lower().strip()
is_sdg_name = any(kw in ind_lower for kw in SDG_INDICATOR_KEYWORDS) in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS
if in_sdg_list and actual_start_year >= sdg_start_year:
if is_sdg_name and indicator_start_year >= sdg_start_year:
return 'SDGs' return 'SDGs'
return 'MDGs' return 'MDGs'
@@ -130,21 +149,12 @@ class AnalyticalLayerLoader:
""" """
Analytical Layer Loader for BigQuery Analytical Layer Loader for BigQuery
Key Logic: Output columns of fact_asean_food_security_selected:
1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years
4. Determine SDGs start year & assign framework per indicator dynamically
5. Calculate YoY (year-over-year) change per indicator per country
6. Save with the full set of columns (names + IDs + framework + YoY) for Looker Studio
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
Output columns:
country_id, country_name, country_id, country_name,
indicator_id, indicator_name, direction, framework, indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name, pillar_id, pillar_name,
time_id, year, value, time_id, year, value,
norm_value_1_100, <- NEW: min-max norm per indicator, scale 1-100, direction-aware
yoy_change, yoy_pct yoy_change, yoy_pct
""" """
@@ -162,10 +172,9 @@ class AnalyticalLayerLoader:
self.start_year = 2013 self.start_year = 2013
self.end_year = None self.end_year = None
self.baseline_year = 2023 self.baseline_year = 2023 # hardcoded per the lecturer's requirement (most complete year)
# SDGs-related, set by determine_sdg_start_year() self.sdg_start_year = None
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.pipeline_metadata = { self.pipeline_metadata = {
'source_class' : self.__class__.__name__, 'source_class' : self.__class__.__name__,
@@ -191,8 +200,6 @@ class AnalyticalLayerLoader:
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# Does not include framework from dim_indicator;
# the framework is determined dynamically in Step 6 (determine_sdg_start_year)
query = f""" query = f"""
SELECT SELECT
f.country_id, f.country_id,
@@ -224,12 +231,9 @@ class AnalyticalLayerLoader:
if 'is_year_range' in self.df_clean.columns: if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts() yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(f" Breakdown:")
self.logger.info( self.logger.info(
f" Single years (is_year_range=False): {yr.get(False, 0):,}" f" Single years: {yr.get(False, 0):,} | "
) f"Year ranges: {yr.get(True, 0):,}"
self.logger.info(
f" Year ranges (is_year_range=True): {yr.get(True, 0):,}"
) )
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
@@ -256,29 +260,31 @@ class AnalyticalLayerLoader:
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80) self.logger.info("=" * 80)
df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year] # baseline_year = 2023 hardcoded (lecturer's requirement: at least 2023)
baseline_indicator_count = df_2023['indicator_id'].nunique() df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_baseline['indicator_id'].nunique()
self.logger.info(f"\nBaseline Year: {self.baseline_year}") self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}")
self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}") self.logger.info(f" Baseline indicator count: {baseline_indicator_count}")
years_sorted = sorted(self.df_clean['year'].unique(), reverse=True) years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
selected_end_year = None selected_end_year = None
self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):")
for year in years_sorted: for year in years_sorted:
if year >= self.baseline_year: if year >= self.baseline_year:
df_year = self.df_clean[self.df_clean['year'] == year] df_year = self.df_clean[self.df_clean['year'] == year]
year_indicator_count = df_year['indicator_id'].nunique() year_indicator_count = df_year['indicator_id'].nunique()
status = "OK" if year_indicator_count >= baseline_indicator_count else "X" status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
if year_indicator_count >= baseline_indicator_count and selected_end_year is None: if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
selected_end_year = int(year) selected_end_year = int(year)
if selected_end_year is None: if selected_end_year is None:
selected_end_year = self.baseline_year selected_end_year = self.baseline_year
self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}") self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}")
else: else:
self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}") self.logger.info(f"\n [OK] Selected end year: {selected_end_year}")
self.end_year = selected_end_year self.end_year = selected_end_year
original_count = len(self.df_clean) original_count = len(self.df_clean)
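The end-year scan in Step 2 can be summarized as: walk the years from latest to earliest and take the first one, at or after the baseline, whose indicator count is at least the baseline count. A minimal sketch, assuming the per-year indicator counts are already available as a plain dict (the function name `select_end_year` and the counts are hypothetical):

```python
def select_end_year(indicator_counts, baseline_year):
    """Return the latest year >= baseline_year with at least the baseline count.

    `indicator_counts` maps year -> number of distinct indicators (hypothetical input).
    Falls back to `baseline_year` when no year qualifies.
    """
    baseline_count = indicator_counts.get(baseline_year, 0)
    for year in sorted(indicator_counts, reverse=True):
        if year >= baseline_year and indicator_counts[year] >= baseline_count:
            return year
    return baseline_year


# Hypothetical counts: 2024 is thinner than the baseline, 2025 matches it.
end_year = select_end_year({2022: 30, 2023: 28, 2024: 25, 2025: 28}, 2023)
fallback = select_end_year({2021: 30, 2022: 30}, 2023)
```

Note that a later year can be selected even when a year in between falls short of the baseline count, as in the first call.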
@@ -288,9 +294,9 @@ class AnalyticalLayerLoader:
(self.df_clean['year'] <= self.end_year) (self.df_clean['year'] <= self.end_year)
].copy() ].copy()
self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:") self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Rows after : {len(self.df_clean):,}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -463,9 +469,7 @@ class AnalyticalLayerLoader:
else: else:
removed_indicators.append({ removed_indicators.append({
'indicator_name': indicator_name, 'indicator_name': indicator_name,
'reason' : ( 'reason' : f"missing countries in years: {', '.join(problematic_years[:5])}"
f"missing countries in years: {', '.join(problematic_years[:5])}"
)
}) })
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
@@ -500,133 +504,86 @@ class AnalyticalLayerLoader:
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def determine_sdg_start_year(self): def determine_sdg_start_year(self):
"""
Determine the SDG start year automatically from the actual data, then
assign the 'framework' column (MDGs/SDGs) to every row in df_clean.
Logic for determining SDG_START_YEAR:
- Find indicators whose name is in SDG_INDICATOR_KEYWORDS (FIES, anaemia, etc.)
and that are believed to exist ONLY in the SDGs (not shared with the MDGs).
Proxy: indicators with the keyword 'food insecurity' or 'anemia'.
- Take the first year (min year) of those indicators in this dataset.
- If found -> sdg_start_year = that first year.
- If not found -> sdg_start_year = SDG_START_YEAR_DEFAULT (2016).
Logic for assigning the framework per indicator (assign_framework_dynamic):
- Name is in SDG_INDICATOR_KEYWORDS + start_year >= sdg_start_year -> 'SDGs'
- Name is in SDG_INDICATOR_KEYWORDS + start_year < sdg_start_year -> 'MDGs'
(indicators such as undernourishment already existed before the SDGs)
- Name is NOT in SDG_INDICATOR_KEYWORDS -> 'MDGs'
"""
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK") self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK")
self.logger.info("=" * 80) self.logger.info("=" * 80)
# --- 6a. Auto-detect the SDG start year from the actual data --- # actual_start_year per indicator = max(min_year per country)
# SDGs-only proxy: indicators that are certainly new in the SDGs (FIES & anaemia) # = consistent with max_start_year in Step 5
sdg_proxy_keywords = [ indicator_actual_start = (
'food insecurity',
'anemia',
'anaemia',
]
sdg_proxy_mask = self.df_clean['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in sdg_proxy_keywords)
)
df_sdg_proxy = self.df_clean[sdg_proxy_mask]
if len(df_sdg_proxy) > 0:
detected_start = int(df_sdg_proxy['year'].min())
self.sdg_start_year = detected_start
self.logger.info(
f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}"
)
self.logger.info(f" Proxy indicators used (sample):")
proxy_sample = (
df_sdg_proxy['indicator_name']
.drop_duplicates()
.head(5)
.tolist()
)
for ind in proxy_sample:
self.logger.info(f" - {ind}")
else:
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.logger.warning(
f"\n [WARN] SDG proxy indicators not found in dataset. "
f"Using default: {self.sdg_start_year}"
)
self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}")
# --- 6b. Compute the actual start_year per indicator in this dataset ---
indicator_start = (
self.df_clean self.df_clean
.groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
.min().reset_index()
.groupby(['indicator_id', 'indicator_name'])['year'] .groupby(['indicator_id', 'indicator_name'])['year']
.min() .max().reset_index()
.reset_index()
) )
indicator_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# --- 6c. Assign framework per indicator --- # Detect sdg_start_year from the SDGs-only proxy (FIES & anaemia)
indicator_start['framework'] = indicator_start.apply( proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply(
lambda row: assign_framework_dynamic( lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS)
indicator_name = row['indicator_name'], )
indicator_start_year = int(row['actual_start_year']), df_proxy = indicator_actual_start[proxy_mask]
sdg_start_year = self.sdg_start_year,
if df_proxy.empty:
raise ValueError(
"Tidak ada indikator proxy SDGs (FIES/anaemia) yang lolos filter. "
"Pastikan indikator FIES dan anaemia ada di data."
)
self.sdg_start_year = int(df_proxy['actual_start_year'].min())
self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}")
self.logger.info(f" Proxy indicators:")
for _, row in df_proxy.iterrows():
self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}")
# Assign framework per indicator
indicator_actual_start['framework'] = indicator_actual_start.apply(
lambda row: assign_framework(
indicator_name = row['indicator_name'],
actual_start_year = int(row['actual_start_year']),
sdg_start_year = self.sdg_start_year,
), ),
axis=1 axis=1
) )
# --- 6d. Log the assignment results --- # Log the results
self.logger.info(f"\n Framework assignment per indicator:") self.logger.info(f"\n Framework assignment:")
self.logger.info(f" {'-'*85}") self.logger.info(f" {'-'*80}")
self.logger.info( self.logger.info(f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}")
f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}" self.logger.info(f" {'-'*80}")
) for _, row in indicator_actual_start.sort_values(
self.logger.info(f" {'-'*85}")
for _, row in indicator_start.sort_values(
['framework', 'actual_start_year', 'indicator_name'] ['framework', 'actual_start_year', 'indicator_name']
).iterrows(): ).iterrows():
is_in_sdg_list = any(
kw in str(row['indicator_name']).lower()
for kw in SDG_INDICATOR_KEYWORDS
)
note = " [in SDG list]" if is_in_sdg_list else ""
self.logger.info( self.logger.info(
f" {int(row['indicator_id']):<5} {row['framework']:<10} " f" {int(row['indicator_id']):<5} {row['framework']:<10} "
f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}{note}" f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}"
) )
fw_summary = indicator_start['framework'].value_counts() fw_summary = indicator_actual_start['framework'].value_counts()
self.logger.info(f"\n Framework summary:") self.logger.info(f"\n Ringkasan: " + " | ".join(f"{fw}: {cnt}" for fw, cnt in fw_summary.items()))
for fw, cnt in fw_summary.items():
self.logger.info(f" {fw}: {cnt} indicators")
# --- 6e. Merge framework into df_clean --- # Merge into df_clean
self.df_clean = self.df_clean.merge( self.df_clean = self.df_clean.merge(
indicator_start[['indicator_id', 'framework']], indicator_actual_start[['indicator_id', 'framework']],
on='indicator_id', how='left' on='indicator_id', how='left'
) )
self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs') self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean")
self.logger.info( self.logger.info(
f" Row distribution — MDGs: " f"\n [OK] 'framework' ditambahkan — "
f"{(self.df_clean['framework'] == 'MDGs').sum():,} | " f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}" f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
) )
return self.df_clean return self.df_clean
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# STEP 6b: VERIFY NO GAPS # STEP 7: VERIFY NO GAPS
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def verify_no_gaps(self): def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6c: VERIFY NO GAPS") self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("=" * 80) self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids) expected_countries = len(self.selected_country_ids)
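The new Step 6 derives `actual_start_year` as the latest first-year across countries (max of per-country min), then takes the earliest such year among the proxy indicators as `sdg_start_year`. A minimal sketch of that two-level aggregation, assuming a tiny hypothetical frame with only the columns needed (the indicator names, countries, and years are made up):

```python
import pandas as pd

# Proxy keywords as listed in the diff.
PROXY_KEYWORDS = ("food insecurity", "anemia", "anaemia")

# Hypothetical rows: one FIES-style indicator and one older indicator.
df = pd.DataFrame({
    "indicator_name": ["Prevalence of severe food insecurity (percent)"] * 5
                      + ["Prevalence of undernourishment (percent)"] * 4,
    "country_id": [1, 1, 1, 2, 2, 1, 1, 2, 2],
    "year": [2015, 2016, 2017, 2016, 2017, 2013, 2014, 2013, 2014],
})

# First year of each indicator in each country.
first_year_per_country = (
    df.groupby(["indicator_name", "country_id"])["year"].min().reset_index()
)

# An indicator only "starts" once every country has it: take the max of those minima.
actual_start_year = first_year_per_country.groupby("indicator_name")["year"].max()

# The SDG era begins at the earliest start year among the proxy indicators.
is_proxy = [
    any(kw in name.lower() for kw in PROXY_KEYWORDS)
    for name in actual_start_year.index
]
sdg_start_year = int(actual_start_year.loc[is_proxy].min())
```

Taking the max of per-country minima means one late-reporting country pushes the indicator's start year forward, which in turn can shift `sdg_start_year` and the resulting MDGs/SDGs labels.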
@@ -652,21 +609,110 @@ class AnalyticalLayerLoader:
return True return True
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# STEP 7: CALCULATE YOY # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR PER COUNTRY
# ------------------------------------------------------------------
def calculate_norm_value(self):
"""
Compute norm_value_1_100 per indicator: min-max normalization on a 1-100 scale,
direction-aware.
HOW IT WORKS:
- Normalization is done GLOBALLY per indicator (all countries + all years at once)
so that values stay comparable across countries and across years.
- lower_better is inverted: a higher value always = a better condition.
Example: undernourishment 5% (low = good) → high norm after inversion.
- Scale 1-100 (not 0-100) to avoid an absolute zero value in Looker Studio.
- This column allows comparison across indicators with different units
(percent, million people, etc.) because they are normalized to the same scale.
Notes:
- Differs from norm_value in _get_norm_value_df() in analysis_layer,
which uses a 0-1 scale and is used for composite score aggregation.
- This norm_value_1_100 is per row (per country per year per indicator),
to be displayed directly in Looker Studio.
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
self.logger.info("=" * 80)
DIRECTION_INVERT = frozenset({
"negative", "lower_better", "lower_is_better", "inverse", "neg",
})
df = self.df_clean.copy()
norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}")
self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators:
grp = grp.copy()
do_invert = str(direction).lower().strip() in DIRECTION_INVERT
valid_mask = grp['value'].notna()
n_valid = valid_mask.sum()
if n_valid < 2:
grp['norm_value_1_100'] = np.nan
norm_parts.append(grp)
continue
raw = grp.loc[valid_mask, 'value'].values
v_min = raw.min()
v_max = raw.max()
normed = np.full(len(grp), np.nan)
if v_min == v_max:
# All values are equal → assign the midpoint (50.5 on the 1-100 scale)
normed[valid_mask.values] = 50.5
else:
# Min-max to 0-1 first
scaled = (raw - v_min) / (v_max - v_min)
# Invert if lower_better
if do_invert:
scaled = 1.0 - scaled
# Scale to 1-100
normed[valid_mask.values] = 1.0 + scaled * 99.0
grp['norm_value_1_100'] = normed
self.logger.info(
f" {int(ind_id):<5} {direction:<15} {'YES' if do_invert else 'no':<8} "
f"{v_min:>10.3f} {v_max:>10.3f} {ind_name[:45]}"
)
norm_parts.append(grp)
self.df_clean = pd.concat(norm_parts, ignore_index=True)
# Summary statistics
valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
null_norm = self.df_clean['norm_value_1_100'].isna().sum()
self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
self.logger.info(
f" Range aktual: "
f"{self.df_clean['norm_value_1_100'].min():.2f} - "
f"{self.df_clean['norm_value_1_100'].max():.2f}"
)
# Log the condition distribution based on the thresholds
self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition)
cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):")
for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean")
return self.df_clean
# ------------------------------------------------------------------
# STEP 9: CALCULATE YOY
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def calculate_yoy(self): def calculate_yoy(self):
"""
Compute Year-over-Year (YoY) per indicator per country.
Columns added:
yoy_change : absolute difference -> value - previous_year_value
yoy_pct : relative change -> (yoy_change / abs(value_prev)) * 100
The first-year row of each country-indicator combination is NULL (intentional).
"""
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80) self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
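The YoY formulas named in the removed docstring above can be sketched with a grouped `shift`. The body of `calculate_yoy` is not shown in this diff, so this is an illustration of the stated formulas rather than the method itself; the sample frame is hypothetical, and replacing a zero previous value with NaN is an added assumption to avoid division by zero.

```python
import numpy as np
import pandas as pd

# Hypothetical values for one indicator in two countries.
df = pd.DataFrame({
    "country_id": [1, 1, 1, 2, 2],
    "indicator_id": [10, 10, 10, 10, 10],
    "year": [2013, 2014, 2015, 2013, 2014],
    "value": [10.0, 12.0, 9.0, -4.0, -2.0],
}).sort_values(["country_id", "indicator_id", "year"])

# Previous year's value within each country-indicator series.
prev = df.groupby(["country_id", "indicator_id"])["value"].shift(1)

# Absolute change, NaN for the first year of every series.
df["yoy_change"] = df["value"] - prev

# Relative change in percent; abs() keeps the sign meaningful for negative bases.
# Assumption: a previous value of 0 yields NaN instead of +/- infinity.
df["yoy_pct"] = df["yoy_change"] / prev.abs().replace(0, np.nan) * 100
```

Dividing by the absolute previous value means a rise from -4 to -2 is reported as a positive percentage change.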
@@ -686,62 +732,19 @@ class AnalyticalLayerLoader:
self.logger.info(f" Total rows : {total_rows:,}") self.logger.info(f" Total rows : {total_rows:,}")
self.logger.info(f" YoY calculated : {valid_yoy:,}") self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator") self.logger.info(f" YoY NULL (base yr): {null_yoy:,}")
per_ind = (
df[df['yoy_pct'].notna()]
.groupby(['indicator_id', 'indicator_name'])['yoy_pct']
.agg(['mean', 'std', 'min', 'max'])
.reset_index()
)
per_ind.columns = ['indicator_id', 'indicator_name', 'mean', 'std', 'min', 'max']
self.logger.info(f"\n YoY summary per indicator (top 10 by abs mean change):")
self.logger.info(f" {'-'*100}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Mean%':>8} {'Std%':>8} {'Min%':>8} {'Max%':>8}"
)
self.logger.info(f" {'-'*100}")
top_ind = per_ind.reindex(
per_ind['mean'].abs().sort_values(ascending=False).index
).head(10)
for _, row in top_ind.iterrows():
self.logger.info(
f" {int(row['indicator_id']):<5} {row['indicator_name'][:50]:<52} "
f"{row['mean']:>+8.2f} {row['std']:>8.2f} "
f"{row['min']:>+8.2f} {row['max']:>+8.2f}"
)
per_country = (
df[df['yoy_pct'].notna()]
.groupby(['country_id', 'country_name'])['yoy_pct']
.agg(['mean', 'std'])
.reset_index()
)
per_country.columns = ['country_id', 'country_name', 'mean_yoy', 'std_yoy']
self.logger.info(f"\n YoY summary per country:")
self.logger.info(f" {'-'*60}")
self.logger.info(f" {'Country':<30} {'Mean YoY%':>10} {'Std YoY%':>10}")
self.logger.info(f" {'-'*60}")
for _, row in per_country.sort_values('mean_yoy', ascending=False).iterrows():
self.logger.info(
f" {row['country_name']:<30} {row['mean_yoy']:>+10.2f} {row['std_yoy']:>10.2f}"
)
self.df_clean = df self.df_clean = df
self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct") self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR # STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self): def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80) self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({ year_stats = self.df_clean.groupby('year').agg({
@@ -776,10 +779,7 @@ class AnalyticalLayerLoader:
) )
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
for pillar, count in indicator_details.groupby('pillar_name').size().items(): self.logger.info(f"Framework breakdown:")
self.logger.info(f" {pillar}: {count} indicators")
self.logger.info(f"\nFramework breakdown:")
for fw, count in indicator_details.groupby('framework').size().items(): for fw, count in indicator_details.groupby('framework').size().items():
self.logger.info(f" {fw}: {count} indicators") self.logger.info(f" {fw}: {count} indicators")
@@ -800,37 +800,23 @@ class AnalyticalLayerLoader:
return year_stats return year_stats
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# STEP 9: SAVE ANALYTICAL TABLE # STEP 11: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def save_analytical_table(self): def save_analytical_table(self):
"""
Save fact_asean_food_security_selected to the Gold layer.
Columns saved:
country_id, country_name - country dimension
indicator_id, indicator_name - indicator dimension
direction - scoring direction (higher/lower_better)
framework - MDGs/SDGs (determined in Step 6)
pillar_id, pillar_name - pillar dimension
time_id, year - time dimension
value - indicator value
yoy_change - absolute YoY change (NULL in the first year)
yoy_pct - relative YoY change in % (NULL in the first year)
"""
table_name = 'fact_asean_food_security_selected' table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# Make sure the YoY columns exist; fall back if calculate_yoy() was not called if 'framework' not in self.df_clean.columns:
if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns: raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.")
self.logger.warning( if 'norm_value_1_100' not in self.df_clean.columns:
" [WARN] Kolom YoY tidak ditemukan. Menjalankan calculate_yoy() sebagai fallback..." raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.")
) if 'yoy_change' not in self.df_clean.columns:
self.calculate_yoy() raise ValueError("Kolom 'yoy_change' tidak ada. Pastikan Step 9 sudah dijalankan.")
analytical_df = self.df_clean[[ analytical_df = self.df_clean[[
'country_id', 'country_id',
@@ -844,6 +830,7 @@ class AnalyticalLayerLoader:
'time_id', 'time_id',
'year', 'year',
'value', 'value',
'norm_value_1_100',
'yoy_change', 'yoy_change',
'yoy_pct', 'yoy_pct',
]].copy() ]].copy()
@@ -852,47 +839,49 @@ class AnalyticalLayerLoader:
['year', 'country_name', 'pillar_name', 'indicator_name'] ['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True) ).reset_index(drop=True)
analytical_df['country_id'] = analytical_df['country_id'].astype(int) analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str) analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str) analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str) analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int) analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float) analytical_df['value'] = analytical_df['value'].astype(float)
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) analytical_df['norm_value_1_100'] = analytical_df['norm_value_1_100'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
self.logger.info(f" Columns saved: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}") self.logger.info(f" Total rows: {len(analytical_df):,}")
fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts()
self.logger.info(f" Framework distribution (per unique indicator):") self.logger.info(f" Framework distribution:")
for fw, cnt in fw_dist.items(): for fw, cnt in fw_dist.items():
self.logger.info(f" {fw}: {cnt} indicators") self.logger.info(f" {fw}: {cnt} indicators")
yoy_valid = analytical_df['yoy_pct'].notna().sum() self.logger.info(
yoy_null = analytical_df['yoy_pct'].isna().sum() f" norm_value_1_100 range: "
self.logger.info(f" YoY rows (calculated): {yoy_valid:,}") f"{analytical_df['norm_value_1_100'].min():.2f} - "
self.logger.info(f" YoY rows (NULL/base) : {yoy_null:,}") f"{analytical_df['norm_value_1_100'].max():.2f}"
)
schema = [ schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
] ]
rows_loaded = load_to_bigquery( rows_loaded = load_to_bigquery(
@@ -915,30 +904,26 @@ class AnalyticalLayerLoader:
'config_snapshot' : json.dumps({ 'config_snapshot' : json.dumps({
'start_year' : self.start_year, 'start_year' : self.start_year,
'end_year' : self.end_year, 'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_start_year' : self.sdg_start_year, 'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
'no_gaps' : True, 'norm_scale' : '1-100 per indicator global minmax direction-aware',
'layer' : 'gold', 'condition_thresholds': {
'framework_logic' : ( 'bad' : f'< {THRESHOLD_BAD}',
f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, " 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
"else MDGs" 'good' : f'> {THRESHOLD_GOOD}',
), },
}), }),
'validation_metrics' : json.dumps({ 'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique()), 'total_indicators': int(self.df_clean['indicator_id'].nunique()),
'sdg_start_year' : self.sdg_start_year, 'sdg_start_year' : self.sdg_start_year,
'framework_dist' : fw_dist.to_dict(), 'framework_dist' : fw_dist.to_dict(),
'yoy_rows_valid' : int(yoy_valid),
'yoy_rows_null' : int(yoy_null),
}) })
} }
save_etl_metadata(self.client, metadata) save_etl_metadata(self.client, metadata)
self.logger.info( self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold")
f" {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold"
)
self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -955,9 +940,8 @@ class AnalyticalLayerLoader:
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Columns: country_id/name, indicator_id/name, direction, framework,") self.logger.info("New column: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct") self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info(f"Framework: determined dynamically from SDG_START_YEAR (auto-detect)")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_source_data() self.load_source_data()
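The condition thresholds recorded in config_snapshot and logged in the banner above describe three labels: bad below THRESHOLD_BAD, good above THRESHOLD_GOOD, and moderate in between. A minimal sketch of that labelling follows. The function name classify_condition is hypothetical, and the threshold values used in the usage note are illustrative only, since the diff does not show the actual constants.

```python
def classify_condition(norm_value: float,
                       threshold_bad: float,
                       threshold_good: float) -> str:
    """Label a normalized value as 'bad', 'moderate' or 'good'.

    Mirrors the strict inequalities in the snapshot: bad is '< threshold_bad'
    and good is '> threshold_good', so both boundary values count as moderate.
    """
    if threshold_bad > threshold_good:
        raise ValueError("threshold_bad must not exceed threshold_good")
    if norm_value < threshold_bad:
        return "bad"
    if norm_value > threshold_good:
        return "good"
    return "moderate"
```

For example, with illustrative thresholds of 40 and 60, classify_condition(39.9, 40, 60) gives "bad", while 40 and 60 themselves both fall in "moderate".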
@@ -965,9 +949,10 @@ class AnalyticalLayerLoader:
self.filter_complete_indicators_per_country() self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars() self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries() self.filter_indicators_consistent_across_fixed_countries()
self.determine_sdg_start_year() # Step 6: auto-detect SDG year & assign framework self.determine_sdg_start_year()
self.verify_no_gaps() # Step 6c: verify there are no gaps self.verify_no_gaps()
self.calculate_yoy() # Step 7: compute YoY self.calculate_norm_value() # Step 8: norm_value_1_100
self.calculate_yoy() # Step 9: yoy_change, yoy_pct
self.analyze_indicator_availability_by_year() self.analyze_indicator_availability_by_year()
self.save_analytical_table() self.save_analytical_table()
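Steps 8 and 9 in the call sequence above add norm_value_1_100 and the YoY columns. The bodies of calculate_norm_value() and calculate_yoy() are not shown in this diff, so the following is only a sketch of what a direction-aware min-max scaling to 1-100 and a year-over-year calculation could look like. The function names, the "higher_better" label, the None result for a constant indicator and the percentage formula are all assumptions.

```python
from typing import List, Optional, Sequence, Tuple


def minmax_1_100(values: Sequence[float], direction: str) -> List[Optional[float]]:
    """Scale every value of ONE indicator (all countries, all years) to 1-100."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # Constant indicator: scaling is undefined, so emit NULLs (assumption).
        return [None] * len(values)
    scaled = [1.0 + 99.0 * (v - lo) / (hi - lo) for v in values]
    if direction == "lower_better":
        # Invert so that a higher normalized value always means "better".
        scaled = [101.0 - s for s in scaled]
    return scaled


def yoy(series: Sequence[Tuple[int, float]]
        ) -> List[Tuple[int, Optional[float], Optional[float]]]:
    """Return (year, yoy_change, yoy_pct) for one indicator in one country."""
    ordered = sorted(series)  # sort by year
    if not ordered:
        return []
    out: List[Tuple[int, Optional[float], Optional[float]]] = [
        (ordered[0][0], None, None)  # base year has no predecessor
    ]
    for (_, prev), (year, cur) in zip(ordered, ordered[1:]):
        change = cur - prev
        # Percentage change relative to the previous year (assumed formula);
        # left as None when the previous value is zero.
        pct = change * 100.0 / prev if prev != 0 else None
        out.append((year, change, pct))
    return out
```

Scaling over all countries and years of an indicator at once keeps the normalized values comparable across both dimensions, and the None results line up with the NULLABLE mode that the schema gives norm_value_1_100, yoy_change and yoy_pct.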
@@ -990,10 +975,6 @@ class AnalyticalLayerLoader:
# ============================================================================= # =============================================================================
def run_analytical_layer(): def run_analytical_layer():
"""
Airflow task: Build fact_asean_food_security_selected from fact_food_security + dims.
Called after dimensional_model_to_gold has finished.
"""
from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client() client = get_bigquery_client()
loader = AnalyticalLayerLoader(client) loader = AnalyticalLayerLoader(client)
@@ -1009,7 +990,8 @@ if __name__ == "__main__":
print("=" * 80) print("=" * 80)
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print("Framework: MDGs/SDGs determined dynamically from the data (auto-detect SDG start year)") print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()