bismillah done

This commit is contained in:
Debby
2026-04-07 10:00:45 +07:00
parent cebb6b88eb
commit 0384e62b01
3 changed files with 372 additions and 38 deletions

View File

@@ -1,7 +1,11 @@
""" """
BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION BIGQUERY ANALYSIS LAYER - INDICATOR NORM AGGREGATION
Tabel: agg_indicator_norm -> fs_asean_gold Tabel 1: agg_indicator_norm -> fs_asean_gold
Tabel 2: agg_narrative_indicator -> fs_asean_gold
=============================================================================
agg_indicator_norm
=============================================================================
Tujuan: Tujuan:
Menghitung norm_value per indikator per negara per tahun, sehingga dapat Menghitung norm_value per indikator per negara per tahun, sehingga dapat
melihat performa setiap indikator secara individual (lower_better & higher_better melihat performa setiap indikator secara individual (lower_better & higher_better
@@ -35,6 +39,27 @@ Output Schema (agg_indicator_norm):
yoy_value, -- perubahan absolut value YoY yoy_value, -- perubahan absolut value YoY
yoy_norm_value, -- perubahan absolut norm_value YoY yoy_norm_value, -- perubahan absolut norm_value YoY
performance -- "Good" | "Bad" | null performance -- "Good" | "Bad" | null
=============================================================================
agg_narrative_indicator
=============================================================================
Tujuan:
Menghasilkan narasi otomatis 1 paragraf per indikator per tahun di level ASEAN
(rata-rata seluruh negara ASEAN), dijalankan otomatis setelah agg_indicator_norm
selesai dalam pipeline yang sama.
Granularity:
year x indicator_id (level ASEAN, bukan per negara)
Output Schema (agg_narrative_indicator):
year, indicator_id, indicator_name, unit, direction,
pillar_name, framework,
avg_value, -- rata-rata value ASEAN
avg_norm_score_1_100, -- rata-rata norm_score_1_100 ASEAN
performance, -- Good | Bad | null
yoy_avg_value, -- perubahan avg_value vs tahun sebelumnya
n_countries, -- jumlah negara yang punya data tahun ini
narrative -- 1 paragraf narasi otomatis
""" """
import pandas as pd import pandas as pd
@@ -113,7 +138,7 @@ _PERFORMANCE_THRESHOLD: float = 60.0
# ============================================================================= # =============================================================================
# PURE HELPERS # PURE HELPERS — agg_indicator_norm
# ============================================================================= # =============================================================================
def _should_invert(direction: str, logger=None, context: str = "") -> bool: def _should_invert(direction: str, logger=None, context: str = "") -> bool:
@@ -174,6 +199,141 @@ def _compute_yoy(df: pd.DataFrame) -> pd.DataFrame:
return df return df
# =============================================================================
# PURE HELPERS — agg_narrative_indicator
# =============================================================================
def _is_lower_better(direction: str) -> bool:
return str(direction).lower().strip() in DIRECTION_INVERT_KEYWORDS
def _format_value(value: float, unit: str) -> str:
"""Format nilai dengan unit yang sesuai."""
if pd.isna(value):
return "N/A"
unit = str(unit).strip() if unit else ""
if abs(value) >= 1000:
formatted = f"{value:,.1f}"
elif abs(value) >= 10:
formatted = f"{value:.2f}"
else:
formatted = f"{value:.3f}"
return f"{formatted} {unit}".strip()
def _format_yoy(yoy: float, unit: str, lower_better: bool) -> tuple:
"""
Kembalikan (direction_word, change_desc, is_positive_trend).
is_positive_trend: True jika perubahan menguntungkan sesuai direction.
"""
unit = str(unit).strip() if unit else ""
abs_yoy = abs(yoy)
if abs_yoy >= 1000:
yoy_str = f"{abs_yoy:,.1f}"
elif abs_yoy >= 10:
yoy_str = f"{abs_yoy:.2f}"
else:
yoy_str = f"{abs_yoy:.3f}"
change_desc = f"{yoy_str} {unit}".strip()
is_positive = (yoy < 0) if lower_better else (yoy > 0)
direction_word = "decreased by" if yoy < 0 else "increased by"
return direction_word, change_desc, is_positive
def _build_narrative(row: pd.Series) -> str:
"""
Bangun 1 paragraf narasi ASEAN-level untuk satu baris (year x indicator_id).
"""
year = int(row["year"])
ind_name = str(row["indicator_name"]).strip()
unit = str(row["unit"]).strip() if row["unit"] else ""
direction = str(row["direction"]).strip()
pillar = str(row["pillar_name"]).strip()
framework = str(row["framework"]).strip()
avg_val = row["avg_value"]
avg_score = row["avg_norm_score_1_100"]
performance = row["performance"]
yoy = row["yoy_avg_value"]
n_countries = int(row["n_countries"]) if not pd.isna(row["n_countries"]) else 0
lower_better = _is_lower_better(direction)
direction_label = (
"lower values indicate better outcomes" if lower_better
else "higher values indicate better outcomes"
)
# --- Bagian 1: Nilai rata-rata ASEAN ---
val_str = _format_value(avg_val, unit)
sentence1 = (
f"In {year}, the ASEAN regional average for {ind_name} stood at {val_str}"
)
if n_countries > 0:
sentence1 += (
f", based on data from {n_countries} "
f"ASEAN member state{'s' if n_countries > 1 else ''}"
)
sentence1 += "."
# --- Bagian 2: Score dan performance ---
if not pd.isna(avg_score):
score_str = f"{avg_score:.1f} out of 100"
if performance == "Good":
perf_phrase = (
f"The region achieved a normalized score of {score_str}, "
f"classified as Good performance — meeting the 60-point threshold — "
f"under the {framework} framework ({pillar} pillar)."
)
elif performance == "Bad":
perf_phrase = (
f"The region recorded a normalized score of {score_str}, "
f"classified as Bad performance — falling below the 60-point threshold — "
f"under the {framework} framework ({pillar} pillar)."
)
else:
perf_phrase = (
f"The region recorded a normalized score of {score_str} "
f"under the {framework} framework ({pillar} pillar)."
)
else:
perf_phrase = (
f"Performance could not be assessed due to insufficient data "
f"under the {framework} framework ({pillar} pillar)."
)
# --- Bagian 3 & 4: Arah + YoY ---
direction_phrase = f"Since {direction_label} for this indicator"
if not pd.isna(yoy) and yoy != 0:
direction_word, change_desc, is_positive = _format_yoy(yoy, unit, lower_better)
if is_positive:
trend_word = "a positive trend"
tone = "reflecting improvements in regional food security performance"
else:
trend_word = "a deteriorating trend"
tone = "signaling the need for greater regional attention and policy response"
yoy_phrase = (
f"{direction_phrase}, the regional average {direction_word} {change_desc} "
f"compared to {year - 1}, reflecting {trend_word}{tone}."
)
elif pd.isna(yoy):
yoy_phrase = (
f"No prior year data is available for comparison, "
f"as this is the earliest recorded year for this indicator in the dataset."
)
else:
yoy_phrase = (
f"{direction_phrase}, the regional average remained stable "
f"compared to {year - 1}, with no measurable change year-on-year."
)
return f"{sentence1} {perf_phrase} {yoy_phrase}"
# ============================================================================= # =============================================================================
# MAIN CLASS # MAIN CLASS
# ============================================================================= # =============================================================================
@@ -182,18 +342,28 @@ class IndicatorNormAggregator:
""" """
Hitung norm_value per indikator untuk seluruh data di Hitung norm_value per indikator untuk seluruh data di
fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm. fact_asean_food_security_selected, lalu simpan ke agg_indicator_norm.
Setelah selesai, otomatis menjalankan pipeline agg_narrative_indicator.
Alur: Alur agg_indicator_norm:
1. Load fact_asean_food_security_selected 1. Load fact_asean_food_security_selected
2. Load dim_indicator -> ambil kolom unit 2. Load dim_indicator -> ambil kolom unit
3. Merge unit ke df 3. Merge unit ke df
4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data) 4. Deteksi sdgs_start_year (tahun pertama FIES hadir di data)
5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label 5. Assign framework per baris mengikuti aturan MDGs/SDGs dual-label
6. Hitung norm_value per indikator (direction-aware, 0-1) 6. Hitung norm_value per indikator (direction-aware, 0-1)
7. Hitung YoY per (indicator_id, country_id) 7. Hitung YoY per (indicator_id, country_id)
8. Scale ke 1-100 per indikator (global) 8. Scale ke 1-100 per indikator (global)
9. Assign performance label (Good/Bad) 9. Assign performance label (Good/Bad)
10. Simpan ke BigQuery 10. Simpan ke BigQuery -> agg_indicator_norm
11. Summary log agg_indicator_norm
Alur agg_narrative_indicator (lanjutan, pakai df_final yang sudah ada):
12. Agregasi ke level ASEAN (year x indicator_id)
13. Hitung YoY avg_value per indikator
14. Assign performance berdasarkan avg_norm_score
15. Build narrative 1 paragraf per baris
16. Simpan ke BigQuery -> agg_narrative_indicator
17. Summary log agg_narrative_indicator
""" """
def __init__(self, client: bigquery.Client): def __init__(self, client: bigquery.Client):
@@ -207,10 +377,11 @@ class IndicatorNormAggregator:
self.pipeline_start = None self.pipeline_start = None
self.pipeline_metadata = { self.pipeline_metadata = {
"rows_fetched": 0, "rows_fetched" : 0,
"rows_loaded" : 0, "rows_loaded" : 0,
"start_time" : None, "rows_loaded_narrative" : 0,
"end_time" : None, "start_time" : None,
"end_time" : None,
} }
# ========================================================================= # =========================================================================
@@ -559,7 +730,7 @@ class IndicatorNormAggregator:
return df return df
# ========================================================================= # =========================================================================
# STEP 10: Save to BigQuery # STEP 10: Save agg_indicator_norm to BigQuery
# ========================================================================= # =========================================================================
def _save(self, df: pd.DataFrame) -> int: def _save(self, df: pd.DataFrame) -> int:
@@ -681,12 +852,12 @@ class IndicatorNormAggregator:
return rows_loaded return rows_loaded
# ========================================================================= # =========================================================================
# STEP 11: Summary log # STEP 11: Summary log agg_indicator_norm
# ========================================================================= # =========================================================================
def _log_summary(self, df: pd.DataFrame): def _log_summary(self, df: pd.DataFrame):
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 11: SUMMARY") self.logger.info("STEP 11: SUMMARY — agg_indicator_norm")
self.logger.info("=" * 80) self.logger.info("=" * 80)
summary = ( summary = (
@@ -767,6 +938,162 @@ class IndicatorNormAggregator:
for _, r in pillar_summary.iterrows(): for _, r in pillar_summary.iterrows():
self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}") self.logger.info(f" {r['pillar_name']:<30}: {r['n_indicators']}")
# =========================================================================
# STEP 12-16: agg_narrative_indicator (lanjutan dari df_final)
# =========================================================================
def _build_narrative_table(self, df_final: pd.DataFrame):
"""
Pipeline agg_narrative_indicator yang dijalankan otomatis
setelah agg_indicator_norm selesai. Memakai df_final yang sudah ada
di memori, tanpa re-load dari BigQuery.
"""
self.logger.info("\n" + "=" * 80)
self.logger.info("STEP 12-16: agg_narrative_indicator")
self.logger.info(" Level : ASEAN (year x indicator_id)")
self.logger.info("=" * 80)
# -- STEP 12: Agregasi ke level ASEAN --
self.logger.info("\n--- STEP 12: AGGREGATE TO ASEAN LEVEL ---")
dim_cols = ["indicator_name", "unit", "direction", "pillar_name", "framework"]
agg_dict = {col: "first" for col in dim_cols}
agg_dict["value"] = "mean"
agg_dict["norm_score_1_100"] = "mean"
agg_dict["country_id"] = "count"
df_agg = (
df_final.groupby(["year", "indicator_id"])
.agg(agg_dict)
.reset_index()
.rename(columns={
"value" : "avg_value",
"norm_score_1_100": "avg_norm_score_1_100",
"country_id" : "n_countries",
})
)
self.logger.info(f" Rows : {len(df_agg):,}")
self.logger.info(f" Inds : {df_agg['indicator_id'].nunique()}")
self.logger.info(
f" Years : {int(df_agg['year'].min())} - {int(df_agg['year'].max())}"
)
# -- STEP 13: YoY avg_value per indikator --
self.logger.info("\n--- STEP 13: COMPUTE YoY avg_value ---")
parts = []
for ind_id, grp in df_agg.groupby("indicator_id"):
grp = grp.sort_values("year").copy()
grp["prev_avg_value"] = grp["avg_value"].shift(1)
grp["yoy_avg_value"] = np.where(
grp["avg_value"].notna() & grp["prev_avg_value"].notna(),
grp["avg_value"] - grp["prev_avg_value"],
np.nan,
)
grp = grp.drop(columns=["prev_avg_value"])
parts.append(grp)
df_agg = pd.concat(parts, ignore_index=True)
self.logger.info(f" yoy_avg_value nulls: {df_agg['yoy_avg_value'].isna().sum():,}")
# -- STEP 14: Assign performance --
self.logger.info("\n--- STEP 14: ASSIGN PERFORMANCE ---")
df_agg["performance"] = pd.NA
has_score = df_agg["avg_norm_score_1_100"].notna()
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] >= _PERFORMANCE_THRESHOLD), "performance"] = "Good"
df_agg.loc[has_score & (df_agg["avg_norm_score_1_100"] < _PERFORMANCE_THRESHOLD), "performance"] = "Bad"
n_good = (df_agg["performance"] == "Good").sum()
n_bad = (df_agg["performance"] == "Bad").sum()
self.logger.info(f" Good: {n_good:,} | Bad: {n_bad:,}")
# -- STEP 15: Build narrative --
self.logger.info("\n--- STEP 15: BUILD NARRATIVE ---")
df_agg["narrative"] = df_agg.apply(_build_narrative, axis=1)
self.logger.info(f" Narratives generated: {len(df_agg):,}")
self.logger.info("\n Sample (first 2):")
for _, row in df_agg.head(2).iterrows():
self.logger.info(
f"\n [{int(row['year'])}] {row['indicator_name'][:50]}"
f"\n -> {row['narrative'][:250]}..."
)
# -- STEP 16: Save --
self.logger.info("\n--- STEP 16: SAVE -> [Gold] agg_narrative_indicator ---")
out = df_agg[[
"year", "indicator_id", "indicator_name", "unit", "direction",
"pillar_name", "framework",
"avg_value", "avg_norm_score_1_100", "performance",
"yoy_avg_value", "n_countries", "narrative",
]].copy()
out = out.sort_values(["year", "pillar_name", "indicator_name"]).reset_index(drop=True)
out["year"] = out["year"].astype(int)
out["indicator_id"] = out["indicator_id"].astype(int)
out["indicator_name"] = out["indicator_name"].astype(str)
out["unit"] = out["unit"].fillna("").astype(str)
out["direction"] = out["direction"].astype(str)
out["pillar_name"] = out["pillar_name"].astype(str)
out["framework"] = out["framework"].astype(str)
out["avg_value"] = out["avg_value"].astype(float)
out["avg_norm_score_1_100"] = out["avg_norm_score_1_100"].astype(float)
out["performance"] = out["performance"].astype(str).replace("nan", pd.NA).astype("string")
out["yoy_avg_value"] = pd.to_numeric(out["yoy_avg_value"], errors="coerce").astype(float)
out["n_countries"] = out["n_countries"].astype(int)
out["narrative"] = out["narrative"].astype(str)
schema = [
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
bigquery.SchemaField("avg_value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("avg_norm_score_1_100", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("performance", "STRING", mode="NULLABLE"),
bigquery.SchemaField("yoy_avg_value", "FLOAT", mode="NULLABLE"),
bigquery.SchemaField("n_countries", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("narrative", "STRING", mode="NULLABLE"),
]
rows_loaded = load_to_bigquery(
self.client, out, "agg_narrative_indicator",
layer="gold", write_disposition="WRITE_TRUNCATE", schema=schema,
)
log_update(self.client, "DW", "agg_narrative_indicator", "full_load", rows_loaded)
self.logger.info(
f" [OK] agg_narrative_indicator: {rows_loaded:,} rows -> [Gold] fs_asean_gold"
)
metadata = {
"source_class" : self.__class__.__name__,
"table_name" : "agg_narrative_indicator",
"execution_timestamp": self.pipeline_start,
"duration_seconds" : (datetime.now() - self.pipeline_start).total_seconds(),
"rows_fetched" : self.pipeline_metadata["rows_fetched"],
"rows_transformed" : rows_loaded,
"rows_loaded" : rows_loaded,
"completeness_pct" : 100.0,
"config_snapshot" : json.dumps({
"source_table" : "agg_indicator_norm (in-memory df_final)",
"granularity" : "year x indicator_id (ASEAN level)",
"aggregation" : "mean across ASEAN countries",
"performance_threshold": _PERFORMANCE_THRESHOLD,
"yoy_column" : "yoy_avg_value",
"layer" : "gold",
}),
"validation_metrics" : json.dumps({
"total_rows" : rows_loaded,
"n_indicators": int(out["indicator_id"].nunique()),
"year_min" : int(out["year"].min()),
"year_max" : int(out["year"].max()),
}),
}
save_etl_metadata(self.client, metadata)
self.logger.info(" Metadata -> [AUDIT] etl_metadata")
self.pipeline_metadata["rows_loaded_narrative"] = rows_loaded
# ========================================================================= # =========================================================================
# RUN # RUN
# ========================================================================= # =========================================================================
@@ -780,6 +1107,7 @@ class IndicatorNormAggregator:
self.logger.info(" Source : fact_asean_food_security_selected") self.logger.info(" Source : fact_asean_food_security_selected")
self.logger.info(" Dim : dim_indicator (unit)") self.logger.info(" Dim : dim_indicator (unit)")
self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold") self.logger.info(" Output : agg_indicator_norm -> fs_asean_gold")
self.logger.info(" agg_narrative_indicator -> fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_data() self.load_data()
@@ -787,14 +1115,17 @@ class IndicatorNormAggregator:
self._merge_unit() self._merge_unit()
self.sdgs_start_year = self._detect_sdgs_start_year() self.sdgs_start_year = self._detect_sdgs_start_year()
self._assign_framework() self._assign_framework()
df_normed = self._compute_norm_values() df_normed = self._compute_norm_values()
df_yoy = self._compute_yoy_columns(df_normed) df_yoy = self._compute_yoy_columns(df_normed)
df_scored = self._compute_scores(df_yoy) df_scored = self._compute_scores(df_yoy)
df_final = self._assign_performance(df_scored) df_final = self._assign_performance(df_scored)
rows_loaded = self._save(df_final) rows_loaded = self._save(df_final)
self.pipeline_metadata["rows_loaded"] = rows_loaded self.pipeline_metadata["rows_loaded"] = rows_loaded
self._log_summary(df_final) self._log_summary(df_final)
# Lanjut build agg_narrative_indicator dari df_final (tanpa re-load BQ)
self._build_narrative_table(df_final)
self.pipeline_metadata["end_time"] = datetime.now() self.pipeline_metadata["end_time"] = datetime.now()
duration = ( duration = (
self.pipeline_metadata["end_time"] - self.pipeline_start self.pipeline_metadata["end_time"] - self.pipeline_start
@@ -803,25 +1134,27 @@ class IndicatorNormAggregator:
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("COMPLETED") self.logger.info("COMPLETED")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}") self.logger.info(f" Rows Fetched : {self.pipeline_metadata['rows_fetched']:,}")
self.logger.info(f" Rows Loaded : {rows_loaded:,}") self.logger.info(f" Rows Loaded (norm) : {rows_loaded:,}")
self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}") self.logger.info(f" Rows Loaded (narrative) : {self.pipeline_metadata['rows_loaded_narrative']:,}")
self.logger.info(f" sdgs_start_year : {self.sdgs_start_year}")
# ============================================================================= # =============================================================================
# AIRFLOW TASK # AIRFLOW TASK <-- tidak berubah
# ============================================================================= # =============================================================================
def run_indicator_norm_aggregation(): def run_indicator_norm_aggregation():
""" """
Airflow task: Build agg_indicator_norm. Airflow task: Build agg_indicator_norm + agg_narrative_indicator.
Dipanggil setelah analytical_layer_to_gold selesai. Dipanggil setelah analytical_layer_to_gold selesai.
""" """
client = get_bigquery_client() client = get_bigquery_client()
agg = IndicatorNormAggregator(client) agg = IndicatorNormAggregator(client)
agg.run() agg.run()
print(f"agg_indicator_norm loaded: {agg.pipeline_metadata['rows_loaded']:,} rows") print(f"agg_indicator_norm loaded : {agg.pipeline_metadata['rows_loaded']:,} rows")
print(f"agg_narrative_indicator loaded: {agg.pipeline_metadata['rows_loaded_narrative']:,} rows")
# ============================================================================= # =============================================================================
@@ -840,6 +1173,7 @@ if __name__ == "__main__":
print(" Source : fact_asean_food_security_selected") print(" Source : fact_asean_food_security_selected")
print(" Dim : dim_indicator (unit)") print(" Dim : dim_indicator (unit)")
print(" Output : agg_indicator_norm") print(" Output : agg_indicator_norm")
print(" agg_narrative_indicator")
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()

View File

@@ -177,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
def assign_pillar(indicator_name: str) -> str: def assign_pillar(indicator_name: str) -> str:
""" """
Assign pillar berdasarkan keyword indikator. Assign pillar berdasarkan keyword indikator.
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting' Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Sustainability'
All ≤ 20 chars (varchar(20) constraint). All ≤ 20 chars (varchar(20) constraint).
""" """
if pd.isna(indicator_name): if pd.isna(indicator_name):
return 'Supporting' return 'Sustainability'
ind = str(indicator_name).lower() ind = str(indicator_name).lower()
for kw in ['requirement', 'coefficient', 'losses', 'fat supply']: for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
if kw in ind: if kw in ind:
return 'Supporting' return 'Sustainability'
if any(kw in ind for kw in [ if any(kw in ind for kw in [
'adequacy', 'protein supply', 'supply of protein', 'adequacy', 'protein supply', 'supply of protein',
@@ -215,7 +215,7 @@ def assign_pillar(indicator_name: str) -> str:
]): ]):
return 'Utilization' return 'Utilization'
return 'Supporting' return 'Sustainability'
# ============================================================================= # =============================================================================

View File

@@ -350,7 +350,7 @@ class DimensionalModelLoader:
elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']): elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
return 'Infrastructure' return 'Infrastructure'
else: else:
return 'Supporting' return 'Sustainability'
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator) dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first') dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
@@ -471,10 +471,10 @@ class DimensionalModelLoader:
try: try:
pillar_codes = { pillar_codes = {
'Availability': 'AVL', 'Access' : 'ACC', 'Availability': 'AVL', 'Access' : 'ACC',
'Utilization' : 'UTL', 'Stability': 'STB', 'Supporting': 'SPT', 'Utilization' : 'UTL', 'Stability': 'STB', 'Sustainability': 'STN',
} }
pillars_data = [ pillars_data = [
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'SPT')} {'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'STN')}
for p in self.df_clean['pillar'].unique() for p in self.df_clean['pillar'].unique()
] ]