capek eror
This commit is contained in:
@@ -247,140 +247,91 @@ def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple:
|
||||
|
||||
def _build_narrative(row: pd.Series) -> str:
|
||||
"""
|
||||
Bangun 1 paragraf narasi per indikator, merangkum seluruh negara ASEAN
|
||||
dan seluruh tahun yang tersedia.
|
||||
Bangun 1 paragraf narasi ASEAN-level untuk satu baris (year x indicator_id).
|
||||
"""
|
||||
ind_name = str(row["indicator_name"]).strip()
|
||||
unit = str(row["unit"]).strip() if row["unit"] else ""
|
||||
direction = str(row["direction"]).strip()
|
||||
pillar = str(row["pillar_name"]).strip()
|
||||
framework = str(row["framework"]).strip()
|
||||
year_min = int(row["year_min"])
|
||||
year_max = int(row["year_max"])
|
||||
n_years = int(row["n_years"])
|
||||
n_countries = int(row["n_countries"])
|
||||
avg_score = row["avg_norm_score_1_100"]
|
||||
performance = str(row["performance"]) if not pd.isna(row["performance"]) else None
|
||||
first_val = row["asean_avg_first"]
|
||||
last_val = row["asean_avg_last"]
|
||||
cum_change = row["cumulative_change"]
|
||||
best_country = str(row["best_country"]).strip()
|
||||
worst_country = str(row["worst_country"]).strip()
|
||||
yoy_improved = int(row["yoy_improved_count"]) if not pd.isna(row["yoy_improved_count"]) else 0
|
||||
yoy_total = int(row["yoy_total_transitions"]) if not pd.isna(row["yoy_total_transitions"]) else 0
|
||||
best_year = int(row["best_year"]) if not pd.isna(row["best_year"]) else None
|
||||
worst_year = int(row["worst_year"]) if not pd.isna(row["worst_year"]) else None
|
||||
year = int(row["year"])
|
||||
ind_name = str(row["indicator_name"]).strip()
|
||||
unit = str(row["unit"]).strip() if row["unit"] else ""
|
||||
direction = str(row["direction"]).strip()
|
||||
pillar = str(row["pillar_name"]).strip()
|
||||
framework = str(row["framework"]).strip()
|
||||
avg_val = row["avg_value"]
|
||||
avg_score = row["avg_norm_score_1_100"]
|
||||
performance = row["performance"]
|
||||
yoy = row["yoy_avg_value"]
|
||||
n_countries = int(row["n_countries"]) if not pd.isna(row["n_countries"]) else 0
|
||||
|
||||
lower_better = _is_lower_better(direction)
|
||||
direction_label = (
|
||||
"lower values indicate better outcomes"
|
||||
if lower_better else "higher values indicate better outcomes"
|
||||
"lower values indicate better outcomes" if lower_better
|
||||
else "higher values indicate better outcomes"
|
||||
)
|
||||
|
||||
year_span = (
|
||||
f"from {year_min} to {year_max}"
|
||||
if year_min != year_max else f"in {year_min}"
|
||||
# --- Bagian 1: Nilai rata-rata ASEAN ---
|
||||
val_str = _format_value(avg_val, unit)
|
||||
sentence1 = (
|
||||
f"In {year}, the ASEAN regional average for {ind_name} stood at {val_str}"
|
||||
)
|
||||
n_years_label = f"{n_years} year{'s' if n_years > 1 else ''} of data"
|
||||
|
||||
# --- Bagian 1: Pembuka ---
|
||||
s1 = (
|
||||
f"The {ind_name} under the {framework} framework ({pillar} pillar) "
|
||||
f"was monitored {year_span} across {n_countries} ASEAN member state{'s' if n_countries > 1 else ''}, "
|
||||
f"covering {n_years_label}."
|
||||
)
|
||||
|
||||
# --- Bagian 2: Arah interpretasi ---
|
||||
s2 = f"Since {direction_label} for this indicator, performance is evaluated accordingly."
|
||||
|
||||
# --- Bagian 3: Trend kumulatif (first -> last) ---
|
||||
if not pd.isna(first_val) and not pd.isna(last_val) and not pd.isna(cum_change):
|
||||
first_str = _format_value(first_val, unit)
|
||||
last_str = _format_value(last_val, unit)
|
||||
chg_str = _format_value(abs(cum_change), unit)
|
||||
|
||||
if lower_better:
|
||||
is_improving = cum_change < 0
|
||||
else:
|
||||
is_improving = cum_change > 0
|
||||
|
||||
moved = "declined" if cum_change < 0 else "increased"
|
||||
trend_lbl = "an overall improving trend" if is_improving else "an overall deteriorating trend"
|
||||
|
||||
s3 = (
|
||||
f"The ASEAN regional average {moved} from {first_str} in {year_min} "
|
||||
f"to {last_str} in {year_max}, a cumulative shift of {chg_str}, "
|
||||
f"indicating {trend_lbl} over the observed period."
|
||||
if n_countries > 0:
|
||||
sentence1 += (
|
||||
f", based on data from {n_countries} "
|
||||
f"ASEAN member state{'s' if n_countries > 1 else ''}"
|
||||
)
|
||||
else:
|
||||
s3 = "Insufficient data is available to assess the cumulative trend across the period."
|
||||
sentence1 += "."
|
||||
|
||||
# --- Bagian 4: Score dan performance ---
|
||||
# --- Bagian 2: Score dan performance ---
|
||||
if not pd.isna(avg_score):
|
||||
score_str = f"{avg_score:.1f} out of 100"
|
||||
if performance == "Good":
|
||||
s4 = (
|
||||
f"The average normalized score across all years and countries stood at {score_str}, "
|
||||
f"placing the region in the Good performance category and consistently above the 60-point threshold."
|
||||
perf_phrase = (
|
||||
f"The region achieved a normalized score of {score_str}, "
|
||||
f"classified as Good performance meeting the 60-point threshold "
|
||||
f"under the {framework} framework ({pillar} pillar)."
|
||||
)
|
||||
elif performance == "Bad":
|
||||
s4 = (
|
||||
f"The average normalized score across all years and countries stood at {score_str}, "
|
||||
f"placing the region in the Bad performance category and below the 60-point threshold."
|
||||
perf_phrase = (
|
||||
f"The region recorded a normalized score of {score_str}, "
|
||||
f"classified as Bad performance falling below the 60-point threshold "
|
||||
f"under the {framework} framework ({pillar} pillar)."
|
||||
)
|
||||
else:
|
||||
s4 = f"The average normalized score across all years and countries stood at {score_str}."
|
||||
perf_phrase = (
|
||||
f"The region recorded a normalized score of {score_str} "
|
||||
f"under the {framework} framework ({pillar} pillar)."
|
||||
)
|
||||
else:
|
||||
s4 = "The normalized score could not be assessed due to insufficient data."
|
||||
|
||||
# --- Bagian 5: Best dan worst country ---
|
||||
if best_country and worst_country and best_country != worst_country:
|
||||
s5 = (
|
||||
f"Among member states, {best_country} recorded the strongest performance throughout the period, "
|
||||
f"while {worst_country} showed the weakest performance."
|
||||
perf_phrase = (
|
||||
f"Performance could not be assessed due to insufficient data "
|
||||
f"under the {framework} framework ({pillar} pillar)."
|
||||
)
|
||||
elif best_country:
|
||||
s5 = f"{best_country} recorded the strongest overall performance among ASEAN member states."
|
||||
else:
|
||||
s5 = ""
|
||||
|
||||
# --- Bagian 6: Best dan worst year ---
|
||||
if best_year and worst_year and best_year != worst_year:
|
||||
s6 = (
|
||||
f"The best regional score was recorded in {best_year}, "
|
||||
f"while {worst_year} represented the weakest year across the region."
|
||||
)
|
||||
elif best_year:
|
||||
s6 = f"The strongest regional performance was recorded in {best_year}."
|
||||
else:
|
||||
s6 = ""
|
||||
# --- Bagian 3 & 4: Arah + YoY ---
|
||||
direction_phrase = f"Since {direction_label} for this indicator"
|
||||
|
||||
# --- Bagian 7: YoY consistency ---
|
||||
if yoy_total > 0:
|
||||
yoy_worsened = yoy_total - yoy_improved
|
||||
if yoy_improved > yoy_worsened:
|
||||
consistency = "predominantly positive"
|
||||
elif yoy_improved < yoy_worsened:
|
||||
consistency = "predominantly negative"
|
||||
if not pd.isna(yoy) and yoy != 0:
|
||||
direction_word, change_desc, is_positive = _format_yoy(yoy, unit, lower_better)
|
||||
if is_positive:
|
||||
trend_word = "a positive trend"
|
||||
tone = "reflecting improvements in regional food security performance"
|
||||
else:
|
||||
consistency = "mixed"
|
||||
s7 = (
|
||||
f"Year-on-year, the region showed improvement in {yoy_improved} out of {yoy_total} "
|
||||
f"transitions, reflecting a {consistency} trajectory over the period."
|
||||
trend_word = "a deteriorating trend"
|
||||
tone = "signaling the need for greater regional attention and policy response"
|
||||
yoy_phrase = (
|
||||
f"{direction_phrase}, the regional average {direction_word} {change_desc} "
|
||||
f"compared to {year - 1}, reflecting {trend_word} — {tone}."
|
||||
)
|
||||
elif pd.isna(yoy):
|
||||
yoy_phrase = (
|
||||
f"No prior year data is available for comparison, "
|
||||
f"as this is the earliest recorded year for this indicator in the dataset."
|
||||
)
|
||||
else:
|
||||
s7 = ""
|
||||
yoy_phrase = (
|
||||
f"{direction_phrase}, the regional average remained stable "
|
||||
f"compared to {year - 1}, with no measurable change year-on-year."
|
||||
)
|
||||
|
||||
# --- Gabungkan semua bagian ---
|
||||
parts = [s1, s2, s3, s4]
|
||||
if s5:
|
||||
parts.append(s5)
|
||||
if s6:
|
||||
parts.append(s6)
|
||||
if s7:
|
||||
parts.append(s7)
|
||||
|
||||
return " ".join(parts)
|
||||
return f"{sentence1} {perf_phrase} {yoy_phrase}"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -996,234 +947,112 @@ class IndicatorNormAggregator:
|
||||
Pipeline agg_narrative_indicator yang dijalankan otomatis
|
||||
setelah agg_indicator_norm selesai. Memakai df_final yang sudah ada
|
||||
di memori, tanpa re-load dari BigQuery.
|
||||
|
||||
Granularity: per indicator_id (1 baris = 1 indikator,
|
||||
merangkum SELURUH negara ASEAN + SELURUH tahun yang tersedia).
|
||||
Total rows = jumlah indikator unik.
|
||||
"""
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 12-16: agg_narrative_indicator")
|
||||
self.logger.info(" Level : per indicator_id (all ASEAN countries x all years)")
|
||||
self.logger.info(" Level : ASEAN (year x indicator_id)")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
df = df_final.copy()
|
||||
|
||||
# -- STEP 12: Agregasi per indicator_id (all countries, all years) --
|
||||
self.logger.info("\n--- STEP 12: AGGREGATE PER INDICATOR (all countries x all years) ---")
|
||||
|
||||
# -- STEP 12: Agregasi ke level ASEAN --
|
||||
self.logger.info("\n--- STEP 12: AGGREGATE TO ASEAN LEVEL ---")
|
||||
dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]
|
||||
agg_dict = {col: "first" for col in dim_cols}
|
||||
agg_dict["value"] = "mean"
|
||||
agg_dict["norm_score_1_100"] = "mean"
|
||||
agg_dict["country_id"] = "count"
|
||||
|
||||
# Agregat dasar
|
||||
base = (
|
||||
df.groupby("indicator_id")
|
||||
.agg(
|
||||
indicator_name = ("indicator_name", "first"),
|
||||
unit = ("unit", "first"),
|
||||
direction = ("direction", "first"),
|
||||
pillar_name = ("pillar_name", "first"),
|
||||
framework = ("framework", "first"),
|
||||
year_min = ("year", "min"),
|
||||
year_max = ("year", "max"),
|
||||
n_years = ("year", "nunique"),
|
||||
n_countries = ("country_id", "nunique"),
|
||||
avg_norm_score_1_100 = ("norm_score_1_100", "mean"),
|
||||
df_agg = (
|
||||
df_final.groupby(["year", "indicator_id"])
|
||||
.agg(agg_dict)
|
||||
.reset_index()
|
||||
.rename(columns={
|
||||
"value" : "avg_value",
|
||||
"norm_score_1_100": "avg_norm_score_1_100",
|
||||
"country_id" : "n_countries",
|
||||
})
|
||||
)
|
||||
self.logger.info(f" Rows : {len(df_agg):,}")
|
||||
self.logger.info(f" Inds : {df_agg['indicator_id'].nunique()}")
|
||||
self.logger.info(
|
||||
f" Years : {int(df_agg['year'].min())} - {int(df_agg['year'].max())}"
|
||||
)
|
||||
|
||||
# -- STEP 13: YoY avg_value per indikator --
|
||||
self.logger.info("\n--- STEP 13: COMPUTE YoY avg_value ---")
|
||||
parts = []
|
||||
for ind_id, grp in df_agg.groupby("indicator_id"):
|
||||
grp = grp.sort_values("year").copy()
|
||||
grp["prev_avg_value"] = grp["avg_value"].shift(1)
|
||||
grp["yoy_avg_value"] = np.where(
|
||||
grp["avg_value"].notna() & grp["prev_avg_value"].notna(),
|
||||
grp["avg_value"] - grp["prev_avg_value"],
|
||||
np.nan,
|
||||
)
|
||||
.reset_index()
|
||||
)
|
||||
grp = grp.drop(columns=["prev_avg_value"])
|
||||
parts.append(grp)
|
||||
df_agg = pd.concat(parts, ignore_index=True)
|
||||
self.logger.info(f" yoy_avg_value nulls: {df_agg['yoy_avg_value'].isna().sum():,}")
|
||||
|
||||
# ASEAN avg value per tahun -> ambil first year & last year value
|
||||
asean_yr = (
|
||||
df.groupby(["indicator_id", "year"])["value"]
|
||||
.mean()
|
||||
.reset_index()
|
||||
.rename(columns={"value": "avg_val_yr"})
|
||||
)
|
||||
|
||||
def _first_last(grp):
|
||||
grp = grp.sort_values("year")
|
||||
return pd.Series({
|
||||
"asean_avg_first": grp["avg_val_yr"].iloc[0],
|
||||
"asean_avg_last" : grp["avg_val_yr"].iloc[-1],
|
||||
})
|
||||
|
||||
first_last = (
|
||||
asean_yr.groupby("indicator_id")
|
||||
.apply(_first_last)
|
||||
.reset_index()
|
||||
)
|
||||
base = base.merge(first_last, on="indicator_id", how="left")
|
||||
base["cumulative_change"] = base["asean_avg_last"] - base["asean_avg_first"]
|
||||
|
||||
# Best & worst country (berdasarkan avg norm_score_1_100 per negara per indikator)
|
||||
country_score = (
|
||||
df.groupby(["indicator_id", "country_name"])["norm_score_1_100"]
|
||||
.mean()
|
||||
.reset_index()
|
||||
)
|
||||
|
||||
def _best_worst_country(grp):
|
||||
grp = grp.sort_values("norm_score_1_100")
|
||||
return pd.Series({
|
||||
"worst_country": grp["country_name"].iloc[0],
|
||||
"best_country" : grp["country_name"].iloc[-1],
|
||||
})
|
||||
|
||||
bw_country = (
|
||||
country_score.groupby("indicator_id")
|
||||
.apply(_best_worst_country)
|
||||
.reset_index()
|
||||
)
|
||||
base = base.merge(bw_country, on="indicator_id", how="left")
|
||||
|
||||
# Best & worst year (berdasarkan avg norm_score_1_100 ASEAN per tahun)
|
||||
yr_score = (
|
||||
df.groupby(["indicator_id", "year"])["norm_score_1_100"]
|
||||
.mean()
|
||||
.reset_index()
|
||||
)
|
||||
|
||||
def _best_worst_year(grp):
|
||||
grp = grp.sort_values("norm_score_1_100")
|
||||
return pd.Series({
|
||||
"worst_year": int(grp["year"].iloc[0]),
|
||||
"best_year" : int(grp["year"].iloc[-1]),
|
||||
})
|
||||
|
||||
bw_year = (
|
||||
yr_score.groupby("indicator_id")
|
||||
.apply(_best_worst_year)
|
||||
.reset_index()
|
||||
)
|
||||
base = base.merge(bw_year, on="indicator_id", how="left")
|
||||
|
||||
# YoY consistency: hitung berapa transisi membaik vs memburuk
|
||||
# (berdasarkan avg ASEAN value per tahun, direction-aware)
|
||||
def _yoy_consistency(grp):
|
||||
ind_id = grp["indicator_id"].iloc[0]
|
||||
direction = base.loc[base["indicator_id"] == ind_id, "direction"].iloc[0]
|
||||
lb = _is_lower_better(direction)
|
||||
grp = grp.sort_values("year")
|
||||
diffs = grp["avg_val_yr"].diff().dropna()
|
||||
if lb:
|
||||
improved = (diffs < 0).sum()
|
||||
else:
|
||||
improved = (diffs > 0).sum()
|
||||
return pd.Series({
|
||||
"yoy_improved_count" : int(improved),
|
||||
"yoy_total_transitions": int(len(diffs)),
|
||||
})
|
||||
|
||||
yoy_cons = (
|
||||
asean_yr.groupby("indicator_id")
|
||||
.apply(_yoy_consistency)
|
||||
.reset_index()
|
||||
)
|
||||
base = base.merge(yoy_cons, on="indicator_id", how="left")
|
||||
|
||||
self.logger.info(f" Rows (= n_indicators): {len(base):,}")
|
||||
self.logger.info(f" Years span : {int(base['year_min'].min())} - {int(base['year_max'].max())}")
|
||||
|
||||
# -- STEP 13: Assign performance berdasarkan avg_norm_score_1_100 --
|
||||
self.logger.info("\n--- STEP 13: ASSIGN PERFORMANCE ---")
|
||||
base["performance"] = pd.NA
|
||||
has_score = base["avg_norm_score_1_100"].notna()
|
||||
base.loc[has_score & (base["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
|
||||
base.loc[has_score & (base["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
|
||||
n_good = (base["performance"] == "Good").sum()
|
||||
n_bad = (base["performance"] == "Bad").sum()
|
||||
# -- STEP 14: Assign performance --
|
||||
self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---")
|
||||
df_agg["performance"] = pd.NA
|
||||
has_score = df_agg["avg_norm_score_1_100"].notna()
|
||||
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
|
||||
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
|
||||
n_good = (df_agg["performance"] == "Good").sum()
|
||||
n_bad = (df_agg["performance"] == "Bad").sum()
|
||||
self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}")
|
||||
|
||||
# -- STEP 14: Build narrative --
|
||||
self.logger.info("\n--- STEP 14: BUILD NARRATIVE ---")
|
||||
base["narrative"] = base.apply(_build_narrative, axis=1)
|
||||
self.logger.info(f" Narratives generated: {len(base):,}")
|
||||
# -- STEP 15: Build narrative --
|
||||
self.logger.info("\n--- STEP 15: BUILD NARRATIVE ---")
|
||||
df_agg["narrative"] = df_agg.apply(_build_narrative, axis=1)
|
||||
self.logger.info(f" Narratives generated: {len(df_agg):,}")
|
||||
self.logger.info("\n Sample (first 2):")
|
||||
for _, row in base.head(2).iterrows():
|
||||
for _, row in df_agg.head(2).iterrows():
|
||||
self.logger.info(
|
||||
f"\n [{row['indicator_name'][:55]}]"
|
||||
f"\n -> {row['narrative'][:300]}..."
|
||||
f"\n [{int(row['year'])}] {row['indicator_name'][:50]}"
|
||||
f"\n -> {row['narrative'][:250]}..."
|
||||
)
|
||||
|
||||
# -- STEP 15: Save --
|
||||
self.logger.info("\n--- STEP 15: SAVE -> [Gold] agg_narrative_indicator ---")
|
||||
out = base[[
|
||||
"indicator_id",
|
||||
"indicator_name",
|
||||
"unit",
|
||||
"direction",
|
||||
"pillar_name",
|
||||
"framework",
|
||||
"year_min",
|
||||
"year_max",
|
||||
"n_years",
|
||||
"n_countries",
|
||||
"asean_avg_first",
|
||||
"asean_avg_last",
|
||||
"cumulative_change",
|
||||
"avg_norm_score_1_100",
|
||||
"performance",
|
||||
"best_country",
|
||||
"worst_country",
|
||||
"best_year",
|
||||
"worst_year",
|
||||
"yoy_improved_count",
|
||||
"yoy_total_transitions",
|
||||
"narrative",
|
||||
# -- STEP 16: Save --
|
||||
self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---")
|
||||
out = df_agg[[
|
||||
"year", "indicator_id", "indicator_name", "unit", "direction",
|
||||
"pillar_name", "framework",
|
||||
"avg_value", "avg_norm_score_1_100", "performance",
|
||||
"yoy_avg_value", "n_countries", "narrative",
|
||||
]].copy()
|
||||
|
||||
out = out.sort_values(["pillar_name", "indicator_name"]).reset_index(drop=True)
|
||||
out = out.sort_values(["year", "pillar_name", "indicator_name"]).reset_index(drop=True)
|
||||
|
||||
out["year"] = out["year"].astype(int)
|
||||
out["indicator_id"] = out["indicator_id"].astype(int)
|
||||
out["indicator_name"] = out["indicator_name"].astype(str)
|
||||
out["unit"] = out["unit"].fillna("").astype(str)
|
||||
out["direction"] = out["direction"].astype(str)
|
||||
out["pillar_name"] = out["pillar_name"].astype(str)
|
||||
out["framework"] = out["framework"].astype(str)
|
||||
out["year_min"] = out["year_min"].astype(int)
|
||||
out["year_max"] = out["year_max"].astype(int)
|
||||
out["n_years"] = out["n_years"].astype(int)
|
||||
out["n_countries"] = out["n_countries"].astype(int)
|
||||
out["asean_avg_first"] = out["asean_avg_first"].astype(float)
|
||||
out["asean_avg_last"] = out["asean_avg_last"].astype(float)
|
||||
out["cumulative_change"] = out["cumulative_change"].astype(float)
|
||||
out["avg_value"] = out["avg_value"].astype(float)
|
||||
out["avg_norm_score_1_100"] = out["avg_norm_score_1_100"].astype(float)
|
||||
out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string")
|
||||
out["best_country"] = out["best_country"].astype(str)
|
||||
out["worst_country"] = out["worst_country"].astype(str)
|
||||
out["best_year"] = pd.to_numeric(out["best_year"], errors="coerce").astype("Int64")
|
||||
out["worst_year"] = pd.to_numeric(out["worst_year"], errors="coerce").astype("Int64")
|
||||
out["yoy_improved_count"] = pd.to_numeric(out["yoy_improved_count"], errors="coerce").astype("Int64")
|
||||
out["yoy_total_transitions"]= pd.to_numeric(out["yoy_total_transitions"], errors="coerce").astype("Int64")
|
||||
out["yoy_avg_value"] = pd.to_numeric(out["yoy_avg_value"], errors="coerce").astype(float)
|
||||
out["n_countries"] = out["n_countries"].astype(int)
|
||||
out["narrative"] = out["narrative"].astype(str)
|
||||
|
||||
self.logger.info(f" Columns : {list(out.columns)}")
|
||||
self.logger.info(f" Total rows : {len(out):,}")
|
||||
self.logger.info(f" Frameworks : {dict(out['framework'].value_counts())}")
|
||||
self.logger.info(f" Performance: {dict(out['performance'].value_counts())}")
|
||||
|
||||
schema = [
|
||||
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("year_min", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("year_max", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("n_years", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("asean_avg_first", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("asean_avg_last", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("cumulative_change", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("performance", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("best_country", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("worst_country", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("best_year", "INTEGER", mode="NULLABLE"),
|
||||
bigquery.SchemaField("worst_year", "INTEGER", mode="NULLABLE"),
|
||||
bigquery.SchemaField("yoy_improved_count", "INTEGER", mode="NULLABLE"),
|
||||
bigquery.SchemaField("yoy_total_transitions", "INTEGER", mode="NULLABLE"),
|
||||
bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("avg_value", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("performance", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("yoy_avg_value", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"),
|
||||
]
|
||||
|
||||
rows_loaded = load_to_bigquery(
|
||||
@@ -1247,23 +1076,17 @@ class IndicatorNormAggregator:
|
||||
"completeness_pct" : 100.0,
|
||||
"config_snapshot" : json.dumps({
|
||||
"source_table" : "agg_indicator_norm (in-memory df_final)",
|
||||
"granularity" : "indicator_id (all ASEAN countries x all years)",
|
||||
"aggregation" : "mean across all countries and years per indicator",
|
||||
"granularity" : "year x indicator_id (ASEAN level)",
|
||||
"aggregation" : "mean across ASEAN countries",
|
||||
"performance_threshold": _PERFORMANCE_THRESHOLD,
|
||||
"narrative_components" : [
|
||||
"year span", "n_countries", "direction interpretation",
|
||||
"cumulative trend (first to last year)",
|
||||
"avg_norm_score + performance",
|
||||
"best/worst country", "best/worst year",
|
||||
"YoY consistency count",
|
||||
],
|
||||
"layer": "gold",
|
||||
"yoy_column" : "yoy_avg_value",
|
||||
"layer" : "gold",
|
||||
}),
|
||||
"validation_metrics" : json.dumps({
|
||||
"total_rows" : rows_loaded,
|
||||
"n_indicators": int(out["indicator_id"].nunique()),
|
||||
"year_min" : int(out["year_min"].min()),
|
||||
"year_max" : int(out["year_max"].max()),
|
||||
"year_min" : int(out["year"].min()),
|
||||
"year_max" : int(out["year"].max()),
|
||||
}),
|
||||
}
|
||||
save_etl_metadata(self.client, metadata)
|
||||
|
||||
Reference in New Issue
Block a user