Files
airflow-coolify/scripts/bigquery_analytical_layer.py
2026-05-19 10:09:48 +07:00

789 lines
38 KiB
Python

"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order:
1. Load data (single years only)
2. Determine year boundaries (2013 - auto-detected end year)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Save analytical table (dengan nama/label lengkap untuk Looker Studio)
ADDED: Kolom indicator_name_id dan pillar_name_id (terjemahan Bahasa Indonesia)
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# TRANSLATION DICTIONARIES
# =============================================================================
PILLAR_TRANSLATION_ID: dict = {
# 4 pilar utama Food Security
"Availability" : "Ketersediaan",
"Access" : "Keterjangkauan",
"Utilization" : "Pemanfaatan",
"Stability" : "Stabilitas",
# Variasi penulisan yang mungkin muncul
"availability" : "Ketersediaan",
"access" : "Keterjangkauan",
"utilization" : "Pemanfaatan",
"stability" : "Stabilitas",
"Food Availability" : "Ketersediaan Pangan",
"Food Access" : "Keterjangkauan Pangan",
"Food Utilization" : "Pemanfaatan Pangan",
"Food Stability" : "Stabilitas Pangan",
}
INDICATOR_TRANSLATION_ID: dict = {
# -------------------------------------------------------------------------
# AVAILABILITY
# -------------------------------------------------------------------------
"Average dietary energy supply adequacy (percent) (3-year average)":
"Kecukupan rata-rata pasokan energi makanan (persen) (rata-rata 3 tahun)",
"Average value of food production (constant 2014-2016 thousand US$) (3-year average)":
"Nilai rata-rata produksi pangan (ribu US$ konstan 2014-2016) (rata-rata 3 tahun)",
"Share of dietary energy supply derived from cereals, roots and tubers (percent) (3-year average)":
"Proporsi pasokan energi makanan dari serealia, akar, dan umbi-umbian (persen) (rata-rata 3 tahun)",
"Average protein supply (g/cap/day) (3-year average)":
"Rata-rata pasokan protein (g/kapita/hari) (rata-rata 3 tahun)",
"Average supply of protein of animal origin (g/cap/day) (3-year average)":
"Rata-rata pasokan protein hewani (g/kapita/hari) (rata-rata 3 tahun)",
"Cereal import dependency ratio (percent) (3-year average)":
"Rasio ketergantungan impor sereal (persen) (rata-rata 3 tahun)",
"Percent of arable land equipped for irrigation (percent) (3-year average)":
"Persentase lahan pertanian yang dilengkapi irigasi (persen) (rata-rata 3 tahun)",
"Crop production index (2014-2016 = 100)":
"Indeks produksi tanaman pangan (2014-2016 = 100)",
"Livestock production index (2014-2016 = 100)":
"Indeks produksi peternakan (2014-2016 = 100)",
"Value of food imports over total merchandise exports (percent) (3-year average)":
"Nilai impor pangan terhadap total ekspor barang (persen) (rata-rata 3 tahun)",
"Food production variability (constant 2014-2016 thousand US$ per capita)":
"Variabilitas produksi pangan (ribu US$ konstan 2014-2016 per kapita)",
"Food supply variability (kcal/cap/day)":
"Variabilitas pasokan pangan (kkal/kapita/hari)",
# -------------------------------------------------------------------------
# ACCESS
# -------------------------------------------------------------------------
"Gross domestic product per capita, PPP (constant 2017 international $)":
"Produk domestik bruto per kapita, PPP (internasional konstan 2017 US$)",
"Domestic food price level index (2015 = 1.00)":
"Indeks tingkat harga pangan domestik (2015 = 1,00)",
"Domestic food price volatility index":
"Indeks volatilitas harga pangan domestik",
"Prevalence of undernourishment (percent) (3-year average)":
"Prevalensi kekurangan gizi (persen) (rata-rata 3 tahun)",
"Number of people undernourished (million) (3-year average)":
"Jumlah penduduk kekurangan gizi (juta jiwa) (rata-rata 3 tahun)",
"Depth of the food deficit (kcal/capita/day) (3-year average)":
"Kedalaman defisit pangan (kkal/kapita/hari) (rata-rata 3 tahun)",
"Percentage of population using at least basic drinking water services (percent)":
"Persentase penduduk yang menggunakan layanan air minum dasar (persen)",
"Percentage of population using safely managed drinking water services (percent)":
"Persentase penduduk yang menggunakan layanan air minum yang dikelola dengan aman (persen)",
"Percentage of population using at least basic sanitation services (percent)":
"Persentase penduduk yang menggunakan layanan sanitasi dasar (persen)",
"Percentage of population using safely managed sanitation services (percent)":
"Persentase penduduk yang menggunakan layanan sanitasi yang dikelola dengan aman (persen)",
"Access to electricity (percent of rural population)":
"Akses listrik (persen penduduk pedesaan)",
"Proportion of population with access to electricity (percent)":
"Proporsi penduduk dengan akses listrik (persen)",
"Road infrastructure index":
"Indeks infrastruktur jalan",
"Rail lines density (total route-km per 100 square km of land area)":
"Kepadatan jalur kereta api (total rute-km per 100 km2 lahan)",
"Gross national income per capita (Atlas method, current US$)":
"Pendapatan nasional bruto per kapita (metode Atlas, US$ terkini)",
"Food Insecurity Experience Scale (FIES)":
"Skala Pengalaman Ketidakamanan Pangan (FIES)",
# -------------------------------------------------------------------------
# UTILIZATION
# -------------------------------------------------------------------------
"Prevalence of severe food insecurity in the total population (percent) (3-year average)":
"Prevalensi kerawanan pangan berat pada total penduduk (persen) (rata-rata 3 tahun)",
"Prevalence of severe food insecurity in the male adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)",
"Prevalence of severe food insecurity in the female adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)",
"Prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)":
"Prevalensi kerawanan pangan sedang atau berat pada total penduduk (persen) (rata-rata 3 tahun)",
"Prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan sedang atau berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)",
"Prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan sedang atau berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)",
"Number of severely food insecure people (million) (3-year average)":
"Jumlah penduduk yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
"Number of severely food insecure male adults (million) (3-year average)":
"Jumlah laki-laki dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
"Number of severely food insecure female adults (million) (3-year average)":
"Jumlah perempuan dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
"Number of moderately or severely food insecure people (million) (3-year average)":
"Jumlah penduduk yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
"Number of moderately or severely food insecure male adults (million) (3-year average)":
"Jumlah laki-laki dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
"Number of moderately or severely food insecure female adults (million) (3-year average)":
"Jumlah perempuan dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
"Percentage of children under 5 years of age who are stunted (modelled estimates) (percent)":
"Persentase anak di bawah 5 tahun yang mengalami stunting (estimasi model) (persen)",
"Number of children under 5 years of age who are stunted (modeled estimates) (million)":
"Jumlah anak di bawah 5 tahun yang mengalami stunting (estimasi model) (juta jiwa)",
"Percentage of children under 5 years affected by wasting (percent)":
"Persentase anak di bawah 5 tahun yang mengalami wasting (persen)",
"Number of children under 5 years affected by wasting (million)":
"Jumlah anak di bawah 5 tahun yang mengalami wasting (juta jiwa)",
"Percentage of children under 5 years of age who are overweight (modelled estimates) (percent)":
"Persentase anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (persen)",
"Number of children under 5 years of age who are overweight (modeled estimates) (million)":
"Jumlah anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (juta jiwa)",
"Prevalence of anemia among women of reproductive age (15-49 years) (percent)":
"Prevalensi anemia pada perempuan usia reproduksi (15-49 tahun) (persen)",
"Number of women of reproductive age (15-49 years) affected by anemia (million)":
"Jumlah perempuan usia reproduksi (15-49 tahun) yang menderita anemia (juta jiwa)",
"Prevalence of obesity in the adult population (18 years and older) (percent)":
"Prevalensi obesitas pada penduduk dewasa (18 tahun ke atas) (persen)",
"Prevalence of exclusive breastfeeding among infants 0-5 months of age (percent)":
"Prevalensi pemberian ASI eksklusif pada bayi usia 0-5 bulan (persen)",
"Minimum dietary diversity for women (MDD-W) (percent)":
"Keragaman pola makan minimum untuk perempuan (MDD-W) (persen)",
# -------------------------------------------------------------------------
# STABILITY
# -------------------------------------------------------------------------
"Cereal import dependency ratio (percent)":
"Rasio ketergantungan impor sereal (persen)",
"Political stability and absence of violence/terrorism (index)":
"Stabilitas politik dan tidak adanya kekerasan/terorisme (indeks)",
"Domestic food price volatility":
"Volatilitas harga pangan domestik",
"Per capita food supply variability (kcal/cap/day)":
"Variabilitas pasokan pangan per kapita (kkal/kapita/hari)",
"Percentage of arable land equipped for irrigation (percent)":
"Persentase lahan pertanian yang dilengkapi irigasi (persen)",
"GDP per capita growth (annual %)":
"Pertumbuhan PDB per kapita (% tahunan)",
"GDP growth (annual %)":
"Pertumbuhan PDB (% tahunan)",
}
def translate_indicator(name: str) -> str:
"""Terjemahkan nama indikator ke Bahasa Indonesia. Fallback ke nama asli."""
if not name:
return name
return INDICATOR_TRANSLATION_ID.get(name, name)
def translate_pillar(name: str) -> str:
"""Terjemahkan nama pillar ke Bahasa Indonesia. Fallback ke nama asli."""
if not name:
return name
return PILLAR_TRANSLATION_ID.get(name, name)
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
class AnalyticalLayerLoader:
"""
Analytical Layer Loader for BigQuery
Key Logic:
1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years
4. Save dengan kolom lengkap (nama + ID + nama Indonesia) untuk Looker Studio
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
Kolom tambahan:
- indicator_name_id : terjemahan Bahasa Indonesia dari indicator_name
- pillar_name_id : terjemahan Bahasa Indonesia dari pillar_name
"""
def __init__(self, client: bigquery.Client):
self.client = client
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.df_clean = None
self.df_indicator = None
self.df_country = None
self.df_pillar = None
self.selected_country_ids = None
self.start_year = 2013
self.end_year = None
self.baseline_year = 2023
self.pipeline_metadata = {
'source_class' : self.__class__.__name__,
'start_time' : None,
'end_time' : None,
'duration_seconds' : None,
'rows_fetched' : 0,
'rows_transformed' : 0,
'rows_loaded' : 0,
'validation_metrics': {}
}
self.pipeline_start = None
self.pipeline_end = None
def load_source_data(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
self.logger.info("=" * 80)
try:
query = f"""
SELECT
f.country_id,
c.country_name,
f.indicator_id,
i.indicator_name,
i.direction,
f.pillar_id,
p.pillar_name,
f.time_id,
t.year,
t.start_year,
t.end_year,
t.is_year_range,
f.value,
f.source_id
FROM `{get_table_id('fact_food_security', layer='gold')}` f
JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id
JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id
JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id
JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
"""
self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(f" Breakdown:")
self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}")
self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}")
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
self.logger.info(f" Indicators: {len(self.df_indicator)}")
self.logger.info(f" Countries: {len(self.df_country)}")
self.logger.info(f" Pillars: {len(self.df_pillar)}")
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
return True
except Exception as e:
self.logger.error(f"Error loading source data: {e}")
raise
def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
self.logger.info("=" * 80)
df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year]
baseline_indicator_count = df_2023['indicator_id'].nunique()
self.logger.info(f"\nBaseline Year: {self.baseline_year}")
self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")
years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
selected_end_year = None
for year in years_sorted:
if year >= self.baseline_year:
df_year = self.df_clean[self.df_clean['year'] == year]
year_indicator_count = df_year['indicator_id'].nunique()
status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
selected_end_year = int(year)
if selected_end_year is None:
selected_end_year = self.baseline_year
self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
else:
self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")
self.end_year = selected_end_year
original_count = len(self.df_clean)
self.df_clean = self.df_clean[
(self.df_clean['year'] >= self.start_year) &
(self.df_clean['year'] <= self.end_year)
].copy()
self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
self.logger.info("=" * 80)
grouped = self.df_clean.groupby([
'country_id', 'country_name', 'indicator_id', 'indicator_name',
'pillar_id', 'pillar_name'
])
valid_combinations = []
removed_combinations = []
for (country_id, country_name, indicator_id, indicator_name,
pillar_id, pillar_name), group in grouped:
years_present = sorted(group['year'].unique())
start_year = int(min(years_present))
end_year_actual = int(max(years_present))
expected_years = list(range(start_year, self.end_year + 1))
missing_years = [y for y in expected_years if y not in years_present]
has_gap = len(missing_years) > 0
is_complete = (
end_year_actual >= self.end_year and
not has_gap and
(self.end_year - start_year) >= 4
)
if is_complete:
valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id})
else:
reasons = []
if end_year_actual < self.end_year:
reasons.append(f"ends {end_year_actual}")
if has_gap:
gap_str = str(missing_years[:3])[1:-1]
if len(missing_years) > 3:
gap_str += "..."
reasons.append(f"gap:{gap_str}")
if (self.end_year - start_year) < 4:
reasons.append(f"span={self.end_year - start_year}")
removed_combinations.append({
'country_name' : country_name,
'indicator_name': indicator_name,
'reasons' : ", ".join(reasons)
})
self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str)
self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str))
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
self.df_clean = self.df_clean.drop('key', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean
def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
self.logger.info("=" * 80)
total_pillars = self.df_clean['pillar_id'].nunique()
country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
'pillar_id' : 'nunique',
'indicator_id': 'nunique',
'year' : lambda x: f"{int(x.min())}-{int(x.max())}"
}).reset_index()
country_pillar_count.columns = [
'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
]
for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
self.logger.info(
f" {status:<12} {row['country_name']:25s} "
f"{row['pillar_count']}/{total_pillars} pillars"
)
selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars]
self.selected_country_ids = selected_countries['country_id'].tolist()
self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy()
self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean
def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
self.logger.info("=" * 80)
indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index()
indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year']
indicator_max_start = indicator_country_start.groupby([
'indicator_id', 'indicator_name'
])['start_year'].max().reset_index()
indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']
valid_indicators = []
removed_indicators = []
for _, ind_row in indicator_max_start.iterrows():
indicator_id = ind_row['indicator_id']
indicator_name = ind_row['indicator_name']
max_start = int(ind_row['max_start_year'])
span = self.end_year - max_start
if span < 4:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"span={span} < 4"
})
continue
expected_years = list(range(max_start, self.end_year + 1))
ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
all_years_complete = True
problematic_years = []
for year in expected_years:
country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
if country_count < len(self.selected_country_ids):
all_years_complete = False
problematic_years.append(f"{int(year)}({country_count})")
if all_years_complete:
valid_indicators.append(indicator_id)
else:
removed_indicators.append({
'indicator_name': indicator_name,
'reason' : f"missing countries in years: {', '.join(problematic_years[:5])}"
})
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
self.logger.info(f" [-] Removed: {len(removed_indicators)}")
if not valid_indicators:
raise ValueError("No valid indicators found after filtering!")
original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy()
self.df_clean = self.df_clean.merge(
indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left'
)
self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}")
self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
return self.df_clean
def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: VERIFY NO GAPS")
self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids)
verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index()
verification.columns = ['indicator_id', 'year', 'country_count']
all_good = (verification['country_count'] == expected_countries).all()
if all_good:
self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries")
else:
bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows():
self.logger.error(
f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
f"{int(row['country_count'])} countries (expected {expected_countries})"
)
raise ValueError("Gap verification failed!")
return True
def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({
'indicator_id': 'nunique',
'country_id' : 'nunique'
}).reset_index()
year_stats.columns = ['year', 'indicator_count', 'country_count']
self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
self.logger.info("-" * 50)
for _, row in year_stats.iterrows():
year = int(row['year'])
row_count = len(self.df_clean[self.df_clean['year'] == year])
self.logger.info(
f"{year:<8} {int(row['indicator_count']):<15} "
f"{int(row['country_count']):<12} {row_count:,}"
)
indicator_details = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'pillar_name', 'direction'
]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
indicator_details.columns = [
'indicator_id', 'indicator_name', 'pillar_name', 'direction',
'start_year', 'end_year', 'country_count'
]
indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str)
)
indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name'])
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
for pillar, count in indicator_details.groupby('pillar_name').size().items():
self.logger.info(f" {pillar}: {count} indicators")
self.logger.info(f"\n{'-'*100}")
self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}")
self.logger.info(f"{'-'*100}")
for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} "
f"{direction:<8} {int(row['country_count'])}"
)
return year_stats
def save_analytical_table(self):
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80)
try:
analytical_df = self.df_clean[[
'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
]].copy()
# ------------------------------------------------------------------
# TAMBAHAN: kolom terjemahan Bahasa Indonesia
# indicator_name_id : terjemahan Bahasa Indonesia dari indicator_name
# pillar_name_id : terjemahan Bahasa Indonesia dari pillar_name
# ------------------------------------------------------------------
analytical_df['indicator_name_id'] = analytical_df['indicator_name'].apply(translate_indicator)
analytical_df['pillar_name_id'] = analytical_df['pillar_name'].apply(translate_pillar)
# Log indikator yang belum punya terjemahan (fallback ke nama asli)
no_trans_ind = analytical_df[
analytical_df['indicator_name_id'] == analytical_df['indicator_name']
]['indicator_name'].unique()
if len(no_trans_ind) > 0:
self.logger.warning(
f" [TRANSLATION] {len(no_trans_ind)} indicator(s) tidak ada di kamus "
f"(menggunakan nama asli): {list(no_trans_ind)[:5]}"
)
no_trans_pil = analytical_df[
analytical_df['pillar_name_id'] == analytical_df['pillar_name']
]['pillar_name'].unique()
if len(no_trans_pil) > 0:
self.logger.warning(
f" [TRANSLATION] {len(no_trans_pil)} pillar(s) tidak ada di kamus "
f"(menggunakan nama asli): {list(no_trans_pil)}"
)
analytical_df = analytical_df.sort_values(
['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True)
# Pastikan tipe data konsisten
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['indicator_name_id'] = analytical_df['indicator_name_id'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['pillar_name_id'] = analytical_df['pillar_name_id'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}")
# Schema BigQuery
schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_name_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_name_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
]
rows_loaded = load_to_bigquery(
self.client, analytical_df, table_name,
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
self.pipeline_metadata['rows_loaded'] = rows_loaded
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
'execution_timestamp': self.pipeline_start,
'duration_seconds' : (datetime.now() - self.pipeline_start).total_seconds(),
'rows_fetched' : self.pipeline_metadata['rows_fetched'],
'rows_transformed' : rows_loaded,
'rows_loaded' : rows_loaded,
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({
'start_year' : self.start_year,
'end_year' : self.end_year,
'fixed_countries': len(self.selected_country_ids),
'no_gaps' : True,
'layer' : 'gold',
'columns' : 'id + name + name_id (Looker Studio ready)'
}),
'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique())
})
}
save_etl_metadata(self.client, metadata)
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold")
self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
return rows_loaded
except Exception as e:
self.logger.error(f"Error saving: {e}")
raise
def run(self):
self.pipeline_start = datetime.now()
self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("=" * 80)
self.load_source_data()
self.determine_year_boundaries()
self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries()
self.verify_no_gaps()
self.analyze_indicator_availability_by_year()
self.save_analytical_table()
self.pipeline_end = datetime.now()
duration = (self.pipeline_end - self.pipeline_start).total_seconds()
self.logger.info("\n" + "=" * 80)
self.logger.info("COMPLETED")
self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================
def run_analytical_layer():
"""
Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims.
Dipanggil setelah dimensional_model_to_gold selesai.
"""
from scripts.bigquery_config import get_bigquery_client
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
loader.run()
print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
print("=" * 80)
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print("=" * 80)
logger = setup_logging()
client = get_bigquery_client()
loader = AnalyticalLayerLoader(client)
loader.run()
print("\n" + "=" * 80)
print("[OK] COMPLETED")
print("=" * 80)