Files
airflow-coolify/scripts/bigquery_analytical_layer.py
2026-04-02 17:34:33 +07:00

1141 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order:
1. Load data (single years only)
2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per syarat dosen)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
→ TIDAK menghapus baris year < max_start_year
→ Semua baris tetap ada; label framework ditentukan di Step 6
6. Assign framework (MDGs/SDGs) per indicator PER ROW
→ Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu
→ Indikator DI SDG_ONLY_KEYWORDS + year >= SDG_TRANSITION_YEAR → 'SDGs'
→ Indikator DI SDG_ONLY_KEYWORDS + year < SDG_TRANSITION_YEAR → 'MDGs'
→ SDG_TRANSITION_YEAR = 2015 (HARDCODE — tanggal resmi SDGs berlaku)
7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global)
8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global)
*** PERBAIKAN: normalisasi dilakukan SEKALI untuk seluruh data (semua tahun),
bukan per-framework, agar nilai dari era MDGs dan SDGs berada di
skala yang sama dan dapat dibandingkan secara adil. ***
9. Calculate YoY per indicator per country
10. Analyze indicator availability by year
11. Save analytical table
FRAMEWORK LOGIC:
- SDG_TRANSITION_YEAR = 2015 (HARDCODE, bukan auto-detect dari data)
- Semua SDG-only indicators menggunakan SDG_TRANSITION_YEAR yang SAMA
- SDG-only + year < SDG_TRANSITION_YEAR → 'MDGs' (data tetap ada, tidak dihapus)
- SDG-only + year >= SDG_TRANSITION_YEAR → 'SDGs'
- Non-SDG-only indicators → 'MDGs' selalu (di semua tahun)
NORMALISASI (PERBAIKAN):
- norm_value_1_100 dihitung SATU KALI per indikator menggunakan seluruh data
(semua tahun, semua negara) sebagai referensi min-max.
- Ini memastikan nilai 60 di era MDGs dan nilai 60 di era SDGs memiliki
makna yang SAMA (posisi relatif yang sama dalam distribusi global).
- Tidak ada rescaling ulang per-framework di layer analitik ini.
- Rescaling per-framework (jika diperlukan untuk visualisasi) sebaiknya
dilakukan di layer agregasi (analysis_layer) dengan flag eksplisit.
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# SDG-ONLY INDICATOR KEYWORDS
# =============================================================================
SDG_ONLY_KEYWORDS = frozenset([
# TARGET 2.1.1 — Undernourishment
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 — Food Insecurity (FIES)
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of severely food insecure male adults (million) (3-year average)",
"number of severely food insecure female adults (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.1 — Stunting
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 — Wasting
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
# TARGET 2.2.2 — Overweight (children)
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 — Anaemia
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# =============================================================================
# SDG TRANSITION YEAR — HARDCODE
# =============================================================================
SDG_TRANSITION_YEAR = 2015
# =============================================================================
# THRESHOLD KONDISI (fixed absolute, skala 1-100)
# =============================================================================
THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0
def assign_condition(norm_value_1_100: float) -> str:
if pd.isna(norm_value_1_100):
return None
if norm_value_1_100 > THRESHOLD_GOOD:
return 'good'
if norm_value_1_100 < THRESHOLD_BAD:
return 'bad'
return 'moderate'
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
class AnalyticalLayerLoader:
"""
Analytical Layer Loader for BigQuery
PERBAIKAN NORMALISASI:
- norm_value_1_100 dihitung SEKALI per indikator dari seluruh data
(semua tahun, semua negara). Tidak ada rescaling ulang per-framework.
- Ini memastikan komparabilitas lintas era MDGs dan SDGs.
"""
def __init__(self, client: bigquery.Client):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.df_clean = None
self.df_indicator = None
self.df_country = None
self.df_pillar = None
self.selected_country_ids = None
self.indicator_max_start_map = {}
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023
self.sdg_transition_year = SDG_TRANSITION_YEAR
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
self.pipeline_start = None
self.pipeline_end = None
# ------------------------------------------------------------------
# STEP 1: LOAD SOURCE DATA
# ------------------------------------------------------------------
def load_source_data(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
self.logger.info("=" * 80)
try:
query = f"""
SELECT
f.country_id,
c.country_name,
f.indicator_id,
i.indicator_name,
i.direction,
f.pillar_id,
p.pillar_name,
f.time_id,
t.year,
t.start_year,
t.end_year,
t.is_year_range,
f.value,
f.source_id
FROM `{get_table_id('fact_food_security', layer='gold')}` f
JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id
JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id
JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id
JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
"""
self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe(
create_bqstorage_client=False
)
self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(
f" Single years: {yr.get(False, 0):,} | "
f"Year ranges: {yr.get(True, 0):,}"
)
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
self.logger.info(f" Indicators: {len(self.df_indicator)}")
self.logger.info(f" Countries: {len(self.df_country)}")
self.logger.info(f" Pillars: {len(self.df_pillar)}")
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
return True
except Exception as e:
self.logger.error(f"Error loading source data: {e}")
raise
# ------------------------------------------------------------------
# STEP 2: DETERMINE YEAR BOUNDARIES
# ------------------------------------------------------------------
def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80)
df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_baseline['indicator_id'].nunique()
self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}")
self.logger.info(f" Baseline indicator count: {baseline_indicator_count}")
years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
selected_end_year = None
self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):")
for year in years_sorted:
if year >= self.baseline_year:
df_year = self.df_clean[self.df_clean['year'] == year]
year_indicator_count = df_year['indicator_id'].nunique()
status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
selected_end_year = int(year)
if selected_end_year is None:
selected_end_year = self.baseline_year
self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}")
else:
self.logger.info(f"\n [OK] Selected end year: {selected_end_year}")
self.end_year = selected_end_year
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
(self.df_clean['year'] >= self.start_year) &
(self.df_clean['year'] <= self.end_year)
].copy()
self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
# ------------------------------------------------------------------
def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
self.logger.info("=" * 80)
grouped = self.df_clean.groupby([
'country_id', 'country_name', 'indicator_id', 'indicator_name',
'pillar_id', 'pillar_name'
])
valid_combinations = []
removed_combinations = []
for (country_id, country_name, indicator_id, indicator_name,
pillar_id, pillar_name), group in grouped:
years_present = sorted(group['year'].unique())
start_year = int(min(years_present))
end_year_actual = int(max(years_present))
expected_years = list(range(start_year, self.end_year + 1))
missing_years = [y for y in expected_years if y not in years_present]
has_gap = len(missing_years) > 0
is_complete = (
end_year_actual >= self.end_year and
not has_gap and
(self.end_year - start_year) >= 4
)
if is_complete:
valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id})
else:
reasons = []
if end_year_actual < self.end_year:
reasons.append(f"ends {end_year_actual}")
if has_gap:
gap_str = str(missing_years[:3])[1:-1]
if len(missing_years) > 3:
gap_str += "..."
reasons.append(f"gap:{gap_str}")
if (self.end_year - start_year) < 4:
reasons.append(f"span={self.end_year - start_year}")
removed_combinations.append({
'country_name' : country_name,
'indicator_name': indicator_name,
'reasons' : ", ".join(reasons)
})
self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = (
df_valid['country_id'].astype(str) + '_' +
df_valid['indicator_id'].astype(str)
)
self.df_clean['key'] = (
self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str)
)
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
self.df_clean = self.df_clean.drop('key', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
# ------------------------------------------------------------------
def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
self.logger.info("=" * 80)
total_pillars = self.df_clean['pillar_id'].nunique()
country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
'pillar_id' : 'nunique',
'indicator_id': 'nunique',
'year' : lambda x: f"{int(x.min())}-{int(x.max())}"
}).reset_index()
country_pillar_count.columns = [
'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
]
for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
self.logger.info(
f" {status:<12} {row['country_name']:25s} "
f"{row['pillar_count']}/{total_pillars} pillars"
)
selected_countries = country_pillar_count[
country_pillar_count['pillar_count'] == total_pillars
]
self.selected_country_ids = selected_countries['country_id'].tolist()
self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['country_id'].isin(self.selected_country_ids)
].copy()
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
# ------------------------------------------------------------------
def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80)
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
indicator_country_start.columns = [
'indicator_id', 'indicator_name', 'country_id', 'start_year'
]
indicator_max_start = indicator_country_start.groupby([
'indicator_id', 'indicator_name'
])['start_year'].max().reset_index()
indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']
valid_indicators = []
removed_indicators = []
for _, ind_row in indicator_max_start.iterrows():
indicator_id = ind_row['indicator_id']
indicator_name = ind_row['indicator_name']
max_start = int(ind_row['max_start_year'])
span = self.end_year - max_start
if span < 4:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"span={span} < 4"
})
continue
expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True
problematic_years = []
for year in expected_years:
country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
if country_count < len(self.selected_country_ids):
all_years_complete = False
problematic_years.append(f"{int(year)}({country_count})")
if all_years_complete:
valid_indicators.append(indicator_id)
else:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"missing countries in years: {', '.join(problematic_years[:5])}"
})
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
self.indicator_max_start_map = (
indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)]
.set_index('indicator_id')['max_start_year']
.to_dict()
)
self.logger.info(f"\n Rows before : {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}")
self.logger.info(
f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. "
f"Label framework akan ditentukan di Step 6."
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------
def assign_framework(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW")
self.logger.info("=" * 80)
self.logger.info(f"\n SDG_TRANSITION_YEAR : {self.sdg_transition_year} (HARDCODE)")
self.logger.info(f" Alasan : SDGs resmi berlaku 1 Januari 2015")
self.logger.info(f" Bukan auto-detect : data FIES/anaemia ada sejak 2013,")
self.logger.info(f" tapi tahun 2013-2014 harus tetap MDGs")
indicator_info = (
self.df_clean[['indicator_id', 'indicator_name']]
.drop_duplicates()
.copy()
)
indicator_info['is_sdg_only'] = (
indicator_info['indicator_name']
.str.lower()
.str.strip()
.isin(SDG_ONLY_KEYWORDS)
)
sdg_only_ids = set(
indicator_info.loc[indicator_info['is_sdg_only'], 'indicator_id']
)
non_sdg_ids = set(
indicator_info.loc[~indicator_info['is_sdg_only'], 'indicator_id']
)
self.logger.info(f"\n SDG-only indicators ({len(sdg_only_ids)}):")
for _, row in indicator_info[indicator_info['is_sdg_only']].iterrows():
actual_start = self.indicator_max_start_map.get(row['indicator_id'], '?')
self.logger.info(
f" [SDG-only] id={int(row['indicator_id'])} "
f"actual_start={actual_start} | {row['indicator_name']}"
)
self.logger.info(f"\n Non-SDG-only indicators ({len(non_sdg_ids)}): → MDGs selalu")
if not sdg_only_ids:
raise ValueError(
"Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. "
"Pastikan nama indikator di SDG_ONLY_KEYWORDS cocok dengan data BigQuery."
)
self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids)
self.df_clean['framework'] = np.where(
self.df_clean['_is_sdg_only'] &
(self.df_clean['year'] >= self.sdg_transition_year),
'SDGs',
'MDGs'
)
self.df_clean = self.df_clean.drop(columns=['_is_sdg_only'])
self.logger.info(f"\n Logika assign framework (PER BARIS):")
self.logger.info(f" {''*72}")
self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di semua tahun")
self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year < {self.sdg_transition_year}'MDGs' (data tetap ada, tidak dihapus)")
self.logger.info(f" year >= {self.sdg_transition_year}'SDGs'")
self.logger.info(f" {''*72}")
self.logger.info(f"\n Verifikasi framework per indikator:")
self.logger.info(f" {''*115}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Data From':<11} "
f"{'MDGs rows':<11} {'SDGs rows':<11} {'Note'}"
)
self.logger.info(f" {''*115}")
for ind_id, grp in self.df_clean.groupby('indicator_id'):
ind_name = grp['indicator_name'].iloc[0]
mdgs_rows = (grp['framework'] == 'MDGs').sum()
sdgs_rows = (grp['framework'] == 'SDGs').sum()
is_sdg_only = ind_id in sdg_only_ids
data_from = int(grp['year'].min())
if is_sdg_only:
mdgs_yrs = sorted(grp[grp['framework'] == 'MDGs']['year'].unique())
sdgs_yrs = sorted(grp[grp['framework'] == 'SDGs']['year'].unique())
yr_range_mdgs = f"{min(mdgs_yrs)}-{max(mdgs_yrs)}" if mdgs_yrs else "-"
yr_range_sdgs = f"{min(sdgs_yrs)}-{max(sdgs_yrs)}" if sdgs_yrs else "-"
note = f"MDGs:{yr_range_mdgs} | SDGs:{yr_range_sdgs}"
else:
note = "MDGs always"
self.logger.info(
f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<11} "
f"{mdgs_rows:<11} {sdgs_rows:<11} {note}"
)
fw_summary = self.df_clean['framework'].value_counts()
self.logger.info(f"\n Ringkasan rows: " + " | ".join(
f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()
))
end_year_df = self.df_clean[self.df_clean['year'] == self.end_year]
fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique()
self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join(
f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items()
))
self.logger.info(
f"\n [OK] 'framework' ditambahkan — "
f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 7: VERIFY NO GAPS
# ------------------------------------------------------------------
def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids)
all_good = True
bad_rows = []
for ind_id, grp in self.df_clean.groupby('indicator_id'):
actual_start = self.indicator_max_start_map.get(ind_id)
if actual_start is None:
continue
expected_years = list(range(int(actual_start), self.end_year + 1))
for year in expected_years:
country_count = grp[grp['year'] == year]['country_id'].nunique()
if country_count != expected_countries:
all_good = False
bad_rows.append({
'indicator_id' : int(ind_id),
'year' : int(year),
'country_count': int(country_count),
})
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations from actual_start_year "
f"have {expected_countries} countries"
)
else:
for row in bad_rows[:10]:
self.logger.error(
f" Indicator {row['indicator_id']}, Year {row['year']}: "
f"{row['country_count']} countries (expected {expected_countries})"
)
raise ValueError("Gap verification failed!")
return True
# ------------------------------------------------------------------
# STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR
# ------------------------------------------------------------------
# PERBAIKAN:
# Normalisasi dilakukan SEKALI per indikator dari SELURUH DATA
# (semua tahun 2013end_year, semua negara, tanpa memisahkan framework).
#
# Alasan:
# - Sebelumnya, rescaling per-framework di analysis_layer menyebabkan
# nilai 1-100 era MDGs dan SDGs memiliki referensi yang berbeda,
# sehingga tidak dapat dibandingkan secara adil.
# - Dengan satu normalisasi global per indikator, nilai 60 di era MDGs
# dan nilai 60 di era SDGs berarti hal yang sama: posisi relatif yang
# sama dalam distribusi historis indikator tersebut.
# - Jika SDGs memang era yang lebih buruk secara substantif, itu akan
# tercermin sebagai nilai norm yang memang lebih rendah — bukan artefak
# dari rescaling ulang.
# ------------------------------------------------------------------
def calculate_norm_value(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR (GLOBAL, SEKALI)")
self.logger.info("=" * 80)
self.logger.info(
"\n [PERBAIKAN] Normalisasi dilakukan SEKALI per indikator dari seluruh data."
"\n Tidak ada rescaling ulang per-framework."
"\n Ini memastikan komparabilitas lintas era MDGs dan SDGs."
)
DIRECTION_INVERT = frozenset({
"negative", "lower_better", "lower_is_better", "inverse", "neg",
})
df = self.df_clean.copy()
norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info(
f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} "
f"{'Min':>10} {'Max':>10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators:
grp = grp.copy()
do_invert = str(direction).lower().strip() in DIRECTION_INVERT
valid_mask = grp['value'].notna()
n_valid = valid_mask.sum()
if n_valid < 2:
grp['norm_value_1_100'] = np.nan
norm_parts.append(grp)
self.logger.warning(
f" {int(ind_id):<5} {direction:<15} {'N/A':<8} "
f"{'N/A':>10} {'N/A':>10} {ind_name[:45]} [SKIPPED: n_valid={n_valid}]"
)
continue
raw = grp.loc[valid_mask, 'value'].values
v_min = raw.min()
v_max = raw.max()
normed = np.full(len(grp), np.nan)
if v_min == v_max:
# Semua nilai sama → assign tengah skala
normed[valid_mask.values] = 50.5
else:
scaled = (raw - v_min) / (v_max - v_min)
if do_invert:
scaled = 1.0 - scaled
normed[valid_mask.values] = 1.0 + scaled * 99.0
grp['norm_value_1_100'] = normed
self.logger.info(
f" {int(ind_id):<5} {direction:<15} {'YES' if do_invert else 'no':<8} "
f"{v_min:>10.3f} {v_max:>10.3f} {ind_name[:45]}"
)
norm_parts.append(grp)
self.df_clean = pd.concat(norm_parts, ignore_index=True)
valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
null_norm = self.df_clean['norm_value_1_100'].isna().sum()
self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
self.logger.info(
f" Range aktual: "
f"{self.df_clean['norm_value_1_100'].min():.2f} - "
f"{self.df_clean['norm_value_1_100'].max():.2f}"
)
# ----------------------------------------------------------------
# VALIDASI KOMPARABILITAS: Cek apakah ada gap sistematis antar era
# Ini adalah sinyal diagnostik — bukan error.
# Gap besar (>15 poin) setelah perbaikan = fenomena nyata, bukan artefak.
# ----------------------------------------------------------------
self.logger.info(f"\n [DIAGNOSTIK KOMPARABILITAS] Rata-rata norm per framework per tahun:")
self.logger.info(f" {''*55}")
fw_year_mean = (
self.df_clean
.groupby(['framework', 'year'])['norm_value_1_100']
.mean()
.reset_index()
.sort_values(['framework', 'year'])
)
for fw, grp_fw in fw_year_mean.groupby('framework'):
means = grp_fw['norm_value_1_100'].values
years = grp_fw['year'].values
self.logger.info(f"\n Framework: {fw}")
for yr, m in zip(years, means):
bar = '' * int(m / 5)
self.logger.info(f" {int(yr)} : {m:6.2f} {bar}")
# Bandingkan rata-rata MDGs vs SDGs (hanya tahun di mana keduanya ada)
mdgs_mean_total = self.df_clean[self.df_clean['framework'] == 'MDGs']['norm_value_1_100'].mean()
sdgs_mean_total = self.df_clean[self.df_clean['framework'] == 'SDGs']['norm_value_1_100'].mean()
gap = mdgs_mean_total - sdgs_mean_total
self.logger.info(
f"\n Rata-rata keseluruhan:"
f"\n MDGs : {mdgs_mean_total:.2f}"
f"\n SDGs : {sdgs_mean_total:.2f}"
f"\n Gap : {gap:.2f} poin"
)
if abs(gap) > 15:
self.logger.info(
f"\n [INFO] Gap {gap:.2f} poin antara MDGs dan SDGs."
f"\n Setelah perbaikan normalisasi (satu referensi global),"
f"\n gap ini mencerminkan perbedaan SUBSTANTIF, bukan artefak teknis."
f"\n Indikator SDGs memang mengukur dimensi deprivasi yang lebih dalam"
f"\n (FIES, stunting, wasting, anaemia) dibanding indikator MDGs."
)
else:
self.logger.info(
f"\n [OK] Gap {gap:.2f} poin — dalam batas wajar, tidak ada bias sistematis."
)
# Distribusi kondisi
self.df_clean['_condition_preview'] = (
self.df_clean['norm_value_1_100'].apply(assign_condition)
)
cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info(
f"\n Distribusi kondisi "
f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):"
)
for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean")
return self.df_clean
# ------------------------------------------------------------------
# STEP 9: CALCULATE YOY
# ------------------------------------------------------------------
def calculate_yoy(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
df['yoy_change'] = df['value'] - df['value_prev']
df['yoy_pct'] = np.where(
df['value_prev'].notna() & (df['value_prev'] != 0),
(df['yoy_change'] / df['value_prev'].abs()) * 100,
np.nan
)
df = df.drop(columns=['value_prev'])
total_rows = len(df)
valid_yoy = df['yoy_pct'].notna().sum()
null_yoy = df['yoy_pct'].isna().sum()
self.logger.info(f" Total rows : {total_rows:,}")
self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,}")
self.df_clean = df
self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan")
return self.df_clean
# ------------------------------------------------------------------
# STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({
'indicator_id': 'nunique',
'country_id' : 'nunique'
}).reset_index()
year_stats.columns = ['year', 'indicator_count', 'country_count']
self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
self.logger.info("-" * 50)
for _, row in year_stats.iterrows():
year = int(row['year'])
row_count = len(self.df_clean[self.df_clean['year'] == year])
self.logger.info(
f"{year:<8} {int(row['indicator_count']):<15} "
f"{int(row['country_count']):<12} {row_count:,}"
)
indicator_details = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'pillar_name', 'direction'
]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
indicator_details.columns = [
'indicator_id', 'indicator_name', 'pillar_name', 'direction',
'start_year', 'end_year', 'country_count'
]
fw_at_end = (
self.df_clean[self.df_clean['year'] == self.end_year]
.groupby('indicator_id')['framework']
.first()
.reset_index()
)
indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left')
indicator_details['framework'] = indicator_details['framework'].fillna('MDGs')
indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str)
)
indicator_details = indicator_details.sort_values(
['framework', 'pillar_name', 'start_year', 'indicator_name']
)
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
self.logger.info(f"Framework breakdown (at end_year={self.end_year}):")
for fw, count in indicator_details.groupby('framework').size().items():
self.logger.info(f" {fw}: {count} indicators")
self.logger.info(f"\n{'-'*110}")
self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}"
)
self.logger.info(f"{'-'*110}")
for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework']:<10} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
)
return year_stats
# ------------------------------------------------------------------
# STEP 11: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------
def save_analytical_table(self):
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80)
try:
if 'framework' not in self.df_clean.columns:
raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.")
if 'norm_value_1_100' not in self.df_clean.columns:
raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.")
if 'yoy_change' not in self.df_clean.columns:
raise ValueError("Kolom 'yoy_change' tidak ada. Pastikan Step 9 sudah dijalankan.")
analytical_df = self.df_clean[[
'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'framework',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
'norm_value_1_100',
'yoy_change',
'yoy_pct',
]].copy()
analytical_df = analytical_df.sort_values(
['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True)
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
analytical_df['norm_value_1_100'] = analytical_df['norm_value_1_100'].astype(float)
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
self.logger.info(f" Total rows: {len(analytical_df):,}")
fw_dist_rows = analytical_df['framework'].value_counts()
self.logger.info(f" Framework distribution (rows):")
for fw, cnt in fw_dist_rows.items():
self.logger.info(f" {fw}: {cnt:,} rows")
fw_dist_ind = (
analytical_df[analytical_df['year'] == self.end_year]
.drop_duplicates('indicator_id')['framework']
.value_counts()
)
self.logger.info(
f" Framework distribution (indicators at year={self.end_year}):"
)
for fw, cnt in fw_dist_ind.items():
self.logger.info(f" {fw}: {cnt} indicators")
self.logger.info(
f" norm_value_1_100 range: "
f"{analytical_df['norm_value_1_100'].min():.2f} - "
f"{analytical_df['norm_value_1_100'].max():.2f}"
)
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, analytical_df, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self.pipeline_metadata['rows_loaded'] = rows_loaded
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
'execution_timestamp': self.pipeline_start,
'duration_seconds' : (datetime.now() - self.pipeline_start).total_seconds(),
'rows_fetched' : self.pipeline_metadata['rows_fetched'],
'rows_transformed' : rows_loaded,
'rows_loaded' : rows_loaded,
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({
'start_year' : self.start_year,
'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_transition_year' : self.sdg_transition_year,
'sdg_transition_source' : 'HARDCODE — SDGs resmi berlaku 1 Jan 2015',
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : (
'1-100 per indicator global minmax direction-aware. '
'SATU normalisasi untuk seluruh data tanpa rescaling per-framework. '
'Komparabilitas lintas era MDGs/SDGs terjamin.'
),
'framework_logic' : (
f'SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE); '
'SDG-only + year >= SDG_TRANSITION_YEAR → SDGs; '
'SDG-only + year < SDG_TRANSITION_YEAR → MDGs (data tetap ada); '
'non-SDG-only → MDGs selalu'
),
'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : {
'bad' : f'< {THRESHOLD_BAD}',
'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
'good' : f'> {THRESHOLD_GOOD}',
},
}),
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators' : int(self.df_clean['indicator_id'].nunique()),
'sdg_transition_year': self.sdg_transition_year,
'framework_dist_rows': fw_dist_rows.to_dict(),
})
}
save_etl_metadata(self.client, metadata)
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold")
return rows_loaded
except Exception as e:
self.logger.error(f"Error saving: {e}")
raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
self.pipeline_start = datetime.now()
self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info(
f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). "
"SDG-only + year >= 2015 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu."
)
self.logger.info(
"NORMALISASI: SATU referensi global per indikator — tidak ada rescaling per-framework."
)
self.logger.info("=" * 80)
self.load_source_data()
self.determine_year_boundaries()
self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries()
self.assign_framework()
self.verify_no_gaps()
self.calculate_norm_value()
self.calculate_yoy()
self.analyze_indicator_availability_by_year()
self.save_analytical_table()
self.pipeline_end = datetime.now()
duration = (self.pipeline_end - self.pipeline_start).total_seconds()
self.logger.info("\n" + "=" * 80)
self.logger.info("COMPLETED")
self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Transition Year: {self.sdg_transition_year} (HARDCODE)")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================
def run_analytical_layer():
from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
loader.run()
print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
print("=" * 80)
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware, GLOBAL (satu referensi)")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print(
f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). "
"SDG-only + year >= 2015 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu."
)
print("=" * 80)
logger = setup_logging()
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
loader.run()
print("\n" + "=" * 80)
print("[OK] COMPLETED")
print(f" SDG Transition Year : {loader.sdg_transition_year} (HARDCODE)")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80)