# Files
# airflow-coolify/scripts/bigquery_analytical_layer.py
# 2026-03-31 13:54:20 +07:00
#
# 1024 lines
# 45 KiB
# Python
"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
fact_asean_food_security_selected is stored in fs_asean_gold (layer='gold').

Filtering order:
1. Load data (intended: single years only; note that the source query itself
   does not filter on is_year_range - it only logs the breakdown)
2. Determine year boundaries (2013 to an auto-detected end year)
3. Filter complete indicators PER COUNTRY (auto-detected start year, no gaps)
4. Filter countries that have ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across the FIXED countries
6. Determine the SDGs start year & assign a framework (MDGs/SDGs) per indicator,
   then verify that no gaps remain
7. Calculate YoY per indicator per country
8. Analyze indicator availability by year
9. Save the analytical table (with full names/labels + framework column + YoY
   columns for Looker Studio)

FRAMEWORK LOGIC:
- SDG_START_YEAR = 2016 by default; it is auto-detected from the data when the
  SDG-only indicators first appear earlier or later.
- Indicators whose name appears in SDG_INDICATOR_KEYWORDS:
  * data starts >= SDG_START_YEAR -> 'SDGs'
  * data starts <  SDG_START_YEAR -> 'MDGs'
    (the indicator already existed before the SDGs, e.g. undernourishment)
- Indicators whose name does NOT appear in SDG_INDICATOR_KEYWORDS -> 'MDGs'
- The framework is assigned AFTER filtering is complete (the data is clean and
  the year range is fixed), so the per-indicator start_year used is the ACTUAL
  start year in this dataset.
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
# Switch stdout to UTF-8 when the stream supports it. Several log messages in
# this module contain non-ASCII characters (e.g. the em dash), presumably the
# reason for forcing the encoding. The hasattr guard skips the call when
# sys.stdout has been replaced by an object without reconfigure().
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# SDG INDICATOR KEYWORDS
# =============================================================================
# Lower-cased names of indicators that belong to SDG Goal 2 (targets 2.1/2.2).
# assign_framework_dynamic() tests each entry with `kw in indicator_name.lower()`,
# i.e. a substring match, so names with extra surrounding text still match.
#
# Framework rule:
#   - name in this set + start_year >= SDG_START_YEAR -> 'SDGs'
#   - name in this set + start_year <  SDG_START_YEAR -> 'MDGs'
#     (the indicator predates the SDGs, e.g. prevalence of undernourishment)
#   - name not in this set                            -> 'MDGs'
#
# The entries are grouped per SDG target for readability only; the groups are
# flattened into a single frozenset. The "modelled"/"modeled" spelling
# differences are kept verbatim (presumably mirroring the source names).
SDG_INDICATOR_KEYWORDS = frozenset(
    keyword
    for target_group in (
        # Target 2.1.1 - prevalence of undernourishment (shared; predates SDGs)
        (
            "prevalence of undernourishment (percent) (3-year average)",
            "number of people undernourished (million) (3-year average)",
        ),
        # Target 2.1.2 - FIES food insecurity (SDGs only)
        (
            "prevalence of severe food insecurity in the total population (percent) (3-year average)",
            "prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
            "prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
            "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
            "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
            "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
            "number of severely food insecure people (million) (3-year average)",
            "number of severely food insecure male adults (million) (3-year average)",
            "number of severely food insecure female adults (million) (3-year average)",
            "number of moderately or severely food insecure people (million) (3-year average)",
            "number of moderately or severely food insecure male adults (million) (3-year average)",
            "number of moderately or severely food insecure female adults (million) (3-year average)",
        ),
        # Target 2.2.1 - stunting (shared)
        (
            "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
            "number of children under 5 years of age who are stunted (modeled estimates) (million)",
        ),
        # Target 2.2.2 - wasting & overweight (shared)
        (
            "percentage of children under 5 years affected by wasting (percent)",
            "number of children under 5 years affected by wasting (million)",
            "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
            "number of children under 5 years of age who are overweight (modeled estimates) (million)",
        ),
        # Target 2.2.3 - anaemia (SDGs only)
        (
            "prevalence of anemia among women of reproductive age (15-49 years) (percent)",
            "number of women of reproductive age (15-49 years) affected by anemia (million)",
        ),
    )
    for keyword in target_group
)

