Compare commits

..

36 Commits

Author SHA1 Message Date
Debby
cfb0df3a15 indonesian version column 2026-05-19 10:09:48 +07:00
Debby
4bab746779 create colomn indonesian text 2026-05-12 09:55:15 +07:00
Debby
f9d013f8e6 new narrative teks 2026-04-22 16:02:05 +07:00
Debby
40528766bd add metadata aggregate 2026-04-16 08:14:10 +07:00
Debby
74be63226a dag etl 2026-04-15 13:56:55 +07:00
Debby
76b451b2c1 schdule /3 bulan dan tambah metadata 2026-04-15 13:37:40 +07:00
Debby
fa2cf75634 ganti narasi 2026-04-07 23:10:34 +07:00
Debby
f13a76756f capek eror 2026-04-07 20:58:34 +07:00
Debby
e00e9c569d sum indicator problem solve 2026-04-07 20:04:52 +07:00
Debby
00cdf961a9 ulang 2026-04-07 19:14:10 +07:00
Debby
8aed670267 narrative indicator 2026-04-07 18:15:58 +07:00
Debby
327768cc01 delete mark 2026-04-07 11:48:41 +07:00
Debby
7ccc3ea35d rename pillar 2026-04-07 11:07:58 +07:00
Debby
933c370606 try again 2026-04-07 10:19:35 +07:00
Debby
0384e62b01 bismillah done 2026-04-07 10:00:45 +07:00
Debby
cebb6b88eb finish etl code 2026-04-06 16:37:05 +07:00
Debby
5313039b50 bismillah capekk 2026-04-03 08:40:30 +07:00
Debby
f652f2f730 agregate fact selected 2026-04-03 08:09:57 +07:00
Debby
d4bee86331 finish fact dan dim 2026-04-02 20:31:19 +07:00
Debby
47ea9c0492 code last done 2026-04-02 19:58:05 +07:00
Debby
6030268924 salah file 2026-04-02 17:43:31 +07:00
Debby
b54b276c63 done 1 2026-04-02 17:34:33 +07:00
Debby
ba4927f620 rename other to supporting 2026-04-02 07:54:23 +07:00
Debby
ffd8cdf65e year hardcode sdgs 2026-04-02 07:10:41 +07:00
Debby
189e8895c9 coba 1 2026-04-01 21:30:54 +07:00
Debby
b7cab36bd9 framework v2 2026-04-01 20:42:54 +07:00
Debby
d948819535 framework v1 2026-04-01 20:33:16 +07:00
Debby
c3b7674001 keawal 2026-04-01 15:58:59 +07:00
Debby
6a55a91112 code final 2026-04-01 15:46:20 +07:00
Debby
0f93ff6ecd try1 2026-04-01 08:29:18 +07:00
Debby
db60e6e414 sdgs era v5 2026-04-01 08:04:19 +07:00
Debby
236d4b4dc8 sdgs year v4 2026-04-01 07:43:31 +07:00
Debby
64e3095e7a sdgs year v3 2026-04-01 07:13:07 +07:00
Debby
8ae5018a62 sdgs year v2 2026-03-31 23:38:15 +07:00
Debby
0d89c60b12 sdgs year v1 2026-03-31 23:20:54 +07:00
Debby
beb494f89c sdg start year and label condition 2026-03-31 15:42:11 +07:00
6 changed files with 3132 additions and 1568 deletions

View File

