diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py index 77fd197..1c69968 100644 --- a/dags/etl_food_security.py +++ b/dags/etl_food_security.py @@ -22,6 +22,8 @@ Kimball ETL Flow: │ agg_pillar_by_country │ │ agg_framework_by_country │ │ agg_framework_asean │ + │ ↓ │ + │ agg_indicator_norm │ │ │ │ AUDIT : etl_logs, etl_metadata (setiap layer) │ └──────────────────────────────────────────────────────────────────────────┘ @@ -36,13 +38,15 @@ Task Order: → dimensional_model_to_gold → analytical_layer_to_gold → aggregation_to_gold + → indicator_norm_aggregation_to_gold Scripts folder harus berisi: - - bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...) - - bigquery_cleaned_layer.py (run_cleaned_integration) - - bigquery_dimensional_model.py (run_dimensional_model) - - bigquery_analytical_layer.py (run_analytical_layer) - - bigquery_analysis_aggregation.py (run_aggregation) + - bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...) + - bigquery_cleaned_layer.py (run_cleaned_integration) + - bigquery_dimensional_model.py (run_dimensional_model) + - bigquery_analytical_layer.py (run_analytical_layer) + - bigquery_analysis_aggregation.py (run_aggregation) + - bigquery_aggraget_fact_selected_layer.py (run_indicator_norm_aggregation) - bigquery_config.py - bigquery_helpers.py - bigquery_datasource.py @@ -71,6 +75,9 @@ from scripts.bigquery_analytical_layer import ( from scripts.bigquery_aggregate_layer import ( run_aggregation, ) +from scripts.bigquery_aggraget_fact_selected_layer import ( + run_indicator_norm_aggregation, +) # DEFAULT ARGS @@ -136,5 +143,21 @@ with DAG( python_callable = run_aggregation ) - - task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation \ No newline at end of file + task_indicator_norm = PythonOperator( + task_id = "indicator_norm_aggregation_to_gold", + python_callable = run_indicator_norm_aggregation + ) + + # Task Dependencies + ( + task_verify + >> task_fao + >> task_worldbank + >> task_unicef + >> task_staging + >> task_cleaned + >> task_dimensional + >> task_analytical + >> task_aggregation + >> task_indicator_norm + ) \ No newline at end of file diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py new file mode 100644 index 0000000..5825b90 --- /dev/null +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -0,0 +1,730 @@ +""" +BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION +Tabel: agg_indicator_norm -> fs_asean_gold + +Tujuan: + Menghitung norm_value per indikator per negara per tahun, sehingga dapat + melihat performa setiap indikator secara individual (lower_better & higher_better + sudah dibalik). + +Framework Classification Logic: + - Semua indikator berlabel "MDGs" secara default. + - Indikator yang ada dalam SDG_ONLY_KEYWORDS akan berlabel "SDGs" mulai dari + sdgs_start_year (tahun pertama FIES hadir, dihitung otomatis). + - Indikator yang SUDAH ADA sebelum sdgs_start_year DAN juga termasuk + SDG_ONLY_KEYWORDS akan memiliki DUA label framework: + * "MDGs" untuk year < sdgs_start_year + * "SDGs" untuk year >= sdgs_start_year + - Indikator yang TIDAK ada dalam SDG_ONLY_KEYWORDS selalu "MDGs". + +Output Schema (agg_indicator_norm): + year, country_id, country_name, + indicator_id, indicator_name, direction, + pillar_id, pillar_name, + framework, -- "MDGs" | "SDGs" + value, -- raw value asli + norm_value, -- 0-1, direction sudah diperhitungkan + norm_score_1_100, -- scaled 1-100 (global per indikator) + rank_in_indicator_year, -- rank negara di dalam satu indikator & tahun + rank_in_country_year -- rank indikator di dalam satu negara & tahun +""" + +import pandas as pd +import numpy as np +from datetime import datetime +import logging +import json + +from scripts.bigquery_config import get_bigquery_client +from scripts.bigquery_helpers import ( + log_update, + load_to_bigquery, + read_from_bigquery, + setup_logging, + save_etl_metadata, +) +from google.cloud import bigquery + + +# ============================================================================= +# SDG-ONLY KEYWORD SET +# ============================================================================= + +SDG_ONLY_KEYWORDS: frozenset = frozenset([ + # TARGET 2.1.1 - Undernourishment + "prevalence of undernourishment (percent) (3-year average)", + "number of people undernourished (million) (3-year average)", + # TARGET 2.1.2 - Food Insecurity (FIES) + "prevalence of severe food insecurity in the total population (percent) (3-year average)", + "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", + "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", + "number of severely food insecure people (million) (3-year average)", + "number of severely food insecure male adults (million) (3-year average)", + "number of severely food insecure female adults (million) (3-year average)", + "number of moderately or severely food insecure people (million) (3-year average)", + "number of moderately or severely food insecure male adults (million) (3-year average)", + "number of moderately or severely food insecure female adults (million) (3-year average)", + # TARGET 2.2.1 - Stunting + "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", + "number of children under 5 years of age who are stunted (modeled estimates) (million)", + # TARGET 2.2.2 - Wasting + "percentage of children under 5 years affected by wasting (percent)", + "number of children under 5 years affected by wasting (million)", + # TARGET 2.2.2 - Overweight (children) + "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", + "number of children under 5 years of age who are overweight (modeled estimates) (million)", + # TARGET 2.2.3 - Anaemia + "prevalence of anemia among women of reproductive age (15-49 years) (percent)", + "number of women of reproductive age (15-49 years) affected by anemia (million)", +]) + +# Lowercase set untuk matching case-insensitive +_SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS) + +# FIES-specific keywords untuk deteksi sdgs_start_year +# (indikator yang HANYA muncul setelah SDGs era dimulai) +_FIES_DETECTION_KEYWORDS: frozenset = frozenset([ + "prevalence of severe food insecurity in the total population (percent) (3-year average)", + "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", + "number of severely food insecure people (million) (3-year average)", + "number of moderately or severely food insecure people (million) (3-year average)", +]) +_FIES_DETECTION_LOWER: frozenset = frozenset(k.lower() for k in _FIES_DETECTION_KEYWORDS) + +DIRECTION_INVERT_KEYWORDS = frozenset({ + "negative", "lower_better", "lower_is_better", "inverse", "neg", +}) +DIRECTION_POSITIVE_KEYWORDS = frozenset({ + "positive", "higher_better", "higher_is_better", +}) + + +# ============================================================================= +# PURE HELPERS +# ============================================================================= + +def _should_invert(direction: str, logger=None, context: str = "") -> bool: + d = str(direction).lower().strip() + if d in DIRECTION_INVERT_KEYWORDS: + return True + if d in DIRECTION_POSITIVE_KEYWORDS: + return False + if logger: + logger.warning( + f" [DIRECTION WARNING] Unknown direction '{direction}' " + f"{'(' + context + ')' if context else ''}. Defaulting to positive (no invert)." + ) + return False + + +def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series: + values = series.dropna().values + if len(values) == 0: + return pd.Series(np.nan, index=series.index) + v_min, v_max = values.min(), values.max() + if v_min == v_max: + return pd.Series((lo + hi) / 2.0, index=series.index) + result = np.full(len(series), np.nan) + not_nan = series.notna() + result[not_nan.values] = lo + (series[not_nan].values - v_min) / (v_max - v_min) * (hi - lo) + return pd.Series(result, index=series.index) + + +# ============================================================================= +# MAIN CLASS +# ============================================================================= + +class IndicatorNormAggregator: + """ + Hitung norm_value per indikator untuk seluruh data di + fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm. + + Alur: + 1. Load fact_asean_food_security_selected + 2. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) + 3. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label + 4. Hitung norm_value per indikator (direction-aware, 0-1) + 5. Scale ke 1-100 per indikator (global) + 6. Hitung rank_in_indicator_year & rank_in_country_year + 7. Simpan ke BigQuery + """ + + def __init__(self, client: bigquery.Client): + self.client = client + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.propagate = False + + self.df = None + self.sdgs_start_year = None + + self.pipeline_start = None + self.pipeline_metadata = { + "rows_fetched": 0, + "rows_loaded" : 0, + "start_time" : None, + "end_time" : None, + } + + # ========================================================================= + # STEP 1: Load + # ========================================================================= + + def load_data(self): + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 1: LOAD DATA — fact_asean_food_security_selected") + self.logger.info("=" * 80) + + self.df = read_from_bigquery( + self.client, "fact_asean_food_security_selected", layer="gold" + ) + + required = { + "country_id", "country_name", + "indicator_id", "indicator_name", "direction", + "pillar_id", "pillar_name", + "year", "value", + } + missing = required - set(self.df.columns) + if missing: + raise ValueError(f"Kolom tidak ditemukan: {missing}") + + n_null = self.df["direction"].isna().sum() + if n_null > 0: + self.logger.warning(f" {n_null} rows direction NULL -> diisi 'positive'") + self.df["direction"] = self.df["direction"].fillna("positive") + + self.pipeline_metadata["rows_fetched"] = len(self.df) + self.logger.info(f" Rows : {len(self.df):,}") + self.logger.info(f" Countries : {self.df['country_id'].nunique()}") + self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}") + self.logger.info( + f" Years : {int(self.df['year'].min())} - {int(self.df['year'].max())}" + ) + + # ========================================================================= + # STEP 2: Deteksi sdgs_start_year + # ========================================================================= + + def _detect_sdgs_start_year(self) -> int: + """ + sdgs_start_year = tahun pertama FIES hadir di data. + FIES = indikator yang ada di _FIES_DETECTION_LOWER. + + Fallback ke metode gap-terbesar pada min_year distribusi per indikator + jika FIES tidak ditemukan. + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 2: DETECT sdgs_start_year (first FIES year)") + self.logger.info("=" * 80) + + # Metode 1: Explicit FIES detection + fies_rows = self.df[ + self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER) + ] + if not fies_rows.empty: + sdgs_start = int(fies_rows["year"].min()) + n_fies_ind = fies_rows["indicator_name"].nunique() + self.logger.info(f" [Metode 1 - FIES explicit] sdgs_start_year = {sdgs_start}") + self.logger.info(f" FIES indicators found: {n_fies_ind}, first year = {sdgs_start}") + for nm in fies_rows["indicator_name"].unique(): + min_y = int(fies_rows[fies_rows["indicator_name"] == nm]["year"].min()) + self.logger.info(f" - {nm[:60]} (first year: {min_y})") + return sdgs_start + + # Fallback: gap-terbesar + self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar") + ind_min_year = ( + self.df.groupby("indicator_id")["year"] + .min().reset_index() + .rename(columns={"year": "min_year"}) + ) + unique_years = sorted(ind_min_year["min_year"].unique()) + self.logger.info(f" Unique min_year per indikator: {unique_years}") + + if len(unique_years) == 1: + sdgs_start = int(unique_years[0]) + 9999 + self.logger.info(" Hanya 1 cluster -> semua MDGs") + else: + gaps = [ + (unique_years[i+1] - unique_years[i], unique_years[i], unique_years[i+1]) + for i in range(len(unique_years) - 1) + ] + gaps.sort(reverse=True) + _, y_before, y_after = gaps[0] + sdgs_start = int(y_after) + self.logger.info( + f" Gap terbesar: {y_before} -> {y_after} -> sdgs_start_year = {sdgs_start}" + ) + + return sdgs_start + + # ========================================================================= + # STEP 3: Assign framework + # ========================================================================= + + def _assign_framework(self): + """ + Tambahkan kolom 'framework' ke self.df. + + Aturan per baris: + - Indikator TIDAK di SDG_ONLY_KEYWORDS: + framework = "MDGs" (selalu, semua tahun) + + - Indikator DI SDG_ONLY_KEYWORDS: + year < sdgs_start_year -> framework = "MDGs" + year >= sdgs_start_year -> framework = "SDGs" + + Contoh dual-label (indicator "prevalence of undernourishment"): + Jika data ada dari 2013 dan sdgs_start_year = 2019: + - Baris 2013-2018: framework = "MDGs" (masuk era MDGs) + - Baris 2019-dst : framework = "SDGs" (masuk era SDGs) + Sehingga indikator ini muncul di kedua framework tanpa duplikasi baris. + + Contoh FIES-only (indicator "prevalence of severe food insecurity"): + Data baru ada mulai 2019 (= sdgs_start_year): + - Semua baris: framework = "SDGs" + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 3: ASSIGN FRAMEWORK PER BARIS") + self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}") + self.logger.info("=" * 80) + + df = self.df.copy() + + # Flag apakah indikator ada di SDG_ONLY_KEYWORDS + df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER) + + # Default semua MDGs + df["framework"] = "MDGs" + + # SDG_ONLY + year >= sdgs_start_year -> SDGs + mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year) + df.loc[mask_sdgs, "framework"] = "SDGs" + + # Drop helper column + df = df.drop(columns=["_is_sdg_kw"]) + + # ---- Logging ---- + fw_dist = df["framework"].value_counts() + self.logger.info("\n Framework distribution (rows):") + for fw, cnt in fw_dist.items(): + self.logger.info(f" {fw:<6}: {cnt:,} rows") + + # Cek berapa indikator punya dual-framework + dual = ( + df.groupby("indicator_id")["framework"] + .nunique() + .reset_index() + .rename(columns={"framework": "n_frameworks"}) + ) + dual_ids = dual[dual["n_frameworks"] > 1]["indicator_id"].tolist() + self.logger.info( + f"\n Indikator dengan DUAL framework (MDGs + SDGs): {len(dual_ids)}" + ) + if dual_ids: + for iid in dual_ids: + ind_name = df[df["indicator_id"] == iid]["indicator_name"].iloc[0] + yr_range = df[df["indicator_id"] == iid][["year", "framework"]].drop_duplicates() + mdgs_yrs = sorted(yr_range[yr_range["framework"] == "MDGs"]["year"].tolist()) + sdgs_yrs = sorted(yr_range[yr_range["framework"] == "SDGs"]["year"].tolist()) + self.logger.info( + f" [{iid}] {ind_name[:55]}\n" + f" MDGs years: {mdgs_yrs}\n" + f" SDGs years: {sdgs_yrs}" + ) + + self.logger.info( + f"\n Indikator SDGs only (semua tahun = SDGs): " + f"{len(dual[(dual['n_frameworks'] == 1)].merge(df[df['framework'] == 'SDGs'][['indicator_id']].drop_duplicates(), on='indicator_id'))}" + ) + + self.df = df + + # ========================================================================= + # STEP 4: Hitung norm_value per indikator (direction-aware) + # ========================================================================= + + def _compute_norm_values(self) -> pd.DataFrame: + """ + Normalisasi per indikator secara global (semua tahun & negara): + norm_value = (raw - min) / (max - min) [higher_better] + norm_value = 1 - (raw - min) / (max - min) [lower_better] + + Normalisasi dilakukan satu kali per indicator_id, + mencakup SEMUA baris (MDGs + SDGs dari indikator yang sama) + agar skor konsisten antar framework. + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 4: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)") + self.logger.info("=" * 80) + + df = self.df.copy() + norm_parts = [] + + for ind_id, grp in df.groupby("indicator_id"): + grp = grp.copy() + direction = str(grp["direction"].iloc[0]) + do_invert = _should_invert( + direction, self.logger, context=f"indicator_id={ind_id}" + ) + + valid_mask = grp["value"].notna() + n_valid = valid_mask.sum() + + if n_valid < 2: + grp["norm_value"] = np.nan + norm_parts.append(grp) + self.logger.warning( + f" [SKIP] indicator_id={ind_id}: only {n_valid} valid values" + ) + continue + + raw = grp.loc[valid_mask, "value"].values + v_min = raw.min() + v_max = raw.max() + + normed = np.full(len(grp), np.nan) + if v_min == v_max: + normed[valid_mask.values] = 0.5 + else: + normed[valid_mask.values] = (raw - v_min) / (v_max - v_min) + + if do_invert: + normed = np.where(np.isnan(normed), np.nan, 1.0 - normed) + + grp["norm_value"] = normed + norm_parts.append(grp) + + df_normed = pd.concat(norm_parts, ignore_index=True) + + n_ind_computed = df_normed["indicator_id"].nunique() + self.logger.info(f" norm_value computed: {n_ind_computed} indicators") + self.logger.info( + f" norm_value range : " + f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}" + ) + self.logger.info( + f" norm_value nulls : {df_normed['norm_value'].isna().sum()}" + ) + return df_normed + + # ========================================================================= + # STEP 5: Scale ke 1-100, hitung rank + # ========================================================================= + + def _compute_scores_and_ranks(self, df: pd.DataFrame) -> pd.DataFrame: + """ + norm_score_1_100: + Scale norm_value ke 1-100 secara global PER INDIKATOR + (semua tahun & negara dalam satu indikator di-scale bersama). + + rank_in_indicator_year: + Rank negara dalam satu (indicator_id, year). + rank=1 -> negara dengan norm_score terbaik untuk indikator tsb di tahun tsb. + + rank_in_country_year: + Rank indikator dalam satu (country_id, year). + rank=1 -> indikator dengan norm_score terbaik untuk negara tsb di tahun tsb. + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 5: SCALE TO 1-100 & COMPUTE RANKS") + self.logger.info("=" * 80) + + # Scale per indikator + score_parts = [] + for ind_id, grp in df.groupby("indicator_id"): + grp = grp.copy() + grp["norm_score_1_100"] = global_minmax(grp["norm_value"]) + score_parts.append(grp) + df = pd.concat(score_parts, ignore_index=True) + + # rank_in_indicator_year: rank negara per (indicator, year) + df["rank_in_indicator_year"] = ( + df.groupby(["indicator_id", "year"])["norm_score_1_100"] + .rank(method="min", ascending=False) + .astype("Int64") + ) + + # rank_in_country_year: rank indikator per (country, year) + df["rank_in_country_year"] = ( + df.groupby(["country_id", "year"])["norm_score_1_100"] + .rank(method="min", ascending=False) + .astype("Int64") + ) + + self.logger.info( + f" norm_score_1_100 range : " + f"{df['norm_score_1_100'].min():.2f} - {df['norm_score_1_100'].max():.2f}" + ) + self.logger.info( + f" rank_in_indicator_year max: {df['rank_in_indicator_year'].max()}" + ) + self.logger.info( + f" rank_in_country_year max : {df['rank_in_country_year'].max()}" + ) + return df + + # ========================================================================= + # STEP 6: Save to BigQuery + # ========================================================================= + + def _save(self, df: pd.DataFrame) -> int: + table_name = "agg_indicator_norm" + + self.logger.info("\n" + "=" * 80) + self.logger.info(f"STEP 6: SAVE -> [Gold] {table_name}") + self.logger.info("=" * 80) + + out = df[[ + "year", + "country_id", + "country_name", + "indicator_id", + "indicator_name", + "direction", + "pillar_id", + "pillar_name", + "framework", + "value", + "norm_value", + "norm_score_1_100", + "rank_in_indicator_year", + "rank_in_country_year", + ]].copy() + + out = out.sort_values( + ["year", "country_name", "pillar_name", "indicator_name"] + ).reset_index(drop=True) + + # Cast + out["year"] = out["year"].astype(int) + out["country_id"] = out["country_id"].astype(int) + out["country_name"] = out["country_name"].astype(str) + out["indicator_id"] = out["indicator_id"].astype(int) + out["indicator_name"] = out["indicator_name"].astype(str) + out["direction"] = out["direction"].astype(str) + out["pillar_id"] = out["pillar_id"].astype(int) + out["pillar_name"] = out["pillar_name"].astype(str) + out["framework"] = out["framework"].astype(str) + out["value"] = out["value"].astype(float) + out["norm_value"] = out["norm_value"].astype(float) + out["norm_score_1_100"] = out["norm_score_1_100"].astype(float) + out["rank_in_indicator_year"] = pd.to_numeric( + out["rank_in_indicator_year"], errors="coerce" + ).astype("Int64") + out["rank_in_country_year"] = pd.to_numeric( + out["rank_in_country_year"], errors="coerce" + ).astype("Int64") + + self.logger.info(f" Columns : {list(out.columns)}") + self.logger.info(f" Total rows : {len(out):,}") + self.logger.info(f" Countries : {out['country_id'].nunique()}") + self.logger.info(f" Indicators : {out['indicator_id'].nunique()}") + self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}") + self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}") + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("rank_in_indicator_year", "INTEGER", mode="NULLABLE"), + bigquery.SchemaField("rank_in_country_year", "INTEGER", mode="NULLABLE"), + ] + + rows_loaded = load_to_bigquery( + self.client, out, table_name, + layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema, + ) + + log_update(self.client, "DW", table_name, "full_load", rows_loaded) + self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") + + metadata = { + "source_class" : self.__class__.__name__, + "table_name" : table_name, + "execution_timestamp": self.pipeline_start, + "duration_seconds" : (datetime.now() - self.pipeline_start).total_seconds(), + "rows_fetched" : self.pipeline_metadata["rows_fetched"], + "rows_transformed" : rows_loaded, + "rows_loaded" : rows_loaded, + "completeness_pct" : 100.0, + "config_snapshot" : json.dumps({ + "sdgs_start_year" : self.sdgs_start_year, + "sdg_only_keywords_n" : len(SDG_ONLY_KEYWORDS), + "layer" : "gold", + "normalization" : "per_indicator_global_minmax", + "direction_handling" : "lower_better_inverted", + "framework_logic" : ( + "SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, " + "SDGs if year >= sdgs_start_year. " + "Non-SDG_ONLY: always MDGs." + ), + }), + "validation_metrics" : json.dumps({ + "total_rows" : rows_loaded, + "n_indicators" : int(out["indicator_id"].nunique()), + "n_countries" : int(out["country_id"].nunique()), + "sdgs_start_year": self.sdgs_start_year, + }), + } + save_etl_metadata(self.client, metadata) + self.logger.info(" Metadata -> [AUDIT] etl_metadata") + return rows_loaded + + # ========================================================================= + # STEP 7: Summary log + # ========================================================================= + + def _log_summary(self, df: pd.DataFrame): + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 7: SUMMARY") + self.logger.info("=" * 80) + + # Per framework & year + summary = ( + df.groupby(["framework", "year"]) + .agg( + n_indicators=("indicator_id", "nunique"), + n_countries =("country_id", "nunique"), + avg_score =("norm_score_1_100", "mean"), + ) + .reset_index() + ) + self.logger.info( + f"\n{'Framework':<8} {'Year':<6} {'Indicators':<12} {'Countries':<12} {'Avg Score'}" + ) + self.logger.info("-" * 55) + for _, r in summary.iterrows(): + self.logger.info( + f"{r['framework']:<8} {int(r['year']):<6} " + f"{int(r['n_indicators']):<12} {int(r['n_countries']):<12} " + f"{r['avg_score']:.2f}" + ) + + # Top 5 & Bottom 5 indikator (rata-rata norm_score_1_100) + ind_avg = ( + df.groupby(["indicator_id", "indicator_name", "pillar_name", "direction"]) + ["norm_score_1_100"].mean() + .reset_index() + .sort_values("norm_score_1_100", ascending=False) + ) + + self.logger.info( + "\n TOP 5 Indicators (avg norm_score_1_100 across all years & countries):" + ) + for _, r in ind_avg.head(5).iterrows(): + tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" + self.logger.info( + f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} " + f"{r['norm_score_1_100']:.2f} {tag}" + ) + + self.logger.info("\n BOTTOM 5 Indicators:") + for _, r in ind_avg.tail(5).iterrows(): + tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]" + self.logger.info( + f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} " + f"{r['norm_score_1_100']:.2f} {tag}" + ) + + # Indikator per pillar + pillar_summary = ( + df.drop_duplicates(subset=["indicator_id", "pillar_name"]) + .groupby("pillar_name")["indicator_id"] + .count() + .reset_index() + .rename(columns={"indicator_id": "n_indicators"}) + ) + self.logger.info("\n Indicators per pillar:") + for _, r in pillar_summary.iterrows(): + self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}") + + # ========================================================================= + # RUN + # ========================================================================= + + def run(self): + self.pipeline_start = datetime.now() + self.pipeline_metadata["start_time"] = self.pipeline_start + + self.logger.info("\n" + "=" * 80) + self.logger.info("INDICATOR NORM AGGREGATION") + self.logger.info(" Source : fact_asean_food_security_selected") + self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold") + self.logger.info("=" * 80) + + self.load_data() + self.sdgs_start_year = self._detect_sdgs_start_year() + self._assign_framework() + df_normed = self._compute_norm_values() + df_final = self._compute_scores_and_ranks(df_normed) + rows_loaded = self._save(df_final) + self.pipeline_metadata["rows_loaded"] = rows_loaded + self._log_summary(df_final) + + self.pipeline_metadata["end_time"] = datetime.now() + duration = ( + self.pipeline_metadata["end_time"] - self.pipeline_start + ).total_seconds() + + self.logger.info("\n" + "=" * 80) + self.logger.info("COMPLETED") + self.logger.info("=" * 80) + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}") + self.logger.info(f" Rows Loaded : {rows_loaded:,}") + self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}") + + +# ============================================================================= +# AIRFLOW TASK +# ============================================================================= + +def run_indicator_norm_aggregation(): + """ + Airflow task: Build agg_indicator_norm. + Dipanggil setelah analytical_layer_to_gold selesai. + """ + client = get_bigquery_client() + agg = IndicatorNormAggregator(client) + agg.run() + print(f"agg_indicator_norm loaded: {agg.pipeline_metadata['rows_loaded']:,} rows") + + +# ============================================================================= +# MAIN +# ============================================================================= + +if __name__ == "__main__": + import sys, io + if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"): + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") + if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"): + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace") + + print("=" * 80) + print("INDICATOR NORM AGGREGATION -> fs_asean_gold") + print(" Source : fact_asean_food_security_selected") + print(" Output : agg_indicator_norm") + print("=" * 80) + + logger = setup_logging() + client = get_bigquery_client() + agg = IndicatorNormAggregator(client) + agg.run() + + print("\n" + "=" * 80) + print("[OK] COMPLETED") + print("=" * 80) \ No newline at end of file