From ddf15ca9a5a051a0334178edc710bbbdae0a535c Mon Sep 17 00:00:00 2001 From: Debby Date: Tue, 31 Mar 2026 13:54:20 +0700 Subject: [PATCH] move sdgs to analytical_layer --- scripts/bigquery_analytical_layer.py | 427 +++++++++++++++++++------- scripts/bigquery_cleaned_layer.py | 80 +---- scripts/bigquery_dimensional_model.py | 51 +-- 3 files changed, 321 insertions(+), 237 deletions(-) diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 8369311..1e7e9a8 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -1,6 +1,6 @@ """ BIGQUERY ANALYTICAL LAYER - DATA FILTERING -FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') +fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') Filtering Order: 1. Load data (single years only) @@ -8,15 +8,20 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Calculate YoY per indicator per country -7. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio) +6. Determine SDGs start year & assign framework (MDGs/SDGs) per indicator +7. Calculate YoY per indicator per country +8. Analyze indicator availability by year +9. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio) -UPDATED: -- Kolom 'framework' (MDGs/SDGs) dipropagasi dari dim_indicator ke tabel output. - Hal ini memungkinkan Looker Studio melakukan filter/slice berdasarkan framework - tanpa perlu join ulang ke dim_indicator. -- Kolom 'yoy_change' dan 'yoy_pct' ditambahkan untuk analisis Year-over-Year - per indikator per negara langsung di Looker Studio. +FRAMEWORK LOGIC: +- SDG_START_YEAR = 2016 (default; auto-detect jika indikator SDGs pertama kali muncul lebih awal/lambat) +- Indikator yang namanya ada di SDG_INDICATOR_KEYWORDS: + * Jika data mulai >= SDG_START_YEAR -> 'SDGs' + * Jika data mulai < SDG_START_YEAR -> 'MDGs' + (artinya indikator ini sudah ada sebelum SDGs, mis. undernourishment) +- Indikator yang namanya TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs' +- Penentuan framework dilakukan SETELAH filter selesai (data sudah bersih & range sudah fixed) + sehingga start_year per indikator yang digunakan adalah start_year AKTUAL di dataset ini. """ import pandas as pd @@ -42,6 +47,81 @@ from scripts.bigquery_helpers import ( from google.cloud import bigquery +# ============================================================================= +# SDG INDICATOR KEYWORDS +# ============================================================================= +# Daftar nama indikator (lowercase) yang termasuk dalam SDG Goal 2. +# Matching dilakukan dengan `kw in indicator_name.lower()` sehingga +# partial match tetap valid (menangani variasi format nama). +# +# Logika framework: +# - Nama ada di set ini + start_year >= SDG_START_YEAR -> 'SDGs' +# - Nama ada di set ini + start_year < SDG_START_YEAR -> 'MDGs' +# (indikator sudah eksis sebelum SDGs, mis. prevalence of undernourishment) +# - Nama TIDAK ada di set ini -> 'MDGs' + +SDG_INDICATOR_KEYWORDS = frozenset([ + # TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs) + "prevalence of undernourishment (percent) (3-year average)", + "number of people undernourished (million) (3-year average)", + # TARGET 2.1.2 — FIES (SDGs only) + "prevalence of severe food insecurity in the total population (percent) (3-year average)", + "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", + "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", + "number of severely food insecure people (million) (3-year average)", + "number of severely food insecure male adults (million) (3-year average)", + "number of severely food insecure female adults (million) (3-year average)", + "number of moderately or severely food insecure people (million) (3-year average)", + "number of moderately or severely food insecure male adults (million) (3-year average)", + "number of moderately or severely food insecure female adults (million) (3-year average)", + # TARGET 2.2.1 — Stunting (shared) + "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", + "number of children under 5 years of age who are stunted (modeled estimates) (million)", + # TARGET 2.2.2 — Wasting & Overweight (shared) + "percentage of children under 5 years affected by wasting (percent)", + "number of children under 5 years affected by wasting (million)", + "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", + "number of children under 5 years of age who are overweight (modeled estimates) (million)", + # TARGET 2.2.3 — Anaemia (SDGs only) + "prevalence of anemia among women of reproductive age (15-49 years) (percent)", + "number of women of reproductive age (15-49 years) affected by anemia (million)", +]) + +# Tahun resmi SDGs mulai berlaku (2030 Agenda adopted September 2015, +# data reporting mulai 2016). Dipakai sebagai default jika auto-detect gagal. +SDG_START_YEAR_DEFAULT = 2016 + + +def assign_framework_dynamic( + indicator_name: str, + indicator_start_year: int, + sdg_start_year: int, +) -> str: + """ + Tentukan framework (MDGs/SDGs) berdasarkan: + 1. Apakah nama indikator ada di SDG_INDICATOR_KEYWORDS? + 2. Apakah data indikator ini mulai pada tahun >= sdg_start_year? + + Args: + indicator_name : Nama indikator (akan di-lowercase untuk matching) + indicator_start_year : Tahun pertama data indikator ini tersedia di dataset + sdg_start_year : Tahun mulai SDGs (dari auto-detect atau default) + + Returns: + 'SDGs' jika indikator termasuk SDG list DAN mulai >= sdg_start_year + 'MDGs' untuk semua kasus lainnya + """ + ind_lower = str(indicator_name).lower().strip() + is_sdg_name = any(kw in ind_lower for kw in SDG_INDICATOR_KEYWORDS) + + if is_sdg_name and indicator_start_year >= sdg_start_year: + return 'SDGs' + return 'MDGs' + + # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= @@ -54,8 +134,9 @@ class AnalyticalLayerLoader: 1. Complete per country (no gaps from start_year to end_year) 2. Filter countries with all pillars 3. Ensure indicators have consistent country count across all years - 4. Calculate YoY (year-over-year) change per indicator per country - 5. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio + 4. Determine SDGs start year & assign framework per indicator dynamically + 5. Calculate YoY (year-over-year) change per indicator per country + 6. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold @@ -83,6 +164,9 @@ class AnalyticalLayerLoader: self.end_year = None self.baseline_year = 2023 + # SDGs-related — di-set oleh determine_sdg_start_year() + self.sdg_start_year = SDG_START_YEAR_DEFAULT + self.pipeline_metadata = { 'source_class' : self.__class__.__name__, 'start_time' : None, @@ -97,13 +181,18 @@ class AnalyticalLayerLoader: self.pipeline_start = None self.pipeline_end = None + # ------------------------------------------------------------------ + # STEP 1: LOAD SOURCE DATA + # ------------------------------------------------------------------ + def load_source_data(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") self.logger.info("=" * 80) try: - # Sertakan kolom framework dari dim_indicator dalam query + # Tidak include framework dari dim_indicator — + # framework akan ditentukan dinamis di Step 6 (determine_sdg_start_year) query = f""" SELECT f.country_id, @@ -111,7 +200,6 @@ class AnalyticalLayerLoader: f.indicator_id, i.indicator_name, i.direction, - i.framework, f.pillar_id, p.pillar_name, f.time_id, @@ -128,7 +216,7 @@ class AnalyticalLayerLoader: JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id """ - self.logger.info("Loading fact table with dimensions (incl. framework)...") + self.logger.info("Loading fact table with dimensions...") self.df_clean = self.client.query(query).result().to_dataframe( create_bqstorage_client=False ) @@ -144,19 +232,6 @@ class AnalyticalLayerLoader: f" Year ranges (is_year_range=True): {yr.get(True, 0):,}" ) - # Validasi kolom framework tersedia - if 'framework' not in self.df_clean.columns: - raise ValueError( - "Kolom 'framework' tidak ditemukan di dim_indicator. " - "Pastikan bigquery_cleaned_layer.py dan bigquery_dimensional_model.py " - "sudah dijalankan dengan versi terbaru." - ) - - fw_dist = self.df_clean.drop_duplicates('indicator_id')['framework'].value_counts() - self.logger.info(f" Framework distribution (per indikator unik):") - for fw, cnt in fw_dist.items(): - self.logger.info(f" {fw}: {cnt} indicators") - self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold') @@ -172,6 +247,10 @@ class AnalyticalLayerLoader: self.logger.error(f"Error loading source data: {e}") raise + # ------------------------------------------------------------------ + # STEP 2: DETERMINE YEAR BOUNDARIES + # ------------------------------------------------------------------ + def determine_year_boundaries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") @@ -214,6 +293,10 @@ class AnalyticalLayerLoader: self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY + # ------------------------------------------------------------------ + def filter_complete_indicators_per_country(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") @@ -285,6 +368,10 @@ class AnalyticalLayerLoader: self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 4: SELECT COUNTRIES WITH ALL PILLARS + # ------------------------------------------------------------------ + def select_countries_with_all_pillars(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") @@ -323,6 +410,10 @@ class AnalyticalLayerLoader: self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES + # ------------------------------------------------------------------ + def filter_indicators_consistent_across_fixed_countries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") @@ -404,9 +495,138 @@ class AnalyticalLayerLoader: self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") return self.df_clean + # ------------------------------------------------------------------ + # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK + # ------------------------------------------------------------------ + + def determine_sdg_start_year(self): + """ + Tentukan tahun mulai SDGs secara otomatis dari data aktual, lalu + assign kolom 'framework' (MDGs/SDGs) ke setiap baris di df_clean. + + Logika penentuan SDG_START_YEAR: + - Cari indikator yang namanya ada di SDG_INDICATOR_KEYWORDS (FIES, anaemia, dll.) + dan yang diyakini HANYA ada di SDGs (bukan shared dengan MDGs). + Proxy: indikator dengan keyword 'food insecurity' atau 'anemia'. + - Ambil tahun pertama (min year) dari indikator-indikator tersebut di dataset ini. + - Jika ditemukan -> sdg_start_year = tahun pertama itu. + - Jika tidak ditemukan -> sdg_start_year = SDG_START_YEAR_DEFAULT (2016). + + Logika assign framework per indikator (assign_framework_dynamic): + - Nama ada di SDG_INDICATOR_KEYWORDS + start_year >= sdg_start_year -> 'SDGs' + - Nama ada di SDG_INDICATOR_KEYWORDS + start_year < sdg_start_year -> 'MDGs' + (indikator seperti undernourishment sudah ada sebelum SDGs) + - Nama TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs' + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK") + self.logger.info("=" * 80) + + # --- 6a. Auto-detect SDG start year dari data aktual --- + # Proxy SDGs-only: indikator yang pasti baru di SDGs (FIES & anaemia) + sdg_proxy_keywords = [ + 'food insecurity', + 'anemia', + 'anaemia', + ] + + sdg_proxy_mask = self.df_clean['indicator_name'].str.lower().apply( + lambda n: any(kw in n for kw in sdg_proxy_keywords) + ) + df_sdg_proxy = self.df_clean[sdg_proxy_mask] + + if len(df_sdg_proxy) > 0: + detected_start = int(df_sdg_proxy['year'].min()) + self.sdg_start_year = detected_start + self.logger.info( + f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}" + ) + self.logger.info(f" Proxy indicators used (sample):") + proxy_sample = ( + df_sdg_proxy['indicator_name'] + .drop_duplicates() + .head(5) + .tolist() + ) + for ind in proxy_sample: + self.logger.info(f" - {ind}") + else: + self.sdg_start_year = SDG_START_YEAR_DEFAULT + self.logger.warning( + f"\n [WARN] SDG proxy indicators not found in dataset. " + f"Using default: {self.sdg_start_year}" + ) + + self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}") + + # --- 6b. Hitung start_year aktual per indikator di dataset ini --- + indicator_start = ( + self.df_clean + .groupby(['indicator_id', 'indicator_name'])['year'] + .min() + .reset_index() + ) + indicator_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] + + # --- 6c. Assign framework per indikator --- + indicator_start['framework'] = indicator_start.apply( + lambda row: assign_framework_dynamic( + indicator_name = row['indicator_name'], + indicator_start_year = int(row['actual_start_year']), + sdg_start_year = self.sdg_start_year, + ), + axis=1 + ) + + # --- 6d. Log hasil assignment --- + self.logger.info(f"\n Framework assignment per indicator:") + self.logger.info(f" {'-'*85}") + self.logger.info( + f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}" + ) + self.logger.info(f" {'-'*85}") + + for _, row in indicator_start.sort_values( + ['framework', 'actual_start_year', 'indicator_name'] + ).iterrows(): + is_in_sdg_list = any( + kw in str(row['indicator_name']).lower() + for kw in SDG_INDICATOR_KEYWORDS + ) + note = " [in SDG list]" if is_in_sdg_list else "" + self.logger.info( + f" {int(row['indicator_id']):<5} {row['framework']:<10} " + f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}{note}" + ) + + fw_summary = indicator_start['framework'].value_counts() + self.logger.info(f"\n Framework summary:") + for fw, cnt in fw_summary.items(): + self.logger.info(f" {fw}: {cnt} indicators") + + # --- 6e. Merge framework ke df_clean --- + self.df_clean = self.df_clean.merge( + indicator_start[['indicator_id', 'framework']], + on='indicator_id', how='left' + ) + self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs') + + self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean") + self.logger.info( + f" Row distribution — MDGs: " + f"{(self.df_clean['framework'] == 'MDGs').sum():,} | " + f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}" + ) + + return self.df_clean + + # ------------------------------------------------------------------ + # STEP 6b: VERIFY NO GAPS + # ------------------------------------------------------------------ + def verify_no_gaps(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: VERIFY NO GAPS") + self.logger.info("STEP 6c: VERIFY NO GAPS") self.logger.info("=" * 80) expected_countries = len(self.selected_country_ids) @@ -431,53 +651,43 @@ class AnalyticalLayerLoader: return True + # ------------------------------------------------------------------ + # STEP 7: CALCULATE YOY + # ------------------------------------------------------------------ + def calculate_yoy(self): """ Hitung Year-over-Year (YoY) per indikator per negara. - Kolom yang ditambahkan ke df_clean: + Kolom yang ditambahkan: yoy_change : selisih absolut -> value - value_tahun_sebelumnya yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100 - Catatan: - - Baris tahun pertama per kombinasi country-indicator akan bernilai NULL - (tidak ada tahun sebelumnya sebagai pembanding) — ini intentional. - - value_prev di-drop setelah kalkulasi, tidak ikut disimpan ke BigQuery. - - Dilakukan SETELAH verify_no_gaps() agar data sudah clean dan sorted benar. + Baris tahun pertama per kombinasi country-indicator bernilai NULL (intentional). """ self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6b: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") + self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") self.logger.info("=" * 80) df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() - # Nilai tahun sebelumnya (shifted within each country-indicator group) df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1) - - # YoY absolute change: value(t) - value(t-1) df['yoy_change'] = df['value'] - df['value_prev'] - - # YoY percentage change: (yoy_change / |value_prev|) * 100 - # Hindari division by zero — jika value_prev == 0 atau NaN, hasilnya NaN - df['yoy_pct'] = np.where( + df['yoy_pct'] = np.where( df['value_prev'].notna() & (df['value_prev'] != 0), (df['yoy_change'] / df['value_prev'].abs()) * 100, np.nan ) - - # Drop kolom bantu value_prev, tidak ikut disimpan ke BigQuery df = df.drop(columns=['value_prev']) - # Log ringkasan - total_rows = len(df) - valid_yoy = df['yoy_pct'].notna().sum() - null_yoy = df['yoy_pct'].isna().sum() + total_rows = len(df) + valid_yoy = df['yoy_pct'].notna().sum() + null_yoy = df['yoy_pct'].isna().sum() self.logger.info(f" Total rows : {total_rows:,}") self.logger.info(f" YoY calculated : {valid_yoy:,}") self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator") - # Log distribusi YoY per indikator (sample) per_ind = ( df[df['yoy_pct'].notna()] .groupby(['indicator_id', 'indicator_name'])['yoy_pct'] @@ -504,7 +714,6 @@ class AnalyticalLayerLoader: f"{row['min']:>+8.2f} {row['max']:>+8.2f}" ) - # Log distribusi YoY per negara (ringkasan) per_country = ( df[df['yoy_pct'].notna()] .groupby(['country_id', 'country_name'])['yoy_pct'] @@ -526,9 +735,13 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct") return self.df_clean + # ------------------------------------------------------------------ + # STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR + # ------------------------------------------------------------------ + def analyze_indicator_availability_by_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR") + self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("=" * 80) year_stats = self.df_clean.groupby('year').agg({ @@ -586,6 +799,10 @@ class AnalyticalLayerLoader: return year_stats + # ------------------------------------------------------------------ + # STEP 9: SAVE ANALYTICAL TABLE + # ------------------------------------------------------------------ + def save_analytical_table(self): """ Simpan fact_asean_food_security_selected ke Gold layer. @@ -594,44 +811,20 @@ class AnalyticalLayerLoader: country_id, country_name — dimensi negara indicator_id, indicator_name — dimensi indikator direction — arah penilaian (higher/lower_better) - framework — MDGs / SDGs (untuk filter Looker Studio) + framework — MDGs/SDGs (ditentukan di Step 6) pillar_id, pillar_name — dimensi pilar time_id, year — dimensi waktu value — nilai indikator - yoy_change — perubahan absolut YoY (NULLABLE: NULL di tahun pertama) - yoy_pct — perubahan relatif YoY dalam % (NULLABLE: NULL di tahun pertama) - - Kolom framework memungkinkan filter langsung di Looker Studio tanpa join ke dim_indicator. - Kolom yoy_change dan yoy_pct memungkinkan analisis tren tahunan langsung di Looker Studio. + yoy_change — perubahan absolut YoY (NULL di tahun pertama) + yoy_pct — perubahan relatif YoY dalam % (NULL di tahun pertama) """ table_name = 'fact_asean_food_security_selected' self.logger.info("\n" + "=" * 80) - self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") + self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info("=" * 80) try: - # Pastikan kolom framework tersedia di df_clean - if 'framework' not in self.df_clean.columns: - self.logger.warning( - " [WARN] Kolom 'framework' tidak ada di df_clean. " - "Melakukan join ke dim_indicator sebagai fallback..." - ) - dim_ind = read_from_bigquery(self.client, 'dim_indicator', layer='gold') - if 'framework' in dim_ind.columns: - self.df_clean = self.df_clean.merge( - dim_ind[['indicator_id', 'framework']], - on='indicator_id', how='left' - ) - self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs') - self.logger.info(" [OK] framework di-join dari dim_indicator") - else: - self.df_clean['framework'] = 'MDGs' - self.logger.warning( - " [WARN] dim_indicator juga tidak punya kolom framework. " - "Default: MDGs. Jalankan ulang pipeline dari cleaned_layer." - ) - # Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns: self.logger.warning( @@ -659,32 +852,28 @@ class AnalyticalLayerLoader: ['year', 'country_name', 'pillar_name', 'indicator_name'] ).reset_index(drop=True) - # Pastikan tipe data konsisten - analytical_df['country_id'] = analytical_df['country_id'].astype(int) - analytical_df['country_name'] = analytical_df['country_name'].astype(str) - analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) - analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) - analytical_df['direction'] = analytical_df['direction'].astype(str) - analytical_df['framework'] = analytical_df['framework'].astype(str) - analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) - analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) - analytical_df['time_id'] = analytical_df['time_id'].astype(int) - analytical_df['year'] = analytical_df['year'].astype(int) - analytical_df['value'] = analytical_df['value'].astype(float) - # yoy_change dan yoy_pct tetap float — NULL (NaN) di tahun pertama adalah intentional - analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) - analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['country_name'] = analytical_df['country_name'].astype(str) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) + analytical_df['direction'] = analytical_df['direction'].astype(str) + analytical_df['framework'] = analytical_df['framework'].astype(str) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['year'] = analytical_df['year'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) + analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) + analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") - # Log distribusi framework fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() self.logger.info(f" Framework distribution (per indikator unik):") for fw, cnt in fw_dist.items(): self.logger.info(f" {fw}: {cnt} indicators") - # Log statistik YoY yoy_valid = analytical_df['yoy_pct'].notna().sum() yoy_null = analytical_df['yoy_pct'].isna().sum() self.logger.info(f" YoY rows (calculated): {yoy_valid:,}") @@ -702,7 +891,6 @@ class AnalyticalLayerLoader: bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), - # NULLABLE karena tahun pertama per country-indicator tidak memiliki nilai sebelumnya bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), ] @@ -725,19 +913,21 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'fixed_countries': len(self.selected_country_ids), - 'no_gaps' : True, - 'layer' : 'gold', - 'columns' : ( - 'id + name + direction + framework + value + ' - 'yoy_change + yoy_pct (Looker Studio ready)' - ) + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'sdg_start_year' : self.sdg_start_year, + 'fixed_countries' : len(self.selected_country_ids), + 'no_gaps' : True, + 'layer' : 'gold', + 'framework_logic' : ( + f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, " + "else MDGs" + ), }), 'validation_metrics' : json.dumps({ 'fixed_countries' : len(self.selected_country_ids), 'total_indicators': int(self.df_clean['indicator_id'].nunique()), + 'sdg_start_year' : self.sdg_start_year, 'framework_dist' : fw_dist.to_dict(), 'yoy_rows_valid' : int(yoy_valid), 'yoy_rows_null' : int(yoy_null), @@ -755,6 +945,10 @@ class AnalyticalLayerLoader: self.logger.error(f"Error saving: {e}") raise + # ------------------------------------------------------------------ + # RUN + # ------------------------------------------------------------------ + def run(self): self.pipeline_start = datetime.now() self.pipeline_metadata['start_time'] = self.pipeline_start @@ -763,6 +957,7 @@ class AnalyticalLayerLoader: self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,") self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct") + self.logger.info(f"Framework: ditentukan dinamis berdasarkan SDG_START_YEAR (auto-detect)") self.logger.info("=" * 80) self.load_source_data() @@ -770,8 +965,9 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() - self.verify_no_gaps() - self.calculate_yoy() # <-- Step 6b: hitung YoY + self.determine_sdg_start_year() # Step 6: auto-detect SDG year & assign framework + self.verify_no_gaps() # Step 6c: verifikasi tidak ada gap + self.calculate_yoy() # Step 7: hitung YoY self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -781,11 +977,12 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" Countries : {len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -810,7 +1007,9 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) + print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("Output: fact_asean_food_security_selected -> fs_asean_gold") + print("Framework: MDGs/SDGs ditentukan dinamis dari data (auto-detect SDG start year)") print("=" * 80) logger = setup_logging() @@ -820,4 +1019,6 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") + print(f" SDG Start Year : {loader.sdg_start_year}") + print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index cef3d9f..bb882cc 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -62,7 +62,6 @@ COLUMN_CONSTRAINTS = { 'unit' : 20, 'pillar' : 20, 'direction' : 15, - 'framework' : 5, # 'MDGs'=4, 'SDGs'=4 } @@ -292,62 +291,6 @@ def assign_direction(indicator_name: str) -> str: return 'higher_better' -# ============================================================================= -# FRAMEWORK CLASSIFICATION (MDGs vs SDGs) -# ============================================================================= - -# Daftar keyword eksplisit dari SDG Goal 2 Khusus FIES (2030 Agenda for Sustainable Development). -# Disimpan lowercase agar matching tidak sensitif terhadap kapitalisasi input. - -SDG_INDICATOR_KEYWORDS = frozenset([ - "prevalence of severe food insecurity in the total population (percent) (3-year average)", - "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", - "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", - "number of severely food insecure people (million) (3-year average)", - "number of severely food insecure male adults (million) (3-year average)", - "number of severely food insecure female adults (million) (3-year average)", - "number of moderately or severely food insecure people (million) (3-year average)", - "number of moderately or severely food insecure male adults (million) (3-year average)", - "number of moderately or severely food insecure female adults (million) (3-year average)", -]) - - -def assign_framework(indicator_name: str) -> str: - """ - Assign framework berdasarkan daftar eksplisit indikator SDG Goal 2 - dari 2030 Agenda for Sustainable Development (versi Maret 2020). - - Logika: - - Lowercase nama indikator input - - Cek apakah ada keyword SDG (lowercase) yang terkandung di dalam nama indikator - - Jika ya -> 'SDGs' - - Jika tidak -> 'MDGs' (indikator FAO/lama yang bukan SDG resmi) - - FIX: Bug sebelumnya menggunakan `kw in ind` (cek apakah keyword ada di dalam ind), - padahal seharusnya `kw in ind` sudah benar secara logika — tapi keyword di-set - dengan kapitalisasi campuran sementara `ind` sudah di-lowercase, sehingga - perbandingan selalu gagal. Solusi: simpan keyword dalam lowercase di set, - sehingga `kw in ind` bekerja dengan benar. - - Return values: 'MDGs' atau 'SDGs' - Panjang max 4 chars (dalam constraint varchar(5)). - """ - if pd.isna(indicator_name): - return 'MDGs' - - # Lowercase input agar matching tidak sensitif terhadap kapitalisasi - ind = str(indicator_name).lower().strip() - - # Cek apakah salah satu keyword SDG (sudah lowercase) ada di dalam ind - if any(kw in ind for kw in SDG_INDICATOR_KEYWORDS): - return 'SDGs' - - return 'MDGs' - - # ============================================================================= # CLEANED DATA LOADER # ============================================================================= @@ -365,7 +308,7 @@ class CleanedDataLoader: 1. Standardize country names (ASEAN) 2. Remove missing values 3. Remove duplicates - 4. Add pillar, direction & framework classification + 4. Add pillar & direction classification 5. Apply column constraints 6. Load ke BigQuery 7. Log ke Audit layer @@ -382,7 +325,6 @@ class CleanedDataLoader: bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), ] def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'): @@ -449,12 +391,11 @@ class CleanedDataLoader: return df_clean def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 4/5] Add pillar, direction & framework classification...") + print("\n [Step 4/5] Add pillar & direction classification...") df = df.copy() df['pillar'] = df['indicator_standardized'].apply(assign_pillar) df['direction'] = df['indicator_standardized'].apply(assign_direction) - df['framework'] = df['indicator_standardized'].apply(assign_framework) pillar_counts = df['pillar'].value_counts() print(f" Pillar distribution:") @@ -467,21 +408,6 @@ class CleanedDataLoader: pct = count / len(df) * 100 print(f" - {direction}: {count:,} ({pct:.1f}%)") - framework_counts = df['framework'].value_counts() - print(f" Framework distribution:") - for fw, count in framework_counts.items(): - pct = count / len(df) * 100 - print(f" - {fw}: {count:,} ({pct:.1f}%)") - - # Log indikator yang terklasifikasi SDGs untuk verifikasi - sdg_inds = ( - df[df['framework'] == 'SDGs']['indicator_standardized'] - .drop_duplicates().sort_values().tolist() - ) - print(f"\n SDG indicators ({len(sdg_inds)}):") - for ind in sdg_inds: - print(f" - {ind}") - return df def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: @@ -506,7 +432,7 @@ class CleanedDataLoader: 'max' : int(df['year'].max()) if not df['year'].isnull().all() else None, 'unique_years': int(df['year'].nunique()) } - for col in ('pillar', 'direction', 'framework', 'source'): + for col in ('pillar', 'direction', 'source'): if col in df.columns: validation[f'{col}_breakdown'] = { str(k): int(v) for k, v in df[col].value_counts().to_dict().items() diff --git a/scripts/bigquery_dimensional_model.py b/scripts/bigquery_dimensional_model.py index bf59fe7..c6394ef 100644 --- a/scripts/bigquery_dimensional_model.py +++ b/scripts/bigquery_dimensional_model.py @@ -52,7 +52,7 @@ class DimensionalModelLoader: Pipeline steps: 1. Load dim_country - 2. Load dim_indicator (+ kolom framework dari cleaned_integrated) + 2. Load dim_indicator 3. Load dim_time 4. Load dim_source 5. Load dim_pillar @@ -313,7 +313,6 @@ class DimensionalModelLoader: indicator_category — kategori (Health & Nutrition, dll.) unit — satuan ukuran direction — higher_better / lower_better - framework — MDGs / SDGs <-- BARU: dibaca dari cleaned_integrated """ table_name = 'dim_indicator' self.load_metadata[table_name]['start_time'] = datetime.now() @@ -323,7 +322,6 @@ class DimensionalModelLoader: has_direction = 'direction' in self.df_clean.columns has_unit = 'unit' in self.df_clean.columns has_category = 'indicator_category' in self.df_clean.columns - has_framework = 'framework' in self.df_clean.columns dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() dim_indicator.columns = ['indicator_name'] @@ -381,29 +379,10 @@ class DimensionalModelLoader: categorize_indicator ) - # Framework — KOLOM BARU - # Dibaca dari cleaned_integrated yang sudah menjalankan assign_framework(). - # Jika kolom belum ada (misal pipeline lama), fallback ke 'MDGs' dengan warning. - if has_framework: - fw_map = self.df_clean[ - ['indicator_standardized', 'framework'] - ].drop_duplicates() - fw_map.columns = ['indicator_name', 'framework'] - dim_indicator = dim_indicator.merge(fw_map, on='indicator_name', how='left') - # Pastikan tidak ada NULL setelah merge - dim_indicator['framework'] = dim_indicator['framework'].fillna('MDGs') - self.logger.info(" [OK] framework column from cleaned_integrated") - else: - dim_indicator['framework'] = 'MDGs' - self.logger.warning( - " [WARN] framework column not found in cleaned_integrated. " - "Default: MDGs. Jalankan bigquery_cleaned_layer.py terlebih dahulu." - ) - dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') dim_indicator_final = dim_indicator[ - ['indicator_name', 'indicator_category', 'unit', 'direction', 'framework'] + ['indicator_name', 'indicator_category', 'unit', 'direction'] ].copy() dim_indicator_final = dim_indicator_final.reset_index(drop=True) dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1)) @@ -414,7 +393,6 @@ class DimensionalModelLoader: bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"), bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), ] rows_loaded = load_to_bigquery( @@ -427,7 +405,6 @@ class DimensionalModelLoader: for label, col in [ ('Categories', 'indicator_category'), ('Direction', 'direction'), - ('Framework', 'framework'), ]: self.logger.info(f" {label}:") for val, cnt in dim_indicator_final[col].value_counts().items(): @@ -766,22 +743,6 @@ class DimensionalModelLoader: self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}") self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}") - # Validasi distribusi framework di dim_indicator - query_fw = f""" - SELECT framework, COUNT(*) AS count - FROM `{get_table_id('dim_indicator', layer='gold')}` - GROUP BY framework ORDER BY framework - """ - df_fw = self.client.query(query_fw).result().to_dataframe( - create_bqstorage_client=False - ) - if len(df_fw) > 0: - self.logger.info(f"\n Framework Distribution (dim_indicator):") - for _, row in df_fw.iterrows(): - self.logger.info( - f" {row['framework']:10s}: {int(row['count']):>5,} indicators" - ) - query_dir = f""" SELECT direction, COUNT(*) AS count FROM `{get_table_id('dim_indicator', layer='gold')}` @@ -906,10 +867,6 @@ if __name__ == "__main__": print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}") if 'direction' in df_clean.columns: print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") - if 'framework' in df_clean.columns: - print(f" Framework : {df_clean['framework'].value_counts().to_dict()}") - else: - print(" [WARN] framework column not found — run bigquery_cleaned_layer.py first") print("\n[1/1] Dimensional Model Load -> DW (Gold)...") loader = DimensionalModelLoader(client, df_clean) @@ -917,7 +874,7 @@ if __name__ == "__main__": print("\n" + "=" * 60) print("[OK] DIMENSIONAL MODEL ETL COMPLETED") - print(" DW (Gold) : dim_country, dim_indicator (+ framework),") - print(" dim_time, dim_source, dim_pillar, fact_food_security") + print(" DW (Gold) : dim_country, dim_indicator, dim_time,") + print(" dim_source, dim_pillar, fact_food_security") print(" AUDIT : etl_logs, etl_metadata") print("=" * 60) \ No newline at end of file