@@ -22,6 +22,8 @@ Kimball ETL Flow:
│ agg_pillar_by_country │ │ agg_pillar_by_country │
│ agg_framework_by_country │ │ agg_framework_by_country │
│ agg_framework_asean │ │ agg_framework_asean │
│ ↓ │
│ agg_indicator_norm │
│ │ │ │
│ AUDIT : etl_logs, etl_metadata (setiap layer) │ │ AUDIT : etl_logs, etl_metadata (setiap layer) │
└──────────────────────────────────────────────────────────────────────────┘ └──────────────────────────────────────────────────────────────────────────┘
@@ -36,6 +38,7 @@ Task Order:
→ dimensional_model_to_gold → dimensional_model_to_gold
→ analytical_layer_to_gold → analytical_layer_to_gold
→ aggregation_to_gold → aggregation_to_gold
→ indicator_norm_aggregation_to_gold
Scripts folder harus berisi: Scripts folder harus berisi:
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...) - bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
@@ -43,6 +46,7 @@ Scripts folder harus berisi:
- bigquery_dimensional_model.py (run_dimensional_model) - bigquery_dimensional_model.py (run_dimensional_model)
- bigquery_analytical_layer.py (run_analytical_layer) - bigquery_analytical_layer.py (run_analytical_layer)
- bigquery_analysis_aggregation.py (run_aggregation) - bigquery_analysis_aggregation.py (run_aggregation)
- bigquery_aggraget_fact_selected_layer.py (run_indicator_norm_aggregation)
- bigquery_config.py - bigquery_config.py
- bigquery_helpers.py - bigquery_helpers.py
- bigquery_datasource.py - bigquery_datasource.py
@@ -71,11 +75,14 @@ from scripts.bigquery_analytical_layer import (
from scripts.bigquery_aggregate_layer import ( from scripts.bigquery_aggregate_layer import (
run_aggregation, run_aggregation,
) )
from scripts.bigquery_aggraget_fact_selected_layer import (
run_indicator_norm_aggregation,
)
# DEFAULT ARGS # DEFAULT ARGS
default_args = { default_args = {
'owner': 'data-engineering', 'owner': 'Debby Seftia',
'email': ['d1041221004@student.untan.ac.id'], 'email': ['d1041221004@student.untan.ac.id'],
} }
@@ -86,7 +93,7 @@ with DAG(
description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)", description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)",
default_args = default_args, default_args = default_args,
start_date = datetime(2026, 3, 1), start_date = datetime(2026, 3, 1),
schedule_interval = "0 0 */3 * *", schedule_interval = "0 0 1 */3 *",
catchup = False, catchup = False,
tags = ["food-security", "bigquery", "kimball"], tags = ["food-security", "bigquery", "kimball"],
) as dag: ) as dag:
@@ -136,5 +143,21 @@ with DAG(
python_callable = run_aggregation python_callable = run_aggregation
) )
task_indicator_norm = PythonOperator(
task_id = "indicator_norm_aggregation_to_gold",
python_callable = run_indicator_norm_aggregation
)
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation # Task Dependencies
(
task_verify
>> task_fao
>> task_worldbank
>> task_unicef
>> task_staging
>> task_cleaned
>> task_dimensional
>> task_analytical
>> task_aggregation
>> task_indicator_norm
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
""" """
BIGQUERY ANALYTICAL LAYER - DATA FILTERING BIGQUERY ANALYTICAL LAYER - DATA FILTERING
fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order: Filtering Order:
1. Load data (single years only) 1. Load data (single years only)
@@ -8,20 +8,9 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET) 4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries 5. Filter indicators with consistent presence across FIXED countries
6. Determine SDGs start year & assign framework (MDGs/SDGs) per indicator 6. Save analytical table (dengan nama/label lengkap untuk Looker Studio)
7. Calculate YoY per indicator per country
8. Analyze indicator availability by year
9. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio)
FRAMEWORK LOGIC: ADDED: Kolom indicator_name_id dan pillar_name_id (terjemahan Bahasa Indonesia)
- SDG_START_YEAR = 2016 (default; auto-detect jika indikator SDGs pertama kali muncul lebih awal/lambat)
- Indikator yang namanya ada di SDG_INDICATOR_KEYWORDS:
* Jika data mulai >= SDG_START_YEAR -> 'SDGs'
* Jika data mulai < SDG_START_YEAR -> 'MDGs'
(artinya indikator ini sudah ada sebelum SDGs, mis. undernourishment)
- Indikator yang namanya TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs'
- Penentuan framework dilakukan SETELAH filter selesai (data sudah bersih & range sudah fixed)
sehingga start_year per indikator yang digunakan adalah start_year AKTUAL di dataset ini.
""" """
import pandas as pd import pandas as pd
@@ -48,78 +37,173 @@ from google.cloud import bigquery
# ============================================================================= # =============================================================================
# SDG INDICATOR KEYWORDS # TRANSLATION DICTIONARIES
# ============================================================================= # =============================================================================
# Daftar nama indikator (lowercase) yang termasuk dalam SDG Goal 2.
# Matching dilakukan dengan `kw in indicator_name.lower()` sehingga
# partial match tetap valid (menangani variasi format nama).
#
# Logika framework:
# - Nama ada di set ini + start_year >= SDG_START_YEAR -> 'SDGs'
# - Nama ada di set ini + start_year < SDG_START_YEAR -> 'MDGs'
# (indikator sudah eksis sebelum SDGs, mis. prevalence of undernourishment)
# - Nama TIDAK ada di set ini -> 'MDGs'
SDG_INDICATOR_KEYWORDS = frozenset([ PILLAR_TRANSLATION_ID: dict = {
# TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs) # 4 pilar utama Food Security
"prevalence of undernourishment (percent) (3-year average)", "Availability" : "Ketersediaan",
"number of people undernourished (million) (3-year average)", "Access" : "Keterjangkauan",
# TARGET 2.1.2 — FIES (SDGs only) "Utilization" : "Pemanfaatan",
"prevalence of severe food insecurity in the total population (percent) (3-year average)", "Stability" : "Stabilitas",
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)", # Variasi penulisan yang mungkin muncul
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)", "availability" : "Ketersediaan",
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", "access" : "Keterjangkauan",
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", "utilization" : "Pemanfaatan",
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", "stability" : "Stabilitas",
"number of severely food insecure people (million) (3-year average)", "Food Availability" : "Ketersediaan Pangan",
"number of severely food insecure male adults (million) (3-year average)", "Food Access" : "Keterjangkauan Pangan",
"number of severely food insecure female adults (million) (3-year average)", "Food Utilization" : "Pemanfaatan Pangan",
"number of moderately or severely food insecure people (million) (3-year average)", "Food Stability" : "Stabilitas Pangan",
"number of moderately or severely food insecure male adults (million) (3-year average)", }
"number of moderately or severely food insecure female adults (million) (3-year average)",
# TARGET 2.2.1 — Stunting (shared)
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
# TARGET 2.2.2 — Wasting & Overweight (shared)
"percentage of children under 5 years affected by wasting (percent)",
"number of children under 5 years affected by wasting (million)",
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
# TARGET 2.2.3 — Anaemia (SDGs only)
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
"number of women of reproductive age (15-49 years) affected by anemia (million)",
])
# Tahun resmi SDGs mulai berlaku (2030 Agenda adopted September 2015, INDICATOR_TRANSLATION_ID: dict = {
# data reporting mulai 2016). Dipakai sebagai default jika auto-detect gagal. # -------------------------------------------------------------------------
SDG_START_YEAR_DEFAULT = 2016 # AVAILABILITY
# -------------------------------------------------------------------------
"Average dietary energy supply adequacy (percent) (3-year average)":
"Kecukupan rata-rata pasokan energi makanan (persen) (rata-rata 3 tahun)",
"Average value of food production (constant 2014-2016 thousand US$) (3-year average)":
"Nilai rata-rata produksi pangan (ribu US$ konstan 2014-2016) (rata-rata 3 tahun)",
"Share of dietary energy supply derived from cereals, roots and tubers (percent) (3-year average)":
"Proporsi pasokan energi makanan dari serealia, akar, dan umbi-umbian (persen) (rata-rata 3 tahun)",
"Average protein supply (g/cap/day) (3-year average)":
"Rata-rata pasokan protein (g/kapita/hari) (rata-rata 3 tahun)",
"Average supply of protein of animal origin (g/cap/day) (3-year average)":
"Rata-rata pasokan protein hewani (g/kapita/hari) (rata-rata 3 tahun)",
"Cereal import dependency ratio (percent) (3-year average)":
"Rasio ketergantungan impor sereal (persen) (rata-rata 3 tahun)",
"Percent of arable land equipped for irrigation (percent) (3-year average)":
"Persentase lahan pertanian yang dilengkapi irigasi (persen) (rata-rata 3 tahun)",
"Crop production index (2014-2016 = 100)":
"Indeks produksi tanaman pangan (2014-2016 = 100)",
"Livestock production index (2014-2016 = 100)":
"Indeks produksi peternakan (2014-2016 = 100)",
"Value of food imports over total merchandise exports (percent) (3-year average)":
"Nilai impor pangan terhadap total ekspor barang (persen) (rata-rata 3 tahun)",
"Food production variability (constant 2014-2016 thousand US$ per capita)":
"Variabilitas produksi pangan (ribu US$ konstan 2014-2016 per kapita)",
"Food supply variability (kcal/cap/day)":
"Variabilitas pasokan pangan (kkal/kapita/hari)",
# -------------------------------------------------------------------------
# ACCESS
# -------------------------------------------------------------------------
"Gross domestic product per capita, PPP (constant 2017 international $)":
"Produk domestik bruto per kapita, PPP (internasional konstan 2017 US$)",
"Domestic food price level index (2015 = 1.00)":
"Indeks tingkat harga pangan domestik (2015 = 1,00)",
"Domestic food price volatility index":
"Indeks volatilitas harga pangan domestik",
"Prevalence of undernourishment (percent) (3-year average)":
"Prevalensi kekurangan gizi (persen) (rata-rata 3 tahun)",
"Number of people undernourished (million) (3-year average)":
"Jumlah penduduk kekurangan gizi (juta jiwa) (rata-rata 3 tahun)",
"Depth of the food deficit (kcal/capita/day) (3-year average)":
"Kedalaman defisit pangan (kkal/kapita/hari) (rata-rata 3 tahun)",
"Percentage of population using at least basic drinking water services (percent)":
"Persentase penduduk yang menggunakan layanan air minum dasar (persen)",
"Percentage of population using safely managed drinking water services (percent)":
"Persentase penduduk yang menggunakan layanan air minum yang dikelola dengan aman (persen)",
"Percentage of population using at least basic sanitation services (percent)":
"Persentase penduduk yang menggunakan layanan sanitasi dasar (persen)",
"Percentage of population using safely managed sanitation services (percent)":
"Persentase penduduk yang menggunakan layanan sanitasi yang dikelola dengan aman (persen)",
"Access to electricity (percent of rural population)":
"Akses listrik (persen penduduk pedesaan)",
"Proportion of population with access to electricity (percent)":
"Proporsi penduduk dengan akses listrik (persen)",
"Road infrastructure index":
"Indeks infrastruktur jalan",
"Rail lines density (total route-km per 100 square km of land area)":
"Kepadatan jalur kereta api (total rute-km per 100 km2 lahan)",
"Gross national income per capita (Atlas method, current US$)":
"Pendapatan nasional bruto per kapita (metode Atlas, US$ terkini)",
"Food Insecurity Experience Scale (FIES)":
"Skala Pengalaman Ketidakamanan Pangan (FIES)",
# -------------------------------------------------------------------------
# UTILIZATION
# -------------------------------------------------------------------------
"Prevalence of severe food insecurity in the total population (percent) (3-year average)":
"Prevalensi kerawanan pangan berat pada total penduduk (persen) (rata-rata 3 tahun)",
"Prevalence of severe food insecurity in the male adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)",
"Prevalence of severe food insecurity in the female adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)",
"Prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)":
"Prevalensi kerawanan pangan sedang atau berat pada total penduduk (persen) (rata-rata 3 tahun)",
"Prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan sedang atau berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)",
"Prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)":
"Prevalensi kerawanan pangan sedang atau berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)",
"Number of severely food insecure people (million) (3-year average)":
"Jumlah penduduk yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
"Number of severely food insecure male adults (million) (3-year average)":
"Jumlah laki-laki dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
"Number of severely food insecure female adults (million) (3-year average)":
"Jumlah perempuan dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)",
"Number of moderately or severely food insecure people (million) (3-year average)":
"Jumlah penduduk yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
"Number of moderately or severely food insecure male adults (million) (3-year average)":
"Jumlah laki-laki dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
"Number of moderately or severely food insecure female adults (million) (3-year average)":
"Jumlah perempuan dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)",
"Percentage of children under 5 years of age who are stunted (modelled estimates) (percent)":
"Persentase anak di bawah 5 tahun yang mengalami stunting (estimasi model) (persen)",
"Number of children under 5 years of age who are stunted (modeled estimates) (million)":
"Jumlah anak di bawah 5 tahun yang mengalami stunting (estimasi model) (juta jiwa)",
"Percentage of children under 5 years affected by wasting (percent)":
"Persentase anak di bawah 5 tahun yang mengalami wasting (persen)",
"Number of children under 5 years affected by wasting (million)":
"Jumlah anak di bawah 5 tahun yang mengalami wasting (juta jiwa)",
"Percentage of children under 5 years of age who are overweight (modelled estimates) (percent)":
"Persentase anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (persen)",
"Number of children under 5 years of age who are overweight (modeled estimates) (million)":
"Jumlah anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (juta jiwa)",
"Prevalence of anemia among women of reproductive age (15-49 years) (percent)":
"Prevalensi anemia pada perempuan usia reproduksi (15-49 tahun) (persen)",
"Number of women of reproductive age (15-49 years) affected by anemia (million)":
"Jumlah perempuan usia reproduksi (15-49 tahun) yang menderita anemia (juta jiwa)",
"Prevalence of obesity in the adult population (18 years and older) (percent)":
"Prevalensi obesitas pada penduduk dewasa (18 tahun ke atas) (persen)",
"Prevalence of exclusive breastfeeding among infants 0-5 months of age (percent)":
"Prevalensi pemberian ASI eksklusif pada bayi usia 0-5 bulan (persen)",
"Minimum dietary diversity for women (MDD-W) (percent)":
"Keragaman pola makan minimum untuk perempuan (MDD-W) (persen)",
# -------------------------------------------------------------------------
# STABILITY
# -------------------------------------------------------------------------
"Cereal import dependency ratio (percent)":
"Rasio ketergantungan impor sereal (persen)",
"Political stability and absence of violence/terrorism (index)":
"Stabilitas politik dan tidak adanya kekerasan/terorisme (indeks)",
"Domestic food price volatility":
"Volatilitas harga pangan domestik",
"Per capita food supply variability (kcal/cap/day)":
"Variabilitas pasokan pangan per kapita (kkal/kapita/hari)",
"Percentage of arable land equipped for irrigation (percent)":
"Persentase lahan pertanian yang dilengkapi irigasi (persen)",
"GDP per capita growth (annual %)":
"Pertumbuhan PDB per kapita (% tahunan)",
"GDP growth (annual %)":
"Pertumbuhan PDB (% tahunan)",
}
def assign_framework_dynamic( def translate_indicator(name: str) -> str:
indicator_name: str, """Terjemahkan nama indikator ke Bahasa Indonesia. Fallback ke nama asli."""
indicator_start_year: int, if not name:
sdg_start_year: int, return name
) -> str: return INDICATOR_TRANSLATION_ID.get(name, name)
"""
Tentukan framework (MDGs/SDGs) berdasarkan:
1. Apakah nama indikator ada di SDG_INDICATOR_KEYWORDS?
2. Apakah data indikator ini mulai pada tahun >= sdg_start_year?
Args:
indicator_name : Nama indikator (akan di-lowercase untuk matching)
indicator_start_year : Tahun pertama data indikator ini tersedia di dataset
sdg_start_year : Tahun mulai SDGs (dari auto-detect atau default)
Returns: def translate_pillar(name: str) -> str:
'SDGs' jika indikator termasuk SDG list DAN mulai >= sdg_start_year """Terjemahkan nama pillar ke Bahasa Indonesia. Fallback ke nama asli."""
'MDGs' untuk semua kasus lainnya if not name:
""" return name
ind_lower = str(indicator_name).lower().strip() return PILLAR_TRANSLATION_ID.get(name, name)
is_sdg_name = any(kw in ind_lower for kw in SDG_INDICATOR_KEYWORDS)
if is_sdg_name and indicator_start_year >= sdg_start_year:
return 'SDGs'
return 'MDGs'
# ============================================================================= # =============================================================================
@@ -134,18 +218,13 @@ class AnalyticalLayerLoader:
1. Complete per country (no gaps from start_year to end_year) 1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars 2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years 3. Ensure indicators have consistent country count across all years
4. Determine SDGs start year & assign framework per indicator dynamically 4. Save dengan kolom lengkap (nama + ID + nama Indonesia) untuk Looker Studio
5. Calculate YoY (year-over-year) change per indicator per country
6. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
Kolom output: Kolom tambahan:
country_id, country_name, - indicator_name_id : terjemahan Bahasa Indonesia dari indicator_name
indicator_id, indicator_name, direction, framework, - pillar_name_id : terjemahan Bahasa Indonesia dari pillar_name
pillar_id, pillar_name,
time_id, year, value,
yoy_change, yoy_pct
""" """
def __init__(self, client: bigquery.Client): def __init__(self, client: bigquery.Client):
@@ -164,9 +243,6 @@ class AnalyticalLayerLoader:
self.end_year = None self.end_year = None
self.baseline_year = 2023 self.baseline_year = 2023
# SDGs-related — di-set oleh determine_sdg_start_year()
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.pipeline_metadata = { self.pipeline_metadata = {
'source_class' : self.__class__.__name__, 'source_class' : self.__class__.__name__,
'start_time' : None, 'start_time' : None,
@@ -181,18 +257,12 @@ class AnalyticalLayerLoader:
self.pipeline_start = None self.pipeline_start = None
self.pipeline_end = None self.pipeline_end = None
# ------------------------------------------------------------------
# STEP 1: LOAD SOURCE DATA
# ------------------------------------------------------------------
def load_source_data(self): def load_source_data(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# Tidak include framework dari dim_indicator —
# framework akan ditentukan dinamis di Step 6 (determine_sdg_start_year)
query = f""" query = f"""
SELECT SELECT
f.country_id, f.country_id,
@@ -217,20 +287,14 @@ class AnalyticalLayerLoader:
""" """
self.logger.info("Loading fact table with dimensions...") self.logger.info("Loading fact table with dimensions...")
self.df_clean = self.client.query(query).result().to_dataframe( self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
create_bqstorage_client=False
)
self.logger.info(f" Loaded: {len(self.df_clean):,} rows") self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
if 'is_year_range' in self.df_clean.columns: if 'is_year_range' in self.df_clean.columns:
yr = self.df_clean['is_year_range'].value_counts() yr = self.df_clean['is_year_range'].value_counts()
self.logger.info(f" Breakdown:") self.logger.info(f" Breakdown:")
self.logger.info( self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}")
f" Single years (is_year_range=False): {yr.get(False, 0):,}" self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}")
)
self.logger.info(
f" Year ranges (is_year_range=True): {yr.get(True, 0):,}"
)
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
@@ -247,10 +311,6 @@ class AnalyticalLayerLoader:
self.logger.error(f"Error loading source data: {e}") self.logger.error(f"Error loading source data: {e}")
raise raise
# ------------------------------------------------------------------
# STEP 2: DETERMINE YEAR BOUNDARIES
# ------------------------------------------------------------------
def determine_year_boundaries(self): def determine_year_boundaries(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
@@ -293,10 +353,6 @@ class AnalyticalLayerLoader:
self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
# ------------------------------------------------------------------
def filter_complete_indicators_per_country(self): def filter_complete_indicators_per_country(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
@@ -349,14 +405,9 @@ class AnalyticalLayerLoader:
self.logger.info(f" [-] Removed: {len(removed_combinations):,}") self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
df_valid = pd.DataFrame(valid_combinations) df_valid = pd.DataFrame(valid_combinations)
df_valid['key'] = ( df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str)
df_valid['country_id'].astype(str) + '_' + self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' +
df_valid['indicator_id'].astype(str) self.df_clean['indicator_id'].astype(str))
)
self.df_clean['key'] = (
self.df_clean['country_id'].astype(str) + '_' +
self.df_clean['indicator_id'].astype(str)
)
original_count = len(self.df_clean) original_count = len(self.df_clean)
self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy() self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
@@ -368,10 +419,6 @@ class AnalyticalLayerLoader:
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
# ------------------------------------------------------------------
def select_countries_with_all_pillars(self): def select_countries_with_all_pillars(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
@@ -394,26 +441,18 @@ class AnalyticalLayerLoader:
f"{row['pillar_count']}/{total_pillars} pillars" f"{row['pillar_count']}/{total_pillars} pillars"
) )
selected_countries = country_pillar_count[ selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars]
country_pillar_count['pillar_count'] == total_pillars
]
self.selected_country_ids = selected_countries['country_id'].tolist() self.selected_country_ids = selected_countries['country_id'].tolist()
self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries") self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
original_count = len(self.df_clean) original_count = len(self.df_clean)
self.df_clean = self.df_clean[ self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy()
self.df_clean['country_id'].isin(self.selected_country_ids)
].copy()
self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows before: {original_count:,}")
self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Rows after: {len(self.df_clean):,}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
# ------------------------------------------------------------------
def filter_indicators_consistent_across_fixed_countries(self): def filter_indicators_consistent_across_fixed_countries(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
@@ -422,9 +461,7 @@ class AnalyticalLayerLoader:
indicator_country_start = self.df_clean.groupby([ indicator_country_start = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'country_id' 'indicator_id', 'indicator_name', 'country_id'
])['year'].min().reset_index() ])['year'].min().reset_index()
indicator_country_start.columns = [ indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year']
'indicator_id', 'indicator_name', 'country_id', 'start_year'
]
indicator_max_start = indicator_country_start.groupby([ indicator_max_start = indicator_country_start.groupby([
'indicator_id', 'indicator_name' 'indicator_id', 'indicator_name'
@@ -463,9 +500,7 @@ class AnalyticalLayerLoader:
else: else:
removed_indicators.append({ removed_indicators.append({
'indicator_name': indicator_name, 'indicator_name': indicator_name,
'reason' : ( 'reason' : f"missing countries in years: {', '.join(problematic_years[:5])}"
f"missing countries in years: {', '.join(problematic_years[:5])}"
)
}) })
self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
@@ -475,17 +510,12 @@ class AnalyticalLayerLoader:
raise ValueError("No valid indicators found after filtering!") raise ValueError("No valid indicators found after filtering!")
original_count = len(self.df_clean) original_count = len(self.df_clean)
self.df_clean = self.df_clean[ self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy()
self.df_clean['indicator_id'].isin(valid_indicators)
].copy()
self.df_clean = self.df_clean.merge( self.df_clean = self.df_clean.merge(
indicator_max_start[['indicator_id', 'max_start_year']], indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left'
on='indicator_id', how='left'
) )
self.df_clean = self.df_clean[ self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy()
self.df_clean['year'] >= self.df_clean['max_start_year']
].copy()
self.df_clean = self.df_clean.drop('max_start_year', axis=1) self.df_clean = self.df_clean.drop('max_start_year', axis=1)
self.logger.info(f"\n Rows before: {original_count:,}") self.logger.info(f"\n Rows before: {original_count:,}")
@@ -495,151 +525,18 @@ class AnalyticalLayerLoader:
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
return self.df_clean return self.df_clean
# ------------------------------------------------------------------
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK
# ------------------------------------------------------------------
def determine_sdg_start_year(self):
"""
Tentukan tahun mulai SDGs secara otomatis dari data aktual, lalu
assign kolom 'framework' (MDGs/SDGs) ke setiap baris di df_clean.
Logika penentuan SDG_START_YEAR:
- Cari indikator yang namanya ada di SDG_INDICATOR_KEYWORDS (FIES, anaemia, dll.)
dan yang diyakini HANYA ada di SDGs (bukan shared dengan MDGs).
Proxy: indikator dengan keyword 'food insecurity' atau 'anemia'.
- Ambil tahun pertama (min year) dari indikator-indikator tersebut di dataset ini.
- Jika ditemukan -> sdg_start_year = tahun pertama itu.
- Jika tidak ditemukan -> sdg_start_year = SDG_START_YEAR_DEFAULT (2016).
Logika assign framework per indikator (assign_framework_dynamic):
- Nama ada di SDG_INDICATOR_KEYWORDS + start_year >= sdg_start_year -> 'SDGs'
- Nama ada di SDG_INDICATOR_KEYWORDS + start_year < sdg_start_year -> 'MDGs'
(indikator seperti undernourishment sudah ada sebelum SDGs)
- Nama TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs'
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK")
self.logger.info("=" * 80)
# --- 6a. Auto-detect SDG start year dari data aktual ---
# Proxy SDGs-only: indikator yang pasti baru di SDGs (FIES & anaemia)
sdg_proxy_keywords = [
'food insecurity',
'anemia',
'anaemia',
]
sdg_proxy_mask = self.df_clean['indicator_name'].str.lower().apply(
lambda n: any(kw in n for kw in sdg_proxy_keywords)
)
df_sdg_proxy = self.df_clean[sdg_proxy_mask]
if len(df_sdg_proxy) > 0:
detected_start = int(df_sdg_proxy['year'].min())
self.sdg_start_year = detected_start
self.logger.info(
f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}"
)
self.logger.info(f" Proxy indicators used (sample):")
proxy_sample = (
df_sdg_proxy['indicator_name']
.drop_duplicates()
.head(5)
.tolist()
)
for ind in proxy_sample:
self.logger.info(f" - {ind}")
else:
self.sdg_start_year = SDG_START_YEAR_DEFAULT
self.logger.warning(
f"\n [WARN] SDG proxy indicators not found in dataset. "
f"Using default: {self.sdg_start_year}"
)
self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}")
# --- 6b. Hitung start_year aktual per indikator di dataset ini ---
indicator_start = (
self.df_clean
.groupby(['indicator_id', 'indicator_name'])['year']
.min()
.reset_index()
)
indicator_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year']
# --- 6c. Assign framework per indikator ---
indicator_start['framework'] = indicator_start.apply(
lambda row: assign_framework_dynamic(
indicator_name = row['indicator_name'],
indicator_start_year = int(row['actual_start_year']),
sdg_start_year = self.sdg_start_year,
),
axis=1
)
# --- 6d. Log hasil assignment ---
self.logger.info(f"\n Framework assignment per indicator:")
self.logger.info(f" {'-'*85}")
self.logger.info(
f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}"
)
self.logger.info(f" {'-'*85}")
for _, row in indicator_start.sort_values(
['framework', 'actual_start_year', 'indicator_name']
).iterrows():
is_in_sdg_list = any(
kw in str(row['indicator_name']).lower()
for kw in SDG_INDICATOR_KEYWORDS
)
note = " [in SDG list]" if is_in_sdg_list else ""
self.logger.info(
f" {int(row['indicator_id']):<5} {row['framework']:<10} "
f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}{note}"
)
fw_summary = indicator_start['framework'].value_counts()
self.logger.info(f"\n Framework summary:")
for fw, cnt in fw_summary.items():
self.logger.info(f" {fw}: {cnt} indicators")
# --- 6e. Merge framework ke df_clean ---
self.df_clean = self.df_clean.merge(
indicator_start[['indicator_id', 'framework']],
on='indicator_id', how='left'
)
self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean")
self.logger.info(
f" Row distribution — MDGs: "
f"{(self.df_clean['framework'] == 'MDGs').sum():,} | "
f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}"
)
return self.df_clean
# ------------------------------------------------------------------
# STEP 6b: VERIFY NO GAPS
# ------------------------------------------------------------------
def verify_no_gaps(self): def verify_no_gaps(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 6c: VERIFY NO GAPS") self.logger.info("STEP 6: VERIFY NO GAPS")
self.logger.info("=" * 80) self.logger.info("=" * 80)
expected_countries = len(self.selected_country_ids) expected_countries = len(self.selected_country_ids)
verification = self.df_clean.groupby( verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index()
['indicator_id', 'year']
)['country_id'].nunique().reset_index()
verification.columns = ['indicator_id', 'year', 'country_count'] verification.columns = ['indicator_id', 'year', 'country_count']
all_good = (verification['country_count'] == expected_countries).all() all_good = (verification['country_count'] == expected_countries).all()
if all_good: if all_good:
self.logger.info( self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries")
f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
)
else: else:
bad = verification[verification['country_count'] != expected_countries] bad = verification[verification['country_count'] != expected_countries]
for _, row in bad.head(10).iterrows(): for _, row in bad.head(10).iterrows():
@@ -651,97 +548,9 @@ class AnalyticalLayerLoader:
return True return True
# ------------------------------------------------------------------
# STEP 7: CALCULATE YOY
# ------------------------------------------------------------------
def calculate_yoy(self):
"""
Hitung Year-over-Year (YoY) per indikator per negara.
Kolom yang ditambahkan:
yoy_change : selisih absolut -> value - value_tahun_sebelumnya
yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100
Baris tahun pertama per kombinasi country-indicator bernilai NULL (intentional).
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
self.logger.info("=" * 80)
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
df['yoy_change'] = df['value'] - df['value_prev']
df['yoy_pct'] = np.where(
df['value_prev'].notna() & (df['value_prev'] != 0),
(df['yoy_change'] / df['value_prev'].abs()) * 100,
np.nan
)
df = df.drop(columns=['value_prev'])
total_rows = len(df)
valid_yoy = df['yoy_pct'].notna().sum()
null_yoy = df['yoy_pct'].isna().sum()
self.logger.info(f" Total rows : {total_rows:,}")
self.logger.info(f" YoY calculated : {valid_yoy:,}")
self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator")
per_ind = (
df[df['yoy_pct'].notna()]
.groupby(['indicator_id', 'indicator_name'])['yoy_pct']
.agg(['mean', 'std', 'min', 'max'])
.reset_index()
)
per_ind.columns = ['indicator_id', 'indicator_name', 'mean', 'std', 'min', 'max']
self.logger.info(f"\n YoY summary per indicator (top 10 by abs mean change):")
self.logger.info(f" {'-'*100}")
self.logger.info(
f" {'ID':<5} {'Indicator Name':<52} {'Mean%':>8} {'Std%':>8} {'Min%':>8} {'Max%':>8}"
)
self.logger.info(f" {'-'*100}")
top_ind = per_ind.reindex(
per_ind['mean'].abs().sort_values(ascending=False).index
).head(10)
for _, row in top_ind.iterrows():
self.logger.info(
f" {int(row['indicator_id']):<5} {row['indicator_name'][:50]:<52} "
f"{row['mean']:>+8.2f} {row['std']:>8.2f} "
f"{row['min']:>+8.2f} {row['max']:>+8.2f}"
)
per_country = (
df[df['yoy_pct'].notna()]
.groupby(['country_id', 'country_name'])['yoy_pct']
.agg(['mean', 'std'])
.reset_index()
)
per_country.columns = ['country_id', 'country_name', 'mean_yoy', 'std_yoy']
self.logger.info(f"\n YoY summary per country:")
self.logger.info(f" {'-'*60}")
self.logger.info(f" {'Country':<30} {'Mean YoY%':>10} {'Std YoY%':>10}")
self.logger.info(f" {'-'*60}")
for _, row in per_country.sort_values('mean_yoy', ascending=False).iterrows():
self.logger.info(
f" {row['country_name']:<30} {row['mean_yoy']:>+10.2f} {row['std_yoy']:>10.2f}"
)
self.df_clean = df
self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct")
return self.df_clean
# ------------------------------------------------------------------
# STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR
# ------------------------------------------------------------------
def analyze_indicator_availability_by_year(self): def analyze_indicator_availability_by_year(self):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
self.logger.info("=" * 80) self.logger.info("=" * 80)
year_stats = self.df_clean.groupby('year').agg({ year_stats = self.df_clean.groupby('year').agg({
@@ -761,138 +570,118 @@ class AnalyticalLayerLoader:
) )
indicator_details = self.df_clean.groupby([ indicator_details = self.df_clean.groupby([
'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework' 'indicator_id', 'indicator_name', 'pillar_name', 'direction'
]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index() ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
indicator_details.columns = [ indicator_details.columns = [
'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework', 'indicator_id', 'indicator_name', 'pillar_name', 'direction',
'start_year', 'end_year', 'country_count' 'start_year', 'end_year', 'country_count'
] ]
indicator_details['year_range'] = ( indicator_details['year_range'] = (
indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['start_year'].astype(int).astype(str) + '-' +
indicator_details['end_year'].astype(int).astype(str) indicator_details['end_year'].astype(int).astype(str)
) )
indicator_details = indicator_details.sort_values( indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name'])
['framework', 'pillar_name', 'start_year', 'indicator_name']
)
self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
for pillar, count in indicator_details.groupby('pillar_name').size().items(): for pillar, count in indicator_details.groupby('pillar_name').size().items():
self.logger.info(f" {pillar}: {count} indicators") self.logger.info(f" {pillar}: {count} indicators")
self.logger.info(f"\nFramework breakdown:") self.logger.info(f"\n{'-'*100}")
for fw, count in indicator_details.groupby('framework').size().items(): self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}")
self.logger.info(f" {fw}: {count} indicators") self.logger.info(f"{'-'*100}")
self.logger.info(f"\n{'-'*110}")
self.logger.info(
f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}"
)
self.logger.info(f"{'-'*110}")
for _, row in indicator_details.iterrows(): for _, row in indicator_details.iterrows():
direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
self.logger.info( self.logger.info(
f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} "
f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" f"{direction:<8} {int(row['country_count'])}"
) )
return year_stats return year_stats
# ------------------------------------------------------------------
# STEP 9: SAVE ANALYTICAL TABLE
# ------------------------------------------------------------------
def save_analytical_table(self): def save_analytical_table(self):
"""
Simpan fact_asean_food_security_selected ke Gold layer.
Kolom yang disimpan:
country_id, country_name — dimensi negara
indicator_id, indicator_name — dimensi indikator
direction — arah penilaian (higher/lower_better)
framework — MDGs/SDGs (ditentukan di Step 6)
pillar_id, pillar_name — dimensi pilar
time_id, year — dimensi waktu
value — nilai indikator
yoy_change — perubahan absolut YoY (NULL di tahun pertama)
yoy_pct — perubahan relatif YoY dalam % (NULL di tahun pertama)
"""
table_name = 'fact_asean_food_security_selected' table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil
if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns:
self.logger.warning(
" [WARN] Kolom YoY tidak ditemukan. Menjalankan calculate_yoy() sebagai fallback..."
)
self.calculate_yoy()
analytical_df = self.df_clean[[ analytical_df = self.df_clean[[
'country_id', 'country_id',
'country_name', 'country_name',
'indicator_id', 'indicator_id',
'indicator_name', 'indicator_name',
'direction', 'direction',
'framework',
'pillar_id', 'pillar_id',
'pillar_name', 'pillar_name',
'time_id', 'time_id',
'year', 'year',
'value', 'value',
'yoy_change',
'yoy_pct',
]].copy() ]].copy()
# ------------------------------------------------------------------
# TAMBAHAN: kolom terjemahan Bahasa Indonesia
# indicator_name_id : terjemahan Bahasa Indonesia dari indicator_name
# pillar_name_id : terjemahan Bahasa Indonesia dari pillar_name
# ------------------------------------------------------------------
analytical_df['indicator_name_id'] = analytical_df['indicator_name'].apply(translate_indicator)
analytical_df['pillar_name_id'] = analytical_df['pillar_name'].apply(translate_pillar)
# Log indikator yang belum punya terjemahan (fallback ke nama asli)
no_trans_ind = analytical_df[
analytical_df['indicator_name_id'] == analytical_df['indicator_name']
]['indicator_name'].unique()
if len(no_trans_ind) > 0:
self.logger.warning(
f" [TRANSLATION] {len(no_trans_ind)} indicator(s) tidak ada di kamus "
f"(menggunakan nama asli): {list(no_trans_ind)[:5]}"
)
no_trans_pil = analytical_df[
analytical_df['pillar_name_id'] == analytical_df['pillar_name']
]['pillar_name'].unique()
if len(no_trans_pil) > 0:
self.logger.warning(
f" [TRANSLATION] {len(no_trans_pil)} pillar(s) tidak ada di kamus "
f"(menggunakan nama asli): {list(no_trans_pil)}"
)
analytical_df = analytical_df.sort_values( analytical_df = analytical_df.sort_values(
['year', 'country_name', 'pillar_name', 'indicator_name'] ['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True) ).reset_index(drop=True)
# Pastikan tipe data konsisten
analytical_df['country_id'] = analytical_df['country_id'].astype(int) analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str) analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
analytical_df['indicator_name_id'] = analytical_df['indicator_name_id'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str) analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['framework'] = analytical_df['framework'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['pillar_name_id'] = analytical_df['pillar_name_id'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int) analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float) analytical_df['value'] = analytical_df['value'].astype(float)
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}") self.logger.info(f" Total rows: {len(analytical_df):,}")
fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() # Schema BigQuery
self.logger.info(f" Framework distribution (per indikator unik):")
for fw, cnt in fw_dist.items():
self.logger.info(f" {fw}: {cnt} indicators")
yoy_valid = analytical_df['yoy_pct'].notna().sum()
yoy_null = analytical_df['yoy_pct'].isna().sum()
self.logger.info(f" YoY rows (calculated): {yoy_valid:,}")
self.logger.info(f" YoY rows (NULL/base) : {yoy_null:,}")
schema = [ schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_name_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_name_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
] ]
rows_loaded = load_to_bigquery( rows_loaded = load_to_bigquery(
@@ -915,29 +704,19 @@ class AnalyticalLayerLoader:
'config_snapshot' : json.dumps({ 'config_snapshot' : json.dumps({
'start_year' : self.start_year, 'start_year' : self.start_year,
'end_year' : self.end_year, 'end_year' : self.end_year,
'sdg_start_year' : self.sdg_start_year,
'fixed_countries': len(self.selected_country_ids), 'fixed_countries': len(self.selected_country_ids),
'no_gaps' : True, 'no_gaps' : True,
'layer' : 'gold', 'layer' : 'gold',
'framework_logic' : ( 'columns' : 'id + name + name_id (Looker Studio ready)'
f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, "
"else MDGs"
),
}), }),
'validation_metrics' : json.dumps({ 'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
'total_indicators': int(self.df_clean['indicator_id'].nunique()), 'total_indicators': int(self.df_clean['indicator_id'].nunique())
'sdg_start_year' : self.sdg_start_year,
'framework_dist' : fw_dist.to_dict(),
'yoy_rows_valid' : int(yoy_valid),
'yoy_rows_null' : int(yoy_null),
}) })
} }
save_etl_metadata(self.client, metadata) save_etl_metadata(self.client, metadata)
self.logger.info( self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold")
f" {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold"
)
self.logger.info(f" Metadata -> [AUDIT] etl_metadata") self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
return rows_loaded return rows_loaded
@@ -945,19 +724,12 @@ class AnalyticalLayerLoader:
self.logger.error(f"Error saving: {e}") self.logger.error(f"Error saving: {e}")
raise raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self): def run(self):
self.pipeline_start = datetime.now() self.pipeline_start = datetime.now()
self.pipeline_metadata['start_time'] = self.pipeline_start self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,")
self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct")
self.logger.info(f"Framework: ditentukan dinamis berdasarkan SDG_START_YEAR (auto-detect)")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_source_data() self.load_source_data()
@@ -965,9 +737,7 @@ class AnalyticalLayerLoader:
self.filter_complete_indicators_per_country() self.filter_complete_indicators_per_country()
self.select_countries_with_all_pillars() self.select_countries_with_all_pillars()
self.filter_indicators_consistent_across_fixed_countries() self.filter_indicators_consistent_across_fixed_countries()
self.determine_sdg_start_year() # Step 6: auto-detect SDG year & assign framework self.verify_no_gaps()
self.verify_no_gaps() # Step 6c: verifikasi tidak ada gap
self.calculate_yoy() # Step 7: hitung YoY
self.analyze_indicator_availability_by_year() self.analyze_indicator_availability_by_year()
self.save_analytical_table() self.save_analytical_table()
@@ -979,7 +749,6 @@ class AnalyticalLayerLoader:
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
self.logger.info(f" Countries : {len(self.selected_country_ids)}") self.logger.info(f" Countries : {len(self.selected_country_ids)}")
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}") self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")
@@ -1007,9 +776,7 @@ def run_analytical_layer():
if __name__ == "__main__": if __name__ == "__main__":
print("=" * 80) print("=" * 80)
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("Output: fact_asean_food_security_selected -> fs_asean_gold")
print("Framework: MDGs/SDGs ditentukan dinamis dari data (auto-detect SDG start year)")
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()
@@ -1019,6 +786,4 @@ if __name__ == "__main__":
print("\n" + "=" * 80) print("\n" + "=" * 80)
print("[OK] COMPLETED") print("[OK] COMPLETED")
print(f" SDG Start Year : {loader.sdg_start_year}")
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
print("=" * 80) print("=" * 80)

View File

@@ -40,7 +40,7 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
"""Load data dari staging_integrated (STAGING/Silver layer).""" """Load data dari staging_integrated (STAGING/Silver layer)."""
print("\nLoading data from staging_integrated (fs_asean_silver)...") print("\nLoading data from staging_integrated (fs_asean_silver)...")
df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver') df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver')
print(f" Loaded : {len(df_staging):,} rows") print(f" Loaded : {len(df_staging):,} rows")
print(f" Columns : {len(df_staging.columns)}") print(f" Columns : {len(df_staging.columns)}")
print(f" Sources : {df_staging['source'].nunique()}") print(f" Sources : {df_staging['source'].nunique()}")
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
@@ -53,6 +53,7 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
# COLUMN CONSTRAINT HELPERS # COLUMN CONSTRAINT HELPERS
# ============================================================================= # =============================================================================
# Schema constraints — semua varchar max lengths
COLUMN_CONSTRAINTS = { COLUMN_CONSTRAINTS = {
'source' : 20, 'source' : 20,
'indicator_original' : 255, 'indicator_original' : 255,
@@ -61,7 +62,7 @@ COLUMN_CONSTRAINTS = {
'year_range' : 20, 'year_range' : 20,
'unit' : 20, 'unit' : 20,
'pillar' : 20, 'pillar' : 20,
'direction' : 15, 'direction' : 15, # 'higher_better'=13, 'lower_better'=12
} }
@@ -100,11 +101,11 @@ def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
) )
if truncation_report: if truncation_report:
print("\n Column Truncations Applied:") print("\n Column Truncations Applied:")
for column, info in truncation_report.items(): for column, info in truncation_report.items():
print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars") print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
else: else:
print("\n No truncations needed — all values within constraints") print("\n No truncations needed — all values within constraints")
return df_constrained return df_constrained
@@ -176,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
def assign_pillar(indicator_name: str) -> str: def assign_pillar(indicator_name: str) -> str:
""" """
Assign pillar berdasarkan keyword indikator. Assign pillar berdasarkan keyword indikator.
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other' Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Sustainability'
All <= 20 chars (varchar(20) constraint). All 20 chars (varchar(20) constraint).
""" """
if pd.isna(indicator_name): if pd.isna(indicator_name):
return 'Other' return 'Sustainability'
ind = str(indicator_name).lower() ind = str(indicator_name).lower()
for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
if kw in ind: if kw in ind:
return 'Other' return 'Sustainability'
if any(kw in ind for kw in [ if any(kw in ind for kw in [
'adequacy', 'protein supply', 'supply of protein', 'adequacy', 'protein supply', 'supply of protein',
@@ -209,13 +210,12 @@ def assign_pillar(indicator_name: str) -> str:
if any(kw in ind for kw in [ if any(kw in ind for kw in [
'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity', 'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water', 'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
'sanitation', 'children under 5', 'newborns with low', 'children under 5', 'newborns with low', 'women of reproductive'
'women of reproductive'
]): ]):
return 'Utilization' return 'Utilization'
return 'Other' return 'Sustainability'
# ============================================================================= # =============================================================================
@@ -226,15 +226,17 @@ def assign_direction(indicator_name: str) -> str:
""" """
Assign direction berdasarkan indikator. Assign direction berdasarkan indikator.
Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars) Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars)
Both <= 15 chars (varchar(15) constraint). Both 15 chars (varchar(15) constraint).
""" """
if pd.isna(indicator_name): if pd.isna(indicator_name):
return 'higher_better' return 'higher_better'
ind = str(indicator_name).lower() ind = str(indicator_name).lower()
# Spesifik lower_better
if 'share of dietary energy supply derived from cereals' in ind: if 'share of dietary energy supply derived from cereals' in ind:
return 'lower_better' return 'lower_better'
# Higher_better exceptions — cek sebelum lower_better keywords
for kw in [ for kw in [
'exclusive breastfeeding', 'exclusive breastfeeding',
'dietary energy supply', 'dietary energy supply',
@@ -246,6 +248,7 @@ def assign_direction(indicator_name: str) -> str:
if kw in ind: if kw in ind:
return 'higher_better' return 'higher_better'
# Lower_better — masalah yang harus diminimalkan
for kw in [ for kw in [
'prevalence of undernourishment', 'prevalence of undernourishment',
'prevalence of severe food insecurity', 'prevalence of severe food insecurity',
@@ -256,7 +259,6 @@ def assign_direction(indicator_name: str) -> str:
'prevalence of overweight', 'prevalence of overweight',
'prevalence of obesity', 'prevalence of obesity',
'prevalence of anemia', 'prevalence of anemia',
'prevalence of anaemia',
'prevalence of low birthweight', 'prevalence of low birthweight',
'number of people undernourished', 'number of people undernourished',
'number of severely food insecure', 'number of severely food insecure',
@@ -281,9 +283,6 @@ def assign_direction(indicator_name: str) -> str:
'coefficient of variation', 'coefficient of variation',
'incidence of caloric losses', 'incidence of caloric losses',
'food losses', 'food losses',
'indicator of food price anomalies',
'proportion of local breeds classified as being at risk',
'agricultural export subsidies',
]: ]:
if kw in ind: if kw in ind:
return 'lower_better' return 'lower_better'
@@ -300,18 +299,19 @@ class CleanedDataLoader:
Loader untuk cleaned integrated data ke STAGING layer (Silver). Loader untuk cleaned integrated data ke STAGING layer (Silver).
Kimball context: Kimball context:
Input : staging_integrated -> STAGING (Silver) — fs_asean_silver Input : staging_integrated STAGING (Silver) — fs_asean_silver
Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver Output : cleaned_integrated STAGING (Silver) — fs_asean_silver
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit Audit : etl_logs, etl_metadata AUDIT — fs_asean_audit
Pipeline steps: Pipeline steps:
1. Standardize country names (ASEAN) 1. Standardize country names (ASEAN)
2. Remove missing values 2. Remove missing values
3. Remove duplicates 3. Remove duplicates
4. Add pillar & direction classification 4. Add pillar classification
5. Apply column constraints 5. Add direction classification
6. Load ke BigQuery 6. Apply column constraints
7. Log ke Audit layer 7. Load ke BigQuery
8. Log ke Audit layer
""" """
SCHEMA = [ SCHEMA = [
@@ -355,7 +355,7 @@ class CleanedDataLoader:
def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame: def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 1/5] Standardize country names...") print("\n [Step 1/5] Standardize country names...")
df, report = standardize_country_names_asean(df, country_column='country') df, report = standardize_country_names_asean(df, country_column='country')
print(f" ASEAN countries mapped : {report['countries_mapped']}") print(f" ASEAN countries mapped : {report['countries_mapped']}")
unique_countries = sorted(df['country'].unique()) unique_countries = sorted(df['country'].unique())
print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}") print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
log_update(self.client, 'STAGING', 'staging_integrated', log_update(self.client, 'STAGING', 'staging_integrated',
@@ -377,9 +377,7 @@ class CleanedDataLoader:
def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame: def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 3/5] Remove duplicates...") print("\n [Step 3/5] Remove duplicates...")
exact_dups = df.duplicated().sum() exact_dups = df.duplicated().sum()
data_dups = df.duplicated( data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
subset=['indicator_standardized', 'country', 'year', 'value']
).sum()
print(f" Exact duplicates : {exact_dups:,}") print(f" Exact duplicates : {exact_dups:,}")
print(f" Data duplicates : {data_dups:,}") print(f" Data duplicates : {data_dups:,}")
rows_before = len(df) rows_before = len(df)
@@ -393,21 +391,19 @@ class CleanedDataLoader:
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 4/5] Add pillar & direction classification...") print("\n [Step 4/5] Add pillar & direction classification...")
df = df.copy() df = df.copy()
df['pillar'] = df['indicator_standardized'].apply(assign_pillar) df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
df['direction'] = df['indicator_standardized'].apply(assign_direction) df['direction'] = df['indicator_standardized'].apply(assign_direction)
pillar_counts = df['pillar'].value_counts() pillar_counts = df['pillar'].value_counts()
print(f" Pillar distribution:") print(f" Pillar distribution:")
for pillar, count in pillar_counts.items(): for pillar, count in pillar_counts.items():
print(f" - {pillar}: {count:,}") print(f" - {pillar}: {count:,}")
direction_counts = df['direction'].value_counts() direction_counts = df['direction'].value_counts()
print(f" Direction distribution:") print(f" Direction distribution:")
for direction, count in direction_counts.items(): for direction, count in direction_counts.items():
pct = count / len(df) * 100 pct = count / len(df) * 100
print(f" - {direction}: {count:,} ({pct:.1f}%)") print(f" - {direction}: {count:,} ({pct:.1f}%)")
return df return df
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -442,6 +438,7 @@ class CleanedDataLoader:
if 'country' in df.columns: if 'country' in df.columns:
validation['unique_countries'] = int(df['country'].nunique()) validation['unique_countries'] = int(df['country'].nunique())
# Column length check
column_length_check = {} column_length_check = {}
for col, max_len in COLUMN_CONSTRAINTS.items(): for col, max_len in COLUMN_CONSTRAINTS.items():
if col in df.columns: if col in df.columns:
@@ -460,7 +457,7 @@ class CleanedDataLoader:
def run(self, df: pd.DataFrame) -> int: def run(self, df: pd.DataFrame) -> int:
""" """
Execute full cleaning pipeline -> load ke STAGING (Silver). Execute full cleaning pipeline load ke STAGING (Silver).
Returns: Returns:
int: Rows loaded int: Rows loaded
@@ -472,6 +469,7 @@ class CleanedDataLoader:
print(" ERROR: DataFrame is empty, nothing to process.") print(" ERROR: DataFrame is empty, nothing to process.")
return 0 return 0
# Pipeline steps
df = self._step_standardize_countries(df) df = self._step_standardize_countries(df)
df = self._step_remove_missing(df) df = self._step_remove_missing(df)
df = self._step_remove_duplicates(df) df = self._step_remove_duplicates(df)
@@ -480,6 +478,7 @@ class CleanedDataLoader:
self.metadata['rows_transformed'] = len(df) self.metadata['rows_transformed'] = len(df)
# Validate
validation = self.validate_data(df) validation = self.validate_data(df)
self.metadata['validation_metrics'] = validation self.metadata['validation_metrics'] = validation
@@ -488,12 +487,13 @@ class CleanedDataLoader:
for info in validation.get('column_length_check', {}).values() for info in validation.get('column_length_check', {}).values()
) )
if not all_within_limits: if not all_within_limits:
print("\n WARNING: Some columns still exceed length constraints!") print("\n WARNING: Some columns still exceed length constraints!")
for col, info in validation['column_length_check'].items(): for col, info in validation['column_length_check'].items():
if not info['within_limit']: if not info['within_limit']:
print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}") print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...") # Load ke Silver
print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
rows_loaded = load_to_bigquery( rows_loaded = load_to_bigquery(
self.client, df, self.table_name, self.client, df, self.table_name,
layer='silver', layer='silver',
@@ -502,8 +502,10 @@ class CleanedDataLoader:
) )
self.metadata['rows_loaded'] = rows_loaded self.metadata['rows_loaded'] = rows_loaded
# Audit logs
log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded) log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
# ETL metadata
self.metadata['end_time'] = datetime.now() self.metadata['end_time'] = datetime.now()
self.metadata['duration_seconds'] = ( self.metadata['duration_seconds'] = (
self.metadata['end_time'] - self.metadata['start_time'] self.metadata['end_time'] - self.metadata['start_time']
@@ -514,31 +516,33 @@ class CleanedDataLoader:
self.metadata['validation_metrics'] = json.dumps(validation) self.metadata['validation_metrics'] = json.dumps(validation)
save_etl_metadata(self.client, self.metadata) save_etl_metadata(self.client, self.metadata)
print(f"\n Cleaned Integration completed: {rows_loaded:,} rows") # Summary
print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
print(f" Duration : {self.metadata['duration_seconds']:.2f}s") print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
print(f" Completeness : {validation['completeness_pct']:.2f}%") print(f" Completeness : {validation['completeness_pct']:.2f}%")
if 'year_range' in validation: if 'year_range' in validation:
yr = validation['year_range'] yr = validation['year_range']
if yr['min'] and yr['max']: if yr['min'] and yr['max']:
print(f" Year range : {yr['min']}-{yr['max']}") print(f" Year range : {yr['min']}{yr['max']}")
print(f" Indicators : {validation.get('unique_indicators', '-')}") print(f" Indicators : {validation.get('unique_indicators', '-')}")
print(f" Countries : {validation.get('unique_countries', '-')}") print(f" Countries : {validation.get('unique_countries', '-')}")
print(f"\n Schema Validation:") print(f"\n Schema Validation:")
for col, info in validation.get('column_length_check', {}).items(): for col, info in validation.get('column_length_check', {}).items():
status = "OK" if info['within_limit'] else "FAIL" status = "" if info['within_limit'] else ""
print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}") print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
print(f"\n Metadata -> [AUDIT] etl_metadata") print(f"\n Metadata [AUDIT] etl_metadata")
return rows_loaded return rows_loaded
# ============================================================================= # =============================================================================
# AIRFLOW TASK FUNCTIONS # AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer
# ============================================================================= # =============================================================================
def run_cleaned_integration(): def run_cleaned_integration():
""" """
Airflow task: Load cleaned_integrated dari staging_integrated. Airflow task: Load cleaned_integrated dari staging_integrated.
Dipanggil oleh DAG setelah task staging_integration_to_silver selesai. Dipanggil oleh DAG setelah task staging_integration_to_silver selesai.
""" """
from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_config import get_bigquery_client
@@ -557,21 +561,21 @@ if __name__ == "__main__":
print("=" * 60) print("=" * 60)
print("BIGQUERY CLEANED LAYER ETL") print("BIGQUERY CLEANED LAYER ETL")
print("Kimball DW Architecture") print("Kimball DW Architecture")
print(" Input : STAGING (Silver) -> staging_integrated") print(" Input : STAGING (Silver) staging_integrated")
print(" Output : STAGING (Silver) -> cleaned_integrated") print(" Output : STAGING (Silver) cleaned_integrated")
print(" Audit : AUDIT -> etl_logs, etl_metadata") print(" Audit : AUDIT etl_logs, etl_metadata")
print("=" * 60) print("=" * 60)
logger = setup_logging() logger = setup_logging()
client = get_bigquery_client() client = get_bigquery_client()
df_staging = load_staging_data(client) df_staging = load_staging_data(client)
print("\n[1/1] Cleaned Integration -> STAGING (Silver)...") print("\n[1/1] Cleaned Integration STAGING (Silver)...")
loader = CleanedDataLoader(client, load_mode='full_refresh') loader = CleanedDataLoader(client, load_mode='full_refresh')
final_count = loader.run(df_staging) final_count = loader.run(df_staging)
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("[OK] CLEANED LAYER ETL COMPLETED") print(" CLEANED LAYER ETL COMPLETED")
print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
print(f" AUDIT : etl_logs, etl_metadata") print(f" 📋 AUDIT : etl_logs, etl_metadata")
print("=" * 60) print("=" * 60)

View File

@@ -46,9 +46,9 @@ class DimensionalModelLoader:
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold. Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
Kimball context: Kimball context:
Input : cleaned_integrated -> STAGING (Silver) — fs_asean_silver Input : cleaned_integrated STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* -> DW (Gold) — fs_asean_gold Output : dim_* + fact_* DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit Audit : etl_logs, etl_metadata AUDIT — fs_asean_audit
Pipeline steps: Pipeline steps:
1. Load dim_country 1. Load dim_country
@@ -117,7 +117,7 @@ class DimensionalModelLoader:
""" """
try: try:
self.client.query(query).result() self.client.query(query).result()
self.logger.info(f" [OK] FK: {table_name}.{fk_column} -> {ref_table}.{ref_column}") self.logger.info(f" [OK] FK: {table_name}.{fk_column} {ref_table}.{ref_column}")
except Exception as e: except Exception as e:
if "already exists" in str(e).lower(): if "already exists" in str(e).lower():
self.logger.info(f" [INFO] FK already exists: {constraint_name}") self.logger.info(f" [INFO] FK already exists: {constraint_name}")
@@ -145,7 +145,7 @@ class DimensionalModelLoader:
} }
try: try:
save_etl_metadata(self.client, metadata) save_etl_metadata(self.client, metadata)
self.logger.info(f" Metadata -> [AUDIT] etl_metadata") self.logger.info(f" Metadata [AUDIT] etl_metadata")
except Exception as e: except Exception as e:
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}") self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
@@ -156,7 +156,7 @@ class DimensionalModelLoader:
def load_dim_time(self): def load_dim_time(self):
table_name = 'dim_time' table_name = 'dim_time'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_time -> [DW/Gold] fs_asean_gold...") self.logger.info("Loading dim_time [DW/Gold] fs_asean_gold...")
try: try:
if 'year_range' in self.df_clean.columns: if 'year_range' in self.df_clean.columns:
@@ -229,7 +229,7 @@ class DimensionalModelLoader:
) )
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name) self._save_table_metadata(table_name)
self.logger.info(f" dim_time: {rows_loaded} rows\n") self.logger.info(f" dim_time: {rows_loaded} rows\n")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -240,7 +240,7 @@ class DimensionalModelLoader:
def load_dim_country(self): def load_dim_country(self):
table_name = 'dim_country' table_name = 'dim_country'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_country -> [DW/Gold] fs_asean_gold...") self.logger.info("Loading dim_country [DW/Gold] fs_asean_gold...")
try: try:
dim_country = self.df_clean[['country']].drop_duplicates().copy() dim_country = self.df_clean[['country']].drop_duplicates().copy()
@@ -270,9 +270,7 @@ class DimensionalModelLoader:
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1]) lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping) dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
dim_country_final = dim_country[ dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy()
['country_name', 'region', 'subregion', 'iso_code']
].copy()
dim_country_final = dim_country_final.reset_index(drop=True) dim_country_final = dim_country_final.reset_index(drop=True)
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1)) dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
@@ -295,7 +293,7 @@ class DimensionalModelLoader:
) )
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name) self._save_table_metadata(table_name)
self.logger.info(f" dim_country: {rows_loaded} rows\n") self.logger.info(f" dim_country: {rows_loaded} rows\n")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -304,19 +302,9 @@ class DimensionalModelLoader:
raise raise
def load_dim_indicator(self): def load_dim_indicator(self):
"""
Load dim_indicator ke Gold layer.
Kolom yang dimuat:
indicator_id — surrogate key
indicator_name — nama standar indikator
indicator_category — kategori (Health & Nutrition, dll.)
unit — satuan ukuran
direction — higher_better / lower_better
"""
table_name = 'dim_indicator' table_name = 'dim_indicator'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_indicator -> [DW/Gold] fs_asean_gold...") self.logger.info("Loading dim_indicator [DW/Gold] fs_asean_gold...")
try: try:
has_direction = 'direction' in self.df_clean.columns has_direction = 'direction' in self.df_clean.columns
@@ -326,7 +314,6 @@ class DimensionalModelLoader:
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name'] dim_indicator.columns = ['indicator_name']
# Unit
if has_unit: if has_unit:
unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates() unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
unit_map.columns = ['indicator_name', 'unit'] unit_map.columns = ['indicator_name', 'unit']
@@ -334,7 +321,6 @@ class DimensionalModelLoader:
else: else:
dim_indicator['unit'] = None dim_indicator['unit'] = None
# Direction
if has_direction: if has_direction:
dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates() dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
dir_map.columns = ['indicator_name', 'direction'] dir_map.columns = ['indicator_name', 'direction']
@@ -344,43 +330,30 @@ class DimensionalModelLoader:
dim_indicator['direction'] = 'higher_better' dim_indicator['direction'] = 'higher_better'
self.logger.warning(" [WARN] direction not found, default: higher_better") self.logger.warning(" [WARN] direction not found, default: higher_better")
# Indicator category
if has_category: if has_category:
cat_map = self.df_clean[ cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
['indicator_standardized', 'indicator_category']
].drop_duplicates()
cat_map.columns = ['indicator_name', 'indicator_category'] cat_map.columns = ['indicator_name', 'indicator_category']
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
else: else:
def categorize_indicator(name): def categorize_indicator(name):
n = str(name).lower() n = str(name).lower()
if any(w in n for w in [ if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
'undernourishment', 'malnutrition', 'stunting', 'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
'wasting', 'anemia', 'anaemia', 'food security',
'food insecure', 'hunger'
]):
return 'Health & Nutrition' return 'Health & Nutrition'
elif any(w in n for w in [ elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
'production', 'yield', 'cereal', 'crop', 'import dependency', 'share of dietary']):
'import dependency', 'share of dietary'
]):
return 'Agricultural Production' return 'Agricultural Production'
elif any(w in n for w in ['import', 'export', 'trade']): elif any(w in n for w in ['import', 'export', 'trade']):
return 'Trade' return 'Trade'
elif any(w in n for w in ['gdp', 'income', 'economic']): elif any(w in n for w in ['gdp', 'income', 'economic']):
return 'Economic' return 'Economic'
elif any(w in n for w in [ elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
'water', 'sanitation', 'infrastructure', 'rail'
]):
return 'Infrastructure' return 'Infrastructure'
else: else:
return 'Other' return 'Sustainability'
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply( dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
categorize_indicator
)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
dim_indicator_final = dim_indicator[ dim_indicator_final = dim_indicator[
['indicator_name', 'indicator_category', 'unit', 'direction'] ['indicator_name', 'indicator_category', 'unit', 'direction']
].copy() ].copy()
@@ -401,22 +374,17 @@ class DimensionalModelLoader:
) )
self._add_primary_key(table_name, 'indicator_id') self._add_primary_key(table_name, 'indicator_id')
# Log distribusi for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
for label, col in [
('Categories', 'indicator_category'),
('Direction', 'direction'),
]:
self.logger.info(f" {label}:") self.logger.info(f" {label}:")
for val, cnt in dim_indicator_final[col].value_counts().items(): for val, cnt in dim_indicator_final[col].value_counts().items():
pct = cnt / len(dim_indicator_final) * 100 self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
self.logger.info(f" - {val}: {cnt} ({pct:.1f}%)")
self.load_metadata[table_name].update( self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
) )
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name) self._save_table_metadata(table_name)
self.logger.info(f" dim_indicator: {rows_loaded} rows\n") self.logger.info(f" dim_indicator: {rows_loaded} rows\n")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -427,7 +395,7 @@ class DimensionalModelLoader:
def load_dim_source(self): def load_dim_source(self):
table_name = 'dim_source' table_name = 'dim_source'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_source -> [DW/Gold] fs_asean_gold...") self.logger.info("Loading dim_source [DW/Gold] fs_asean_gold...")
try: try:
source_details = { source_details = {
@@ -487,7 +455,7 @@ class DimensionalModelLoader:
) )
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name) self._save_table_metadata(table_name)
self.logger.info(f" dim_source: {rows_loaded} rows\n") self.logger.info(f" dim_source: {rows_loaded} rows\n")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -498,15 +466,15 @@ class DimensionalModelLoader:
def load_dim_pillar(self): def load_dim_pillar(self):
table_name = 'dim_pillar' table_name = 'dim_pillar'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_pillar -> [DW/Gold] fs_asean_gold...") self.logger.info("Loading dim_pillar [DW/Gold] fs_asean_gold...")
try: try:
pillar_codes = { pillar_codes = {
'Availability': 'AVL', 'Access' : 'ACC', 'Availability': 'AVL', 'Access' : 'ACC',
'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH', 'Utilization' : 'UTL', 'Stability': 'STB', 'Sustainability': 'STN',
} }
pillars_data = [ pillars_data = [
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')} {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'STN')}
for p in self.df_clean['pillar'].unique() for p in self.df_clean['pillar'].unique()
] ]
@@ -533,7 +501,7 @@ class DimensionalModelLoader:
) )
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name) self._save_table_metadata(table_name)
self.logger.info(f" dim_pillar: {rows_loaded} rows\n") self.logger.info(f" dim_pillar: {rows_loaded} rows\n")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -548,9 +516,10 @@ class DimensionalModelLoader:
def load_fact_food_security(self): def load_fact_food_security(self):
table_name = 'fact_food_security' table_name = 'fact_food_security'
self.load_metadata[table_name]['start_time'] = datetime.now() self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading fact_food_security -> [DW/Gold] fs_asean_gold...") self.logger.info("Loading fact_food_security [DW/Gold] fs_asean_gold...")
try: try:
# Load dims dari Gold untuk FK resolution
dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold') dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold') dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
@@ -592,9 +561,9 @@ class DimensionalModelLoader:
fact_table['start_year'] = fact_table['year'].astype(int) fact_table['start_year'] = fact_table['year'].astype(int)
fact_table['end_year'] = fact_table['year'].astype(int) fact_table['end_year'] = fact_table['year'].astype(int)
# Resolve FKs
fact_table = fact_table.merge( fact_table = fact_table.merge(
dim_country[['country_id', 'country_name']].rename( dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
columns={'country_name': 'country'}),
on='country', how='left' on='country', how='left'
) )
fact_table = fact_table.merge( fact_table = fact_table.merge(
@@ -607,16 +576,15 @@ class DimensionalModelLoader:
on=['start_year', 'end_year'], how='left' on=['start_year', 'end_year'], how='left'
) )
fact_table = fact_table.merge( fact_table = fact_table.merge(
dim_source[['source_id', 'source_name']].rename( dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
columns={'source_name': 'source'}),
on='source', how='left' on='source', how='left'
) )
fact_table = fact_table.merge( fact_table = fact_table.merge(
dim_pillar[['pillar_id', 'pillar_name']].rename( dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
columns={'pillar_name': 'pillar'}),
on='pillar', how='left' on='pillar', how='left'
) )
# Filter hanya row dengan FK lengkap
fact_table = fact_table[ fact_table = fact_table[
fact_table['country_id'].notna() & fact_table['country_id'].notna() &
fact_table['indicator_id'].notna() & fact_table['indicator_id'].notna() &
@@ -653,6 +621,7 @@ class DimensionalModelLoader:
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
) )
# Add PK + FKs
self._add_primary_key(table_name, 'fact_id') self._add_primary_key(table_name, 'fact_id')
self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id') self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id') self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
@@ -665,7 +634,7 @@ class DimensionalModelLoader:
) )
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name) self._save_table_metadata(table_name)
self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n") self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n")
return rows_loaded return rows_loaded
except Exception as e: except Exception as e:
@@ -748,15 +717,11 @@ class DimensionalModelLoader:
FROM `{get_table_id('dim_indicator', layer='gold')}` FROM `{get_table_id('dim_indicator', layer='gold')}`
GROUP BY direction ORDER BY direction GROUP BY direction ORDER BY direction
""" """
df_dir = self.client.query(query_dir).result().to_dataframe( df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
create_bqstorage_client=False
)
if len(df_dir) > 0: if len(df_dir) > 0:
self.logger.info(f"\n Direction Distribution:") self.logger.info(f"\n Direction Distribution:")
for _, row in df_dir.iterrows(): for _, row in df_dir.iterrows():
self.logger.info( self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
f" {row['direction']:15s}: {int(row['count']):>5,} indicators"
)
self.logger.info("\n [OK] Validation completed") self.logger.info("\n [OK] Validation completed")
except Exception as e: except Exception as e:
@@ -773,19 +738,22 @@ class DimensionalModelLoader:
self.pipeline_metadata['rows_fetched'] = len(self.df_clean) self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
self.logger.info("\n" + "=" * 60) self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) -> fs_asean_gold") self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) fs_asean_gold")
self.logger.info("=" * 60) self.logger.info("=" * 60)
self.logger.info("\nLOADING DIMENSION TABLES -> fs_asean_gold") # Dimensions
self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
self.load_dim_country() self.load_dim_country()
self.load_dim_indicator() self.load_dim_indicator()
self.load_dim_time() self.load_dim_time()
self.load_dim_source() self.load_dim_source()
self.load_dim_pillar() self.load_dim_pillar()
self.logger.info("\nLOADING FACT TABLE -> fs_asean_gold") # Fact
self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
self.load_fact_food_security() self.load_fact_food_security()
# Validate
self.validate_constraints() self.validate_constraints()
self.validate_data_load() self.validate_data_load()
@@ -801,9 +769,7 @@ class DimensionalModelLoader:
'execution_timestamp': self.pipeline_metadata['start_time'], 'execution_timestamp': self.pipeline_metadata['start_time'],
'completeness_pct' : 100.0, 'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics' : json.dumps( 'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
{t: m['status'] for t, m in self.load_metadata.items()}
),
'table_name' : 'dimensional_model_pipeline', 'table_name' : 'dimensional_model_pipeline',
}) })
try: try:
@@ -811,6 +777,7 @@ class DimensionalModelLoader:
except Exception as e: except Exception as e:
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}") self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
# Summary
self.logger.info("\n" + "=" * 60) self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED") self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
self.logger.info("=" * 60) self.logger.info("=" * 60)
@@ -818,19 +785,20 @@ class DimensionalModelLoader:
self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Tables :") self.logger.info(f" Tables :")
for tbl, meta in self.load_metadata.items(): for tbl, meta in self.load_metadata.items():
icon = "OK" if meta['status'] == 'success' else "FAIL" icon = "" if meta['status'] == 'success' else ""
self.logger.info(f" [{icon}] {tbl:25s}: {meta['rows_loaded']:>10,} rows") self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata -> [AUDIT] etl_metadata") self.logger.info(f"\n Metadata [AUDIT] etl_metadata")
self.logger.info("=" * 60) self.logger.info("=" * 60)
# ============================================================================= # =============================================================================
# AIRFLOW TASK FUNCTIONS # AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer
# ============================================================================= # =============================================================================
def run_dimensional_model(): def run_dimensional_model():
""" """
Airflow task: Load dimensional model dari cleaned_integrated. Airflow task: Load dimensional model dari cleaned_integrated.
Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai. Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai.
""" """
from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_config import get_bigquery_client
@@ -849,9 +817,9 @@ if __name__ == "__main__":
print("=" * 60) print("=" * 60)
print("BIGQUERY DIMENSIONAL MODEL LOAD") print("BIGQUERY DIMENSIONAL MODEL LOAD")
print("Kimball DW Architecture") print("Kimball DW Architecture")
print(" Input : STAGING (Silver) -> cleaned_integrated (fs_asean_silver)") print(" Input : STAGING (Silver) cleaned_integrated (fs_asean_silver)")
print(" Output : DW (Gold) -> dim_*, fact_* (fs_asean_gold)") print(" Output : DW (Gold) dim_*, fact_* (fs_asean_gold)")
print(" Audit : AUDIT -> etl_logs, etl_metadata (fs_asean_audit)") print(" Audit : AUDIT etl_logs, etl_metadata (fs_asean_audit)")
print("=" * 60) print("=" * 60)
logger = setup_logging() logger = setup_logging()
@@ -859,22 +827,24 @@ if __name__ == "__main__":
print("\nLoading cleaned_integrated (fs_asean_silver)...") print("\nLoading cleaned_integrated (fs_asean_silver)...")
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
print(f" Loaded : {len(df_clean):,} rows") print(f" Loaded : {len(df_clean):,} rows")
print(f" Columns : {len(df_clean.columns)}") print(f" Columns : {len(df_clean.columns)}")
print(f" Sources : {df_clean['source'].nunique()}") print(f" Sources : {df_clean['source'].nunique()}")
print(f" Indicators : {df_clean['indicator_standardized'].nunique()}") print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
print(f" Countries : {df_clean['country'].nunique()}") print(f" Countries : {df_clean['country'].nunique()}")
print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}") print(f" Year range : {int(df_clean['year'].min())}{int(df_clean['year'].max())}")
if 'direction' in df_clean.columns: if 'direction' in df_clean.columns:
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
else:
print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
print("\n[1/1] Dimensional Model Load -> DW (Gold)...") print("\n[1/1] Dimensional Model Load DW (Gold)...")
loader = DimensionalModelLoader(client, df_clean) loader = DimensionalModelLoader(client, df_clean)
loader.run() loader.run()
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("[OK] DIMENSIONAL MODEL ETL COMPLETED") print(" DIMENSIONAL MODEL ETL COMPLETED")
print(" DW (Gold) : dim_country, dim_indicator, dim_time,") print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
print(" dim_source, dim_pillar, fact_food_security") print(" dim_source, dim_pillar, fact_food_security")
print(" AUDIT : etl_logs, etl_metadata") print(" 📋 AUDIT : etl_logs, etl_metadata")
print("=" * 60) print("=" * 60)