framework v1

This commit is contained in:
Debby
2026-04-01 20:33:16 +07:00
parent c3b7674001
commit d948819535

View File

@@ -1,39 +1,14 @@
"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order:
1. Load data (single years only)
2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per syarat dosen)
2. Determine year boundaries (2013 - auto-detected end year)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year
7. Verify no gaps
8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware)
9. Calculate YoY per indicator per country
10. Analyze indicator availability by year
11. Save analytical table
NORMALISASI (Step 8):
- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100
- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik
- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable
- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio
FRAMEWORK LOGIC (Per-Row, bukan per indikator):
- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap
di semua fixed countries (setelah Step 3-5 filter selesai)
- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure")
Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs
- Framework di-assign PER BARIS (per year), bukan per indikator:
* row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs'
* Selain itu -> 'MDGs'
- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment)
yang datanya ada sebelum SDGs:
* row lama (year < sdg_start_year) -> 'MDGs'
* row baru (year >= sdg_start_year) -> 'SDGs'
6. Save analytical table (dengan nama/label lengkap untuk Looker Studio)
"""
import pandas as pd
@@ -59,128 +34,6 @@ from scripts.bigquery_helpers import (
from google.cloud import bigquery
# =============================================================================
# SDG INDICATOR KEYWORDS
# Daftar nama indikator (lowercase) yang masuk SDG framework.
# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year,
# dan 'MDGs' untuk baris dengan year < sdg_start_year.
# =============================================================================
SDG_INDICATOR_KEYWORDS = frozenset([
# TARGET 2.1.1 — Prevalence of undernourishment (shared: ada sebelum SDGs)
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs)
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of severely food insecure male adults (million) (3-year average)",
"number of severely food insecure female adults (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.1 — Stunting (shared: ada sebelum SDGs)
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs)
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar
# baris >= sdg_start_year di-assign 'SDGs')
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# =============================================================================
# SDG ERA PROXY KEYWORDS
# HANYA indikator yang MURNI baru di era SDGs (FIES saja).
# Dipakai untuk mendeteksi sdg_start_year dari data.
#
# PENTING — Anemia/anaemia TIDAK dipakai sebagai proxy:
# Data anemia sudah ada sebelum era SDGs sehingga actual_start_year-nya
# lebih awal dari sdg_start_year. Jika dipakai sebagai proxy, sdg_start_year
# akan terdeteksi terlalu awal dan seluruh baris anemia akan menjadi 'SDGs'.
# FIES adalah satu-satunya indikator yang benar-benar murni baru di era SDGs
# dan dapat dipakai sebagai penanda tahun mulainya era SDGs.
# =============================================================================
_SDG_ERA_PROXY_KEYWORDS = frozenset([
"food insecurity",
"food insecure",
])
# =============================================================================
# THRESHOLD KONDISI (fixed absolute, skala 1-100)
# =============================================================================
# Digunakan untuk assign kondisi di analysis_layer.
# Didefinisikan di sini agar konsisten antara kedua file.
# bad : norm_value_1_100 < THRESHOLD_BAD
# good : norm_value_1_100 > THRESHOLD_GOOD
# moderate : di antara keduanya
THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0
def assign_condition(norm_value_1_100: float) -> str:
"""
Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware).
Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert).
Returns: 'good' / 'moderate' / 'bad'
"""
if pd.isna(norm_value_1_100):
return None
if norm_value_1_100 > THRESHOLD_GOOD:
return 'good'
if norm_value_1_100 < THRESHOLD_BAD:
return 'bad'
return 'moderate'
def assign_framework_per_row(
indicator_name: str,
year: int,
sdg_start_year: int,
) -> str:
"""
Tentukan framework (MDGs/SDGs) per BARIS (per row year), bukan per indikator.
Logic:
- 'SDGs' jika KEDUA kondisi terpenuhi:
1. Nama indikator ada di SDG_INDICATOR_KEYWORDS
2. year (tahun baris ini) >= sdg_start_year
- 'MDGs' untuk semua kasus lain.
Mengapa per row, bukan per indikator?
Indikator "shared" seperti anemia, stunting, wasting, undernourishment
memiliki data yang ada SEBELUM era SDGs dimulai. Jika assign dilakukan
per indikator menggunakan actual_start_year, indikator-indikator ini
akan selalu di-assign 'MDGs' karena actual_start_year < sdg_start_year.
Dengan assign per row menggunakan year baris:
- baris lama (year < sdg_start_year) -> 'MDGs' (benar: belum era SDGs)
- baris baru (year >= sdg_start_year) -> 'SDGs' (benar: sudah era SDGs)
Contoh anemia (sdg_start_year = 2016):
- row year=2013 -> 'MDGs'
- row year=2014 -> 'MDGs'
- row year=2015 -> 'MDGs'
- row year=2016 -> 'SDGs'
- row year=2017 -> 'SDGs'
- ...
"""
name_lower = str(indicator_name).lower().strip()
in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS
if in_sdg_list and int(year) >= sdg_start_year:
return 'SDGs'
return 'MDGs'
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
@@ -189,19 +42,13 @@ class AnalyticalLayerLoader:
"""
Analytical Layer Loader for BigQuery
Output kolom fact_asean_food_security_selected:
country_id, country_name,
indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name,
time_id, year, value,
norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware
yoy_change, yoy_pct
Key Logic:
1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years
4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio
Catatan framework:
Framework di-assign PER BARIS (per year), sehingga indikator shared
seperti anemia dapat memiliki framework berbeda di baris yang berbeda:
- baris sebelum sdg_start_year -> 'MDGs'
- baris sejak sdg_start_year -> 'SDGs'
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
"""
def __init__(self, client: bigquery.Client):
@@ -218,9 +65,7 @@ class AnalyticalLayerLoader:
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap)
self.sdg_start_year = None
self.baseline_year = 2023
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
@@ -236,10 +81,6 @@ class AnalyticalLayerLoader:
self.pipeline_start = None
self.pipeline_end = None
# ------------------------------------------------------------------
# STEP 1: LOAD SOURCE DATA
# ------------------------------------------------------------------
def load_source_data(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
@@ -270,17 +111,14 @@ class AnalyticalLayerLoader:
"""
self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe(
create_bqstorage_client=False
)
self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(
f" Single years: {yr.get(False, 0):,} | "
f"Year ranges: {yr.get(True, 0):,}"
)
self.logger.info(f" Breakdown:")
self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}")
self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}")
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
@@ -297,48 +135,34 @@ class AnalyticalLayerLoader:
self.logger.error(f"Error loading source data: {e}")
raise
# ------------------------------------------------------------------
# STEP 2: DETERMINE YEAR BOUNDARIES
# ------------------------------------------------------------------
def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80)
# Filter single years only (is_year_range == False)
if 'is_year_range' in self.df_clean.columns:
before = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy()
self.logger.info(
f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows"
)
df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_2023['indicator_id'].nunique()
# baseline_year = 2023 hardcode (syarat dosen: minimal 2023)
df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_baseline['indicator_id'].nunique()
self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}")
self.logger.info(f" Baseline indicator count: {baseline_indicator_count}")
self.logger.info(f"\nBaseline Year: {self.baseline_year}")
self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")
years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
selected_end_year = None
self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):")
for year in years_sorted:
if year >= self.baseline_year:
df_year = self.df_clean[self.df_clean['year'] == year]
year_indicator_count = df_year['indicator_id'].nunique()
status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
selected_end_year = int(year)
if selected_end_year is None:
selected_end_year = self.baseline_year
self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}")
self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
else:
self.logger.info(f"\n [OK] Selected end year: {selected_end_year}")
self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")
self.end_year = selected_end_year
original_count = len(self.df_clean)
@@ -348,15 +172,11 @@ class AnalyticalLayerLoader:
(self.df_clean['year'] <= self.end_year)
].copy()
self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
# ------------------------------------------------------------------
def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
@@ -408,15 +228,10 @@ class AnalyticalLayerLoader:
self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = (
df_valid['country_id'].astype(str) + '_' +
df_valid['indicator_id'].astype(str)
)
self.df_clean['key'] = (
self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str)
)
df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str)
self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str))
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
@@ -428,10 +243,6 @@ class AnalyticalLayerLoader:
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
# ------------------------------------------------------------------
def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
@@ -454,26 +265,18 @@ class AnalyticalLayerLoader:
f"{row['pillar_count']}/{total_pillars} pillars"
)
selected_countries = country_pillar_count[
country_pillar_count['pillar_count'] == total_pillars
]
selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars]
self.selected_country_ids = selected_countries['country_id'].tolist()
self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['country_id'].isin(self.selected_country_ids)
].copy()
self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy()
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
# ------------------------------------------------------------------
def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
@@ -482,9 +285,7 @@ class AnalyticalLayerLoader:
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
indicator_country_start.columns = [
'indicator_id', 'indicator_name', 'country_id', 'start_year'
]
indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year']
indicator_max_start = indicator_country_start.groupby([
'indicator_id', 'indicator_name'
@@ -529,26 +330,16 @@ class AnalyticalLayerLoader:
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if removed_indicators:
self.logger.info(f"\n Removed indicators:")
for item in removed_indicators:
self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}")
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy()
self.df_clean = self.df_clean.merge(
indicator_max_start[['indicator_id', 'max_start_year']],
on='indicator_id', how='left'
indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left'
)
self.df_clean = self.df_clean[
self.df_clean['year'] >= self.df_clean['max_start_year']
].copy()
self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
@@ -558,136 +349,18 @@ class AnalyticalLayerLoader:
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW")
self.logger.info("=" * 80)
self.logger.info(
" Proxy: FIES only (food insecurity/food insecure).\n"
" Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n"
" Framework di-assign PER BARIS (year), bukan per indikator."
)
# actual_start_year per indikator = max(min_year per country)
# = konsisten dengan max_start_year di Step 5
indicator_actual_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
.min().reset_index()
.groupby(['indicator_id', 'indicator_name'])['year']
.max().reset_index()
)
indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia)
proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS)
)
df_proxy = indicator_actual_start[proxy_mask]
if df_proxy.empty:
raise ValueError(
"Tidak ada indikator proxy SDGs (FIES) yang lolos filter. "
"Pastikan indikator FIES (food insecurity/food insecure) ada di data."
)
self.sdg_start_year = int(df_proxy['actual_start_year'].min())
self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}")
self.logger.info(f" Proxy indicators (FIES only):")
for _, row in df_proxy.iterrows():
self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}")
# ----------------------------------------------------------------
# Assign framework PER BARIS menggunakan year baris, bukan actual_start_year
# Sehingga indikator "shared" (anemia, stunting, dll) mendapat:
# - 'MDGs' untuk baris sebelum sdg_start_year
# - 'SDGs' untuk baris sejak sdg_start_year
# ----------------------------------------------------------------
self.df_clean['framework'] = self.df_clean.apply(
lambda row: assign_framework_per_row(
indicator_name = row['indicator_name'],
year = int(row['year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
# ----------------------------------------------------------------
# Logging: ringkasan per indikator (frameworks apa yang muncul)
# ----------------------------------------------------------------
ind_fw_summary = (
self.df_clean
.groupby(['indicator_id', 'indicator_name'])['framework']
.unique()
.reset_index()
)
ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply(
lambda x: '/'.join(sorted(x))
)
ind_fw_summary = ind_fw_summary.merge(
indicator_actual_start[['indicator_id', 'actual_start_year']],
on='indicator_id', how='left'
)
self.logger.info(f"\n Framework assignment per indikator:")
self.logger.info(f" {'-'*85}")
self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}")
self.logger.info(f" {'-'*85}")
for _, row in ind_fw_summary.sort_values(
['frameworks', 'actual_start_year', 'indicator_name']
).iterrows():
self.logger.info(
f" {int(row['indicator_id']):<5} {row['frameworks']:<18} "
f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}"
)
# Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi
split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs']
if not split_inds.empty:
self.logger.info(
f"\n [INFO] {len(split_inds)} indikator memiliki framework split "
f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):"
)
for _, row in split_inds.iterrows():
self.logger.info(f" - {row['indicator_name'][:60]}")
fw_summary = self.df_clean['framework'].value_counts()
self.logger.info(
f"\n Ringkasan rows: " +
" | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items())
)
self.logger.info(
f"\n [OK] 'framework' ditambahkan per row — "
f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 7: VERIFY NO GAPS
# ------------------------------------------------------------------
def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("STEP 6: VERIFY NO GAPS")
self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids)
verification = self.df_clean.groupby(
['indicator_id', 'year']
)['country_id'].nunique().reset_index()
verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index()
verification.columns = ['indicator_id', 'year', 'country_count']
all_good = (verification['country_count'] == expected_countries).all()
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
)
self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries")
else:
bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows():
@@ -699,143 +372,9 @@ class AnalyticalLayerLoader:
return True
# ------------------------------------------------------------------
# STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR
# ------------------------------------------------------------------
def calculate_norm_value(self):
"""
Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100,
direction-aware.
CARA KERJA:
- Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus)
sehingga nilai antar negara dan antar tahun tetap comparable.
- lower_better diinvert: nilai tinggi selalu = kondisi lebih baik.
Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert.
- Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio.
- Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan
(persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama.
Catatan:
- Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer
yang skala 0-1 dan dipakai untuk agregasi composite score.
- norm_value_1_100 ini adalah per baris (per country per year per indicator),
untuk ditampilkan langsung di Looker Studio.
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
self.logger.info("=" * 80)
DIRECTION_INVERT = frozenset({
"negative", "lower_better", "lower_is_better", "inverse", "neg",
})
df = self.df_clean.copy()
norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}")
self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators:
grp = grp.copy()
do_invert = str(direction).lower().strip() in DIRECTION_INVERT
valid_mask = grp['value'].notna()
n_valid = valid_mask.sum()
if n_valid < 2:
grp['norm_value_1_100'] = np.nan
norm_parts.append(grp)
continue
raw = grp.loc[valid_mask, 'value'].values
v_min = raw.min()
v_max = raw.max()
normed = np.full(len(grp), np.nan)
if v_min == v_max:
# Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100)
normed[valid_mask.values] = 50.5
else:
# Min-max ke 0-1 dulu
scaled = (raw - v_min) / (v_max - v_min)
# Invert jika lower_better
if do_invert:
scaled = 1.0 - scaled
# Scale ke 1-100
normed[valid_mask.values] = 1.0 + scaled * 99.0
grp['norm_value_1_100'] = normed
self.logger.info(
f" {int(ind_id):<5} {direction:<15} {'YES' if do_invert else 'no':<8} "
f"{v_min:>10.3f} {v_max:>10.3f} {ind_name[:45]}"
)
norm_parts.append(grp)
self.df_clean = pd.concat(norm_parts, ignore_index=True)
# Statistik ringkasan
valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
null_norm = self.df_clean['norm_value_1_100'].isna().sum()
self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
self.logger.info(
f" Range aktual: "
f"{self.df_clean['norm_value_1_100'].min():.2f} - "
f"{self.df_clean['norm_value_1_100'].max():.2f}"
)
# Log distribusi kondisi berdasarkan threshold
self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition)
cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):")
for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean")
return self.df_clean
# ------------------------------------------------------------------
# STEP 9: CALCULATE YOY
# ------------------------------------------------------------------
def calculate_yoy(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
df['yoy_change'] = df['value'] - df['value_prev']
df['yoy_pct'] = np.where(
df['value_prev'].notna() & (df['value_prev'] != 0),
(df['yoy_change'] / df['value_prev'].abs()) * 100,
np.nan
)
df = df.drop(columns=['value_prev'])
total_rows = len(df)
valid_yoy = df['yoy_pct'].notna().sum()
null_yoy = df['yoy_pct'].isna().sum()
self.logger.info(f" Total rows : {total_rows:,}")
self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,}")
self.df_clean = df
self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan")
return self.df_clean
# ------------------------------------------------------------------
# STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({
@@ -861,147 +400,89 @@ class AnalyticalLayerLoader:
'indicator_id', 'indicator_name', 'pillar_name', 'direction',
'start_year', 'end_year', 'country_count'
]
# Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split)
ind_fw = (
self.df_clean
.groupby('indicator_id')['framework']
.unique()
.reset_index()
)
ind_fw['framework_label'] = ind_fw['framework'].apply(
lambda x: '/'.join(sorted(x))
)
indicator_details = indicator_details.merge(
ind_fw[['indicator_id', 'framework_label']],
on='indicator_id', how='left'
)
indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str)
)
indicator_details = indicator_details.sort_values(
['framework_label', 'pillar_name', 'start_year', 'indicator_name']
)
indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name'])
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
self.logger.info(f"Framework breakdown (per indicator label):")
for fw, count in indicator_details.groupby('framework_label').size().items():
self.logger.info(f" {fw}: {count} indicators")
for pillar, count in indicator_details.groupby('pillar_name').size().items():
self.logger.info(f" {pillar}: {count} indicators")
self.logger.info(f"\n{'-'*115}")
self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}"
)
self.logger.info(f"{'-'*115}")
self.logger.info(f"\n{'-'*100}")
self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}")
self.logger.info(f"{'-'*100}")
for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} "
f"{direction:<8} {int(row['country_count'])}"
)
return year_stats
# ------------------------------------------------------------------
# STEP 11: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------
def save_analytical_table(self):
# ---------------------------------------------------------------
# CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio
# ---------------------------------------------------------------
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80)
try:
if 'framework' not in self.df_clean.columns:
raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.")
if 'norm_value_1_100' not in self.df_clean.columns:
raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.")
if 'yoy_change' not in self.df_clean.columns:
raise ValueError("Kolom 'yoy_change' tidak ada. Pastikan Step 9 sudah dijalankan.")
# ------------------------------------------------------------------
# Pilih kolom: ID + Nama lengkap + value
# Kolom nama memudahkan filtering/slicing langsung di Looker Studio
# tanpa perlu join ulang ke tabel dimensi.
# ------------------------------------------------------------------
analytical_df = self.df_clean[[
'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'framework',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
'norm_value_1_100',
'yoy_change',
'yoy_pct',
]].copy()
analytical_df = analytical_df.sort_values(
['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True)
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
analytical_df['norm_value_1_100'] = analytical_df['norm_value_1_100'].astype(float)
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
# Pastikan tipe data konsisten
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}")
# Framework distribution per row
fw_dist_rows = analytical_df['framework'].value_counts()
self.logger.info(f" Framework distribution (rows):")
for fw, cnt in fw_dist_rows.items():
self.logger.info(f" {fw}: {cnt:,} rows")
# Framework distribution per indikator (label)
ind_fw_label = (
analytical_df
.groupby('indicator_id')['framework']
.unique()
.apply(lambda x: '/'.join(sorted(x)))
.value_counts()
)
self.logger.info(f" Framework distribution (per indicator label):")
for fw, cnt in ind_fw_label.items():
self.logger.info(f" {fw}: {cnt} indicators")
self.logger.info(
f" norm_value_1_100 range: "
f"{analytical_df['norm_value_1_100'].min():.2f} - "
f"{analytical_df['norm_value_1_100'].max():.2f}"
)
# Schema BigQuery
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
]
rows_loaded = load_to_bigquery(
@@ -1022,51 +503,34 @@ class AnalyticalLayerLoader:
'rows_loaded' : rows_loaded,
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({
'start_year' : self.start_year,
'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_assignment' : 'per-row by year (not per-indicator)',
'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS),
'condition_thresholds' : {
'bad' : f'< {THRESHOLD_BAD}',
'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
'good' : f'> {THRESHOLD_GOOD}',
},
'start_year' : self.start_year,
'end_year' : self.end_year,
'fixed_countries': len(self.selected_country_ids),
'no_gaps' : True,
'layer' : 'gold',
'columns' : 'id + name + value (Looker Studio ready)'
}),
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators' : int(self.df_clean['indicator_id'].nunique()),
'sdg_start_year' : self.sdg_start_year,
'framework_dist_rows' : fw_dist_rows.to_dict(),
'framework_dist_inds' : ind_fw_label.to_dict(),
'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique())
})
}
save_etl_metadata(self.client, metadata)
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold")
self.logger.info(f" {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold")
self.logger.info(f" Metadata → [AUDIT] etl_metadata")
return rows_loaded
except Exception as e:
self.logger.error(f"Error saving: {e}")
raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
self.pipeline_start = datetime.now()
self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)")
self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info("Output: fact_asean_food_security_selected fs_asean_gold")
self.logger.info("=" * 80)
self.load_source_data()
@@ -1074,10 +538,7 @@ class AnalyticalLayerLoader:
self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries()
self.determine_sdg_start_year() # Step 6: per-row framework assignment
self.verify_no_gaps()
self.calculate_norm_value() # Step 8: norm_value_1_100
self.calculate_yoy() # Step 9: yoy_change, yoy_pct
self.analyze_indicator_availability_by_year()
self.save_analytical_table()
@@ -1087,12 +548,11 @@ class AnalyticalLayerLoader:
self.logger.info("\n" + "=" * 80)
self.logger.info("COMPLETED")
self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
@@ -1100,6 +560,10 @@ class AnalyticalLayerLoader:
# =============================================================================
def run_analytical_layer():
"""
Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims.
Dipanggil setelah dimensional_model_to_gold selesai.
"""
from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
@@ -1113,11 +577,7 @@ def run_analytical_layer():
if __name__ == "__main__":
print("=" * 80)
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Framework: per-row by year | SDG Proxy: FIES only")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print("Output: fact_asean_food_security_selected → fs_asean_gold")
print("=" * 80)
logger = setup_logging()
@@ -1127,6 +587,4 @@ if __name__ == "__main__":
print("\n" + "=" * 80)
print("[OK] COMPLETED")
print(f" SDG Start Year : {loader.sdg_start_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80)