Compare commits

...

2 Commits

Author SHA1 Message Date
Debby
ddc9fb3b48 Merge branch 'main' of https://git.ifuntanhub.dev/izu/airflow-coolify 2026-03-31 13:57:12 +07:00
Debby
ddf15ca9a5 move sdgs to analytical_layer 2026-03-31 13:54:20 +07:00
3 changed files with 321 additions and 237 deletions

View File

@@ -1,6 +1,6 @@
""" """
BIGQUERY ANALYTICAL LAYER - DATA FILTERING BIGQUERY ANALYTICAL LAYER - DATA FILTERING
FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order: Filtering Order:
1. Load data (single years only) 1. Load data (single years only)
@@ -8,15 +8,20 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET) 4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries 5. Filter indicators with consistent presence across FIXED countries
6. Calculate YoY per indicator per country 6. Determine SDGs start year & assign framework (MDGs/SDGs) per indicator
7. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio) 7. Calculate YoY per indicator per country
8. Analyze indicator availability by year
9. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio)
UPDATED: FRAMEWORK LOGIC:
- Kolom 'framework' (MDGs/SDGs) dipropagasi dari dim_indicator ke tabel output. - SDG_START_YEAR = 2016 (default; auto-detect jika indikator SDGs pertama kali muncul lebih awal/lambat)
Hal ini memungkinkan Looker Studio melakukan filter/slice berdasarkan framework - Indikator yang namanya ada di SDG_INDICATOR_KEYWORDS:
tanpa perlu join ulang ke dim_indicator. * Jika data mulai >= SDG_START_YEAR -> 'SDGs'
- Kolom 'yoy_change' dan 'yoy_pct' ditambahkan untuk analisis Year-over-Year * Jika data mulai < SDG_START_YEAR -> 'MDGs'
per indikator per negara langsung di Looker Studio. (artinya indikator ini sudah ada sebelum SDGs, mis. undernourishment)
- Indikator yang namanya TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs'
- Penentuan framework dilakukan SETELAH filter selesai (data sudah bersih & range sudah fixed)
sehingga start_year per indikator yang digunakan adalah start_year AKTUAL di dataset ini.
""" """
import pandas as pd import pandas as pd
@@ -42,6 +47,81 @@ from scripts.bigquery_helpers import (
from google.cloud import bigquery from google.cloud import bigquery
# =============================================================================
# SDG INDICATOR KEYWORDS
# =============================================================================
# Daftar nama indikator (lowercase) yang termasuk dalam SDG Goal 2.
# Matching dilakukan dengan `kw in indicator_name.lower()` sehingga
# partial match tetap valid (menangani variasi format nama).
#
# Logika framework:
# - Nama ada di set ini + start_year >= SDG_START_YEAR -> 'SDGs'
# - Nama ada di set ini + start_year < SDG_START_YEAR -> 'MDGs'
# (indikator sudah eksis sebelum SDGs, mis. prevalence of undernourishment)
# - Nama TIDAK ada di set ini -> 'MDGs'
SDG_INDICATOR_KEYWORDS = frozenset([
# TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs)
"prevalence of undernourishment (percent) (3-year average)",
"number of people undernourished (million) (3-year average)",
# TARGET 2.1.2 — FIES (SDGs only)
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of severely food insecure male adults (million) (3-year average)",
"number of severely food insecure female adults (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.1 — Stunting (shared)
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 — Wasting & Overweight (shared)
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 — Anaemia (SDGs only)
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# Tahun resmi SDGs mulai berlaku (2030 Agenda adopted September 2015,
# data reporting mulai 2016). Dipakai sebagai default jika auto-detect gagal.
SDG_START_YEAR_DEFAULT = 2016
def assign_framework_dynamic(
indicator_name: str,
indicator_start_year: int,
sdg_start_year: int,
) -> str:
"""
Tentukan framework (MDGs/SDGs) berdasarkan:
1. Apakah nama indikator ada di SDG_INDICATOR_KEYWORDS?
2. Apakah data indikator ini mulai pada tahun >= sdg_start_year?
Args:
indicator_name : Nama indikator (akan di-lowercase untuk matching)
indicator_start_year : Tahun pertama data indikator ini tersedia di dataset
sdg_start_year : Tahun mulai SDGs (dari auto-detect atau default)
Returns:
'SDGs' jika indikator termasuk SDG list DAN mulai >= sdg_start_year
'MDGs' untuk semua kasus lainnya
"""
ind_lower = str(indicator_name).lower().strip()
is_sdg_name = any(kw in ind_lower for kw in SDG_INDICATOR_KEYWORDS)
if is_sdg_name and indicator_start_year >= sdg_start_year:
return 'SDGs'
return 'MDGs'
# ============================================================================= # =============================================================================
# ANALYTICAL LAYER CLASS # ANALYTICAL LAYER CLASS
# ============================================================================= # =============================================================================
@@ -54,8 +134,9 @@ class AnalyticalLayerLoader:
1. Complete per country (no gaps from start_year to end_year) 1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars 2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years 3. Ensure indicators have consistent country count across all years
4. Calculate YoY (year-over-year) change per indicator per country 4. Determine SDGs start year & assign framework per indicator dynamically
5. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio 5. Calculate YoY (year-over-year) change per indicator per country
6. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
@@ -83,6 +164,9 @@ class AnalyticalLayerLoader:
self.end_year = None self.end_year = None
self.baseline_year = 2023 self.baseline_year = 2023
# SDGs-related — di-set oleh determine_sdg_start_year()
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.pipeline_metadata = { self.pipeline_metadata = {
'source_class' : self.__class__.__name__, 'source_class' : self.__class__.__name__,
'start_time' : None, 'start_time' : None,
@@ -97,13 +181,18 @@ class AnalyticalLayerLoader:
self.pipeline_start = None self.pipeline_start = None
self.pipeline_end = None self.pipeline_end = None
# ------------------------------------------------------------------
# STEP 1: LOAD SOURCE DATA
# ------------------------------------------------------------------
def load_source_data(self): def load_source_data(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# Sertakan kolom framework dari dim_indicator dalam query # Tidak include framework dari dim_indicator
# framework akan ditentukan dinamis di Step 6 (determine_sdg_start_year)
query = f""" query = f"""
SELECT SELECT
f.country_id, f.country_id,
@@ -111,7 +200,6 @@ class AnalyticalLayerLoader:
f.indicator_id, f.indicator_id,
i.indicator_name, i.indicator_name,
i.direction, i.direction,
i.framework,
f.pillar_id, f.pillar_id,
p.pillar_name, p.pillar_name,
f.time_id, f.time_id,
@@ -128,7 +216,7 @@ class AnalyticalLayerLoader:
JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
""" """
self.logger.info("Loading fact table with dimensions (incl. framework)...") self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe( self.df_clean = self.client.query(query).result().to_dataframe(
create_bqstorage_client=False create_bqstorage_client=False
) )
@@ -144,19 +232,6 @@ class AnalyticalLayerLoader:
f" Year ranges (is_year_range=True): {yr.get(True, 0):,}" f" Year ranges (is_year_range=True): {yr.get(True, 0):,}"
) )
# Validasi kolom framework tersedia
if 'framework' not in self.df_clean.columns:
raise ValueError(
"Kolom 'framework' tidak ditemukan di dim_indicator. "
"Pastikan bigquery_cleaned_layer.py dan bigquery_dimensional_model.py "
"sudah dijalankan dengan versi terbaru."
)
fw_dist = self.df_clean.drop_duplicates('indicator_id')['framework'].value_counts()
self.logger.info(f" Framework distribution (per indikator unik):")
for fw, cnt in fw_dist.items():
self.logger.info(f" {fw}: {cnt} indicators")
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold') self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
@@ -172,6 +247,10 @@ class AnalyticalLayerLoader:
self.logger.error(f"Error loading source data: {e}") self.logger.error(f"Error loading source data: {e}")
raise raise
# ------------------------------------------------------------------
# STEP 2: DETERMINE YEAR BOUNDARIES
# ------------------------------------------------------------------
def determine_year_boundaries(self): def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
@@ -214,6 +293,10 @@ class AnalyticalLayerLoader:
self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
# ------------------------------------------------------------------
def filter_complete_indicators_per_country(self): def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
@@ -285,6 +368,10 @@ class AnalyticalLayerLoader:
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
# ------------------------------------------------------------------
def select_countries_with_all_pillars(self): def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
@@ -323,6 +410,10 @@ class AnalyticalLayerLoader:
self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
# ------------------------------------------------------------------
def filter_indicators_consistent_across_fixed_countries(self): def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
@@ -404,9 +495,138 @@ class AnalyticalLayerLoader:
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
"""
Tentukan tahun mulai SDGs secara otomatis dari data aktual, lalu
assign kolom 'framework' (MDGs/SDGs) ke setiap baris di df_clean.
Logika penentuan SDG_START_YEAR:
- Cari indikator yang namanya ada di SDG_INDICATOR_KEYWORDS (FIES, anaemia, dll.)
dan yang diyakini HANYA ada di SDGs (bukan shared dengan MDGs).
Proxy: indikator dengan keyword 'food insecurity' atau 'anemia'.
- Ambil tahun pertama (min year) dari indikator-indikator tersebut di dataset ini.
- Jika ditemukan -> sdg_start_year = tahun pertama itu.
- Jika tidak ditemukan -> sdg_start_year = SDG_START_YEAR_DEFAULT (2016).
Logika assign framework per indikator (assign_framework_dynamic):
- Nama ada di SDG_INDICATOR_KEYWORDS + start_year >= sdg_start_year -> 'SDGs'
- Nama ada di SDG_INDICATOR_KEYWORDS + start_year < sdg_start_year -> 'MDGs'
(indikator seperti undernourishment sudah ada sebelum SDGs)
- Nama TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs'
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK")
self.logger.info("=" * 80)
# --- 6a. Auto-detect SDG start year dari data aktual ---
# Proxy SDGs-only: indikator yang pasti baru di SDGs (FIES & anaemia)
sdg_proxy_keywords = [
'food insecurity',
'anemia',
'anaemia',
]
sdg_proxy_mask = self.df_clean['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in sdg_proxy_keywords)
)
df_sdg_proxy = self.df_clean[sdg_proxy_mask]
if len(df_sdg_proxy) > 0:
detected_start = int(df_sdg_proxy['year'].min())
self.sdg_start_year = detected_start
self.logger.info(
f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}"
)
self.logger.info(f" Proxy indicators used (sample):")
proxy_sample = (
df_sdg_proxy['indicator_name']
.drop_duplicates()
.head(5)
.tolist()
)
for ind in proxy_sample:
self.logger.info(f" - {ind}")
else:
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.logger.warning(
f"\n [WARN] SDG proxy indicators not found in dataset. "
f"Using default: {self.sdg_start_year}"
)
self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}")
# --- 6b. Hitung start_year aktual per indikator di dataset ini ---
indicator_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name'])['year']
.min()
.reset_index()
)
indicator_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# --- 6c. Assign framework per indikator ---
indicator_start['framework'] = indicator_start.apply(
lambda row: assign_framework_dynamic(
indicator_name = row['indicator_name'],
indicator_start_year = int(row['actual_start_year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
# --- 6d. Log hasil assignment ---
self.logger.info(f"\n Framework assignment per indicator:")
self.logger.info(f" {'-'*85}")
self.logger.info(
f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*85}")
for _, row in indicator_start.sort_values(
['framework', 'actual_start_year', 'indicator_name']
).iterrows():
is_in_sdg_list = any(
kw in str(row['indicator_name']).lower()
for kw in SDG_INDICATOR_KEYWORDS
)
note = " [in SDG list]" if is_in_sdg_list else ""
self.logger.info(
f" {int(row['indicator_id']):<5} {row['framework']:<10} "
f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}{note}"
)
fw_summary = indicator_start['framework'].value_counts()
self.logger.info(f"\n Framework summary:")
for fw, cnt in fw_summary.items():
self.logger.info(f" {fw}: {cnt} indicators")
# --- 6e. Merge framework ke df_clean ---
self.df_clean = self.df_clean.merge(
indicator_start[['indicator_id', 'framework']],
on='indicator_id', how='left'
)
self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean")
self.logger.info(
f" Row distribution — MDGs: "
f"{(self.df_clean['framework'] == 'MDGs').sum():,} | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}"
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6b: VERIFY NO GAPS
# ------------------------------------------------------------------
def verify_no_gaps(self): def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: VERIFY NO GAPS") self.logger.info("STEP 6c: VERIFY NO GAPS")
self.logger.info("=" * 80) self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids) expected_countries = len(self.selected_country_ids)
@@ -431,44 +651,35 @@ class AnalyticalLayerLoader:
return True return True
# ------------------------------------------------------------------
# STEP 7: CALCULATE YOY
# ------------------------------------------------------------------
def calculate_yoy(self): def calculate_yoy(self):
""" """
Hitung Year-over-Year (YoY) per indikator per negara. Hitung Year-over-Year (YoY) per indikator per negara.
Kolom yang ditambahkan ke df_clean: Kolom yang ditambahkan:
yoy_change : selisih absolut -> value - value_tahun_sebelumnya yoy_change : selisih absolut -> value - value_tahun_sebelumnya
yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100 yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100
Catatan: Baris tahun pertama per kombinasi country-indicator bernilai NULL (intentional).
- Baris tahun pertama per kombinasi country-indicator akan bernilai NULL
(tidak ada tahun sebelumnya sebagai pembanding) — ini intentional.
- value_prev di-drop setelah kalkulasi, tidak ikut disimpan ke BigQuery.
- Dilakukan SETELAH verify_no_gaps() agar data sudah clean dan sorted benar.
""" """
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6b: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80) self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
# Nilai tahun sebelumnya (shifted within each country-indicator group)
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1) df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
# YoY absolute change: value(t) - value(t-1)
df['yoy_change'] = df['value'] - df['value_prev'] df['yoy_change'] = df['value'] - df['value_prev']
# YoY percentage change: (yoy_change / |value_prev|) * 100
# Hindari division by zero — jika value_prev == 0 atau NaN, hasilnya NaN
df['yoy_pct'] = np.where( df['yoy_pct'] = np.where(
df['value_prev'].notna() & (df['value_prev'] != 0), df['value_prev'].notna() & (df['value_prev'] != 0),
(df['yoy_change'] / df['value_prev'].abs()) * 100, (df['yoy_change'] / df['value_prev'].abs()) * 100,
np.nan np.nan
) )
# Drop kolom bantu value_prev, tidak ikut disimpan ke BigQuery
df = df.drop(columns=['value_prev']) df = df.drop(columns=['value_prev'])
# Log ringkasan
total_rows = len(df) total_rows = len(df)
valid_yoy = df['yoy_pct'].notna().sum() valid_yoy = df['yoy_pct'].notna().sum()
null_yoy = df['yoy_pct'].isna().sum() null_yoy = df['yoy_pct'].isna().sum()
@@ -477,7 +688,6 @@ class AnalyticalLayerLoader:
self.logger.info(f" YoY calculated : {valid_yoy:,}") self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator") self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator")
# Log distribusi YoY per indikator (sample)
per_ind = ( per_ind = (
df[df['yoy_pct'].notna()] df[df['yoy_pct'].notna()]
.groupby(['indicator_id', 'indicator_name'])['yoy_pct'] .groupby(['indicator_id', 'indicator_name'])['yoy_pct']
@@ -504,7 +714,6 @@ class AnalyticalLayerLoader:
f"{row['min']:>+8.2f} {row['max']:>+8.2f}" f"{row['min']:>+8.2f} {row['max']:>+8.2f}"
) )
# Log distribusi YoY per negara (ringkasan)
per_country = ( per_country = (
df[df['yoy_pct'].notna()] df[df['yoy_pct'].notna()]
.groupby(['country_id', 'country_name'])['yoy_pct'] .groupby(['country_id', 'country_name'])['yoy_pct']
@@ -526,9 +735,13 @@ class AnalyticalLayerLoader:
self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct") self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self): def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80) self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({ year_stats = self.df_clean.groupby('year').agg({
@@ -586,6 +799,10 @@ class AnalyticalLayerLoader:
return year_stats return year_stats
# ------------------------------------------------------------------
# STEP 9: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------
def save_analytical_table(self): def save_analytical_table(self):
""" """
Simpan fact_asean_food_security_selected ke Gold layer. Simpan fact_asean_food_security_selected ke Gold layer.
@@ -594,44 +811,20 @@ class AnalyticalLayerLoader:
country_id, country_name — dimensi negara country_id, country_name — dimensi negara
indicator_id, indicator_name — dimensi indikator indicator_id, indicator_name — dimensi indikator
direction — arah penilaian (higher/lower_better) direction — arah penilaian (higher/lower_better)
framework — MDGs / SDGs (untuk filter Looker Studio) framework — MDGs/SDGs (ditentukan di Step 6)
pillar_id, pillar_name — dimensi pilar pillar_id, pillar_name — dimensi pilar
time_id, year — dimensi waktu time_id, year — dimensi waktu
value — nilai indikator value — nilai indikator
yoy_change — perubahan absolut YoY (NULLABLE: NULL di tahun pertama) yoy_change — perubahan absolut YoY (NULL di tahun pertama)
yoy_pct — perubahan relatif YoY dalam % (NULLABLE: NULL di tahun pertama) yoy_pct — perubahan relatif YoY dalam % (NULL di tahun pertama)
Kolom framework memungkinkan filter langsung di Looker Studio tanpa join ke dim_indicator.
Kolom yoy_change dan yoy_pct memungkinkan analisis tren tahunan langsung di Looker Studio.
""" """
table_name = 'fact_asean_food_security_selected' table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# Pastikan kolom framework tersedia di df_clean
if 'framework' not in self.df_clean.columns:
self.logger.warning(
" [WARN] Kolom 'framework' tidak ada di df_clean. "
"Melakukan join ke dim_indicator sebagai fallback..."
)
dim_ind = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
if 'framework' in dim_ind.columns:
self.df_clean = self.df_clean.merge(
dim_ind[['indicator_id', 'framework']],
on='indicator_id', how='left'
)
self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
self.logger.info(" [OK] framework di-join dari dim_indicator")
else:
self.df_clean['framework'] = 'MDGs'
self.logger.warning(
" [WARN] dim_indicator juga tidak punya kolom framework. "
"Default: MDGs. Jalankan ulang pipeline dari cleaned_layer."
)
# Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil # Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil
if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns: if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns:
self.logger.warning( self.logger.warning(
@@ -659,11 +852,10 @@ class AnalyticalLayerLoader:
['year', 'country_name', 'pillar_name', 'indicator_name'] ['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True) ).reset_index(drop=True)
# Pastikan tipe data konsisten
analytical_df['country_id'] = analytical_df['country_id'].astype(int) analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str) analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str) analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str) analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
@@ -671,20 +863,17 @@ class AnalyticalLayerLoader:
analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int) analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float) analytical_df['value'] = analytical_df['value'].astype(float)
# yoy_change dan yoy_pct tetap float — NULL (NaN) di tahun pertama adalah intentional
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}") self.logger.info(f" Total rows: {len(analytical_df):,}")
# Log distribusi framework
fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts()
self.logger.info(f" Framework distribution (per indikator unik):") self.logger.info(f" Framework distribution (per indikator unik):")
for fw, cnt in fw_dist.items(): for fw, cnt in fw_dist.items():
self.logger.info(f" {fw}: {cnt} indicators") self.logger.info(f" {fw}: {cnt} indicators")
# Log statistik YoY
yoy_valid = analytical_df['yoy_pct'].notna().sum() yoy_valid = analytical_df['yoy_pct'].notna().sum()
yoy_null = analytical_df['yoy_pct'].isna().sum() yoy_null = analytical_df['yoy_pct'].isna().sum()
self.logger.info(f" YoY rows (calculated): {yoy_valid:,}") self.logger.info(f" YoY rows (calculated): {yoy_valid:,}")
@@ -702,7 +891,6 @@ class AnalyticalLayerLoader:
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
# NULLABLE karena tahun pertama per country-indicator tidak memiliki nilai sebelumnya
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
] ]
@@ -727,17 +915,19 @@ class AnalyticalLayerLoader:
'config_snapshot' : json.dumps({ 'config_snapshot' : json.dumps({
'start_year' : self.start_year, 'start_year' : self.start_year,
'end_year' : self.end_year, 'end_year' : self.end_year,
'fixed_countries': len(self.selected_country_ids), 'sdg_start_year' : self.sdg_start_year,
'fixed_countries' : len(self.selected_country_ids),
'no_gaps' : True, 'no_gaps' : True,
'layer' : 'gold', 'layer' : 'gold',
'columns' : ( 'framework_logic' : (
'id + name + direction + framework + value + ' f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, "
'yoy_change + yoy_pct (Looker Studio ready)' "else MDGs"
) ),
}), }),
'validation_metrics' : json.dumps({ 'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique()), 'total_indicators': int(self.df_clean['indicator_id'].nunique()),
'sdg_start_year' : self.sdg_start_year,
'framework_dist' : fw_dist.to_dict(), 'framework_dist' : fw_dist.to_dict(),
'yoy_rows_valid' : int(yoy_valid), 'yoy_rows_valid' : int(yoy_valid),
'yoy_rows_null' : int(yoy_null), 'yoy_rows_null' : int(yoy_null),
@@ -755,6 +945,10 @@ class AnalyticalLayerLoader:
self.logger.error(f"Error saving: {e}") self.logger.error(f"Error saving: {e}")
raise raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self): def run(self):
self.pipeline_start = datetime.now() self.pipeline_start = datetime.now()
self.pipeline_metadata['start_time'] = self.pipeline_start self.pipeline_metadata['start_time'] = self.pipeline_start
@@ -763,6 +957,7 @@ class AnalyticalLayerLoader:
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,") self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,")
self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct") self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct")
self.logger.info(f"Framework: ditentukan dinamis berdasarkan SDG_START_YEAR (auto-detect)")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_source_data() self.load_source_data()
@@ -770,8 +965,9 @@ class AnalyticalLayerLoader:
self.filter_complete_indicators_per_country() self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars() self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries() self.filter_indicators_consistent_across_fixed_countries()
self.verify_no_gaps() self.determine_sdg_start_year() # Step 6: auto-detect SDG year & assign framework
self.calculate_yoy() # <-- Step 6b: hitung YoY self.verify_no_gaps() # Step 6c: verifikasi tidak ada gap
self.calculate_yoy() # Step 7: hitung YoY
self.analyze_indicator_availability_by_year() self.analyze_indicator_availability_by_year()
self.save_analytical_table() self.save_analytical_table()
@@ -783,9 +979,10 @@ class AnalyticalLayerLoader:
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}") self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}") self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
# ============================================================================= # =============================================================================
@@ -810,7 +1007,9 @@ def run_analytical_layer():
if __name__ == "__main__": if __name__ == "__main__":
print("=" * 80) print("=" * 80)
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print("Framework: MDGs/SDGs ditentukan dinamis dari data (auto-detect SDG start year)")
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()
@@ -820,4 +1019,6 @@ if __name__ == "__main__":
print("\n" + "=" * 80) print("\n" + "=" * 80)
print("[OK] COMPLETED") print("[OK] COMPLETED")
print(f" SDG Start Year : {loader.sdg_start_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80) print("=" * 80)

View File

@@ -62,7 +62,6 @@ COLUMN_CONSTRAINTS = {
'unit' : 20, 'unit' : 20,
'pillar' : 20, 'pillar' : 20,
'direction' : 15, 'direction' : 15,
'framework' : 5, # 'MDGs'=4, 'SDGs'=4
} }
@@ -292,62 +291,6 @@ def assign_direction(indicator_name: str) -> str:
return 'higher_better' return 'higher_better'
# =============================================================================
# FRAMEWORK CLASSIFICATION (MDGs vs SDGs)
# =============================================================================
# Daftar keyword eksplisit dari SDG Goal 2 Khusus FIES (2030 Agenda for Sustainable Development).
# Disimpan lowercase agar matching tidak sensitif terhadap kapitalisasi input.
SDG_INDICATOR_KEYWORDS = frozenset([
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
"number of severely food insecure people (million) (3-year average)",
"number of severely food insecure male adults (million) (3-year average)",
"number of severely food insecure female adults (million) (3-year average)",
"number of moderately or severely food insecure people (million) (3-year average)",
"number of moderately or severely food insecure male adults (million) (3-year average)",
"number of moderately or severely food insecure female adults (million) (3-year average)",
])
def assign_framework(indicator_name: str) -> str:
"""
Assign framework berdasarkan daftar eksplisit indikator SDG Goal 2
dari 2030 Agenda for Sustainable Development (versi Maret 2020).
Logika:
- Lowercase nama indikator input
- Cek apakah ada keyword SDG (lowercase) yang terkandung di dalam nama indikator
- Jika ya -> 'SDGs'
- Jika tidak -> 'MDGs' (indikator FAO/lama yang bukan SDG resmi)
FIX: Bug sebelumnya menggunakan `kw in ind` (cek apakah keyword ada di dalam ind),
padahal seharusnya `kw in ind` sudah benar secara logika — tapi keyword di-set
dengan kapitalisasi campuran sementara `ind` sudah di-lowercase, sehingga
perbandingan selalu gagal. Solusi: simpan keyword dalam lowercase di set,
sehingga `kw in ind` bekerja dengan benar.
Return values: 'MDGs' atau 'SDGs'
Panjang max 4 chars (dalam constraint varchar(5)).
"""
if pd.isna(indicator_name):
return 'MDGs'
# Lowercase input agar matching tidak sensitif terhadap kapitalisasi
ind = str(indicator_name).lower().strip()
# Cek apakah salah satu keyword SDG (sudah lowercase) ada di dalam ind
if any(kw in ind for kw in SDG_INDICATOR_KEYWORDS):
return 'SDGs'
return 'MDGs'
# ============================================================================= # =============================================================================
# CLEANED DATA LOADER # CLEANED DATA LOADER
# ============================================================================= # =============================================================================
@@ -365,7 +308,7 @@ class CleanedDataLoader:
1. Standardize country names (ASEAN) 1. Standardize country names (ASEAN)
2. Remove missing values 2. Remove missing values
3. Remove duplicates 3. Remove duplicates
4. Add pillar, direction & framework classification 4. Add pillar & direction classification
5. Apply column constraints 5. Apply column constraints
6. Load ke BigQuery 6. Load ke BigQuery
7. Log ke Audit layer 7. Log ke Audit layer
@@ -382,7 +325,6 @@ class CleanedDataLoader:
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
] ]
def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'): def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'):
@@ -449,12 +391,11 @@ class CleanedDataLoader:
return df_clean return df_clean
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 4/5] Add pillar, direction & framework classification...") print("\n [Step 4/5] Add pillar & direction classification...")
df = df.copy() df = df.copy()
df['pillar'] = df['indicator_standardized'].apply(assign_pillar) df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
df['direction'] = df['indicator_standardized'].apply(assign_direction) df['direction'] = df['indicator_standardized'].apply(assign_direction)
df['framework'] = df['indicator_standardized'].apply(assign_framework)
pillar_counts = df['pillar'].value_counts() pillar_counts = df['pillar'].value_counts()
print(f" Pillar distribution:") print(f" Pillar distribution:")
@@ -467,21 +408,6 @@ class CleanedDataLoader:
pct = count / len(df) * 100 pct = count / len(df) * 100
print(f" - {direction}: {count:,} ({pct:.1f}%)") print(f" - {direction}: {count:,} ({pct:.1f}%)")
framework_counts = df['framework'].value_counts()
print(f" Framework distribution:")
for fw, count in framework_counts.items():
pct = count / len(df) * 100
print(f" - {fw}: {count:,} ({pct:.1f}%)")
# Log indikator yang terklasifikasi SDGs untuk verifikasi
sdg_inds = (
df[df['framework'] == 'SDGs']['indicator_standardized']
.drop_duplicates().sort_values().tolist()
)
print(f"\n SDG indicators ({len(sdg_inds)}):")
for ind in sdg_inds:
print(f" - {ind}")
return df return df
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -506,7 +432,7 @@ class CleanedDataLoader:
'max' : int(df['year'].max()) if not df['year'].isnull().all() else None, 'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
'unique_years': int(df['year'].nunique()) 'unique_years': int(df['year'].nunique())
} }
for col in ('pillar', 'direction', 'framework', 'source'): for col in ('pillar', 'direction', 'source'):
if col in df.columns: if col in df.columns:
validation[f'{col}_breakdown'] = { validation[f'{col}_breakdown'] = {
str(k): int(v) for k, v in df[col].value_counts().to_dict().items() str(k): int(v) for k, v in df[col].value_counts().to_dict().items()

View File

@@ -52,7 +52,7 @@ class DimensionalModelLoader:
Pipeline steps: Pipeline steps:
1. Load dim_country 1. Load dim_country
2. Load dim_indicator (+ kolom framework dari cleaned_integrated) 2. Load dim_indicator
3. Load dim_time 3. Load dim_time
4. Load dim_source 4. Load dim_source
5. Load dim_pillar 5. Load dim_pillar
@@ -313,7 +313,6 @@ class DimensionalModelLoader:
indicator_category — kategori (Health & Nutrition, dll.) indicator_category — kategori (Health & Nutrition, dll.)
unit — satuan ukuran unit — satuan ukuran
direction — higher_better / lower_better direction — higher_better / lower_better
framework — MDGs / SDGs <-- BARU: dibaca dari cleaned_integrated
""" """
table_name = 'dim_indicator' table_name = 'dim_indicator'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
@@ -323,7 +322,6 @@ class DimensionalModelLoader:
has_direction = 'direction' in self.df_clean.columns has_direction = 'direction' in self.df_clean.columns
has_unit = 'unit' in self.df_clean.columns has_unit = 'unit' in self.df_clean.columns
has_category = 'indicator_category' in self.df_clean.columns has_category = 'indicator_category' in self.df_clean.columns
has_framework = 'framework' in self.df_clean.columns
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name'] dim_indicator.columns = ['indicator_name']
@@ -381,29 +379,10 @@ class DimensionalModelLoader:
categorize_indicator categorize_indicator
) )
# Framework — KOLOM BARU
# Dibaca dari cleaned_integrated yang sudah menjalankan assign_framework().
# Jika kolom belum ada (misal pipeline lama), fallback ke 'MDGs' dengan warning.
if has_framework:
fw_map = self.df_clean[
['indicator_standardized', 'framework']
].drop_duplicates()
fw_map.columns = ['indicator_name', 'framework']
dim_indicator = dim_indicator.merge(fw_map, on='indicator_name', how='left')
# Pastikan tidak ada NULL setelah merge
dim_indicator['framework'] = dim_indicator['framework'].fillna('MDGs')
self.logger.info(" [OK] framework column from cleaned_integrated")
else:
dim_indicator['framework'] = 'MDGs'
self.logger.warning(
" [WARN] framework column not found in cleaned_integrated. "
"Default: MDGs. Jalankan bigquery_cleaned_layer.py terlebih dahulu."
)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
dim_indicator_final = dim_indicator[ dim_indicator_final = dim_indicator[
['indicator_name', 'indicator_category', 'unit', 'direction', 'framework'] ['indicator_name', 'indicator_category', 'unit', 'direction']
].copy() ].copy()
dim_indicator_final = dim_indicator_final.reset_index(drop=True) dim_indicator_final = dim_indicator_final.reset_index(drop=True)
dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1)) dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1))
@@ -414,7 +393,6 @@ class DimensionalModelLoader:
bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"), bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"),
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
] ]
rows_loaded = load_to_bigquery( rows_loaded = load_to_bigquery(
@@ -427,7 +405,6 @@ class DimensionalModelLoader:
for label, col in [ for label, col in [
('Categories', 'indicator_category'), ('Categories', 'indicator_category'),
('Direction', 'direction'), ('Direction', 'direction'),
('Framework', 'framework'),
]: ]:
self.logger.info(f" {label}:") self.logger.info(f" {label}:")
for val, cnt in dim_indicator_final[col].value_counts().items(): for val, cnt in dim_indicator_final[col].value_counts().items():
@@ -766,22 +743,6 @@ class DimensionalModelLoader:
self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}") self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}")
self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}") self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}")
# Validasi distribusi framework di dim_indicator
query_fw = f"""
SELECT framework, COUNT(*) AS count
FROM `{get_table_id('dim_indicator', layer='gold')}`
GROUP BY framework ORDER BY framework
"""
df_fw = self.client.query(query_fw).result().to_dataframe(
create_bqstorage_client=False
)
if len(df_fw) > 0:
self.logger.info(f"\n Framework Distribution (dim_indicator):")
for _, row in df_fw.iterrows():
self.logger.info(
f" {row['framework']:10s}: {int(row['count']):>5,} indicators"
)
query_dir = f""" query_dir = f"""
SELECT direction, COUNT(*) AS count SELECT direction, COUNT(*) AS count
FROM `{get_table_id('dim_indicator', layer='gold')}` FROM `{get_table_id('dim_indicator', layer='gold')}`
@@ -906,10 +867,6 @@ if __name__ == "__main__":
print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}") print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}")
if 'direction' in df_clean.columns: if 'direction' in df_clean.columns:
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
if 'framework' in df_clean.columns:
print(f" Framework : {df_clean['framework'].value_counts().to_dict()}")
else:
print(" [WARN] framework column not found — run bigquery_cleaned_layer.py first")
print("\n[1/1] Dimensional Model Load -> DW (Gold)...") print("\n[1/1] Dimensional Model Load -> DW (Gold)...")
loader = DimensionalModelLoader(client, df_clean) loader = DimensionalModelLoader(client, df_clean)
@@ -917,7 +874,7 @@ if __name__ == "__main__":
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("[OK] DIMENSIONAL MODEL ETL COMPLETED") print("[OK] DIMENSIONAL MODEL ETL COMPLETED")
print(" DW (Gold) : dim_country, dim_indicator (+ framework),") print(" DW (Gold) : dim_country, dim_indicator, dim_time,")
print(" dim_time, dim_source, dim_pillar, fact_food_security") print(" dim_source, dim_pillar, fact_food_security")
print(" AUDIT : etl_logs, etl_metadata") print(" AUDIT : etl_logs, etl_metadata")
print("=" * 60) print("=" * 60)