diff --git a/scripts/bigquery_aggraget_fact_selected_layer.py b/scripts/bigquery_aggraget_fact_selected_layer.py index 0376e5a..bd32fab 100644 --- a/scripts/bigquery_aggraget_fact_selected_layer.py +++ b/scripts/bigquery_aggraget_fact_selected_layer.py @@ -4,6 +4,12 @@ Tabel 1: agg_indicator_norm -> fs_asean_gold Tabel 2: agg_narrative_indicator -> fs_asean_gold ============================================================================= +PERUBAHAN: + - Ditambahkan kolom indicator_name_id : nama indikator dalam Bahasa Indonesia + - Ditambahkan kolom pillar_name_id : nama pilar dalam Bahasa Indonesia + - Kedua kolom ikut tersimpan di BigQuery (schema + DataFrame output) +============================================================================= + agg_indicator_norm ============================================================================= Tujuan: @@ -30,8 +36,9 @@ Performance Label Logic: Output Schema (agg_indicator_norm): year, country_id, country_name, - indicator_id, indicator_name, unit, direction, - pillar_id, pillar_name, + indicator_id, indicator_name, indicator_name_id, + unit, direction, + pillar_id, pillar_name, pillar_name_id, framework, value, norm_value, @@ -53,8 +60,10 @@ Granularity: indicator_id (all years, all ASEAN countries) Output Schema (agg_narrative_indicator): - indicator_id, indicator_name, unit, direction, - pillar_name, framework, + indicator_id, indicator_name, indicator_name_id, + unit, direction, + pillar_name, pillar_name_id, + framework, year_min, year_max, n_countries, avg_value_first, avg_value_last, avg_norm_score_1_100, @@ -83,6 +92,128 @@ from scripts.bigquery_helpers import ( from google.cloud import bigquery +# ============================================================================= +# MAPPING BAHASA INDONESIA +# ============================================================================= + +# Mapping nama pilar (Inggris -> Indonesia) +PILLAR_NAME_ID_MAP: dict = { + "Availability" : "Ketersediaan", + "Access" : "Akses", + "Utilization" : "Pemanfaatan", + "Stability" : "Stabilitas", + "availability" : "Ketersediaan", + "access" : "Akses", + "utilization" : "Pemanfaatan", + "stability" : "Stabilitas", +} + +# Mapping nama indikator (Inggris -> Indonesia) +# Kunci: indicator_name lowercase stripped +INDICATOR_NAME_ID_MAP: dict = { + # --- Availability / Ketersediaan --- + "prevalence of undernourishment (percent) (3-year average)": + "Prevalensi kekurangan gizi (persen) (rata-rata 3 tahun)", + "number of people undernourished (million) (3-year average)": + "Jumlah penduduk kekurangan gizi (juta jiwa) (rata-rata 3 tahun)", + "prevalence of severe food insecurity in the total population (percent) (3-year average)": + "Prevalensi ketidaktahanan pangan berat pada total populasi (persen) (rata-rata 3 tahun)", + "prevalence of severe food insecurity in the male adult population (percent) (3-year average)": + "Prevalensi ketidaktahanan pangan berat pada populasi dewasa laki-laki (persen) (rata-rata 3 tahun)", + "prevalence of severe food insecurity in the female adult population (percent) (3-year average)": + "Prevalensi ketidaktahanan pangan berat pada populasi dewasa perempuan (persen) (rata-rata 3 tahun)", + "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)": + "Prevalensi ketidaktahanan pangan sedang atau berat pada total populasi (persen) (rata-rata 3 tahun)", + "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)": + "Prevalensi ketidaktahanan pangan sedang atau berat pada populasi dewasa laki-laki (persen) (rata-rata 3 tahun)", + "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)": + "Prevalensi ketidaktahanan pangan sedang atau berat pada populasi dewasa perempuan (persen) (rata-rata 3 tahun)", + "number of severely food insecure people (million) (3-year average)": + "Jumlah penduduk mengalami ketidaktahanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "number of severely food insecure male adults (million) (3-year average)": + "Jumlah dewasa laki-laki mengalami ketidaktahanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "number of severely food insecure female adults (million) (3-year average)": + "Jumlah dewasa perempuan mengalami ketidaktahanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "number of moderately or severely food insecure people (million) (3-year average)": + "Jumlah penduduk mengalami ketidaktahanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "number of moderately or severely food insecure male adults (million) (3-year average)": + "Jumlah dewasa laki-laki mengalami ketidaktahanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "number of moderately or severely food insecure female adults (million) (3-year average)": + "Jumlah dewasa perempuan mengalami ketidaktahanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + # --- Utilization / Pemanfaatan --- + "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)": + "Persentase anak di bawah 5 tahun yang mengalami stunting (estimasi model) (persen)", + "number of children under 5 years of age who are stunted (modeled estimates) (million)": + "Jumlah anak di bawah 5 tahun yang mengalami stunting (estimasi model) (juta jiwa)", + "percentage of children under 5 years affected by wasting (percent)": + "Persentase anak di bawah 5 tahun yang mengalami wasting (persen)", + "number of children under 5 years affected by wasting (million)": + "Jumlah anak di bawah 5 tahun yang mengalami wasting (juta jiwa)", + "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)": + "Persentase anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (persen)", + "number of children under 5 years of age who are overweight (modeled estimates) (million)": + "Jumlah anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (juta jiwa)", + "prevalence of anemia among women of reproductive age (15-49 years) (percent)": + "Prevalensi anemia pada perempuan usia reproduksi (15-49 tahun) (persen)", + "number of women of reproductive age (15-49 years) affected by anemia (million)": + "Jumlah perempuan usia reproduksi (15-49 tahun) yang mengalami anemia (juta jiwa)", + # --- Access / Akses --- + "gdp per capita (current us$)": + "PDB per kapita (US$ saat ini)", + "gdp per capita, ppp (current international $)": + "PDB per kapita, PPP (internasional $ saat ini)", + "food consumer price index (cpi)": + "Indeks Harga Konsumen (IHK) pangan", + "per capita food supply variability (kcal/cap/day)": + "Variabilitas pasokan pangan per kapita (kkal/kapita/hari)", + "percentage of population using at least basic drinking water services": + "Persentase penduduk yang menggunakan layanan air minum dasar", + "percentage of population using at least basic sanitation services": + "Persentase penduduk yang menggunakan layanan sanitasi dasar", + "prevalence of obesity in the adult population (18 years and older)": + "Prevalensi obesitas pada populasi dewasa (18 tahun ke atas)", + "prevalence of overweight in the adult population (18 years and older)": + "Prevalensi kelebihan berat badan pada populasi dewasa (18 tahun ke atas)", + "minimum dietary energy requirement (mder) (kcal/cap/day)": + "Kebutuhan energi pangan minimum (KEPM) (kkal/kapita/hari)", + "average dietary energy supply adequacy (percent) (3-year average)": + "Kecukupan rata-rata pasokan energi pangan (persen) (rata-rata 3 tahun)", + "average protein supply (g/cap/day) (3-year average)": + "Rata-rata pasokan protein (g/kapita/hari) (rata-rata 3 tahun)", + "average supply of protein of animal origin (g/cap/day) (3-year average)": + "Rata-rata pasokan protein hewani (g/kapita/hari) (rata-rata 3 tahun)", + # --- Stability / Stabilitas --- + "political stability and absence of violence/terrorism": + "Stabilitas politik dan ketiadaan kekerasan/terorisme", + "domestic food price volatility index": + "Indeks volatilitas harga pangan domestik", + "per capita food supply variability (kcal/capita/day)": + "Variabilitas pasokan pangan per kapita (kkal/kapita/hari)", + "cereal import dependency ratio (percent) (3-year average)": + "Rasio ketergantungan impor sereal (persen) (rata-rata 3 tahun)", + "value of food imports in total merchandise exports (percent) (3-year average)": + "Nilai impor pangan terhadap total ekspor barang (persen) (rata-rata 3 tahun)", + "share of dietary energy supply derived from cereals, roots and tubers (percent) (3-year average)": + "Pangsa pasokan energi pangan dari sereal, akar, dan umbi-umbian (persen) (rata-rata 3 tahun)", +} + + +def get_indicator_name_id(indicator_name: str) -> str: + """Kembalikan terjemahan Bahasa Indonesia untuk nama indikator.""" + return INDICATOR_NAME_ID_MAP.get( + str(indicator_name).lower().strip(), + str(indicator_name), # fallback: kembalikan nama asli jika tidak ada mapping + ) + + +def get_pillar_name_id(pillar_name: str) -> str: + """Kembalikan terjemahan Bahasa Indonesia untuk nama pilar.""" + return PILLAR_NAME_ID_MAP.get( + str(pillar_name).strip(), + str(pillar_name), # fallback: kembalikan nama asli jika tidak ada mapping + ) + + # ============================================================================= # SDG-ONLY KEYWORD SET # ============================================================================= @@ -190,55 +321,42 @@ def _is_lower_better(direction: str) -> bool: # ============================================================================= def _detect_trend(scores_by_year: pd.Series, lower_better: bool) -> str: - """ - Deteksi tren: improving_consistent, improving_slowing, fluctuating, deteriorating. - scores_by_year: Series dengan index=year, value=avg_score (sudah direction-aware). - """ if len(scores_by_year) < 3: return "insufficient_data" - years = sorted(scores_by_year.index) - vals = [scores_by_year[y] for y in years if not pd.isna(scores_by_year.get(y, np.nan))] + years = sorted(scores_by_year.index) + vals = [scores_by_year[y] for y in years if not pd.isna(scores_by_year.get(y, np.nan))] if len(vals) < 3: return "insufficient_data" - # Hitung slope keseluruhan - x = np.arange(len(vals)) - slope = np.polyfit(x, vals, 1)[0] + x = np.arange(len(vals)) + slope = np.polyfit(x, vals, 1)[0] - # Slope positif = skor naik = baik untuk higher_better, buruk untuk lower_better improving = (slope > 0 and not lower_better) or (slope < 0 and lower_better) - # Hitung apakah laju melambat: bandingkan slope paruh pertama vs paruh kedua - mid = len(vals) // 2 - first_half = vals[:mid] + mid = len(vals) // 2 + first_half = vals[:mid] second_half = vals[mid:] slope1 = np.polyfit(np.arange(len(first_half)), first_half, 1)[0] if len(first_half) > 1 else 0 slope2 = np.polyfit(np.arange(len(second_half)), second_half, 1)[0] if len(second_half) > 1 else 0 - # Koefisien variasi untuk cek fluktuasi cv = np.std(vals) / (np.mean(vals) + 1e-9) if cv > 0.25: return "fluctuating" if improving: - # Cek apakah melambat if lower_better: - slowing = slope2 > slope1 # slope negatif mengecil artinya melambat + slowing = slope2 > slope1 else: - slowing = slope2 < slope1 # slope positif mengecil artinya melambat + slowing = slope2 < slope1 return "improving_slowing" if slowing else "improving_consistent" else: return "deteriorating" def _detect_gap_trend(df_ind: pd.DataFrame, lower_better: bool) -> str: - """ - Deteksi apakah gap antar negara melebar, menyempit, atau stabil. - df_ind: rows untuk 1 indikator, kolom: year, country_id, value - """ std_by_year = ( df_ind.groupby("year")["value"] .std() @@ -257,10 +375,6 @@ def _detect_gap_trend(df_ind: pd.DataFrame, lower_better: bool) -> str: def _detect_anomaly_year(scores_by_year: pd.Series) -> tuple: - """ - Deteksi tahun dengan perubahan paling ekstrem (naik atau turun tajam). - Return: (anomaly_year, direction) atau (None, None) - """ if len(scores_by_year) < 3: return None, None @@ -290,10 +404,6 @@ def _detect_anomaly_year(scores_by_year: pd.Series) -> tuple: def _detect_consistency(df_ind: pd.DataFrame, lower_better: bool) -> tuple: - """ - Cari negara yang paling konsisten terbaik dan terburuk. - Return: (consistent_best, consistent_worst, is_consistent) - """ country_avg = ( df_ind.groupby("country_name")["value"] .mean() @@ -309,7 +419,6 @@ def _detect_consistency(df_ind: pd.DataFrame, lower_better: bool) -> tuple: best = country_avg.idxmax() worst = country_avg.idxmin() - # Cek konsistensi: apakah negara terbaik selalu di atas rata-rata? asean_avg_by_year = df_ind.groupby("year")["value"].mean() country_by_year = df_ind[df_ind["country_name"] == best].set_index("year")["value"] @@ -338,10 +447,6 @@ def _detect_consistency(df_ind: pd.DataFrame, lower_better: bool) -> tuple: # ============================================================================= def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tuple: - """ - Bangun narasi interpretatif per indikator berdasarkan kondisi nyata data. - Return: (narrative_en, narrative_id) — plain text tanpa markdown bold. - """ ind_id = int(row["indicator_id"]) ind_name = str(row["indicator_name"]).strip() unit = str(row["unit"]).strip() if row["unit"] else "" @@ -352,7 +457,6 @@ def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tup year_max = int(row["year_max"]) lower_better = _is_lower_better(direction) - # Subset data untuk indikator ini df_ind = df_full[df_full["indicator_id"] == ind_id].copy() if df_ind.empty: @@ -360,13 +464,12 @@ def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tup na_id = f"{ind_name} ({framework}, {pillar}): Data tidak cukup untuk dianalisis." return na_en, na_id - # ---- Hitung kondisi dari data ---- asean_avg_by_year = ( df_ind.groupby("year")["value"].mean().dropna() ) - trend_label = _detect_trend(asean_avg_by_year, lower_better) - gap_label = _detect_gap_trend(df_ind, lower_better) + trend_label = _detect_trend(asean_avg_by_year, lower_better) + gap_label = _detect_gap_trend(df_ind, lower_better) anomaly_year, anomaly_dir = _detect_anomaly_year(asean_avg_by_year) best_country, worst_country, is_consistent = _detect_consistency(df_ind, lower_better) @@ -380,17 +483,14 @@ def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tup s = f"{v:,.1f}" if abs_v >= 1000 else (f"{v:.2f}" if abs_v >= 10 else f"{v:.3f}") return f"{s} {unit}".strip() if unit else s - # ---- Bangun kalimat EN ---- sentences_en = [] sentences_id = [] - # Kalimat 1: konteks indikator s1_en = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):" s1_id = f"{ind_name} ({framework}, {pillar}, {year_min}-{year_max}):" sentences_en.append(s1_en) sentences_id.append(s1_id) - # Kalimat 2: tren keseluruhan trend_map_en = { "improving_consistent": f"Regional average improved consistently from {fmt(avg_first)} to {fmt(avg_last)}.", "improving_slowing": f"Regional average improved from {fmt(avg_first)} to {fmt(avg_last)}, though the pace slowed in recent years.", @@ -408,7 +508,6 @@ def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tup sentences_en.append(trend_map_en.get(trend_label, "")) sentences_id.append(trend_map_id.get(trend_label, "")) - # Kalimat 3: gap antar negara if gap_label == "widening": sentences_en.append("Disparity among ASEAN countries has widened over time, indicating unequal progress.") sentences_id.append("Kesenjangan antar negara ASEAN melebar seiring waktu, menunjukkan kemajuan yang tidak merata.") @@ -419,7 +518,6 @@ def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tup sentences_en.append("The gap among ASEAN countries remained relatively stable throughout the period.") sentences_id.append("Kesenjangan antar negara ASEAN relatif stabil sepanjang periode.") - # Kalimat 4: anomali if anomaly_year is not None: if anomaly_dir == "drop": sentences_en.append(f"A notable decline was recorded in {anomaly_year}, which stood out from the overall pattern.") @@ -428,7 +526,6 @@ def _build_narrative_per_indicator(row: pd.Series, df_full: pd.DataFrame) -> tup sentences_en.append(f"A sharp improvement was observed in {anomaly_year}, standing out from the overall pattern.") sentences_id.append(f"Peningkatan tajam tercatat pada tahun {anomaly_year}, yang menyimpang dari pola keseluruhan.") - # Kalimat 5: konsistensi negara terbaik/terburuk if best_country and worst_country: if is_consistent: sentences_en.append( @@ -581,6 +678,50 @@ class IndicatorNormAggregator: f" Merge OK. Rows: {after:,} | Rows dengan unit kosong: {n_empty}" ) + # ========================================================================= + # STEP 3b: Tambah kolom nama Bahasa Indonesia + # ========================================================================= + + def _add_indonesia_name_columns(self): + self.logger.info("\n" + "=" * 80) + self.logger.info("STEP 3b: ADD BAHASA INDONESIA NAME COLUMNS") + self.logger.info("=" * 80) + + self.df["indicator_name_id"] = ( + self.df["indicator_name"] + .apply(get_indicator_name_id) + .astype(str) + ) + self.df["pillar_name_id"] = ( + self.df["pillar_name"] + .apply(get_pillar_name_id) + .astype(str) + ) + + n_indicator_mapped = (self.df["indicator_name_id"] != self.df["indicator_name"]).sum() + n_pillar_mapped = (self.df["pillar_name_id"] != self.df["pillar_name"]).sum() + self.logger.info(f" indicator_name_id mapped rows : {n_indicator_mapped:,}") + self.logger.info(f" pillar_name_id mapped rows : {n_pillar_mapped:,}") + + # Log sample mapping + sample_ind = ( + self.df[["indicator_name", "indicator_name_id"]] + .drop_duplicates() + .head(5) + ) + self.logger.info("\n Sample indicator mapping (EN -> ID):") + for _, r in sample_ind.iterrows(): + self.logger.info(f" EN: {r['indicator_name'][:55]}") + self.logger.info(f" ID: {r['indicator_name_id'][:55]}") + + sample_pil = ( + self.df[["pillar_name", "pillar_name_id"]] + .drop_duplicates() + ) + self.logger.info("\n Pillar mapping (EN -> ID):") + for _, r in sample_pil.iterrows(): + self.logger.info(f" {r['pillar_name']:<20} -> {r['pillar_name_id']}") + # ========================================================================= # STEP 4: Deteksi sdgs_start_year # ========================================================================= @@ -783,8 +924,10 @@ class IndicatorNormAggregator: out = df[[ "year", "country_id", "country_name", - "indicator_id", "indicator_name", "unit", "direction", - "pillar_id", "pillar_name", "framework", + "indicator_id", "indicator_name", "indicator_name_id", + "unit", "direction", + "pillar_id", "pillar_name", "pillar_name_id", + "framework", "value", "norm_value", "norm_score_1_100", "yoy_value", "yoy_norm_value", "performance", ]].copy() @@ -793,22 +936,24 @@ class IndicatorNormAggregator: ["year", "country_name", "pillar_name", "indicator_name"] ).reset_index(drop=True) - out["year"] = out["year"].astype(int) - out["country_id"] = out["country_id"].astype(int) - out["country_name"] = out["country_name"].astype(str) - out["indicator_id"] = out["indicator_id"].astype(int) - out["indicator_name"] = out["indicator_name"].astype(str) - out["unit"] = out["unit"].astype(str) - out["direction"] = out["direction"].astype(str) - out["pillar_id"] = out["pillar_id"].astype(int) - out["pillar_name"] = out["pillar_name"].astype(str) - out["framework"] = out["framework"].astype(str) - out["value"] = out["value"].astype(float) - out["norm_value"] = out["norm_value"].astype(float) - out["norm_score_1_100"] = out["norm_score_1_100"].astype(float) - out["yoy_value"] = pd.to_numeric(out["yoy_value"], errors="coerce").astype(float) - out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float) - out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") + out["year"] = out["year"].astype(int) + out["country_id"] = out["country_id"].astype(int) + out["country_name"] = out["country_name"].astype(str) + out["indicator_id"] = out["indicator_id"].astype(int) + out["indicator_name"] = out["indicator_name"].astype(str) + out["indicator_name_id"] = out["indicator_name_id"].astype(str) + out["unit"] = out["unit"].astype(str) + out["direction"] = out["direction"].astype(str) + out["pillar_id"] = out["pillar_id"].astype(int) + out["pillar_name"] = out["pillar_name"].astype(str) + out["pillar_name_id"] = out["pillar_name_id"].astype(str) + out["framework"] = out["framework"].astype(str) + out["value"] = out["value"].astype(float) + out["norm_value"] = out["norm_value"].astype(float) + out["norm_score_1_100"] = out["norm_score_1_100"].astype(float) + out["yoy_value"] = pd.to_numeric(out["yoy_value"], errors="coerce").astype(float) + out["yoy_norm_value"] = pd.to_numeric(out["yoy_norm_value"], errors="coerce").astype(float) + out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string") self.logger.info(f" Total rows : {len(out):,}") self.logger.info(f" Countries : {out['country_id'].nunique()}") @@ -816,22 +961,24 @@ class IndicatorNormAggregator: self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}") schema = [ - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), - bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("yoy_norm_value", "FLOAT", mode="NULLABLE"), - bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_name_id", "STRING", mode="NULLABLE"), + bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name_id", "STRING", mode="NULLABLE"), + bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("yoy_norm_value", "FLOAT", mode="NULLABLE"), + bigquery.SchemaField("performance", "STRING", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( @@ -860,6 +1007,7 @@ class IndicatorNormAggregator: "yoy_columns" : ["yoy_value", "yoy_norm_value"], "performance_threshold": _PERFORMANCE_THRESHOLD, "unit_source" : "dim_indicator", + "added_columns" : ["indicator_name_id", "pillar_name_id"], }), "validation_metrics" : json.dumps({ "total_rows" : rows_loaded, @@ -1022,9 +1170,14 @@ class IndicatorNormAggregator: }) df_country_stats = pd.DataFrame(country_stats) - # Dim cols - dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"] - df_dim = df[["indicator_id"] + dim_cols].drop_duplicates(subset=["indicator_id"]) + # Dim cols — sertakan kolom Indonesia + dim_cols = [ + "indicator_name", "indicator_name_id", + "unit", "direction", + "pillar_name", "pillar_name_id", + "framework", + ] + df_dim = df[["indicator_id"] + dim_cols].drop_duplicates(subset=["indicator_id"]) # Merge semua df_agg = ( @@ -1043,7 +1196,7 @@ class IndicatorNormAggregator: df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good" df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad" - # ---- Build narrative (bilingual, interpretatif, plain text) ---- + # ---- Build narrative ---- self.logger.info("\n--- BUILD NARRATIVE (interpretatif, plain text, bilingual EN/ID) ---") narratives_en = [] narratives_id = [] @@ -1064,8 +1217,10 @@ class IndicatorNormAggregator: # ---- Save ---- out = df_agg[[ - "indicator_id", "indicator_name", "unit", "direction", - "pillar_name", "framework", + "indicator_id", "indicator_name", "indicator_name_id", + "unit", "direction", + "pillar_name", "pillar_name_id", + "framework", "year_min", "year_max", "n_countries", "avg_value_first", "avg_value_last", "avg_norm_score_1_100", "performance", @@ -1079,9 +1234,11 @@ class IndicatorNormAggregator: out["indicator_id"] = out["indicator_id"].astype(int) out["indicator_name"] = out["indicator_name"].astype(str) + out["indicator_name_id"] = out["indicator_name_id"].astype(str) out["unit"] = out["unit"].fillna("").astype(str) out["direction"] = out["direction"].astype(str) out["pillar_name"] = out["pillar_name"].astype(str) + out["pillar_name_id"] = out["pillar_name_id"].astype(str) out["framework"] = out["framework"].astype(str) out["year_min"] = out["year_min"].astype(int) out["year_max"] = out["year_max"].astype(int) @@ -1102,9 +1259,11 @@ class IndicatorNormAggregator: schema = [ bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_name_id", "STRING", mode="NULLABLE"), bigquery.SchemaField("unit", "STRING", mode="NULLABLE"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name_id", "STRING", mode="NULLABLE"), bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), bigquery.SchemaField("year_min", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year_max", "INTEGER", mode="REQUIRED"), @@ -1149,6 +1308,7 @@ class IndicatorNormAggregator: "narrative_dimensions" : ["trend", "gap_trend", "anomaly", "country_consistency"], "performance_threshold": _PERFORMANCE_THRESHOLD, "layer" : "gold", + "added_columns" : ["indicator_name_id", "pillar_name_id"], }), "validation_metrics" : json.dumps({ "total_rows" : rows_loaded, @@ -1172,11 +1332,13 @@ class IndicatorNormAggregator: self.logger.info(" Dim : dim_indicator (unit)") self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold") self.logger.info(" agg_narrative_indicator -> fs_asean_gold") + self.logger.info(" Added : indicator_name_id, pillar_name_id (Bahasa Indonesia)") self.logger.info("=" * 80) self.load_data() self.load_units() self._merge_unit() + self._add_indonesia_name_columns() # <-- BARU self.sdgs_start_year = self._detect_sdgs_start_year() self._assign_framework() df_normed = self._compute_norm_values() diff --git a/scripts/bigquery_aggregate_layer.py b/scripts/bigquery_aggregate_layer.py index 9597549..96b26a0 100644 --- a/scripts/bigquery_aggregate_layer.py +++ b/scripts/bigquery_aggregate_layer.py @@ -14,6 +14,12 @@ Narrative style: - Interpretatif: membaca tren, gap, anomali, konsistensi dari data nyata - Bilingual: narrative_en (Inggris) + narrative_id (Indonesia) - Granularity: per tahun (Overview & Pillar) + +ADDED: Kolom indicator_name_id dan pillar_name_id (terjemahan Bahasa Indonesia) + - agg_pillar_composite : + pillar_name_id + - agg_pillar_by_country : + pillar_name_id + - agg_framework_by_country : (framework tidak diterjemahkan, sudah singkat) + - agg_narrative_pillar : + pillar_name_id """ import pandas as pd @@ -82,6 +88,176 @@ _FIES_DETECTION_LOWER: frozenset = frozenset([ ]) +# ============================================================================= +# TRANSLATION DICTIONARIES +# ============================================================================= + +PILLAR_TRANSLATION_ID: dict = { + # 4 pilar utama Food Security + "Availability" : "Ketersediaan", + "Access" : "Keterjangkauan", + "Utilization" : "Pemanfaatan", + "Stability" : "Stabilitas", + # Variasi penulisan yang mungkin muncul + "availability" : "Ketersediaan", + "access" : "Keterjangkauan", + "utilization" : "Pemanfaatan", + "stability" : "Stabilitas", + "Food Availability" : "Ketersediaan Pangan", + "Food Access" : "Keterjangkauan Pangan", + "Food Utilization" : "Pemanfaatan Pangan", + "Food Stability" : "Stabilitas Pangan", +} + +INDICATOR_TRANSLATION_ID: dict = { + # ------------------------------------------------------------------------- + # AVAILABILITY + # ------------------------------------------------------------------------- + "Average dietary energy supply adequacy (percent) (3-year average)": + "Kecukupan rata-rata pasokan energi makanan (persen) (rata-rata 3 tahun)", + "Average value of food production (constant 2014-2016 thousand US$) (3-year average)": + "Nilai rata-rata produksi pangan (ribu US$ konstan 2014-2016) (rata-rata 3 tahun)", + "Share of dietary energy supply derived from cereals, roots and tubers (percent) (3-year average)": + "Proporsi pasokan energi makanan dari serealia, akar, dan umbi-umbian (persen) (rata-rata 3 tahun)", + "Average protein supply (g/cap/day) (3-year average)": + "Rata-rata pasokan protein (g/kapita/hari) (rata-rata 3 tahun)", + "Average supply of protein of animal origin (g/cap/day) (3-year average)": + "Rata-rata pasokan protein hewani (g/kapita/hari) (rata-rata 3 tahun)", + "Cereal import dependency ratio (percent) (3-year average)": + "Rasio ketergantungan impor sereal (persen) (rata-rata 3 tahun)", + "Percent of arable land equipped for irrigation (percent) (3-year average)": + "Persentase lahan pertanian yang dilengkapi irigasi (persen) (rata-rata 3 tahun)", + "Crop production index (2014-2016 = 100)": + "Indeks produksi tanaman pangan (2014-2016 = 100)", + "Livestock production index (2014-2016 = 100)": + "Indeks produksi peternakan (2014-2016 = 100)", + "Value of food imports over total merchandise exports (percent) (3-year average)": + "Nilai impor pangan terhadap total ekspor barang (persen) (rata-rata 3 tahun)", + "Food production variability (constant 2014-2016 thousand US$ per capita)": + "Variabilitas produksi pangan (ribu US$ konstan 2014-2016 per kapita)", + "Food supply variability (kcal/cap/day)": + "Variabilitas pasokan pangan (kkal/kapita/hari)", + + # ------------------------------------------------------------------------- + # ACCESS + # ------------------------------------------------------------------------- + "Gross domestic product per capita, PPP (constant 2017 international $)": + "Produk domestik bruto per kapita, PPP (internasional konstan 2017 US$)", + "Domestic food price level index (2015 = 1.00)": + "Indeks tingkat harga pangan domestik (2015 = 1,00)", + "Domestic food price volatility index": + "Indeks volatilitas harga pangan domestik", + "Prevalence of undernourishment (percent) (3-year average)": + "Prevalensi kekurangan gizi (persen) (rata-rata 3 tahun)", + "Number of people undernourished (million) (3-year average)": + "Jumlah penduduk kekurangan gizi (juta jiwa) (rata-rata 3 tahun)", + "Depth of the food deficit (kcal/capita/day) (3-year average)": + "Kedalaman defisit pangan (kkal/kapita/hari) (rata-rata 3 tahun)", + "Percentage of population using at least basic drinking water services (percent)": + "Persentase penduduk yang menggunakan layanan air minum dasar (persen)", + "Percentage of population using safely managed drinking water services (percent)": + "Persentase penduduk yang menggunakan layanan air minum yang dikelola dengan aman (persen)", + "Percentage of population using at least basic sanitation services (percent)": + "Persentase penduduk yang menggunakan layanan sanitasi dasar (persen)", + "Percentage of population using safely managed sanitation services (percent)": + "Persentase penduduk yang menggunakan layanan sanitasi yang dikelola dengan aman (persen)", + "Access to electricity (percent of rural population)": + "Akses listrik (persen penduduk pedesaan)", + "Proportion of population with access to electricity (percent)": + "Proporsi penduduk dengan akses listrik (persen)", + "Road infrastructure index": + "Indeks infrastruktur jalan", + "Rail lines density (total route-km per 100 square km of land area)": + "Kepadatan jalur kereta api (total rute-km per 100 km2 lahan)", + "Gross national income per capita (Atlas method, current US$)": + "Pendapatan nasional bruto per kapita (metode Atlas, US$ terkini)", + "Food Insecurity Experience Scale (FIES)": + "Skala Pengalaman Ketidakamanan Pangan (FIES)", + + # ------------------------------------------------------------------------- + # UTILIZATION + # ------------------------------------------------------------------------- + "Prevalence of severe food insecurity in the total population (percent) (3-year average)": + "Prevalensi kerawanan pangan berat pada total penduduk (persen) (rata-rata 3 tahun)", + "Prevalence of severe food insecurity in the male adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)", + "Prevalence of severe food insecurity in the female adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)", + "Prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)": + "Prevalensi kerawanan pangan sedang atau berat pada total penduduk (persen) (rata-rata 3 tahun)", + "Prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan sedang atau berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)", + "Prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan sedang atau berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)", + "Number of severely food insecure people (million) (3-year average)": + "Jumlah penduduk yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "Number of severely food insecure male adults (million) (3-year average)": + "Jumlah laki-laki dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "Number of severely food insecure female adults (million) (3-year average)": + "Jumlah perempuan dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "Number of moderately or severely food insecure people (million) (3-year average)": + "Jumlah penduduk yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "Number of moderately or severely food insecure male adults (million) (3-year average)": + "Jumlah laki-laki dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "Number of moderately or severely food insecure female adults (million) (3-year average)": + "Jumlah perempuan dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "Percentage of children under 5 years of age who are stunted (modelled estimates) (percent)": + "Persentase anak di bawah 5 tahun yang mengalami stunting (estimasi model) (persen)", + "Number of children under 5 years of age who are stunted (modeled estimates) (million)": + "Jumlah anak di bawah 5 tahun yang mengalami stunting (estimasi model) (juta jiwa)", + "Percentage of children under 5 years affected by wasting (percent)": + "Persentase anak di bawah 5 tahun yang mengalami wasting (persen)", + "Number of children under 5 years affected by wasting (million)": + "Jumlah anak di bawah 5 tahun yang mengalami wasting (juta jiwa)", + "Percentage of children under 5 years of age who are overweight (modelled estimates) (percent)": + "Persentase anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (persen)", + "Number of children under 5 years of age who are overweight (modeled estimates) (million)": + "Jumlah anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (juta jiwa)", + "Prevalence of anemia among women of reproductive age (15-49 years) (percent)": + "Prevalensi anemia pada perempuan usia reproduksi (15-49 tahun) (persen)", + "Number of women of reproductive age (15-49 years) affected by anemia (million)": + "Jumlah perempuan usia reproduksi (15-49 tahun) yang menderita anemia (juta jiwa)", + "Prevalence of obesity in the adult population (18 years and older) (percent)": + "Prevalensi obesitas pada penduduk dewasa (18 tahun ke atas) (persen)", + "Prevalence of exclusive breastfeeding among infants 0-5 months of age (percent)": + "Prevalensi pemberian ASI eksklusif pada bayi usia 0-5 bulan (persen)", + "Minimum dietary diversity for women (MDD-W) (percent)": + "Keragaman pola makan minimum untuk perempuan (MDD-W) (persen)", + + # ------------------------------------------------------------------------- + # STABILITY + # ------------------------------------------------------------------------- + "Cereal import dependency ratio (percent)": + "Rasio ketergantungan impor sereal (persen)", + "Political stability and absence of violence/terrorism (index)": + "Stabilitas politik dan tidak adanya kekerasan/terorisme (indeks)", + "Domestic food price volatility": + "Volatilitas harga pangan domestik", + "Per capita food supply variability (kcal/cap/day)": + "Variabilitas pasokan pangan per kapita (kkal/kapita/hari)", + "Percentage of arable land equipped for irrigation (percent)": + "Persentase lahan pertanian yang dilengkapi irigasi (persen)", + "GDP per capita growth (annual %)": + "Pertumbuhan PDB per kapita (% tahunan)", + "GDP growth (annual %)": + "Pertumbuhan PDB (% tahunan)", +} + + +def translate_indicator(name: str) -> str: + """Terjemahkan nama indikator ke Bahasa Indonesia. Fallback ke nama asli.""" + if not name: + return name + return INDICATOR_TRANSLATION_ID.get(name, name) + + +def translate_pillar(name: str) -> str: + """Terjemahkan nama pillar ke Bahasa Indonesia. Fallback ke nama asli.""" + if not name: + return name + return PILLAR_TRANSLATION_ID.get(name, name) + + # ============================================================================= # WINDOWS CP1252 SAFE LOGGING # ============================================================================= @@ -194,10 +370,6 @@ def _fmt_delta(delta) -> str: # ============================================================================= def _detect_series_trend(scores: list) -> str: - """ - Deteksi tren dari list skor berurutan. - Return: 'improving_consistent' | 'improving_slowing' | 'deteriorating' | 'fluctuating' - """ if len(scores) < 3: return "insufficient" @@ -220,10 +392,6 @@ def _detect_series_trend(scores: list) -> str: def _detect_country_gap(scores_by_country_year: pd.DataFrame, score_col: str) -> str: - """ - Deteksi apakah std antar negara melebar atau menyempit dari waktu ke waktu. - scores_by_country_year: df dengan kolom [year, country_id, score_col] - """ std_by_year = ( scores_by_country_year.groupby("year")[score_col] .std().dropna() @@ -242,11 +410,6 @@ def _detect_country_gap(scores_by_country_year: pd.DataFrame, score_col: str) -> def _find_anomaly_year(values_by_year: dict) -> tuple: - """ - Cari tahun dengan perubahan YoY paling ekstrem. - values_by_year: {year: score} - Return: (year, 'drop' | 'rise') atau (None, None) - """ years = sorted(values_by_year.keys()) deltas = {} for i in range(1, len(years)): @@ -285,17 +448,12 @@ def _build_overview_narrative( most_improved_delta, most_declined_country, most_declined_delta, - historical_scores: dict, # {year: score} semua tahun sebelumnya - country_scores_all: pd.DataFrame, # df [year, country_name, framework_score_1_100] + historical_scores: dict, + country_scores_all: pd.DataFrame, ) -> tuple: - """ - Narasi overview per tahun — interpretatif, plain text, bilingual. - Return: (narrative_en, narrative_id) - """ sentences_en = [] sentences_id = [] - # ---- 1. Status tahun ini vs threshold ---- perf_word_en = "good" if performance_status == "Good" else "below target" perf_word_id = "baik" if performance_status == "Good" else "di bawah target" @@ -312,7 +470,6 @@ def _build_overview_narrative( sentences_en.append(s1_en) sentences_id.append(s1_id) - # ---- 2. Kondisi YoY tahun ini ---- if yoy_val is not None and not pd.isna(yoy_val): if abs(yoy_val) < 0.5: s2_en = f"The score was relatively stable compared to the previous year." @@ -326,7 +483,6 @@ def _build_overview_narrative( sentences_en.append(s2_en) sentences_id.append(s2_id) - # ---- 3. Tren historis (baca dari semua data yang ada) ---- hist_years = sorted(historical_scores.keys()) hist_scores = [historical_scores[y] for y in hist_years if not pd.isna(historical_scores.get(y, np.nan))] @@ -352,7 +508,6 @@ def _build_overview_narrative( sentences_en.append(s3_en) sentences_id.append(s3_id) - # ---- 4. Gap antar negara ---- if not country_scores_all.empty: gap_trend = _detect_country_gap( country_scores_all[country_scores_all["year"] <= year], @@ -375,7 +530,6 @@ def _build_overview_narrative( sentences_en.append(s4_en) sentences_id.append(s4_id) - # ---- 5. Top dan bottom country tahun ini ---- if ranking_list and len(ranking_list) >= 2: top = ranking_list[0] bottom = ranking_list[-1] @@ -392,7 +546,6 @@ def _build_overview_narrative( sentences_en.append(s5_en) sentences_id.append(s5_id) - # ---- 6. Most improved / declined country ---- if most_improved_country and most_declined_country: if most_improved_country != most_declined_country: s6_en = ( @@ -430,19 +583,14 @@ def _build_pillar_narrative( top_country_score, bot_country: str, bot_country_score, - pillar_scores_history: dict, # {year: score} untuk pilar ini - all_pillar_scores_year: pd.DataFrame, # df [pillar_name, pillar_score_1_100] tahun ini - country_pillar_all: pd.DataFrame, # df [year, country_id, pillar_country_score_1_100] pilar ini + pillar_scores_history: dict, + all_pillar_scores_year: pd.DataFrame, + country_pillar_all: pd.DataFrame, ) -> tuple: - """ - Narasi pillar per tahun — interpretatif, plain text, bilingual. - Return: (narrative_en, narrative_id) - """ sentences_en = [] sentences_id = [] - # ---- 1. Posisi pilar tahun ini ---- - rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") + rank_suffix = {1: "st", 2: "nd", 3: "rd"}.get(rank_in_year, "th") perf_word_en = "good" if pillar_score >= PERFORMANCE_THRESHOLD else "below target" perf_word_id = "baik" if pillar_score >= PERFORMANCE_THRESHOLD else "di bawah target" @@ -457,7 +605,6 @@ def _build_pillar_narrative( sentences_en.append(s1_en) sentences_id.append(s1_id) - # ---- 2. YoY pilar ini ---- if yoy_val is not None and not pd.isna(yoy_val): if abs(yoy_val) < 0.5: s2_en = "Performance was relatively stable compared to the previous year." @@ -471,7 +618,6 @@ def _build_pillar_narrative( sentences_en.append(s2_en) sentences_id.append(s2_id) - # ---- 3. Tren historis pilar ini ---- hist_years = sorted(pillar_scores_history.keys()) hist_scores = [ pillar_scores_history[y] @@ -501,7 +647,6 @@ def _build_pillar_narrative( sentences_en.append(s3_en) sentences_id.append(s3_id) - # ---- 4. Gap antar negara dalam pilar ini ---- if not country_pillar_all.empty: gap_trend = _detect_country_gap( country_pillar_all[country_pillar_all["year"] <= year], @@ -521,7 +666,6 @@ def _build_pillar_narrative( sentences_en.append(s4_en) sentences_id.append(s4_id) - # ---- 5. Top/bottom country dalam pilar ini ---- if top_country and bot_country and top_country != bot_country: s5_en = ( f"{top_country} performed best in this pillar ({_fmt_score(top_country_score)}), " @@ -534,7 +678,6 @@ def _build_pillar_narrative( sentences_en.append(s5_en) sentences_id.append(s5_id) - # ---- 6. Posisi relatif pilar ini vs pilar lain ---- if not all_pillar_scores_year.empty and len(all_pillar_scores_year) > 1: sorted_pillars = all_pillar_scores_year.sort_values("pillar_score_1_100", ascending=False) strongest = sorted_pillars.iloc[0] @@ -605,15 +748,21 @@ class FoodSecurityAggregator: } missing_cols = required_cols - set(self.df.columns) if missing_cols: - raise ValueError( - f"Kolom berikut tidak ditemukan: {missing_cols}" - ) + raise ValueError(f"Kolom berikut tidak ditemukan: {missing_cols}") n_null_dir = self.df["direction"].isna().sum() if n_null_dir > 0: self.logger.warning(f" [DIRECTION] {n_null_dir} rows NULL -> diisi 'positive'") self.df["direction"] = self.df["direction"].fillna("positive") + # Pastikan kolom terjemahan Indonesia tersedia (bisa dari fact atau dibuat ulang) + if "indicator_name_id" not in self.df.columns: + self.df["indicator_name_id"] = self.df["indicator_name"].apply(translate_indicator) + self.logger.info(" [TRANSLATION] Kolom indicator_name_id dibuat dari mapping.") + if "pillar_name_id" not in self.df.columns: + self.df["pillar_name_id"] = self.df["pillar_name"].apply(translate_pillar) + self.logger.info(" [TRANSLATION] Kolom pillar_name_id dibuat dari mapping.") + self.logger.info(f" Rows : {len(self.df):,}") self.logger.info(f" Countries : {self.df['country_id'].nunique()}") self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}") @@ -758,6 +907,7 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 2: agg_pillar_composite + # Kolom tambahan: pillar_name_id # ========================================================================= def calc_pillar_composite(self) -> pd.DataFrame: @@ -789,6 +939,9 @@ class FoodSecurityAggregator: ) df = add_yoy(df, ["pillar_id"], "pillar_score_1_100") + # Kolom terjemahan Indonesia + df["pillar_name_id"] = df["pillar_name"].apply(translate_pillar) + df["pillar_id"] = df["pillar_id"].astype(int) df["year"] = df["year"].astype(int) df["n_indicators"] = safe_int(df["n_indicators"], col_name="n_indicators", logger=self.logger) @@ -796,10 +949,12 @@ class FoodSecurityAggregator: df["rank_in_year"] = df["rank_in_year"].astype(int) df["pillar_norm"] = df["pillar_norm"].astype(float) df["pillar_score_1_100"] = df["pillar_score_1_100"].astype(float) + df["pillar_name_id"] = df["pillar_name_id"].astype(str) schema = [ bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name_id", "STRING", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_norm", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("n_indicators", "INTEGER", mode="REQUIRED"), @@ -821,6 +976,7 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 3: agg_pillar_by_country + # Kolom tambahan: pillar_name_id # ========================================================================= def calc_pillar_by_country(self) -> pd.DataFrame: @@ -848,18 +1004,23 @@ class FoodSecurityAggregator: ) df = add_yoy(df, ["country_id", "pillar_id"], "pillar_country_score_1_100") + # Kolom terjemahan Indonesia + df["pillar_name_id"] = df["pillar_name"].apply(translate_pillar) + df["country_id"] = df["country_id"].astype(int) df["pillar_id"] = df["pillar_id"].astype(int) df["year"] = df["year"].astype(int) df["rank_in_pillar_year"] = df["rank_in_pillar_year"].astype(int) df["pillar_country_norm"] = df["pillar_country_norm"].astype(float) df["pillar_country_score_1_100"] = df["pillar_country_score_1_100"].astype(float) + df["pillar_name_id"] = df["pillar_name_id"].astype(str) schema = [ bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name_id", "STRING", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_country_norm", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("pillar_country_score_1_100", "FLOAT", mode="REQUIRED"), @@ -879,6 +1040,7 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 4: agg_framework_by_country + # Tidak ada kolom pillar/indicator di tabel ini (sudah di level framework) # ========================================================================= def _calc_country_composite_inmemory(self) -> pd.DataFrame: @@ -1043,6 +1205,7 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 5: agg_framework_asean + # Tidak ada kolom pillar/indicator langsung di tabel ini # ========================================================================= def calc_framework_asean(self) -> pd.DataFrame: @@ -1205,6 +1368,7 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 6: agg_narrative_overview + # Tidak ada kolom pillar/indicator di tabel ini # ========================================================================= def calc_narrative_overview( @@ -1284,7 +1448,6 @@ class FoodSecurityAggregator: most_improved_country = most_declined_country = None most_improved_delta = most_declined_delta = None - # Semua data skor negara untuk gap analysis country_scores_all = country_total[["year", "country_id", "framework_score_1_100"]].copy() narrative_en, narrative_id = _build_overview_narrative( @@ -1368,6 +1531,7 @@ class FoodSecurityAggregator: # ========================================================================= # STEP 7: agg_narrative_pillar + # Kolom tambahan: pillar_name_id # ========================================================================= def calc_narrative_pillar( @@ -1409,6 +1573,9 @@ class FoodSecurityAggregator: p_yoy = prow["year_over_year_change"] p_yoy_val = float(p_yoy) if pd.notna(p_yoy) else None + # Terjemahan Indonesia nama pillar + p_name_id = translate_pillar(p_name) + p_country = ( yr_country_pillar[yr_country_pillar["pillar_id"] == p_id] .sort_values("rank_in_pillar_year") @@ -1423,12 +1590,10 @@ class FoodSecurityAggregator: top_country = bot_country = None top_country_score = bot_country_score = None - # Data historis hanya sampai tahun ini hist_up_to_yr = { y: s for y, s in pillar_history.get(p_id, {}).items() if y <= yr } - # Data negara-pilar ini semua tahun (untuk gap analysis) country_pillar_all = df_pillar_by_country[ df_pillar_by_country["pillar_id"] == p_id ][["year", "country_id", "pillar_country_score_1_100"]].copy() @@ -1453,6 +1618,7 @@ class FoodSecurityAggregator: "year": yr, "pillar_id": p_id, "pillar_name": p_name, + "pillar_name_id": p_name_id, "pillar_score": round(p_score, 2), "rank_in_year": p_rank, "yoy_change": p_yoy_val, @@ -1465,11 +1631,12 @@ class FoodSecurityAggregator: }) df = pd.DataFrame(records) - df["year"] = df["year"].astype(int) - df["pillar_id"] = df["pillar_id"].astype(int) - df["rank_in_year"] = df["rank_in_year"].astype(int) - df["narrative_en"] = df["narrative_en"].astype(str) - df["narrative_id"] = df["narrative_id"].astype(str) + df["year"] = df["year"].astype(int) + df["pillar_id"] = df["pillar_id"].astype(int) + df["rank_in_year"] = df["rank_in_year"].astype(int) + df["pillar_name_id"] = df["pillar_name_id"].astype(str) + df["narrative_en"] = df["narrative_en"].astype(str) + df["narrative_id"] = df["narrative_id"].astype(str) for col in ["pillar_score", "yoy_change", "top_country_score", "bottom_country_score"]: df[col] = pd.to_numeric(df[col], errors="coerce").astype(float) @@ -1482,6 +1649,7 @@ class FoodSecurityAggregator: bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name_id", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_score", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("rank_in_year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), diff --git a/scripts/bigquery_analytical_layer.py b/scripts/bigquery_analytical_layer.py index 018be28..117e19a 100644 --- a/scripts/bigquery_analytical_layer.py +++ b/scripts/bigquery_analytical_layer.py @@ -9,6 +9,8 @@ Filtering Order: 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries 6. Save analytical table (dengan nama/label lengkap untuk Looker Studio) + +ADDED: Kolom indicator_name_id dan pillar_name_id (terjemahan Bahasa Indonesia) """ import pandas as pd @@ -34,6 +36,176 @@ from scripts.bigquery_helpers import ( from google.cloud import bigquery +# ============================================================================= +# TRANSLATION DICTIONARIES +# ============================================================================= + +PILLAR_TRANSLATION_ID: dict = { + # 4 pilar utama Food Security + "Availability" : "Ketersediaan", + "Access" : "Keterjangkauan", + "Utilization" : "Pemanfaatan", + "Stability" : "Stabilitas", + # Variasi penulisan yang mungkin muncul + "availability" : "Ketersediaan", + "access" : "Keterjangkauan", + "utilization" : "Pemanfaatan", + "stability" : "Stabilitas", + "Food Availability" : "Ketersediaan Pangan", + "Food Access" : "Keterjangkauan Pangan", + "Food Utilization" : "Pemanfaatan Pangan", + "Food Stability" : "Stabilitas Pangan", +} + +INDICATOR_TRANSLATION_ID: dict = { + # ------------------------------------------------------------------------- + # AVAILABILITY + # ------------------------------------------------------------------------- + "Average dietary energy supply adequacy (percent) (3-year average)": + "Kecukupan rata-rata pasokan energi makanan (persen) (rata-rata 3 tahun)", + "Average value of food production (constant 2014-2016 thousand US$) (3-year average)": + "Nilai rata-rata produksi pangan (ribu US$ konstan 2014-2016) (rata-rata 3 tahun)", + "Share of dietary energy supply derived from cereals, roots and tubers (percent) (3-year average)": + "Proporsi pasokan energi makanan dari serealia, akar, dan umbi-umbian (persen) (rata-rata 3 tahun)", + "Average protein supply (g/cap/day) (3-year average)": + "Rata-rata pasokan protein (g/kapita/hari) (rata-rata 3 tahun)", + "Average supply of protein of animal origin (g/cap/day) (3-year average)": + "Rata-rata pasokan protein hewani (g/kapita/hari) (rata-rata 3 tahun)", + "Cereal import dependency ratio (percent) (3-year average)": + "Rasio ketergantungan impor sereal (persen) (rata-rata 3 tahun)", + "Percent of arable land equipped for irrigation (percent) (3-year average)": + "Persentase lahan pertanian yang dilengkapi irigasi (persen) (rata-rata 3 tahun)", + "Crop production index (2014-2016 = 100)": + "Indeks produksi tanaman pangan (2014-2016 = 100)", + "Livestock production index (2014-2016 = 100)": + "Indeks produksi peternakan (2014-2016 = 100)", + "Value of food imports over total merchandise exports (percent) (3-year average)": + "Nilai impor pangan terhadap total ekspor barang (persen) (rata-rata 3 tahun)", + "Food production variability (constant 2014-2016 thousand US$ per capita)": + "Variabilitas produksi pangan (ribu US$ konstan 2014-2016 per kapita)", + "Food supply variability (kcal/cap/day)": + "Variabilitas pasokan pangan (kkal/kapita/hari)", + + # ------------------------------------------------------------------------- + # ACCESS + # ------------------------------------------------------------------------- + "Gross domestic product per capita, PPP (constant 2017 international $)": + "Produk domestik bruto per kapita, PPP (internasional konstan 2017 US$)", + "Domestic food price level index (2015 = 1.00)": + "Indeks tingkat harga pangan domestik (2015 = 1,00)", + "Domestic food price volatility index": + "Indeks volatilitas harga pangan domestik", + "Prevalence of undernourishment (percent) (3-year average)": + "Prevalensi kekurangan gizi (persen) (rata-rata 3 tahun)", + "Number of people undernourished (million) (3-year average)": + "Jumlah penduduk kekurangan gizi (juta jiwa) (rata-rata 3 tahun)", + "Depth of the food deficit (kcal/capita/day) (3-year average)": + "Kedalaman defisit pangan (kkal/kapita/hari) (rata-rata 3 tahun)", + "Percentage of population using at least basic drinking water services (percent)": + "Persentase penduduk yang menggunakan layanan air minum dasar (persen)", + "Percentage of population using safely managed drinking water services (percent)": + "Persentase penduduk yang menggunakan layanan air minum yang dikelola dengan aman (persen)", + "Percentage of population using at least basic sanitation services (percent)": + "Persentase penduduk yang menggunakan layanan sanitasi dasar (persen)", + "Percentage of population using safely managed sanitation services (percent)": + "Persentase penduduk yang menggunakan layanan sanitasi yang dikelola dengan aman (persen)", + "Access to electricity (percent of rural population)": + "Akses listrik (persen penduduk pedesaan)", + "Proportion of population with access to electricity (percent)": + "Proporsi penduduk dengan akses listrik (persen)", + "Road infrastructure index": + "Indeks infrastruktur jalan", + "Rail lines density (total route-km per 100 square km of land area)": + "Kepadatan jalur kereta api (total rute-km per 100 km2 lahan)", + "Gross national income per capita (Atlas method, current US$)": + "Pendapatan nasional bruto per kapita (metode Atlas, US$ terkini)", + "Food Insecurity Experience Scale (FIES)": + "Skala Pengalaman Ketidakamanan Pangan (FIES)", + + # ------------------------------------------------------------------------- + # UTILIZATION + # ------------------------------------------------------------------------- + "Prevalence of severe food insecurity in the total population (percent) (3-year average)": + "Prevalensi kerawanan pangan berat pada total penduduk (persen) (rata-rata 3 tahun)", + "Prevalence of severe food insecurity in the male adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)", + "Prevalence of severe food insecurity in the female adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)", + "Prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)": + "Prevalensi kerawanan pangan sedang atau berat pada total penduduk (persen) (rata-rata 3 tahun)", + "Prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan sedang atau berat pada penduduk laki-laki dewasa (persen) (rata-rata 3 tahun)", + "Prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)": + "Prevalensi kerawanan pangan sedang atau berat pada penduduk perempuan dewasa (persen) (rata-rata 3 tahun)", + "Number of severely food insecure people (million) (3-year average)": + "Jumlah penduduk yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "Number of severely food insecure male adults (million) (3-year average)": + "Jumlah laki-laki dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "Number of severely food insecure female adults (million) (3-year average)": + "Jumlah perempuan dewasa yang mengalami kerawanan pangan berat (juta jiwa) (rata-rata 3 tahun)", + "Number of moderately or severely food insecure people (million) (3-year average)": + "Jumlah penduduk yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "Number of moderately or severely food insecure male adults (million) (3-year average)": + "Jumlah laki-laki dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "Number of moderately or severely food insecure female adults (million) (3-year average)": + "Jumlah perempuan dewasa yang mengalami kerawanan pangan sedang atau berat (juta jiwa) (rata-rata 3 tahun)", + "Percentage of children under 5 years of age who are stunted (modelled estimates) (percent)": + "Persentase anak di bawah 5 tahun yang mengalami stunting (estimasi model) (persen)", + "Number of children under 5 years of age who are stunted (modeled estimates) (million)": + "Jumlah anak di bawah 5 tahun yang mengalami stunting (estimasi model) (juta jiwa)", + "Percentage of children under 5 years affected by wasting (percent)": + "Persentase anak di bawah 5 tahun yang mengalami wasting (persen)", + "Number of children under 5 years affected by wasting (million)": + "Jumlah anak di bawah 5 tahun yang mengalami wasting (juta jiwa)", + "Percentage of children under 5 years of age who are overweight (modelled estimates) (percent)": + "Persentase anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (persen)", + "Number of children under 5 years of age who are overweight (modeled estimates) (million)": + "Jumlah anak di bawah 5 tahun yang mengalami kelebihan berat badan (estimasi model) (juta jiwa)", + "Prevalence of anemia among women of reproductive age (15-49 years) (percent)": + "Prevalensi anemia pada perempuan usia reproduksi (15-49 tahun) (persen)", + "Number of women of reproductive age (15-49 years) affected by anemia (million)": + "Jumlah perempuan usia reproduksi (15-49 tahun) yang menderita anemia (juta jiwa)", + "Prevalence of obesity in the adult population (18 years and older) (percent)": + "Prevalensi obesitas pada penduduk dewasa (18 tahun ke atas) (persen)", + "Prevalence of exclusive breastfeeding among infants 0-5 months of age (percent)": + "Prevalensi pemberian ASI eksklusif pada bayi usia 0-5 bulan (persen)", + "Minimum dietary diversity for women (MDD-W) (percent)": + "Keragaman pola makan minimum untuk perempuan (MDD-W) (persen)", + + # ------------------------------------------------------------------------- + # STABILITY + # ------------------------------------------------------------------------- + "Cereal import dependency ratio (percent)": + "Rasio ketergantungan impor sereal (persen)", + "Political stability and absence of violence/terrorism (index)": + "Stabilitas politik dan tidak adanya kekerasan/terorisme (indeks)", + "Domestic food price volatility": + "Volatilitas harga pangan domestik", + "Per capita food supply variability (kcal/cap/day)": + "Variabilitas pasokan pangan per kapita (kkal/kapita/hari)", + "Percentage of arable land equipped for irrigation (percent)": + "Persentase lahan pertanian yang dilengkapi irigasi (persen)", + "GDP per capita growth (annual %)": + "Pertumbuhan PDB per kapita (% tahunan)", + "GDP growth (annual %)": + "Pertumbuhan PDB (% tahunan)", +} + + +def translate_indicator(name: str) -> str: + """Terjemahkan nama indikator ke Bahasa Indonesia. Fallback ke nama asli.""" + if not name: + return name + return INDICATOR_TRANSLATION_ID.get(name, name) + + +def translate_pillar(name: str) -> str: + """Terjemahkan nama pillar ke Bahasa Indonesia. Fallback ke nama asli.""" + if not name: + return name + return PILLAR_TRANSLATION_ID.get(name, name) + + # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= @@ -46,9 +218,13 @@ class AnalyticalLayerLoader: 1. Complete per country (no gaps from start_year to end_year) 2. Filter countries with all pillars 3. Ensure indicators have consistent country count across all years - 4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio + 4. Save dengan kolom lengkap (nama + ID + nama Indonesia) untuk Looker Studio Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold + + Kolom tambahan: + - indicator_name_id : terjemahan Bahasa Indonesia dari indicator_name + - pillar_name_id : terjemahan Bahasa Indonesia dari pillar_name """ def __init__(self, client: bigquery.Client): @@ -424,9 +600,6 @@ class AnalyticalLayerLoader: return year_stats def save_analytical_table(self): - # --------------------------------------------------------------- - # CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio - # --------------------------------------------------------------- table_name = 'fact_asean_food_security_selected' self.logger.info("\n" + "=" * 80) @@ -434,11 +607,6 @@ class AnalyticalLayerLoader: self.logger.info("=" * 80) try: - # ------------------------------------------------------------------ - # Pilih kolom: ID + Nama lengkap + value - # Kolom nama memudahkan filtering/slicing langsung di Looker Studio - # tanpa perlu join ulang ke tabel dimensi. - # ------------------------------------------------------------------ analytical_df = self.df_clean[[ 'country_id', 'country_name', @@ -452,37 +620,68 @@ class AnalyticalLayerLoader: 'value', ]].copy() + # ------------------------------------------------------------------ + # TAMBAHAN: kolom terjemahan Bahasa Indonesia + # indicator_name_id : terjemahan Bahasa Indonesia dari indicator_name + # pillar_name_id : terjemahan Bahasa Indonesia dari pillar_name + # ------------------------------------------------------------------ + analytical_df['indicator_name_id'] = analytical_df['indicator_name'].apply(translate_indicator) + analytical_df['pillar_name_id'] = analytical_df['pillar_name'].apply(translate_pillar) + + # Log indikator yang belum punya terjemahan (fallback ke nama asli) + no_trans_ind = analytical_df[ + analytical_df['indicator_name_id'] == analytical_df['indicator_name'] + ]['indicator_name'].unique() + if len(no_trans_ind) > 0: + self.logger.warning( + f" [TRANSLATION] {len(no_trans_ind)} indicator(s) tidak ada di kamus " + f"(menggunakan nama asli): {list(no_trans_ind)[:5]}" + ) + + no_trans_pil = analytical_df[ + analytical_df['pillar_name_id'] == analytical_df['pillar_name'] + ]['pillar_name'].unique() + if len(no_trans_pil) > 0: + self.logger.warning( + f" [TRANSLATION] {len(no_trans_pil)} pillar(s) tidak ada di kamus " + f"(menggunakan nama asli): {list(no_trans_pil)}" + ) + analytical_df = analytical_df.sort_values( ['year', 'country_name', 'pillar_name', 'indicator_name'] ).reset_index(drop=True) # Pastikan tipe data konsisten - analytical_df['country_id'] = analytical_df['country_id'].astype(int) - analytical_df['country_name'] = analytical_df['country_name'].astype(str) - analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) - analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str) - analytical_df['direction'] = analytical_df['direction'].astype(str) - analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) - analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) - analytical_df['time_id'] = analytical_df['time_id'].astype(int) - analytical_df['year'] = analytical_df['year'].astype(int) - analytical_df['value'] = analytical_df['value'].astype(float) + analytical_df['country_id'] = analytical_df['country_id'].astype(int) + analytical_df['country_name'] = analytical_df['country_name'].astype(str) + analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) + analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) + analytical_df['indicator_name_id'] = analytical_df['indicator_name_id'].astype(str) + analytical_df['direction'] = analytical_df['direction'].astype(str) + analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) + analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) + analytical_df['pillar_name_id'] = analytical_df['pillar_name_id'].astype(str) + analytical_df['time_id'] = analytical_df['time_id'].astype(int) + analytical_df['year'] = analytical_df['year'].astype(int) + analytical_df['value'] = analytical_df['value'].astype(float) self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") # Schema BigQuery schema = [ - bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), - bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), - bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), - bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), + bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("indicator_name_id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), + bigquery.SchemaField("pillar_name_id", "STRING", mode="REQUIRED"), + bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), + bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), ] rows_loaded = load_to_bigquery( @@ -508,7 +707,7 @@ class AnalyticalLayerLoader: 'fixed_countries': len(self.selected_country_ids), 'no_gaps' : True, 'layer' : 'gold', - 'columns' : 'id + name + value (Looker Studio ready)' + 'columns' : 'id + name + name_id (Looker Studio ready)' }), 'validation_metrics' : json.dumps({ 'fixed_countries' : len(self.selected_country_ids), @@ -517,8 +716,8 @@ class AnalyticalLayerLoader: } save_etl_metadata(self.client, metadata) - self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold") - self.logger.info(f" Metadata → [AUDIT] etl_metadata") + self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold") + self.logger.info(f" Metadata -> [AUDIT] etl_metadata") return rows_loaded except Exception as e: @@ -530,7 +729,7 @@ class AnalyticalLayerLoader: self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) - self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold") + self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("=" * 80) self.load_source_data() @@ -577,7 +776,7 @@ def run_analytical_layer(): if __name__ == "__main__": print("=" * 80) - print("Output: fact_asean_food_security_selected → fs_asean_gold") + print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("=" * 80) logger = setup_logging()