Compare commits
2 Commits
1061738345
...
ddc9fb3b48
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ddc9fb3b48 | ||
|
|
ddf15ca9a5 |
@@ -1,6 +1,6 @@
|
||||
"""
|
||||
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
|
||||
FIXED: fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
|
||||
fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold')
|
||||
|
||||
Filtering Order:
|
||||
1. Load data (single years only)
|
||||
@@ -8,15 +8,20 @@ Filtering Order:
|
||||
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
|
||||
4. Filter countries with ALL pillars (FIXED SET)
|
||||
5. Filter indicators with consistent presence across FIXED countries
|
||||
6. Calculate YoY per indicator per country
|
||||
7. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio)
|
||||
6. Determine SDGs start year & assign framework (MDGs/SDGs) per indicator
|
||||
7. Calculate YoY per indicator per country
|
||||
8. Analyze indicator availability by year
|
||||
9. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio)
|
||||
|
||||
UPDATED:
|
||||
- Kolom 'framework' (MDGs/SDGs) dipropagasi dari dim_indicator ke tabel output.
|
||||
Hal ini memungkinkan Looker Studio melakukan filter/slice berdasarkan framework
|
||||
tanpa perlu join ulang ke dim_indicator.
|
||||
- Kolom 'yoy_change' dan 'yoy_pct' ditambahkan untuk analisis Year-over-Year
|
||||
per indikator per negara langsung di Looker Studio.
|
||||
FRAMEWORK LOGIC:
|
||||
- SDG_START_YEAR = 2016 (default; auto-detect jika indikator SDGs pertama kali muncul lebih awal/lambat)
|
||||
- Indikator yang namanya ada di SDG_INDICATOR_KEYWORDS:
|
||||
* Jika data mulai >= SDG_START_YEAR -> 'SDGs'
|
||||
* Jika data mulai < SDG_START_YEAR -> 'MDGs'
|
||||
(artinya indikator ini sudah ada sebelum SDGs, mis. undernourishment)
|
||||
- Indikator yang namanya TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs'
|
||||
- Penentuan framework dilakukan SETELAH filter selesai (data sudah bersih & range sudah fixed)
|
||||
sehingga start_year per indikator yang digunakan adalah start_year AKTUAL di dataset ini.
|
||||
"""
|
||||
|
||||
import pandas as pd
|
||||
@@ -42,6 +47,81 @@ from scripts.bigquery_helpers import (
|
||||
from google.cloud import bigquery
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# SDG INDICATOR KEYWORDS
|
||||
# =============================================================================
|
||||
# Daftar nama indikator (lowercase) yang termasuk dalam SDG Goal 2.
|
||||
# Matching dilakukan dengan `kw in indicator_name.lower()` sehingga
|
||||
# partial match tetap valid (menangani variasi format nama).
|
||||
#
|
||||
# Logika framework:
|
||||
# - Nama ada di set ini + start_year >= SDG_START_YEAR -> 'SDGs'
|
||||
# - Nama ada di set ini + start_year < SDG_START_YEAR -> 'MDGs'
|
||||
# (indikator sudah eksis sebelum SDGs, mis. prevalence of undernourishment)
|
||||
# - Nama TIDAK ada di set ini -> 'MDGs'
|
||||
|
||||
SDG_INDICATOR_KEYWORDS = frozenset([
    # TARGET 2.1.1 — Prevalence of undernourishment (shared; existed before the SDGs)
    "prevalence of undernourishment (percent) (3-year average)",
    "number of people undernourished (million) (3-year average)",
    # TARGET 2.1.2 — FIES (SDGs only)
    "prevalence of severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
    "number of severely food insecure people (million) (3-year average)",
    "number of severely food insecure male adults (million) (3-year average)",
    "number of severely food insecure female adults (million) (3-year average)",
    "number of moderately or severely food insecure people (million) (3-year average)",
    "number of moderately or severely food insecure male adults (million) (3-year average)",
    "number of moderately or severely food insecure female adults (million) (3-year average)",
    # TARGET 2.2.1 — Stunting (shared)
    "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)",
    "number of children under 5 years of age who are stunted (modeled estimates) (million)",
    # TARGET 2.2.2 — Wasting & Overweight (shared)
    "percentage of children under 5 years affected by wasting (percent)",
    "number of children under 5 years affected by wasting (million)",
    "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)",
    "number of children under 5 years of age who are overweight (modeled estimates) (million)",
    # TARGET 2.2.3 — Anaemia (SDGs only)
    "prevalence of anemia among women of reproductive age (15-49 years) (percent)",
    "number of women of reproductive age (15-49 years) affected by anemia (million)",
])

# Official year the SDGs took effect (the 2030 Agenda was adopted in
# September 2015, data reporting starts in 2016). Used as the default
# when auto-detection of the SDG start year fails.
SDG_START_YEAR_DEFAULT = 2016

# Spelling variants folded to a single canonical form before matching.
# The keyword list itself mixes "modelled"/"modeled", and the SDG proxy
# detection accepts both "anemia" and "anaemia", so the matcher must not
# depend on which spelling a given indicator name happens to use.
_SPELLING_VARIANTS = (
    ("anaemia", "anemia"),
    ("modelled", "modeled"),
)


def _normalize_indicator_name(name: object) -> str:
    """Lowercase *name*, collapse whitespace runs and fold spelling variants."""
    text = " ".join(str(name).lower().split())
    for variant, canonical in _SPELLING_VARIANTS:
        text = text.replace(variant, canonical)
    return text


# Keywords pre-normalized once so every call compares like with like.
_SDG_KEYWORDS_NORMALIZED = tuple(
    sorted({_normalize_indicator_name(kw) for kw in SDG_INDICATOR_KEYWORDS})
)


def assign_framework_dynamic(
    indicator_name: str,
    indicator_start_year: int,
    sdg_start_year: int,
) -> str:
    """
    Decide the framework label (MDGs/SDGs) for one indicator.

    The decision uses two facts:
        1. Does the indicator name contain an entry of SDG_INDICATOR_KEYWORDS?
           (substring match after lowercasing, whitespace collapsing and
           spelling-variant folding on both sides)
        2. Does the indicator's data start at a year >= sdg_start_year?

    Args:
        indicator_name       : Indicator name; converted with str() and normalized
                               before matching.
        indicator_start_year : First year for which this indicator has data.
        sdg_start_year       : Year the SDGs are considered to start.

    Returns:
        'SDGs' when the name matches the SDG list AND the data starts at or
        after sdg_start_year; 'MDGs' in every other case.
    """
    normalized = _normalize_indicator_name(indicator_name)
    is_sdg_name = any(kw in normalized for kw in _SDG_KEYWORDS_NORMALIZED)

    if is_sdg_name and indicator_start_year >= sdg_start_year:
        return 'SDGs'
    return 'MDGs'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# ANALYTICAL LAYER CLASS
|
||||
# =============================================================================
|
||||
@@ -54,8 +134,9 @@ class AnalyticalLayerLoader:
|
||||
1. Complete per country (no gaps from start_year to end_year)
|
||||
2. Filter countries with all pillars
|
||||
3. Ensure indicators have consistent country count across all years
|
||||
4. Calculate YoY (year-over-year) change per indicator per country
|
||||
5. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio
|
||||
4. Determine SDGs start year & assign framework per indicator dynamically
|
||||
5. Calculate YoY (year-over-year) change per indicator per country
|
||||
6. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio
|
||||
|
||||
Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
|
||||
|
||||
@@ -83,6 +164,9 @@ class AnalyticalLayerLoader:
|
||||
self.end_year = None
|
||||
self.baseline_year = 2023
|
||||
|
||||
# SDGs-related — di-set oleh determine_sdg_start_year()
|
||||
self.sdg_start_year = SDG_START_YEAR_DEFAULT
|
||||
|
||||
self.pipeline_metadata = {
|
||||
'source_class' : self.__class__.__name__,
|
||||
'start_time' : None,
|
||||
@@ -97,13 +181,18 @@ class AnalyticalLayerLoader:
|
||||
self.pipeline_start = None
|
||||
self.pipeline_end = None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 1: LOAD SOURCE DATA
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def load_source_data(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
try:
|
||||
# Sertakan kolom framework dari dim_indicator dalam query
|
||||
# Tidak include framework dari dim_indicator —
|
||||
# framework akan ditentukan dinamis di Step 6 (determine_sdg_start_year)
|
||||
query = f"""
|
||||
SELECT
|
||||
f.country_id,
|
||||
@@ -111,7 +200,6 @@ class AnalyticalLayerLoader:
|
||||
f.indicator_id,
|
||||
i.indicator_name,
|
||||
i.direction,
|
||||
i.framework,
|
||||
f.pillar_id,
|
||||
p.pillar_name,
|
||||
f.time_id,
|
||||
@@ -128,7 +216,7 @@ class AnalyticalLayerLoader:
|
||||
JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
|
||||
"""
|
||||
|
||||
self.logger.info("Loading fact table with dimensions (incl. framework)...")
|
||||
self.logger.info("Loading fact table with dimensions...")
|
||||
self.df_clean = self.client.query(query).result().to_dataframe(
|
||||
create_bqstorage_client=False
|
||||
)
|
||||
@@ -144,19 +232,6 @@ class AnalyticalLayerLoader:
|
||||
f" Year ranges (is_year_range=True): {yr.get(True, 0):,}"
|
||||
)
|
||||
|
||||
# Validasi kolom framework tersedia
|
||||
if 'framework' not in self.df_clean.columns:
|
||||
raise ValueError(
|
||||
"Kolom 'framework' tidak ditemukan di dim_indicator. "
|
||||
"Pastikan bigquery_cleaned_layer.py dan bigquery_dimensional_model.py "
|
||||
"sudah dijalankan dengan versi terbaru."
|
||||
)
|
||||
|
||||
fw_dist = self.df_clean.drop_duplicates('indicator_id')['framework'].value_counts()
|
||||
self.logger.info(f" Framework distribution (per indikator unik):")
|
||||
for fw, cnt in fw_dist.items():
|
||||
self.logger.info(f" {fw}: {cnt} indicators")
|
||||
|
||||
self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
|
||||
self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
|
||||
self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
|
||||
@@ -172,6 +247,10 @@ class AnalyticalLayerLoader:
|
||||
self.logger.error(f"Error loading source data: {e}")
|
||||
raise
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 2: DETERMINE YEAR BOUNDARIES
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def determine_year_boundaries(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
|
||||
@@ -214,6 +293,10 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info(f" Rows after: {len(self.df_clean):,}")
|
||||
return self.df_clean
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def filter_complete_indicators_per_country(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
|
||||
@@ -285,6 +368,10 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
|
||||
return self.df_clean
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 4: SELECT COUNTRIES WITH ALL PILLARS
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def select_countries_with_all_pillars(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
|
||||
@@ -323,6 +410,10 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info(f" Rows after: {len(self.df_clean):,}")
|
||||
return self.df_clean
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def filter_indicators_consistent_across_fixed_countries(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
|
||||
@@ -404,9 +495,138 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
|
||||
return self.df_clean
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def determine_sdg_start_year(self):
    """
    Detect the SDG start year from the data, then add a 'framework'
    column (MDGs/SDGs) to every row of self.df_clean.

    SDG start year:
        - Proxy indicators are those whose lowercased name contains
          'food insecurity', 'anemia' or 'anaemia'.
        - If any proxy row with a year exists, self.sdg_start_year is set to
          the minimum year among those rows.
        - Otherwise self.sdg_start_year falls back to SDG_START_YEAR_DEFAULT.

    Framework per indicator (delegated to assign_framework_dynamic):
        - each indicator_id gets one label based on its name and the first
          year it appears in self.df_clean;
        - rows whose indicator received no label are filled with 'MDGs'.

    Reads self.df_clean columns 'indicator_id', 'indicator_name' and 'year'.

    Returns:
        self.df_clean with the 'framework' column added.
    """
    self.logger.info("\n" + "=" * 80)
    self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK")
    self.logger.info("=" * 80)

    # A 'framework' column left over from an earlier call would make the merge
    # in 6e produce framework_x / framework_y and the fillna lookup raise KeyError.
    if 'framework' in self.df_clean.columns:
        self.df_clean = self.df_clean.drop(columns=['framework'])

    # --- 6a. Auto-detect the SDG start year from the actual data ---
    # SDGs-only proxy: indicators treated as new in the SDGs (FIES & anaemia).
    sdg_proxy_keywords = ('food insecurity', 'anemia', 'anaemia')

    # astype(str) keeps missing names from reaching the substring test as
    # floats; astype(bool) keeps the mask boolean even when there are no rows.
    names_lower = self.df_clean['indicator_name'].astype(str).str.lower()
    sdg_proxy_mask = names_lower.map(
        lambda n: any(kw in n for kw in sdg_proxy_keywords)
    ).astype(bool)
    df_sdg_proxy = self.df_clean[sdg_proxy_mask]
    proxy_years = df_sdg_proxy['year'].dropna()

    if len(proxy_years) > 0:
        self.sdg_start_year = int(proxy_years.min())
        self.logger.info(
            f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}"
        )
        self.logger.info(f" Proxy indicators used (sample):")
        proxy_sample = (
            df_sdg_proxy['indicator_name']
            .drop_duplicates()
            .head(5)
            .tolist()
        )
        for ind in proxy_sample:
            self.logger.info(f" - {ind}")
    else:
        self.sdg_start_year = SDG_START_YEAR_DEFAULT
        self.logger.warning(
            f"\n [WARN] SDG proxy indicators not found in dataset. "
            f"Using default: {self.sdg_start_year}"
        )

    self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}")

    # --- 6b. Actual start year per indicator in this dataset ---
    # Group on indicator_id alone so each ID yields exactly one row; grouping
    # on the name as well would duplicate fact rows in the merge below
    # whenever one ID carries more than one name.
    indicator_start = (
        self.df_clean
        .groupby('indicator_id', as_index=False)
        .agg(
            indicator_name=('indicator_name', 'first'),
            actual_start_year=('year', 'min'),
        )
        # Indicators without any year cannot be classified here; they fall
        # through to the 'MDGs' default applied after the merge.
        .dropna(subset=['actual_start_year'])
    )

    # --- 6c. Assign a framework per indicator ---
    # A list comprehension (rather than DataFrame.apply) also works on an
    # empty frame, where apply(axis=1) does not return a Series.
    indicator_start['framework'] = [
        assign_framework_dynamic(
            indicator_name=name,
            indicator_start_year=int(start_year),
            sdg_start_year=self.sdg_start_year,
        )
        for name, start_year in zip(
            indicator_start['indicator_name'],
            indicator_start['actual_start_year'],
        )
    ]

    # --- 6d. Log the assignment ---
    self.logger.info(f"\n Framework assignment per indicator:")
    self.logger.info(f" {'-'*85}")
    self.logger.info(
        f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}"
    )
    self.logger.info(f" {'-'*85}")

    for _, row in indicator_start.sort_values(
        ['framework', 'actual_start_year', 'indicator_name']
    ).iterrows():
        name_text = str(row['indicator_name'])
        is_in_sdg_list = any(
            kw in name_text.lower()
            for kw in SDG_INDICATOR_KEYWORDS
        )
        note = " [in SDG list]" if is_in_sdg_list else ""
        self.logger.info(
            f" {int(row['indicator_id']):<5} {row['framework']:<10} "
            f"{int(row['actual_start_year']):<10} {name_text[:55]}{note}"
        )

    fw_summary = indicator_start['framework'].value_counts()
    self.logger.info(f"\n Framework summary:")
    for fw, cnt in fw_summary.items():
        self.logger.info(f" {fw}: {cnt} indicators")

    # --- 6e. Merge the framework label into df_clean ---
    # validate='many_to_one' makes pandas raise instead of silently
    # multiplying fact rows if the lookup ever stops being unique per ID.
    self.df_clean = self.df_clean.merge(
        indicator_start[['indicator_id', 'framework']],
        on='indicator_id', how='left', validate='many_to_one'
    )
    self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')

    self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean")
    self.logger.info(
        f" Row distribution — MDGs: "
        f"{(self.df_clean['framework'] == 'MDGs').sum():,} | "
        f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}"
    )

    return self.df_clean
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 6c: VERIFY NO GAPS
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def verify_no_gaps(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 6: VERIFY NO GAPS")
|
||||
self.logger.info("STEP 6c: VERIFY NO GAPS")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
expected_countries = len(self.selected_country_ids)
|
||||
@@ -431,44 +651,35 @@ class AnalyticalLayerLoader:
|
||||
|
||||
return True
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 7: CALCULATE YOY
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def calculate_yoy(self):
|
||||
"""
|
||||
Hitung Year-over-Year (YoY) per indikator per negara.
|
||||
|
||||
Kolom yang ditambahkan ke df_clean:
|
||||
Kolom yang ditambahkan:
|
||||
yoy_change : selisih absolut -> value - value_tahun_sebelumnya
|
||||
yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100
|
||||
|
||||
Catatan:
|
||||
- Baris tahun pertama per kombinasi country-indicator akan bernilai NULL
|
||||
(tidak ada tahun sebelumnya sebagai pembanding) — ini intentional.
|
||||
- value_prev di-drop setelah kalkulasi, tidak ikut disimpan ke BigQuery.
|
||||
- Dilakukan SETELAH verify_no_gaps() agar data sudah clean dan sorted benar.
|
||||
Baris tahun pertama per kombinasi country-indicator bernilai NULL (intentional).
|
||||
"""
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 6b: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
|
||||
self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
|
||||
|
||||
# Nilai tahun sebelumnya (shifted within each country-indicator group)
|
||||
df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1)
|
||||
|
||||
# YoY absolute change: value(t) - value(t-1)
|
||||
df['yoy_change'] = df['value'] - df['value_prev']
|
||||
|
||||
# YoY percentage change: (yoy_change / |value_prev|) * 100
|
||||
# Hindari division by zero — jika value_prev == 0 atau NaN, hasilnya NaN
|
||||
df['yoy_pct'] = np.where(
|
||||
df['value_prev'].notna() & (df['value_prev'] != 0),
|
||||
(df['yoy_change'] / df['value_prev'].abs()) * 100,
|
||||
np.nan
|
||||
)
|
||||
|
||||
# Drop kolom bantu value_prev, tidak ikut disimpan ke BigQuery
|
||||
df = df.drop(columns=['value_prev'])
|
||||
|
||||
# Log ringkasan
|
||||
total_rows = len(df)
|
||||
valid_yoy = df['yoy_pct'].notna().sum()
|
||||
null_yoy = df['yoy_pct'].isna().sum()
|
||||
@@ -477,7 +688,6 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info(f" YoY calculated : {valid_yoy:,}")
|
||||
self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator")
|
||||
|
||||
# Log distribusi YoY per indikator (sample)
|
||||
per_ind = (
|
||||
df[df['yoy_pct'].notna()]
|
||||
.groupby(['indicator_id', 'indicator_name'])['yoy_pct']
|
||||
@@ -504,7 +714,6 @@ class AnalyticalLayerLoader:
|
||||
f"{row['min']:>+8.2f} {row['max']:>+8.2f}"
|
||||
)
|
||||
|
||||
# Log distribusi YoY per negara (ringkasan)
|
||||
per_country = (
|
||||
df[df['yoy_pct'].notna()]
|
||||
.groupby(['country_id', 'country_name'])['yoy_pct']
|
||||
@@ -526,9 +735,13 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct")
|
||||
return self.df_clean
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def analyze_indicator_availability_by_year(self):
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
|
||||
self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
year_stats = self.df_clean.groupby('year').agg({
|
||||
@@ -586,6 +799,10 @@ class AnalyticalLayerLoader:
|
||||
|
||||
return year_stats
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# STEP 9: SAVE ANALYTICAL TABLE
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def save_analytical_table(self):
|
||||
"""
|
||||
Simpan fact_asean_food_security_selected ke Gold layer.
|
||||
@@ -594,44 +811,20 @@ class AnalyticalLayerLoader:
|
||||
country_id, country_name — dimensi negara
|
||||
indicator_id, indicator_name — dimensi indikator
|
||||
direction — arah penilaian (higher/lower_better)
|
||||
framework — MDGs / SDGs (untuk filter Looker Studio)
|
||||
framework — MDGs/SDGs (ditentukan di Step 6)
|
||||
pillar_id, pillar_name — dimensi pilar
|
||||
time_id, year — dimensi waktu
|
||||
value — nilai indikator
|
||||
yoy_change — perubahan absolut YoY (NULLABLE: NULL di tahun pertama)
|
||||
yoy_pct — perubahan relatif YoY dalam % (NULLABLE: NULL di tahun pertama)
|
||||
|
||||
Kolom framework memungkinkan filter langsung di Looker Studio tanpa join ke dim_indicator.
|
||||
Kolom yoy_change dan yoy_pct memungkinkan analisis tren tahunan langsung di Looker Studio.
|
||||
yoy_change — perubahan absolut YoY (NULL di tahun pertama)
|
||||
yoy_pct — perubahan relatif YoY dalam % (NULL di tahun pertama)
|
||||
"""
|
||||
table_name = 'fact_asean_food_security_selected'
|
||||
|
||||
self.logger.info("\n" + "=" * 80)
|
||||
self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
|
||||
self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
try:
|
||||
# Pastikan kolom framework tersedia di df_clean
|
||||
if 'framework' not in self.df_clean.columns:
|
||||
self.logger.warning(
|
||||
" [WARN] Kolom 'framework' tidak ada di df_clean. "
|
||||
"Melakukan join ke dim_indicator sebagai fallback..."
|
||||
)
|
||||
dim_ind = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
|
||||
if 'framework' in dim_ind.columns:
|
||||
self.df_clean = self.df_clean.merge(
|
||||
dim_ind[['indicator_id', 'framework']],
|
||||
on='indicator_id', how='left'
|
||||
)
|
||||
self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
|
||||
self.logger.info(" [OK] framework di-join dari dim_indicator")
|
||||
else:
|
||||
self.df_clean['framework'] = 'MDGs'
|
||||
self.logger.warning(
|
||||
" [WARN] dim_indicator juga tidak punya kolom framework. "
|
||||
"Default: MDGs. Jalankan ulang pipeline dari cleaned_layer."
|
||||
)
|
||||
|
||||
# Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil
|
||||
if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns:
|
||||
self.logger.warning(
|
||||
@@ -659,11 +852,10 @@ class AnalyticalLayerLoader:
|
||||
['year', 'country_name', 'pillar_name', 'indicator_name']
|
||||
).reset_index(drop=True)
|
||||
|
||||
# Pastikan tipe data konsisten
|
||||
analytical_df['country_id'] = analytical_df['country_id'].astype(int)
|
||||
analytical_df['country_name'] = analytical_df['country_name'].astype(str)
|
||||
analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
|
||||
analytical_df['indicator_name']= analytical_df['indicator_name'].astype(str)
|
||||
analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
|
||||
analytical_df['direction'] = analytical_df['direction'].astype(str)
|
||||
analytical_df['framework'] = analytical_df['framework'].astype(str)
|
||||
analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
|
||||
@@ -671,20 +863,17 @@ class AnalyticalLayerLoader:
|
||||
analytical_df['time_id'] = analytical_df['time_id'].astype(int)
|
||||
analytical_df['year'] = analytical_df['year'].astype(int)
|
||||
analytical_df['value'] = analytical_df['value'].astype(float)
|
||||
# yoy_change dan yoy_pct tetap float — NULL (NaN) di tahun pertama adalah intentional
|
||||
analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float)
|
||||
analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float)
|
||||
|
||||
self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
|
||||
self.logger.info(f" Total rows: {len(analytical_df):,}")
|
||||
|
||||
# Log distribusi framework
|
||||
fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts()
|
||||
self.logger.info(f" Framework distribution (per indikator unik):")
|
||||
for fw, cnt in fw_dist.items():
|
||||
self.logger.info(f" {fw}: {cnt} indicators")
|
||||
|
||||
# Log statistik YoY
|
||||
yoy_valid = analytical_df['yoy_pct'].notna().sum()
|
||||
yoy_null = analytical_df['yoy_pct'].isna().sum()
|
||||
self.logger.info(f" YoY rows (calculated): {yoy_valid:,}")
|
||||
@@ -702,7 +891,6 @@ class AnalyticalLayerLoader:
|
||||
bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
|
||||
bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
|
||||
# NULLABLE karena tahun pertama per country-indicator tidak memiliki nilai sebelumnya
|
||||
bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
|
||||
bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
|
||||
]
|
||||
@@ -727,17 +915,19 @@ class AnalyticalLayerLoader:
|
||||
'config_snapshot' : json.dumps({
|
||||
'start_year' : self.start_year,
|
||||
'end_year' : self.end_year,
|
||||
'fixed_countries': len(self.selected_country_ids),
|
||||
'sdg_start_year' : self.sdg_start_year,
|
||||
'fixed_countries' : len(self.selected_country_ids),
|
||||
'no_gaps' : True,
|
||||
'layer' : 'gold',
|
||||
'columns' : (
|
||||
'id + name + direction + framework + value + '
|
||||
'yoy_change + yoy_pct (Looker Studio ready)'
|
||||
)
|
||||
'framework_logic' : (
|
||||
f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, "
|
||||
"else MDGs"
|
||||
),
|
||||
}),
|
||||
'validation_metrics' : json.dumps({
|
||||
'fixed_countries' : len(self.selected_country_ids),
|
||||
'total_indicators': int(self.df_clean['indicator_id'].nunique()),
|
||||
'sdg_start_year' : self.sdg_start_year,
|
||||
'framework_dist' : fw_dist.to_dict(),
|
||||
'yoy_rows_valid' : int(yoy_valid),
|
||||
'yoy_rows_null' : int(yoy_null),
|
||||
@@ -755,6 +945,10 @@ class AnalyticalLayerLoader:
|
||||
self.logger.error(f"Error saving: {e}")
|
||||
raise
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# RUN
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def run(self):
|
||||
self.pipeline_start = datetime.now()
|
||||
self.pipeline_metadata['start_time'] = self.pipeline_start
|
||||
@@ -763,6 +957,7 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
|
||||
self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,")
|
||||
self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct")
|
||||
self.logger.info(f"Framework: ditentukan dinamis berdasarkan SDG_START_YEAR (auto-detect)")
|
||||
self.logger.info("=" * 80)
|
||||
|
||||
self.load_source_data()
|
||||
@@ -770,8 +965,9 @@ class AnalyticalLayerLoader:
|
||||
self.filter_complete_indicators_per_country()
|
||||
self.select_countries_with_all_pillars()
|
||||
self.filter_indicators_consistent_across_fixed_countries()
|
||||
self.verify_no_gaps()
|
||||
self.calculate_yoy() # <-- Step 6b: hitung YoY
|
||||
self.determine_sdg_start_year() # Step 6: auto-detect SDG year & assign framework
|
||||
self.verify_no_gaps() # Step 6c: verifikasi tidak ada gap
|
||||
self.calculate_yoy() # Step 7: hitung YoY
|
||||
self.analyze_indicator_availability_by_year()
|
||||
self.save_analytical_table()
|
||||
|
||||
@@ -783,9 +979,10 @@ class AnalyticalLayerLoader:
|
||||
self.logger.info("=" * 80)
|
||||
self.logger.info(f" Duration : {duration:.2f}s")
|
||||
self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
|
||||
self.logger.info(f" SDG Start Yr : {self.sdg_start_year}")
|
||||
self.logger.info(f" Countries : {len(self.selected_country_ids)}")
|
||||
self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
|
||||
self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")
|
||||
self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -810,7 +1007,9 @@ def run_analytical_layer():
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("=" * 80)
|
||||
print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING")
|
||||
print("Output: fact_asean_food_security_selected -> fs_asean_gold")
|
||||
print("Framework: MDGs/SDGs ditentukan dinamis dari data (auto-detect SDG start year)")
|
||||
print("=" * 80)
|
||||
|
||||
logger = setup_logging()
|
||||
@@ -820,4 +1019,6 @@ if __name__ == "__main__":
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
print("[OK] COMPLETED")
|
||||
print(f" SDG Start Year : {loader.sdg_start_year}")
|
||||
print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}")
|
||||
print("=" * 80)
|
||||
@@ -62,7 +62,6 @@ COLUMN_CONSTRAINTS = {
|
||||
'unit' : 20,
|
||||
'pillar' : 20,
|
||||
'direction' : 15,
|
||||
'framework' : 5, # 'MDGs'=4, 'SDGs'=4
|
||||
}
|
||||
|
||||
|
||||
@@ -292,62 +291,6 @@ def assign_direction(indicator_name: str) -> str:
|
||||
return 'higher_better'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# FRAMEWORK CLASSIFICATION (MDGs vs SDGs)
|
||||
# =============================================================================
|
||||
|
||||
# Explicit list of SDG Goal 2 indicator names, specifically the FIES
# (food-insecurity) indicators of the 2030 Agenda for Sustainable Development.
# Every entry is stored lowercase and single-spaced so that matching does not
# depend on the capitalisation or spacing of the incoming indicator name.
SDG_INDICATOR_KEYWORDS = frozenset([
    "prevalence of severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of severe food insecurity in the female adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)",
    "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)",
    "number of severely food insecure people (million) (3-year average)",
    "number of severely food insecure male adults (million) (3-year average)",
    "number of severely food insecure female adults (million) (3-year average)",
    "number of moderately or severely food insecure people (million) (3-year average)",
    "number of moderately or severely food insecure male adults (million) (3-year average)",
    "number of moderately or severely food insecure female adults (million) (3-year average)",
])


def assign_framework(indicator_name: str) -> str:
    """Classify an indicator as belonging to the 'SDGs' or 'MDGs' framework.

    The decision is based on the explicit list of SDG Goal 2 indicators in
    ``SDG_INDICATOR_KEYWORDS`` (2030 Agenda for Sustainable Development,
    March 2020 version).

    Matching rules:
      - The input is lowercased and all runs of whitespace (including
        leading/trailing whitespace, tabs and non-breaking spaces) are
        collapsed to a single space.
      - If any keyword is contained in the normalised name -> 'SDGs'.
      - Otherwise -> 'MDGs' (older/FAO indicators that are not on the list).

    The keywords must stay lowercase in the set: the input is lowercased
    before the substring test, so a mixed-case keyword could never match.

    Args:
        indicator_name: Indicator name to classify. Missing values
            (``None`` / NaN, as detected by ``pd.isna``) are accepted.

    Returns:
        'SDGs' or 'MDGs'. Both values are 4 characters long, which fits the
        5-character limit set for the 'framework' column.
    """
    # Missing names cannot be matched against the list; treat them as MDGs.
    if pd.isna(indicator_name):
        return 'MDGs'

    # split() with no argument splits on any Unicode whitespace and drops
    # empty pieces, so this both strips the ends and collapses inner runs.
    ind = " ".join(str(indicator_name).lower().split())

    # Substring test (not equality) so that names carrying extra prefix or
    # suffix text around a listed indicator are still recognised.
    if any(kw in ind for kw in SDG_INDICATOR_KEYWORDS):
        return 'SDGs'

    return 'MDGs'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# CLEANED DATA LOADER
|
||||
# =============================================================================
|
||||
@@ -365,7 +308,7 @@ class CleanedDataLoader:
|
||||
1. Standardize country names (ASEAN)
|
||||
2. Remove missing values
|
||||
3. Remove duplicates
|
||||
4. Add pillar, direction & framework classification
|
||||
4. Add pillar & direction classification
|
||||
5. Apply column constraints
|
||||
6. Load ke BigQuery
|
||||
7. Log ke Audit layer
|
||||
@@ -382,7 +325,6 @@ class CleanedDataLoader:
|
||||
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
||||
]
|
||||
|
||||
def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'):
|
||||
@@ -449,12 +391,11 @@ class CleanedDataLoader:
|
||||
return df_clean
|
||||
|
||||
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
print("\n [Step 4/5] Add pillar, direction & framework classification...")
|
||||
print("\n [Step 4/5] Add pillar & direction classification...")
|
||||
df = df.copy()
|
||||
|
||||
df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
|
||||
df['direction'] = df['indicator_standardized'].apply(assign_direction)
|
||||
df['framework'] = df['indicator_standardized'].apply(assign_framework)
|
||||
|
||||
pillar_counts = df['pillar'].value_counts()
|
||||
print(f" Pillar distribution:")
|
||||
@@ -467,21 +408,6 @@ class CleanedDataLoader:
|
||||
pct = count / len(df) * 100
|
||||
print(f" - {direction}: {count:,} ({pct:.1f}%)")
|
||||
|
||||
framework_counts = df['framework'].value_counts()
|
||||
print(f" Framework distribution:")
|
||||
for fw, count in framework_counts.items():
|
||||
pct = count / len(df) * 100
|
||||
print(f" - {fw}: {count:,} ({pct:.1f}%)")
|
||||
|
||||
# Log indikator yang terklasifikasi SDGs untuk verifikasi
|
||||
sdg_inds = (
|
||||
df[df['framework'] == 'SDGs']['indicator_standardized']
|
||||
.drop_duplicates().sort_values().tolist()
|
||||
)
|
||||
print(f"\n SDG indicators ({len(sdg_inds)}):")
|
||||
for ind in sdg_inds:
|
||||
print(f" - {ind}")
|
||||
|
||||
return df
|
||||
|
||||
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
@@ -506,7 +432,7 @@ class CleanedDataLoader:
|
||||
'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
|
||||
'unique_years': int(df['year'].nunique())
|
||||
}
|
||||
for col in ('pillar', 'direction', 'framework', 'source'):
|
||||
for col in ('pillar', 'direction', 'source'):
|
||||
if col in df.columns:
|
||||
validation[f'{col}_breakdown'] = {
|
||||
str(k): int(v) for k, v in df[col].value_counts().to_dict().items()
|
||||
|
||||
@@ -52,7 +52,7 @@ class DimensionalModelLoader:
|
||||
|
||||
Pipeline steps:
|
||||
1. Load dim_country
|
||||
2. Load dim_indicator (+ kolom framework dari cleaned_integrated)
|
||||
2. Load dim_indicator
|
||||
3. Load dim_time
|
||||
4. Load dim_source
|
||||
5. Load dim_pillar
|
||||
@@ -313,7 +313,6 @@ class DimensionalModelLoader:
|
||||
indicator_category — kategori (Health & Nutrition, dll.)
|
||||
unit — satuan ukuran
|
||||
direction — higher_better / lower_better
|
||||
framework — MDGs / SDGs <-- BARU: dibaca dari cleaned_integrated
|
||||
"""
|
||||
table_name = 'dim_indicator'
|
||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||
@@ -323,7 +322,6 @@ class DimensionalModelLoader:
|
||||
has_direction = 'direction' in self.df_clean.columns
|
||||
has_unit = 'unit' in self.df_clean.columns
|
||||
has_category = 'indicator_category' in self.df_clean.columns
|
||||
has_framework = 'framework' in self.df_clean.columns
|
||||
|
||||
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
|
||||
dim_indicator.columns = ['indicator_name']
|
||||
@@ -381,29 +379,10 @@ class DimensionalModelLoader:
|
||||
categorize_indicator
|
||||
)
|
||||
|
||||
# Framework — KOLOM BARU
|
||||
# Dibaca dari cleaned_integrated yang sudah menjalankan assign_framework().
|
||||
# Jika kolom belum ada (misal pipeline lama), fallback ke 'MDGs' dengan warning.
|
||||
if has_framework:
|
||||
fw_map = self.df_clean[
|
||||
['indicator_standardized', 'framework']
|
||||
].drop_duplicates()
|
||||
fw_map.columns = ['indicator_name', 'framework']
|
||||
dim_indicator = dim_indicator.merge(fw_map, on='indicator_name', how='left')
|
||||
# Pastikan tidak ada NULL setelah merge
|
||||
dim_indicator['framework'] = dim_indicator['framework'].fillna('MDGs')
|
||||
self.logger.info(" [OK] framework column from cleaned_integrated")
|
||||
else:
|
||||
dim_indicator['framework'] = 'MDGs'
|
||||
self.logger.warning(
|
||||
" [WARN] framework column not found in cleaned_integrated. "
|
||||
"Default: MDGs. Jalankan bigquery_cleaned_layer.py terlebih dahulu."
|
||||
)
|
||||
|
||||
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
|
||||
|
||||
dim_indicator_final = dim_indicator[
|
||||
['indicator_name', 'indicator_category', 'unit', 'direction', 'framework']
|
||||
['indicator_name', 'indicator_category', 'unit', 'direction']
|
||||
].copy()
|
||||
dim_indicator_final = dim_indicator_final.reset_index(drop=True)
|
||||
dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1))
|
||||
@@ -414,7 +393,6 @@ class DimensionalModelLoader:
|
||||
bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
|
||||
bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
|
||||
bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
|
||||
]
|
||||
|
||||
rows_loaded = load_to_bigquery(
|
||||
@@ -427,7 +405,6 @@ class DimensionalModelLoader:
|
||||
for label, col in [
|
||||
('Categories', 'indicator_category'),
|
||||
('Direction', 'direction'),
|
||||
('Framework', 'framework'),
|
||||
]:
|
||||
self.logger.info(f" {label}:")
|
||||
for val, cnt in dim_indicator_final[col].value_counts().items():
|
||||
@@ -766,22 +743,6 @@ class DimensionalModelLoader:
|
||||
self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}")
|
||||
self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}")
|
||||
|
||||
# Validasi distribusi framework di dim_indicator
|
||||
query_fw = f"""
|
||||
SELECT framework, COUNT(*) AS count
|
||||
FROM `{get_table_id('dim_indicator', layer='gold')}`
|
||||
GROUP BY framework ORDER BY framework
|
||||
"""
|
||||
df_fw = self.client.query(query_fw).result().to_dataframe(
|
||||
create_bqstorage_client=False
|
||||
)
|
||||
if len(df_fw) > 0:
|
||||
self.logger.info(f"\n Framework Distribution (dim_indicator):")
|
||||
for _, row in df_fw.iterrows():
|
||||
self.logger.info(
|
||||
f" {row['framework']:10s}: {int(row['count']):>5,} indicators"
|
||||
)
|
||||
|
||||
query_dir = f"""
|
||||
SELECT direction, COUNT(*) AS count
|
||||
FROM `{get_table_id('dim_indicator', layer='gold')}`
|
||||
@@ -906,10 +867,6 @@ if __name__ == "__main__":
|
||||
print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}")
|
||||
if 'direction' in df_clean.columns:
|
||||
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
|
||||
if 'framework' in df_clean.columns:
|
||||
print(f" Framework : {df_clean['framework'].value_counts().to_dict()}")
|
||||
else:
|
||||
print(" [WARN] framework column not found — run bigquery_cleaned_layer.py first")
|
||||
|
||||
print("\n[1/1] Dimensional Model Load -> DW (Gold)...")
|
||||
loader = DimensionalModelLoader(client, df_clean)
|
||||
@@ -917,7 +874,7 @@ if __name__ == "__main__":
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("[OK] DIMENSIONAL MODEL ETL COMPLETED")
|
||||
print(" DW (Gold) : dim_country, dim_indicator (+ framework),")
|
||||
print(" dim_time, dim_source, dim_pillar, fact_food_security")
|
||||
print(" DW (Gold) : dim_country, dim_indicator, dim_time,")
|
||||
print(" dim_source, dim_pillar, fact_food_security")
|
||||
print(" AUDIT : etl_logs, etl_metadata")
|
||||
print("=" * 60)
|
||||
Reference in New Issue
Block a user