agregate fact selected
This commit is contained in:
@@ -22,6 +22,8 @@ Kimball ETL Flow:
|
||||
│ agg_pillar_by_country │
|
||||
│ agg_framework_by_country │
|
||||
│ agg_framework_asean │
|
||||
│ ↓ │
|
||||
│ agg_indicator_norm │
|
||||
│ │
|
||||
│ AUDIT : etl_logs, etl_metadata (setiap layer) │
|
||||
└──────────────────────────────────────────────────────────────────────────┘
|
||||
@@ -36,13 +38,15 @@ Task Order:
|
||||
→ dimensional_model_to_gold
|
||||
→ analytical_layer_to_gold
|
||||
→ aggregation_to_gold
|
||||
→ indicator_norm_aggregation_to_gold
|
||||
|
||||
Scripts folder harus berisi:
|
||||
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
|
||||
- bigquery_cleaned_layer.py (run_cleaned_integration)
|
||||
- bigquery_dimensional_model.py (run_dimensional_model)
|
||||
- bigquery_analytical_layer.py (run_analytical_layer)
|
||||
- bigquery_analysis_aggregation.py (run_aggregation)
|
||||
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
|
||||
- bigquery_cleaned_layer.py (run_cleaned_integration)
|
||||
- bigquery_dimensional_model.py (run_dimensional_model)
|
||||
- bigquery_analytical_layer.py (run_analytical_layer)
|
||||
- bigquery_analysis_aggregation.py (run_aggregation)
|
||||
- bigquery_aggraget_fact_selected_layer.py (run_indicator_norm_aggregation)
|
||||
- bigquery_config.py
|
||||
- bigquery_helpers.py
|
||||
- bigquery_datasource.py
|
||||
@@ -71,6 +75,9 @@ from scripts.bigquery_analytical_layer import (
|
||||
from scripts.bigquery_aggregate_layer import (
|
||||
run_aggregation,
|
||||
)
|
||||
from scripts.bigquery_aggraget_fact_selected_layer import (
|
||||
run_indicator_norm_aggregation,
|
||||
)
|
||||
|
||||
# DEFAULT ARGS
|
||||
|
||||
@@ -136,5 +143,21 @@ with DAG(
|
||||
python_callable = run_aggregation
|
||||
)
|
||||
|
||||
|
||||
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation
|
||||
task_indicator_norm = PythonOperator(
|
||||
task_id = "indicator_norm_aggregation_to_gold",
|
||||
python_callable = run_indicator_norm_aggregation
|
||||
)
|
||||
|
||||
# Task Dependencies
|
||||
(
|
||||
task_verify
|
||||
>> task_fao
|
||||
>> task_worldbank
|
||||
>> task_unicef
|
||||
>> task_staging
|
||||
>> task_cleaned
|
||||
>> task_dimensional
|
||||
>> task_analytical
|
||||
>> task_aggregation
|
||||
>> task_indicator_norm
|
||||
)
|
||||
730
scripts/bigquery_aggraget_fact_selected_layer.py
Normal file
730
scripts/bigquery_aggraget_fact_selected_layer.py
Normal file
@@ -0,0 +1,730 @@
|
||||
"""
|
||||
BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION
|
||||
Tabel: agg_indicator_norm -> fs_asean_gold
|
||||
|
||||
Tujuan:
|
||||
Menghitung norm_value per indikator per negara per tahun, sehingga dapat
|
||||
melihat performa setiap indikator secara individual (lower_better & higher_better
|
||||
sudah dibalik).
|
||||
|
||||
Framework Classification Logic:
|
||||
- Semua indikator berlabel "MDGs" secara default.
|
||||
- Indikator yang ada dalam SDG_ONLY_KEYWORDS akan berlabel "SDGs" mulai dari
|
||||
sdgs_start_year (tahun pertama FIES hadir, dihitung otomatis).
|
||||
- Indikator yang SUDAH ADA sebelum sdgs_start_year DAN juga termasuk
|
||||
SDG_ONLY_KEYWORDS akan memiliki DUA label framework:
|
||||
* "MDGs" untuk year < sdgs_start_year
|
||||
* "SDGs" untuk year >= sdgs_start_year
|
||||
- Indikator yang TIDAK ada dalam SDG_ONLY_KEYWORDS selalu "MDGs".
|
||||
|
||||
Output Schema (agg_indicator_norm):
|
||||
year, country_id, country_name,
|
||||
indicator_id, indicator_name, direction,
|
||||
pillar_id, pillar_name,
|
||||
framework, -- "MDGs" | "SDGs"
|
||||
value, -- raw value asli
|
||||
norm_value, -- 0-1, direction sudah diperhitungkan
|
||||
norm_score_1_100, -- scaled 1-100 (global per indikator)
|
||||
rank_in_indicator_year, -- rank negara di dalam satu indikator & tahun
|
||||
rank_in_country_year -- rank indikator di dalam satu negara & tahun
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from datetime import datetime
|
||||
import logging
|
||||
import json
|
||||
|
||||
from scripts.bigquery_config import get_bigquery_client
|
||||
from scripts.bigquery_helpers import (
|
||||
log_update,
|
||||
load_to_bigquery,
|
||||
read_from_bigquery,
|
||||
setup_logging,
|
||||
save_etl_metadata,
|
||||
)
|
||||
from google.cloud import bigquery
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SDG-ONLY KEYWORD SET
|
||||
# =============================================================================
|
||||
|
||||
SDG_ONLY_KEYWORDS: frozenset = frozenset([
|
||||
# TARGET 2.1.1 - Undernourishment
|
||||
"prevalence of undernourishment (percent) (3-year average)",
|
||||
"number of people undernourished (million) (3-year average)",
|
||||
# TARGET 2.1.2 - Food Insecurity (FIES)
|
||||
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
||||
"prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
|
||||
"prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
|
||||
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
|
||||
"prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
|
||||
"prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
|
||||
"number of severely food insecure people (million) (3-year average)",
|
||||
"number of severely food insecure male adults (million) (3-year average)",
|
||||
"number of severely food insecure female adults (million) (3-year average)",
|
||||
"number of moderately or severely food insecure people (million) (3-year average)",
|
||||
"number of moderately or severely food insecure male adults (million) (3-year average)",
|
||||
"number of moderately or severely food insecure female adults (million) (3-year average)",
|
||||
# TARGET 2.2.1 - Stunting
|
||||
"percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
|
||||
"number of children under 5 years of age who are stunted (modeled estimates) (million)",
|
||||
# TARGET 2.2.2 - Wasting
|
||||
"percentage of children under 5 years affected by wasting (percent)",
|
||||
"number of children under 5 years affected by wasting (million)",
|
||||
# TARGET 2.2.2 - Overweight (children)
|
||||
"percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
|
||||
"number of children under 5 years of age who are overweight (modeled estimates) (million)",
|
||||
# TARGET 2.2.3 - Anaemia
|
||||
"prevalence of anemia among women of reproductive age (15-49 years) (percent)",
|
||||
"number of women of reproductive age (15-49 years) affected by anemia (million)",
|
||||
])
|
||||
|
||||
# Lowercase set untuk matching case-insensitive
|
||||
_SDG_ONLY_LOWER: frozenset = frozenset(k.lower() for k in SDG_ONLY_KEYWORDS)
|
||||
|
||||
# FIES-specific keywords untuk deteksi sdgs_start_year
|
||||
# (indikator yang HANYA muncul setelah SDGs era dimulai)
|
||||
_FIES_DETECTION_KEYWORDS: frozenset = frozenset([
|
||||
"prevalence of severe food insecurity in the total population (percent) (3-year average)",
|
||||
"prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
|
||||
"number of severely food insecure people (million) (3-year average)",
|
||||
"number of moderately or severely food insecure people (million) (3-year average)",
|
||||
])
|
||||
_FIES_DETECTION_LOWER: frozenset = frozenset(k.lower() for k in _FIES_DETECTION_KEYWORDS)
|
||||
|
||||
DIRECTION_INVERT_KEYWORDS = frozenset({
|
||||
"negative", "lower_better", "lower_is_better", "inverse", "neg",
|
||||
})
|
||||
DIRECTION_POSITIVE_KEYWORDS = frozenset({
|
||||
"positive", "higher_better", "higher_is_better",
|
||||
})
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# PURE HELPERS
|
||||
# =============================================================================
|
||||
|
||||
def _should_invert(direction: str, logger=None, context: str = "") -> bool:
|
||||
d = str(direction).lower().strip()
|
||||
if d in DIRECTION_INVERT_KEYWORDS:
|
||||
return True
|
||||
if d in DIRECTION_POSITIVE_KEYWORDS:
|
||||
return False
|
||||
if logger:
|
||||
logger.warning(
|
||||
f" [DIRECTION WARNING] Unknown direction '{direction}' "
|
||||
f"{'(' + context + ')' if context else ''}. Defaulting to positive (no invert)."
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def global_minmax(series: pd.Series, lo: float = 1.0, hi: float = 100.0) -> pd.Series:
|
||||
values = series.dropna().values
|
||||
if len(values) == 0:
|
||||
return pd.Series(np.nan, index=series.index)
|
||||
v_min, v_max = values.min(), values.max()
|
||||
if v_min == v_max:
|
||||
return pd.Series((lo + hi) / 2.0, index=series.index)
|
||||
result = np.full(len(series), np.nan)
|
||||
not_nan = series.notna()
|
||||
result[not_nan.values] = lo + (series[not_nan].values - v_min) / (v_max - v_min) * (hi - lo)
|
||||
return pd.Series(result, index=series.index)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# MAIN CLASS
|
||||
# =============================================================================
|
||||
|
||||
class IndicatorNormAggregator:
|
||||
"""
|
||||
Hitung norm_value per indikator untuk seluruh data di
|
||||
fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm.
|
||||
|
||||
Alur:
|
||||
1. Load fact_asean_food_security_selected
|
||||
2. Deteksi sdgs_start_year (tahun pertama FIES hadir di data)
|
||||
3. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label
|
||||
4. Hitung norm_value per indikator (direction-aware, 0-1)
|
||||
5. Scale ke 1-100 per indikator (global)
|
||||
6. Hitung rank_in_indicator_year & rank_in_country_year
|
||||
7. Simpan ke BigQuery
|
||||
"""
|
||||
|
||||
def __init__(self, client: bigquery.Client):
|
||||
self.client = client
|
||||
self.logger = logging.getLogger(self.__class__.__name__)
|
||||
self.logger.propagate = False
|
||||
|
||||
self.df = None
|
||||
self.sdgs_start_year = None
|
||||
|
||||
self.pipeline_start = None
|
||||
self.pipeline_metadata = {
|
||||
"rows_fetched": 0,
|
||||
"rows_loaded" : 0,
|
||||
"start_time" : None,
|
||||
"end_time" : None,
|
||||
}
|
||||
|
||||
# =========================================================================
|
||||
# STEP 1: Load
|
||||
# =========================================================================
|
||||
|
||||
def load_data(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 1: LOAD DATA — fact_asean_food_security_selected")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
self.df = read_from_bigquery(
|
||||
self.client, "fact_asean_food_security_selected", layer="gold"
|
||||
)
|
||||
|
||||
required = {
|
||||
"country_id", "country_name",
|
||||
"indicator_id", "indicator_name", "direction",
|
||||
"pillar_id", "pillar_name",
|
||||
"year", "value",
|
||||
}
|
||||
missing = required - set(self.df.columns)
|
||||
if missing:
|
||||
raise ValueError(f"Kolom tidak ditemukan: {missing}")
|
||||
|
||||
n_null = self.df["direction"].isna().sum()
|
||||
if n_null > 0:
|
||||
self.logger.warning(f" {n_null} rows direction NULL -> diisi 'positive'")
|
||||
self.df["direction"] = self.df["direction"].fillna("positive")
|
||||
|
||||
self.pipeline_metadata["rows_fetched"] = len(self.df)
|
||||
self.logger.info(f" Rows : {len(self.df):,}")
|
||||
self.logger.info(f" Countries : {self.df['country_id'].nunique()}")
|
||||
self.logger.info(f" Indicators: {self.df['indicator_id'].nunique()}")
|
||||
self.logger.info(
|
||||
f" Years : {int(self.df['year'].min())} - {int(self.df['year'].max())}"
|
||||
)
|
||||
|
||||
# =========================================================================
|
||||
# STEP 2: Deteksi sdgs_start_year
|
||||
# =========================================================================
|
||||
|
||||
def _detect_sdgs_start_year(self) -> int:
|
||||
"""
|
||||
sdgs_start_year = tahun pertama FIES hadir di data.
|
||||
FIES = indikator yang ada di _FIES_DETECTION_LOWER.
|
||||
|
||||
Fallback ke metode gap-terbesar pada min_year distribusi per indikator
|
||||
jika FIES tidak ditemukan.
|
||||
"""
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 2: DETECT sdgs_start_year (first FIES year)")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
# Metode 1: Explicit FIES detection
|
||||
fies_rows = self.df[
|
||||
self.df["indicator_name"].str.lower().str.strip().isin(_FIES_DETECTION_LOWER)
|
||||
]
|
||||
if not fies_rows.empty:
|
||||
sdgs_start = int(fies_rows["year"].min())
|
||||
n_fies_ind = fies_rows["indicator_name"].nunique()
|
||||
self.logger.info(f" [Metode 1 - FIES explicit] sdgs_start_year = {sdgs_start}")
|
||||
self.logger.info(f" FIES indicators found: {n_fies_ind}, first year = {sdgs_start}")
|
||||
for nm in fies_rows["indicator_name"].unique():
|
||||
min_y = int(fies_rows[fies_rows["indicator_name"] == nm]["year"].min())
|
||||
self.logger.info(f" - {nm[:60]} (first year: {min_y})")
|
||||
return sdgs_start
|
||||
|
||||
# Fallback: gap-terbesar
|
||||
self.logger.info(" [Metode 1] Tidak ada FIES rows -> fallback gap-terbesar")
|
||||
ind_min_year = (
|
||||
self.df.groupby("indicator_id")["year"]
|
||||
.min().reset_index()
|
||||
.rename(columns={"year": "min_year"})
|
||||
)
|
||||
unique_years = sorted(ind_min_year["min_year"].unique())
|
||||
self.logger.info(f" Unique min_year per indikator: {unique_years}")
|
||||
|
||||
if len(unique_years) == 1:
|
||||
sdgs_start = int(unique_years[0]) + 9999
|
||||
self.logger.info(" Hanya 1 cluster -> semua MDGs")
|
||||
else:
|
||||
gaps = [
|
||||
(unique_years[i+1] - unique_years[i], unique_years[i], unique_years[i+1])
|
||||
for i in range(len(unique_years) - 1)
|
||||
]
|
||||
gaps.sort(reverse=True)
|
||||
_, y_before, y_after = gaps[0]
|
||||
sdgs_start = int(y_after)
|
||||
self.logger.info(
|
||||
f" Gap terbesar: {y_before} -> {y_after} -> sdgs_start_year = {sdgs_start}"
|
||||
)
|
||||
|
||||
return sdgs_start
|
||||
|
||||
# =========================================================================
|
||||
# STEP 3: Assign framework
|
||||
# =========================================================================
|
||||
|
||||
def _assign_framework(self):
|
||||
"""
|
||||
Tambahkan kolom 'framework' ke self.df.
|
||||
|
||||
Aturan per baris:
|
||||
- Indikator TIDAK di SDG_ONLY_KEYWORDS:
|
||||
framework = "MDGs" (selalu, semua tahun)
|
||||
|
||||
- Indikator DI SDG_ONLY_KEYWORDS:
|
||||
year < sdgs_start_year -> framework = "MDGs"
|
||||
year >= sdgs_start_year -> framework = "SDGs"
|
||||
|
||||
Contoh dual-label (indicator "prevalence of undernourishment"):
|
||||
Jika data ada dari 2013 dan sdgs_start_year = 2019:
|
||||
- Baris 2013-2018: framework = "MDGs" (masuk era MDGs)
|
||||
- Baris 2019-dst : framework = "SDGs" (masuk era SDGs)
|
||||
Sehingga indikator ini muncul di kedua framework tanpa duplikasi baris.
|
||||
|
||||
Contoh FIES-only (indicator "prevalence of severe food insecurity"):
|
||||
Data baru ada mulai 2019 (= sdgs_start_year):
|
||||
- Semua baris: framework = "SDGs"
|
||||
"""
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 3: ASSIGN FRAMEWORK PER BARIS")
|
||||
self.logger.info(f" sdgs_start_year = {self.sdgs_start_year}")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
df = self.df.copy()
|
||||
|
||||
# Flag apakah indikator ada di SDG_ONLY_KEYWORDS
|
||||
df["_is_sdg_kw"] = df["indicator_name"].str.lower().str.strip().isin(_SDG_ONLY_LOWER)
|
||||
|
||||
# Default semua MDGs
|
||||
df["framework"] = "MDGs"
|
||||
|
||||
# SDG_ONLY + year >= sdgs_start_year -> SDGs
|
||||
mask_sdgs = df["_is_sdg_kw"] & (df["year"] >= self.sdgs_start_year)
|
||||
df.loc[mask_sdgs, "framework"] = "SDGs"
|
||||
|
||||
# Drop helper column
|
||||
df = df.drop(columns=["_is_sdg_kw"])
|
||||
|
||||
# ---- Logging ----
|
||||
fw_dist = df["framework"].value_counts()
|
||||
self.logger.info("\n Framework distribution (rows):")
|
||||
for fw, cnt in fw_dist.items():
|
||||
self.logger.info(f" {fw:<6}: {cnt:,} rows")
|
||||
|
||||
# Cek berapa indikator punya dual-framework
|
||||
dual = (
|
||||
df.groupby("indicator_id")["framework"]
|
||||
.nunique()
|
||||
.reset_index()
|
||||
.rename(columns={"framework": "n_frameworks"})
|
||||
)
|
||||
dual_ids = dual[dual["n_frameworks"] > 1]["indicator_id"].tolist()
|
||||
self.logger.info(
|
||||
f"\n Indikator dengan DUAL framework (MDGs + SDGs): {len(dual_ids)}"
|
||||
)
|
||||
if dual_ids:
|
||||
for iid in dual_ids:
|
||||
ind_name = df[df["indicator_id"] == iid]["indicator_name"].iloc[0]
|
||||
yr_range = df[df["indicator_id"] == iid][["year", "framework"]].drop_duplicates()
|
||||
mdgs_yrs = sorted(yr_range[yr_range["framework"] == "MDGs"]["year"].tolist())
|
||||
sdgs_yrs = sorted(yr_range[yr_range["framework"] == "SDGs"]["year"].tolist())
|
||||
self.logger.info(
|
||||
f" [{iid}] {ind_name[:55]}\n"
|
||||
f" MDGs years: {mdgs_yrs}\n"
|
||||
f" SDGs years: {sdgs_yrs}"
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f"\n Indikator SDGs only (semua tahun = SDGs): "
|
||||
f"{len(dual[(dual['n_frameworks'] == 1)].merge(df[df['framework'] == 'SDGs'][['indicator_id']].drop_duplicates(), on='indicator_id'))}"
|
||||
)
|
||||
|
||||
self.df = df
|
||||
|
||||
# =========================================================================
|
||||
# STEP 4: Hitung norm_value per indikator (direction-aware)
|
||||
# =========================================================================
|
||||
|
||||
def _compute_norm_values(self) -> pd.DataFrame:
|
||||
"""
|
||||
Normalisasi per indikator secara global (semua tahun & negara):
|
||||
norm_value = (raw - min) / (max - min) [higher_better]
|
||||
norm_value = 1 - (raw - min) / (max - min) [lower_better]
|
||||
|
||||
Normalisasi dilakukan satu kali per indicator_id,
|
||||
mencakup SEMUA baris (MDGs + SDGs dari indikator yang sama)
|
||||
agar skor konsisten antar framework.
|
||||
"""
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 4: COMPUTE NORM_VALUE PER INDICATOR (direction-aware)")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
df = self.df.copy()
|
||||
norm_parts = []
|
||||
|
||||
for ind_id, grp in df.groupby("indicator_id"):
|
||||
grp = grp.copy()
|
||||
direction = str(grp["direction"].iloc[0])
|
||||
do_invert = _should_invert(
|
||||
direction, self.logger, context=f"indicator_id={ind_id}"
|
||||
)
|
||||
|
||||
valid_mask = grp["value"].notna()
|
||||
n_valid = valid_mask.sum()
|
||||
|
||||
if n_valid < 2:
|
||||
grp["norm_value"] = np.nan
|
||||
norm_parts.append(grp)
|
||||
self.logger.warning(
|
||||
f" [SKIP] indicator_id={ind_id}: only {n_valid} valid values"
|
||||
)
|
||||
continue
|
||||
|
||||
raw = grp.loc[valid_mask, "value"].values
|
||||
v_min = raw.min()
|
||||
v_max = raw.max()
|
||||
|
||||
normed = np.full(len(grp), np.nan)
|
||||
if v_min == v_max:
|
||||
normed[valid_mask.values] = 0.5
|
||||
else:
|
||||
normed[valid_mask.values] = (raw - v_min) / (v_max - v_min)
|
||||
|
||||
if do_invert:
|
||||
normed = np.where(np.isnan(normed), np.nan, 1.0 - normed)
|
||||
|
||||
grp["norm_value"] = normed
|
||||
norm_parts.append(grp)
|
||||
|
||||
df_normed = pd.concat(norm_parts, ignore_index=True)
|
||||
|
||||
n_ind_computed = df_normed["indicator_id"].nunique()
|
||||
self.logger.info(f" norm_value computed: {n_ind_computed} indicators")
|
||||
self.logger.info(
|
||||
f" norm_value range : "
|
||||
f"{df_normed['norm_value'].min():.4f} - {df_normed['norm_value'].max():.4f}"
|
||||
)
|
||||
self.logger.info(
|
||||
f" norm_value nulls : {df_normed['norm_value'].isna().sum()}"
|
||||
)
|
||||
return df_normed
|
||||
|
||||
# =========================================================================
|
||||
# STEP 5: Scale ke 1-100, hitung rank
|
||||
# =========================================================================
|
||||
|
||||
def _compute_scores_and_ranks(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
"""
|
||||
norm_score_1_100:
|
||||
Scale norm_value ke 1-100 secara global PER INDIKATOR
|
||||
(semua tahun & negara dalam satu indikator di-scale bersama).
|
||||
|
||||
rank_in_indicator_year:
|
||||
Rank negara dalam satu (indicator_id, year).
|
||||
rank=1 -> negara dengan norm_score terbaik untuk indikator tsb di tahun tsb.
|
||||
|
||||
rank_in_country_year:
|
||||
Rank indikator dalam satu (country_id, year).
|
||||
rank=1 -> indikator dengan norm_score terbaik untuk negara tsb di tahun tsb.
|
||||
"""
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 5: SCALE TO 1-100 & COMPUTE RANKS")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
# Scale per indikator
|
||||
score_parts = []
|
||||
for ind_id, grp in df.groupby("indicator_id"):
|
||||
grp = grp.copy()
|
||||
grp["norm_score_1_100"] = global_minmax(grp["norm_value"])
|
||||
score_parts.append(grp)
|
||||
df = pd.concat(score_parts, ignore_index=True)
|
||||
|
||||
# rank_in_indicator_year: rank negara per (indicator, year)
|
||||
df["rank_in_indicator_year"] = (
|
||||
df.groupby(["indicator_id", "year"])["norm_score_1_100"]
|
||||
.rank(method="min", ascending=False)
|
||||
.astype("Int64")
|
||||
)
|
||||
|
||||
# rank_in_country_year: rank indikator per (country, year)
|
||||
df["rank_in_country_year"] = (
|
||||
df.groupby(["country_id", "year"])["norm_score_1_100"]
|
||||
.rank(method="min", ascending=False)
|
||||
.astype("Int64")
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f" norm_score_1_100 range : "
|
||||
f"{df['norm_score_1_100'].min():.2f} - {df['norm_score_1_100'].max():.2f}"
|
||||
)
|
||||
self.logger.info(
|
||||
f" rank_in_indicator_year max: {df['rank_in_indicator_year'].max()}"
|
||||
)
|
||||
self.logger.info(
|
||||
f" rank_in_country_year max : {df['rank_in_country_year'].max()}"
|
||||
)
|
||||
return df
|
||||
|
||||
# =========================================================================
|
||||
# STEP 6: Save to BigQuery
|
||||
# =========================================================================
|
||||
|
||||
def _save(self, df: pd.DataFrame) -> int:
|
||||
table_name = "agg_indicator_norm"
|
||||
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info(f"STEP 6: SAVE -> [Gold] {table_name}")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
out = df[[
|
||||
"year",
|
||||
"country_id",
|
||||
"country_name",
|
||||
"indicator_id",
|
||||
"indicator_name",
|
||||
"direction",
|
||||
"pillar_id",
|
||||
"pillar_name",
|
||||
"framework",
|
||||
"value",
|
||||
"norm_value",
|
||||
"norm_score_1_100",
|
||||
"rank_in_indicator_year",
|
||||
"rank_in_country_year",
|
||||
]].copy()
|
||||
|
||||
out = out.sort_values(
|
||||
["year", "country_name", "pillar_name", "indicator_name"]
|
||||
).reset_index(drop=True)
|
||||
|
||||
# Cast
|
||||
out["year"] = out["year"].astype(int)
|
||||
out["country_id"] = out["country_id"].astype(int)
|
||||
out["country_name"] = out["country_name"].astype(str)
|
||||
out["indicator_id"] = out["indicator_id"].astype(int)
|
||||
out["indicator_name"] = out["indicator_name"].astype(str)
|
||||
out["direction"] = out["direction"].astype(str)
|
||||
out["pillar_id"] = out["pillar_id"].astype(int)
|
||||
out["pillar_name"] = out["pillar_name"].astype(str)
|
||||
out["framework"] = out["framework"].astype(str)
|
||||
out["value"] = out["value"].astype(float)
|
||||
out["norm_value"] = out["norm_value"].astype(float)
|
||||
out["norm_score_1_100"] = out["norm_score_1_100"].astype(float)
|
||||
out["rank_in_indicator_year"] = pd.to_numeric(
|
||||
out["rank_in_indicator_year"], errors="coerce"
|
||||
).astype("Int64")
|
||||
out["rank_in_country_year"] = pd.to_numeric(
|
||||
out["rank_in_country_year"], errors="coerce"
|
||||
).astype("Int64")
|
||||
|
||||
self.logger.info(f" Columns : {list(out.columns)}")
|
||||
self.logger.info(f" Total rows : {len(out):,}")
|
||||
self.logger.info(f" Countries : {out['country_id'].nunique()}")
|
||||
self.logger.info(f" Indicators : {out['indicator_id'].nunique()}")
|
||||
self.logger.info(f" Years : {int(out['year'].min())} - {int(out['year'].max())}")
|
||||
self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}")
|
||||
|
||||
schema = [
|
||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
|
||||
bigquery.SchemaField("norm_value", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("norm_score_1_100", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("rank_in_indicator_year", "INTEGER", mode="NULLABLE"),
|
||||
bigquery.SchemaField("rank_in_country_year", "INTEGER", mode="NULLABLE"),
|
||||
]
|
||||
|
||||
rows_loaded = load_to_bigquery(
|
||||
self.client, out, table_name,
|
||||
layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema,
|
||||
)
|
||||
|
||||
log_update(self.client, "DW", table_name, "full_load", rows_loaded)
|
||||
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
|
||||
|
||||
metadata = {
|
||||
"source_class" : self.__class__.__name__,
|
||||
"table_name" : table_name,
|
||||
"execution_timestamp": self.pipeline_start,
|
||||
"duration_seconds" : (datetime.now() - self.pipeline_start).total_seconds(),
|
||||
"rows_fetched" : self.pipeline_metadata["rows_fetched"],
|
||||
"rows_transformed" : rows_loaded,
|
||||
"rows_loaded" : rows_loaded,
|
||||
"completeness_pct" : 100.0,
|
||||
"config_snapshot" : json.dumps({
|
||||
"sdgs_start_year" : self.sdgs_start_year,
|
||||
"sdg_only_keywords_n" : len(SDG_ONLY_KEYWORDS),
|
||||
"layer" : "gold",
|
||||
"normalization" : "per_indicator_global_minmax",
|
||||
"direction_handling" : "lower_better_inverted",
|
||||
"framework_logic" : (
|
||||
"SDG_ONLY_KEYWORDS: MDGs if year < sdgs_start_year, "
|
||||
"SDGs if year >= sdgs_start_year. "
|
||||
"Non-SDG_ONLY: always MDGs."
|
||||
),
|
||||
}),
|
||||
"validation_metrics" : json.dumps({
|
||||
"total_rows" : rows_loaded,
|
||||
"n_indicators" : int(out["indicator_id"].nunique()),
|
||||
"n_countries" : int(out["country_id"].nunique()),
|
||||
"sdgs_start_year": self.sdgs_start_year,
|
||||
}),
|
||||
}
|
||||
save_etl_metadata(self.client, metadata)
|
||||
self.logger.info(" Metadata -> [AUDIT] etl_metadata")
|
||||
return rows_loaded
|
||||
|
||||
# =========================================================================
|
||||
# STEP 7: Summary log
|
||||
# =========================================================================
|
||||
|
||||
def _log_summary(self, df: pd.DataFrame):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 7: SUMMARY")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
# Per framework & year
|
||||
summary = (
|
||||
df.groupby(["framework", "year"])
|
||||
.agg(
|
||||
n_indicators=("indicator_id", "nunique"),
|
||||
n_countries =("country_id", "nunique"),
|
||||
avg_score =("norm_score_1_100", "mean"),
|
||||
)
|
||||
.reset_index()
|
||||
)
|
||||
self.logger.info(
|
||||
f"\n{'Framework':<8} {'Year':<6} {'Indicators':<12} {'Countries':<12} {'Avg Score'}"
|
||||
)
|
||||
self.logger.info("-" * 55)
|
||||
for _, r in summary.iterrows():
|
||||
self.logger.info(
|
||||
f"{r['framework']:<8} {int(r['year']):<6} "
|
||||
f"{int(r['n_indicators']):<12} {int(r['n_countries']):<12} "
|
||||
f"{r['avg_score']:.2f}"
|
||||
)
|
||||
|
||||
# Top 5 & Bottom 5 indikator (rata-rata norm_score_1_100)
|
||||
ind_avg = (
|
||||
df.groupby(["indicator_id", "indicator_name", "pillar_name", "direction"])
|
||||
["norm_score_1_100"].mean()
|
||||
.reset_index()
|
||||
.sort_values("norm_score_1_100", ascending=False)
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
"\n TOP 5 Indicators (avg norm_score_1_100 across all years & countries):"
|
||||
)
|
||||
for _, r in ind_avg.head(5).iterrows():
|
||||
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
|
||||
self.logger.info(
|
||||
f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} "
|
||||
f"{r['norm_score_1_100']:.2f} {tag}"
|
||||
)
|
||||
|
||||
self.logger.info("\n BOTTOM 5 Indicators:")
|
||||
for _, r in ind_avg.tail(5).iterrows():
|
||||
tag = "[lower+]" if r["direction"] in DIRECTION_INVERT_KEYWORDS else "[higher+]"
|
||||
self.logger.info(
|
||||
f" [{int(r['indicator_id'])}] {r['indicator_name'][:55]:<57} "
|
||||
f"{r['norm_score_1_100']:.2f} {tag}"
|
||||
)
|
||||
|
||||
# Indikator per pillar
|
||||
pillar_summary = (
|
||||
df.drop_duplicates(subset=["indicator_id", "pillar_name"])
|
||||
.groupby("pillar_name")["indicator_id"]
|
||||
.count()
|
||||
.reset_index()
|
||||
.rename(columns={"indicator_id": "n_indicators"})
|
||||
)
|
||||
self.logger.info("\n Indicators per pillar:")
|
||||
for _, r in pillar_summary.iterrows():
|
||||
self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}")
|
||||
|
||||
# =========================================================================
|
||||
# RUN
|
||||
# =========================================================================
|
||||
|
||||
def run(self):
|
||||
self.pipeline_start = datetime.now()
|
||||
self.pipeline_metadata["start_time"] = self.pipeline_start
|
||||
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("INDICATOR NORM AGGREGATION")
|
||||
self.logger.info(" Source : fact_asean_food_security_selected")
|
||||
self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
self.load_data()
|
||||
self.sdgs_start_year = self._detect_sdgs_start_year()
|
||||
self._assign_framework()
|
||||
df_normed = self._compute_norm_values()
|
||||
df_final = self._compute_scores_and_ranks(df_normed)
|
||||
rows_loaded = self._save(df_final)
|
||||
self.pipeline_metadata["rows_loaded"] = rows_loaded
|
||||
self._log_summary(df_final)
|
||||
|
||||
self.pipeline_metadata["end_time"] = datetime.now()
|
||||
duration = (
|
||||
self.pipeline_metadata["end_time"] - self.pipeline_start
|
||||
).total_seconds()
|
||||
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("COMPLETED")
|
||||
self.logger.info("=" * 80)
|
||||
self.logger.info(f" Duration : {duration:.2f}s")
|
||||
self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}")
|
||||
self.logger.info(f" Rows Loaded : {rows_loaded:,}")
|
||||
self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# AIRFLOW TASK
|
||||
# =============================================================================
|
||||
|
||||
def run_indicator_norm_aggregation():
|
||||
"""
|
||||
Airflow task: Build agg_indicator_norm.
|
||||
Dipanggil setelah analytical_layer_to_gold selesai.
|
||||
"""
|
||||
client = get_bigquery_client()
|
||||
agg = IndicatorNormAggregator(client)
|
||||
agg.run()
|
||||
print(f"agg_indicator_norm loaded: {agg.pipeline_metadata['rows_loaded']:,} rows")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# MAIN
|
||||
# =============================================================================
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys, io
|
||||
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
|
||||
if sys.stderr.encoding and sys.stderr.encoding.lower() not in ("utf-8", "utf8"):
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
|
||||
|
||||
print("=" * 80)
|
||||
print("INDICATOR NORM AGGREGATION -> fs_asean_gold")
|
||||
print(" Source : fact_asean_food_security_selected")
|
||||
print(" Output : agg_indicator_norm")
|
||||
print("=" * 80)
|
||||
|
||||
logger = setup_logging()
|
||||
client = get_bigquery_client()
|
||||
agg = IndicatorNormAggregator(client)
|
||||
agg.run()
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("[OK] COMPLETED")
|
||||
print("=" * 80)
|
||||
Reference in New Issue
Block a user