schdule /3 bulan dan tambah metadata

This commit is contained in:
Debby
2026-04-15 13:37:40 +07:00
parent fa2cf75634
commit 76b451b2c1
2 changed files with 659 additions and 572 deletions

View File

@@ -2,7 +2,9 @@
AIRFLOW DAG — ETL Food Security BigQuery AIRFLOW DAG — ETL Food Security BigQuery
Kimball Data Warehouse Architecture Kimball Data Warehouse Architecture
Schedule : Setiap 3 hari sekali (timedelta(days=3)) Schedule : Setiap 3 bulan sekali (tanggal 1, pukul 00:00)
Cron: "0 0 1 */3 *"
-> 1 Jan, 1 Apr, 1 Jul, 1 Okt
Catchup : False Catchup : False
Kimball ETL Flow: Kimball ETL Flow:
@@ -72,7 +74,8 @@ from scripts.bigquery_dimensional_model import (
from scripts.bigquery_analytical_layer import ( from scripts.bigquery_analytical_layer import (
run_analytical_layer, run_analytical_layer,
) )
from scripts.bigquery_aggregate_layer import ( # FIXED: nama modul disesuaikan dengan nama file yang benar
from scripts.bigquery_analysis_aggregation import (
run_aggregation, run_aggregation,
) )
from scripts.bigquery_aggraget_fact_selected_layer import ( from scripts.bigquery_aggraget_fact_selected_layer import (
@@ -87,15 +90,23 @@ default_args = {
} }
# DAG DEFINITION # DAG DEFINITION
#
# schedule_interval = "0 0 1 */3 *"
# ┌───── menit : 0
# │ ┌─── jam : 0 (tengah malam)
# │ │ ┌─ hari : 1 (tanggal 1 setiap bulan yang cocok)
# │ │ │ ┌ bulan : */3 (setiap 3 bulan -> Jan, Apr, Jul, Okt)
# │ │ │ │ ┌ hari minggu : * (semua)
# 0 0 1 */3 *
with DAG( with DAG(
dag_id = "etl_food_security_bigquery", dag_id = "etl_food_security_bigquery",
description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)", description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold) | Schedule: setiap 3 bulan",
default_args = default_args, default_args = default_args,
start_date = datetime(2026, 3, 1), start_date = datetime(2026, 1, 1),
schedule_interval = "0 0 */3 * *", schedule_interval = "0 0 1 */3 *", # Setiap 3 bulan sekali
catchup = False, catchup = False,
tags = ["food-security", "bigquery", "kimball"], tags = ["food-security", "bigquery", "kimball", "quarterly"],
) as dag: ) as dag:
task_verify = PythonOperator( task_verify = PythonOperator(

View File

@@ -644,6 +644,7 @@ class FoodSecurityAggregator:
self.logger.info(f"STEP 2: {table_name} -> [Gold] fs_asean_gold") self.logger.info(f"STEP 2: {table_name} -> [Gold] fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
try:
df_normed = self._get_norm_value_df() df_normed = self._get_norm_value_df()
df = ( df = (
@@ -691,6 +692,10 @@ class FoodSecurityAggregator:
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
except Exception as e:
self._fail(table_name, e)
raise
# ========================================================================= # =========================================================================
# STEP 3: agg_pillar_by_country # STEP 3: agg_pillar_by_country
# ========================================================================= # =========================================================================
@@ -702,6 +707,7 @@ class FoodSecurityAggregator:
self.logger.info(f"STEP 3: {table_name} -> [Gold] fs_asean_gold") self.logger.info(f"STEP 3: {table_name} -> [Gold] fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
try:
df_normed = self._get_norm_value_df() df_normed = self._get_norm_value_df()
df = ( df = (
@@ -744,6 +750,10 @@ class FoodSecurityAggregator:
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
except Exception as e:
self._fail(table_name, e)
raise
# ========================================================================= # =========================================================================
# STEP 4: agg_framework_by_country # STEP 4: agg_framework_by_country
# ========================================================================= # =========================================================================
@@ -781,6 +791,7 @@ class FoodSecurityAggregator:
self.logger.info(f"STEP 4: {table_name} -> [Gold] fs_asean_gold") self.logger.info(f"STEP 4: {table_name} -> [Gold] fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
try:
country_composite = self._calc_country_composite_inmemory() country_composite = self._calc_country_composite_inmemory()
df_normed = self._get_norm_value_df() df_normed = self._get_norm_value_df()
parts = [] parts = []
@@ -907,6 +918,10 @@ class FoodSecurityAggregator:
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
except Exception as e:
self._fail(table_name, e)
raise
# ========================================================================= # =========================================================================
# STEP 5: agg_framework_asean (+ performance_status) # STEP 5: agg_framework_asean (+ performance_status)
# ========================================================================= # =========================================================================
@@ -919,6 +934,7 @@ class FoodSecurityAggregator:
self.logger.info(f" performance_status threshold: {PERFORMANCE_THRESHOLD}") self.logger.info(f" performance_status threshold: {PERFORMANCE_THRESHOLD}")
self.logger.info("=" * 70) self.logger.info("=" * 70)
try:
df_normed = self._get_norm_value_df() df_normed = self._get_norm_value_df()
country_composite = self._calc_country_composite_inmemory() country_composite = self._calc_country_composite_inmemory()
@@ -961,7 +977,6 @@ class FoodSecurityAggregator:
"asean_norm" : "framework_norm", "asean_norm" : "framework_norm",
"n_countries" : "n_countries_with_data", "n_countries" : "n_countries_with_data",
}) })
# n_indicators Total = semua indikator yang hadir di tahun tsb
total_cols["n_indicators"] = total_cols["year"].apply( total_cols["n_indicators"] = total_cols["year"].apply(
lambda y: int(self._ind_year_framework[ lambda y: int(self._ind_year_framework[
self._ind_year_framework["year"] == y self._ind_year_framework["year"] == y
@@ -980,7 +995,6 @@ class FoodSecurityAggregator:
"asean_norm" : "framework_norm", "asean_norm" : "framework_norm",
"n_countries" : "n_countries_with_data", "n_countries" : "n_countries_with_data",
}) })
# Pre-SDGs era: semua indikator berlabel MDGs
mdgs_pre["n_indicators"] = mdgs_pre["year"].apply( mdgs_pre["n_indicators"] = mdgs_pre["year"].apply(
lambda y: _n_ind(y, "MDGs") lambda y: _n_ind(y, "MDGs")
) )
@@ -1095,6 +1109,10 @@ class FoodSecurityAggregator:
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
except Exception as e:
self._fail(table_name, e)
raise
# ========================================================================= # =========================================================================
# STEP 6: agg_narrative_overview # STEP 6: agg_narrative_overview
# ========================================================================= # =========================================================================
@@ -1110,6 +1128,7 @@ class FoodSecurityAggregator:
self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold") self.logger.info(f"STEP 6: {table_name} -> [Gold] fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
try:
asean_total = ( asean_total = (
df_framework_asean[df_framework_asean["framework"] == "Total"] df_framework_asean[df_framework_asean["framework"] == "Total"]
.sort_values("year") .sort_values("year")
@@ -1246,6 +1265,10 @@ class FoodSecurityAggregator:
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
except Exception as e:
self._fail(table_name, e)
raise
# ========================================================================= # =========================================================================
# STEP 7: agg_narrative_pillar # STEP 7: agg_narrative_pillar
# ========================================================================= # =========================================================================
@@ -1261,6 +1284,7 @@ class FoodSecurityAggregator:
self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold") self.logger.info(f"STEP 7: {table_name} -> [Gold] fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
try:
records = [] records = []
years = sorted(df_pillar_composite["year"].unique()) years = sorted(df_pillar_composite["year"].unique())
@@ -1371,6 +1395,10 @@ class FoodSecurityAggregator:
self._finalize(table_name, rows) self._finalize(table_name, rows)
return df return df
except Exception as e:
self._fail(table_name, e)
raise
# ========================================================================= # =========================================================================
# HELPERS # HELPERS
# ========================================================================= # =========================================================================
@@ -1392,16 +1420,64 @@ class FoodSecurityAggregator:
self.logger.info(f" -> {status} (n_checked={len(check)})") self.logger.info(f" -> {status} (n_checked={len(check)})")
def _finalize(self, table_name: str, rows_loaded: int): def _finalize(self, table_name: str, rows_loaded: int):
"""
Tandai tabel sebagai sukses, catat ke etl_logs dan etl_metadata.
start_time diambil dari self.load_metadata yang di-set di awal tiap step.
"""
end_time = datetime.now()
start_time = self.load_metadata[table_name].get("start_time")
self.load_metadata[table_name].update({ self.load_metadata[table_name].update({
"rows_loaded": rows_loaded, "status": "success", "end_time": datetime.now(), "rows_loaded": rows_loaded,
"status": "success",
"end_time": end_time,
}) })
# Catat ke etl_logs (ringkasan singkat)
log_update(self.client, "DW", table_name, "full_load", rows_loaded) log_update(self.client, "DW", table_name, "full_load", rows_loaded)
# Catat ke etl_metadata (detail: durasi, status, rows)
save_etl_metadata(
client = self.client,
table_name = table_name,
layer = "gold",
rows_loaded= rows_loaded,
start_time = start_time,
end_time = end_time,
status = "success",
)
self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold") self.logger.info(f" [OK] {table_name}: {rows_loaded:,} rows -> [Gold] fs_asean_gold")
def _fail(self, table_name: str, error: Exception): def _fail(self, table_name: str, error: Exception):
self.load_metadata[table_name].update({"status": "failed", "end_time": datetime.now()}) """
self.logger.error(f" [FAIL] {table_name}: {error}") Tandai tabel sebagai gagal, catat ke etl_logs dan etl_metadata beserta pesan error.
log_update(self.client, "DW", table_name, "full_load", 0, "failed", str(error)) """
end_time = datetime.now()
start_time = self.load_metadata[table_name].get("start_time")
error_msg = str(error)
self.load_metadata[table_name].update({
"status": "failed",
"end_time": end_time,
})
# Catat ke etl_logs
log_update(self.client, "DW", table_name, "full_load", 0, "failed", error_msg)
# Catat ke etl_metadata dengan status failed + pesan error
save_etl_metadata(
client = self.client,
table_name = table_name,
layer = "gold",
rows_loaded= 0,
start_time = start_time,
end_time = end_time,
status = "failed",
error_msg = error_msg,
)
self.logger.error(f" [FAIL] {table_name}: {error_msg}")
# ========================================================================= # =========================================================================
# RUN # RUN