diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index bf1381e..018be28 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -1,39 +1,14 @@ """ BIGQUERY ANALYTICAL LAYER - DATA FILTERING -fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') +FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') Filtering Order: 1. Load data (single years only) -2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per syarat dosen) +2. Determine year boundaries (2013 - auto-detected end year) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Determine SDG start year & assign framework (MDGs/SDGs) per ROW per year -7. Verify no gaps -8. Calculate norm_value_1_100 per indicator per country (min-max, direction-aware) -9. Calculate YoY per indicator per country -10. Analyze indicator availability by year -11. Save analytical table - -NORMALISASI (Step 8): -- norm_value_1_100 = min-max normalisasi nilai raw per indikator, skala 1-100 -- Direction-aware: lower_better diinvert sehingga nilai tinggi selalu = lebih baik -- Normalisasi dilakukan GLOBAL per indikator (semua negara, semua tahun sekaligus) - sehingga nilai antar negara dan antar tahun tetap comparable -- Kolom ini memungkinkan perbandingan antar indikator yang berbeda satuan di Looker Studio - -FRAMEWORK LOGIC (Per-Row, bukan per indikator): -- sdg_start_year dideteksi dari data: tahun pertama indikator FIES lengkap - di semua fixed countries (setelah Step 3-5 filter selesai) -- Proxy deteksi sdg_start_year: HANYA FIES ("food insecurity", "food insecure") - Anemia TIDAK dipakai sebagai proxy karena datanya sudah ada sebelum era SDGs -- Framework di-assign PER BARIS (per year), bukan per indikator: - * row['year'] >= sdg_start_year AND nama ada di SDG_INDICATOR_KEYWORDS -> 'SDGs' - * Selain itu -> 'MDGs' -- Ini menangani indikator "shared" (anemia, stunting, wasting, undernourishment) - yang datanya ada sebelum SDGs: - * row lama (year < sdg_start_year) -> 'MDGs' - * row baru (year >= sdg_start_year) -> 'SDGs' +6. Save analytical table (dengan nama/label lengkap untuk Looker Studio) """ import pandas as pd @@ -59,128 +34,6 @@ from scripts.bigquery_helpers import ( from google.cloud import bigquery -# ============================================================================= -# SDG INDICATOR KEYWORDS -# Daftar nama indikator (lowercase) yang masuk SDG framework. -# Indikator ini akan di-assign 'SDGs' untuk baris dengan year >= sdg_start_year, -# dan 'MDGs' untuk baris dengan year < sdg_start_year. -# ============================================================================= - -SDG_INDICATOR_KEYWORDS = frozenset([ - # TARGET 2.1.1 — Prevalence of undernourishment (shared: ada sebelum SDGs) - "prevalence of undernourishment (percent) (3-year average)", - "number of people undernourished (million) (3-year average)", - # TARGET 2.1.2 — FIES (SDGs only — murni baru di era SDGs) - "prevalence of severe food insecurity in the total population (percent) (3-year average)", - "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", - "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", - "number of severely food insecure people (million) (3-year average)", - "number of severely food insecure male adults (million) (3-year average)", - "number of severely food insecure female adults (million) (3-year average)", - "number of moderately or severely food insecure people (million) (3-year average)", - "number of moderately or severely food insecure male adults (million) (3-year average)", - "number of moderately or severely food insecure female adults (million) (3-year average)", - # TARGET 2.2.1 — Stunting (shared: ada sebelum SDGs) - "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", - "number of children under 5 years of age who are stunted (modeled estimates) (million)", - # TARGET 2.2.2 — Wasting & Overweight (shared: ada sebelum SDGs) - "percentage of children under 5 years affected by wasting (percent)", - "number of children under 5 years affected by wasting (million)", - "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", - "number of children under 5 years of age who are overweight (modeled estimates) (million)", - # TARGET 2.2.3 — Anaemia (shared: data ada sebelum SDGs, listed here agar - # baris >= sdg_start_year di-assign 'SDGs') - "prevalence of anemia among women of reproductive age (15-49 years) (percent)", - "number of women of reproductive age (15-49 years) affected by anemia (million)", -]) - -# ============================================================================= -# SDG ERA PROXY KEYWORDS -# HANYA indikator yang MURNI baru di era SDGs (FIES saja). -# Dipakai untuk mendeteksi sdg_start_year dari data. -# -# PENTING — Anemia/anaemia TIDAK dipakai sebagai proxy: -# Data anemia sudah ada sebelum era SDGs sehingga actual_start_year-nya -# lebih awal dari sdg_start_year. Jika dipakai sebagai proxy, sdg_start_year -# akan terdeteksi terlalu awal dan seluruh baris anemia akan menjadi 'SDGs'. -# FIES adalah satu-satunya indikator yang benar-benar murni baru di era SDGs -# dan dapat dipakai sebagai penanda tahun mulainya era SDGs. -# ============================================================================= -_SDG_ERA_PROXY_KEYWORDS = frozenset([ - "food insecurity", - "food insecure", -]) - -# ============================================================================= -# THRESHOLD KONDISI (fixed absolute, skala 1-100) -# ============================================================================= -# Digunakan untuk assign kondisi di analysis_layer. -# Didefinisikan di sini agar konsisten antara kedua file. -# bad : norm_value_1_100 < THRESHOLD_BAD -# good : norm_value_1_100 > THRESHOLD_GOOD -# moderate : di antara keduanya - -THRESHOLD_BAD = 40.0 -THRESHOLD_GOOD = 60.0 - - -def assign_condition(norm_value_1_100: float) -> str: - """ - Assign kondisi berdasarkan norm_value_1_100 (skala 1-100, sudah direction-aware). - Nilai tinggi selalu berarti lebih baik (lower_better sudah diinvert). - - Returns: 'good' / 'moderate' / 'bad' - """ - if pd.isna(norm_value_1_100): - return None - if norm_value_1_100 > THRESHOLD_GOOD: - return 'good' - if norm_value_1_100 < THRESHOLD_BAD: - return 'bad' - return 'moderate' - - -def assign_framework_per_row( - indicator_name: str, - year: int, - sdg_start_year: int, -) -> str: - """ - Tentukan framework (MDGs/SDGs) per BARIS (per row year), bukan per indikator. - - Logic: - - 'SDGs' jika KEDUA kondisi terpenuhi: - 1. Nama indikator ada di SDG_INDICATOR_KEYWORDS - 2. year (tahun baris ini) >= sdg_start_year - - 'MDGs' untuk semua kasus lain. - - Mengapa per row, bukan per indikator? - Indikator "shared" seperti anemia, stunting, wasting, undernourishment - memiliki data yang ada SEBELUM era SDGs dimulai. Jika assign dilakukan - per indikator menggunakan actual_start_year, indikator-indikator ini - akan selalu di-assign 'MDGs' karena actual_start_year < sdg_start_year. - Dengan assign per row menggunakan year baris: - - baris lama (year < sdg_start_year) -> 'MDGs' (benar: belum era SDGs) - - baris baru (year >= sdg_start_year) -> 'SDGs' (benar: sudah era SDGs) - - Contoh anemia (sdg_start_year = 2016): - - row year=2013 -> 'MDGs' - - row year=2014 -> 'MDGs' - - row year=2015 -> 'MDGs' - - row year=2016 -> 'SDGs' - - row year=2017 -> 'SDGs' - - ... - """ - name_lower = str(indicator_name).lower().strip() - in_sdg_list = name_lower in SDG_INDICATOR_KEYWORDS - if in_sdg_list and int(year) >= sdg_start_year: - return 'SDGs' - return 'MDGs' - - # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= @@ -189,19 +42,13 @@ class AnalyticalLayerLoader: """ Analytical Layer Loader for BigQuery - Output kolom fact_asean_food_security_selected: - country_id, country_name, - indicator_id, indicator_name, direction, framework, - pillar_id, pillar_name, - time_id, year, value, - norm_value_1_100, <- min-max norm per indikator, skala 1-100, direction-aware - yoy_change, yoy_pct + Key Logic: + 1. Complete per country (no gaps from start_year to end_year) + 2. Filter countries with all pillars + 3. Ensure indicators have consistent country count across all years + 4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio - Catatan framework: - Framework di-assign PER BARIS (per year), sehingga indikator shared - seperti anemia dapat memiliki framework berbeda di baris yang berbeda: - - baris sebelum sdg_start_year -> 'MDGs' - - baris sejak sdg_start_year -> 'SDGs' + Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold """ def __init__(self, client: bigquery.Client): @@ -218,9 +65,7 @@ class AnalyticalLayerLoader: self.start_year = 2013 self.end_year = None - self.baseline_year = 2023 # hardcode per syarat dosen (tahun terlengkap) - - self.sdg_start_year = None + self.baseline_year = 2023 self.pipeline_metadata = { 'source_class' : self.__class__.__name__, @@ -236,10 +81,6 @@ class AnalyticalLayerLoader: self.pipeline_start = None self.pipeline_end = None - # ------------------------------------------------------------------ - # STEP 1: LOAD SOURCE DATA - # ------------------------------------------------------------------ - def load_source_data(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") @@ -270,17 +111,14 @@ class AnalyticalLayerLoader: """ self.logger.info("Loading fact table with dimensions...") - self.df_clean = self.client.query(query).result().to_dataframe( - create_bqstorage_client=False - ) + self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False) self.logger.info(f" Loaded: {len(self.df_clean):,} rows") if 'is_year_range' in self.df_clean.columns: yr = self.df_clean['is_year_range'].value_counts() - self.logger.info( - f" Single years: {yr.get(False, 0):,} | " - f"Year ranges: {yr.get(True, 0):,}" - ) + self.logger.info(f" Breakdown:") + self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}") + self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}") self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') @@ -297,48 +135,34 @@ class AnalyticalLayerLoader: self.logger.error(f"Error loading source data: {e}") raise - # ------------------------------------------------------------------ - # STEP 2: DETERMINE YEAR BOUNDARIES - # ------------------------------------------------------------------ - def determine_year_boundaries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) - # Filter single years only (is_year_range == False) - if 'is_year_range' in self.df_clean.columns: - before = len(self.df_clean) - self.df_clean = self.df_clean[self.df_clean['is_year_range'] == False].copy() - self.logger.info( - f" Filter single years only: {before:,} -> {len(self.df_clean):,} rows" - ) + df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year] + baseline_indicator_count = df_2023['indicator_id'].nunique() - # baseline_year = 2023 hardcode (syarat dosen: minimal 2023) - df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] - baseline_indicator_count = df_baseline['indicator_id'].nunique() - - self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}") - self.logger.info(f" Baseline indicator count: {baseline_indicator_count}") + self.logger.info(f"\nBaseline Year: {self.baseline_year}") + self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}") years_sorted = sorted(self.df_clean['year'].unique(), reverse=True) selected_end_year = None - self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):") for year in years_sorted: if year >= self.baseline_year: df_year = self.df_clean[self.df_clean['year'] == year] year_indicator_count = df_year['indicator_id'].nunique() status = "OK" if year_indicator_count >= baseline_indicator_count else "X" - self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") + self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") if year_indicator_count >= baseline_indicator_count and selected_end_year is None: selected_end_year = int(year) if selected_end_year is None: selected_end_year = self.baseline_year - self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}") + self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}") else: - self.logger.info(f"\n [OK] Selected end year: {selected_end_year}") + self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}") self.end_year = selected_end_year original_count = len(self.df_clean) @@ -348,15 +172,11 @@ class AnalyticalLayerLoader: (self.df_clean['year'] <= self.end_year) ].copy() - self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:") - self.logger.info(f" Rows before: {original_count:,}") - self.logger.info(f" Rows after : {len(self.df_clean):,}") + self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:") + self.logger.info(f" Rows before: {original_count:,}") + self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY - # ------------------------------------------------------------------ - def filter_complete_indicators_per_country(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") @@ -408,15 +228,10 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}") self.logger.info(f" [-] Removed: {len(removed_combinations):,}") - df_valid = pd.DataFrame(valid_combinations) - df_valid['key'] = ( - df_valid['country_id'].astype(str) + '_' + - df_valid['indicator_id'].astype(str) - ) - self.df_clean['key'] = ( - self.df_clean['country_id'].astype(str) + '_' + - self.df_clean['indicator_id'].astype(str) - ) + df_valid = pd.DataFrame(valid_combinations) + df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str) + self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' + + self.df_clean['indicator_id'].astype(str)) original_count = len(self.df_clean) self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy() @@ -428,10 +243,6 @@ class AnalyticalLayerLoader: self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 4: SELECT COUNTRIES WITH ALL PILLARS - # ------------------------------------------------------------------ - def select_countries_with_all_pillars(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") @@ -454,26 +265,18 @@ class AnalyticalLayerLoader: f"{row['pillar_count']}/{total_pillars} pillars" ) - selected_countries = country_pillar_count[ - country_pillar_count['pillar_count'] == total_pillars - ] + selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars] self.selected_country_ids = selected_countries['country_id'].tolist() self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries") original_count = len(self.df_clean) - self.df_clean = self.df_clean[ - self.df_clean['country_id'].isin(self.selected_country_ids) - ].copy() + self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy() self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES - # ------------------------------------------------------------------ - def filter_indicators_consistent_across_fixed_countries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") @@ -482,9 +285,7 @@ class AnalyticalLayerLoader: indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() - indicator_country_start.columns = [ - 'indicator_id', 'indicator_name', 'country_id', 'start_year' - ] + indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year'] indicator_max_start = indicator_country_start.groupby([ 'indicator_id', 'indicator_name' @@ -529,26 +330,16 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}") - if removed_indicators: - self.logger.info(f"\n Removed indicators:") - for item in removed_indicators: - self.logger.info(f" [-] {item['indicator_name'][:60]} | {item['reason']}") - if not valid_indicators: raise ValueError("No valid indicators found after filtering!") original_count = len(self.df_clean) - self.df_clean = self.df_clean[ - self.df_clean['indicator_id'].isin(valid_indicators) - ].copy() + self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy() self.df_clean = self.df_clean.merge( - indicator_max_start[['indicator_id', 'max_start_year']], - on='indicator_id', how='left' + indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left' ) - self.df_clean = self.df_clean[ - self.df_clean['year'] >= self.df_clean['max_start_year'] - ].copy() + self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy() self.df_clean = self.df_clean.drop('max_start_year', axis=1) self.logger.info(f"\n Rows before: {original_count:,}") @@ -558,136 +349,18 @@ class AnalyticalLayerLoader: self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW - # ------------------------------------------------------------------ - - def determine_sdg_start_year(self): - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK PER ROW") - self.logger.info("=" * 80) - self.logger.info( - " Proxy: FIES only (food insecurity/food insecure).\n" - " Anemia TIDAK dipakai sebagai proxy — datanya ada sebelum era SDGs.\n" - " Framework di-assign PER BARIS (year), bukan per indikator." - ) - - # actual_start_year per indikator = max(min_year per country) - # = konsisten dengan max_start_year di Step 5 - indicator_actual_start = ( - self.df_clean - .groupby(['indicator_id', 'indicator_name', 'country_id'])['year'] - .min().reset_index() - .groupby(['indicator_id', 'indicator_name'])['year'] - .max().reset_index() - ) - indicator_actual_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] - - # Deteksi sdg_start_year dari proxy SDGs-only (FIES saja, BUKAN anemia) - proxy_mask = indicator_actual_start['indicator_name'].str.lower().apply( - lambda n: any(kw in n for kw in _SDG_ERA_PROXY_KEYWORDS) - ) - df_proxy = indicator_actual_start[proxy_mask] - - if df_proxy.empty: - raise ValueError( - "Tidak ada indikator proxy SDGs (FIES) yang lolos filter. " - "Pastikan indikator FIES (food insecurity/food insecure) ada di data." - ) - - self.sdg_start_year = int(df_proxy['actual_start_year'].min()) - self.logger.info(f"\n sdg_start_year = {self.sdg_start_year}") - self.logger.info(f" Proxy indicators (FIES only):") - for _, row in df_proxy.iterrows(): - self.logger.info(f" [{int(row['actual_start_year'])}] {row['indicator_name']}") - - # ---------------------------------------------------------------- - # Assign framework PER BARIS menggunakan year baris, bukan actual_start_year - # Sehingga indikator "shared" (anemia, stunting, dll) mendapat: - # - 'MDGs' untuk baris sebelum sdg_start_year - # - 'SDGs' untuk baris sejak sdg_start_year - # ---------------------------------------------------------------- - self.df_clean['framework'] = self.df_clean.apply( - lambda row: assign_framework_per_row( - indicator_name = row['indicator_name'], - year = int(row['year']), - sdg_start_year = self.sdg_start_year, - ), - axis=1 - ) - - # ---------------------------------------------------------------- - # Logging: ringkasan per indikator (frameworks apa yang muncul) - # ---------------------------------------------------------------- - ind_fw_summary = ( - self.df_clean - .groupby(['indicator_id', 'indicator_name'])['framework'] - .unique() - .reset_index() - ) - ind_fw_summary['frameworks'] = ind_fw_summary['framework'].apply( - lambda x: '/'.join(sorted(x)) - ) - ind_fw_summary = ind_fw_summary.merge( - indicator_actual_start[['indicator_id', 'actual_start_year']], - on='indicator_id', how='left' - ) - - self.logger.info(f"\n Framework assignment per indikator:") - self.logger.info(f" {'-'*85}") - self.logger.info(f" {'ID':<5} {'Frameworks':<18} {'ActualStart':<13} {'Indicator Name'}") - self.logger.info(f" {'-'*85}") - for _, row in ind_fw_summary.sort_values( - ['frameworks', 'actual_start_year', 'indicator_name'] - ).iterrows(): - self.logger.info( - f" {int(row['indicator_id']):<5} {row['frameworks']:<18} " - f"{int(row['actual_start_year']):<13} {row['indicator_name'][:48]}" - ) - - # Indikator dengan framework split (MDGs/SDGs) — highlight untuk validasi - split_inds = ind_fw_summary[ind_fw_summary['frameworks'] == 'MDGs/SDGs'] - if not split_inds.empty: - self.logger.info( - f"\n [INFO] {len(split_inds)} indikator memiliki framework split " - f"(MDGs sebelum {self.sdg_start_year}, SDGs sejak {self.sdg_start_year}):" - ) - for _, row in split_inds.iterrows(): - self.logger.info(f" - {row['indicator_name'][:60]}") - - fw_summary = self.df_clean['framework'].value_counts() - self.logger.info( - f"\n Ringkasan rows: " + - " | ".join(f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items()) - ) - - self.logger.info( - f"\n [OK] 'framework' ditambahkan per row — " - f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " - f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" - ) - return self.df_clean - - # ------------------------------------------------------------------ - # STEP 7: VERIFY NO GAPS - # ------------------------------------------------------------------ - def verify_no_gaps(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 7: VERIFY NO GAPS") + self.logger.info("STEP 6: VERIFY NO GAPS") self.logger.info("=" * 80) expected_countries = len(self.selected_country_ids) - verification = self.df_clean.groupby( - ['indicator_id', 'year'] - )['country_id'].nunique().reset_index() + verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index() verification.columns = ['indicator_id', 'year', 'country_count'] all_good = (verification['country_count'] == expected_countries).all() if all_good: - self.logger.info( - f" VERIFICATION PASSED — all combinations have {expected_countries} countries" - ) + self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries") else: bad = verification[verification['country_count'] != expected_countries] for _, row in bad.head(10).iterrows(): @@ -699,143 +372,9 @@ class AnalyticalLayerLoader: return True - # ------------------------------------------------------------------ - # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR - # ------------------------------------------------------------------ - - def calculate_norm_value(self): - """ - Hitung norm_value_1_100 per indikator — min-max normalisasi skala 1-100, - direction-aware. - - CARA KERJA: - - Normalisasi dilakukan GLOBAL per indikator (semua negara + semua tahun sekaligus) - sehingga nilai antar negara dan antar tahun tetap comparable. - - lower_better diinvert: nilai tinggi selalu = kondisi lebih baik. - Contoh: undernourishment 5% (rendah = baik) → norm tinggi setelah invert. - - Skala 1-100 (bukan 0-100) untuk menghindari nilai absolut nol di Looker Studio. - - Kolom ini memungkinkan perbandingan lintas indikator yang berbeda satuan - (persen, juta orang, dll) karena sudah dinormalisasi ke skala yang sama. - - Catatan: - - Berbeda dengan norm_value di _get_norm_value_df() di analysis_layer - yang skala 0-1 dan dipakai untuk agregasi composite score. - - norm_value_1_100 ini adalah per baris (per country per year per indicator), - untuk ditampilkan langsung di Looker Studio. - """ - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR") - self.logger.info("=" * 80) - - DIRECTION_INVERT = frozenset({ - "negative", "lower_better", "lower_is_better", "inverse", "neg", - }) - - df = self.df_clean.copy() - norm_parts = [] - - indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) - self.logger.info(f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} {'Min':>10} {'Max':>10} {'Indicator Name'}") - self.logger.info(f" {'-'*90}") - - for (ind_id, ind_name, direction), grp in indicators: - grp = grp.copy() - do_invert = str(direction).lower().strip() in DIRECTION_INVERT - valid_mask = grp['value'].notna() - n_valid = valid_mask.sum() - - if n_valid < 2: - grp['norm_value_1_100'] = np.nan - norm_parts.append(grp) - continue - - raw = grp.loc[valid_mask, 'value'].values - v_min = raw.min() - v_max = raw.max() - normed = np.full(len(grp), np.nan) - - if v_min == v_max: - # Semua nilai sama → beri nilai tengah (50.5 pada skala 1-100) - normed[valid_mask.values] = 50.5 - else: - # Min-max ke 0-1 dulu - scaled = (raw - v_min) / (v_max - v_min) - # Invert jika lower_better - if do_invert: - scaled = 1.0 - scaled - # Scale ke 1-100 - normed[valid_mask.values] = 1.0 + scaled * 99.0 - - grp['norm_value_1_100'] = normed - - self.logger.info( - f" {int(ind_id):<5} {direction:<15} {'YES' if do_invert else 'no':<8} " - f"{v_min:>10.3f} {v_max:>10.3f} {ind_name[:45]}" - ) - norm_parts.append(grp) - - self.df_clean = pd.concat(norm_parts, ignore_index=True) - - # Statistik ringkasan - valid_norm = self.df_clean['norm_value_1_100'].notna().sum() - null_norm = self.df_clean['norm_value_1_100'].isna().sum() - self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}") - self.logger.info( - f" Range aktual: " - f"{self.df_clean['norm_value_1_100'].min():.2f} - " - f"{self.df_clean['norm_value_1_100'].max():.2f}" - ) - - # Log distribusi kondisi berdasarkan threshold - self.df_clean['_condition_preview'] = self.df_clean['norm_value_1_100'].apply(assign_condition) - cond_dist = self.df_clean['_condition_preview'].value_counts() - self.logger.info(f"\n Distribusi kondisi (threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):") - for cond, cnt in cond_dist.items(): - self.logger.info(f" {cond}: {cnt:,} rows") - self.df_clean = self.df_clean.drop(columns=['_condition_preview']) - - self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean") - return self.df_clean - - # ------------------------------------------------------------------ - # STEP 9: CALCULATE YOY - # ------------------------------------------------------------------ - - def calculate_yoy(self): - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") - self.logger.info("=" * 80) - - df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() - - df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1) - df['yoy_change'] = df['value'] - df['value_prev'] - df['yoy_pct'] = np.where( - df['value_prev'].notna() & (df['value_prev'] != 0), - (df['yoy_change'] / df['value_prev'].abs()) * 100, - np.nan - ) - df = df.drop(columns=['value_prev']) - - total_rows = len(df) - valid_yoy = df['yoy_pct'].notna().sum() - null_yoy = df['yoy_pct'].isna().sum() - - self.logger.info(f" Total rows : {total_rows:,}") - self.logger.info(f" YoY calculated : {valid_yoy:,}") - self.logger.info(f" YoY NULL (base yr): {null_yoy:,}") - - self.df_clean = df - self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan") - return self.df_clean - - # ------------------------------------------------------------------ - # STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR - # ------------------------------------------------------------------ - def analyze_indicator_availability_by_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR") + self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("=" * 80) year_stats = self.df_clean.groupby('year').agg({ @@ -861,147 +400,89 @@ class AnalyticalLayerLoader: 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'start_year', 'end_year', 'country_count' ] - - # Framework summary per indikator (bisa MDGs, SDGs, atau MDGs/SDGs split) - ind_fw = ( - self.df_clean - .groupby('indicator_id')['framework'] - .unique() - .reset_index() - ) - ind_fw['framework_label'] = ind_fw['framework'].apply( - lambda x: '/'.join(sorted(x)) - ) - indicator_details = indicator_details.merge( - ind_fw[['indicator_id', 'framework_label']], - on='indicator_id', how='left' - ) - indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) - indicator_details = indicator_details.sort_values( - ['framework_label', 'pillar_name', 'start_year', 'indicator_name'] - ) + indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name']) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") - self.logger.info(f"Framework breakdown (per indicator label):") - for fw, count in indicator_details.groupby('framework_label').size().items(): - self.logger.info(f" {fw}: {count} indicators") + for pillar, count in indicator_details.groupby('pillar_name').size().items(): + self.logger.info(f" {pillar}: {count} indicators") - self.logger.info(f"\n{'-'*115}") - self.logger.info( - f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " - f"{'Framework':<15} {'Years':<12} {'Dir':<8} {'Countries'}" - ) - self.logger.info(f"{'-'*115}") + self.logger.info(f"\n{'-'*100}") + self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}") + self.logger.info(f"{'-'*100}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['framework_label']:<15} " - f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" + f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} " + f"{direction:<8} {int(row['country_count'])}" ) return year_stats - # ------------------------------------------------------------------ - # STEP 11: SAVE ANALYTICAL TABLE - # ------------------------------------------------------------------ - def save_analytical_table(self): + # --------------------------------------------------------------- + # CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio + # --------------------------------------------------------------- table_name = 'fact_asean_food_security_selected' self.logger.info("\n" + "=" * 80) - self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") + self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info("=" * 80) try: - if 'framework' not in self.df_clean.columns: - raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.") - if 'norm_value_1_100' not in self.df_clean.columns: - raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.") - if 'yoy_change' not in self.df_clean.columns: - raise ValueError("Kolom 'yoy_change' tidak ada. Pastikan Step 9 sudah dijalankan.") - + # ------------------------------------------------------------------ + # Pilih kolom: ID + Nama lengkap + value + # Kolom nama memudahkan filtering/slicing langsung di Looker Studio + # tanpa perlu join ulang ke tabel dimensi. + # ------------------------------------------------------------------ analytical_df = self.df_clean[[ 'country_id', 'country_name', 'indicator_id', 'indicator_name', 'direction', - 'framework', 'pillar_id', 'pillar_name', 'time_id', 'year', 'value', - 'norm_value_1_100', - 'yoy_change', - 'yoy_pct', ]].copy() analytical_df = analytical_df.sort_values( ['year', 'country_name', 'pillar_name', 'indicator_name'] ).reset_index(drop=True) - analytical_df['country_id'] = analytical_df['country_id'].astype(int) - analytical_df['country_name'] = analytical_df['country_name'].astype(str) - analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) - analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) - analytical_df['direction'] = analytical_df['direction'].astype(str) - analytical_df['framework'] = analytical_df['framework'].astype(str) - analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) - analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) - analytical_df['time_id'] = analytical_df['time_id'].astype(int) - analytical_df['year'] = analytical_df['year'].astype(int) - analytical_df['value'] = analytical_df['value'].astype(float) - analytical_df['norm_value_1_100'] = analytical_df['norm_value_1_100'].astype(float) - analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) - analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) + # Pastikan tipe data konsisten + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['country_name'] = analytical_df['country_name'].astype(str) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) + analytical_df['direction'] = analytical_df['direction'].astype(str) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['year'] = analytical_df['year'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) + self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") - # Framework distribution per row - fw_dist_rows = analytical_df['framework'].value_counts() - self.logger.info(f" Framework distribution (rows):") - for fw, cnt in fw_dist_rows.items(): - self.logger.info(f" {fw}: {cnt:,} rows") - - # Framework distribution per indikator (label) - ind_fw_label = ( - analytical_df - .groupby('indicator_id')['framework'] - .unique() - .apply(lambda x: '/'.join(sorted(x))) - .value_counts() - ) - self.logger.info(f" Framework distribution (per indicator label):") - for fw, cnt in ind_fw_label.items(): - self.logger.info(f" {fw}: {cnt} indicators") - - self.logger.info( - f" norm_value_1_100 range: " - f"{analytical_df['norm_value_1_100'].min():.2f} - " - f"{analytical_df['norm_value_1_100'].max():.2f}" - ) - + # Schema BigQuery schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), ] rows_loaded = load_to_bigquery( @@ -1022,51 +503,34 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_start_year' : self.sdg_start_year, - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : '1-100 per indicator global minmax direction-aware', - 'framework_assignment' : 'per-row by year (not per-indicator)', - 'sdg_proxy_keywords' : list(_SDG_ERA_PROXY_KEYWORDS), - 'condition_thresholds' : { - 'bad' : f'< {THRESHOLD_BAD}', - 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', - 'good' : f'> {THRESHOLD_GOOD}', - }, + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'fixed_countries': len(self.selected_country_ids), + 'no_gaps' : True, + 'layer' : 'gold', + 'columns' : 'id + name + value (Looker Studio ready)' }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), - 'sdg_start_year' : self.sdg_start_year, - 'framework_dist_rows' : fw_dist_rows.to_dict(), - 'framework_dist_inds' : ind_fw_label.to_dict(), + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators': int(self.df_clean['indicator_id'].nunique()) }) } save_etl_metadata(self.client, metadata) - self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold") + self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold") + self.logger.info(f" Metadata → [AUDIT] etl_metadata") return rows_loaded except Exception as e: self.logger.error(f"Error saving: {e}") raise - # ------------------------------------------------------------------ - # RUN - # ------------------------------------------------------------------ - def run(self): self.pipeline_start = datetime.now() self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) - self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") - self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") - self.logger.info("Framework: per-row by year (shared indicators split MDGs/SDGs)") - self.logger.info(f"SDG Proxy: FIES only (food insecurity/food insecure)") - self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") + self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold") self.logger.info("=" * 80) self.load_source_data() @@ -1074,10 +538,7 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() - self.determine_sdg_start_year() # Step 6: per-row framework assignment self.verify_no_gaps() - self.calculate_norm_value() # Step 8: norm_value_1_100 - self.calculate_yoy() # Step 9: yoy_change, yoy_pct self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -1087,12 +548,11 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") - self.logger.info(f" Countries : {len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -1100,6 +560,10 @@ class AnalyticalLayerLoader: # ============================================================================= def run_analytical_layer(): + """ + Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims. + Dipanggil setelah dimensional_model_to_gold selesai. + """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() loader = AnalyticalLayerLoader(client) @@ -1113,11 +577,7 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) - print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") - print("Output: fact_asean_food_security_selected -> fs_asean_gold") - print(f"Norm: min-max 1-100 per indicator, direction-aware") - print(f"Framework: per-row by year | SDG Proxy: FIES only") - print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") + print("Output: fact_asean_food_security_selected → fs_asean_gold") print("=" * 80) logger = setup_logging() @@ -1127,6 +587,4 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") - print(f" SDG Start Year : {loader.sdg_start_year}") - print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file