# Year the SDGs officially took effect (2030 Agenda adopted September 2015,
# data reporting from 2016). Used as the fallback when auto-detection of the
# SDG start year finds no usable proxy indicators.
SDG_START_YEAR_DEFAULT = 2016
def assign_framework_dynamic(
    indicator_name: str,
    indicator_start_year: int,
    sdg_start_year: int,
) -> str:
    """
    Classify one indicator as 'SDGs' or 'MDGs'.

    An indicator is 'SDGs' only when BOTH hold:
      1. some entry of SDG_INDICATOR_KEYWORDS is a substring of its name
         (after str(), lower-casing and stripping), and
      2. its first year of data is on or after ``sdg_start_year``.
    Every other indicator is 'MDGs'.

    Args:
        indicator_name: Indicator name; converted with str() and lower-cased
            before matching.
        indicator_start_year: First year with data for this indicator in the
            dataset. Only compared when the name matches the SDG list.
        sdg_start_year: Year the SDG framework starts (auto-detected or default).

    Returns:
        'SDGs' or 'MDGs'.
    """
    normalized_name = str(indicator_name).lower().strip()

    listed_as_sdg = False
    for keyword in SDG_INDICATOR_KEYWORDS:
        if keyword in normalized_name:
            listed_as_sdg = True
            break

    # Names outside the SDG list are always MDGs; the start year is not consulted.
    if not listed_as_sdg:
        return 'MDGs'
    return 'SDGs' if indicator_start_year >= sdg_start_year else 'MDGs'
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
class AnalyticalLayerLoader:
"""
Analytical Layer Loader for BigQuery
Key Logic:
1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years
4. Determine SDGs start year & assign framework per indicator dynamically
5. Calculate YoY (year-over-year) change per indicator per country
6. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
Kolom output:
country_id, country_name,
indicator_id, indicator_name, direction, framework,
pillar_id, pillar_name,
time_id, year, value,
yoy_change, yoy_pct
"""
def __init__(self, client: bigquery.Client):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.df_clean = None
self.df_indicator = None
self.df_country = None
self.df_pillar = None
self.selected_country_ids = None
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023
# SDGs-related — di-set oleh determine_sdg_start_year()
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
self.pipeline_start = None
self.pipeline_end = None
# ------------------------------------------------------------------
# STEP 1: LOAD SOURCE DATA
# ------------------------------------------------------------------
def load_source_data(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
self.logger.info("=" * 80)
try:
# Tidak include framework dari dim_indicator —
# framework akan ditentukan dinamis di Step 6 (determine_sdg_start_year)
query = f"""
SELECT
f.country_id,
c.country_name,
f.indicator_id,
i.indicator_name,
i.direction,
f.pillar_id,
p.pillar_name,
f.time_id,
t.year,
t.start_year,
t.end_year,
t.is_year_range,
f.value,
f.source_id
FROM `{get_table_id('fact_food_security', layer='gold')}` f
JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id
JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id
JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id
JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
"""
self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe(
create_bqstorage_client=False
)
self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(f" Breakdown:")
self.logger.info(
f" Single years (is_year_range=False): {yr.get(False, 0):,}"
)
self.logger.info(
f" Year ranges (is_year_range=True): {yr.get(True, 0):,}"
)
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
self.logger.info(f" Indicators: {len(self.df_indicator)}")
self.logger.info(f" Countries: {len(self.df_country)}")
self.logger.info(f" Pillars: {len(self.df_pillar)}")
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
return True
except Exception as e:
self.logger.error(f"Error loading source data: {e}")
raise
# ------------------------------------------------------------------
# STEP 2: DETERMINE YEAR BOUNDARIES
# ------------------------------------------------------------------
def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80)
df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_2023['indicator_id'].nunique()
self.logger.info(f"\nBaseline Year: {self.baseline_year}")
self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")
years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
selected_end_year = None
for year in years_sorted:
if year >= self.baseline_year:
df_year = self.df_clean[self.df_clean['year'] == year]
year_indicator_count = df_year['indicator_id'].nunique()
status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
selected_end_year = int(year)
if selected_end_year is None:
selected_end_year = self.baseline_year
self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
else:
self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")
self.end_year = selected_end_year
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
(self.df_clean['year'] >= self.start_year) &
(self.df_clean['year'] <= self.end_year)
].copy()
self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
# ------------------------------------------------------------------
def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
self.logger.info("=" * 80)
grouped = self.df_clean.groupby([
'country_id', 'country_name', 'indicator_id', 'indicator_name',
'pillar_id', 'pillar_name'
])
valid_combinations = []
removed_combinations = []
for (country_id, country_name, indicator_id, indicator_name,
pillar_id, pillar_name), group in grouped:
years_present = sorted(group['year'].unique())
start_year = int(min(years_present))
end_year_actual = int(max(years_present))
expected_years = list(range(start_year, self.end_year + 1))
missing_years = [y for y in expected_years if y not in years_present]
has_gap = len(missing_years) > 0
is_complete = (
end_year_actual >= self.end_year and
not has_gap and
(self.end_year - start_year) >= 4
)
if is_complete:
valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id})
else:
reasons = []
if end_year_actual < self.end_year:
reasons.append(f"ends {end_year_actual}")
if has_gap:
gap_str = str(missing_years[:3])[1:-1]
if len(missing_years) > 3:
gap_str += "..."
reasons.append(f"gap:{gap_str}")
if (self.end_year - start_year) < 4:
reasons.append(f"span={self.end_year - start_year}")
removed_combinations.append({
'country_name' : country_name,
'indicator_name': indicator_name,
'reasons' : ", ".join(reasons)
})
self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = (
df_valid['country_id'].astype(str) + '_' +
df_valid['indicator_id'].astype(str)
)
self.df_clean['key'] = (
self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str)
)
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
self.df_clean = self.df_clean.drop('key', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
# ------------------------------------------------------------------
def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
self.logger.info("=" * 80)
total_pillars = self.df_clean['pillar_id'].nunique()
country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
'pillar_id' : 'nunique',
'indicator_id': 'nunique',
'year' : lambda x: f"{int(x.min())}-{int(x.max())}"
}).reset_index()
country_pillar_count.columns = [
'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
]
for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
self.logger.info(
f" {status:<12} {row['country_name']:25s} "
f"{row['pillar_count']}/{total_pillars} pillars"
)
selected_countries = country_pillar_count[
country_pillar_count['pillar_count'] == total_pillars
]
self.selected_country_ids = selected_countries['country_id'].tolist()
self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['country_id'].isin(self.selected_country_ids)
].copy()
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
# ------------------------------------------------------------------
def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80)
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
indicator_country_start.columns = [
'indicator_id', 'indicator_name', 'country_id', 'start_year'
]
indicator_max_start = indicator_country_start.groupby([
'indicator_id', 'indicator_name'
])['start_year'].max().reset_index()
indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']
valid_indicators = []
removed_indicators = []
for _, ind_row in indicator_max_start.iterrows():
indicator_id = ind_row['indicator_id']
indicator_name = ind_row['indicator_name']
max_start = int(ind_row['max_start_year'])
span = self.end_year - max_start
if span < 4:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"span={span} < 4"
})
continue
expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True
problematic_years = []
for year in expected_years:
country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
if country_count < len(self.selected_country_ids):
all_years_complete = False
problematic_years.append(f"{int(year)}({country_count})")
if all_years_complete:
valid_indicators.append(indicator_id)
else:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : (
f"missing countries in years: {', '.join(problematic_years[:5])}"
)
})
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
self.df_clean = self.df_clean.merge(
indicator_max_start[['indicator_id', 'max_start_year']],
on='indicator_id', how='left'
)
self.df_clean = self.df_clean[
self.df_clean['year'] >= self.df_clean['max_start_year']
].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
return self.df_clean
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
"""
Tentukan tahun mulai SDGs secara otomatis dari data aktual, lalu
assign kolom 'framework' (MDGs/SDGs) ke setiap baris di df_clean.
Logika penentuan SDG_START_YEAR:
- Cari indikator yang namanya ada di SDG_INDICATOR_KEYWORDS (FIES, anaemia, dll.)
dan yang diyakini HANYA ada di SDGs (bukan shared dengan MDGs).
Proxy: indikator dengan keyword 'food insecurity' atau 'anemia'.
- Ambil tahun pertama (min year) dari indikator-indikator tersebut di dataset ini.
- Jika ditemukan -> sdg_start_year = tahun pertama itu.
- Jika tidak ditemukan -> sdg_start_year = SDG_START_YEAR_DEFAULT (2016).
Logika assign framework per indikator (assign_framework_dynamic):
- Nama ada di SDG_INDICATOR_KEYWORDS + start_year >= sdg_start_year -> 'SDGs'
- Nama ada di SDG_INDICATOR_KEYWORDS + start_year < sdg_start_year -> 'MDGs'
(indikator seperti undernourishment sudah ada sebelum SDGs)
- Nama TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs'
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK")
self.logger.info("=" * 80)
# --- 6a. Auto-detect SDG start year dari data aktual ---
# Proxy SDGs-only: indikator yang pasti baru di SDGs (FIES & anaemia)
sdg_proxy_keywords = [
'food insecurity',
'anemia',
'anaemia',
]
sdg_proxy_mask = self.df_clean['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in sdg_proxy_keywords)
)
df_sdg_proxy = self.df_clean[sdg_proxy_mask]
if len(df_sdg_proxy) > 0:
detected_start = int(df_sdg_proxy['year'].min())
self.sdg_start_year = detected_start
self.logger.info(
f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}"
)
self.logger.info(f" Proxy indicators used (sample):")
proxy_sample = (
df_sdg_proxy['indicator_name']
.drop_duplicates()
.head(5)
.tolist()
)
for ind in proxy_sample:
self.logger.info(f" - {ind}")
else:
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.logger.warning(
f"\n [WARN] SDG proxy indicators not found in dataset. "
f"Using default: {self.sdg_start_year}"
)
self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}")
# --- 6b. Hitung start_year aktual per indikator di dataset ini ---
indicator_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name'])['year']
.min()
.reset_index()
)
indicator_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# --- 6c. Assign framework per indikator ---
indicator_start['framework'] = indicator_start.apply(
lambda row: assign_framework_dynamic(
indicator_name = row['indicator_name'],
indicator_start_year = int(row['actual_start_year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
# --- 6d. Log hasil assignment ---
self.logger.info(f"\n Framework assignment per indicator:")
self.logger.info(f" {'-'*85}")
self.logger.info(
f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*85}")
for _, row in indicator_start.sort_values(
['framework', 'actual_start_year', 'indicator_name']
).iterrows():
is_in_sdg_list = any(
kw in str(row['indicator_name']).lower()
for kw in SDG_INDICATOR_KEYWORDS
)
note = " [in SDG list]" if is_in_sdg_list else ""
self.logger.info(
f" {int(row['indicator_id']):<5} {row['framework']:<10} "
f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}{note}"
)
fw_summary = indicator_start['framework'].value_counts()
self.logger.info(f"\n Framework summary:")
for fw, cnt in fw_summary.items():
self.logger.info(f" {fw}: {cnt} indicators")
# --- 6e. Merge framework ke df_clean ---
self.df_clean = self.df_clean.merge(
indicator_start[['indicator_id', 'framework']],
on='indicator_id', how='left'
)
self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean")
self.logger.info(
f" Row distribution — MDGs: "
f"{(self.df_clean['framework'] == 'MDGs').sum():,} | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}"
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6b: VERIFY NO GAPS
# ------------------------------------------------------------------
def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6c: VERIFY NO GAPS")
self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids)
verification = self.df_clean.groupby(
['indicator_id', 'year']
)['country_id'].nunique().reset_index()
verification.columns = ['indicator_id', 'year', 'country_count']
all_good = (verification['country_count'] == expected_countries).all()
if all_good:
self.logger.info(
f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
)
else:
bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows():
self.logger.error(
f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
f"{int(row['country_count'])} countries (expected {expected_countries})"
)
raise ValueError("Gap verification failed!")
return True
# ------------------------------------------------------------------
# STEP 7: CALCULATE YOY
# ------------------------------------------------------------------
def calculate_yoy(self):
"""
Hitung Year-over-Year (YoY) per indikator per negara.
Kolom yang ditambahkan:
yoy_change : selisih absolut -> value - value_tahun_sebelumnya
yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100
Baris tahun pertama per kombinasi country-indicator bernilai NULL (intentional).
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
df['yoy_change'] = df['value'] - df['value_prev']
df['yoy_pct'] = np.where(
df['value_prev'].notna() & (df['value_prev'] != 0),
(df['yoy_change'] / df['value_prev'].abs()) * 100,
np.nan
)
df = df.drop(columns=['value_prev'])
total_rows = len(df)
valid_yoy = df['yoy_pct'].notna().sum()
null_yoy = df['yoy_pct'].isna().sum()
self.logger.info(f" Total rows : {total_rows:,}")
self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator")
per_ind = (
df[df['yoy_pct'].notna()]
.groupby(['indicator_id', 'indicator_name'])['yoy_pct']
.agg(['mean', 'std', 'min', 'max'])
.reset_index()
)
per_ind.columns = ['indicator_id', 'indicator_name', 'mean', 'std', 'min', 'max']
self.logger.info(f"\n YoY summary per indicator (top 10 by abs mean change):")
self.logger.info(f" {'-'*100}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Mean%':>8} {'Std%':>8} {'Min%':>8} {'Max%':>8}"
)
self.logger.info(f" {'-'*100}")
top_ind = per_ind.reindex(
per_ind['mean'].abs().sort_values(ascending=False).index
).head(10)
for _, row in top_ind.iterrows():
self.logger.info(
f" {int(row['indicator_id']):<5} {row['indicator_name'][:50]:<52} "
f"{row['mean']:>+8.2f} {row['std']:>8.2f} "
f"{row['min']:>+8.2f} {row['max']:>+8.2f}"
)
per_country = (
df[df['yoy_pct'].notna()]
.groupby(['country_id', 'country_name'])['yoy_pct']
.agg(['mean', 'std'])
.reset_index()
)
per_country.columns = ['country_id', 'country_name', 'mean_yoy', 'std_yoy']
self.logger.info(f"\n YoY summary per country:")
self.logger.info(f" {'-'*60}")
self.logger.info(f" {'Country':<30} {'Mean YoY%':>10} {'Std YoY%':>10}")
self.logger.info(f" {'-'*60}")
for _, row in per_country.sort_values('mean_yoy', ascending=False).iterrows():
self.logger.info(
f" {row['country_name']:<30} {row['mean_yoy']:>+10.2f} {row['std_yoy']:>10.2f}"
)
self.df_clean = df
self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct")
return self.df_clean
# ------------------------------------------------------------------
# STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({
'indicator_id': 'nunique',
'country_id' : 'nunique'
}).reset_index()
year_stats.columns = ['year', 'indicator_count', 'country_count']
self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
self.logger.info("-" * 50)
for _, row in year_stats.iterrows():
year = int(row['year'])
row_count = len(self.df_clean[self.df_clean['year'] == year])
self.logger.info(
f"{year:<8} {int(row['indicator_count']):<15} "
f"{int(row['country_count']):<12} {row_count:,}"
)
indicator_details = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework'
]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
indicator_details.columns = [
'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework',
'start_year', 'end_year', 'country_count'
]
indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str)
)
indicator_details = indicator_details.sort_values(
['framework', 'pillar_name', 'start_year', 'indicator_name']
)
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
for pillar, count in indicator_details.groupby('pillar_name').size().items():
self.logger.info(f" {pillar}: {count} indicators")
self.logger.info(f"\nFramework breakdown:")
for fw, count in indicator_details.groupby('framework').size().items():
self.logger.info(f" {fw}: {count} indicators")
self.logger.info(f"\n{'-'*110}")
self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}"
)
self.logger.info(f"{'-'*110}")
for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework']:<10} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
)
return year_stats
# ------------------------------------------------------------------
# STEP 9: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------
def save_analytical_table(self):
"""
Simpan fact_asean_food_security_selected ke Gold layer.
Kolom yang disimpan:
country_id, country_name — dimensi negara
indicator_id, indicator_name — dimensi indikator
direction — arah penilaian (higher/lower_better)
framework — MDGs/SDGs (ditentukan di Step 6)
pillar_id, pillar_name — dimensi pilar
time_id, year — dimensi waktu
value — nilai indikator
yoy_change — perubahan absolut YoY (NULL di tahun pertama)
yoy_pct — perubahan relatif YoY dalam % (NULL di tahun pertama)
"""
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80)
try:
# Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil
if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns:
self.logger.warning(
" [WARN] Kolom YoY tidak ditemukan. Menjalankan calculate_yoy() sebagai fallback..."
)
self.calculate_yoy()
analytical_df = self.df_clean[[
'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'framework',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
'yoy_change',
'yoy_pct',
]].copy()
analytical_df = analytical_df.sort_values(
['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True)
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}")
fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts()
self.logger.info(f" Framework distribution (per indikator unik):")
for fw, cnt in fw_dist.items():
self.logger.info(f" {fw}: {cnt} indicators")
yoy_valid = analytical_df['yoy_pct'].notna().sum()
yoy_null = analytical_df['yoy_pct'].isna().sum()
self.logger.info(f" YoY rows (calculated): {yoy_valid:,}")
self.logger.info(f" YoY rows (NULL/base) : {yoy_null:,}")
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, analytical_df, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self.pipeline_metadata['rows_loaded'] = rows_loaded
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
'execution_timestamp': self.pipeline_start,
'duration_seconds' : (datetime.now() - self.pipeline_start).total_seconds(),
'rows_fetched' : self.pipeline_metadata['rows_fetched'],
'rows_transformed' : rows_loaded,
'rows_loaded' : rows_loaded,
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({
'start_year' : self.start_year,
'end_year' : self.end_year,
'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids),
'no_gaps' : True,
'layer' : 'gold',
'framework_logic' : (
f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, "
"else MDGs"
),
}),
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique()),
'sdg_start_year' : self.sdg_start_year,
'framework_dist' : fw_dist.to_dict(),
'yoy_rows_valid' : int(yoy_valid),
'yoy_rows_null' : int(yoy_null),
})
}
save_etl_metadata(self.client, metadata)
self.logger.info(
f" {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold"
)
self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
return rows_loaded
except Exception as e:
self.logger.error(f"Error saving: {e}")
raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
    """Execute the whole analytical-layer pipeline from start to finish.

    Records the start timestamp (on ``self.pipeline_start`` and in
    ``self.pipeline_metadata['start_time']``), logs an output/column banner,
    then invokes each pipeline step strictly in sequence: load source data,
    determine year boundaries, per-country completeness filter, fixed
    country set, cross-country indicator consistency, SDG start year and
    framework assignment, gap verification, YoY calculation, availability
    analysis, and finally the save to the gold table. After the last step it
    stores ``self.pipeline_end`` and logs a summary (duration, year range,
    SDG start year, country/indicator counts, rows loaded).

    Any exception raised by a step propagates unchanged; the summary is only
    logged when every step succeeded.
    """
    began_at = datetime.now()
    self.pipeline_start = began_at
    self.pipeline_metadata['start_time'] = began_at

    divider = "=" * 80
    opening_banner = (
        "\n" + divider,
        "Output: fact_asean_food_security_selected -> fs_asean_gold",
        "Kolom: country_id/name, indicator_id/name, direction, framework,",
        " pillar_id/name, time_id, year, value, yoy_change, yoy_pct",
        "Framework: ditentukan dinamis berdasarkan SDG_START_YEAR (auto-detect)",
        divider,
    )
    for banner_line in opening_banner:
        self.logger.info(banner_line)

    # Order matters: each step consumes the state left behind by the previous one.
    ordered_steps = (
        self.load_source_data,
        self.determine_year_boundaries,
        self.filter_complete_indicators_per_country,
        self.select_countries_with_all_pillars,
        self.filter_indicators_consistent_across_fixed_countries,
        self.determine_sdg_start_year,        # Step 6: auto-detect SDG year & assign framework
        self.verify_no_gaps,                  # Step 6c: verify there are no year gaps
        self.calculate_yoy,                   # Step 7: compute YoY
        self.analyze_indicator_availability_by_year,
        self.save_analytical_table,
    )
    for step in ordered_steps:
        step()

    self.pipeline_end = datetime.now()
    elapsed_seconds = (self.pipeline_end - self.pipeline_start).total_seconds()

    # Summary lines are emitted one at a time so each value is evaluated just
    # before it is logged.
    info = self.logger.info
    info("\n" + divider)
    info("COMPLETED")
    info(divider)
    info(f" Duration : {elapsed_seconds:.2f}s")
    info(f" Year Range : {self.start_year}-{self.end_year}")
    info(f" SDG Start Yr : {self.sdg_start_year}")
    info(f" Countries : {len(self.selected_country_ids)}")
    info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
    info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================
