From 47ea9c04920a85b08de78d45b267e10078b7cefa Mon Sep 17 00:00:00 2001 From: Debby Date: Thu, 2 Apr 2026 19:58:05 +0700 Subject: [PATCH] code last done --- scripts/bigquery_aggregate_layer.py | 988 ++++++++++++-------------- scripts/bigquery_analytical_layer.py | 774 +++----------------- scripts/bigquery_cleaned_layer.py | 114 +-- scripts/bigquery_dimensional_model.py | 206 +++--- 4 files changed, 708 insertions(+), 1374 deletions(-) diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index 9cbc343..977a0d4 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -1,43 +1,13 @@ """ BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION Semua agregasi pakai norm_value dari _get_norm_value_df() - -PERBAIKAN (vs versi sebelumnya): -───────────────────────────────────────────────────────────────────────────── -1. NORMALIZE_FRAMEWORKS_JOINTLY dihapus. - Setelah perbaikan di analytical_layer, norm_value_1_100 sudah dihitung - SEKALI per indikator dari seluruh data (semua tahun, semua negara). - Tidak ada lagi rescaling ulang per-framework di layer ini. - Semua framework (MDGs, SDGs, Total) menggunakan norm_value yang SAMA - sebagai basis, sehingga skor mereka berada pada skala yang setara. - -2. _get_norm_value_df() DISEDERHANAKAN. - Fungsi ini sekarang hanya membaca kolom norm_value_1_100 yang sudah ada - di fact_asean_food_security_selected (hasil dari analytical_layer), - kemudian memetakan ke skala 0-1 untuk keperluan agregasi internal. - TIDAK ada lagi normalisasi ulang per indikator di sini. - -3. global_minmax() TETAP DIGUNAKAN untuk mengubah rata-rata norm (0-1) menjadi - skor 1-100 di level agregasi (pillar / country / asean). - Ini adalah rescaling level AGREGAT (bukan level indikator), sehingga masih - valid dan tidak menimbulkan bias komparabilitas. - -4. Framework MDGs dan SDGs sekarang comparable: - - Jika skor SDGs < skor MDGs → memang karena indikator SDGs mengukur - dimensi deprivasi yang lebih dalam (substantif), bukan artefak teknis. - - Log diagnostik ditambahkan untuk memverifikasi ini. - -5. Kolom 'condition' (good/moderate/bad) TETAP dengan threshold yang sama. - -Simpan 6 tabel ke fs_asean_gold (layer='gold'): +UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'): - agg_pillar_composite - agg_pillar_by_country - agg_framework_by_country - agg_framework_asean - agg_narrative_overview - agg_narrative_pillar - -SOURCE TABLE: fact_asean_food_security_selected """ import pandas as pd @@ -70,23 +40,7 @@ DIRECTION_POSITIVE_KEYWORDS = frozenset({ "positive", "higher_better", "higher_is_better", }) -# Threshold kondisi — fixed absolute, skala 1-100 -THRESHOLD_BAD = 40.0 -THRESHOLD_GOOD = 60.0 - - -def assign_condition(score) -> str: - """ - Assign kondisi berdasarkan score skala 1-100 (direction-aware, nilai tinggi = lebih baik). - Returns: 'good' / 'moderate' / 'bad' / None jika NaN - """ - if score is None or (isinstance(score, float) and np.isnan(score)): - return None - if score > THRESHOLD_GOOD: - return 'good' - if score < THRESHOLD_BAD: - return 'bad' - return 'moderate' +NORMALIZE_FRAMEWORKS_JOINTLY = False # ============================================================================= @@ -128,11 +82,6 @@ def _should_invert(direction: str, logger=None, context: str = "") -> bool: def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series: - """ - Rescale series ke rentang [lo, hi]. - Digunakan untuk mengubah norm agregat (0-1) menjadi skor 1-100 di level - pillar / country / asean. Bukan untuk normalisasi indikator mentah. - """ values = series.dropna().values if len(values) == 0: return pd.Series(np.nan, index=series.index) @@ -155,9 +104,7 @@ def add_yoy(df: pd.DataFrame, group_cols: list, score_col: str) -> pd.DataFrame: return df -def safe_int( - series: pd.Series, fill: int = 0, col_name: str = "", logger=None -) -> pd.Series: +def safe_int(series: pd.Series, fill: int = 0, col_name: str = "", logger=None) -> pd.Series: n_nan = series.isna().sum() if n_nan > 0 and logger: logger.warning( @@ -166,9 +113,7 @@ def safe_int( return series.fillna(fill).astype(int) -def check_and_dedup( - df: pd.DataFrame, key_cols: list, context: str = "", logger=None -) -> pd.DataFrame: +def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger=None) -> pd.DataFrame: dupes = df.duplicated(subset=key_cols, keep=False) if dupes.any(): n_dupes = dupes.sum() @@ -186,30 +131,19 @@ def check_and_dedup( return df -def add_condition_column(df: pd.DataFrame, score_col: str) -> pd.DataFrame: - df['condition'] = df[score_col].apply(assign_condition) - return df - - -def log_condition_summary(df: pd.DataFrame, context: str, logger) -> None: - dist = df['condition'].value_counts() - logger.info( - f" Condition distribution ({context}): " + - " | ".join(f"{c}: {n:,}" for c, n in dist.items()) - ) - - # ============================================================================= -# NARRATIVE BUILDER FUNCTIONS (tidak berubah) +# NARRATIVE BUILDER FUNCTIONS (pure functions, easy to unit-test) # ============================================================================= def _fmt_score(score) -> str: + """Format score to 2 decimal places.""" if score is None or (isinstance(score, float) and np.isnan(score)): return "N/A" return f"{score:.2f}" def _fmt_delta(delta) -> str: + """Format YoY delta with sign and 2 decimal places.""" if delta is None or (isinstance(delta, float) and np.isnan(delta)): return "N/A" sign = "+" if delta >= 0 else "" @@ -217,11 +151,33 @@ def _fmt_delta(delta) -> str: def _build_overview_narrative( - year, n_mdg, n_sdg, n_total_ind, score, yoy_val, yoy_pct, - prev_year, prev_score, ranking_list, - most_improved_country, most_improved_delta, - most_declined_country, most_declined_delta, + year: int, + n_mdg: int, + n_sdg: int, + n_total_ind: int, + score: float, + yoy_val, + yoy_pct, + prev_year: int, + prev_score, + ranking_list: list, + most_improved_country, + most_improved_delta, + most_declined_country, + most_declined_delta, ) -> str: + """ + Compose a full English prose narrative for the Overview tab. + + Narrative structure + ------------------- + 1. Indicator composition (MDGs first, then SDGs) + 2. ASEAN score + YoY + 3. Country ranking + 4. Most improved / declined country + """ + + # -- Sentence 1: indicator composition ---------------------------------- parts_ind = [] if n_mdg > 0: parts_ind.append(f"{n_mdg} MDG indicator{'s' if n_mdg > 1 else ''}") @@ -241,6 +197,7 @@ def _build_overview_narrative( f"{n_total_ind} indicator{'s' if n_total_ind != 1 else ''}." ) + # -- Sentence 2: ASEAN score + YoY ------------------------------------- if yoy_val is not None and prev_score is not None: direction_word = "increasing" if yoy_val >= 0 else "decreasing" pct_clause = "" @@ -259,11 +216,13 @@ def _build_overview_narrative( f"no prior-year data is available for year-over-year comparison." ) + # -- Sentence 3: country ranking ---------------------------- sent3 = "" if ranking_list: first = ranking_list[0] last = ranking_list[-1] middle = ranking_list[1:-1] + if len(ranking_list) == 1: sent3 = ( f"In terms of country performance, {first['country_name']} was the only " @@ -277,11 +236,16 @@ def _build_overview_narrative( f"{_fmt_score(last['score'])} in {year}." ) else: - middle_parts = [f"{c['country_name']} ({_fmt_score(c['score'])})" for c in middle] - middle_str = ( - middle_parts[0] if len(middle_parts) == 1 - else ", ".join(middle_parts[:-1]) + f", and {middle_parts[-1]}" - ) + # Susun semua negara di tengah: "B (xx.xx), C (xx.xx), ..., and Y (xx.xx)" + middle_parts = [ + f"{c['country_name']} ({_fmt_score(c['score'])})" + for c in middle + ] + if len(middle_parts) == 1: + middle_str = middle_parts[0] + else: + middle_str = ", ".join(middle_parts[:-1]) + f", and {middle_parts[-1]}" + sent3 = ( f"In terms of country performance, {first['country_name']} led the region " f"with a score of {_fmt_score(first['score'])}, followed by {middle_str}. " @@ -289,6 +253,7 @@ def _build_overview_narrative( f"of {_fmt_score(last['score'])} in {year}." ) + # -- Sentence 4: most improved / declined ------------------------------ sent4_parts = [] if most_improved_country and most_improved_delta is not None: sent4_parts.append( @@ -312,22 +277,49 @@ def _build_overview_narrative( sent4 = ", ".join(sent4_parts) + "." sent4 = sent4[0].upper() + sent4[1:] + # -- Assemble ---------------------------------------------------------- return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) def _build_pillar_narrative( - year, pillar_name, pillar_score, rank_in_year, n_pillars, yoy_val, - top_country, top_country_score, bot_country, bot_country_score, - strongest_pillar, strongest_score, weakest_pillar, weakest_score, - most_improved_pillar, most_improved_delta, - most_declined_pillar, most_declined_delta, + year: int, + pillar_name: str, + pillar_score: float, + rank_in_year: int, + n_pillars: int, + yoy_val, + top_country, + top_country_score, + bot_country, + bot_country_score, + strongest_pillar, + strongest_score, + weakest_pillar, + weakest_score, + most_improved_pillar, + most_improved_delta, + most_declined_pillar, + most_declined_delta, ) -> str: + """ + Compose a full English prose narrative for a single pillar in a given year. + + Narrative structure + ------------------- + 1. Pillar score and rank + 2. Strongest / weakest pillar context + 3. Top / bottom country within this pillar + 4. YoY movement for this pillar + biggest mover across all pillars + """ + + # -- Sentence 1: pillar overview ---------------------------------------- rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") sent1 = ( f"In {year}, the {pillar_name} pillar scored {_fmt_score(pillar_score)}, " f"ranking {rank_in_year}{rank_suffix} out of {n_pillars} pillars assessed across ASEAN." ) + # -- Sentence 2: strongest / weakest context ---------------------------- sent2 = "" if strongest_pillar and weakest_pillar: if strongest_pillar == pillar_name: @@ -349,6 +341,7 @@ def _build_pillar_narrative( f"was the weakest (score: {_fmt_score(weakest_score)})." ) + # -- Sentence 3: country top / bottom within this pillar --------------- sent3 = "" if top_country and bot_country: if top_country != bot_country: @@ -363,6 +356,7 @@ def _build_pillar_narrative( f"with available data, scoring {_fmt_score(top_country_score)}." ) + # -- Sentence 4: YoY movement ------------------------------------------- if yoy_val is not None: direction_word = "improved" if yoy_val >= 0 else "declined" sent4 = ( @@ -375,9 +369,9 @@ def _build_pillar_narrative( f"for the {pillar_name} pillar in {year}" ) - if (most_improved_pillar and most_improved_delta is not None - and most_declined_pillar and most_declined_delta is not None - and most_improved_pillar != most_declined_pillar): + if most_improved_pillar and most_improved_delta is not None \ + and most_declined_pillar and most_declined_delta is not None \ + and most_improved_pillar != most_declined_pillar: sent4 += ( f". Across all pillars, {most_improved_pillar} showed the greatest improvement " f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} " @@ -387,6 +381,7 @@ def _build_pillar_narrative( sent4 += "." sent4 = sent4[0].upper() + sent4[1:] + # -- Assemble ---------------------------------------------------------- return " ".join(s for s in [sent1, sent2, sent3, sent4] if s) @@ -418,7 +413,7 @@ class FoodSecurityAggregator: self.sdgs_indicator_ids = set() # ========================================================================= - # STEP 1: Load data + # STEP 1: Load data dari Gold layer # ========================================================================= def load_data(self): @@ -426,92 +421,47 @@ class FoodSecurityAggregator: self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") self.logger.info("=" * 70) - self.df = read_from_bigquery( - self.client, "fact_asean_food_security_selected", layer='gold' + self.df = read_from_bigquery(self.client, "analytical_food_security", layer='gold') + self.logger.info(f" analytical_food_security : {len(self.df):,} rows") + + self.dims["country"] = read_from_bigquery(self.client, "dim_country", layer='gold') + self.dims["indicator"] = read_from_bigquery(self.client, "dim_indicator", layer='gold') + self.dims["pillar"] = read_from_bigquery(self.client, "dim_pillar", layer='gold') + self.dims["time"] = read_from_bigquery(self.client, "dim_time", layer='gold') + + ind_cols = ["indicator_id"] + if "direction" in self.dims["indicator"].columns: + ind_cols.append("direction") + + self.df = ( + self.df + .merge(self.dims["time"][["time_id", "year"]], on="time_id", how="left") + .merge(self.dims["country"][["country_id", "country_name"]], on="country_id", how="left") + .merge(self.dims["pillar"][["pillar_id", "pillar_name"]], on="pillar_id", how="left") + .merge(self.dims["indicator"][ind_cols], on="indicator_id", how="left") ) - self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows") - required_cols = { - "country_id", "country_name", - "indicator_id", "indicator_name", "direction", "framework", - "pillar_id", "pillar_name", - "time_id", "year", "value", - # PERBAIKAN: norm_value_1_100 wajib ada (hasil analytical_layer) - "norm_value_1_100", - } - missing_cols = required_cols - set(self.df.columns) - if missing_cols: - raise ValueError( - f"Kolom berikut tidak ditemukan: {missing_cols}\n" - f"Pastikan pipeline dijalankan berurutan:\n" - f" 1. bigquery_cleaned_layer.py\n" - f" 2. bigquery_dimensional_model.py\n" - f" 3. bigquery_analytical_layer.py ← harus dijalankan dulu\n" - f" 4. bigquery_analysis_layer.py (file ini)" - ) - - self.df["direction"] = self.df["direction"].fillna("positive") - self.df["framework"] = self.df["framework"].fillna("MDGs") - self.df["norm_value_1_100"] = self.df["norm_value_1_100"].astype(float) + if "direction" not in self.df.columns: + self.df["direction"] = "positive" + else: + n_null_dir = self.df["direction"].isna().sum() + if n_null_dir > 0: + self.logger.warning(f" [DIRECTION] {n_null_dir} rows dengan direction NULL -> diisi 'positive'") + self.df["direction"] = self.df["direction"].fillna("positive") dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() self.logger.info(f"\n Distribusi direction per indikator:") for d, cnt in dir_dist.items(): - tag = "INVERT" if _should_invert(d, self.logger, "load_data") else "normal" - self.logger.info(f" {d:<25} : {cnt:>3} [{tag}]") + tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal" + self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]") - fw_dist = self.df.drop_duplicates("indicator_id")["framework"].value_counts() - self.logger.info(f"\n Distribusi framework per indikator:") - for fw, cnt in fw_dist.items(): - self.logger.info(f" {fw:<10} : {cnt:>3}") - - self.logger.info( - f"\n Rows: {len(self.df):,} | Negara: {self.df['country_id'].nunique()} | " - f"Indikator: {self.df['indicator_id'].nunique()} | " - f"Tahun: {int(self.df['year'].min())}-{int(self.df['year'].max())}" - ) - - # Diagnostik: cek komparabilitas norm antar framework - self._log_norm_comparability_diagnostics() - - def _log_norm_comparability_diagnostics(self): - """ - Log diagnostik untuk memverifikasi bahwa norm_value_1_100 sudah comparable - antar framework setelah perbaikan di analytical_layer. - """ - self.logger.info(f"\n [DIAGNOSTIK] Komparabilitas norm_value_1_100 antar framework:") - self.logger.info(f" {'─'*60}") - - fw_stats = ( - self.df.groupby('framework')['norm_value_1_100'] - .agg(['mean', 'median', 'std', 'min', 'max']) - .round(2) - ) - for fw, row in fw_stats.iterrows(): - self.logger.info( - f" {fw:<8} mean={row['mean']:>6.2f} median={row['median']:>6.2f} " - f"std={row['std']:>5.2f} range=[{row['min']:.2f},{row['max']:.2f}]" - ) - - mdgs_mean = self.df[self.df['framework'] == 'MDGs']['norm_value_1_100'].mean() - sdgs_mean = self.df[self.df['framework'] == 'SDGs']['norm_value_1_100'].mean() - gap = mdgs_mean - sdgs_mean - - if abs(gap) > 15: - self.logger.info( - f"\n [INFO] Gap MDGs-SDGs = {gap:.2f} poin." - f"\n Ini adalah perbedaan SUBSTANTIF (bukan artefak normalisasi):" - f"\n Indikator SDGs mengukur deprivasi yang lebih dalam" - f"\n (FIES, stunting, wasting, anaemia) vs indikator MDGs." - f"\n Gap ini valid untuk dilaporkan sebagai temuan analisis." - ) - else: - self.logger.info( - f"\n [OK] Gap MDGs-SDGs = {gap:.2f} poin — dalam batas wajar." - ) + self.logger.info(f"\n Setelah join: {len(self.df):,} rows") + self.logger.info(f" Negara : {self.df['country_id'].nunique()}") + self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}") + self.logger.info(f" Tahun : {int(self.df['year'].min())} - {int(self.df['year'].max())}") # ========================================================================= - # STEP 1b: Klasifikasi indikator + # STEP 1b: Klasifikasi indikator ke MDGs / SDGs # ========================================================================= def _classify_indicators(self): @@ -519,108 +469,91 @@ class FoodSecurityAggregator: self.logger.info("STEP 1b: KLASIFIKASI INDIKATOR -> MDGs / SDGs") self.logger.info("=" * 70) - self.mdgs_indicator_ids = set( - self.df[self.df["framework"] == "MDGs"]["indicator_id"].unique().tolist() - ) - self.sdgs_indicator_ids = set( - self.df[self.df["framework"] == "SDGs"]["indicator_id"].unique().tolist() + ind_min_year = ( + self.df.groupby("indicator_id")["year"] + .min().reset_index() + .rename(columns={"year": "min_year"}) ) - _PROXY_KW = frozenset(['food insecurity', 'anemia', 'anaemia']) - proxy_mask = ( - (self.df["framework"] == "SDGs") & - self.df["indicator_name"].str.lower().apply( - lambda n: any(kw in n for kw in _PROXY_KW) - ) - ) - df_proxy = self.df[proxy_mask] + unique_years = sorted(ind_min_year["min_year"].unique()) + self.logger.info(f"\n Unique min_year per indikator: {unique_years}") - if not df_proxy.empty: - self.sdgs_start_year = int(df_proxy["year"].min()) - self.logger.info( - f"\n sdgs_start_year = {self.sdgs_start_year} " - f"(dari proxy FIES/anaemia di tabel)" - ) + if len(unique_years) == 1: + gap_threshold = unique_years[0] + 1 + self.logger.info(" Hanya 1 cluster -> semua = MDGs") else: - sdgs_rows = self.df[self.df["framework"] == "SDGs"] - if not sdgs_rows.empty: - self.sdgs_start_year = int(sdgs_rows["year"].min()) - self.logger.warning( - f" [WARN] Proxy tidak ditemukan, fallback ke min(year) SDGs: " - f"{self.sdgs_start_year}" - ) - else: - self.sdgs_start_year = int(self.df["year"].max()) + 1 - self.logger.warning( - f" [WARN] Tidak ada SDGs. sdgs_start_year = {self.sdgs_start_year}" - ) + gaps = [ + (unique_years[i+1] - unique_years[i], unique_years[i], unique_years[i+1]) + for i in range(len(unique_years) - 1) + ] + gaps.sort(reverse=True) + largest_gap_size, y_before, y_after = gaps[0] + gap_threshold = y_after + self.logger.info(f" Gap terbesar: {y_before} -> {y_after} (selisih {largest_gap_size})") - self.logger.info(f" MDGs : {len(self.mdgs_indicator_ids)} indikator") - self.logger.info(f" SDGs : {len(self.sdgs_indicator_ids)} indikator") + ind_min_year["framework"] = ind_min_year["min_year"].apply( + lambda y: "MDGs" if int(y) < gap_threshold else "SDGs" + ) - for fw in ["MDGs", "SDGs"]: - fw_inds = ( - self.df[self.df["framework"] == fw] - .drop_duplicates("indicator_id")[["indicator_id", "indicator_name"]] - .sort_values("indicator_name") - ) - self.logger.info(f"\n {fw} indicators ({len(fw_inds)}):") - for _, row in fw_inds.iterrows(): - self.logger.info(f" [{int(row['indicator_id'])}] {row['indicator_name']}") + sdgs_rows = ind_min_year[ind_min_year["framework"] == "SDGs"] + self.sdgs_start_year = int(sdgs_rows["min_year"].min()) if not sdgs_rows.empty else int(self.df["year"].max()) + 1 + + self.logger.info(f" sdgs_start_year: {self.sdgs_start_year}") + + self.mdgs_indicator_ids = set(ind_min_year[ind_min_year["framework"] == "MDGs"]["indicator_id"].tolist()) + self.sdgs_indicator_ids = set(ind_min_year[ind_min_year["framework"] == "SDGs"]["indicator_id"].tolist()) + + self.logger.info(f" MDGs: {len(self.mdgs_indicator_ids)} indicators") + self.logger.info(f" SDGs: {len(self.sdgs_indicator_ids)} indicators") + + self.df = self.df.merge(ind_min_year[["indicator_id", "framework"]], on="indicator_id", how="left") # ========================================================================= - # CORE HELPER: _get_norm_value_df() - # ========================================================================= - # PERBAIKAN: - # Fungsi ini TIDAK lagi melakukan normalisasi ulang per indikator. - # Kolom norm_value_1_100 sudah dihitung sekali di analytical_layer - # dengan referensi global (semua tahun, semua negara, per indikator). - # - # Yang dilakukan di sini hanya: - # 1. Membaca norm_value_1_100 dari df - # 2. Mengubah skala 1-100 → 0-1 (untuk keperluan rata-rata agregat) - # dengan rumus linear: norm_0_1 = (norm_1_100 - 1) / 99 - # - # Rescaling agregat (0-1 → 1-100) tetap dilakukan via global_minmax() - # di masing-masing fungsi calc_* untuk menghasilkan skor level pillar/country/asean. + # CORE HELPER: normalisasi raw value per indikator # ========================================================================= def _get_norm_value_df(self) -> pd.DataFrame: - """ - Mengembalikan df dengan kolom 'norm_value' (skala 0-1) yang diturunkan - dari norm_value_1_100 (sudah ada di source, dihitung di analytical_layer). + if "framework" not in self.df.columns: + raise ValueError("Kolom 'framework' tidak ada. Pastikan _classify_indicators() dipanggil lebih dulu.") - Transformasi: norm_value = (norm_value_1_100 - 1) / 99 - Ini adalah transformasi LINEAR — tidak mengubah urutan relatif antar indikator, - negara, atau tahun. Komparabilitas lintas framework tetap terjaga. - """ - df = self.df.copy() + norm_parts = [] + for ind_id, grp in self.df.groupby("indicator_id"): + grp = grp.copy() + direction = str(grp["direction"].iloc[0]) + do_invert = _should_invert(direction, self.logger, context=f"indicator_id={ind_id}") + valid_mask = grp["value"].notna() + n_valid = valid_mask.sum() - # Konversi 1-100 → 0-1 secara linear - df["norm_value"] = np.where( - df["norm_value_1_100"].notna(), - (df["norm_value_1_100"] - 1.0) / 99.0, - np.nan - ) + if n_valid < 2: + grp["norm_value"] = np.nan + norm_parts.append(grp) + continue - n_null = df["norm_value"].isna().sum() - n_valid = df["norm_value"].notna().sum() - self.logger.debug( - f" _get_norm_value_df: {n_valid:,} valid | {n_null:,} null " - f"(dari norm_value_1_100 analytical_layer)" - ) + raw = grp.loc[valid_mask, "value"].values + v_min, v_max = raw.min(), raw.max() + normed = np.full(len(grp), np.nan) + if v_min == v_max: + normed[valid_mask.values] = 0.5 + else: + normed[valid_mask.values] = (raw - v_min) / (v_max - v_min) - return df + if do_invert: + normed = np.where(np.isnan(normed), np.nan, 1.0 - normed) + + grp["norm_value"] = normed + norm_parts.append(grp) + + return pd.concat(norm_parts, ignore_index=True) # ========================================================================= - # STEP 2: agg_pillar_composite + # STEP 2: agg_pillar_composite -> Gold # ========================================================================= def calc_pillar_composite(self) -> pd.DataFrame: table_name = "agg_pillar_composite" self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 2: {table_name}") + self.logger.info(f"STEP 2: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) df_normed = self._get_norm_value_df() @@ -629,22 +562,20 @@ class FoodSecurityAggregator: df_normed .groupby(["pillar_id", "pillar_name", "year"]) .agg( - pillar_norm =("norm_value", "mean"), - n_indicators =("indicator_id", "nunique"), - n_countries =("country_id", "nunique"), + pillar_norm =("norm_value", "mean"), + n_indicators=("indicator_id", "nunique"), + n_countries =("country_id", "nunique"), ) .reset_index() ) df["pillar_score_1_100"] = global_minmax(df["pillar_norm"]) - df["rank_in_year"] = ( + df["rank_in_year"] = ( df.groupby("year")["pillar_score_1_100"] .rank(method="min", ascending=False) .astype(int) ) df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") - df = add_condition_column(df, "pillar_score_1_100") - log_condition_summary(df, table_name, self.logger) df["pillar_id"] = df["pillar_id"].astype(int) df["year"] = df["year"].astype(int) @@ -664,24 +595,20 @@ class FoodSecurityAggregator: bigquery.SchemaField("pillar_score_1_100", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) self._finalize(table_name, rows) return df # ========================================================================= - # STEP 3: agg_pillar_by_country + # STEP 3: agg_pillar_by_country -> Gold # ========================================================================= def calc_pillar_by_country(self) -> pd.DataFrame: table_name = "agg_pillar_by_country" self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 3: {table_name}") + self.logger.info(f"STEP 3: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) df_normed = self._get_norm_value_df() @@ -694,14 +621,12 @@ class FoodSecurityAggregator: ) df["pillar_country_score_1_100"] = global_minmax(df["pillar_country_norm"]) - df["rank_in_pillar_year"] = ( + df["rank_in_pillar_year"] = ( df.groupby(["pillar_id", "year"])["pillar_country_score_1_100"] .rank(method="min", ascending=False) .astype(int) ) df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") - df = add_condition_column(df, "pillar_country_score_1_100") - log_condition_summary(df, table_name, self.logger) df["country_id"] = df["country_id"].astype(int) df["pillar_id"] = df["pillar_id"].astype(int) @@ -720,30 +645,17 @@ class FoodSecurityAggregator: bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) self._finalize(table_name, rows) return df # ========================================================================= - # STEP 4: agg_framework_by_country - # ========================================================================= - # PERBAIKAN: - # - Flag NORMALIZE_FRAMEWORKS_JOINTLY dihapus. - # - Tidak ada lagi rescaling ulang per-framework di sini. - # - Semua framework (Total, MDGs, SDGs) menggunakan norm_value yang SAMA - # sebagai basis (sudah comparable dari analytical_layer). - # - global_minmax() hanya digunakan SEKALI untuk mengubah norm agregat - # (rata-rata norm_value per country-framework-year) menjadi skor 1-100 - # di level country-framework, menggunakan SATU POOL DATA BERSAMA. - # - Dengan ini, perbandingan skor MDGs vs SDGs per negara adalah valid. + # STEP 4: agg_framework_by_country -> Gold # ========================================================================= def _calc_country_composite_inmemory(self) -> pd.DataFrame: + """Hitung country composite in-memory (tidak disimpan ke BQ).""" df_normed = self._get_norm_value_df() df = ( df_normed @@ -773,52 +685,37 @@ class FoodSecurityAggregator: table_name = "agg_framework_by_country" self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 4: {table_name}") + self.logger.info(f"STEP 4: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - self.logger.info( - " [PERBAIKAN] Semua framework di-aggregate dari norm_value yang SAMA." - "\n Tidak ada rescaling per-framework. Skor MDGs dan SDGs comparable." - ) country_composite = self._calc_country_composite_inmemory() df_normed = self._get_norm_value_df() parts = [] - # ── Layer TOTAL ─────────────────────────────────────────────────────── + # Layer TOTAL agg_total = ( country_composite[[ "country_id", "country_name", "year", "score_1_100", "n_indicators", "composite_score" ]] .copy() - .rename(columns={ - "score_1_100" : "framework_score_1_100", - "composite_score": "framework_norm" - }) + .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) ) agg_total["framework"] = "Total" parts.append(agg_total) - # ── Layer MDGs pre-SDGs (tahun sebelum sdgs_start_year) ────────────── - pre_sdgs_rows = country_composite[ - country_composite["year"] < self.sdgs_start_year - ].copy() + # Layer MDGs — Era pre-SDGs = Total + pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() if not pre_sdgs_rows.empty: mdgs_pre = ( - pre_sdgs_rows[[ - "country_id", "country_name", "year", - "score_1_100", "n_indicators", "composite_score" - ]] + pre_sdgs_rows[["country_id", "country_name", "year", "score_1_100", "n_indicators", "composite_score"]] .copy() - .rename(columns={ - "score_1_100" : "framework_score_1_100", - "composite_score": "framework_norm" - }) + .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) ) mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # ── Layer MDGs mixed (setelah SDGs masuk, hanya indikator MDGs) ────── + # Layer MDGs — Era mixed if self.mdgs_indicator_ids: df_mdgs_mixed = df_normed[ (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & @@ -828,17 +725,15 @@ class FoodSecurityAggregator: agg_mdgs_mixed = ( df_mdgs_mixed .groupby(["country_id", "country_name", "year"]) - .agg( - framework_norm=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique") - ) + .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) .reset_index() ) - # PERBAIKAN: rescale dari POOL GABUNGAN bersama SDGs (lihat bawah) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + agg_mdgs_mixed["framework_score_1_100"] = global_minmax(agg_mdgs_mixed["framework_norm"]) agg_mdgs_mixed["framework"] = "MDGs" parts.append(agg_mdgs_mixed) - # ── Layer SDGs (hanya indikator SDGs, mulai sdgs_start_year) ───────── + # Layer SDGs if self.sdgs_indicator_ids: df_sdgs = df_normed[ (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & @@ -848,51 +743,28 @@ class FoodSecurityAggregator: agg_sdgs = ( df_sdgs .groupby(["country_id", "country_name", "year"]) - .agg( - framework_norm=("norm_value", "mean"), - n_indicators =("indicator_id", "nunique") - ) + .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) .reset_index() ) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + agg_sdgs["framework_score_1_100"] = global_minmax(agg_sdgs["framework_norm"]) agg_sdgs["framework"] = "SDGs" parts.append(agg_sdgs) df = pd.concat(parts, ignore_index=True) - # PERBAIKAN: Rescale framework_score_1_100 dari SATU POOL BERSAMA - # untuk semua framework (MDGs mixed + SDGs) sekaligus. - # Ini memastikan skor 60 di MDGs dan skor 60 di SDGs memiliki makna - # yang sama: posisi relatif yang sama dalam distribusi gabungan. - mixed_mask = df["framework"].isin(["MDGs", "SDGs"]) - mixed_pre_mask = (df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year) - - # Rescale pre-SDGs MDGs dari pool Total (sudah dihitung) - # → sudah ada di agg_total (framework_score_1_100 = dari country_composite) - - # Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA - post_sdgs_mask = mixed_mask & ~mixed_pre_mask & df["framework_norm"].notna() - if post_sdgs_mask.any(): - df.loc[post_sdgs_mask, "framework_score_1_100"] = global_minmax( - df.loc[post_sdgs_mask, "framework_norm"] - ) + if NORMALIZE_FRAMEWORKS_JOINTLY: + mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + if mixed_mask.any(): + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) - - # Pastikan kolom framework_score_1_100 ada untuk semua baris - if "framework_score_1_100" not in df.columns: - df["framework_score_1_100"] = np.nan - df["rank_in_framework_year"] = ( df.groupby(["framework", "year"])["framework_score_1_100"] .rank(method="min", ascending=False) .astype(int) ) df = add_yoy(df, ["country_id", "framework"], "framework_score_1_100") - df = add_condition_column(df, "framework_score_1_100") - log_condition_summary(df, table_name, self.logger) - - # Log diagnostik: bandingkan skor MDGs vs SDGs - self._log_framework_score_diagnostics(df, table_name) df["country_id"] = df["country_id"].astype(int) df["year"] = df["year"].astype(int) @@ -913,143 +785,119 @@ class FoodSecurityAggregator: bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) self._finalize(table_name, rows) return df # ========================================================================= - # STEP 5: agg_framework_asean - # ========================================================================= - # PERBAIKAN: Sama dengan framework_by_country — tidak ada rescaling terpisah - # per framework. MDGs mixed dan SDGs di-rescale dari satu pool bersama. + # STEP 5: agg_framework_asean -> Gold # ========================================================================= def calc_framework_asean(self) -> pd.DataFrame: table_name = "agg_framework_asean" self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 5: {table_name}") + self.logger.info(f"STEP 5: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - self.logger.info( - " [PERBAIKAN] MDGs mixed + SDGs di-rescale dari SATU POOL BERSAMA." - "\n Skor ASEAN MDGs dan SDGs sekarang comparable." - ) df_normed = self._get_norm_value_df() country_composite = self._calc_country_composite_inmemory() country_norm = ( - df_normed - .groupby(["country_id", "country_name", "year"])["norm_value"] - .mean().reset_index() - .rename(columns={"norm_value": "country_norm"}) + df_normed.groupby(["country_id", "country_name", "year"])["norm_value"] + .mean().reset_index().rename(columns={"norm_value": "country_norm"}) ) asean_overall = ( country_norm.groupby("year") - .agg( - asean_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries =("country_norm", "count") - ) + .agg(asean_norm=("country_norm", "mean"), std_norm=("country_norm", "std"), + n_countries=("country_norm", "count")) .reset_index() ) asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) + asean_comp = ( + country_composite.groupby("year")["composite_score"] + .mean().reset_index().rename(columns={"composite_score": "asean_composite"}) + ) + asean_overall = asean_overall.merge(asean_comp, on="year", how="left") parts = [] - # ── Layer TOTAL ─────────────────────────────────────────────────────── + # Layer TOTAL total_cols = asean_overall[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() total_cols = total_cols.rename(columns={ "asean_score_1_100": "framework_score_1_100", - "asean_norm" : "framework_norm", - "n_countries" : "n_countries_with_data", + "asean_norm": "framework_norm", + "n_countries": "n_countries_with_data", }) n_ind_total = df_normed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) total_cols = total_cols.merge(n_ind_total, on="year", how="left") total_cols["framework"] = "Total" parts.append(total_cols) - # ── Layer MDGs pre-SDGs ─────────────────────────────────────────────── + # Layer MDGs — pre-SDGs = Total pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() if not pre_sdgs.empty: mdgs_pre = pre_sdgs[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() mdgs_pre = mdgs_pre.rename(columns={ "asean_score_1_100": "framework_score_1_100", - "asean_norm" : "framework_norm", - "n_countries" : "n_countries_with_data", + "asean_norm": "framework_norm", + "n_countries": "n_countries_with_data", }) - n_ind_pre = ( - df_normed[df_normed["year"] < self.sdgs_start_year] - .groupby("year")["indicator_id"].nunique() - .reset_index().rename(columns={"indicator_id": "n_indicators"}) - ) - mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") + n_ind_pre = df_normed[df_normed["year"] < self.sdgs_start_year].groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # ── Siapkan MDGs mixed dan SDGs untuk rescaling BERSAMA ─────────────── - mixed_parts = [] - + # Layer MDGs — mixed if self.mdgs_indicator_ids: df_mdgs_mixed = df_normed[ (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & (df_normed["year"] >= self.sdgs_start_year) ].copy() if not df_mdgs_mixed.empty: - cn = ( - df_mdgs_mixed.groupby(["country_id", "year"])["norm_value"].mean() - .reset_index().rename(columns={"norm_value": "country_norm"}) - ) + cn = df_mdgs_mixed.groupby(["country_id", "year"])["norm_value"].mean().reset_index().rename(columns={"norm_value": "country_norm"}) asean_mdgs = cn.groupby("year").agg( - framework_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries_with_data =("country_id", "count"), + framework_norm=("country_norm", "mean"), + std_norm=("country_norm", "std"), + n_countries_with_data=("country_id", "count"), ).reset_index() n_ind_mdgs = df_mdgs_mixed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) asean_mdgs = asean_mdgs.merge(n_ind_mdgs, on="year", how="left") + if not NORMALIZE_FRAMEWORKS_JOINTLY: + asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) asean_mdgs["framework"] = "MDGs" - mixed_parts.append(asean_mdgs) + parts.append(asean_mdgs) + # Layer SDGs if self.sdgs_indicator_ids: df_sdgs = df_normed[ (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & (df_normed["year"] >= self.sdgs_start_year) ].copy() if not df_sdgs.empty: - cn = ( - df_sdgs.groupby(["country_id", "year"])["norm_value"].mean() - .reset_index().rename(columns={"norm_value": "country_norm"}) - ) + cn = df_sdgs.groupby(["country_id", "year"])["norm_value"].mean().reset_index().rename(columns={"norm_value": "country_norm"}) asean_sdgs = cn.groupby("year").agg( - framework_norm =("country_norm", "mean"), - std_norm =("country_norm", "std"), - n_countries_with_data =("country_id", "count"), + framework_norm=("country_norm", "mean"), + std_norm=("country_norm", "std"), + n_countries_with_data=("country_id", "count"), ).reset_index() n_ind_sdgs = df_sdgs.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) asean_sdgs = asean_sdgs.merge(n_ind_sdgs, on="year", how="left") + if not NORMALIZE_FRAMEWORKS_JOINTLY: + asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) asean_sdgs["framework"] = "SDGs" - mixed_parts.append(asean_sdgs) - - # PERBAIKAN: Rescale MDGs mixed + SDGs dari SATU POOL BERSAMA - if mixed_parts: - df_mixed = pd.concat(mixed_parts, ignore_index=True) - df_mixed["framework_score_1_100"] = global_minmax(df_mixed["framework_norm"]) - parts.append(df_mixed) + parts.append(asean_sdgs) df = pd.concat(parts, ignore_index=True) + if NORMALIZE_FRAMEWORKS_JOINTLY: + mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + if mixed_mask.any(): + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) df = add_yoy(df, ["framework"], "framework_score_1_100") - df = add_condition_column(df, "framework_score_1_100") - log_condition_summary(df, table_name, self.logger) - - # Log diagnostik: bandingkan skor ASEAN MDGs vs SDGs - self._log_framework_score_diagnostics(df, table_name) df["year"] = df["year"].astype(int) df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) @@ -1068,31 +916,58 @@ class FoodSecurityAggregator: bigquery.SchemaField("std_norm", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("condition", "STRING", mode="NULLABLE"), ] - rows = load_to_bigquery( - self.client, df, table_name, layer='gold', - write_disposition="WRITE_TRUNCATE", schema=schema - ) + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) self._finalize(table_name, rows) return df # ========================================================================= - # STEP 6 & 7: Narrative (tidak ada perubahan) + # STEP 6: agg_narrative_overview -> Gold (NEW) + # + # Sumber data : df_framework_asean (framework='Total') + df_framework_by_country + # Granularity : 1 row per year + # Columns : year, n_mdg_indicators, n_sdg_indicators, n_total_indicators, + # asean_total_score, yoy_change, yoy_change_pct, + # country_ranking_json, most_improved_country, most_improved_delta, + # most_declined_country, most_declined_delta, narrative_overview # ========================================================================= - def calc_narrative_overview(self, df_framework_asean, df_framework_by_country): + def calc_narrative_overview( + self, + df_framework_asean: pd.DataFrame, + df_framework_by_country: pd.DataFrame, + ) -> pd.DataFrame: table_name = "agg_narrative_overview" self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 6: {table_name}") + self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) - asean_total = df_framework_asean[df_framework_asean["framework"] == "Total"].sort_values("year").reset_index(drop=True) - score_by_year = dict(zip(asean_total["year"].astype(int), asean_total["framework_score_1_100"].astype(float))) - country_total = df_framework_by_country[df_framework_by_country["framework"] == "Total"].copy() - ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) - records = [] + # ASEAN-level Total framework rows only, sorted by year + # PENTING: filter framework='Total' dulu sebelum apapun + asean_total = ( + df_framework_asean[df_framework_asean["framework"] == "Total"] + .sort_values("year") + .reset_index(drop=True) + ) + + # Buat lookup score per tahun untuk ambil prev_score yang akurat + # Tidak mengandalkan score - yoy_val karena floating point bisa drift + score_by_year = dict(zip( + asean_total["year"].astype(int), + asean_total["framework_score_1_100"].astype(float), + )) + + # Country-level Total framework rows (ranking + YoY per country) + country_total = ( + df_framework_by_country[df_framework_by_country["framework"] == "Total"] + .copy() + ) + + # Indicator counts per year per framework (self.df already has 'framework' column) + ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) + + records = [] for _, row in asean_total.iterrows(): yr = int(row["year"]) @@ -1100,28 +975,46 @@ class FoodSecurityAggregator: yoy = row["year_over_year_change"] yoy_val = float(yoy) if pd.notna(yoy) else None + # -- Indicator counts per framework for this year --------------- yr_ind = ind_year[ind_year["year"] == yr] n_mdg = int(yr_ind[yr_ind["framework"] == "MDGs"]["indicator_id"].nunique()) n_sdg = int(yr_ind[yr_ind["framework"] == "SDGs"]["indicator_id"].nunique()) n_total_ind = int(yr_ind["indicator_id"].nunique()) - prev_score = score_by_year.get(yr - 1, None) - yoy_pct = ((yoy_val / prev_score * 100) if (yoy_val is not None and prev_score and prev_score != 0) else None) - yr_country = country_total[country_total["year"] == yr].sort_values("rank_in_framework_year").reset_index(drop=True) + # -- prev_score diambil langsung dari lookup, bukan score - yoy_val + # Ini memastikan nilai konsisten 100% dengan tabel agg_framework_asean + prev_score = score_by_year.get(yr - 1, None) + + # -- YoY % ----------------------------------------------------- + yoy_pct = ( + (yoy_val / prev_score * 100) + if (yoy_val is not None and prev_score is not None and prev_score != 0) + else None + ) + + # -- Country ranking for this year ----------------------------- + yr_country = ( + country_total[country_total["year"] == yr] + .sort_values("rank_in_framework_year") + .reset_index(drop=True) + ) + ranking_list = [] for _, cr in yr_country.iterrows(): cr_yoy = cr.get("year_over_year_change", None) ranking_list.append({ - "rank" : int(cr["rank_in_framework_year"]), + "rank": int(cr["rank_in_framework_year"]), "country_name": str(cr["country_name"]), - "score" : round(float(cr["framework_score_1_100"]), 2), - "yoy_change" : round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, + "score": round(float(cr["framework_score_1_100"]), 2), + "yoy_change": round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, }) + country_ranking_json = json.dumps(ranking_list, ensure_ascii=False) + # -- Most improved / declined country -------------------------- yr_country_yoy = yr_country.dropna(subset=["year_over_year_change"]) if not yr_country_yoy.empty: - best_idx = yr_country_yoy["year_over_year_change"].idxmax() - worst_idx = yr_country_yoy["year_over_year_change"].idxmin() + best_idx = yr_country_yoy["year_over_year_change"].idxmax() + worst_idx = yr_country_yoy["year_over_year_change"].idxmin() most_improved_country = str(yr_country_yoy.loc[best_idx, "country_name"]) most_improved_delta = round(float(yr_country_yoy.loc[best_idx, "year_over_year_change"]), 2) most_declined_country = str(yr_country_yoy.loc[worst_idx, "country_name"]) @@ -1130,28 +1023,38 @@ class FoodSecurityAggregator: most_improved_country = most_declined_country = None most_improved_delta = most_declined_delta = None + # -- Build narrative ------------------------------------------- narrative = _build_overview_narrative( - year=yr, n_mdg=n_mdg, n_sdg=n_sdg, n_total_ind=n_total_ind, - score=score, yoy_val=yoy_val, yoy_pct=yoy_pct, - prev_year=yr-1, prev_score=prev_score, ranking_list=ranking_list, - most_improved_country=most_improved_country, most_improved_delta=most_improved_delta, - most_declined_country=most_declined_country, most_declined_delta=most_declined_delta, + year = yr, + n_mdg = n_mdg, + n_sdg = n_sdg, + n_total_ind = n_total_ind, + score = score, + yoy_val = yoy_val, + yoy_pct = yoy_pct, + prev_year = yr - 1, + prev_score = prev_score, + ranking_list = ranking_list, + most_improved_country = most_improved_country, + most_improved_delta = most_improved_delta, + most_declined_country = most_declined_country, + most_declined_delta = most_declined_delta, ) records.append({ - "year" : yr, - "n_mdg_indicators" : n_mdg, - "n_sdg_indicators" : n_sdg, - "n_total_indicators" : n_total_ind, - "asean_total_score" : round(score, 2), - "yoy_change" : yoy_val, - "yoy_change_pct" : round(yoy_pct, 2) if yoy_pct is not None else None, - "country_ranking_json" : json.dumps(ranking_list, ensure_ascii=False), + "year": yr, + "n_mdg_indicators": n_mdg, + "n_sdg_indicators": n_sdg, + "n_total_indicators": n_total_ind, + "asean_total_score": round(score, 2), + "yoy_change": yoy_val, + "yoy_change_pct": round(yoy_pct, 2) if yoy_pct is not None else None, + "country_ranking_json": country_ranking_json, "most_improved_country": most_improved_country, - "most_improved_delta" : most_improved_delta, + "most_improved_delta": most_improved_delta, "most_declined_country": most_declined_country, - "most_declined_delta" : most_declined_delta, - "narrative_overview" : narrative, + "most_declined_delta": most_declined_delta, + "narrative_overview": narrative, }) df = pd.DataFrame(records) @@ -1178,28 +1081,54 @@ class FoodSecurityAggregator: bigquery.SchemaField("most_declined_delta", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("narrative_overview", "STRING", mode="REQUIRED"), ] - rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema, + ) self._finalize(table_name, rows) return df - def calc_narrative_pillar(self, df_pillar_composite, df_pillar_by_country): + # ========================================================================= + # STEP 7: agg_narrative_pillar -> Gold (NEW) + # + # Sumber data : df_pillar_composite + df_pillar_by_country + # Granularity : 1 row per (year, pillar_id) + # Columns : year, pillar_id, pillar_name, pillar_score, rank_in_year, + # yoy_change, top_country, top_country_score, + # bottom_country, bottom_country_score, narrative_pillar + # ========================================================================= + + def calc_narrative_pillar( + self, + df_pillar_composite: pd.DataFrame, + df_pillar_by_country: pd.DataFrame, + ) -> pd.DataFrame: table_name = "agg_narrative_pillar" self.load_metadata[table_name]["start_time"] = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info(f"STEP 7: {table_name}") + self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold") self.logger.info("=" * 70) records = [] - for yr in sorted(df_pillar_composite["year"].unique()): - yr_pillars = df_pillar_composite[df_pillar_composite["year"] == yr].sort_values("rank_in_year").reset_index(drop=True) - yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] - strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None - weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None + years = sorted(df_pillar_composite["year"].unique()) + for yr in years: + yr_pillars = ( + df_pillar_composite[df_pillar_composite["year"] == yr] + .sort_values("rank_in_year") + .reset_index(drop=True) + ) + yr_country_pillar = df_pillar_by_country[df_pillar_by_country["year"] == yr] + + # Strongest / weakest pillar this year (for context sentence) + strongest_pillar = yr_pillars.iloc[0] if len(yr_pillars) > 0 else None + weakest_pillar = yr_pillars.iloc[-1] if len(yr_pillars) > 0 else None + + # Biggest improvement / decline across all pillars this year yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) if not yr_pillars_yoy.empty: - best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() - worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() + best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() + worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) @@ -1210,42 +1139,61 @@ class FoodSecurityAggregator: for _, prow in yr_pillars.iterrows(): p_id = int(prow["pillar_id"]) - p_country = yr_country_pillar[yr_country_pillar["pillar_id"] == p_id].sort_values("rank_in_pillar_year").reset_index(drop=True) - top_country = bot_country = None - top_country_score = bot_country_score = None + p_name = str(prow["pillar_name"]) + p_score = float(prow["pillar_score_1_100"]) + p_rank = int(prow["rank_in_year"]) + p_yoy = prow["year_over_year_change"] + p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None + + # Top / bottom country within this pillar & year + p_country = ( + yr_country_pillar[yr_country_pillar["pillar_id"] == p_id] + .sort_values("rank_in_pillar_year") + .reset_index(drop=True) + ) if not p_country.empty: top_country = str(p_country.iloc[0]["country_name"]) top_country_score = round(float(p_country.iloc[0]["pillar_country_score_1_100"]), 2) bot_country = str(p_country.iloc[-1]["country_name"]) bot_country_score = round(float(p_country.iloc[-1]["pillar_country_score_1_100"]), 2) + else: + top_country = bot_country = None + top_country_score = bot_country_score = None - p_yoy = prow["year_over_year_change"] + # -- Build narrative --------------------------------------- narrative = _build_pillar_narrative( - year=yr, pillar_name=str(prow["pillar_name"]), - pillar_score=float(prow["pillar_score_1_100"]), - rank_in_year=int(prow["rank_in_year"]), n_pillars=len(yr_pillars), - yoy_val=float(p_yoy) if pd.notna(p_yoy) else None, - top_country=top_country, top_country_score=top_country_score, - bot_country=bot_country, bot_country_score=bot_country_score, - strongest_pillar=str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, - strongest_score=round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, - weakest_pillar=str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, - weakest_score=round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, - most_improved_pillar=most_improved_pillar, most_improved_delta=most_improved_delta, - most_declined_pillar=most_declined_pillar, most_declined_delta=most_declined_delta, + year = yr, + pillar_name = p_name, + pillar_score = p_score, + rank_in_year = p_rank, + n_pillars = len(yr_pillars), + yoy_val = p_yoy_val, + top_country = top_country, + top_country_score = top_country_score, + bot_country = bot_country, + bot_country_score = bot_country_score, + strongest_pillar = str(strongest_pillar["pillar_name"]) if strongest_pillar is not None else None, + strongest_score = round(float(strongest_pillar["pillar_score_1_100"]), 2) if strongest_pillar is not None else None, + weakest_pillar = str(weakest_pillar["pillar_name"]) if weakest_pillar is not None else None, + weakest_score = round(float(weakest_pillar["pillar_score_1_100"]), 2) if weakest_pillar is not None else None, + most_improved_pillar = most_improved_pillar, + most_improved_delta = most_improved_delta, + most_declined_pillar = most_declined_pillar, + most_declined_delta = most_declined_delta, ) + records.append({ - "year" : yr, - "pillar_id" : p_id, - "pillar_name" : str(prow["pillar_name"]), - "pillar_score" : round(float(prow["pillar_score_1_100"]), 2), - "rank_in_year" : int(prow["rank_in_year"]), - "yoy_change" : float(p_yoy) if pd.notna(p_yoy) else None, - "top_country" : top_country, - "top_country_score" : top_country_score, - "bottom_country" : bot_country, + "year": yr, + "pillar_id": p_id, + "pillar_name": p_name, + "pillar_score": round(p_score, 2), + "rank_in_year": p_rank, + "yoy_change": p_yoy_val, + "top_country": top_country, + "top_country_score": top_country_score, + "bottom_country": bot_country, "bottom_country_score": bot_country_score, - "narrative_pillar" : narrative, + "narrative_pillar": narrative, }) df = pd.DataFrame(records) @@ -1268,43 +1216,21 @@ class FoodSecurityAggregator: bigquery.SchemaField("bottom_country_score", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("narrative_pillar", "STRING", mode="REQUIRED"), ] - rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + rows = load_to_bigquery( + self.client, df, table_name, layer='gold', + write_disposition="WRITE_TRUNCATE", schema=schema, + ) self._finalize(table_name, rows) return df # ========================================================================= - # DIAGNOSTIK & VALIDASI + # HELPERS # ========================================================================= - def _log_framework_score_diagnostics(self, df: pd.DataFrame, context: str): - """ - Log perbandingan rata-rata skor per framework. - Setelah perbaikan, gap antar framework mencerminkan perbedaan substantif, - bukan artefak normalisasi. - """ - self.logger.info(f"\n [DIAGNOSTIK] Rata-rata skor per framework ({context}):") - fw_means = df.groupby("framework")["framework_score_1_100"].agg(['mean', 'min', 'max']).round(2) - for fw, row in fw_means.iterrows(): - self.logger.info( - f" {fw:<8} mean={row['mean']:>6.2f} " - f"range=[{row['min']:.2f}, {row['max']:.2f}]" - ) - - if "MDGs" in fw_means.index and "SDGs" in fw_means.index: - gap = fw_means.loc["MDGs", "mean"] - fw_means.loc["SDGs", "mean"] - self.logger.info( - f"\n Gap MDGs-SDGs = {gap:.2f} poin" - + ( - " → SUBSTANTIF (indikator SDGs mengukur deprivasi lebih dalam)" - if abs(gap) > 10 else - " → dalam batas wajar" - ) - ) - def _validate_mdgs_equals_total(self, df: pd.DataFrame, level: str = ""): self.logger.info(f"\n Validasi MDGs < {self.sdgs_start_year} == Total [{level}]:") group_by = ["year"] if level.startswith("asean") else ["country_id", "year"] - mdgs_pre = df[(df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) + mdgs_pre = df[(df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) total_pre = df[(df["framework"] == "Total") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "total_score"}) if mdgs_pre.empty and total_pre.empty: self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)") @@ -1318,9 +1244,12 @@ class FoodSecurityAggregator: self.logger.info(f" -> {status} (n_checked={len(check)})") def _finalize(self, table_name: str, rows_loaded: int): - self.load_metadata[table_name].update({"rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now()}) + self.load_metadata[table_name].update({ + "rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now(), + }) log_update(self.client, "DW", table_name, "full_load", rows_loaded) - self.logger.info(f" {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") + self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [Gold] fs_asean_gold") + self.logger.info(f" Metadata → [AUDIT] etl_logs") def _fail(self, table_name: str, error: Exception): self.load_metadata[table_name].update({"status": "failed", "end_time": datetime.now()}) @@ -1328,29 +1257,36 @@ class FoodSecurityAggregator: log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) # ========================================================================= - # RUN + # RUN — 6 tabel (4 lama + 2 narrative baru) # ========================================================================= def run(self): start = datetime.now() self.logger.info("\n" + "=" * 70) - self.logger.info("FOOD SECURITY AGGREGATION — 6 TABLES -> fs_asean_gold") - self.logger.info(f" Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info( - " NORMALISASI: norm_value dari analytical_layer (satu referensi global)." - "\n Tidak ada rescaling per-framework. MDGs dan SDGs comparable." - ) + self.logger.info("FOOD SECURITY AGGREGATION v9.0 — 6 TABLES -> fs_asean_gold") + self.logger.info(" agg_pillar_composite | agg_pillar_by_country") + self.logger.info(" agg_framework_by_country| agg_framework_asean") + self.logger.info(" agg_narrative_overview | agg_narrative_pillar") self.logger.info("=" * 70) self.load_data() self._classify_indicators() + # -- 4 tabel lama (tidak ada perubahan) ---------------------------- df_pillar_composite = self.calc_pillar_composite() df_pillar_by_country = self.calc_pillar_by_country() df_framework_by_country = self.calc_framework_by_country() df_framework_asean = self.calc_framework_asean() - self.calc_narrative_overview(df_framework_asean=df_framework_asean, df_framework_by_country=df_framework_by_country) - self.calc_narrative_pillar(df_pillar_composite=df_pillar_composite, df_pillar_by_country=df_pillar_by_country) + + # -- 2 tabel narrative baru ---------------------------------------- + self.calc_narrative_overview( + df_framework_asean = df_framework_asean, + df_framework_by_country = df_framework_by_country, + ) + self.calc_narrative_pillar( + df_pillar_composite = df_pillar_composite, + df_pillar_by_country = df_pillar_by_country, + ) duration = (datetime.now() - start).total_seconds() total_rows = sum(m["rows_loaded"] for m in self.load_metadata.values()) @@ -1361,15 +1297,20 @@ class FoodSecurityAggregator: self.logger.info(f" Durasi : {duration:.2f}s") self.logger.info(f" Total rows : {total_rows:,}") for tbl, meta in self.load_metadata.items(): - icon = "OK" if meta["status"] == "success" else "FAIL" - self.logger.info(f" [{icon}] {tbl:<35} {meta['rows_loaded']:>10,}") + icon = "✓" if meta["status"] == "success" else "✗" + self.logger.info(f" {icon} {tbl:<35} {meta['rows_loaded']:>10,}") # ============================================================================= -# AIRFLOW & MAIN +# AIRFLOW TASK FUNCTIONS # ============================================================================= def run_aggregation(): + """ + Airflow task: Hitung semua agregasi dari analytical_food_security. + Dipanggil setelah analytical_layer_to_gold selesai. + Menjalankan 6 tabel sekaligus: 4 agregasi + 2 narrative. + """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() agg = FoodSecurityAggregator(client) @@ -1378,18 +1319,21 @@ def run_aggregation(): print(f"Aggregation completed: {total:,} total rows loaded") +# ============================================================================= +# MAIN EXECUTION +# ============================================================================= + if __name__ == "__main__": import io + if _sys.stdout.encoding and _sys.stdout.encoding.lower() not in ("utf-8", "utf8"): _sys.stdout = io.TextIOWrapper(_sys.stdout.buffer, encoding="utf-8", errors="replace") if _sys.stderr.encoding and _sys.stderr.encoding.lower() not in ("utf-8", "utf8"): _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") print("=" * 70) - print("FOOD SECURITY AGGREGATION -> fs_asean_gold") - print(f"Condition threshold: bad<{THRESHOLD_BAD}, moderate {THRESHOLD_BAD}-{THRESHOLD_GOOD}, good>{THRESHOLD_GOOD}") - print("NORMALISASI: satu referensi global per indikator (dari analytical_layer).") - print("Tidak ada rescaling per-framework. MDGs dan SDGs comparable.") + print("FOOD SECURITY AGGREGATION-> fs_asean_gold") + print(f" NORMALIZE_FRAMEWORKS_JOINTLY = {NORMALIZE_FRAMEWORKS_JOINTLY}") print("=" * 70) logger = setup_logging() diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 9661578..6543564 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -1,44 +1,14 @@ """ BIGQUERY ANALYTICAL LAYER - DATA FILTERING -fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') +FIXED: analytical_food_security disimpan di fs_asean_gold (layer='gold') Filtering Order: 1. Load data (single years only) -2. Determine year boundaries (2013 - auto-detected end year, baseline=2023 per syarat dosen) +2. Determine year boundaries (2013 - auto-detected end year) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries - → TIDAK menghapus baris year < max_start_year - → Semua baris tetap ada; label framework ditentukan di Step 6 -6. Assign framework (MDGs/SDGs) per indicator PER ROW - → Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' selalu - → Indikator DI SDG_ONLY_KEYWORDS + year >= SDG_TRANSITION_YEAR → 'SDGs' - → Indikator DI SDG_ONLY_KEYWORDS + year < SDG_TRANSITION_YEAR → 'MDGs' - → SDG_TRANSITION_YEAR = 2015 (HARDCODE — tanggal resmi SDGs berlaku) -7. Verify no gaps (dari actual_start_year per indikator, bukan start_year global) -8. Calculate norm_value_1_100 per indicator (min-max, direction-aware, global) - *** PERBAIKAN: normalisasi dilakukan SEKALI untuk seluruh data (semua tahun), - bukan per-framework, agar nilai dari era MDGs dan SDGs berada di - skala yang sama dan dapat dibandingkan secara adil. *** -9. Calculate YoY per indicator per country -10. Analyze indicator availability by year -11. Save analytical table - -FRAMEWORK LOGIC: -- SDG_TRANSITION_YEAR = 2015 (HARDCODE, bukan auto-detect dari data) -- Semua SDG-only indicators menggunakan SDG_TRANSITION_YEAR yang SAMA -- SDG-only + year < SDG_TRANSITION_YEAR → 'MDGs' (data tetap ada, tidak dihapus) -- SDG-only + year >= SDG_TRANSITION_YEAR → 'SDGs' -- Non-SDG-only indicators → 'MDGs' selalu (di semua tahun) - -NORMALISASI (PERBAIKAN): -- norm_value_1_100 dihitung SATU KALI per indikator menggunakan seluruh data - (semua tahun, semua negara) sebagai referensi min-max. -- Ini memastikan nilai 60 di era MDGs dan nilai 60 di era SDGs memiliki - makna yang SAMA (posisi relatif yang sama dalam distribusi global). -- Tidak ada rescaling ulang per-framework di layer analitik ini. -- Rescaling per-framework (jika diperlukan untuk visualisasi) sebaiknya - dilakukan di layer agregasi (analysis_layer) dengan flag eksplisit. +6. Save analytical table (value only, normalisasi & direction handled downstream) """ import pandas as pd @@ -64,82 +34,21 @@ from scripts.bigquery_helpers import ( from google.cloud import bigquery -# ============================================================================= -# SDG-ONLY INDICATOR KEYWORDS -# ============================================================================= -SDG_ONLY_KEYWORDS = frozenset([ - # TARGET 2.1.1 — Undernourishment - "prevalence of undernourishment (percent) (3-year average)", - "number of people undernourished (million) (3-year average)", - - # TARGET 2.1.2 — Food Insecurity (FIES) - "prevalence of severe food insecurity in the total population (percent) (3-year average)", - "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", - "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", - - "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", - "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", - - "number of severely food insecure people (million) (3-year average)", - "number of severely food insecure male adults (million) (3-year average)", - "number of severely food insecure female adults (million) (3-year average)", - - "number of moderately or severely food insecure people (million) (3-year average)", - "number of moderately or severely food insecure male adults (million) (3-year average)", - "number of moderately or severely food insecure female adults (million) (3-year average)", - - # TARGET 2.2.1 — Stunting - "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", - "number of children under 5 years of age who are stunted (modeled estimates) (million)", - - # TARGET 2.2.2 — Wasting - "percentage of children under 5 years affected by wasting (percent)", - "number of children under 5 years affected by wasting (million)", - - # TARGET 2.2.2 — Overweight (children) - "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", - "number of children under 5 years of age who are overweight (modeled estimates) (million)", - - # TARGET 2.2.3 — Anaemia - "prevalence of anemia among women of reproductive age (15-49 years) (percent)", - "number of women of reproductive age (15-49 years) affected by anemia (million)", -]) - -# ============================================================================= -# SDG TRANSITION YEAR — HARDCODE -# ============================================================================= -SDG_TRANSITION_YEAR = 2015 - -# ============================================================================= -# THRESHOLD KONDISI (fixed absolute, skala 1-100) -# ============================================================================= -THRESHOLD_BAD = 40.0 -THRESHOLD_GOOD = 60.0 - - -def assign_condition(norm_value_1_100: float) -> str: - if pd.isna(norm_value_1_100): - return None - if norm_value_1_100 > THRESHOLD_GOOD: - return 'good' - if norm_value_1_100 < THRESHOLD_BAD: - return 'bad' - return 'moderate' - - # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= class AnalyticalLayerLoader: """ - Analytical Layer Loader for BigQuery + Analytical Layer Loader for BigQuery - CORRECTED VERSION v4 - PERBAIKAN NORMALISASI: - - norm_value_1_100 dihitung SEKALI per indikator dari seluruh data - (semua tahun, semua negara). Tidak ada rescaling ulang per-framework. - - Ini memastikan komparabilitas lintas era MDGs dan SDGs. + Key Logic: + 1. Complete per country (no gaps from start_year to end_year) + 2. Filter countries with all pillars + 3. Ensure indicators have consistent country count across all years + 4. Save raw value only (normalisasi & direction handled downstream) + + Output: analytical_food_security -> DW layer (Gold) -> fs_asean_gold """ def __init__(self, client: bigquery.Client): @@ -152,15 +61,12 @@ class AnalyticalLayerLoader: self.df_country = None self.df_pillar = None - self.selected_country_ids = None - self.indicator_max_start_map = {} + self.selected_country_ids = None self.start_year = 2013 self.end_year = None self.baseline_year = 2023 - self.sdg_transition_year = SDG_TRANSITION_YEAR - self.pipeline_metadata = { 'source_class' : self.__class__.__name__, 'start_time' : None, @@ -175,10 +81,6 @@ class AnalyticalLayerLoader: self.pipeline_start = None self.pipeline_end = None - # ------------------------------------------------------------------ - # STEP 1: LOAD SOURCE DATA - # ------------------------------------------------------------------ - def load_source_data(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") @@ -209,17 +111,14 @@ class AnalyticalLayerLoader: """ self.logger.info("Loading fact table with dimensions...") - self.df_clean = self.client.query(query).result().to_dataframe( - create_bqstorage_client=False - ) + self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False) self.logger.info(f" Loaded: {len(self.df_clean):,} rows") if 'is_year_range' in self.df_clean.columns: yr = self.df_clean['is_year_range'].value_counts() - self.logger.info( - f" Single years: {yr.get(False, 0):,} | " - f"Year ranges: {yr.get(True, 0):,}" - ) + self.logger.info(f" Breakdown:") + self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}") + self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}") self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') @@ -236,39 +135,34 @@ class AnalyticalLayerLoader: self.logger.error(f"Error loading source data: {e}") raise - # ------------------------------------------------------------------ - # STEP 2: DETERMINE YEAR BOUNDARIES - # ------------------------------------------------------------------ - def determine_year_boundaries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) - df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year] - baseline_indicator_count = df_baseline['indicator_id'].nunique() + df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year] + baseline_indicator_count = df_2023['indicator_id'].nunique() - self.logger.info(f"\n Baseline year (hardcode, syarat dosen): {self.baseline_year}") - self.logger.info(f" Baseline indicator count: {baseline_indicator_count}") + self.logger.info(f"\nBaseline Year: {self.baseline_year}") + self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}") years_sorted = sorted(self.df_clean['year'].unique(), reverse=True) selected_end_year = None - self.logger.info(f"\n Scanning end_year (>= {self.baseline_year}):") for year in years_sorted: if year >= self.baseline_year: df_year = self.df_clean[self.df_clean['year'] == year] year_indicator_count = df_year['indicator_id'].nunique() status = "OK" if year_indicator_count >= baseline_indicator_count else "X" - self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") + self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") if year_indicator_count >= baseline_indicator_count and selected_end_year is None: selected_end_year = int(year) if selected_end_year is None: selected_end_year = self.baseline_year - self.logger.warning(f" [!] Fallback to baseline: {selected_end_year}") + self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}") else: - self.logger.info(f"\n [OK] Selected end year: {selected_end_year}") + self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}") self.end_year = selected_end_year original_count = len(self.df_clean) @@ -278,15 +172,11 @@ class AnalyticalLayerLoader: (self.df_clean['year'] <= self.end_year) ].copy() - self.logger.info(f"\n Filtering {self.start_year}-{self.end_year}:") - self.logger.info(f" Rows before: {original_count:,}") - self.logger.info(f" Rows after : {len(self.df_clean):,}") + self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:") + self.logger.info(f" Rows before: {original_count:,}") + self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY - # ------------------------------------------------------------------ - def filter_complete_indicators_per_country(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") @@ -338,15 +228,10 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}") self.logger.info(f" [-] Removed: {len(removed_combinations):,}") - df_valid = pd.DataFrame(valid_combinations) - df_valid['key'] = ( - df_valid['country_id'].astype(str) + '_' + - df_valid['indicator_id'].astype(str) - ) - self.df_clean['key'] = ( - self.df_clean['country_id'].astype(str) + '_' + - self.df_clean['indicator_id'].astype(str) - ) + df_valid = pd.DataFrame(valid_combinations) + df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str) + self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' + + self.df_clean['indicator_id'].astype(str)) original_count = len(self.df_clean) self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy() @@ -358,10 +243,6 @@ class AnalyticalLayerLoader: self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 4: SELECT COUNTRIES WITH ALL PILLARS - # ------------------------------------------------------------------ - def select_countries_with_all_pillars(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") @@ -384,26 +265,18 @@ class AnalyticalLayerLoader: f"{row['pillar_count']}/{total_pillars} pillars" ) - selected_countries = country_pillar_count[ - country_pillar_count['pillar_count'] == total_pillars - ] + selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars] self.selected_country_ids = selected_countries['country_id'].tolist() self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries") original_count = len(self.df_clean) - self.df_clean = self.df_clean[ - self.df_clean['country_id'].isin(self.selected_country_ids) - ].copy() + self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy() self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES - # ------------------------------------------------------------------ - def filter_indicators_consistent_across_fixed_countries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") @@ -412,9 +285,7 @@ class AnalyticalLayerLoader: indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() - indicator_country_start.columns = [ - 'indicator_id', 'indicator_name', 'country_id', 'start_year' - ] + indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year'] indicator_max_start = indicator_country_start.groupby([ 'indicator_id', 'indicator_name' @@ -463,379 +334,47 @@ class AnalyticalLayerLoader: raise ValueError("No valid indicators found after filtering!") original_count = len(self.df_clean) - self.df_clean = self.df_clean[ - self.df_clean['indicator_id'].isin(valid_indicators) - ].copy() + self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy() - self.indicator_max_start_map = ( - indicator_max_start[indicator_max_start['indicator_id'].isin(valid_indicators)] - .set_index('indicator_id')['max_start_year'] - .to_dict() + self.df_clean = self.df_clean.merge( + indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left' ) + self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy() + self.df_clean = self.df_clean.drop('max_start_year', axis=1) - self.logger.info(f"\n Rows before : {original_count:,}") - self.logger.info(f" Rows after : {len(self.df_clean):,}") - self.logger.info(f" Countries : {self.df_clean['country_id'].nunique()}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Pillars : {self.df_clean['pillar_id'].nunique()}") - self.logger.info( - f"\n [NOTE] Baris year < max_start_year TETAP ADA di data. " - f"Label framework akan ditentukan di Step 6." - ) + self.logger.info(f"\n Rows before: {original_count:,}") + self.logger.info(f" Rows after: {len(self.df_clean):,}") + self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") + self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") return self.df_clean - # ------------------------------------------------------------------ - # STEP 6: ASSIGN FRAMEWORK PER ROW - # ------------------------------------------------------------------ - - def assign_framework(self): - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 6: ASSIGN FRAMEWORK PER ROW") - self.logger.info("=" * 80) - - self.logger.info(f"\n SDG_TRANSITION_YEAR : {self.sdg_transition_year} (HARDCODE)") - self.logger.info(f" Alasan : SDGs resmi berlaku 1 Januari 2015") - self.logger.info(f" Bukan auto-detect : data FIES/anaemia ada sejak 2013,") - self.logger.info(f" tapi tahun 2013-2014 harus tetap MDGs") - - indicator_info = ( - self.df_clean[['indicator_id', 'indicator_name']] - .drop_duplicates() - .copy() - ) - indicator_info['is_sdg_only'] = ( - indicator_info['indicator_name'] - .str.lower() - .str.strip() - .isin(SDG_ONLY_KEYWORDS) - ) - - sdg_only_ids = set( - indicator_info.loc[indicator_info['is_sdg_only'], 'indicator_id'] - ) - non_sdg_ids = set( - indicator_info.loc[~indicator_info['is_sdg_only'], 'indicator_id'] - ) - - self.logger.info(f"\n SDG-only indicators ({len(sdg_only_ids)}):") - for _, row in indicator_info[indicator_info['is_sdg_only']].iterrows(): - actual_start = self.indicator_max_start_map.get(row['indicator_id'], '?') - self.logger.info( - f" [SDG-only] id={int(row['indicator_id'])} " - f"actual_start={actual_start} | {row['indicator_name']}" - ) - - self.logger.info(f"\n Non-SDG-only indicators ({len(non_sdg_ids)}): → MDGs selalu") - - if not sdg_only_ids: - raise ValueError( - "Tidak ada indikator SDG-only (FIES/anaemia) yang lolos filter. " - "Pastikan nama indikator di SDG_ONLY_KEYWORDS cocok dengan data BigQuery." - ) - - self.df_clean['_is_sdg_only'] = self.df_clean['indicator_id'].isin(sdg_only_ids) - - self.df_clean['framework'] = np.where( - self.df_clean['_is_sdg_only'] & - (self.df_clean['year'] >= self.sdg_transition_year), - 'SDGs', - 'MDGs' - ) - - self.df_clean = self.df_clean.drop(columns=['_is_sdg_only']) - - self.logger.info(f"\n Logika assign framework (PER BARIS):") - self.logger.info(f" {'─'*72}") - self.logger.info(f" Indikator TIDAK di SDG_ONLY_KEYWORDS → 'MDGs' di semua tahun") - self.logger.info(f" Indikator DI SDG_ONLY_KEYWORDS:") - self.logger.info(f" year < {self.sdg_transition_year} → 'MDGs' (data tetap ada, tidak dihapus)") - self.logger.info(f" year >= {self.sdg_transition_year} → 'SDGs'") - self.logger.info(f" {'─'*72}") - - self.logger.info(f"\n Verifikasi framework per indikator:") - self.logger.info(f" {'─'*115}") - self.logger.info( - f" {'ID':<5} {'Indicator Name':<52} {'Data From':<11} " - f"{'MDGs rows':<11} {'SDGs rows':<11} {'Note'}" - ) - self.logger.info(f" {'─'*115}") - - for ind_id, grp in self.df_clean.groupby('indicator_id'): - ind_name = grp['indicator_name'].iloc[0] - mdgs_rows = (grp['framework'] == 'MDGs').sum() - sdgs_rows = (grp['framework'] == 'SDGs').sum() - is_sdg_only = ind_id in sdg_only_ids - data_from = int(grp['year'].min()) - - if is_sdg_only: - mdgs_yrs = sorted(grp[grp['framework'] == 'MDGs']['year'].unique()) - sdgs_yrs = sorted(grp[grp['framework'] == 'SDGs']['year'].unique()) - yr_range_mdgs = f"{min(mdgs_yrs)}-{max(mdgs_yrs)}" if mdgs_yrs else "-" - yr_range_sdgs = f"{min(sdgs_yrs)}-{max(sdgs_yrs)}" if sdgs_yrs else "-" - note = f"MDGs:{yr_range_mdgs} | SDGs:{yr_range_sdgs}" - else: - note = "MDGs always" - - self.logger.info( - f" {int(ind_id):<5} {ind_name[:50]:<52} {data_from:<11} " - f"{mdgs_rows:<11} {sdgs_rows:<11} {note}" - ) - - fw_summary = self.df_clean['framework'].value_counts() - self.logger.info(f"\n Ringkasan rows: " + " | ".join( - f"{fw}: {cnt:,}" for fw, cnt in fw_summary.items() - )) - - end_year_df = self.df_clean[self.df_clean['year'] == self.end_year] - fw_ind_summary = end_year_df.groupby('framework')['indicator_id'].nunique() - self.logger.info(f" Indicators di year={self.end_year}: " + " | ".join( - f"{fw}: {cnt}" for fw, cnt in fw_ind_summary.items() - )) - - self.logger.info( - f"\n [OK] 'framework' ditambahkan — " - f"MDGs: {(self.df_clean['framework'] == 'MDGs').sum():,} rows | " - f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,} rows" - ) - return self.df_clean - - # ------------------------------------------------------------------ - # STEP 7: VERIFY NO GAPS - # ------------------------------------------------------------------ - def verify_no_gaps(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 7: VERIFY NO GAPS") + self.logger.info("STEP 6: VERIFY NO GAPS") self.logger.info("=" * 80) expected_countries = len(self.selected_country_ids) - all_good = True - bad_rows = [] - - for ind_id, grp in self.df_clean.groupby('indicator_id'): - actual_start = self.indicator_max_start_map.get(ind_id) - if actual_start is None: - continue - - expected_years = list(range(int(actual_start), self.end_year + 1)) - - for year in expected_years: - country_count = grp[grp['year'] == year]['country_id'].nunique() - if country_count != expected_countries: - all_good = False - bad_rows.append({ - 'indicator_id' : int(ind_id), - 'year' : int(year), - 'country_count': int(country_count), - }) + verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index() + verification.columns = ['indicator_id', 'year', 'country_count'] + all_good = (verification['country_count'] == expected_countries).all() if all_good: - self.logger.info( - f" VERIFICATION PASSED — all combinations from actual_start_year " - f"have {expected_countries} countries" - ) + self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries") else: - for row in bad_rows[:10]: + bad = verification[verification['country_count'] != expected_countries] + for _, row in bad.head(10).iterrows(): self.logger.error( - f" Indicator {row['indicator_id']}, Year {row['year']}: " - f"{row['country_count']} countries (expected {expected_countries})" + f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: " + f"{int(row['country_count'])} countries (expected {expected_countries})" ) raise ValueError("Gap verification failed!") return True - # ------------------------------------------------------------------ - # STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR - # ------------------------------------------------------------------ - # PERBAIKAN: - # Normalisasi dilakukan SEKALI per indikator dari SELURUH DATA - # (semua tahun 2013–end_year, semua negara, tanpa memisahkan framework). - # - # Alasan: - # - Sebelumnya, rescaling per-framework di analysis_layer menyebabkan - # nilai 1-100 era MDGs dan SDGs memiliki referensi yang berbeda, - # sehingga tidak dapat dibandingkan secara adil. - # - Dengan satu normalisasi global per indikator, nilai 60 di era MDGs - # dan nilai 60 di era SDGs berarti hal yang sama: posisi relatif yang - # sama dalam distribusi historis indikator tersebut. - # - Jika SDGs memang era yang lebih buruk secara substantif, itu akan - # tercermin sebagai nilai norm yang memang lebih rendah — bukan artefak - # dari rescaling ulang. - # ------------------------------------------------------------------ - - def calculate_norm_value(self): - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 8: CALCULATE NORM_VALUE_1_100 PER INDICATOR (GLOBAL, SEKALI)") - self.logger.info("=" * 80) - self.logger.info( - "\n [PERBAIKAN] Normalisasi dilakukan SEKALI per indikator dari seluruh data." - "\n Tidak ada rescaling ulang per-framework." - "\n Ini memastikan komparabilitas lintas era MDGs dan SDGs." - ) - - DIRECTION_INVERT = frozenset({ - "negative", "lower_better", "lower_is_better", "inverse", "neg", - }) - - df = self.df_clean.copy() - norm_parts = [] - - indicators = df.groupby(['indicator_id', 'indicator_name', 'direction']) - self.logger.info( - f"\n {'ID':<5} {'Direction':<15} {'Invert':<8} " - f"{'Min':>10} {'Max':>10} {'Indicator Name'}" - ) - self.logger.info(f" {'-'*90}") - - for (ind_id, ind_name, direction), grp in indicators: - grp = grp.copy() - do_invert = str(direction).lower().strip() in DIRECTION_INVERT - valid_mask = grp['value'].notna() - n_valid = valid_mask.sum() - - if n_valid < 2: - grp['norm_value_1_100'] = np.nan - norm_parts.append(grp) - self.logger.warning( - f" {int(ind_id):<5} {direction:<15} {'N/A':<8} " - f"{'N/A':>10} {'N/A':>10} {ind_name[:45]} [SKIPPED: n_valid={n_valid}]" - ) - continue - - raw = grp.loc[valid_mask, 'value'].values - v_min = raw.min() - v_max = raw.max() - normed = np.full(len(grp), np.nan) - - if v_min == v_max: - # Semua nilai sama → assign tengah skala - normed[valid_mask.values] = 50.5 - else: - scaled = (raw - v_min) / (v_max - v_min) - if do_invert: - scaled = 1.0 - scaled - normed[valid_mask.values] = 1.0 + scaled * 99.0 - - grp['norm_value_1_100'] = normed - - self.logger.info( - f" {int(ind_id):<5} {direction:<15} {'YES' if do_invert else 'no':<8} " - f"{v_min:>10.3f} {v_max:>10.3f} {ind_name[:45]}" - ) - norm_parts.append(grp) - - self.df_clean = pd.concat(norm_parts, ignore_index=True) - - valid_norm = self.df_clean['norm_value_1_100'].notna().sum() - null_norm = self.df_clean['norm_value_1_100'].isna().sum() - self.logger.info(f"\n norm_value_1_100 — valid: {valid_norm:,} | null: {null_norm:,}") - self.logger.info( - f" Range aktual: " - f"{self.df_clean['norm_value_1_100'].min():.2f} - " - f"{self.df_clean['norm_value_1_100'].max():.2f}" - ) - - # ---------------------------------------------------------------- - # VALIDASI KOMPARABILITAS: Cek apakah ada gap sistematis antar era - # Ini adalah sinyal diagnostik — bukan error. - # Gap besar (>15 poin) setelah perbaikan = fenomena nyata, bukan artefak. - # ---------------------------------------------------------------- - self.logger.info(f"\n [DIAGNOSTIK KOMPARABILITAS] Rata-rata norm per framework per tahun:") - self.logger.info(f" {'─'*55}") - - fw_year_mean = ( - self.df_clean - .groupby(['framework', 'year'])['norm_value_1_100'] - .mean() - .reset_index() - .sort_values(['framework', 'year']) - ) - for fw, grp_fw in fw_year_mean.groupby('framework'): - means = grp_fw['norm_value_1_100'].values - years = grp_fw['year'].values - self.logger.info(f"\n Framework: {fw}") - for yr, m in zip(years, means): - bar = '█' * int(m / 5) - self.logger.info(f" {int(yr)} : {m:6.2f} {bar}") - - # Bandingkan rata-rata MDGs vs SDGs (hanya tahun di mana keduanya ada) - mdgs_mean_total = self.df_clean[self.df_clean['framework'] == 'MDGs']['norm_value_1_100'].mean() - sdgs_mean_total = self.df_clean[self.df_clean['framework'] == 'SDGs']['norm_value_1_100'].mean() - gap = mdgs_mean_total - sdgs_mean_total - self.logger.info( - f"\n Rata-rata keseluruhan:" - f"\n MDGs : {mdgs_mean_total:.2f}" - f"\n SDGs : {sdgs_mean_total:.2f}" - f"\n Gap : {gap:.2f} poin" - ) - if abs(gap) > 15: - self.logger.info( - f"\n [INFO] Gap {gap:.2f} poin antara MDGs dan SDGs." - f"\n Setelah perbaikan normalisasi (satu referensi global)," - f"\n gap ini mencerminkan perbedaan SUBSTANTIF, bukan artefak teknis." - f"\n Indikator SDGs memang mengukur dimensi deprivasi yang lebih dalam" - f"\n (FIES, stunting, wasting, anaemia) dibanding indikator MDGs." - ) - else: - self.logger.info( - f"\n [OK] Gap {gap:.2f} poin — dalam batas wajar, tidak ada bias sistematis." - ) - - # Distribusi kondisi - self.df_clean['_condition_preview'] = ( - self.df_clean['norm_value_1_100'].apply(assign_condition) - ) - cond_dist = self.df_clean['_condition_preview'].value_counts() - self.logger.info( - f"\n Distribusi kondisi " - f"(threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}):" - ) - for cond, cnt in cond_dist.items(): - self.logger.info(f" {cond}: {cnt:,} rows") - self.df_clean = self.df_clean.drop(columns=['_condition_preview']) - - self.logger.info(f"\n [OK] Kolom 'norm_value_1_100' ditambahkan ke df_clean") - return self.df_clean - - # ------------------------------------------------------------------ - # STEP 9: CALCULATE YOY - # ------------------------------------------------------------------ - - def calculate_yoy(self): - self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 9: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") - self.logger.info("=" * 80) - - df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() - - df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1) - df['yoy_change'] = df['value'] - df['value_prev'] - df['yoy_pct'] = np.where( - df['value_prev'].notna() & (df['value_prev'] != 0), - (df['yoy_change'] / df['value_prev'].abs()) * 100, - np.nan - ) - df = df.drop(columns=['value_prev']) - - total_rows = len(df) - valid_yoy = df['yoy_pct'].notna().sum() - null_yoy = df['yoy_pct'].isna().sum() - - self.logger.info(f" Total rows : {total_rows:,}") - self.logger.info(f" YoY calculated : {valid_yoy:,}") - self.logger.info(f" YoY NULL (base yr): {null_yoy:,}") - - self.df_clean = df - self.logger.info(f" [OK] Kolom 'yoy_change', 'yoy_pct' ditambahkan") - return self.df_clean - - # ------------------------------------------------------------------ - # STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR - # ------------------------------------------------------------------ - def analyze_indicator_availability_by_year(self): self.logger.info("\n" + "=" * 80) - self.logger.info("STEP 10: ANALYZE INDICATOR AVAILABILITY BY YEAR") + self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("=" * 80) year_stats = self.df_clean.groupby('year').agg({ @@ -861,139 +400,57 @@ class AnalyticalLayerLoader: 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'start_year', 'end_year', 'country_count' ] - - fw_at_end = ( - self.df_clean[self.df_clean['year'] == self.end_year] - .groupby('indicator_id')['framework'] - .first() - .reset_index() - ) - indicator_details = indicator_details.merge(fw_at_end, on='indicator_id', how='left') - indicator_details['framework'] = indicator_details['framework'].fillna('MDGs') - indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) - indicator_details = indicator_details.sort_values( - ['framework', 'pillar_name', 'start_year', 'indicator_name'] - ) + indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name']) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") - self.logger.info(f"Framework breakdown (at end_year={self.end_year}):") - for fw, count in indicator_details.groupby('framework').size().items(): - self.logger.info(f" {fw}: {count} indicators") + for pillar, count in indicator_details.groupby('pillar_name').size().items(): + self.logger.info(f" {pillar}: {count} indicators") - self.logger.info(f"\n{'-'*110}") - self.logger.info( - f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " - f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" - ) - self.logger.info(f"{'-'*110}") + self.logger.info(f"\n{'-'*100}") + self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}") + self.logger.info(f"{'-'*100}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " - f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" + f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} " + f"{direction:<8} {int(row['country_count'])}" ) return year_stats - # ------------------------------------------------------------------ - # STEP 11: SAVE ANALYTICAL TABLE - # ------------------------------------------------------------------ - def save_analytical_table(self): - table_name = 'fact_asean_food_security_selected' - + table_name = 'analytical_food_security' self.logger.info("\n" + "=" * 80) - self.logger.info(f"STEP 11: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") + self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info("=" * 80) try: - if 'framework' not in self.df_clean.columns: - raise ValueError("Kolom 'framework' tidak ada. Pastikan Step 6 sudah dijalankan.") - if 'norm_value_1_100' not in self.df_clean.columns: - raise ValueError("Kolom 'norm_value_1_100' tidak ada. Pastikan Step 8 sudah dijalankan.") - if 'yoy_change' not in self.df_clean.columns: - raise ValueError("Kolom 'yoy_change' tidak ada. Pastikan Step 9 sudah dijalankan.") - analytical_df = self.df_clean[[ - 'country_id', - 'country_name', - 'indicator_id', - 'indicator_name', - 'direction', - 'framework', - 'pillar_id', - 'pillar_name', - 'time_id', - 'year', - 'value', - 'norm_value_1_100', - 'yoy_change', - 'yoy_pct', + 'country_id', 'indicator_id', 'pillar_id', 'time_id', 'value' ]].copy() - analytical_df = analytical_df.sort_values( - ['year', 'country_name', 'pillar_name', 'indicator_name'] + ['time_id', 'country_id', 'indicator_id'] ).reset_index(drop=True) - analytical_df['country_id'] = analytical_df['country_id'].astype(int) - analytical_df['country_name'] = analytical_df['country_name'].astype(str) - analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) - analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) - analytical_df['direction'] = analytical_df['direction'].astype(str) - analytical_df['framework'] = analytical_df['framework'].astype(str) - analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) - analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) - analytical_df['time_id'] = analytical_df['time_id'].astype(int) - analytical_df['year'] = analytical_df['year'].astype(int) - analytical_df['value'] = analytical_df['value'].astype(float) - analytical_df['norm_value_1_100'] = analytical_df['norm_value_1_100'].astype(float) - analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) - analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) self.logger.info(f" Total rows: {len(analytical_df):,}") - fw_dist_rows = analytical_df['framework'].value_counts() - self.logger.info(f" Framework distribution (rows):") - for fw, cnt in fw_dist_rows.items(): - self.logger.info(f" {fw}: {cnt:,} rows") - - fw_dist_ind = ( - analytical_df[analytical_df['year'] == self.end_year] - .drop_duplicates('indicator_id')['framework'] - .value_counts() - ) - self.logger.info( - f" Framework distribution (indicators at year={self.end_year}):" - ) - for fw, cnt in fw_dist_ind.items(): - self.logger.info(f" {fw}: {cnt} indicators") - - self.logger.info( - f" norm_value_1_100 range: " - f"{analytical_df['norm_value_1_100'].min():.2f} - " - f"{analytical_df['norm_value_1_100'].max():.2f}" - ) - schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("norm_value_1_100", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), ] rows_loaded = load_to_bigquery( @@ -1014,65 +471,33 @@ class AnalyticalLayerLoader: 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ - 'start_year' : self.start_year, - 'end_year' : self.end_year, - 'baseline_year' : self.baseline_year, - 'sdg_transition_year' : self.sdg_transition_year, - 'sdg_transition_source' : 'HARDCODE — SDGs resmi berlaku 1 Jan 2015', - 'fixed_countries' : len(self.selected_country_ids), - 'norm_scale' : ( - '1-100 per indicator global minmax direction-aware. ' - 'SATU normalisasi untuk seluruh data tanpa rescaling per-framework. ' - 'Komparabilitas lintas era MDGs/SDGs terjamin.' - ), - 'framework_logic' : ( - f'SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE); ' - 'SDG-only + year >= SDG_TRANSITION_YEAR → SDGs; ' - 'SDG-only + year < SDG_TRANSITION_YEAR → MDGs (data tetap ada); ' - 'non-SDG-only → MDGs selalu' - ), - 'sdg_only_keywords_count': len(SDG_ONLY_KEYWORDS), - 'condition_thresholds' : { - 'bad' : f'< {THRESHOLD_BAD}', - 'moderate': f'{THRESHOLD_BAD}-{THRESHOLD_GOOD}', - 'good' : f'> {THRESHOLD_GOOD}', - }, + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'fixed_countries': len(self.selected_country_ids), + 'no_gaps' : True, + 'layer' : 'gold' }), 'validation_metrics' : json.dumps({ - 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators' : int(self.df_clean['indicator_id'].nunique()), - 'sdg_transition_year': self.sdg_transition_year, - 'framework_dist_rows': fw_dist_rows.to_dict(), + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators': int(self.df_clean['indicator_id'].nunique()) }) } save_etl_metadata(self.client, metadata) - self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> fs_asean_gold") + self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold") + self.logger.info(f" Metadata → [AUDIT] etl_metadata") return rows_loaded except Exception as e: self.logger.error(f"Error saving: {e}") raise - # ------------------------------------------------------------------ - # RUN - # ------------------------------------------------------------------ - def run(self): self.pipeline_start = datetime.now() self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) - self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") - self.logger.info("Kolom baru: norm_value_1_100 (min-max 1-100, direction-aware)") - self.logger.info(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - self.logger.info( - f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). " - "SDG-only + year >= 2015 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu." - ) - self.logger.info( - "NORMALISASI: SATU referensi global per indikator — tidak ada rescaling per-framework." - ) + self.logger.info("Output: analytical_food_security → fs_asean_gold") self.logger.info("=" * 80) self.load_source_data() @@ -1080,10 +505,7 @@ class AnalyticalLayerLoader: self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() - self.assign_framework() self.verify_no_gaps() - self.calculate_norm_value() - self.calculate_yoy() self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -1093,12 +515,11 @@ class AnalyticalLayerLoader: self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) - self.logger.info(f" Duration : {duration:.2f}s") - self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") - self.logger.info(f" SDG Transition Year: {self.sdg_transition_year} (HARDCODE)") - self.logger.info(f" Countries : {len(self.selected_country_ids)}") - self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") - self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= @@ -1106,6 +527,10 @@ class AnalyticalLayerLoader: # ============================================================================= def run_analytical_layer(): + """ + Airflow task: Build analytical_food_security dari fact_food_security + dims. + Dipanggil setelah dimensional_model_to_gold selesai. + """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() loader = AnalyticalLayerLoader(client) @@ -1119,14 +544,7 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) - print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") - print("Output: fact_asean_food_security_selected -> fs_asean_gold") - print(f"Norm: min-max 1-100 per indicator, direction-aware, GLOBAL (satu referensi)") - print(f"Condition threshold: bad<{THRESHOLD_BAD}, good>{THRESHOLD_GOOD}") - print( - f"Framework: SDG_TRANSITION_YEAR={SDG_TRANSITION_YEAR} (HARDCODE). " - "SDG-only + year >= 2015 → SDGs; sebelumnya MDGs. Non-SDG-only → MDGs selalu." - ) + print("Output: analytical_food_security → fs_asean_gold") print("=" * 80) logger = setup_logging() @@ -1136,6 +554,4 @@ if __name__ == "__main__": print("\n" + "=" * 80) print("[OK] COMPLETED") - print(f" SDG Transition Year : {loader.sdg_transition_year} (HARDCODE)") - print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80) \ No newline at end of file diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index 682035e..0d112fc 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -40,12 +40,12 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame: """Load data dari staging_integrated (STAGING/Silver layer).""" print("\nLoading data from staging_integrated (fs_asean_silver)...") df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver') - print(f" Loaded : {len(df_staging):,} rows") - print(f" Columns : {len(df_staging.columns)}") - print(f" Sources : {df_staging['source'].nunique()}") - print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") - print(f" Countries : {df_staging['country'].nunique()}") - print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}") + print(f" ✓ Loaded : {len(df_staging):,} rows") + print(f" Columns : {len(df_staging.columns)}") + print(f" Sources : {df_staging['source'].nunique()}") + print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") + print(f" Countries : {df_staging['country'].nunique()}") + print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}") return df_staging @@ -53,6 +53,7 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame: # COLUMN CONSTRAINT HELPERS # ============================================================================= +# Schema constraints — semua varchar max lengths COLUMN_CONSTRAINTS = { 'source' : 20, 'indicator_original' : 255, @@ -61,7 +62,7 @@ COLUMN_CONSTRAINTS = { 'year_range' : 20, 'unit' : 20, 'pillar' : 20, - 'direction' : 15, + 'direction' : 15, # 'higher_better'=13, 'lower_better'=12 } @@ -100,11 +101,11 @@ def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame: ) if truncation_report: - print("\n Column Truncations Applied:") + print("\n ⚠ Column Truncations Applied:") for column, info in truncation_report.items(): print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars") else: - print("\n No truncations needed — all values within constraints") + print("\n ✓ No truncations needed — all values within constraints") return df_constrained @@ -155,11 +156,11 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou def map_country(country): if pd.isna(country): return country - s = str(country).strip() + s = str(country).strip() mapped = ASEAN_MAPPING.get(s.upper(), s) return mapped[:100] if len(mapped) > 100 else mapped - original = df_clean[country_column].copy() + original = df_clean[country_column].copy() df_clean[country_column] = df_clean[country_column].apply(map_country) changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new} @@ -176,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou def assign_pillar(indicator_name: str) -> str: """ Assign pillar berdasarkan keyword indikator. - Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting' - All <= 20 chars (varchar(20) constraint). + Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other' + All ≤ 20 chars (varchar(20) constraint). """ if pd.isna(indicator_name): - return 'Supporting' + return 'Other' ind = str(indicator_name).lower() for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: if kw in ind: - return 'Supporting' + return 'Other' if any(kw in ind for kw in [ 'adequacy', 'protein supply', 'supply of protein', @@ -209,13 +210,12 @@ def assign_pillar(indicator_name: str) -> str: if any(kw in ind for kw in [ 'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity', - 'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water', - 'sanitation', 'children under 5', 'newborns with low', - 'women of reproductive' + 'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation', + 'children under 5', 'newborns with low', 'women of reproductive' ]): return 'Utilization' - return 'Supporting' + return 'Other' # ============================================================================= @@ -226,15 +226,17 @@ def assign_direction(indicator_name: str) -> str: """ Assign direction berdasarkan indikator. Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars) - Both <= 15 chars (varchar(15) constraint). + Both ≤ 15 chars (varchar(15) constraint). """ if pd.isna(indicator_name): return 'higher_better' ind = str(indicator_name).lower() + # Spesifik lower_better if 'share of dietary energy supply derived from cereals' in ind: return 'lower_better' + # Higher_better exceptions — cek sebelum lower_better keywords for kw in [ 'exclusive breastfeeding', 'dietary energy supply', @@ -246,6 +248,7 @@ def assign_direction(indicator_name: str) -> str: if kw in ind: return 'higher_better' + # Lower_better — masalah yang harus diminimalkan for kw in [ 'prevalence of undernourishment', 'prevalence of severe food insecurity', @@ -256,7 +259,6 @@ def assign_direction(indicator_name: str) -> str: 'prevalence of overweight', 'prevalence of obesity', 'prevalence of anemia', - 'prevalence of anaemia', 'prevalence of low birthweight', 'number of people undernourished', 'number of severely food insecure', @@ -281,9 +283,6 @@ def assign_direction(indicator_name: str) -> str: 'coefficient of variation', 'incidence of caloric losses', 'food losses', - 'indicator of food price anomalies', - 'proportion of local breeds classified as being at risk', - 'agricultural export subsidies', ]: if kw in ind: return 'lower_better' @@ -300,18 +299,19 @@ class CleanedDataLoader: Loader untuk cleaned integrated data ke STAGING layer (Silver). Kimball context: - Input : staging_integrated -> STAGING (Silver) — fs_asean_silver - Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver - Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit + Input : staging_integrated → STAGING (Silver) — fs_asean_silver + Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver + Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit Pipeline steps: 1. Standardize country names (ASEAN) 2. Remove missing values 3. Remove duplicates - 4. Add pillar & direction classification - 5. Apply column constraints - 6. Load ke BigQuery - 7. Log ke Audit layer + 4. Add pillar classification + 5. Add direction classification + 6. Apply column constraints + 7. Load ke BigQuery + 8. Log ke Audit layer """ SCHEMA = [ @@ -355,7 +355,7 @@ class CleanedDataLoader: def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame: print("\n [Step 1/5] Standardize country names...") df, report = standardize_country_names_asean(df, country_column='country') - print(f" ASEAN countries mapped : {report['countries_mapped']}") + print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}") unique_countries = sorted(df['country'].unique()) print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}") log_update(self.client, 'STAGING', 'staging_integrated', @@ -377,9 +377,7 @@ class CleanedDataLoader: def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame: print("\n [Step 3/5] Remove duplicates...") exact_dups = df.duplicated().sum() - data_dups = df.duplicated( - subset=['indicator_standardized', 'country', 'year', 'value'] - ).sum() + data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum() print(f" Exact duplicates : {exact_dups:,}") print(f" Data duplicates : {data_dups:,}") rows_before = len(df) @@ -393,21 +391,19 @@ class CleanedDataLoader: def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: print("\n [Step 4/5] Add pillar & direction classification...") df = df.copy() - df['pillar'] = df['indicator_standardized'].apply(assign_pillar) df['direction'] = df['indicator_standardized'].apply(assign_direction) pillar_counts = df['pillar'].value_counts() - print(f" Pillar distribution:") + print(f" ✓ Pillar distribution:") for pillar, count in pillar_counts.items(): print(f" - {pillar}: {count:,}") direction_counts = df['direction'].value_counts() - print(f" Direction distribution:") + print(f" ✓ Direction distribution:") for direction, count in direction_counts.items(): pct = count / len(df) * 100 print(f" - {direction}: {count:,} ({pct:.1f}%)") - return df def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: @@ -442,6 +438,7 @@ class CleanedDataLoader: if 'country' in df.columns: validation['unique_countries'] = int(df['country'].nunique()) + # Column length check column_length_check = {} for col, max_len in COLUMN_CONSTRAINTS.items(): if col in df.columns: @@ -460,7 +457,7 @@ class CleanedDataLoader: def run(self, df: pd.DataFrame) -> int: """ - Execute full cleaning pipeline -> load ke STAGING (Silver). + Execute full cleaning pipeline → load ke STAGING (Silver). Returns: int: Rows loaded @@ -472,6 +469,7 @@ class CleanedDataLoader: print(" ERROR: DataFrame is empty, nothing to process.") return 0 + # Pipeline steps df = self._step_standardize_countries(df) df = self._step_remove_missing(df) df = self._step_remove_duplicates(df) @@ -480,6 +478,7 @@ class CleanedDataLoader: self.metadata['rows_transformed'] = len(df) + # Validate validation = self.validate_data(df) self.metadata['validation_metrics'] = validation @@ -488,12 +487,13 @@ class CleanedDataLoader: for info in validation.get('column_length_check', {}).values() ) if not all_within_limits: - print("\n WARNING: Some columns still exceed length constraints!") + print("\n ⚠ WARNING: Some columns still exceed length constraints!") for col, info in validation['column_length_check'].items(): if not info['within_limit']: print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}") - print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...") + # Load ke Silver + print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...") rows_loaded = load_to_bigquery( self.client, df, self.table_name, layer='silver', @@ -502,8 +502,10 @@ class CleanedDataLoader: ) self.metadata['rows_loaded'] = rows_loaded + # Audit logs log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded) + # ETL metadata self.metadata['end_time'] = datetime.now() self.metadata['duration_seconds'] = ( self.metadata['end_time'] - self.metadata['start_time'] @@ -514,31 +516,33 @@ class CleanedDataLoader: self.metadata['validation_metrics'] = json.dumps(validation) save_etl_metadata(self.client, self.metadata) - print(f"\n Cleaned Integration completed: {rows_loaded:,} rows") + # Summary + print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows") print(f" Duration : {self.metadata['duration_seconds']:.2f}s") print(f" Completeness : {validation['completeness_pct']:.2f}%") if 'year_range' in validation: yr = validation['year_range'] if yr['min'] and yr['max']: - print(f" Year range : {yr['min']}-{yr['max']}") + print(f" Year range : {yr['min']}–{yr['max']}") print(f" Indicators : {validation.get('unique_indicators', '-')}") print(f" Countries : {validation.get('unique_countries', '-')}") print(f"\n Schema Validation:") for col, info in validation.get('column_length_check', {}).items(): - status = "OK" if info['within_limit'] else "FAIL" - print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}") - print(f"\n Metadata -> [AUDIT] etl_metadata") + status = "✓" if info['within_limit'] else "✗" + print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}") + print(f"\n Metadata → [AUDIT] etl_metadata") return rows_loaded # ============================================================================= -# AIRFLOW TASK FUNCTIONS +# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer # ============================================================================= def run_cleaned_integration(): """ Airflow task: Load cleaned_integrated dari staging_integrated. + Dipanggil oleh DAG setelah task staging_integration_to_silver selesai. """ from scripts.bigquery_config import get_bigquery_client @@ -557,21 +561,21 @@ if __name__ == "__main__": print("=" * 60) print("BIGQUERY CLEANED LAYER ETL") print("Kimball DW Architecture") - print(" Input : STAGING (Silver) -> staging_integrated") - print(" Output : STAGING (Silver) -> cleaned_integrated") - print(" Audit : AUDIT -> etl_logs, etl_metadata") + print(" Input : STAGING (Silver) → staging_integrated") + print(" Output : STAGING (Silver) → cleaned_integrated") + print(" Audit : AUDIT → etl_logs, etl_metadata") print("=" * 60) logger = setup_logging() client = get_bigquery_client() df_staging = load_staging_data(client) - print("\n[1/1] Cleaned Integration -> STAGING (Silver)...") + print("\n[1/1] Cleaned Integration → STAGING (Silver)...") loader = CleanedDataLoader(client, load_mode='full_refresh') final_count = loader.run(df_staging) print("\n" + "=" * 60) - print("[OK] CLEANED LAYER ETL COMPLETED") - print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") - print(f" AUDIT : etl_logs, etl_metadata") + print("✓ CLEANED LAYER ETL COMPLETED") + print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") + print(f" 📋 AUDIT : etl_logs, etl_metadata") print("=" * 60) \ No newline at end of file diff --git a/scripts/bigquery_dimensional_model.py b/scripts/bigquery_dimensional_model.py index 9b23f02..a5e665c 100644 --- a/scripts/bigquery_dimensional_model.py +++ b/scripts/bigquery_dimensional_model.py @@ -46,9 +46,9 @@ class DimensionalModelLoader: Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold. Kimball context: - Input : cleaned_integrated -> STAGING (Silver) — fs_asean_silver - Output : dim_* + fact_* -> DW (Gold) — fs_asean_gold - Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit + Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver + Output : dim_* + fact_* → DW (Gold) — fs_asean_gold + Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit Pipeline steps: 1. Load dim_country @@ -117,7 +117,7 @@ class DimensionalModelLoader: """ try: self.client.query(query).result() - self.logger.info(f" [OK] FK: {table_name}.{fk_column} -> {ref_table}.{ref_column}") + self.logger.info(f" [OK] FK: {table_name}.{fk_column} → {ref_table}.{ref_column}") except Exception as e: if "already exists" in str(e).lower(): self.logger.info(f" [INFO] FK already exists: {constraint_name}") @@ -129,7 +129,7 @@ class DimensionalModelLoader: # ------------------------------------------------------------------ def _save_table_metadata(self, table_name: str): - meta = self.load_metadata[table_name] + meta = self.load_metadata[table_name] metadata = { 'source_class' : self.__class__.__name__, 'table_name' : table_name, @@ -145,7 +145,7 @@ class DimensionalModelLoader: } try: save_etl_metadata(self.client, metadata) - self.logger.info(f" Metadata -> [AUDIT] etl_metadata") + self.logger.info(f" Metadata → [AUDIT] etl_metadata") except Exception as e: self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}") @@ -156,13 +156,13 @@ class DimensionalModelLoader: def load_dim_time(self): table_name = 'dim_time' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_time -> [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...") try: if 'year_range' in self.df_clean.columns: dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy() else: - dim_time = self.df_clean[['year']].drop_duplicates().copy() + dim_time = self.df_clean[['year']].drop_duplicates().copy() dim_time['year_range'] = None dim_time['year'] = dim_time['year'].astype(int) @@ -194,10 +194,10 @@ class DimensionalModelLoader: pass return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year}) - parsed = dim_time.apply(parse_year_range, axis=1) - dim_time['year'] = parsed['year'].astype(int) - dim_time['start_year'] = parsed['start_year'].astype(int) - dim_time['end_year'] = parsed['end_year'].astype(int) + parsed = dim_time.apply(parse_year_range, axis=1) + dim_time['year'] = parsed['year'].astype(int) + dim_time['start_year'] = parsed['start_year'].astype(int) + dim_time['end_year'] = parsed['end_year'].astype(int) dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year']) dim_time['decade'] = (dim_time['year'] // 10) * 10 dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int) @@ -229,7 +229,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" dim_time: {rows_loaded} rows\n") + self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -240,11 +240,11 @@ class DimensionalModelLoader: def load_dim_country(self): table_name = 'dim_country' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_country -> [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...") try: - dim_country = self.df_clean[['country']].drop_duplicates().copy() - dim_country.columns = ['country_name'] + dim_country = self.df_clean[['country']].drop_duplicates().copy() + dim_country.columns = ['country_name'] region_mapping = { 'Brunei Darussalam': ('Southeast Asia', 'ASEAN'), @@ -270,9 +270,7 @@ class DimensionalModelLoader: lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1]) dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping) - dim_country_final = dim_country[ - ['country_name', 'region', 'subregion', 'iso_code'] - ].copy() + dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy() dim_country_final = dim_country_final.reset_index(drop=True) dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1)) @@ -295,7 +293,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" dim_country: {rows_loaded} rows\n") + self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -304,83 +302,58 @@ class DimensionalModelLoader: raise def load_dim_indicator(self): - """ - Load dim_indicator ke Gold layer. - - Kolom yang dimuat: - indicator_id — surrogate key - indicator_name — nama standar indikator - indicator_category — kategori (Health & Nutrition, dll.) - unit — satuan ukuran - direction — higher_better / lower_better - """ table_name = 'dim_indicator' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_indicator -> [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...") try: has_direction = 'direction' in self.df_clean.columns has_unit = 'unit' in self.df_clean.columns has_category = 'indicator_category' in self.df_clean.columns - dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() - dim_indicator.columns = ['indicator_name'] + dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() + dim_indicator.columns = ['indicator_name'] - # Unit if has_unit: unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates() - unit_map.columns = ['indicator_name', 'unit'] - dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left') + unit_map.columns = ['indicator_name', 'unit'] + dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left') else: dim_indicator['unit'] = None - # Direction if has_direction: dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates() - dir_map.columns = ['indicator_name', 'direction'] - dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left') + dir_map.columns = ['indicator_name', 'direction'] + dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left') self.logger.info(" [OK] direction column from cleaned_integrated") else: dim_indicator['direction'] = 'higher_better' self.logger.warning(" [WARN] direction not found, default: higher_better") - # Indicator category if has_category: - cat_map = self.df_clean[ - ['indicator_standardized', 'indicator_category'] - ].drop_duplicates() - cat_map.columns = ['indicator_name', 'indicator_category'] - dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') + cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates() + cat_map.columns = ['indicator_name', 'indicator_category'] + dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') else: def categorize_indicator(name): n = str(name).lower() - if any(w in n for w in [ - 'undernourishment', 'malnutrition', 'stunting', - 'wasting', 'anemia', 'anaemia', 'food security', - 'food insecure', 'hunger' - ]): + if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting', + 'wasting', 'anemia', 'food security', 'food insecure', 'hunger']): return 'Health & Nutrition' - elif any(w in n for w in [ - 'production', 'yield', 'cereal', 'crop', - 'import dependency', 'share of dietary' - ]): + elif any(w in n for w in ['production', 'yield', 'cereal', 'crop', + 'import dependency', 'share of dietary']): return 'Agricultural Production' elif any(w in n for w in ['import', 'export', 'trade']): return 'Trade' elif any(w in n for w in ['gdp', 'income', 'economic']): return 'Economic' - elif any(w in n for w in [ - 'water', 'sanitation', 'infrastructure', 'rail' - ]): + elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): return 'Infrastructure' else: - return 'Supporting' - dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply( - categorize_indicator - ) - - dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') + return 'Other' + dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) + dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') dim_indicator_final = dim_indicator[ ['indicator_name', 'indicator_category', 'unit', 'direction'] ].copy() @@ -401,22 +374,17 @@ class DimensionalModelLoader: ) self._add_primary_key(table_name, 'indicator_id') - # Log distribusi - for label, col in [ - ('Categories', 'indicator_category'), - ('Direction', 'direction'), - ]: + for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]: self.logger.info(f" {label}:") for val, cnt in dim_indicator_final[col].value_counts().items(): - pct = cnt / len(dim_indicator_final) * 100 - self.logger.info(f" - {val}: {cnt} ({pct:.1f}%)") + self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)") self.load_metadata[table_name].update( {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" dim_indicator: {rows_loaded} rows\n") + self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -427,7 +395,7 @@ class DimensionalModelLoader: def load_dim_source(self): table_name = 'dim_source' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_source -> [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...") try: source_details = { @@ -487,7 +455,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" dim_source: {rows_loaded} rows\n") + self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -498,15 +466,15 @@ class DimensionalModelLoader: def load_dim_pillar(self): table_name = 'dim_pillar' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_pillar -> [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...") try: pillar_codes = { 'Availability': 'AVL', 'Access' : 'ACC', - 'Utilization' : 'UTL', 'Stability': 'STB', 'Supporting': 'SPT', + 'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH', } pillars_data = [ - {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'SPT')} + {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')} for p in self.df_clean['pillar'].unique() ] @@ -533,7 +501,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" dim_pillar: {rows_loaded} rows\n") + self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -548,9 +516,10 @@ class DimensionalModelLoader: def load_fact_food_security(self): table_name = 'fact_food_security' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading fact_food_security -> [DW/Gold] fs_asean_gold...") + self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...") try: + # Load dims dari Gold untuk FK resolution dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold') dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold') @@ -592,9 +561,9 @@ class DimensionalModelLoader: fact_table['start_year'] = fact_table['year'].astype(int) fact_table['end_year'] = fact_table['year'].astype(int) + # Resolve FKs fact_table = fact_table.merge( - dim_country[['country_id', 'country_name']].rename( - columns={'country_name': 'country'}), + dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}), on='country', how='left' ) fact_table = fact_table.merge( @@ -607,16 +576,15 @@ class DimensionalModelLoader: on=['start_year', 'end_year'], how='left' ) fact_table = fact_table.merge( - dim_source[['source_id', 'source_name']].rename( - columns={'source_name': 'source'}), + dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}), on='source', how='left' ) fact_table = fact_table.merge( - dim_pillar[['pillar_id', 'pillar_name']].rename( - columns={'pillar_name': 'pillar'}), + dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}), on='pillar', how='left' ) + # Filter hanya row dengan FK lengkap fact_table = fact_table[ fact_table['country_id'].notna() & fact_table['indicator_id'].notna() & @@ -653,6 +621,7 @@ class DimensionalModelLoader: layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema ) + # Add PK + FKs self._add_primary_key(table_name, 'fact_id') self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id') self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id') @@ -665,7 +634,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n") + self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n") return rows_loaded except Exception as e: @@ -748,15 +717,11 @@ class DimensionalModelLoader: FROM `{get_table_id('dim_indicator', layer='gold')}` GROUP BY direction ORDER BY direction """ - df_dir = self.client.query(query_dir).result().to_dataframe( - create_bqstorage_client=False - ) + df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False) if len(df_dir) > 0: self.logger.info(f"\n Direction Distribution:") for _, row in df_dir.iterrows(): - self.logger.info( - f" {row['direction']:15s}: {int(row['count']):>5,} indicators" - ) + self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators") self.logger.info("\n [OK] Validation completed") except Exception as e: @@ -773,19 +738,22 @@ class DimensionalModelLoader: self.pipeline_metadata['rows_fetched'] = len(self.df_clean) self.logger.info("\n" + "=" * 60) - self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) -> fs_asean_gold") + self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold") self.logger.info("=" * 60) - self.logger.info("\nLOADING DIMENSION TABLES -> fs_asean_gold") + # Dimensions + self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold") self.load_dim_country() self.load_dim_indicator() self.load_dim_time() self.load_dim_source() self.load_dim_pillar() - self.logger.info("\nLOADING FACT TABLE -> fs_asean_gold") + # Fact + self.logger.info("\nLOADING FACT TABLE → fs_asean_gold") self.load_fact_food_security() + # Validate self.validate_constraints() self.validate_data_load() @@ -794,23 +762,22 @@ class DimensionalModelLoader: total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values()) self.pipeline_metadata.update({ - 'end_time' : pipeline_end, - 'duration_seconds' : duration, - 'rows_transformed' : total_loaded, - 'rows_loaded' : total_loaded, + 'end_time' : pipeline_end, + 'duration_seconds' : duration, + 'rows_transformed' : total_loaded, + 'rows_loaded' : total_loaded, 'execution_timestamp': self.pipeline_metadata['start_time'], - 'completeness_pct' : 100.0, - 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), - 'validation_metrics' : json.dumps( - {t: m['status'] for t, m in self.load_metadata.items()} - ), - 'table_name' : 'dimensional_model_pipeline', + 'completeness_pct' : 100.0, + 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), + 'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}), + 'table_name' : 'dimensional_model_pipeline', }) try: save_etl_metadata(self.client, self.pipeline_metadata) except Exception as e: self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}") + # Summary self.logger.info("\n" + "=" * 60) self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED") self.logger.info("=" * 60) @@ -818,19 +785,20 @@ class DimensionalModelLoader: self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Tables :") for tbl, meta in self.load_metadata.items(): - icon = "OK" if meta['status'] == 'success' else "FAIL" - self.logger.info(f" [{icon}] {tbl:25s}: {meta['rows_loaded']:>10,} rows") - self.logger.info(f"\n Metadata -> [AUDIT] etl_metadata") + icon = "✓" if meta['status'] == 'success' else "✗" + self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows") + self.logger.info(f"\n Metadata → [AUDIT] etl_metadata") self.logger.info("=" * 60) # ============================================================================= -# AIRFLOW TASK FUNCTIONS +# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer # ============================================================================= def run_dimensional_model(): """ Airflow task: Load dimensional model dari cleaned_integrated. + Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai. """ from scripts.bigquery_config import get_bigquery_client @@ -849,9 +817,9 @@ if __name__ == "__main__": print("=" * 60) print("BIGQUERY DIMENSIONAL MODEL LOAD") print("Kimball DW Architecture") - print(" Input : STAGING (Silver) -> cleaned_integrated (fs_asean_silver)") - print(" Output : DW (Gold) -> dim_*, fact_* (fs_asean_gold)") - print(" Audit : AUDIT -> etl_logs, etl_metadata (fs_asean_audit)") + print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)") + print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)") + print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)") print("=" * 60) logger = setup_logging() @@ -859,22 +827,24 @@ if __name__ == "__main__": print("\nLoading cleaned_integrated (fs_asean_silver)...") df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') - print(f" Loaded : {len(df_clean):,} rows") + print(f" ✓ Loaded : {len(df_clean):,} rows") print(f" Columns : {len(df_clean.columns)}") print(f" Sources : {df_clean['source'].nunique()}") print(f" Indicators : {df_clean['indicator_standardized'].nunique()}") print(f" Countries : {df_clean['country'].nunique()}") - print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}") + print(f" Year range : {int(df_clean['year'].min())}–{int(df_clean['year'].max())}") if 'direction' in df_clean.columns: print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") + else: + print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first") - print("\n[1/1] Dimensional Model Load -> DW (Gold)...") + print("\n[1/1] Dimensional Model Load → DW (Gold)...") loader = DimensionalModelLoader(client, df_clean) loader.run() print("\n" + "=" * 60) - print("[OK] DIMENSIONAL MODEL ETL COMPLETED") - print(" DW (Gold) : dim_country, dim_indicator, dim_time,") - print(" dim_source, dim_pillar, fact_food_security") - print(" AUDIT : etl_logs, etl_metadata") + print("✓ DIMENSIONAL MODEL ETL COMPLETED") + print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,") + print(" dim_source, dim_pillar, fact_food_security") + print(" 📋 AUDIT : etl_logs, etl_metadata") print("=" * 60) \ No newline at end of file