Files
airflow-coolify/scripts/bigquery_analytical_layer.py
2026-04-02 07:10:41 +07:00

1118 lines
49 KiB
Python

"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order:
1. Load data (single years only)
2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per syarat dosen)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
→ TIDAK menghapus baris year < max_start_year
→ Semua baris tetap ada; label framework ditentukan di Step 6
6. Assign framework (MDGs/SDGs) per indicator PER ROW
→ Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu
→ Indikator DI SDG_ONLY_KEYWORDS + year >= SDG_TRANSITION_YEAR → 'SDGs'
→ Indikator DI SDG_ONLY_KEYWORDS + year < SDG_TRANSITION_YEAR → 'MDGs'
→ SDG_TRANSITION_YEAR = 2016 (HARDCODE — tanggal resmi SDGs berlaku)
BUKAN dari actual_start_year data, karena data anaemia/FIES bisa ada
sebelum 2016 namun tetap harus dilabeli MDGs pada tahun-tahun tersebut.
7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global)
8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global)
9. Calculate YoY per indicator per country
10. Analyze indicator availability by year
11. Save analytical table
FRAMEWORK LOGIC:
- SDG_TRANSITION_YEAR = 2016 (HARDCODE, bukan auto-detect dari data)
- Semua SDG-only indicators menggunakan SDG_TRANSITION_YEAR yang SAMA
sehingga label berubah serentak di satu titik waktu
- SDG-only + year < SDG_TRANSITION_YEAR → 'MDGs' (data tetap ada, tidak dihapus)
- SDG-only + year >= SDG_TRANSITION_YEAR → 'SDGs'
- Non-SDG-only indicators → 'MDGs' selalu (di semua tahun)
ALASAN HARDCODE:
- SDGs resmi diadopsi PBB pada 25 September 2015 dan mulai berlaku 1 Januari 2016
- Indikator FIES dan anaemia punya data sebelum 2016 (dari MDGs era)
- Jika sdg_transition_year di-auto-detect dari min(actual_start_year),
maka akan = 2013 (karena data ada sejak 2013), sehingga semua tahun
berlabel SDGs — yang secara historis tidak tepat.
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# SDG-ONLY INDICATOR KEYWORDS
# =============================================================================
# Hanya indikator yang MURNI BARU di era SDGs yang didaftarkan di sini.
# Indikator di set ini → 'SDGs' mulai dari SDG_TRANSITION_YEAR (2016).
# Semua indikator lain (shared maupun tidak dikenal) → 'MDGs' di semua tahun.
SDG_ONLY_KEYWORDS = frozenset([
# TARGET 2.1.1
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 — FIES (SDGs only)
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of severely food insecure male adults (million) (3-year average)",
"number of severely food insecure female adults (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.3 — Anaemia (SDGs only)
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# =============================================================================
# SDG TRANSITION YEAR — HARDCODE
# =============================================================================
# SDGs resmi berlaku mulai 1 Januari 2016 (diadopsi PBB 25 September 2015).
# Nilai ini TIDAK boleh dihitung dari data karena indikator FIES/anaemia
# punya data historis sebelum 2016 yang harus tetap dilabeli 'MDGs'.
SDG_TRANSITION_YEAR = 2016
# =============================================================================
# THRESHOLD KONDISI (fixed absolute, skala 1-100)
# =============================================================================
THRESHOLD_BAD = 40.0
THRESHOLD_GOOD = 60.0
def assign_condition(norm_value_1_100: float) -> str:
"""
Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware).
Returns: 'good' / 'moderate' / 'bad'
"""
if pd.isna(norm_value_1_100):
return None
if norm_value_1_100 > THRESHOLD_GOOD:
return 'good'
if norm_value_1_100 < THRESHOLD_BAD:
return 'bad'
return 'moderate'
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
class AnalyticalLayerLoader:
"""
Analytical Layer Loader for BigQuery
Output kolom fact_asean_food_security_selected:
country_id, country_name,
indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name,
time_id, year, value,
norm_value_1_100,
yoy_change, yoy_pct
FRAMEWORK LOGIC:
- SDG_TRANSITION_YEAR = 2016 (HARDCODE — tanggal resmi SDGs berlaku)
- Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di SEMUA tahun
- Indikator DI SDG_ONLY_KEYWORDS:
year < SDG_TRANSITION_YEAR (2016) → 'MDGs' (data tetap ada, tidak dihapus)
year >= SDG_TRANSITION_YEAR (2016) → 'SDGs'
"""
def __init__(self, client: bigquery.Client):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.df_clean = None
self.df_indicator = None
self.df_country = None
self.df_pillar = None
self.selected_country_ids = None
self.indicator_max_start_map = {} # indicator_id → max_start_year (dari Step 5)
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023
# SDG_TRANSITION_YEAR diambil dari konstanta modul (HARDCODE = 2016)
self.sdg_transition_year = SDG_TRANSITION_YEAR
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
self.pipeline_start = None
self.pipeline_end = None
# ------------------------------------------------------------------
# STEP 1: LOAD SOURCE DATA
# ------------------------------------------------------------------
def load_source_data(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
self.logger.info("=" * 80)
try:
query = f"""
SELECT
f.country_id,
c.country_name,
f.indicator_id,
i.indicator_name,
i.direction,
f.pillar_id,
p.pillar_name,
f.time_id,
t.year,
t.start_year,
t.end_year,
t.is_year_range,
f.value,
f.source_id
FROM `{get_table_id('fact_food_security', layer='gold')}` f
JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id
JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id
JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id
JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
"""
self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe(
create_bqstorage_client=False
)
self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(
f" Single years: {yr.get(False, 0):,} | "
f"Year ranges: {yr.get(True, 0):,}"
)
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
self.logger.info(f" Indicators: {len(self.df_indicator)}")
self.logger.info(f" Countries: {len(self.df_country)}")
self.logger.info(f" Pillars: {len(self.df_pillar)}")
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
return True
except Exception as e:
self.logger.error(f"Error loading source data: {e}")
raise
# ------------------------------------------------------------------
# STEP 2: DETERMINE YEAR BOUNDARIES
# ------------------------------------------------------------------
def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80)
df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_baseline['indicator_id'].nunique()
self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}")
self.logger.info(f" Baseline indicator count: {baseline_indicator_count}")
years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
selected_end_year = None
self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):")
for year in years_sorted:
if year >= self.baseline_year:
df_year = self.df_clean[self.df_clean['year'] == year]
year_indicator_count = df_year['indicator_id'].nunique()
status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
selected_end_year = int(year)
if selected_end_year is None:
selected_end_year = self.baseline_year
self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}")
else:
self.logger.info(f"\n [OK] Selected end year: {selected_end_year}")
self.end_year = selected_end_year
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
(self.df_clean['year'] >= self.start_year) &
(self.df_clean['year'] <= self.end_year)
].copy()
self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
# ------------------------------------------------------------------
def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
self.logger.info("=" * 80)
grouped = self.df_clean.groupby([
'country_id', 'country_name', 'indicator_id', 'indicator_name',
'pillar_id', 'pillar_name'
])
valid_combinations = []
removed_combinations = []
for (country_id, country_name, indicator_id, indicator_name,
pillar_id, pillar_name), group in grouped:
years_present = sorted(group['year'].unique())
start_year = int(min(years_present))
end_year_actual = int(max(years_present))
expected_years = list(range(start_year, self.end_year + 1))
missing_years = [y for y in expected_years if y not in years_present]
has_gap = len(missing_years) > 0
is_complete = (
end_year_actual >= self.end_year and
not has_gap and
(self.end_year - start_year) >= 4
)
if is_complete:
valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id})
else:
reasons = []
if end_year_actual < self.end_year:
reasons.append(f"ends {end_year_actual}")
if has_gap:
gap_str = str(missing_years[:3])[1:-1]
if len(missing_years) > 3:
gap_str += "..."
reasons.append(f"gap:{gap_str}")
if (self.end_year - start_year) < 4:
reasons.append(f"span={self.end_year - start_year}")
removed_combinations.append({
'country_name' : country_name,
'indicator_name': indicator_name,
'reasons' : ", ".join(reasons)
})
self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = (
df_valid['country_id'].astype(str) + '_' +
df_valid['indicator_id'].astype(str)
)
self.df_clean['key'] = (
self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str)
)
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
self.df_clean = self.df_clean.drop('key', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
# ------------------------------------------------------------------
def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
self.logger.info("=" * 80)
total_pillars = self.df_clean['pillar_id'].nunique()
country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
'pillar_id' : 'nunique',
'indicator_id': 'nunique',
'year' : lambda x: f"{int(x.min())}-{int(x.max())}"
}).reset_index()
country_pillar_count.columns = [
'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
]
for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
self.logger.info(
f" {status:<12} {row['country_name']:25s} "
f"{row['pillar_count']}/{total_pillars} pillars"
)
selected_countries = country_pillar_count[
country_pillar_count['pillar_count'] == total_pillars
]
self.selected_country_ids = selected_countries['country_id'].tolist()
self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['country_id'].isin(self.selected_country_ids)
].copy()
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
# ------------------------------------------------------------------
def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80)
# Hitung max_start_year per indikator = max(min_year per country)
# = tahun pertama di mana SEMUA fixed countries sudah punya data
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
indicator_country_start.columns = [
'indicator_id', 'indicator_name', 'country_id', 'start_year'
]
indicator_max_start = indicator_country_start.groupby([
'indicator_id', 'indicator_name'
])['start_year'].max().reset_index()
indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']
valid_indicators = []
removed_indicators = []
for _, ind_row in indicator_max_start.iterrows():
indicator_id = ind_row['indicator_id']
indicator_name = ind_row['indicator_name']
max_start = int(ind_row['max_start_year'])
span = self.end_year - max_start
if span < 4:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"span={span} < 4"
})
continue
# Cek apakah semua tahun dari max_start s/d end_year
# hadir di SEMUA fixed countries
expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True
problematic_years = []
for year in expected_years:
country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
if country_count < len(self.selected_country_ids):
all_years_complete = False
problematic_years.append(f"{int(year)}({country_count})")
if all_years_complete:
valid_indicators.append(indicator_id)
else:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"missing countries in years: {', '.join(problematic_years[:5])}"
})
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
# ----------------------------------------------------------------
# Filter hanya indikator yang valid.
# PENTING: TIDAK menghapus baris year < max_start_year.
# Semua baris tetap ada — label framework ditentukan di Step 6.
# max_start_year disimpan sebagai lookup untuk Step 7.
# ----------------------------------------------------------------
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
# Simpan max_start_year per indicator_id untuk Step 7
self.indicator_max_start_map = (
indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)]
.set_index('indicator_id')['max_start_year']
.to_dict()
)
self.logger.info(f"\n Rows before : {original_count:,}")
self.logger.info(f" Rows after : {len(self.df_clean):,}")
self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}")
self.logger.info(
f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. "
f"Label framework akan ditentukan di Step 6."
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: ASSIGN FRAMEWORK PER ROW
# ------------------------------------------------------------------
def assign_framework(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW")
self.logger.info("=" * 80)
# ----------------------------------------------------------------
# SDG_TRANSITION_YEAR = 2016 (HARDCODE)
# SDGs diadopsi PBB 25 September 2015, berlaku 1 Januari 2016.
#
# PENTING — TIDAK dihitung dari data:
# Jika auto-detect dari min(actual_start_year SDG-only indicators),
# hasilnya = 2013 (karena data FIES/anaemia ada sejak 2013).
# Akibatnya year >= 2013 → SDGs → SEMUA tahun berlabel SDGs.
# Ini secara historis salah karena SDGs belum berlaku di 2013-2015.
# ----------------------------------------------------------------
self.logger.info(f"\n SDG_TRANSITION_YEAR : {self.sdg_transition_year} (HARDCODE)")
self.logger.info(f" Alasan : SDGs resmi berlaku 1 Januari 2016")
self.logger.info(f" Bukan auto-detect : data FIES/anaemia ada sejak 2013,")
self.logger.info(f" tapi tahun 2013-2015 harus tetap MDGs")
# ----------------------------------------------------------------
# Identifikasi indikator SDG-only berdasarkan SDG_ONLY_KEYWORDS
# ----------------------------------------------------------------
indicator_info = (
self.df_clean[['indicator_id', 'indicator_name']]
.drop_duplicates()
.copy()
)
indicator_info['is_sdg_only'] = (
indicator_info['indicator_name']
.str.lower()
.str.strip()
.isin(SDG_ONLY_KEYWORDS)
)
sdg_only_ids = set(
indicator_info.loc[indicator_info['is_sdg_only'], 'indicator_id']
)
non_sdg_ids = set(
indicator_info.loc[~indicator_info['is_sdg_only'], 'indicator_id']
)
self.logger.info(f"\n SDG-only indicators ({len(sdg_only_ids)}):")
for _, row in indicator_info[indicator_info['is_sdg_only']].iterrows():
actual_start = self.indicator_max_start_map.get(row['indicator_id'], '?')
self.logger.info(
f" [SDG-only] id={int(row['indicator_id'])} "
f"actual_start={actual_start} | {row['indicator_name']}"
)
self.logger.info(f"\n Non-SDG-only indicators ({len(non_sdg_ids)}): → MDGs selalu")
# ----------------------------------------------------------------
# Validasi: pastikan ada SDG-only indicators yang lolos filter
# ----------------------------------------------------------------
if not sdg_only_ids:
raise ValueError(
"Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. "
"Pastikan nama indikator di SDG_ONLY_KEYWORDS cocok dengan data BigQuery."
)
# ----------------------------------------------------------------
# Assign framework dengan vectorized np.where:
#
# Kondisi SDG-only AND year >= SDG_TRANSITION_YEAR → 'SDGs'
# Semua kondisi lain (non-SDG-only ATAU year < SDG_TRANSITION_YEAR) → 'MDGs'
#
# Hasilnya dalam 1 indikator SDG-only (misal anaemia, data mulai 2013):
# 2013, 2014, 2015 → 'MDGs' (data tetap ada)
# 2016, 2017, ... → 'SDGs'
# ----------------------------------------------------------------
self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids)
self.df_clean['framework'] = np.where(
self.df_clean['_is_sdg_only'] &
(self.df_clean['year'] >= self.sdg_transition_year),
'SDGs',
'MDGs'
)
self.df_clean = self.df_clean.drop(columns=['_is_sdg_only'])
# ----------------------------------------------------------------
# Log verifikasi per indikator — tampilkan split MDGs/SDGs per tahun
# ----------------------------------------------------------------
self.logger.info(f"\n Logika assign framework (PER BARIS):")
self.logger.info(f" {''*72}")
self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di semua tahun")
self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:")
self.logger.info(f" year < {self.sdg_transition_year}'MDGs' (data tetap ada, tidak dihapus)")
self.logger.info(f" year >= {self.sdg_transition_year}'SDGs'")
self.logger.info(f" {''*72}")
self.logger.info(f"\n Verifikasi framework per indikator:")
self.logger.info(f" {''*115}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Data From':<11} "
f"{'MDGs rows':<11} {'SDGs rows':<11} {'Note'}"
)
self.logger.info(f" {''*115}")
for ind_id, grp in self.df_clean.groupby('indicator_id'):
ind_name = grp['indicator_name'].iloc[0]
mdgs_rows = (grp['framework'] == 'MDGs').sum()
sdgs_rows = (grp['framework'] == 'SDGs').sum()
is_sdg_only = ind_id in sdg_only_ids
data_from = int(grp['year'].min())
if is_sdg_only:
mdgs_yrs = sorted(grp[grp['framework'] == 'MDGs']['year'].unique())
sdgs_yrs = sorted(grp[grp['framework'] == 'SDGs']['year'].unique())
yr_range_mdgs = f"{min(mdgs_yrs)}-{max(mdgs_yrs)}" if mdgs_yrs else "-"
yr_range_sdgs = f"{min(sdgs_yrs)}-{max(sdgs_yrs)}" if sdgs_yrs else "-"
note = f"MDGs:{yr_range_mdgs} | SDGs:{yr_range_sdgs}"
else:
note = "MDGs always"
self.logger.info(
f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<11} "
f"{mdgs_rows:<11} {sdgs_rows:<11} {note}"
)
fw_summary = self.df_clean['framework'].value_counts()
self.logger.info(f"\n Ringkasan rows: " + " | ".join(
f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()
))
end_year_df = self.df_clean[self.df_clean['year'] == self.end_year]
fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique()
self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join(
f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items()
))
self.logger.info(
f"\n [OK] 'framework' ditambahkan — "
f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows"
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 7: VERIFY NO GAPS
# ------------------------------------------------------------------
def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: VERIFY NO GAPS")
self.logger.info("=" * 80)
# ----------------------------------------------------------------
# Verifikasi dilakukan PER INDIKATOR dari actual_start_year-nya,
# bukan dari self.start_year global, karena tiap indikator bisa
# punya start year berbeda.
# Baris sebelum actual_start_year (yang berlabel MDGs) tidak dicek
# karena memang tidak semua country punya data di sana.
# ----------------------------------------------------------------
expected_countries = len(self.selected_country_ids)
all_good = True
bad_rows = []
for ind_id, grp in self.df_clean.groupby('indicator_id'):
actual_start = self.indicator_max_start_map.get(ind_id)
if actual_start is None:
continue
expected_years = list(range(int(actual_start), self.end_year + 1))
for year in expected_years:
country_count = grp[grp['year'] == year]['country_id'].nunique()
if country_count != expected_countries:
all_good = False
bad_rows.append({
'indicator_id' : int(ind_id),
'year' : int(year),
'country_count': int(country_count),
})
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations from actual_start_year "
f"have {expected_countries} countries"
)
else:
for row in bad_rows[:10]:
self.logger.error(
f" Indicator {row['indicator_id']}, Year {row['year']}: "
f"{row['country_count']} countries (expected {expected_countries})"
)
raise ValueError("Gap verification failed!")
return True
# ------------------------------------------------------------------
# STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR
# ------------------------------------------------------------------
def calculate_norm_value(self):
"""
Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100,
direction-aware, global per indikator (semua negara + semua tahun).
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR")
self.logger.info("=" * 80)
DIRECTION_INVERT = frozenset({
"negative", "lower_better", "lower_is_better", "inverse", "neg",
})
df = self.df_clean.copy()
norm_parts = []
indicators = df.groupby(['indicator_id', 'indicator_name', 'direction'])
self.logger.info(
f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} "
f"{'Min':>10} {'Max':>10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*90}")
for (ind_id, ind_name, direction), grp in indicators:
grp = grp.copy()
do_invert = str(direction).lower().strip() in DIRECTION_INVERT
valid_mask = grp['value'].notna()
n_valid = valid_mask.sum()
if n_valid < 2:
grp['norm_value_1_100'] = np.nan
norm_parts.append(grp)
continue
raw = grp.loc[valid_mask, 'value'].values
v_min = raw.min()
v_max = raw.max()
normed = np.full(len(grp), np.nan)
if v_min == v_max:
normed[valid_mask.values] = 50.5
else:
scaled = (raw - v_min) / (v_max - v_min)
if do_invert:
scaled = 1.0 - scaled
normed[valid_mask.values] = 1.0 + scaled * 99.0
grp['norm_value_1_100'] = normed
self.logger.info(
f" {int(ind_id):<5} {direction:<15} {'YES' if do_invert else 'no':<8} "
f"{v_min:>10.3f} {v_max:>10.3f} {ind_name[:45]}"
)
norm_parts.append(grp)
self.df_clean = pd.concat(norm_parts, ignore_index=True)
valid_norm = self.df_clean['norm_value_1_100'].notna().sum()
null_norm = self.df_clean['norm_value_1_100'].isna().sum()
self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}")
self.logger.info(
f" Range aktual: "
f"{self.df_clean['norm_value_1_100'].min():.2f} - "
f"{self.df_clean['norm_value_1_100'].max():.2f}"
)
self.df_clean['_condition_preview'] = (
self.df_clean['norm_value_1_100'].apply(assign_condition)
)
cond_dist = self.df_clean['_condition_preview'].value_counts()
self.logger.info(
f"\n Distribusi kondisi "
f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):"
)
for cond, cnt in cond_dist.items():
self.logger.info(f" {cond}: {cnt:,} rows")
self.df_clean = self.df_clean.drop(columns=['_condition_preview'])
self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean")
return self.df_clean
# ------------------------------------------------------------------
# STEP 9: CALCULATE YOY
# ------------------------------------------------------------------
def calculate_yoy(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
df['yoy_change'] = df['value'] - df['value_prev']
df['yoy_pct'] = np.where(
df['value_prev'].notna() & (df['value_prev'] != 0),
(df['yoy_change'] / df['value_prev'].abs()) * 100,
np.nan
)
df = df.drop(columns=['value_prev'])
total_rows = len(df)
valid_yoy = df['yoy_pct'].notna().sum()
null_yoy = df['yoy_pct'].isna().sum()
self.logger.info(f" Total rows : {total_rows:,}")
self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,}")
self.df_clean = df
self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan")
return self.df_clean
# ------------------------------------------------------------------
# STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({
'indicator_id': 'nunique',
'country_id' : 'nunique'
}).reset_index()
year_stats.columns = ['year', 'indicator_count', 'country_count']
self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
self.logger.info("-" * 50)
for _, row in year_stats.iterrows():
year = int(row['year'])
row_count = len(self.df_clean[self.df_clean['year'] == year])
self.logger.info(
f"{year:<8} {int(row['indicator_count']):<15} "
f"{int(row['country_count']):<12} {row_count:,}"
)
indicator_details = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'pillar_name', 'direction'
]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
indicator_details.columns = [
'indicator_id', 'indicator_name', 'pillar_name', 'direction',
'start_year', 'end_year', 'country_count'
]
fw_at_end = (
self.df_clean[self.df_clean['year'] == self.end_year]
.groupby('indicator_id')['framework']
.first()
.reset_index()
)
indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left')
indicator_details['framework'] = indicator_details['framework'].fillna('MDGs')
indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str)
)
indicator_details = indicator_details.sort_values(
['framework', 'pillar_name', 'start_year', 'indicator_name']
)
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
self.logger.info(f"Framework breakdown (at end_year={self.end_year}):")
for fw, count in indicator_details.groupby('framework').size().items():
self.logger.info(f" {fw}: {count} indicators")
self.logger.info(f"\n{'-'*110}")
self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}"
)
self.logger.info(f"{'-'*110}")
for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework']:<10} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
)
return year_stats
# ------------------------------------------------------------------
# STEP 11: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------
def save_analytical_table(self):
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80)
try:
if 'framework' not in self.df_clean.columns:
raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.")
if 'norm_value_1_100' not in self.df_clean.columns:
raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.")
if 'yoy_change' not in self.df_clean.columns:
raise ValueError("Kolom 'yoy_change' tidak ada. Pastikan Step 9 sudah dijalankan.")
analytical_df = self.df_clean[[
'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'framework',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
'norm_value_1_100',
'yoy_change',
'yoy_pct',
]].copy()
analytical_df = analytical_df.sort_values(
['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True)
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
analytical_df['norm_value_1_100'] = analytical_df['norm_value_1_100'].astype(float)
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
self.logger.info(f" Total rows: {len(analytical_df):,}")
fw_dist_rows = analytical_df['framework'].value_counts()
self.logger.info(f" Framework distribution (rows):")
for fw, cnt in fw_dist_rows.items():
self.logger.info(f" {fw}: {cnt:,} rows")
fw_dist_ind = (
analytical_df[analytical_df['year'] == self.end_year]
.drop_duplicates('indicator_id')['framework']
.value_counts()
)
self.logger.info(
f" Framework distribution (indicators at year={self.end_year}):"
)
for fw, cnt in fw_dist_ind.items():
self.logger.info(f" {fw}: {cnt} indicators")
self.logger.info(
f" norm_value_1_100 range: "
f"{analytical_df['norm_value_1_100'].min():.2f} - "
f"{analytical_df['norm_value_1_100'].max():.2f}"
)
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, analytical_df, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self.pipeline_metadata['rows_loaded'] = rows_loaded
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
'execution_timestamp': self.pipeline_start,
'duration_seconds' : (datetime.now() - self.pipeline_start).total_seconds(),
'rows_fetched' : self.pipeline_metadata['rows_fetched'],
'rows_transformed' : rows_loaded,
'rows_loaded' : rows_loaded,
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({
'start_year' : self.start_year,
'end_year' : self.end_year,
'baseline_year' : self.baseline_year,
'sdg_transition_year' : self.sdg_transition_year,
'sdg_transition_source' : 'HARDCODE — SDGs resmi berlaku 1 Jan 2016',
'fixed_countries' : len(self.selected_country_ids),
'norm_scale' : '1-100 per indicator global minmax direction-aware',
'framework_logic' : (
f'SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE); '
'SDG-only + year >= SDG_TRANSITION_YEAR → SDGs; '
'SDG-only + year < SDG_TRANSITION_YEAR → MDGs (data tetap ada); '
'non-SDG-only → MDGs selalu'
),
'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS),
'condition_thresholds' : {
'bad' : f'< {THRESHOLD_BAD}',
'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}',
'good' : f'> {THRESHOLD_GOOD}',
},
}),
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators' : int(self.df_clean['indicator_id'].nunique()),
'sdg_transition_year': self.sdg_transition_year,
'framework_dist_rows': fw_dist_rows.to_dict(),
})
}
save_etl_metadata(self.client, metadata)
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold")
return rows_loaded
except Exception as e:
self.logger.error(f"Error saving: {e}")
raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
self.pipeline_start = datetime.now()
self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)")
self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
self.logger.info(
f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). "
"SDG-only + year >= 2016 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu."
)
self.logger.info("=" * 80)
self.load_source_data()
self.determine_year_boundaries()
self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries()
self.assign_framework()
self.verify_no_gaps()
self.calculate_norm_value()
self.calculate_yoy()
self.analyze_indicator_availability_by_year()
self.save_analytical_table()
self.pipeline_end = datetime.now()
duration = (self.pipeline_end - self.pipeline_start).total_seconds()
self.logger.info("\n" + "=" * 80)
self.logger.info("COMPLETED")
self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Transition Year: {self.sdg_transition_year} (HARDCODE)")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================
def run_analytical_layer():
from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
loader.run()
print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
print("=" * 80)
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print(f"Norm: min-max 1-100 per indicator, direction-aware")
print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}")
print(
f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). "
"SDG-only + year >= 2016 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu."
)
print("=" * 80)
logger = setup_logging()
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
loader.run()
print("\n" + "=" * 80)
print("[OK] COMPLETED")
print(f" SDG Transition Year : {loader.sdg_transition_year} (HARDCODE)")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80)