From dc981aacab564b5b8980b4c9617a573279d40719 Mon Sep 17 00:00:00 2001 From: Debby Date: Sat, 28 Mar 2026 19:15:24 +0700 Subject: [PATCH] SDGS MDGS indicator --- scripts/bigquery_aggregate_layer.py | 384 ++++++++++++++++---------- scripts/bigquery_analytical_layer.py | 317 ++++++++++++++++++--- scripts/bigquery_cleaned_layer.py | 195 +++++++++---- scripts/bigquery_dimensional_model.py | 245 ++++++++++------ 4 files changed, 812 insertions(+), 329 deletions(-) diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index c5c5f9e..36c2e8b 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -1,7 +1,14 @@ """ BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION Semua agregasi pakai norm_value dari _get_norm_value_df() -UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'): + +UPDATED: +- _classify_indicators() membaca kolom 'framework' langsung dari + fact_asean_food_security_selected (bukan heuristik gap min_year). +- Kolom 'framework' sudah ditanam sejak bigquery_cleaned_layer.py + berdasarkan daftar eksplisit SDG Goal 2 (2030 Agenda, versi Maret 2020). + +Simpan 6 tabel ke fs_asean_gold (layer='gold'): - agg_pillar_composite - agg_pillar_by_country - agg_framework_by_country @@ -9,7 +16,8 @@ UPDATED: Simpan 6 tabel ke fs_asean_gold (layer='gold'): - agg_narrative_overview - agg_narrative_pillar -SOURCE TABLE: fact_asean_food_security_selected (sudah include nama + ID) +SOURCE TABLE: fact_asean_food_security_selected + (sudah include country_name, indicator_name, pillar_name, direction, framework) """ import pandas as pd @@ -106,7 +114,9 @@ def add_yoy(df: pd.DataFrame, group_cols: list, score_col: str) -> pd.DataFrame: return df -def safe_int(series: pd.Series, fill: int = 0, col_name: str = "", logger=None) -> pd.Series: +def safe_int( + series: pd.Series, fill: int = 0, col_name: str = "", logger=None +) -> pd.Series: n_nan = series.isna().sum() if n_nan > 0 and logger: logger.warning( @@ -115,7 +125,9 @@ def safe_int(series: pd.Series, fill: int = 0, col_name: str = "", logger=None) return series.fillna(fill).astype(int) -def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger=None) -> pd.DataFrame: +def check_and_dedup( + df: pd.DataFrame, key_cols: list, context: str = "", logger=None +) -> pd.DataFrame: dupes = df.duplicated(subset=key_cols, keep=False) if dupes.any(): n_dupes = dupes.sum() @@ -134,18 +146,16 @@ def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger= # ============================================================================= -# NARRATIVE BUILDER FUNCTIONS (pure functions, easy to unit-test) +# NARRATIVE BUILDER FUNCTIONS # ============================================================================= def _fmt_score(score) -> str: - """Format score to 2 decimal places.""" if score is None or (isinstance(score, float) and np.isnan(score)): return "N/A" return f"{score:.2f}" def _fmt_delta(delta) -> str: - """Format YoY delta with sign and 2 decimal places.""" if delta is None or (isinstance(delta, float) and np.isnan(delta)): return "N/A" sign = "+" if delta >= 0 else "" @@ -339,9 +349,9 @@ def _build_pillar_narrative( f"for the {pillar_name} pillar in {year}" ) - if most_improved_pillar and most_improved_delta is not None \ - and most_declined_pillar and most_declined_delta is not None \ - and most_improved_pillar != most_declined_pillar: + if (most_improved_pillar and most_improved_delta is not None + and most_declined_pillar and most_declined_delta is not None + and most_improved_pillar != most_declined_pillar): sent4 += ( f". Across all pillars, {most_improved_pillar} showed the greatest improvement " f"({_fmt_delta(most_improved_delta)} pts), while {most_declined_pillar} " @@ -390,20 +400,14 @@ class FoodSecurityAggregator: self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") self.logger.info("=" * 70) - # ----------------------------------------------------------------------- - # CHANGED: sumber tabel -> fact_asean_food_security_selected - # Tabel ini sudah include: country_name, indicator_name, pillar_name, - # direction, year -> tidak perlu join ke dim_* lagi - # ----------------------------------------------------------------------- self.df = read_from_bigquery( self.client, "fact_asean_food_security_selected", layer='gold' ) self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows") - # Validasi kolom wajib yang harus sudah ada di tabel baru required_cols = { "country_id", "country_name", - "indicator_id", "indicator_name", "direction", + "indicator_id", "indicator_name", "direction", "framework", "pillar_id", "pillar_name", "time_id", "year", "value", @@ -412,14 +416,14 @@ class FoodSecurityAggregator: if missing_cols: raise ValueError( f"Kolom berikut tidak ditemukan di fact_asean_food_security_selected: " - f"{missing_cols}" + f"{missing_cols}\n" + f"Pastikan pipeline dijalankan berurutan:\n" + f" 1. bigquery_cleaned_layer.py\n" + f" 2. bigquery_dimensional_model.py\n" + f" 3. bigquery_analytical_layer.py\n" + f" 4. bigquery_analysis_layer.py (file ini)" ) - # ----------------------------------------------------------------------- - # Tidak perlu join ke dim_* lagi karena semua nama sudah ada. - # Hanya load dim_indicator untuk keperluan fallback / referensi direction - # jika ada NULL yang perlu di-fill. - # ----------------------------------------------------------------------- n_null_dir = self.df["direction"].isna().sum() if n_null_dir > 0: self.logger.warning( @@ -427,12 +431,24 @@ class FoodSecurityAggregator: ) self.df["direction"] = self.df["direction"].fillna("positive") + n_null_fw = self.df["framework"].isna().sum() + if n_null_fw > 0: + self.logger.warning( + f" [FRAMEWORK] {n_null_fw} rows dengan framework NULL -> diisi 'MDGs'" + ) + self.df["framework"] = self.df["framework"].fillna("MDGs") + dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() self.logger.info(f"\n Distribusi direction per indikator:") for d, cnt in dir_dist.items(): tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal" self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]") + fw_dist = self.df.drop_duplicates("indicator_id")["framework"].value_counts() + self.logger.info(f"\n Distribusi framework per indikator:") + for fw, cnt in fw_dist.items(): + self.logger.info(f" {fw:<10} : {cnt:>3} indikator") + self.logger.info(f"\n Rows loaded : {len(self.df):,}") self.logger.info(f" Negara : {self.df['country_id'].nunique()}") self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}") @@ -445,57 +461,66 @@ class FoodSecurityAggregator: # ========================================================================= def _classify_indicators(self): + """ + Klasifikasi indikator ke MDGs / SDGs. + + UPDATED: Membaca kolom 'framework' langsung dari tabel + fact_asean_food_security_selected — tidak lagi menggunakan heuristik + gap detection berdasarkan min_year. Klasifikasi eksplisit sudah dilakukan + di bigquery_cleaned_layer.py berdasarkan daftar resmi SDG Goal 2. + + sdgs_start_year dihitung dari tahun minimum data SDG yang tersedia, + bukan dari asumsi threshold hardcoded. + """ self.logger.info("\n" + "=" * 70) self.logger.info("STEP 1b: KLASIFIKASI INDIKATOR -> MDGs / SDGs") self.logger.info("=" * 70) - ind_min_year = ( - self.df.groupby("indicator_id")["year"] - .min().reset_index() - .rename(columns={"year": "min_year"}) - ) - - unique_years = sorted(ind_min_year["min_year"].unique()) - self.logger.info(f"\n Unique min_year per indikator: {unique_years}") - - if len(unique_years) == 1: - gap_threshold = unique_years[0] + 1 - self.logger.info(" Hanya 1 cluster -> semua = MDGs") - else: - gaps = [ - (unique_years[i+1] - unique_years[i], unique_years[i], unique_years[i+1]) - for i in range(len(unique_years) - 1) - ] - gaps.sort(reverse=True) - largest_gap_size, y_before, y_after = gaps[0] - gap_threshold = y_after - self.logger.info(f" Gap terbesar: {y_before} -> {y_after} (selisih {largest_gap_size})") - - ind_min_year["framework"] = ind_min_year["min_year"].apply( - lambda y: "MDGs" if int(y) < gap_threshold else "SDGs" - ) - - sdgs_rows = ind_min_year[ind_min_year["framework"] == "SDGs"] - self.sdgs_start_year = ( - int(sdgs_rows["min_year"].min()) if not sdgs_rows.empty - else int(self.df["year"].max()) + 1 - ) - - self.logger.info(f" sdgs_start_year: {self.sdgs_start_year}") + if "framework" not in self.df.columns: + raise ValueError( + "Kolom 'framework' tidak ditemukan di fact_asean_food_security_selected.\n" + "Pastikan pipeline dijalankan berurutan:\n" + " 1. bigquery_cleaned_layer.py (assign_framework)\n" + " 2. bigquery_dimensional_model.py (dim_indicator + framework)\n" + " 3. bigquery_analytical_layer.py (propagasi ke fact_selected)\n" + " 4. bigquery_analysis_layer.py (file ini)" + ) + # Baca langsung dari kolom — tidak ada gap detection / heuristik self.mdgs_indicator_ids = set( - ind_min_year[ind_min_year["framework"] == "MDGs"]["indicator_id"].tolist() + self.df[self.df["framework"] == "MDGs"]["indicator_id"].unique().tolist() ) self.sdgs_indicator_ids = set( - ind_min_year[ind_min_year["framework"] == "SDGs"]["indicator_id"].tolist() + self.df[self.df["framework"] == "SDGs"]["indicator_id"].unique().tolist() ) - self.logger.info(f" MDGs: {len(self.mdgs_indicator_ids)} indicators") - self.logger.info(f" SDGs: {len(self.sdgs_indicator_ids)} indicators") + # sdgs_start_year: tahun pertama kemunculan data SDG di dataset + # Digunakan untuk memisahkan era pre-SDG (MDGs only) dan era campuran (MDGs + SDGs) + sdgs_rows = self.df[self.df["framework"] == "SDGs"] + if not sdgs_rows.empty: + self.sdgs_start_year = int(sdgs_rows["year"].min()) + else: + # Tidak ada SDG sama sekali — set ke tahun setelah akhir data + self.sdgs_start_year = int(self.df["year"].max()) + 1 + self.logger.warning( + f" [WARN] Tidak ada indikator SDGs. sdgs_start_year = {self.sdgs_start_year}" + ) - self.df = self.df.merge( - ind_min_year[["indicator_id", "framework"]], on="indicator_id", how="left" - ) + self.logger.info(f"\n Sumber klasifikasi : kolom 'framework' dari tabel") + self.logger.info(f" MDGs : {len(self.mdgs_indicator_ids)} indikator") + self.logger.info(f" SDGs : {len(self.sdgs_indicator_ids)} indikator") + self.logger.info(f" sdgs_start_year : {self.sdgs_start_year} (dari data aktual)") + + # Log detail per framework untuk verifikasi + for fw in ["MDGs", "SDGs"]: + fw_inds = ( + self.df[self.df["framework"] == fw] + .drop_duplicates("indicator_id")[["indicator_id", "indicator_name"]] + .sort_values("indicator_name") + ) + self.logger.info(f"\n {fw} indicators ({len(fw_inds)}):") + for _, row in fw_inds.iterrows(): + self.logger.info(f" [{int(row['indicator_id'])}] {row['indicator_name']}") # ========================================================================= # CORE HELPER: normalisasi raw value per indikator @@ -520,9 +545,9 @@ class FoodSecurityAggregator: norm_parts.append(grp) continue - raw = grp.loc[valid_mask, "value"].values - v_min, v_max = raw.min(), raw.max() - normed = np.full(len(grp), np.nan) + raw = grp.loc[valid_mask, "value"].values + v_min, v_max = raw.min(), raw.max() + normed = np.full(len(grp), np.nan) if v_min == v_max: normed[valid_mask.values] = 0.5 else: @@ -553,9 +578,9 @@ class FoodSecurityAggregator: df_normed .groupby(["pillar_id", "pillar_name", "year"]) .agg( - pillar_norm =("norm_value", "mean"), - n_indicators=("indicator_id", "nunique"), - n_countries =("country_id", "nunique"), + pillar_norm =("norm_value", "mean"), + n_indicators =("indicator_id", "nunique"), + n_countries =("country_id", "nunique"), ) .reset_index() ) @@ -696,13 +721,18 @@ class FoodSecurityAggregator: "score_1_100", "n_indicators", "composite_score" ]] .copy() - .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) + .rename(columns={ + "score_1_100" : "framework_score_1_100", + "composite_score": "framework_norm" + }) ) agg_total["framework"] = "Total" parts.append(agg_total) # Layer MDGs — Era pre-SDGs = Total - pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() + pre_sdgs_rows = country_composite[ + country_composite["year"] < self.sdgs_start_year + ].copy() if not pre_sdgs_rows.empty: mdgs_pre = ( pre_sdgs_rows[[ @@ -710,12 +740,15 @@ class FoodSecurityAggregator: "score_1_100", "n_indicators", "composite_score" ]] .copy() - .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) + .rename(columns={ + "score_1_100" : "framework_score_1_100", + "composite_score": "framework_norm" + }) ) mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) - # Layer MDGs — Era mixed + # Layer MDGs — Era mixed (setelah SDGs masuk) if self.mdgs_indicator_ids: df_mdgs_mixed = df_normed[ (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & @@ -725,11 +758,16 @@ class FoodSecurityAggregator: agg_mdgs_mixed = ( df_mdgs_mixed .groupby(["country_id", "country_name", "year"]) - .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique") + ) .reset_index() ) if not NORMALIZE_FRAMEWORKS_JOINTLY: - agg_mdgs_mixed["framework_score_1_100"] = global_minmax(agg_mdgs_mixed["framework_norm"]) + agg_mdgs_mixed["framework_score_1_100"] = global_minmax( + agg_mdgs_mixed["framework_norm"] + ) agg_mdgs_mixed["framework"] = "MDGs" parts.append(agg_mdgs_mixed) @@ -743,22 +781,34 @@ class FoodSecurityAggregator: agg_sdgs = ( df_sdgs .groupby(["country_id", "country_name", "year"]) - .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) + .agg( + framework_norm=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique") + ) .reset_index() ) if not NORMALIZE_FRAMEWORKS_JOINTLY: - agg_sdgs["framework_score_1_100"] = global_minmax(agg_sdgs["framework_norm"]) + agg_sdgs["framework_score_1_100"] = global_minmax( + agg_sdgs["framework_norm"] + ) agg_sdgs["framework"] = "SDGs" parts.append(agg_sdgs) df = pd.concat(parts, ignore_index=True) if NORMALIZE_FRAMEWORKS_JOINTLY: - mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + mixed_mask = ( + (df["framework"].isin(["MDGs", "SDGs"])) & + (df["year"] >= self.sdgs_start_year) + ) if mixed_mask.any(): - df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax( + df.loc[mixed_mask, "framework_norm"] + ) - df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) + df = check_and_dedup( + df, ["country_id", "framework", "year"], context=table_name, logger=self.logger + ) df["rank_in_framework_year"] = ( df.groupby(["framework", "year"])["framework_score_1_100"] .rank(method="min", ascending=False) @@ -808,51 +858,62 @@ class FoodSecurityAggregator: country_composite = self._calc_country_composite_inmemory() country_norm = ( - df_normed.groupby(["country_id", "country_name", "year"])["norm_value"] - .mean().reset_index().rename(columns={"norm_value": "country_norm"}) + df_normed + .groupby(["country_id", "country_name", "year"])["norm_value"] + .mean().reset_index() + .rename(columns={"norm_value": "country_norm"}) ) asean_overall = ( country_norm.groupby("year") - .agg(asean_norm=("country_norm", "mean"), std_norm=("country_norm", "std"), - n_countries=("country_norm", "count")) + .agg( + asean_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries =("country_norm", "count") + ) .reset_index() ) asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) asean_comp = ( country_composite.groupby("year")["composite_score"] - .mean().reset_index().rename(columns={"composite_score": "asean_composite"}) + .mean().reset_index() + .rename(columns={"composite_score": "asean_composite"}) ) asean_overall = asean_overall.merge(asean_comp, on="year", how="left") parts = [] # Layer TOTAL - total_cols = asean_overall[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() - total_cols = total_cols.rename(columns={ + total_cols = asean_overall[[ + "year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries" + ]].copy().rename(columns={ "asean_score_1_100": "framework_score_1_100", - "asean_norm": "framework_norm", - "n_countries": "n_countries_with_data", + "asean_norm" : "framework_norm", + "n_countries" : "n_countries_with_data", }) - n_ind_total = df_normed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) - total_cols = total_cols.merge(n_ind_total, on="year", how="left") + n_ind_total = ( + df_normed.groupby("year")["indicator_id"].nunique() + .reset_index().rename(columns={"indicator_id": "n_indicators"}) + ) + total_cols = total_cols.merge(n_ind_total, on="year", how="left") total_cols["framework"] = "Total" parts.append(total_cols) # Layer MDGs — pre-SDGs = Total pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() if not pre_sdgs.empty: - mdgs_pre = pre_sdgs[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() - mdgs_pre = mdgs_pre.rename(columns={ + mdgs_pre = pre_sdgs[[ + "year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries" + ]].copy().rename(columns={ "asean_score_1_100": "framework_score_1_100", - "asean_norm": "framework_norm", - "n_countries": "n_countries_with_data", + "asean_norm" : "framework_norm", + "n_countries" : "n_countries_with_data", }) n_ind_pre = ( df_normed[df_normed["year"] < self.sdgs_start_year] .groupby("year")["indicator_id"].nunique() .reset_index().rename(columns={"indicator_id": "n_indicators"}) ) - mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") + mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") mdgs_pre["framework"] = "MDGs" parts.append(mdgs_pre) @@ -863,16 +924,25 @@ class FoodSecurityAggregator: (df_normed["year"] >= self.sdgs_start_year) ].copy() if not df_mdgs_mixed.empty: - cn = df_mdgs_mixed.groupby(["country_id", "year"])["norm_value"].mean().reset_index().rename(columns={"norm_value": "country_norm"}) + cn = ( + df_mdgs_mixed + .groupby(["country_id", "year"])["norm_value"].mean() + .reset_index().rename(columns={"norm_value": "country_norm"}) + ) asean_mdgs = cn.groupby("year").agg( - framework_norm=("country_norm", "mean"), - std_norm=("country_norm", "std"), - n_countries_with_data=("country_id", "count"), + framework_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries_with_data =("country_id", "count"), ).reset_index() - n_ind_mdgs = df_mdgs_mixed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + n_ind_mdgs = ( + df_mdgs_mixed.groupby("year")["indicator_id"].nunique() + .reset_index().rename(columns={"indicator_id": "n_indicators"}) + ) asean_mdgs = asean_mdgs.merge(n_ind_mdgs, on="year", how="left") if not NORMALIZE_FRAMEWORKS_JOINTLY: - asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) + asean_mdgs["framework_score_1_100"] = global_minmax( + asean_mdgs["framework_norm"] + ) asean_mdgs["framework"] = "MDGs" parts.append(asean_mdgs) @@ -883,27 +953,43 @@ class FoodSecurityAggregator: (df_normed["year"] >= self.sdgs_start_year) ].copy() if not df_sdgs.empty: - cn = df_sdgs.groupby(["country_id", "year"])["norm_value"].mean().reset_index().rename(columns={"norm_value": "country_norm"}) + cn = ( + df_sdgs + .groupby(["country_id", "year"])["norm_value"].mean() + .reset_index().rename(columns={"norm_value": "country_norm"}) + ) asean_sdgs = cn.groupby("year").agg( - framework_norm=("country_norm", "mean"), - std_norm=("country_norm", "std"), - n_countries_with_data=("country_id", "count"), + framework_norm =("country_norm", "mean"), + std_norm =("country_norm", "std"), + n_countries_with_data =("country_id", "count"), ).reset_index() - n_ind_sdgs = df_sdgs.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + n_ind_sdgs = ( + df_sdgs.groupby("year")["indicator_id"].nunique() + .reset_index().rename(columns={"indicator_id": "n_indicators"}) + ) asean_sdgs = asean_sdgs.merge(n_ind_sdgs, on="year", how="left") if not NORMALIZE_FRAMEWORKS_JOINTLY: - asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) + asean_sdgs["framework_score_1_100"] = global_minmax( + asean_sdgs["framework_norm"] + ) asean_sdgs["framework"] = "SDGs" parts.append(asean_sdgs) df = pd.concat(parts, ignore_index=True) if NORMALIZE_FRAMEWORKS_JOINTLY: - mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + mixed_mask = ( + (df["framework"].isin(["MDGs", "SDGs"])) & + (df["year"] >= self.sdgs_start_year) + ) if mixed_mask.any(): - df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax( + df.loc[mixed_mask, "framework_norm"] + ) - df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) + df = check_and_dedup( + df, ["framework", "year"], context=table_name, logger=self.logger + ) df = add_yoy(df, ["framework"], "framework_score_1_100") df["year"] = df["year"].astype(int) @@ -962,6 +1048,7 @@ class FoodSecurityAggregator: .copy() ) + # Gunakan kolom framework dari self.df untuk hitung MDG/SDG per tahun ind_year = self.df.drop_duplicates(subset=["indicator_id", "year", "framework"]) records = [] @@ -995,10 +1082,10 @@ class FoodSecurityAggregator: for _, cr in yr_country.iterrows(): cr_yoy = cr.get("year_over_year_change", None) ranking_list.append({ - "rank": int(cr["rank_in_framework_year"]), + "rank" : int(cr["rank_in_framework_year"]), "country_name": str(cr["country_name"]), - "score": round(float(cr["framework_score_1_100"]), 2), - "yoy_change": round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, + "score" : round(float(cr["framework_score_1_100"]), 2), + "yoy_change" : round(float(cr_yoy), 2) if pd.notna(cr_yoy) else None, }) country_ranking_json = json.dumps(ranking_list, ensure_ascii=False) @@ -1032,19 +1119,19 @@ class FoodSecurityAggregator: ) records.append({ - "year": yr, - "n_mdg_indicators": n_mdg, - "n_sdg_indicators": n_sdg, - "n_total_indicators": n_total_ind, - "asean_total_score": round(score, 2), - "yoy_change": yoy_val, - "yoy_change_pct": round(yoy_pct, 2) if yoy_pct is not None else None, - "country_ranking_json": country_ranking_json, + "year" : yr, + "n_mdg_indicators" : n_mdg, + "n_sdg_indicators" : n_sdg, + "n_total_indicators" : n_total_ind, + "asean_total_score" : round(score, 2), + "yoy_change" : yoy_val, + "yoy_change_pct" : round(yoy_pct, 2) if yoy_pct is not None else None, + "country_ranking_json" : country_ranking_json, "most_improved_country": most_improved_country, - "most_improved_delta": most_improved_delta, + "most_improved_delta" : most_improved_delta, "most_declined_country": most_declined_country, - "most_declined_delta": most_declined_delta, - "narrative_overview": narrative, + "most_declined_delta" : most_declined_delta, + "narrative_overview" : narrative, }) df = pd.DataFrame(records) @@ -1109,8 +1196,8 @@ class FoodSecurityAggregator: yr_pillars_yoy = yr_pillars.dropna(subset=["year_over_year_change"]) if not yr_pillars_yoy.empty: - best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() - worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() + best_p_idx = yr_pillars_yoy["year_over_year_change"].idxmax() + worst_p_idx = yr_pillars_yoy["year_over_year_change"].idxmin() most_improved_pillar = str(yr_pillars_yoy.loc[best_p_idx, "pillar_name"]) most_improved_delta = round(float(yr_pillars_yoy.loc[best_p_idx, "year_over_year_change"]), 2) most_declined_pillar = str(yr_pillars_yoy.loc[worst_p_idx, "pillar_name"]) @@ -1163,17 +1250,17 @@ class FoodSecurityAggregator: ) records.append({ - "year": yr, - "pillar_id": p_id, - "pillar_name": p_name, - "pillar_score": round(p_score, 2), - "rank_in_year": p_rank, - "yoy_change": p_yoy_val, - "top_country": top_country, - "top_country_score": top_country_score, - "bottom_country": bot_country, + "year" : yr, + "pillar_id" : p_id, + "pillar_name" : p_name, + "pillar_score" : round(p_score, 2), + "rank_in_year" : p_rank, + "yoy_change" : p_yoy_val, + "top_country" : top_country, + "top_country_score" : top_country_score, + "bottom_country" : bot_country, "bottom_country_score": bot_country_score, - "narrative_pillar": narrative, + "narrative_pillar" : narrative, }) df = pd.DataFrame(records) @@ -1210,13 +1297,19 @@ class FoodSecurityAggregator: def _validate_mdgs_equals_total(self, df: pd.DataFrame, level: str = ""): self.logger.info(f"\n Validasi MDGs < {self.sdgs_start_year} == Total [{level}]:") group_by = ["year"] if level.startswith("asean") else ["country_id", "year"] - mdgs_pre = df[(df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) - total_pre = df[(df["framework"] == "Total") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "total_score"}) + mdgs_pre = df[ + (df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year) + ][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) + total_pre = df[ + (df["framework"] == "Total") & (df["year"] < self.sdgs_start_year) + ][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "total_score"}) if mdgs_pre.empty and total_pre.empty: self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)") return if mdgs_pre.empty or total_pre.empty: - self.logger.warning(f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}") + self.logger.warning( + f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}" + ) return check = mdgs_pre.merge(total_pre, on=group_by) max_diff = (check["mdgs_score"] - check["total_score"]).abs().max() @@ -1228,8 +1321,8 @@ class FoodSecurityAggregator: "rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now(), }) log_update(self.client, "DW", table_name, "full_load", rows_loaded) - self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [Gold] fs_asean_gold") - self.logger.info(f" Metadata → [AUDIT] etl_logs") + self.logger.info(f" {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") + self.logger.info(f" Metadata -> [AUDIT] etl_logs") def _fail(self, table_name: str, error: Exception): self.load_metadata[table_name].update({"status": "failed", "end_time": datetime.now()}) @@ -1248,6 +1341,8 @@ class FoodSecurityAggregator: self.logger.info(" Outputs : agg_pillar_composite | agg_pillar_by_country") self.logger.info(" agg_framework_by_country| agg_framework_asean") self.logger.info(" agg_narrative_overview | agg_narrative_pillar") + self.logger.info(" NOTE : framework (MDGs/SDGs) dibaca dari kolom tabel,") + self.logger.info(" bukan heuristik gap min_year") self.logger.info("=" * 70) self.load_data() @@ -1276,8 +1371,8 @@ class FoodSecurityAggregator: self.logger.info(f" Durasi : {duration:.2f}s") self.logger.info(f" Total rows : {total_rows:,}") for tbl, meta in self.load_metadata.items(): - icon = "✓" if meta["status"] == "success" else "✗" - self.logger.info(f" {icon} {tbl:<35} {meta['rows_loaded']:>10,}") + icon = "OK" if meta["status"] == "success" else "FAIL" + self.logger.info(f" [{icon}] {tbl:<35} {meta['rows_loaded']:>10,}") # ============================================================================= @@ -1312,6 +1407,7 @@ if __name__ == "__main__": print("=" * 70) print("FOOD SECURITY AGGREGATION -> fs_asean_gold") print(f" Source : fact_asean_food_security_selected") + print(f" Framework classification : dari kolom tabel (bukan heuristik)") print(f" NORMALIZE_FRAMEWORKS_JOINTLY : {NORMALIZE_FRAMEWORKS_JOINTLY}") print("=" * 70) diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 018be28..8369311 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -8,7 +8,15 @@ Filtering Order: 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries -6. Save analytical table (dengan nama/label lengkap untuk Looker Studio) +6. Calculate YoY per indicator per country +7. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio) + +UPDATED: +- Kolom 'framework' (MDGs/SDGs) dipropagasi dari dim_indicator ke tabel output. + Hal ini memungkinkan Looker Studio melakukan filter/slice berdasarkan framework + tanpa perlu join ulang ke dim_indicator. +- Kolom 'yoy_change' dan 'yoy_pct' ditambahkan untuk analisis Year-over-Year + per indikator per negara langsung di Looker Studio. """ import pandas as pd @@ -46,9 +54,17 @@ class AnalyticalLayerLoader: 1. Complete per country (no gaps from start_year to end_year) 2. Filter countries with all pillars 3. Ensure indicators have consistent country count across all years - 4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio + 4. Calculate YoY (year-over-year) change per indicator per country + 5. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold + + Kolom output: + country_id, country_name, + indicator_id, indicator_name, direction, framework, + pillar_id, pillar_name, + time_id, year, value, + yoy_change, yoy_pct """ def __init__(self, client: bigquery.Client): @@ -87,6 +103,7 @@ class AnalyticalLayerLoader: self.logger.info("=" * 80) try: + # Sertakan kolom framework dari dim_indicator dalam query query = f""" SELECT f.country_id, @@ -94,6 +111,7 @@ class AnalyticalLayerLoader: f.indicator_id, i.indicator_name, i.direction, + i.framework, f.pillar_id, p.pillar_name, f.time_id, @@ -110,15 +128,34 @@ class AnalyticalLayerLoader: JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id """ - self.logger.info("Loading fact table with dimensions...") - self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False) + self.logger.info("Loading fact table with dimensions (incl. framework)...") + self.df_clean = self.client.query(query).result().to_dataframe( + create_bqstorage_client=False + ) self.logger.info(f" Loaded: {len(self.df_clean):,} rows") if 'is_year_range' in self.df_clean.columns: yr = self.df_clean['is_year_range'].value_counts() self.logger.info(f" Breakdown:") - self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}") - self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}") + self.logger.info( + f" Single years (is_year_range=False): {yr.get(False, 0):,}" + ) + self.logger.info( + f" Year ranges (is_year_range=True): {yr.get(True, 0):,}" + ) + + # Validasi kolom framework tersedia + if 'framework' not in self.df_clean.columns: + raise ValueError( + "Kolom 'framework' tidak ditemukan di dim_indicator. " + "Pastikan bigquery_cleaned_layer.py dan bigquery_dimensional_model.py " + "sudah dijalankan dengan versi terbaru." + ) + + fw_dist = self.df_clean.drop_duplicates('indicator_id')['framework'].value_counts() + self.logger.info(f" Framework distribution (per indikator unik):") + for fw, cnt in fw_dist.items(): + self.logger.info(f" {fw}: {cnt} indicators") self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') @@ -228,10 +265,15 @@ class AnalyticalLayerLoader: self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}") self.logger.info(f" [-] Removed: {len(removed_combinations):,}") - df_valid = pd.DataFrame(valid_combinations) - df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str) - self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' + - self.df_clean['indicator_id'].astype(str)) + df_valid = pd.DataFrame(valid_combinations) + df_valid['key'] = ( + df_valid['country_id'].astype(str) + '_' + + df_valid['indicator_id'].astype(str) + ) + self.df_clean['key'] = ( + self.df_clean['country_id'].astype(str) + '_' + + self.df_clean['indicator_id'].astype(str) + ) original_count = len(self.df_clean) self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy() @@ -265,13 +307,17 @@ class AnalyticalLayerLoader: f"{row['pillar_count']}/{total_pillars} pillars" ) - selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars] + selected_countries = country_pillar_count[ + country_pillar_count['pillar_count'] == total_pillars + ] self.selected_country_ids = selected_countries['country_id'].tolist() self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries") original_count = len(self.df_clean) - self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy() + self.df_clean = self.df_clean[ + self.df_clean['country_id'].isin(self.selected_country_ids) + ].copy() self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") @@ -285,7 +331,9 @@ class AnalyticalLayerLoader: indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() - indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year'] + indicator_country_start.columns = [ + 'indicator_id', 'indicator_name', 'country_id', 'start_year' + ] indicator_max_start = indicator_country_start.groupby([ 'indicator_id', 'indicator_name' @@ -324,7 +372,9 @@ class AnalyticalLayerLoader: else: removed_indicators.append({ 'indicator_name': indicator_name, - 'reason' : f"missing countries in years: {', '.join(problematic_years[:5])}" + 'reason' : ( + f"missing countries in years: {', '.join(problematic_years[:5])}" + ) }) self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") @@ -334,12 +384,17 @@ class AnalyticalLayerLoader: raise ValueError("No valid indicators found after filtering!") original_count = len(self.df_clean) - self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy() + self.df_clean = self.df_clean[ + self.df_clean['indicator_id'].isin(valid_indicators) + ].copy() self.df_clean = self.df_clean.merge( - indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left' + indicator_max_start[['indicator_id', 'max_start_year']], + on='indicator_id', how='left' ) - self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy() + self.df_clean = self.df_clean[ + self.df_clean['year'] >= self.df_clean['max_start_year'] + ].copy() self.df_clean = self.df_clean.drop('max_start_year', axis=1) self.logger.info(f"\n Rows before: {original_count:,}") @@ -355,12 +410,16 @@ class AnalyticalLayerLoader: self.logger.info("=" * 80) expected_countries = len(self.selected_country_ids) - verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index() + verification = self.df_clean.groupby( + ['indicator_id', 'year'] + )['country_id'].nunique().reset_index() verification.columns = ['indicator_id', 'year', 'country_count'] all_good = (verification['country_count'] == expected_countries).all() if all_good: - self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries") + self.logger.info( + f" VERIFICATION PASSED — all combinations have {expected_countries} countries" + ) else: bad = verification[verification['country_count'] != expected_countries] for _, row in bad.head(10).iterrows(): @@ -372,6 +431,101 @@ class AnalyticalLayerLoader: return True + def calculate_yoy(self): + """ + Hitung Year-over-Year (YoY) per indikator per negara. + + Kolom yang ditambahkan ke df_clean: + yoy_change : selisih absolut -> value - value_tahun_sebelumnya + yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100 + + Catatan: + - Baris tahun pertama per kombinasi country-indicator akan bernilai NULL + (tidak ada tahun sebelumnya sebagai pembanding) — ini intentional. + - value_prev di-drop setelah kalkulasi, tidak ikut disimpan ke BigQuery. + - Dilakukan SETELAH verify_no_gaps() agar data sudah clean dan sorted benar. + """ + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 6b: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") + self.logger.info("=" * 80) + + df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() + + # Nilai tahun sebelumnya (shifted within each country-indicator group) + df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1) + + # YoY absolute change: value(t) - value(t-1) + df['yoy_change'] = df['value'] - df['value_prev'] + + # YoY percentage change: (yoy_change / |value_prev|) * 100 + # Hindari division by zero — jika value_prev == 0 atau NaN, hasilnya NaN + df['yoy_pct'] = np.where( + df['value_prev'].notna() & (df['value_prev'] != 0), + (df['yoy_change'] / df['value_prev'].abs()) * 100, + np.nan + ) + + # Drop kolom bantu value_prev, tidak ikut disimpan ke BigQuery + df = df.drop(columns=['value_prev']) + + # Log ringkasan + total_rows = len(df) + valid_yoy = df['yoy_pct'].notna().sum() + null_yoy = df['yoy_pct'].isna().sum() + + self.logger.info(f" Total rows : {total_rows:,}") + self.logger.info(f" YoY calculated : {valid_yoy:,}") + self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator") + + # Log distribusi YoY per indikator (sample) + per_ind = ( + df[df['yoy_pct'].notna()] + .groupby(['indicator_id', 'indicator_name'])['yoy_pct'] + .agg(['mean', 'std', 'min', 'max']) + .reset_index() + ) + per_ind.columns = ['indicator_id', 'indicator_name', 'mean', 'std', 'min', 'max'] + + self.logger.info(f"\n YoY summary per indicator (top 10 by abs mean change):") + self.logger.info(f" {'-'*100}") + self.logger.info( + f" {'ID':<5} {'Indicator Name':<52} {'Mean%':>8} {'Std%':>8} {'Min%':>8} {'Max%':>8}" + ) + self.logger.info(f" {'-'*100}") + + top_ind = per_ind.reindex( + per_ind['mean'].abs().sort_values(ascending=False).index + ).head(10) + + for _, row in top_ind.iterrows(): + self.logger.info( + f" {int(row['indicator_id']):<5} {row['indicator_name'][:50]:<52} " + f"{row['mean']:>+8.2f} {row['std']:>8.2f} " + f"{row['min']:>+8.2f} {row['max']:>+8.2f}" + ) + + # Log distribusi YoY per negara (ringkasan) + per_country = ( + df[df['yoy_pct'].notna()] + .groupby(['country_id', 'country_name'])['yoy_pct'] + .agg(['mean', 'std']) + .reset_index() + ) + per_country.columns = ['country_id', 'country_name', 'mean_yoy', 'std_yoy'] + + self.logger.info(f"\n YoY summary per country:") + self.logger.info(f" {'-'*60}") + self.logger.info(f" {'Country':<30} {'Mean YoY%':>10} {'Std YoY%':>10}") + self.logger.info(f" {'-'*60}") + for _, row in per_country.sort_values('mean_yoy', ascending=False).iterrows(): + self.logger.info( + f" {row['country_name']:<30} {row['mean_yoy']:>+10.2f} {row['std_yoy']:>10.2f}" + ) + + self.df_clean = df + self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct") + return self.df_clean + def analyze_indicator_availability_by_year(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR") @@ -394,39 +548,62 @@ class AnalyticalLayerLoader: ) indicator_details = self.df_clean.groupby([ - 'indicator_id', 'indicator_name', 'pillar_name', 'direction' + 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework' ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index() indicator_details.columns = [ - 'indicator_id', 'indicator_name', 'pillar_name', 'direction', + 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework', 'start_year', 'end_year', 'country_count' ] indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) - indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name']) + indicator_details = indicator_details.sort_values( + ['framework', 'pillar_name', 'start_year', 'indicator_name'] + ) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") for pillar, count in indicator_details.groupby('pillar_name').size().items(): self.logger.info(f" {pillar}: {count} indicators") - self.logger.info(f"\n{'-'*100}") - self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}") - self.logger.info(f"{'-'*100}") + self.logger.info(f"\nFramework breakdown:") + for fw, count in indicator_details.groupby('framework').size().items(): + self.logger.info(f" {fw}: {count} indicators") + + self.logger.info(f"\n{'-'*110}") + self.logger.info( + f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " + f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" + ) + self.logger.info(f"{'-'*110}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " - f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} " - f"{direction:<8} {int(row['country_count'])}" + f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " + f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" ) return year_stats def save_analytical_table(self): - # --------------------------------------------------------------- - # CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio - # --------------------------------------------------------------- + """ + Simpan fact_asean_food_security_selected ke Gold layer. + + Kolom yang disimpan: + country_id, country_name — dimensi negara + indicator_id, indicator_name — dimensi indikator + direction — arah penilaian (higher/lower_better) + framework — MDGs / SDGs (untuk filter Looker Studio) + pillar_id, pillar_name — dimensi pilar + time_id, year — dimensi waktu + value — nilai indikator + yoy_change — perubahan absolut YoY (NULLABLE: NULL di tahun pertama) + yoy_pct — perubahan relatif YoY dalam % (NULLABLE: NULL di tahun pertama) + + Kolom framework memungkinkan filter langsung di Looker Studio tanpa join ke dim_indicator. + Kolom yoy_change dan yoy_pct memungkinkan analisis tren tahunan langsung di Looker Studio. + """ table_name = 'fact_asean_food_security_selected' self.logger.info("\n" + "=" * 80) @@ -434,22 +611,48 @@ class AnalyticalLayerLoader: self.logger.info("=" * 80) try: - # ------------------------------------------------------------------ - # Pilih kolom: ID + Nama lengkap + value - # Kolom nama memudahkan filtering/slicing langsung di Looker Studio - # tanpa perlu join ulang ke tabel dimensi. - # ------------------------------------------------------------------ + # Pastikan kolom framework tersedia di df_clean + if 'framework' not in self.df_clean.columns: + self.logger.warning( + " [WARN] Kolom 'framework' tidak ada di df_clean. " + "Melakukan join ke dim_indicator sebagai fallback..." + ) + dim_ind = read_from_bigquery(self.client, 'dim_indicator', layer='gold') + if 'framework' in dim_ind.columns: + self.df_clean = self.df_clean.merge( + dim_ind[['indicator_id', 'framework']], + on='indicator_id', how='left' + ) + self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs') + self.logger.info(" [OK] framework di-join dari dim_indicator") + else: + self.df_clean['framework'] = 'MDGs' + self.logger.warning( + " [WARN] dim_indicator juga tidak punya kolom framework. " + "Default: MDGs. Jalankan ulang pipeline dari cleaned_layer." + ) + + # Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil + if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns: + self.logger.warning( + " [WARN] Kolom YoY tidak ditemukan. Menjalankan calculate_yoy() sebagai fallback..." + ) + self.calculate_yoy() + analytical_df = self.df_clean[[ 'country_id', 'country_name', 'indicator_id', 'indicator_name', 'direction', + 'framework', 'pillar_id', 'pillar_name', 'time_id', 'year', 'value', + 'yoy_change', + 'yoy_pct', ]].copy() analytical_df = analytical_df.sort_values( @@ -462,27 +665,46 @@ class AnalyticalLayerLoader: analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) analytical_df['direction'] = analytical_df['direction'].astype(str) + analytical_df['framework'] = analytical_df['framework'].astype(str) analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['year'] = analytical_df['year'].astype(int) analytical_df['value'] = analytical_df['value'].astype(float) + # yoy_change dan yoy_pct tetap float — NULL (NaN) di tahun pertama adalah intentional + analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) + analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") - # Schema BigQuery + # Log distribusi framework + fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() + self.logger.info(f" Framework distribution (per indikator unik):") + for fw, cnt in fw_dist.items(): + self.logger.info(f" {fw}: {cnt} indicators") + + # Log statistik YoY + yoy_valid = analytical_df['yoy_pct'].notna().sum() + yoy_null = analytical_df['yoy_pct'].isna().sum() + self.logger.info(f" YoY rows (calculated): {yoy_valid:,}") + self.logger.info(f" YoY rows (NULL/base) : {yoy_null:,}") + schema = [ bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + # NULLABLE karena tahun pertama per country-indicator tidak memiliki nilai sebelumnya + bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( @@ -508,17 +730,25 @@ class AnalyticalLayerLoader: 'fixed_countries': len(self.selected_country_ids), 'no_gaps' : True, 'layer' : 'gold', - 'columns' : 'id + name + value (Looker Studio ready)' + 'columns' : ( + 'id + name + direction + framework + value + ' + 'yoy_change + yoy_pct (Looker Studio ready)' + ) }), 'validation_metrics' : json.dumps({ 'fixed_countries' : len(self.selected_country_ids), - 'total_indicators': int(self.df_clean['indicator_id'].nunique()) + 'total_indicators': int(self.df_clean['indicator_id'].nunique()), + 'framework_dist' : fw_dist.to_dict(), + 'yoy_rows_valid' : int(yoy_valid), + 'yoy_rows_null' : int(yoy_null), }) } save_etl_metadata(self.client, metadata) - self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold") - self.logger.info(f" Metadata → [AUDIT] etl_metadata") + self.logger.info( + f" {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold" + ) + self.logger.info(f" Metadata -> [AUDIT] etl_metadata") return rows_loaded except Exception as e: @@ -530,7 +760,9 @@ class AnalyticalLayerLoader: self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) - self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold") + self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") + self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,") + self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct") self.logger.info("=" * 80) self.load_source_data() @@ -539,6 +771,7 @@ class AnalyticalLayerLoader: self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() self.verify_no_gaps() + self.calculate_yoy() # <-- Step 6b: hitung YoY self.analyze_indicator_availability_by_year() self.save_analytical_table() @@ -577,7 +810,7 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) - print("Output: fact_asean_food_security_selected → fs_asean_gold") + print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("=" * 80) logger = setup_logging() diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py index 0d112fc..26dbd9c 100644 --- a/scripts/bigquery_cleaned_layer.py +++ b/scripts/bigquery_cleaned_layer.py @@ -40,12 +40,12 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame: """Load data dari staging_integrated (STAGING/Silver layer).""" print("\nLoading data from staging_integrated (fs_asean_silver)...") df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver') - print(f" ✓ Loaded : {len(df_staging):,} rows") - print(f" Columns : {len(df_staging.columns)}") - print(f" Sources : {df_staging['source'].nunique()}") - print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") - print(f" Countries : {df_staging['country'].nunique()}") - print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}") + print(f" Loaded : {len(df_staging):,} rows") + print(f" Columns : {len(df_staging.columns)}") + print(f" Sources : {df_staging['source'].nunique()}") + print(f" Indicators : {df_staging['indicator_standardized'].nunique()}") + print(f" Countries : {df_staging['country'].nunique()}") + print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}") return df_staging @@ -53,7 +53,6 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame: # COLUMN CONSTRAINT HELPERS # ============================================================================= -# Schema constraints — semua varchar max lengths COLUMN_CONSTRAINTS = { 'source' : 20, 'indicator_original' : 255, @@ -62,7 +61,8 @@ COLUMN_CONSTRAINTS = { 'year_range' : 20, 'unit' : 20, 'pillar' : 20, - 'direction' : 15, # 'higher_better'=13, 'lower_better'=12 + 'direction' : 15, + 'framework' : 5, # 'MDGs'=4, 'SDGs'=4 } @@ -101,11 +101,11 @@ def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame: ) if truncation_report: - print("\n ⚠ Column Truncations Applied:") + print("\n Column Truncations Applied:") for column, info in truncation_report.items(): print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars") else: - print("\n ✓ No truncations needed — all values within constraints") + print("\n No truncations needed — all values within constraints") return df_constrained @@ -156,11 +156,11 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou def map_country(country): if pd.isna(country): return country - s = str(country).strip() + s = str(country).strip() mapped = ASEAN_MAPPING.get(s.upper(), s) return mapped[:100] if len(mapped) > 100 else mapped - original = df_clean[country_column].copy() + original = df_clean[country_column].copy() df_clean[country_column] = df_clean[country_column].apply(map_country) changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new} @@ -178,7 +178,7 @@ def assign_pillar(indicator_name: str) -> str: """ Assign pillar berdasarkan keyword indikator. Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other' - All ≤ 20 chars (varchar(20) constraint). + All <= 20 chars (varchar(20) constraint). """ if pd.isna(indicator_name): return 'Other' @@ -210,8 +210,9 @@ def assign_pillar(indicator_name: str) -> str: if any(kw in ind for kw in [ 'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity', - 'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation', - 'children under 5', 'newborns with low', 'women of reproductive' + 'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water', + 'sanitation', 'children under 5', 'newborns with low', + 'women of reproductive' ]): return 'Utilization' @@ -226,17 +227,15 @@ def assign_direction(indicator_name: str) -> str: """ Assign direction berdasarkan indikator. Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars) - Both ≤ 15 chars (varchar(15) constraint). + Both <= 15 chars (varchar(15) constraint). """ if pd.isna(indicator_name): return 'higher_better' ind = str(indicator_name).lower() - # Spesifik lower_better if 'share of dietary energy supply derived from cereals' in ind: return 'lower_better' - # Higher_better exceptions — cek sebelum lower_better keywords for kw in [ 'exclusive breastfeeding', 'dietary energy supply', @@ -248,7 +247,6 @@ def assign_direction(indicator_name: str) -> str: if kw in ind: return 'higher_better' - # Lower_better — masalah yang harus diminimalkan for kw in [ 'prevalence of undernourishment', 'prevalence of severe food insecurity', @@ -259,6 +257,7 @@ def assign_direction(indicator_name: str) -> str: 'prevalence of overweight', 'prevalence of obesity', 'prevalence of anemia', + 'prevalence of anaemia', 'prevalence of low birthweight', 'number of people undernourished', 'number of severely food insecure', @@ -283,6 +282,9 @@ def assign_direction(indicator_name: str) -> str: 'coefficient of variation', 'incidence of caloric losses', 'food losses', + 'indicator of food price anomalies', + 'proportion of local breeds classified as being at risk', + 'agricultural export subsidies', ]: if kw in ind: return 'lower_better' @@ -290,6 +292,73 @@ def assign_direction(indicator_name: str) -> str: return 'higher_better' +# ============================================================================= +# FRAMEWORK CLASSIFICATION (MDGs vs SDGs) +# ============================================================================= + +# Daftar keyword eksplisit dari SDG Goal 2 (2030 Agenda for Sustainable Development) +# Sumber: UN SDG Indicators — versi Maret 2020 +# Indikator: 2.1.1, 2.1.2, 2.2.1, 2.2.2, 2.2.3, 2.3.1, 2.3.2, 2.4.1, +# 2.5.1, 2.5.2, 2.a.1, 2.a.2, 2.b.1, 2.c.1 +SDG_INDICATOR_KEYWORDS = frozenset([ + # 2.1.1 — Prevalence of undernourishment + "prevalence of undernourishment", + # 2.1.2 — Prevalence of moderate or severe food insecurity (FIES) + "prevalence of moderate or severe food insecurity", + "prevalence of severe food insecurity", + "prevalence of moderate food insecurity", + # 2.2.1 — Prevalence of stunting + "prevalence of stunting", + # 2.2.2 — Prevalence of malnutrition (wasting and overweight) + "prevalence of malnutrition", + "prevalence of wasting", + "prevalence of overweight", + # 2.2.3 — Prevalence of anaemia in women 15-49 + "prevalence of anaemia", + "prevalence of anemia", + # 2.3.1 — Volume of production per labour unit + "volume of production per labour unit", + # 2.3.2 — Average income of small-scale food producers + "average income of small-scale food producers", + # 2.4.1 — Proportion of agricultural area under productive and sustainable agriculture + "proportion of agricultural area under productive", + # 2.5.1 — Number of plant and animal genetic resources secured + "number of plant and animal genetic resources", + # 2.5.2 — Proportion of local breeds at risk of extinction + "proportion of local breeds classified as being at risk", + # 2.a.1 — Agriculture orientation index for government expenditures + "agriculture orientation index", + # 2.a.2 — Total official flows to the agriculture sector + "total official flows", + # 2.b.1 — Agricultural export subsidies + "agricultural export subsidies", + # 2.c.1 — Indicator of food price anomalies + "indicator of food price anomalies", +]) + + +def assign_framework(indicator_name: str) -> str: + """ + Assign framework berdasarkan daftar eksplisit indikator SDG Goal 2 + dari 2030 Agenda for Sustainable Development (versi Maret 2020). + + Logika: + - Cek apakah nama indikator mengandung keyword SDG yang terdaftar + - Jika ya -> 'SDGs' + - Jika tidak -> 'MDGs' (indikator FAO/lama yang bukan SDG resmi) + + Return values: 'MDGs' atau 'SDGs' + Panjang max 4 chars (dalam constraint varchar(5)). + """ + if pd.isna(indicator_name): + return 'MDGs' + ind = str(indicator_name).lower().strip() + for kw in SDG_INDICATOR_KEYWORDS: + if kw in ind: + return 'SDGs' + return 'MDGs' + + # ============================================================================= # CLEANED DATA LOADER # ============================================================================= @@ -299,19 +368,18 @@ class CleanedDataLoader: Loader untuk cleaned integrated data ke STAGING layer (Silver). Kimball context: - Input : staging_integrated → STAGING (Silver) — fs_asean_silver - Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver - Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit + Input : staging_integrated -> STAGING (Silver) — fs_asean_silver + Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver + Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit Pipeline steps: 1. Standardize country names (ASEAN) 2. Remove missing values 3. Remove duplicates - 4. Add pillar classification - 5. Add direction classification - 6. Apply column constraints - 7. Load ke BigQuery - 8. Log ke Audit layer + 4. Add pillar, direction & framework classification + 5. Apply column constraints + 6. Load ke BigQuery + 7. Log ke Audit layer """ SCHEMA = [ @@ -325,6 +393,7 @@ class CleanedDataLoader: bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), ] def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'): @@ -355,7 +424,7 @@ class CleanedDataLoader: def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame: print("\n [Step 1/5] Standardize country names...") df, report = standardize_country_names_asean(df, country_column='country') - print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}") + print(f" ASEAN countries mapped : {report['countries_mapped']}") unique_countries = sorted(df['country'].unique()) print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}") log_update(self.client, 'STAGING', 'staging_integrated', @@ -377,7 +446,9 @@ class CleanedDataLoader: def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame: print("\n [Step 3/5] Remove duplicates...") exact_dups = df.duplicated().sum() - data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum() + data_dups = df.duplicated( + subset=['indicator_standardized', 'country', 'year', 'value'] + ).sum() print(f" Exact duplicates : {exact_dups:,}") print(f" Data duplicates : {data_dups:,}") rows_before = len(df) @@ -389,21 +460,39 @@ class CleanedDataLoader: return df_clean def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame: - print("\n [Step 4/5] Add pillar & direction classification...") + print("\n [Step 4/5] Add pillar, direction & framework classification...") df = df.copy() + df['pillar'] = df['indicator_standardized'].apply(assign_pillar) df['direction'] = df['indicator_standardized'].apply(assign_direction) + df['framework'] = df['indicator_standardized'].apply(assign_framework) pillar_counts = df['pillar'].value_counts() - print(f" ✓ Pillar distribution:") + print(f" Pillar distribution:") for pillar, count in pillar_counts.items(): print(f" - {pillar}: {count:,}") direction_counts = df['direction'].value_counts() - print(f" ✓ Direction distribution:") + print(f" Direction distribution:") for direction, count in direction_counts.items(): pct = count / len(df) * 100 print(f" - {direction}: {count:,} ({pct:.1f}%)") + + framework_counts = df['framework'].value_counts() + print(f" Framework distribution:") + for fw, count in framework_counts.items(): + pct = count / len(df) * 100 + print(f" - {fw}: {count:,} ({pct:.1f}%)") + + # Log indikator yang terklasifikasi SDGs untuk verifikasi + sdg_inds = ( + df[df['framework'] == 'SDGs']['indicator_standardized'] + .drop_duplicates().sort_values().tolist() + ) + print(f"\n SDG indicators ({len(sdg_inds)}):") + for ind in sdg_inds: + print(f" - {ind}") + return df def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame: @@ -428,7 +517,7 @@ class CleanedDataLoader: 'max' : int(df['year'].max()) if not df['year'].isnull().all() else None, 'unique_years': int(df['year'].nunique()) } - for col in ('pillar', 'direction', 'source'): + for col in ('pillar', 'direction', 'framework', 'source'): if col in df.columns: validation[f'{col}_breakdown'] = { str(k): int(v) for k, v in df[col].value_counts().to_dict().items() @@ -438,7 +527,6 @@ class CleanedDataLoader: if 'country' in df.columns: validation['unique_countries'] = int(df['country'].nunique()) - # Column length check column_length_check = {} for col, max_len in COLUMN_CONSTRAINTS.items(): if col in df.columns: @@ -457,7 +545,7 @@ class CleanedDataLoader: def run(self, df: pd.DataFrame) -> int: """ - Execute full cleaning pipeline → load ke STAGING (Silver). + Execute full cleaning pipeline -> load ke STAGING (Silver). Returns: int: Rows loaded @@ -469,7 +557,6 @@ class CleanedDataLoader: print(" ERROR: DataFrame is empty, nothing to process.") return 0 - # Pipeline steps df = self._step_standardize_countries(df) df = self._step_remove_missing(df) df = self._step_remove_duplicates(df) @@ -478,7 +565,6 @@ class CleanedDataLoader: self.metadata['rows_transformed'] = len(df) - # Validate validation = self.validate_data(df) self.metadata['validation_metrics'] = validation @@ -487,13 +573,12 @@ class CleanedDataLoader: for info in validation.get('column_length_check', {}).values() ) if not all_within_limits: - print("\n ⚠ WARNING: Some columns still exceed length constraints!") + print("\n WARNING: Some columns still exceed length constraints!") for col, info in validation['column_length_check'].items(): if not info['within_limit']: print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}") - # Load ke Silver - print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...") + print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...") rows_loaded = load_to_bigquery( self.client, df, self.table_name, layer='silver', @@ -502,10 +587,8 @@ class CleanedDataLoader: ) self.metadata['rows_loaded'] = rows_loaded - # Audit logs log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded) - # ETL metadata self.metadata['end_time'] = datetime.now() self.metadata['duration_seconds'] = ( self.metadata['end_time'] - self.metadata['start_time'] @@ -516,33 +599,31 @@ class CleanedDataLoader: self.metadata['validation_metrics'] = json.dumps(validation) save_etl_metadata(self.client, self.metadata) - # Summary - print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows") + print(f"\n Cleaned Integration completed: {rows_loaded:,} rows") print(f" Duration : {self.metadata['duration_seconds']:.2f}s") print(f" Completeness : {validation['completeness_pct']:.2f}%") if 'year_range' in validation: yr = validation['year_range'] if yr['min'] and yr['max']: - print(f" Year range : {yr['min']}–{yr['max']}") + print(f" Year range : {yr['min']}-{yr['max']}") print(f" Indicators : {validation.get('unique_indicators', '-')}") print(f" Countries : {validation.get('unique_countries', '-')}") print(f"\n Schema Validation:") for col, info in validation.get('column_length_check', {}).items(): - status = "✓" if info['within_limit'] else "✗" - print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}") - print(f"\n Metadata → [AUDIT] etl_metadata") + status = "OK" if info['within_limit'] else "FAIL" + print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}") + print(f"\n Metadata -> [AUDIT] etl_metadata") return rows_loaded # ============================================================================= -# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer +# AIRFLOW TASK FUNCTIONS # ============================================================================= def run_cleaned_integration(): """ Airflow task: Load cleaned_integrated dari staging_integrated. - Dipanggil oleh DAG setelah task staging_integration_to_silver selesai. """ from scripts.bigquery_config import get_bigquery_client @@ -561,21 +642,21 @@ if __name__ == "__main__": print("=" * 60) print("BIGQUERY CLEANED LAYER ETL") print("Kimball DW Architecture") - print(" Input : STAGING (Silver) → staging_integrated") - print(" Output : STAGING (Silver) → cleaned_integrated") - print(" Audit : AUDIT → etl_logs, etl_metadata") + print(" Input : STAGING (Silver) -> staging_integrated") + print(" Output : STAGING (Silver) -> cleaned_integrated") + print(" Audit : AUDIT -> etl_logs, etl_metadata") print("=" * 60) logger = setup_logging() client = get_bigquery_client() df_staging = load_staging_data(client) - print("\n[1/1] Cleaned Integration → STAGING (Silver)...") + print("\n[1/1] Cleaned Integration -> STAGING (Silver)...") loader = CleanedDataLoader(client, load_mode='full_refresh') final_count = loader.run(df_staging) print("\n" + "=" * 60) - print("✓ CLEANED LAYER ETL COMPLETED") - print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") - print(f" 📋 AUDIT : etl_logs, etl_metadata") + print("[OK] CLEANED LAYER ETL COMPLETED") + print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)") + print(f" AUDIT : etl_logs, etl_metadata") print("=" * 60) \ No newline at end of file diff --git a/scripts/bigquery_dimensional_model.py b/scripts/bigquery_dimensional_model.py index a5e665c..bf59fe7 100644 --- a/scripts/bigquery_dimensional_model.py +++ b/scripts/bigquery_dimensional_model.py @@ -46,13 +46,13 @@ class DimensionalModelLoader: Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold. Kimball context: - Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver - Output : dim_* + fact_* → DW (Gold) — fs_asean_gold - Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit + Input : cleaned_integrated -> STAGING (Silver) — fs_asean_silver + Output : dim_* + fact_* -> DW (Gold) — fs_asean_gold + Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit Pipeline steps: 1. Load dim_country - 2. Load dim_indicator + 2. Load dim_indicator (+ kolom framework dari cleaned_integrated) 3. Load dim_time 4. Load dim_source 5. Load dim_pillar @@ -117,7 +117,7 @@ class DimensionalModelLoader: """ try: self.client.query(query).result() - self.logger.info(f" [OK] FK: {table_name}.{fk_column} → {ref_table}.{ref_column}") + self.logger.info(f" [OK] FK: {table_name}.{fk_column} -> {ref_table}.{ref_column}") except Exception as e: if "already exists" in str(e).lower(): self.logger.info(f" [INFO] FK already exists: {constraint_name}") @@ -129,7 +129,7 @@ class DimensionalModelLoader: # ------------------------------------------------------------------ def _save_table_metadata(self, table_name: str): - meta = self.load_metadata[table_name] + meta = self.load_metadata[table_name] metadata = { 'source_class' : self.__class__.__name__, 'table_name' : table_name, @@ -145,7 +145,7 @@ class DimensionalModelLoader: } try: save_etl_metadata(self.client, metadata) - self.logger.info(f" Metadata → [AUDIT] etl_metadata") + self.logger.info(f" Metadata -> [AUDIT] etl_metadata") except Exception as e: self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}") @@ -156,13 +156,13 @@ class DimensionalModelLoader: def load_dim_time(self): table_name = 'dim_time' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_time -> [DW/Gold] fs_asean_gold...") try: if 'year_range' in self.df_clean.columns: dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy() else: - dim_time = self.df_clean[['year']].drop_duplicates().copy() + dim_time = self.df_clean[['year']].drop_duplicates().copy() dim_time['year_range'] = None dim_time['year'] = dim_time['year'].astype(int) @@ -194,10 +194,10 @@ class DimensionalModelLoader: pass return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year}) - parsed = dim_time.apply(parse_year_range, axis=1) - dim_time['year'] = parsed['year'].astype(int) - dim_time['start_year'] = parsed['start_year'].astype(int) - dim_time['end_year'] = parsed['end_year'].astype(int) + parsed = dim_time.apply(parse_year_range, axis=1) + dim_time['year'] = parsed['year'].astype(int) + dim_time['start_year'] = parsed['start_year'].astype(int) + dim_time['end_year'] = parsed['end_year'].astype(int) dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year']) dim_time['decade'] = (dim_time['year'] // 10) * 10 dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int) @@ -229,7 +229,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n") + self.logger.info(f" dim_time: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -240,11 +240,11 @@ class DimensionalModelLoader: def load_dim_country(self): table_name = 'dim_country' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_country -> [DW/Gold] fs_asean_gold...") try: - dim_country = self.df_clean[['country']].drop_duplicates().copy() - dim_country.columns = ['country_name'] + dim_country = self.df_clean[['country']].drop_duplicates().copy() + dim_country.columns = ['country_name'] region_mapping = { 'Brunei Darussalam': ('Southeast Asia', 'ASEAN'), @@ -270,7 +270,9 @@ class DimensionalModelLoader: lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1]) dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping) - dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy() + dim_country_final = dim_country[ + ['country_name', 'region', 'subregion', 'iso_code'] + ].copy() dim_country_final = dim_country_final.reset_index(drop=True) dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1)) @@ -293,7 +295,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n") + self.logger.info(f" dim_country: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -302,60 +304,106 @@ class DimensionalModelLoader: raise def load_dim_indicator(self): + """ + Load dim_indicator ke Gold layer. + + Kolom yang dimuat: + indicator_id — surrogate key + indicator_name — nama standar indikator + indicator_category — kategori (Health & Nutrition, dll.) + unit — satuan ukuran + direction — higher_better / lower_better + framework — MDGs / SDGs <-- BARU: dibaca dari cleaned_integrated + """ table_name = 'dim_indicator' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_indicator -> [DW/Gold] fs_asean_gold...") try: has_direction = 'direction' in self.df_clean.columns has_unit = 'unit' in self.df_clean.columns has_category = 'indicator_category' in self.df_clean.columns + has_framework = 'framework' in self.df_clean.columns - dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() - dim_indicator.columns = ['indicator_name'] + dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy() + dim_indicator.columns = ['indicator_name'] + # Unit if has_unit: unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates() - unit_map.columns = ['indicator_name', 'unit'] - dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left') + unit_map.columns = ['indicator_name', 'unit'] + dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left') else: dim_indicator['unit'] = None + # Direction if has_direction: dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates() - dir_map.columns = ['indicator_name', 'direction'] - dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left') + dir_map.columns = ['indicator_name', 'direction'] + dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left') self.logger.info(" [OK] direction column from cleaned_integrated") else: dim_indicator['direction'] = 'higher_better' self.logger.warning(" [WARN] direction not found, default: higher_better") + # Indicator category if has_category: - cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates() - cat_map.columns = ['indicator_name', 'indicator_category'] - dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') + cat_map = self.df_clean[ + ['indicator_standardized', 'indicator_category'] + ].drop_duplicates() + cat_map.columns = ['indicator_name', 'indicator_category'] + dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left') else: def categorize_indicator(name): n = str(name).lower() - if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting', - 'wasting', 'anemia', 'food security', 'food insecure', 'hunger']): + if any(w in n for w in [ + 'undernourishment', 'malnutrition', 'stunting', + 'wasting', 'anemia', 'anaemia', 'food security', + 'food insecure', 'hunger' + ]): return 'Health & Nutrition' - elif any(w in n for w in ['production', 'yield', 'cereal', 'crop', - 'import dependency', 'share of dietary']): + elif any(w in n for w in [ + 'production', 'yield', 'cereal', 'crop', + 'import dependency', 'share of dietary' + ]): return 'Agricultural Production' elif any(w in n for w in ['import', 'export', 'trade']): return 'Trade' elif any(w in n for w in ['gdp', 'income', 'economic']): return 'Economic' - elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): + elif any(w in n for w in [ + 'water', 'sanitation', 'infrastructure', 'rail' + ]): return 'Infrastructure' else: return 'Other' - dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) + dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply( + categorize_indicator + ) + + # Framework — KOLOM BARU + # Dibaca dari cleaned_integrated yang sudah menjalankan assign_framework(). + # Jika kolom belum ada (misal pipeline lama), fallback ke 'MDGs' dengan warning. + if has_framework: + fw_map = self.df_clean[ + ['indicator_standardized', 'framework'] + ].drop_duplicates() + fw_map.columns = ['indicator_name', 'framework'] + dim_indicator = dim_indicator.merge(fw_map, on='indicator_name', how='left') + # Pastikan tidak ada NULL setelah merge + dim_indicator['framework'] = dim_indicator['framework'].fillna('MDGs') + self.logger.info(" [OK] framework column from cleaned_integrated") + else: + dim_indicator['framework'] = 'MDGs' + self.logger.warning( + " [WARN] framework column not found in cleaned_integrated. " + "Default: MDGs. Jalankan bigquery_cleaned_layer.py terlebih dahulu." + ) + + dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') - dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') dim_indicator_final = dim_indicator[ - ['indicator_name', 'indicator_category', 'unit', 'direction'] + ['indicator_name', 'indicator_category', 'unit', 'direction', 'framework'] ].copy() dim_indicator_final = dim_indicator_final.reset_index(drop=True) dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1)) @@ -366,6 +414,7 @@ class DimensionalModelLoader: bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"), bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), ] rows_loaded = load_to_bigquery( @@ -374,17 +423,23 @@ class DimensionalModelLoader: ) self._add_primary_key(table_name, 'indicator_id') - for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]: + # Log distribusi + for label, col in [ + ('Categories', 'indicator_category'), + ('Direction', 'direction'), + ('Framework', 'framework'), + ]: self.logger.info(f" {label}:") for val, cnt in dim_indicator_final[col].value_counts().items(): - self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)") + pct = cnt / len(dim_indicator_final) * 100 + self.logger.info(f" - {val}: {cnt} ({pct:.1f}%)") self.load_metadata[table_name].update( {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()} ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n") + self.logger.info(f" dim_indicator: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -395,7 +450,7 @@ class DimensionalModelLoader: def load_dim_source(self): table_name = 'dim_source' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_source -> [DW/Gold] fs_asean_gold...") try: source_details = { @@ -455,7 +510,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n") + self.logger.info(f" dim_source: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -466,7 +521,7 @@ class DimensionalModelLoader: def load_dim_pillar(self): table_name = 'dim_pillar' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...") + self.logger.info("Loading dim_pillar -> [DW/Gold] fs_asean_gold...") try: pillar_codes = { @@ -501,7 +556,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n") + self.logger.info(f" dim_pillar: {rows_loaded} rows\n") return rows_loaded except Exception as e: @@ -516,10 +571,9 @@ class DimensionalModelLoader: def load_fact_food_security(self): table_name = 'fact_food_security' self.load_metadata[table_name]['start_time'] = datetime.now() - self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...") + self.logger.info("Loading fact_food_security -> [DW/Gold] fs_asean_gold...") try: - # Load dims dari Gold untuk FK resolution dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold') dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold') @@ -561,9 +615,9 @@ class DimensionalModelLoader: fact_table['start_year'] = fact_table['year'].astype(int) fact_table['end_year'] = fact_table['year'].astype(int) - # Resolve FKs fact_table = fact_table.merge( - dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}), + dim_country[['country_id', 'country_name']].rename( + columns={'country_name': 'country'}), on='country', how='left' ) fact_table = fact_table.merge( @@ -576,15 +630,16 @@ class DimensionalModelLoader: on=['start_year', 'end_year'], how='left' ) fact_table = fact_table.merge( - dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}), + dim_source[['source_id', 'source_name']].rename( + columns={'source_name': 'source'}), on='source', how='left' ) fact_table = fact_table.merge( - dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}), + dim_pillar[['pillar_id', 'pillar_name']].rename( + columns={'pillar_name': 'pillar'}), on='pillar', how='left' ) - # Filter hanya row dengan FK lengkap fact_table = fact_table[ fact_table['country_id'].notna() & fact_table['indicator_id'].notna() & @@ -621,7 +676,6 @@ class DimensionalModelLoader: layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema ) - # Add PK + FKs self._add_primary_key(table_name, 'fact_id') self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id') self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id') @@ -634,7 +688,7 @@ class DimensionalModelLoader: ) log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) self._save_table_metadata(table_name) - self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n") + self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n") return rows_loaded except Exception as e: @@ -712,16 +766,36 @@ class DimensionalModelLoader: self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}") self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}") + # Validasi distribusi framework di dim_indicator + query_fw = f""" + SELECT framework, COUNT(*) AS count + FROM `{get_table_id('dim_indicator', layer='gold')}` + GROUP BY framework ORDER BY framework + """ + df_fw = self.client.query(query_fw).result().to_dataframe( + create_bqstorage_client=False + ) + if len(df_fw) > 0: + self.logger.info(f"\n Framework Distribution (dim_indicator):") + for _, row in df_fw.iterrows(): + self.logger.info( + f" {row['framework']:10s}: {int(row['count']):>5,} indicators" + ) + query_dir = f""" SELECT direction, COUNT(*) AS count FROM `{get_table_id('dim_indicator', layer='gold')}` GROUP BY direction ORDER BY direction """ - df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False) + df_dir = self.client.query(query_dir).result().to_dataframe( + create_bqstorage_client=False + ) if len(df_dir) > 0: self.logger.info(f"\n Direction Distribution:") for _, row in df_dir.iterrows(): - self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators") + self.logger.info( + f" {row['direction']:15s}: {int(row['count']):>5,} indicators" + ) self.logger.info("\n [OK] Validation completed") except Exception as e: @@ -738,22 +812,19 @@ class DimensionalModelLoader: self.pipeline_metadata['rows_fetched'] = len(self.df_clean) self.logger.info("\n" + "=" * 60) - self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold") + self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) -> fs_asean_gold") self.logger.info("=" * 60) - # Dimensions - self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold") + self.logger.info("\nLOADING DIMENSION TABLES -> fs_asean_gold") self.load_dim_country() self.load_dim_indicator() self.load_dim_time() self.load_dim_source() self.load_dim_pillar() - # Fact - self.logger.info("\nLOADING FACT TABLE → fs_asean_gold") + self.logger.info("\nLOADING FACT TABLE -> fs_asean_gold") self.load_fact_food_security() - # Validate self.validate_constraints() self.validate_data_load() @@ -762,22 +833,23 @@ class DimensionalModelLoader: total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values()) self.pipeline_metadata.update({ - 'end_time' : pipeline_end, - 'duration_seconds' : duration, - 'rows_transformed' : total_loaded, - 'rows_loaded' : total_loaded, + 'end_time' : pipeline_end, + 'duration_seconds' : duration, + 'rows_transformed' : total_loaded, + 'rows_loaded' : total_loaded, 'execution_timestamp': self.pipeline_metadata['start_time'], - 'completeness_pct' : 100.0, - 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), - 'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}), - 'table_name' : 'dimensional_model_pipeline', + 'completeness_pct' : 100.0, + 'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}), + 'validation_metrics' : json.dumps( + {t: m['status'] for t, m in self.load_metadata.items()} + ), + 'table_name' : 'dimensional_model_pipeline', }) try: save_etl_metadata(self.client, self.pipeline_metadata) except Exception as e: self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}") - # Summary self.logger.info("\n" + "=" * 60) self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED") self.logger.info("=" * 60) @@ -785,20 +857,19 @@ class DimensionalModelLoader: self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Tables :") for tbl, meta in self.load_metadata.items(): - icon = "✓" if meta['status'] == 'success' else "✗" - self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows") - self.logger.info(f"\n Metadata → [AUDIT] etl_metadata") + icon = "OK" if meta['status'] == 'success' else "FAIL" + self.logger.info(f" [{icon}] {tbl:25s}: {meta['rows_loaded']:>10,} rows") + self.logger.info(f"\n Metadata -> [AUDIT] etl_metadata") self.logger.info("=" * 60) # ============================================================================= -# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer +# AIRFLOW TASK FUNCTIONS # ============================================================================= def run_dimensional_model(): """ Airflow task: Load dimensional model dari cleaned_integrated. - Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai. """ from scripts.bigquery_config import get_bigquery_client @@ -817,9 +888,9 @@ if __name__ == "__main__": print("=" * 60) print("BIGQUERY DIMENSIONAL MODEL LOAD") print("Kimball DW Architecture") - print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)") - print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)") - print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)") + print(" Input : STAGING (Silver) -> cleaned_integrated (fs_asean_silver)") + print(" Output : DW (Gold) -> dim_*, fact_* (fs_asean_gold)") + print(" Audit : AUDIT -> etl_logs, etl_metadata (fs_asean_audit)") print("=" * 60) logger = setup_logging() @@ -827,24 +898,26 @@ if __name__ == "__main__": print("\nLoading cleaned_integrated (fs_asean_silver)...") df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver') - print(f" ✓ Loaded : {len(df_clean):,} rows") + print(f" Loaded : {len(df_clean):,} rows") print(f" Columns : {len(df_clean.columns)}") print(f" Sources : {df_clean['source'].nunique()}") print(f" Indicators : {df_clean['indicator_standardized'].nunique()}") print(f" Countries : {df_clean['country'].nunique()}") - print(f" Year range : {int(df_clean['year'].min())}–{int(df_clean['year'].max())}") + print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}") if 'direction' in df_clean.columns: print(f" Direction : {df_clean['direction'].value_counts().to_dict()}") + if 'framework' in df_clean.columns: + print(f" Framework : {df_clean['framework'].value_counts().to_dict()}") else: - print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first") + print(" [WARN] framework column not found — run bigquery_cleaned_layer.py first") - print("\n[1/1] Dimensional Model Load → DW (Gold)...") + print("\n[1/1] Dimensional Model Load -> DW (Gold)...") loader = DimensionalModelLoader(client, df_clean) loader.run() print("\n" + "=" * 60) - print("✓ DIMENSIONAL MODEL ETL COMPLETED") - print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,") - print(" dim_source, dim_pillar, fact_food_security") - print(" 📋 AUDIT : etl_logs, etl_metadata") + print("[OK] DIMENSIONAL MODEL ETL COMPLETED") + print(" DW (Gold) : dim_country, dim_indicator (+ framework),") + print(" dim_time, dim_source, dim_pillar, fact_food_security") + print(" AUDIT : etl_logs, etl_metadata") print("=" * 60) \ No newline at end of file