Rename analytical_food_security to fact_asean_food_security_selected and add name/label columns for Looker Studio

This commit is contained in:
Debby
2026-03-28 10:05:23 +07:00
parent 2cab52a110
commit 9fb98bf4f8
2 changed files with 58 additions and 25 deletions

View File

@@ -421,8 +421,8 @@ class FoodSecurityAggregator:
self.logger.info("STEP 1: LOAD DATA from fs_asean_gold") self.logger.info("STEP 1: LOAD DATA from fs_asean_gold")
self.logger.info("=" * 70) self.logger.info("=" * 70)
self.df = read_from_bigquery(self.client, "analytical_food_security", layer='gold') self.df = read_from_bigquery(self.client, "fact_asean_food_security_selected", layer='gold')
self.logger.info(f" analytical_food_security : {len(self.df):,} rows") self.logger.info(f" fact_asean_food_security_selected : {len(self.df):,} rows")
self.dims["country"] = read_from_bigquery(self.client, "dim_country", layer='gold') self.dims["country"] = read_from_bigquery(self.client, "dim_country", layer='gold')
self.dims["indicator"] = read_from_bigquery(self.client, "dim_indicator", layer='gold') self.dims["indicator"] = read_from_bigquery(self.client, "dim_indicator", layer='gold')
@@ -1307,7 +1307,7 @@ class FoodSecurityAggregator:
def run_aggregation(): def run_aggregation():
""" """
Airflow task: Hitung semua agregasi dari analytical_food_security. Airflow task: Hitung semua agregasi dari fact_asean_food_security_selected.
Dipanggil setelah analytical_layer_to_gold selesai. Dipanggil setelah analytical_layer_to_gold selesai.
Menjalankan 6 tabel sekaligus: 4 agregasi + 2 narrative. Menjalankan 6 tabel sekaligus: 4 agregasi + 2 narrative.
""" """

View File

@@ -1,6 +1,6 @@
""" """
BIGQUERY ANALYTICAL LAYER - DATA FILTERING BIGQUERY ANALYTICAL LAYER - DATA FILTERING
FIXED: analytical_food_security disimpan di fs_asean_gold (layer='gold') FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
Filtering Order: Filtering Order:
1. Load data (single years only) 1. Load data (single years only)
@@ -8,7 +8,7 @@ Filtering Order:
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET) 4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries 5. Filter indicators with consistent presence across FIXED countries
6. Save analytical table (value only, normalisasi & direction handled downstream) 6. Save analytical table (dengan nama/label lengkap untuk Looker Studio)
""" """
import pandas as pd import pandas as pd
@@ -40,15 +40,15 @@ from google.cloud import bigquery
class AnalyticalLayerLoader: class AnalyticalLayerLoader:
""" """
Analytical Layer Loader for BigQuery - CORRECTED VERSION v4 Analytical Layer Loader for BigQuery
Key Logic: Key Logic:
1. Complete per country (no gaps from start_year to end_year) 1. Complete per country (no gaps from start_year to end_year)
2. Filter countries with all pillars 2. Filter countries with all pillars
3. Ensure indicators have consistent country count across all years 3. Ensure indicators have consistent country count across all years
4. Save raw value only (normalisasi & direction handled downstream) 4. Save dengan kolom lengkap (nama + ID) untuk kemudahan Looker Studio
Output: analytical_food_security -> DW layer (Gold) -> fs_asean_gold Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
""" """
def __init__(self, client: bigquery.Client): def __init__(self, client: bigquery.Client):
@@ -424,32 +424,64 @@ class AnalyticalLayerLoader:
return year_stats return year_stats
def save_analytical_table(self): def save_analytical_table(self):
table_name = 'analytical_food_security' # ---------------------------------------------------------------
# CHANGED: nama tabel baru + kolom lengkap untuk Looker Studio
# ---------------------------------------------------------------
table_name = 'fact_asean_food_security_selected'
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
try: try:
# ------------------------------------------------------------------
# Pilih kolom: ID + Nama lengkap + value
# Kolom nama memudahkan filtering/slicing langsung di Looker Studio
# tanpa perlu join ulang ke tabel dimensi.
# ------------------------------------------------------------------
analytical_df = self.df_clean[[ analytical_df = self.df_clean[[
'country_id', 'indicator_id', 'pillar_id', 'time_id', 'value' 'country_id',
'country_name',
'indicator_id',
'indicator_name',
'direction',
'pillar_id',
'pillar_name',
'time_id',
'year',
'value',
]].copy() ]].copy()
analytical_df = analytical_df.sort_values( analytical_df = analytical_df.sort_values(
['time_id', 'country_id', 'indicator_id'] ['year', 'country_name', 'pillar_name', 'indicator_name']
).reset_index(drop=True) ).reset_index(drop=True)
# Pastikan tipe data konsisten
analytical_df['country_id'] = analytical_df['country_id'].astype(int) analytical_df['country_id'] = analytical_df['country_id'].astype(int)
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str)
analytical_df['direction'] = analytical_df['direction'].astype(str)
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str)
analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['time_id'] = analytical_df['time_id'].astype(int)
analytical_df['year'] = analytical_df['year'].astype(int)
analytical_df['value'] = analytical_df['value'].astype(float) analytical_df['value'] = analytical_df['value'].astype(float)
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
self.logger.info(f" Total rows: {len(analytical_df):,}") self.logger.info(f" Total rows: {len(analytical_df):,}")
# Schema BigQuery
schema = [ schema = [
bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
] ]
@@ -475,7 +507,8 @@ class AnalyticalLayerLoader:
'end_year' : self.end_year, 'end_year' : self.end_year,
'fixed_countries': len(self.selected_country_ids), 'fixed_countries': len(self.selected_country_ids),
'no_gaps' : True, 'no_gaps' : True,
'layer' : 'gold' 'layer' : 'gold',
'columns' : 'id + name + value (Looker Studio ready)'
}), }),
'validation_metrics' : json.dumps({ 'validation_metrics' : json.dumps({
'fixed_countries' : len(self.selected_country_ids), 'fixed_countries' : len(self.selected_country_ids),
@@ -497,7 +530,7 @@ class AnalyticalLayerLoader:
self.pipeline_metadata['start_time'] = self.pipeline_start self.pipeline_metadata['start_time'] = self.pipeline_start
self.logger.info("\n" + "=" * 80) self.logger.info("\n" + "=" * 80)
self.logger.info("Output: analytical_food_security → fs_asean_gold") self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold")
self.logger.info("=" * 80) self.logger.info("=" * 80)
self.load_source_data() self.load_source_data()
@@ -528,7 +561,7 @@ class AnalyticalLayerLoader:
def run_analytical_layer(): def run_analytical_layer():
""" """
Airflow task: Build analytical_food_security dari fact_food_security + dims. Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims.
Dipanggil setelah dimensional_model_to_gold selesai. Dipanggil setelah dimensional_model_to_gold selesai.
""" """
from scripts.bigquery_config import get_bigquery_client from scripts.bigquery_config import get_bigquery_client
@@ -544,7 +577,7 @@ def run_analytical_layer():
if __name__ == "__main__": if __name__ == "__main__":
print("=" * 80) print("=" * 80)
print("Output: analytical_food_security → fs_asean_gold") print("Output: fact_asean_food_security_selected → fs_asean_gold")
print("=" * 80) print("=" * 80)
logger = setup_logging() logger = setup_logging()