diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py
index c031db5..23843f6 100644
--- a/dags/etl_food_security.py
+++ b/dags/etl_food_security.py
@@ -1,6 +1,6 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from datetime import datetime
+from datetime import datetime, timedelta
 
 # Import fungsi dari folder scripts
 from scripts.bigquery_raw_layer import (
@@ -19,11 +19,20 @@ from scripts.bigquery_dimesional_model import (
     run_dimensional_model,
 )
 
+from scripts.bigquery_analytical_layer import (
+    run_analytical_layer,
+)
+
+from scripts.bigquery_aggregate_layer import (
+    run_aggregation,
+)
+
 
 with DAG(
     dag_id = "etl_food_security_bigquery",
+    description = "Kimball ETL: FAO, World Bank, UNICEF to BigQuery (Bronze to Silver to Gold)",
     start_date = datetime(2026, 3, 1),
-    schedule_interval = "@daily",
+    schedule_interval = timedelta(days=3),
     catchup = False,
     tags = ["food-security", "bigquery", "kimball"]
 ) as dag:
@@ -63,5 +72,15 @@ with DAG(
         python_callable = run_dimensional_model
     )
 
 
+    task_analytical = PythonOperator(
+        task_id = "analytical_layer_to_gold",
+        python_callable = run_analytical_layer
+    )
+
+    task_aggregation = PythonOperator(
+        task_id = "aggregation_to_gold",
+        python_callable = run_aggregation
+    )
+
-    task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional
\ No newline at end of file
+    task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation
\ No newline at end of file
diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py
new file mode 100644
index 0000000..7f7c6b7
--- /dev/null
+++ b/scripts/bigquery_aggregate_layer.py
@@ -0,0 +1,774 @@
+"""
+BIGQUERY ANALYSIS LAYER - FOOD SECURITY AGGREGATION
+Semua agregasi pakai norm_value dari _get_norm_value_df()
+FIXED: Hanya simpan 4 tabel ke fs_asean_gold (layer='gold'):
+    - 
agg_pillar_composite + - agg_pillar_by_country + - agg_framework_by_country + - agg_framework_asean +""" + +import pandas as pd +import numpy as np +from datetime import datetime +import logging +import json +import sys as _sys + +from scripts.bigquery_config import get_bigquery_client +from scripts.bigquery_helpers import ( + log_update, + load_to_bigquery, + read_from_bigquery, + setup_logging, + save_etl_metadata, +) +from google.cloud import bigquery +from sklearn.preprocessing import MinMaxScaler + + +# ============================================================================= +# KONSTANTA GLOBAL +# ============================================================================= + +DIRECTION_INVERT_KEYWORDS = frozenset({ + "negative", "lower_better", "lower_is_better", "inverse", "neg", +}) + +DIRECTION_POSITIVE_KEYWORDS = frozenset({ + "positive", "higher_better", "higher_is_better", +}) + +NORMALIZE_FRAMEWORKS_JOINTLY = False + + +# ============================================================================= +# Windows CP1252 safe logging +# ============================================================================= + +class _SafeStreamHandler(logging.StreamHandler): + def emit(self, record): + try: + super().emit(record) + except UnicodeEncodeError: + try: + msg = self.format(record) + self.stream.write( + msg.encode("utf-8", errors="replace").decode("ascii", errors="replace") + + self.terminator + ) + self.flush() + except Exception: + self.handleError(record) + + +# ============================================================================= +# HELPERS +# ============================================================================= + +def _should_invert(direction: str, logger=None, context: str = "") -> bool: + d = str(direction).lower().strip() + if d in DIRECTION_INVERT_KEYWORDS: + return True + if d in DIRECTION_POSITIVE_KEYWORDS: + return False + if logger: + logger.warning( + f" [DIRECTION WARNING] Unknown direction '{direction}' " + f"{'(' + 
context + ')' if context else ''}. Defaulting to positive (no invert)." + ) + return False + + +def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series: + values = series.dropna().values + if len(values) == 0: + return pd.Series(np.nan, index=series.index) + v_min, v_max = values.min(), values.max() + if v_min == v_max: + return pd.Series((lo + hi) / 2.0, index=series.index) + scaler = MinMaxScaler(feature_range=(lo, hi)) + result = np.full(len(series), np.nan) + not_nan = series.notna() + result[not_nan.values] = scaler.fit_transform( + series[not_nan].values.reshape(-1, 1) + ).flatten() + return pd.Series(result, index=series.index) + + +def add_yoy(df: pd.DataFrame, group_cols: list, score_col: str) -> pd.DataFrame: + df = df.sort_values(group_cols + ["year"]).reset_index(drop=True) + if group_cols: + df["year_over_year_change"] = df.groupby(group_cols)[score_col].diff() + else: + df["year_over_year_change"] = df[score_col].diff() + return df + + +def safe_int(series: pd.Series, fill: int = 0, col_name: str = "", logger=None) -> pd.Series: + n_nan = series.isna().sum() + if n_nan > 0 and logger: + logger.warning( + f" [NaN WARNING] Kolom '{col_name}' punya {n_nan} NaN -> di-fill dengan {fill}" + ) + return series.fillna(fill).astype(int) + + +def check_and_dedup(df: pd.DataFrame, key_cols: list, context: str = "", logger=None) -> pd.DataFrame: + dupes = df.duplicated(subset=key_cols, keep=False) + if dupes.any(): + n_dupes = dupes.sum() + if logger: + logger.warning( + f" [DEDUP WARNING] {context}: {n_dupes} duplikat rows pada {key_cols}. " + f"Di-aggregate dengan mean." 
+ ) + numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist() + agg_dict = { + c: ("mean" if c in numeric_cols else "first") + for c in df.columns if c not in key_cols + } + df = df.groupby(key_cols, as_index=False).agg(agg_dict) + return df + + +# ============================================================================= +# MAIN CLASS +# ============================================================================= + +class FoodSecurityAggregator: + + def __init__(self, client: bigquery.Client): + self.client = client + self.logger = logging.getLogger(self.__class__.__name__) + self.logger.propagate = False + + self.load_metadata = { + "agg_pillar_composite": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_pillar_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_framework_by_country": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + "agg_framework_asean": {"rows_loaded": 0, "status": "pending", "start_time": None, "end_time": None}, + } + + self.df = None + self.dims = {} + + self.sdgs_start_year = None + self.mdgs_indicator_ids = set() + self.sdgs_indicator_ids = set() + + # ========================================================================= + # STEP 1: Load data dari Gold layer + # ========================================================================= + + def load_data(self): + self.logger.info("=" * 70) + self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") + self.logger.info("=" * 70) + + self.df = read_from_bigquery(self.client, "analytical_food_security", layer='gold') + self.logger.info(f" analytical_food_security : {len(self.df):,} rows") + + self.dims["country"] = read_from_bigquery(self.client, "dim_country", layer='gold') + self.dims["indicator"] = read_from_bigquery(self.client, "dim_indicator", layer='gold') + self.dims["pillar"] = read_from_bigquery(self.client, "dim_pillar", layer='gold') + 
self.dims["time"] = read_from_bigquery(self.client, "dim_time", layer='gold') + + ind_cols = ["indicator_id"] + if "direction" in self.dims["indicator"].columns: + ind_cols.append("direction") + + self.df = ( + self.df + .merge(self.dims["time"][["time_id", "year"]], on="time_id", how="left") + .merge(self.dims["country"][["country_id", "country_name"]], on="country_id", how="left") + .merge(self.dims["pillar"][["pillar_id", "pillar_name"]], on="pillar_id", how="left") + .merge(self.dims["indicator"][ind_cols], on="indicator_id", how="left") + ) + + if "direction" not in self.df.columns: + self.df["direction"] = "positive" + else: + n_null_dir = self.df["direction"].isna().sum() + if n_null_dir > 0: + self.logger.warning(f" [DIRECTION] {n_null_dir} rows dengan direction NULL -> diisi 'positive'") + self.df["direction"] = self.df["direction"].fillna("positive") + + dir_dist = self.df.drop_duplicates("indicator_id")["direction"].value_counts() + self.logger.info(f"\n Distribusi direction per indikator:") + for d, cnt in dir_dist.items(): + tag = "INVERT" if _should_invert(d, self.logger, "load_data check") else "normal" + self.logger.info(f" {d:<25} : {cnt:>3} indikator [{tag}]") + + self.logger.info(f"\n Setelah join: {len(self.df):,} rows") + self.logger.info(f" Negara : {self.df['country_id'].nunique()}") + self.logger.info(f" Indikator : {self.df['indicator_id'].nunique()}") + self.logger.info(f" Tahun : {int(self.df['year'].min())} - {int(self.df['year'].max())}") + + # ========================================================================= + # STEP 1b: Klasifikasi indikator ke MDGs / SDGs + # ========================================================================= + + def _classify_indicators(self): + self.logger.info("\n" + "=" * 70) + self.logger.info("STEP 1b: KLASIFIKASI INDIKATOR -> MDGs / SDGs") + self.logger.info("=" * 70) + + ind_min_year = ( + self.df.groupby("indicator_id")["year"] + .min().reset_index() + .rename(columns={"year": "min_year"}) + ) 
+ + unique_years = sorted(ind_min_year["min_year"].unique()) + self.logger.info(f"\n Unique min_year per indikator: {unique_years}") + + if len(unique_years) == 1: + gap_threshold = unique_years[0] + 1 + self.logger.info(" Hanya 1 cluster -> semua = MDGs") + else: + gaps = [ + (unique_years[i+1] - unique_years[i], unique_years[i], unique_years[i+1]) + for i in range(len(unique_years) - 1) + ] + gaps.sort(reverse=True) + largest_gap_size, y_before, y_after = gaps[0] + gap_threshold = y_after + self.logger.info(f" Gap terbesar: {y_before} -> {y_after} (selisih {largest_gap_size})") + + ind_min_year["framework"] = ind_min_year["min_year"].apply( + lambda y: "MDGs" if int(y) < gap_threshold else "SDGs" + ) + + sdgs_rows = ind_min_year[ind_min_year["framework"] == "SDGs"] + self.sdgs_start_year = int(sdgs_rows["min_year"].min()) if not sdgs_rows.empty else int(self.df["year"].max()) + 1 + + self.logger.info(f" sdgs_start_year: {self.sdgs_start_year}") + + self.mdgs_indicator_ids = set(ind_min_year[ind_min_year["framework"] == "MDGs"]["indicator_id"].tolist()) + self.sdgs_indicator_ids = set(ind_min_year[ind_min_year["framework"] == "SDGs"]["indicator_id"].tolist()) + + self.logger.info(f" MDGs: {len(self.mdgs_indicator_ids)} indicators") + self.logger.info(f" SDGs: {len(self.sdgs_indicator_ids)} indicators") + + self.df = self.df.merge(ind_min_year[["indicator_id", "framework"]], on="indicator_id", how="left") + + # ========================================================================= + # CORE HELPER: normalisasi raw value per indikator + # ========================================================================= + + def _get_norm_value_df(self) -> pd.DataFrame: + if "framework" not in self.df.columns: + raise ValueError("Kolom 'framework' tidak ada. 
Pastikan _classify_indicators() dipanggil lebih dulu.") + + norm_parts = [] + for ind_id, grp in self.df.groupby("indicator_id"): + grp = grp.copy() + direction = str(grp["direction"].iloc[0]) + do_invert = _should_invert(direction, self.logger, context=f"indicator_id={ind_id}") + valid_mask = grp["value"].notna() + n_valid = valid_mask.sum() + + if n_valid < 2: + grp["norm_value"] = np.nan + norm_parts.append(grp) + continue + + scaler = MinMaxScaler(feature_range=(0, 1)) + normed = np.full(len(grp), np.nan) + normed[valid_mask.values] = scaler.fit_transform( + grp.loc[valid_mask, ["value"]] + ).flatten() + + if do_invert: + normed = np.where(np.isnan(normed), np.nan, 1.0 - normed) + + grp["norm_value"] = normed + norm_parts.append(grp) + + return pd.concat(norm_parts, ignore_index=True) + + # ========================================================================= + # STEP 2: agg_pillar_composite -> Gold + # ========================================================================= + + def calc_pillar_composite(self) -> pd.DataFrame: + table_name = "agg_pillar_composite" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 2: {table_name} -> [Gold] fs_asean_gold") + self.logger.info("=" * 70) + + df_normed = self._get_norm_value_df() + + df = ( + df_normed + .groupby(["pillar_id", "pillar_name", "year"]) + .agg( + pillar_norm =("norm_value", "mean"), + n_indicators=("indicator_id", "nunique"), + n_countries =("country_id", "nunique"), + ) + .reset_index() + ) + + df["pillar_score_1_100"] = global_minmax(df["pillar_norm"]) + df["rank_in_year"] = ( + df.groupby("year")["pillar_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") + + df["pillar_id"] = df["pillar_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + 
df["n_countries"] = safe_int(df["n_countries"], col_name="n_countries", logger=self.logger) + df["rank_in_year"] = df["rank_in_year"].astype(int) + df["pillar_norm"] = df["pillar_norm"].astype(float) + df["pillar_score_1_100"] = df["pillar_score_1_100"].astype(float) + + schema = [ + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 3: agg_pillar_by_country -> Gold + # ========================================================================= + + def calc_pillar_by_country(self) -> pd.DataFrame: + table_name = "agg_pillar_by_country" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 3: {table_name} -> [Gold] fs_asean_gold") + self.logger.info("=" * 70) + + df_normed = self._get_norm_value_df() + + df = ( + df_normed + .groupby(["country_id", "country_name", "pillar_id", "pillar_name", "year"]) + .agg(pillar_country_norm=("norm_value", "mean")) + .reset_index() + ) + + df["pillar_country_score_1_100"] = global_minmax(df["pillar_country_norm"]) + df["rank_in_pillar_year"] = ( + df.groupby(["pillar_id", "year"])["pillar_country_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + 
) + df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") + + df["country_id"] = df["country_id"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["year"] = df["year"].astype(int) + df["rank_in_pillar_year"] = df["rank_in_pillar_year"].astype(int) + df["pillar_country_norm"] = df["pillar_country_norm"].astype(float) + df["pillar_country_score_1_100"] = df["pillar_country_score_1_100"].astype(float) + + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_country_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_pillar_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 4: agg_framework_by_country -> Gold + # ========================================================================= + + def _calc_country_composite_inmemory(self) -> pd.DataFrame: + """Hitung country composite in-memory (tidak disimpan ke BQ).""" + df_normed = self._get_norm_value_df() + df = ( + df_normed + .groupby(["country_id", "country_name", "year"]) + .agg( + composite_score=("norm_value", "mean"), + n_indicators =("indicator_id", "nunique"), + ) + .reset_index() + ) + df["score_1_100"] = global_minmax(df["composite_score"]) + df["rank_in_asean"] = ( + df.groupby("year")["score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + 
df = add_yoy(df, ["country_id"], "score_1_100") + df["country_id"] = df["country_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["composite_score"] = df["composite_score"].astype(float) + df["score_1_100"] = df["score_1_100"].astype(float) + df["rank_in_asean"] = df["rank_in_asean"].astype(int) + return df + + def calc_framework_by_country(self) -> pd.DataFrame: + table_name = "agg_framework_by_country" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 4: {table_name} -> [Gold] fs_asean_gold") + self.logger.info("=" * 70) + + country_composite = self._calc_country_composite_inmemory() + df_normed = self._get_norm_value_df() + parts = [] + + # Layer TOTAL + agg_total = ( + country_composite[[ + "country_id", "country_name", "year", + "score_1_100", "n_indicators", "composite_score" + ]] + .copy() + .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) + ) + agg_total["framework"] = "Total" + parts.append(agg_total) + + # Layer MDGs — Era pre-SDGs = Total + pre_sdgs_rows = country_composite[country_composite["year"] < self.sdgs_start_year].copy() + if not pre_sdgs_rows.empty: + mdgs_pre = ( + pre_sdgs_rows[["country_id", "country_name", "year", "score_1_100", "n_indicators", "composite_score"]] + .copy() + .rename(columns={"score_1_100": "framework_score_1_100", "composite_score": "framework_norm"}) + ) + mdgs_pre["framework"] = "MDGs" + parts.append(mdgs_pre) + + # Layer MDGs — Era mixed + if self.mdgs_indicator_ids: + df_mdgs_mixed = df_normed[ + (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_mdgs_mixed.empty: + agg_mdgs_mixed = ( + df_mdgs_mixed + .groupby(["country_id", "country_name", "year"]) + .agg(framework_norm=("norm_value", "mean"), 
n_indicators=("indicator_id", "nunique")) + .reset_index() + ) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + agg_mdgs_mixed["framework_score_1_100"] = global_minmax(agg_mdgs_mixed["framework_norm"]) + agg_mdgs_mixed["framework"] = "MDGs" + parts.append(agg_mdgs_mixed) + + # Layer SDGs + if self.sdgs_indicator_ids: + df_sdgs = df_normed[ + (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_sdgs.empty: + agg_sdgs = ( + df_sdgs + .groupby(["country_id", "country_name", "year"]) + .agg(framework_norm=("norm_value", "mean"), n_indicators=("indicator_id", "nunique")) + .reset_index() + ) + if not NORMALIZE_FRAMEWORKS_JOINTLY: + agg_sdgs["framework_score_1_100"] = global_minmax(agg_sdgs["framework_norm"]) + agg_sdgs["framework"] = "SDGs" + parts.append(agg_sdgs) + + df = pd.concat(parts, ignore_index=True) + + if NORMALIZE_FRAMEWORKS_JOINTLY: + mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + if mixed_mask.any(): + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + + df = check_and_dedup(df, ["country_id", "framework", "year"], context=table_name, logger=self.logger) + df["rank_in_framework_year"] = ( + df.groupby(["framework", "year"])["framework_score_1_100"] + .rank(method="min", ascending=False) + .astype(int) + ) + df = add_yoy(df, ["country_id", "framework"], "framework_score_1_100") + + df["country_id"] = df["country_id"].astype(int) + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["rank_in_framework_year"] = safe_int(df["rank_in_framework_year"], col_name="rank_in_framework_year", logger=self.logger) + df["framework_norm"] = df["framework_norm"].astype(float) + df["framework_score_1_100"] = df["framework_score_1_100"].astype(float) + + self._validate_mdgs_equals_total(df, level="country") + + schema = [ + 
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("rank_in_framework_year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # STEP 5: agg_framework_asean -> Gold + # ========================================================================= + + def calc_framework_asean(self) -> pd.DataFrame: + table_name = "agg_framework_asean" + self.load_metadata[table_name]["start_time"] = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info(f"STEP 5: {table_name} -> [Gold] fs_asean_gold") + self.logger.info("=" * 70) + + df_normed = self._get_norm_value_df() + country_composite = self._calc_country_composite_inmemory() + + country_norm = ( + df_normed.groupby(["country_id", "country_name", "year"])["norm_value"] + .mean().reset_index().rename(columns={"norm_value": "country_norm"}) + ) + asean_overall = ( + country_norm.groupby("year") + .agg(asean_norm=("country_norm", "mean"), std_norm=("country_norm", "std"), + n_countries=("country_norm", "count")) + .reset_index() + ) + asean_overall["asean_score_1_100"] = global_minmax(asean_overall["asean_norm"]) + asean_comp = ( + country_composite.groupby("year")["composite_score"] + .mean().reset_index().rename(columns={"composite_score": "asean_composite"}) + ) + asean_overall = 
asean_overall.merge(asean_comp, on="year", how="left") + + parts = [] + + # Layer TOTAL + total_cols = asean_overall[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() + total_cols = total_cols.rename(columns={ + "asean_score_1_100": "framework_score_1_100", + "asean_norm": "framework_norm", + "n_countries": "n_countries_with_data", + }) + n_ind_total = df_normed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + total_cols = total_cols.merge(n_ind_total, on="year", how="left") + total_cols["framework"] = "Total" + parts.append(total_cols) + + # Layer MDGs — pre-SDGs = Total + pre_sdgs = asean_overall[asean_overall["year"] < self.sdgs_start_year].copy() + if not pre_sdgs.empty: + mdgs_pre = pre_sdgs[["year", "asean_score_1_100", "asean_norm", "std_norm", "n_countries"]].copy() + mdgs_pre = mdgs_pre.rename(columns={ + "asean_score_1_100": "framework_score_1_100", + "asean_norm": "framework_norm", + "n_countries": "n_countries_with_data", + }) + n_ind_pre = df_normed[df_normed["year"] < self.sdgs_start_year].groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + mdgs_pre = mdgs_pre.merge(n_ind_pre, on="year", how="left") + mdgs_pre["framework"] = "MDGs" + parts.append(mdgs_pre) + + # Layer MDGs — mixed + if self.mdgs_indicator_ids: + df_mdgs_mixed = df_normed[ + (df_normed["indicator_id"].isin(self.mdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_mdgs_mixed.empty: + cn = df_mdgs_mixed.groupby(["country_id", "year"])["norm_value"].mean().reset_index().rename(columns={"norm_value": "country_norm"}) + asean_mdgs = cn.groupby("year").agg( + framework_norm=("country_norm", "mean"), + std_norm=("country_norm", "std"), + n_countries_with_data=("country_id", "count"), + ).reset_index() + n_ind_mdgs = 
df_mdgs_mixed.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + asean_mdgs = asean_mdgs.merge(n_ind_mdgs, on="year", how="left") + if not NORMALIZE_FRAMEWORKS_JOINTLY: + asean_mdgs["framework_score_1_100"] = global_minmax(asean_mdgs["framework_norm"]) + asean_mdgs["framework"] = "MDGs" + parts.append(asean_mdgs) + + # Layer SDGs + if self.sdgs_indicator_ids: + df_sdgs = df_normed[ + (df_normed["indicator_id"].isin(self.sdgs_indicator_ids)) & + (df_normed["year"] >= self.sdgs_start_year) + ].copy() + if not df_sdgs.empty: + cn = df_sdgs.groupby(["country_id", "year"])["norm_value"].mean().reset_index().rename(columns={"norm_value": "country_norm"}) + asean_sdgs = cn.groupby("year").agg( + framework_norm=("country_norm", "mean"), + std_norm=("country_norm", "std"), + n_countries_with_data=("country_id", "count"), + ).reset_index() + n_ind_sdgs = df_sdgs.groupby("year")["indicator_id"].nunique().reset_index().rename(columns={"indicator_id": "n_indicators"}) + asean_sdgs = asean_sdgs.merge(n_ind_sdgs, on="year", how="left") + if not NORMALIZE_FRAMEWORKS_JOINTLY: + asean_sdgs["framework_score_1_100"] = global_minmax(asean_sdgs["framework_norm"]) + asean_sdgs["framework"] = "SDGs" + parts.append(asean_sdgs) + + df = pd.concat(parts, ignore_index=True) + + if NORMALIZE_FRAMEWORKS_JOINTLY: + mixed_mask = (df["framework"].isin(["MDGs", "SDGs"])) & (df["year"] >= self.sdgs_start_year) + if mixed_mask.any(): + df.loc[mixed_mask, "framework_score_1_100"] = global_minmax(df.loc[mixed_mask, "framework_norm"]) + + df = check_and_dedup(df, ["framework", "year"], context=table_name, logger=self.logger) + df = add_yoy(df, ["framework"], "framework_score_1_100") + + df["year"] = df["year"].astype(int) + df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) + df["n_countries_with_data"] = safe_int(df["n_countries_with_data"], col_name="n_countries_with_data", logger=self.logger) + for col 
in ["framework_norm", "std_norm", "framework_score_1_100"]: + df[col] = df[col].astype(float) + + self._validate_mdgs_equals_total(df, level="asean") + + schema = [ + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("n_countries_with_data", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("framework_norm", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("std_norm", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("framework_score_1_100", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("year_over_year_change", "FLOAT", mode="NULLABLE"), + ] + rows = load_to_bigquery(self.client, df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema) + self._finalize(table_name, rows) + return df + + # ========================================================================= + # HELPERS + # ========================================================================= + + def _validate_mdgs_equals_total(self, df: pd.DataFrame, level: str = ""): + self.logger.info(f"\n Validasi MDGs < {self.sdgs_start_year} == Total [{level}]:") + group_by = ["year"] if level.startswith("asean") else ["country_id", "year"] + mdgs_pre = df[(df["framework"] == "MDGs") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "mdgs_score"}) + total_pre = df[(df["framework"] == "Total") & (df["year"] < self.sdgs_start_year)][group_by + ["framework_score_1_100"]].rename(columns={"framework_score_1_100": "total_score"}) + if mdgs_pre.empty and total_pre.empty: + self.logger.info(f" -> Tidak ada data pre-{self.sdgs_start_year} (skip)") + return + if mdgs_pre.empty or total_pre.empty: + self.logger.warning(f" -> [WARNING] Salah satu kosong: MDGs={len(mdgs_pre)}, Total={len(total_pre)}") + return + check = mdgs_pre.merge(total_pre, on=group_by) + max_diff = 
(check["mdgs_score"] - check["total_score"]).abs().max() + status = "OK (identik)" if max_diff < 0.01 else f"MISMATCH! max_diff={max_diff:.6f}" + self.logger.info(f" -> {status} (n_checked={len(check)})") + + def _finalize(self, table_name: str, rows_loaded: int): + self.load_metadata[table_name].update({ + "rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now(), + }) + log_update(self.client, "DW", table_name, "full_load", rows_loaded) + self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [Gold] fs_asean_gold") + self.logger.info(f" Metadata → [AUDIT] etl_logs") + + def _fail(self, table_name: str, error: Exception): + self.load_metadata[table_name].update({"status": "failed", "end_time": datetime.now()}) + self.logger.error(f" [FAIL] {table_name}: {error}") + log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) + + # ========================================================================= + # RUN + # ========================================================================= + + def run(self): + start = datetime.now() + self.logger.info("\n" + "=" * 70) + self.logger.info("FOOD SECURITY AGGREGATION v8.0 — 4 TABLES -> fs_asean_gold") + self.logger.info("=" * 70) + + self.load_data() + self._classify_indicators() + self.calc_pillar_composite() + self.calc_pillar_by_country() + self.calc_framework_by_country() + self.calc_framework_asean() + + duration = (datetime.now() - start).total_seconds() + total_rows = sum(m["rows_loaded"] for m in self.load_metadata.values()) + + self.logger.info("\n" + "=" * 70) + self.logger.info("SELESAI") + self.logger.info("=" * 70) + self.logger.info(f" Durasi : {duration:.2f}s") + self.logger.info(f" Total rows : {total_rows:,}") + for tbl, meta in self.load_metadata.items(): + icon = "✓" if meta["status"] == "success" else "✗" + self.logger.info(f" {icon} {tbl:<35} {meta['rows_loaded']:>10,}") + + +# ============================================================================= +# 
AIRFLOW TASK FUNCTIONS +# ============================================================================= + +def run_aggregation(): + """ + Airflow task: Hitung semua agregasi dari analytical_food_security. + Dipanggil setelah analytical_layer_to_gold selesai. + """ + from scripts.bigquery_config import get_bigquery_client + client = get_bigquery_client() + agg = FoodSecurityAggregator(client) + agg.run() + total = sum(m["rows_loaded"] for m in agg.load_metadata.values()) + print(f"Aggregation completed: {total:,} total rows loaded") + + +# ============================================================================= +# MAIN EXECUTION +# ============================================================================= + +if __name__ == "__main__": + import io + + if _sys.stdout.encoding and _sys.stdout.encoding.lower() not in ("utf-8", "utf8"): + _sys.stdout = io.TextIOWrapper(_sys.stdout.buffer, encoding="utf-8", errors="replace") + if _sys.stderr.encoding and _sys.stderr.encoding.lower() not in ("utf-8", "utf8"): + _sys.stderr = io.TextIOWrapper(_sys.stderr.buffer, encoding="utf-8", errors="replace") + + print("=" * 70) + print("FOOD SECURITY AGGREGATION v8.0 — 4 TABLES -> fs_asean_gold") + print(f" NORMALIZE_FRAMEWORKS_JOINTLY = {NORMALIZE_FRAMEWORKS_JOINTLY}") + print("=" * 70) + + logger = setup_logging() + for handler in logger.handlers: + handler.__class__ = _SafeStreamHandler + + client = get_bigquery_client() + agg = FoodSecurityAggregator(client) + agg.run() + + print("\n" + "=" * 70) + print("[OK] SELESAI") + print("=" * 70) \ No newline at end of file diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py new file mode 100644 index 0000000..6543564 --- /dev/null +++ b/scripts/bigquery_analytical_layer.py @@ -0,0 +1,557 @@ +""" +BIGQUERY ANALYTICAL LAYER - DATA FILTERING +FIXED: analytical_food_security disimpan di fs_asean_gold (layer='gold') + +Filtering Order: +1. Load data (single years only) +2. 
Determine year boundaries (2013 - auto-detected end year)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Save analytical table (value only, normalisation & direction handled downstream)
"""

import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys

# Python 3.7+: force UTF-8 stdout so unicode log glyphs survive on Windows.
if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')

from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
    log_update,
    load_to_bigquery,
    read_from_bigquery,
    setup_logging,
    truncate_table,
    save_etl_metadata,
)
from google.cloud import bigquery


# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================

class AnalyticalLayerLoader:
    """
    Analytical Layer Loader for BigQuery - CORRECTED VERSION v4

    Key Logic:
    1. Complete per country (no gaps from start_year to end_year)
    2. Filter countries with all pillars
    3. Ensure indicators have consistent country count across all years
    4. Save raw value only (normalisation & direction handled downstream)

    Output: analytical_food_security -> DW layer (Gold) -> fs_asean_gold
    """

    def __init__(self, client: bigquery.Client):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False

        # Working DataFrames filled by load_source_data().
        self.df_clean = None
        self.df_indicator = None
        self.df_country = None
        self.df_pillar = None

        # Fixed country set chosen in step 4; reused by every later filter.
        self.selected_country_ids = None

        # Analysis window: start is fixed; end is auto-detected in step 2
        # against the baseline year's indicator coverage.
        self.start_year = 2013
        self.end_year = None
        self.baseline_year = 2023

        self.pipeline_metadata = {
            'source_class'      : self.__class__.__name__,
            'start_time'        : None,
            'end_time'          : None,
            'duration_seconds'  : None,
            'rows_fetched'      : 0,
            'rows_transformed'  : 0,
            'rows_loaded'       : 0,
            'validation_metrics': {}
        }

        self.pipeline_start = None
        self.pipeline_end = None

    def load_source_data(self):
        """Step 1: load fact_food_security joined with its dimensions from Gold."""
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
        self.logger.info("=" * 80)

        try:
            query = f"""
                SELECT
                    f.country_id,
                    c.country_name,
                    f.indicator_id,
                    i.indicator_name,
                    i.direction,
                    f.pillar_id,
                    p.pillar_name,
                    f.time_id,
                    t.year,
                    t.start_year,
                    t.end_year,
                    t.is_year_range,
                    f.value,
                    f.source_id
                FROM `{get_table_id('fact_food_security', layer='gold')}` f
                JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id
                JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id
                JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id
                JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
            """

            self.logger.info("Loading fact table with dimensions...")
            # BQ Storage API disabled: plain REST download is sufficient here.
            self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
            self.logger.info(f" Loaded: {len(self.df_clean):,} rows")

            if 'is_year_range' in self.df_clean.columns:
                yr = self.df_clean['is_year_range'].value_counts()
                self.logger.info(f" 
Breakdown:")
                self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}")
                self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}")

            self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
            self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
            self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')

            self.logger.info(f" Indicators: {len(self.df_indicator)}")
            self.logger.info(f" Countries: {len(self.df_country)}")
            self.logger.info(f" Pillars: {len(self.df_pillar)}")

            self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
            return True

        except Exception as e:
            self.logger.error(f"Error loading source data: {e}")
            raise

    def determine_year_boundaries(self):
        """Step 2: fix the analysis window; auto-detect the end year.

        The end year is the latest year (>= baseline) whose distinct-indicator
        count matches or exceeds the baseline year's count; falls back to the
        baseline year itself when no year qualifies.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
        self.logger.info("=" * 80)

        df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year]
        baseline_indicator_count = df_2023['indicator_id'].nunique()

        self.logger.info(f"\nBaseline Year: {self.baseline_year}")
        self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")

        # Descending walk: the first qualifying year is the latest one.
        years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
        selected_end_year = None

        for year in years_sorted:
            if year >= self.baseline_year:
                df_year = self.df_clean[self.df_clean['year'] == year]
                year_indicator_count = df_year['indicator_id'].nunique()
                status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
                self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
                if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
                    selected_end_year = int(year)

        if selected_end_year is None:
            selected_end_year = self.baseline_year
            self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
        else:
            self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")

        self.end_year = selected_end_year
        original_count = len(self.df_clean)

        # Clip the working set to the [start_year, end_year] window.
        self.df_clean = self.df_clean[
            (self.df_clean['year'] >= self.start_year) &
            (self.df_clean['year'] <= self.end_year)
        ].copy()

        self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_complete_indicators_per_country(self):
        """Step 3: keep (country, indicator) pairs with a gapless run to end_year.

        A pair is complete when its series reaches end_year, has no missing
        year between its own first year and end_year, and spans >= 5 years.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
        self.logger.info("=" * 80)

        grouped = self.df_clean.groupby([
            'country_id', 'country_name', 'indicator_id', 'indicator_name',
            'pillar_id', 'pillar_name'
        ])

        valid_combinations = []
        removed_combinations = []

        for (country_id, country_name, indicator_id, indicator_name,
             pillar_id, pillar_name), group in grouped:

            years_present = sorted(group['year'].unique())
            start_year = int(min(years_present))
            end_year_actual = int(max(years_present))
            expected_years = list(range(start_year, self.end_year + 1))
            missing_years = [y for y in expected_years if y not in years_present]
            has_gap = len(missing_years) > 0

            is_complete = (
                end_year_actual >= self.end_year and
                not has_gap and
                (self.end_year - start_year) >= 4
            )

            if is_complete:
                valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id})
            else:
                # Keep a human-readable rejection reason for the log.
                reasons = []
                if end_year_actual < self.end_year:
                    reasons.append(f"ends {end_year_actual}")
                if has_gap:
                    gap_str = str(missing_years[:3])[1:-1]
                    if len(missing_years) > 3:
                        gap_str += "..."
                    reasons.append(f"gap:{gap_str}")
                if (self.end_year - start_year) < 4:
                    reasons.append(f"span={self.end_year - start_year}")
                removed_combinations.append({
                    'country_name'  : country_name,
                    'indicator_name': indicator_name,
                    'reasons'       : ", ".join(reasons)
                })

        self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
        self.logger.info(f" [-] Removed: {len(removed_combinations):,}")

        # Semi-join back to df_clean via a composite "country_indicator" key.
        # NOTE(review): raises KeyError if valid_combinations is empty — confirm
        # upstream data always yields at least one complete pair.
        df_valid = pd.DataFrame(valid_combinations)
        df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str)
        self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' +
                                self.df_clean['indicator_id'].astype(str))

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
        self.df_clean = self.df_clean.drop('key', axis=1)

        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        return self.df_clean

    def select_countries_with_all_pillars(self):
        """Step 4: keep only countries covering every pillar (the FIXED SET)."""
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
        self.logger.info("=" * 80)

        total_pillars = self.df_clean['pillar_id'].nunique()
        country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
            'pillar_id'   : 'nunique',
            'indicator_id': 'nunique',
            'year'        : lambda x: f"{int(x.min())}-{int(x.max())}"
        }).reset_index()
        country_pillar_count.columns = [
            'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
        ]

        for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
            status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
            self.logger.info(
                f" {status:<12} {row['country_name']:25s} "
f"{row['pillar_count']}/{total_pillars} pillars"
            )

        selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars]
        self.selected_country_ids = selected_countries['country_id'].tolist()

        self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy()

        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_indicators_consistent_across_fixed_countries(self):
        """Step 5: keep indicators present for ALL fixed countries in every year.

        For each indicator, the latest per-country start year ("max start")
        defines the year from which all fixed countries can have data; the
        indicator survives only if every year from there to end_year covers
        the full fixed country set and the span is >= 5 years.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
        self.logger.info("=" * 80)

        # First year of data per (indicator, country)...
        indicator_country_start = self.df_clean.groupby([
            'indicator_id', 'indicator_name', 'country_id'
        ])['year'].min().reset_index()
        indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year']

        # ...and the latest of those starts per indicator.
        indicator_max_start = indicator_country_start.groupby([
            'indicator_id', 'indicator_name'
        ])['start_year'].max().reset_index()
        indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']

        valid_indicators = []
        removed_indicators = []

        for _, ind_row in indicator_max_start.iterrows():
            indicator_id = ind_row['indicator_id']
            indicator_name = ind_row['indicator_name']
            max_start = int(ind_row['max_start_year'])
            span = self.end_year - max_start

            if span < 4:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason'        : f"span={span} < 4"
                })
                continue

            expected_years = list(range(max_start, self.end_year + 1))
            ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
            all_years_complete = True
            problematic_years = []

            # Every expected year must cover the whole fixed country set.
            for year in expected_years:
                country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
                if country_count < len(self.selected_country_ids):
                    all_years_complete = False
                    problematic_years.append(f"{int(year)}({country_count})")

            if all_years_complete:
                valid_indicators.append(indicator_id)
            else:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason'        : f"missing countries in years: {', '.join(problematic_years[:5])}"
                })

        self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
        self.logger.info(f" [-] Removed: {len(removed_indicators)}")

        if not valid_indicators:
            raise ValueError("No valid indicators found after filtering!")

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy()

        # Trim each indicator to its own max_start_year so every remaining
        # (indicator, year) cell is covered by all fixed countries.
        self.df_clean = self.df_clean.merge(
            indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left'
        )
        self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy()
        self.df_clean = self.df_clean.drop('max_start_year', axis=1)

        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
        return self.df_clean

    def verify_no_gaps(self):
        """Step 6: assert every (indicator, year) has the full fixed country set.

        Raises ValueError (after logging up to 10 offenders) on any gap.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 6: VERIFY NO GAPS")
        self.logger.info("=" * 80)

        expected_countries = len(self.selected_country_ids)
        verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index()
        verification.columns = ['indicator_id', 'year', 'country_count']
        all_good = (verification['country_count'] == expected_countries).all()

        if all_good:
            self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries")
        else:
            bad = verification[verification['country_count'] != expected_countries]
            for _, row in bad.head(10).iterrows():
                self.logger.error(
                    f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
                    f"{int(row['country_count'])} countries (expected {expected_countries})"
                )
            raise ValueError("Gap verification failed!")

        return True

    def analyze_indicator_availability_by_year(self):
        """Step 7: log per-year availability and the final indicator roster."""
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
        self.logger.info("=" * 80)

        year_stats = self.df_clean.groupby('year').agg({
            'indicator_id': 'nunique',
            'country_id'  : 'nunique'
        }).reset_index()
        year_stats.columns = ['year', 'indicator_count', 'country_count']

        self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
        self.logger.info("-" * 50)
        for _, row in year_stats.iterrows():
            year = int(row['year'])
            row_count = len(self.df_clean[self.df_clean['year'] == year])
            self.logger.info(
                f"{year:<8} {int(row['indicator_count']):<15} "
                f"{int(row['country_count']):<12} {row_count:,}"
            )

        # Per-indicator roster with year range and country coverage.
        indicator_details = self.df_clean.groupby([
            'indicator_id', 'indicator_name', 'pillar_name', 'direction'
        ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
        indicator_details.columns = [
            'indicator_id', 'indicator_name', 'pillar_name', 'direction',
            'start_year', 'end_year', 'country_count'
        ]
        indicator_details['year_range'] = (
            indicator_details['start_year'].astype(int).astype(str) + '-' +
            indicator_details['end_year'].astype(int).astype(str)
        )
        indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name'])

        self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
        for pillar, count in indicator_details.groupby('pillar_name').size().items():
            self.logger.info(f" {pillar}: {count} indicators")

        self.logger.info(f"\n{'-'*100}")
        self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}")
self.logger.info(f"{'-'*100}") + for _, row in indicator_details.iterrows(): + direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' + self.logger.info( + f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " + f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} " + f"{direction:<8} {int(row['country_count'])}" + ) + + return year_stats + + def save_analytical_table(self): + table_name = 'analytical_food_security' + self.logger.info("\n" + "=" * 80) + self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") + self.logger.info("=" * 80) + + try: + analytical_df = self.df_clean[[ + 'country_id', 'indicator_id', 'pillar_id', 'time_id', 'value' + ]].copy() + analytical_df = analytical_df.sort_values( + ['time_id', 'country_id', 'indicator_id'] + ).reset_index(drop=True) + + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) + + self.logger.info(f" Total rows: {len(analytical_df):,}") + + schema = [ + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + ] + + rows_loaded = load_to_bigquery( + self.client, analytical_df, table_name, + layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema + ) + + self.pipeline_metadata['rows_loaded'] = rows_loaded + log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) + + metadata = { + 'source_class' : self.__class__.__name__, + 'table_name' : table_name, + 'execution_timestamp': self.pipeline_start, + 
'duration_seconds' : (datetime.now() - self.pipeline_start).total_seconds(), + 'rows_fetched' : self.pipeline_metadata['rows_fetched'], + 'rows_transformed' : rows_loaded, + 'rows_loaded' : rows_loaded, + 'completeness_pct' : 100.0, + 'config_snapshot' : json.dumps({ + 'start_year' : self.start_year, + 'end_year' : self.end_year, + 'fixed_countries': len(self.selected_country_ids), + 'no_gaps' : True, + 'layer' : 'gold' + }), + 'validation_metrics' : json.dumps({ + 'fixed_countries' : len(self.selected_country_ids), + 'total_indicators': int(self.df_clean['indicator_id'].nunique()) + }) + } + save_etl_metadata(self.client, metadata) + + self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold") + self.logger.info(f" Metadata → [AUDIT] etl_metadata") + return rows_loaded + + except Exception as e: + self.logger.error(f"Error saving: {e}") + raise + + def run(self): + self.pipeline_start = datetime.now() + self.pipeline_metadata['start_time'] = self.pipeline_start + + self.logger.info("\n" + "=" * 80) + self.logger.info("Output: analytical_food_security → fs_asean_gold") + self.logger.info("=" * 80) + + self.load_source_data() + self.determine_year_boundaries() + self.filter_complete_indicators_per_country() + self.select_countries_with_all_pillars() + self.filter_indicators_consistent_across_fixed_countries() + self.verify_no_gaps() + self.analyze_indicator_availability_by_year() + self.save_analytical_table() + + self.pipeline_end = datetime.now() + duration = (self.pipeline_end - self.pipeline_start).total_seconds() + + self.logger.info("\n" + "=" * 80) + self.logger.info("COMPLETED") + self.logger.info("=" * 80) + self.logger.info(f" Duration : {duration:.2f}s") + self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") + self.logger.info(f" Countries : {len(self.selected_country_ids)}") + self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") + self.logger.info(f" Rows Loaded: 
{self.pipeline_metadata['rows_loaded']:,}") + + +# ============================================================================= +# AIRFLOW TASK FUNCTION +# ============================================================================= + +def run_analytical_layer(): + """ + Airflow task: Build analytical_food_security dari fact_food_security + dims. + Dipanggil setelah dimensional_model_to_gold selesai. + """ + from scripts.bigquery_config import get_bigquery_client + client = get_bigquery_client() + loader = AnalyticalLayerLoader(client) + loader.run() + print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows") + + +# ============================================================================= +# MAIN EXECUTION +# ============================================================================= + +if __name__ == "__main__": + print("=" * 80) + print("Output: analytical_food_security → fs_asean_gold") + print("=" * 80) + + logger = setup_logging() + client = get_bigquery_client() + loader = AnalyticalLayerLoader(client) + loader.run() + + print("\n" + "=" * 80) + print("[OK] COMPLETED") + print("=" * 80) \ No newline at end of file