def run_analytical_layer():
    """Airflow task entry point for the analytical (gold) layer.

    Builds fact_asean_food_security_selected from fact_food_security plus its
    dimension tables by running ``AnalyticalLayerLoader`` end to end, then
    prints the number of rows loaded. Intended to be scheduled after the
    dimensional_model_to_gold task has finished.
    """
    # Imported at call time, as in the original task wiring.
    from scripts.bigquery_config import get_bigquery_client
    loader = AnalyticalLayerLoader(get_bigquery_client())
    loader.run()
    loaded_rows = loader.pipeline_metadata['rows_loaded']
    print(f"Analytical layer loaded: {loaded_rows:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
    # Standalone run: configure logging, build the loader, execute the
    # pipeline, and print a short completion report.
    separator = "=" * 80
    print(separator)
    for header_line in (
        "BIGQUERY ANALYTICAL LAYER - DATA FILTERING",
        "Output: fact_asean_food_security_selected -> fs_asean_gold",
        "Framework: MDGs/SDGs ditentukan dinamis dari data (auto-detect SDG start year)",
    ):
        print(header_line)
    print(separator)

    logger = setup_logging()
    client = get_bigquery_client()
    loader = AnalyticalLayerLoader(client)
    loader.run()

    print("\n" + separator)
    print("[OK] COMPLETED")
    print(f" SDG Start Year : {loader.sdg_start_year}")
    print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
    print(separator)