""" BIGQUERY ANALYTICAL LAYER - DATA FILTERING fact_asean_food_security_selected disimpan di fs_asean_gold (layer='gold') Filtering Order: 1. Load data (single years only) 2. Determine year boundaries (2013 - auto-detected end year) 3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps) 4. Filter countries with ALL pillars (FIXED SET) 5. Filter indicators with consistent presence across FIXED countries 6. Determine SDGs start year & assign framework (MDGs/SDGs) per indicator 7. Calculate YoY per indicator per country 8. Analyze indicator availability by year 9. Save analytical table (dengan nama/label lengkap + kolom framework + YoY untuk Looker Studio) FRAMEWORK LOGIC: - SDG_START_YEAR = 2016 (default; auto-detect jika indikator SDGs pertama kali muncul lebih awal/lambat) - Indikator yang namanya ada di SDG_INDICATOR_KEYWORDS: * Jika data mulai >= SDG_START_YEAR -> 'SDGs' * Jika data mulai < SDG_START_YEAR -> 'MDGs' (artinya indikator ini sudah ada sebelum SDGs, mis. undernourishment) - Indikator yang namanya TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs' - Penentuan framework dilakukan SETELAH filter selesai (data sudah bersih & range sudah fixed) sehingga start_year per indikator yang digunakan adalah start_year AKTUAL di dataset ini. """ import pandas as pd import numpy as np from datetime import datetime import logging from typing import Dict, List import json import sys if hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(encoding='utf-8') from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id from scripts.bigquery_helpers import ( log_update, load_to_bigquery, read_from_bigquery, setup_logging, truncate_table, save_etl_metadata, ) from google.cloud import bigquery # ============================================================================= # SDG INDICATOR KEYWORDS # ============================================================================= # Daftar nama indikator (lowercase) yang termasuk dalam SDG Goal 2. # Matching dilakukan dengan `kw in indicator_name.lower()` sehingga # partial match tetap valid (menangani variasi format nama). # # Logika framework: # - Nama ada di set ini + start_year >= SDG_START_YEAR -> 'SDGs' # - Nama ada di set ini + start_year < SDG_START_YEAR -> 'MDGs' # (indikator sudah eksis sebelum SDGs, mis. prevalence of undernourishment) # - Nama TIDAK ada di set ini -> 'MDGs' SDG_INDICATOR_KEYWORDS = frozenset([ # TARGET 2.1.1 — Prevalence of undernourishment (shared, sudah ada sebelum SDGs) "prevalence of undernourishment (percent) (3-year average)", "number of people undernourished (million) (3-year average)", # TARGET 2.1.2 — FIES (SDGs only) "prevalence of severe food insecurity in the total population (percent) (3-year average)", "prevalence of severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of severe food insecurity in the female adult population (percent) (3-year average)", "prevalence of moderate or severe food insecurity in the total population (percent) (3-year average)", "prevalence of moderate or severe food insecurity in the male adult population (percent) (3-year average)", "prevalence of moderate or severe food insecurity in the female adult population (percent) (3-year average)", "number of severely food insecure people (million) (3-year average)", "number of severely food insecure male adults (million) (3-year average)", "number of severely food insecure female adults (million) (3-year average)", "number of moderately or severely food insecure people (million) (3-year average)", "number of moderately or severely food insecure male adults (million) (3-year average)", "number of moderately or severely food insecure female adults (million) (3-year average)", # TARGET 2.2.1 — Stunting (shared) "percentage of children under 5 years of age who are stunted (modelled estimates) (percent)", "number of children under 5 years of age who are stunted (modeled estimates) (million)", # TARGET 2.2.2 — Wasting & Overweight (shared) "percentage of children under 5 years affected by wasting (percent)", "number of children under 5 years affected by wasting (million)", "percentage of children under 5 years of age who are overweight (modelled estimates) (percent)", "number of children under 5 years of age who are overweight (modeled estimates) (million)", # TARGET 2.2.3 — Anaemia (SDGs only) "prevalence of anemia among women of reproductive age (15-49 years) (percent)", "number of women of reproductive age (15-49 years) affected by anemia (million)", ]) # Tahun resmi SDGs mulai berlaku (2030 Agenda adopted September 2015, # data reporting mulai 2016). Dipakai sebagai default jika auto-detect gagal. SDG_START_YEAR_DEFAULT = 2016 def assign_framework_dynamic( indicator_name: str, indicator_start_year: int, sdg_start_year: int, ) -> str: """ Tentukan framework (MDGs/SDGs) berdasarkan: 1. Apakah nama indikator ada di SDG_INDICATOR_KEYWORDS? 2. Apakah data indikator ini mulai pada tahun >= sdg_start_year? Args: indicator_name : Nama indikator (akan di-lowercase untuk matching) indicator_start_year : Tahun pertama data indikator ini tersedia di dataset sdg_start_year : Tahun mulai SDGs (dari auto-detect atau default) Returns: 'SDGs' jika indikator termasuk SDG list DAN mulai >= sdg_start_year 'MDGs' untuk semua kasus lainnya """ ind_lower = str(indicator_name).lower().strip() is_sdg_name = any(kw in ind_lower for kw in SDG_INDICATOR_KEYWORDS) if is_sdg_name and indicator_start_year >= sdg_start_year: return 'SDGs' return 'MDGs' # ============================================================================= # ANALYTICAL LAYER CLASS # ============================================================================= class AnalyticalLayerLoader: """ Analytical Layer Loader for BigQuery Key Logic: 1. Complete per country (no gaps from start_year to end_year) 2. Filter countries with all pillars 3. Ensure indicators have consistent country count across all years 4. Determine SDGs start year & assign framework per indicator dynamically 5. Calculate YoY (year-over-year) change per indicator per country 6. Save dengan kolom lengkap (nama + ID + framework + YoY) untuk Looker Studio Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold Kolom output: country_id, country_name, indicator_id, indicator_name, direction, framework, pillar_id, pillar_name, time_id, year, value, yoy_change, yoy_pct """ def __init__(self, client: bigquery.Client): self.client = client self.logger = logging.getLogger(self.__class__.__name__) self.logger.propagate = False self.df_clean = None self.df_indicator = None self.df_country = None self.df_pillar = None self.selected_country_ids = None self.start_year = 2013 self.end_year = None self.baseline_year = 2023 # SDGs-related — di-set oleh determine_sdg_start_year() self.sdg_start_year = SDG_START_YEAR_DEFAULT self.pipeline_metadata = { 'source_class' : self.__class__.__name__, 'start_time' : None, 'end_time' : None, 'duration_seconds' : None, 'rows_fetched' : 0, 'rows_transformed' : 0, 'rows_loaded' : 0, 'validation_metrics': {} } self.pipeline_start = None self.pipeline_end = None # ------------------------------------------------------------------ # STEP 1: LOAD SOURCE DATA # ------------------------------------------------------------------ def load_source_data(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold") self.logger.info("=" * 80) try: # Tidak include framework dari dim_indicator — # framework akan ditentukan dinamis di Step 6 (determine_sdg_start_year) query = f""" SELECT f.country_id, c.country_name, f.indicator_id, i.indicator_name, i.direction, f.pillar_id, p.pillar_name, f.time_id, t.year, t.start_year, t.end_year, t.is_year_range, f.value, f.source_id FROM `{get_table_id('fact_food_security', layer='gold')}` f JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id """ self.logger.info("Loading fact table with dimensions...") self.df_clean = self.client.query(query).result().to_dataframe( create_bqstorage_client=False ) self.logger.info(f" Loaded: {len(self.df_clean):,} rows") if 'is_year_range' in self.df_clean.columns: yr = self.df_clean['is_year_range'].value_counts() self.logger.info(f" Breakdown:") self.logger.info( f" Single years (is_year_range=False): {yr.get(False, 0):,}" ) self.logger.info( f" Year ranges (is_year_range=True): {yr.get(True, 0):,}" ) self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold') self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold') self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold') self.logger.info(f" Indicators: {len(self.df_indicator)}") self.logger.info(f" Countries: {len(self.df_country)}") self.logger.info(f" Pillars: {len(self.df_pillar)}") self.pipeline_metadata['rows_fetched'] = len(self.df_clean) return True except Exception as e: self.logger.error(f"Error loading source data: {e}") raise # ------------------------------------------------------------------ # STEP 2: DETERMINE YEAR BOUNDARIES # ------------------------------------------------------------------ def determine_year_boundaries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES") self.logger.info("=" * 80) df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year] baseline_indicator_count = df_2023['indicator_id'].nunique() self.logger.info(f"\nBaseline Year: {self.baseline_year}") self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}") years_sorted = sorted(self.df_clean['year'].unique(), reverse=True) selected_end_year = None for year in years_sorted: if year >= self.baseline_year: df_year = self.df_clean[self.df_clean['year'] == year] year_indicator_count = df_year['indicator_id'].nunique() status = "OK" if year_indicator_count >= baseline_indicator_count else "X" self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators") if year_indicator_count >= baseline_indicator_count and selected_end_year is None: selected_end_year = int(year) if selected_end_year is None: selected_end_year = self.baseline_year self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}") else: self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}") self.end_year = selected_end_year original_count = len(self.df_clean) self.df_clean = self.df_clean[ (self.df_clean['year'] >= self.start_year) & (self.df_clean['year'] <= self.end_year) ].copy() self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:") self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean # ------------------------------------------------------------------ # STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY # ------------------------------------------------------------------ def filter_complete_indicators_per_country(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)") self.logger.info("=" * 80) grouped = self.df_clean.groupby([ 'country_id', 'country_name', 'indicator_id', 'indicator_name', 'pillar_id', 'pillar_name' ]) valid_combinations = [] removed_combinations = [] for (country_id, country_name, indicator_id, indicator_name, pillar_id, pillar_name), group in grouped: years_present = sorted(group['year'].unique()) start_year = int(min(years_present)) end_year_actual = int(max(years_present)) expected_years = list(range(start_year, self.end_year + 1)) missing_years = [y for y in expected_years if y not in years_present] has_gap = len(missing_years) > 0 is_complete = ( end_year_actual >= self.end_year and not has_gap and (self.end_year - start_year) >= 4 ) if is_complete: valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id}) else: reasons = [] if end_year_actual < self.end_year: reasons.append(f"ends {end_year_actual}") if has_gap: gap_str = str(missing_years[:3])[1:-1] if len(missing_years) > 3: gap_str += "..." reasons.append(f"gap:{gap_str}") if (self.end_year - start_year) < 4: reasons.append(f"span={self.end_year - start_year}") removed_combinations.append({ 'country_name' : country_name, 'indicator_name': indicator_name, 'reasons' : ", ".join(reasons) }) self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}") self.logger.info(f" [-] Removed: {len(removed_combinations):,}") df_valid = pd.DataFrame(valid_combinations) df_valid['key'] = ( df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str) ) self.df_clean['key'] = ( self.df_clean['country_id'].astype(str) + '_' + self.df_clean['indicator_id'].astype(str) ) original_count = len(self.df_clean) self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy() self.df_clean = self.df_clean.drop('key', axis=1) self.logger.info(f"\n Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") return self.df_clean # ------------------------------------------------------------------ # STEP 4: SELECT COUNTRIES WITH ALL PILLARS # ------------------------------------------------------------------ def select_countries_with_all_pillars(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)") self.logger.info("=" * 80) total_pillars = self.df_clean['pillar_id'].nunique() country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({ 'pillar_id' : 'nunique', 'indicator_id': 'nunique', 'year' : lambda x: f"{int(x.min())}-{int(x.max())}" }).reset_index() country_pillar_count.columns = [ 'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range' ] for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows(): status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE" self.logger.info( f" {status:<12} {row['country_name']:25s} " f"{row['pillar_count']}/{total_pillars} pillars" ) selected_countries = country_pillar_count[ country_pillar_count['pillar_count'] == total_pillars ] self.selected_country_ids = selected_countries['country_id'].tolist() self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries") original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['country_id'].isin(self.selected_country_ids) ].copy() self.logger.info(f" Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") return self.df_clean # ------------------------------------------------------------------ # STEP 5: FILTER INDICATORS CONSISTENT ACROSS FIXED COUNTRIES # ------------------------------------------------------------------ def filter_indicators_consistent_across_fixed_countries(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE") self.logger.info("=" * 80) indicator_country_start = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'country_id' ])['year'].min().reset_index() indicator_country_start.columns = [ 'indicator_id', 'indicator_name', 'country_id', 'start_year' ] indicator_max_start = indicator_country_start.groupby([ 'indicator_id', 'indicator_name' ])['start_year'].max().reset_index() indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year'] valid_indicators = [] removed_indicators = [] for _, ind_row in indicator_max_start.iterrows(): indicator_id = ind_row['indicator_id'] indicator_name = ind_row['indicator_name'] max_start = int(ind_row['max_start_year']) span = self.end_year - max_start if span < 4: removed_indicators.append({ 'indicator_name': indicator_name, 'reason' : f"span={span} < 4" }) continue expected_years = list(range(max_start, self.end_year + 1)) ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id] all_years_complete = True problematic_years = [] for year in expected_years: country_count = ind_data[ind_data['year'] == year]['country_id'].nunique() if country_count < len(self.selected_country_ids): all_years_complete = False problematic_years.append(f"{int(year)}({country_count})") if all_years_complete: valid_indicators.append(indicator_id) else: removed_indicators.append({ 'indicator_name': indicator_name, 'reason' : ( f"missing countries in years: {', '.join(problematic_years[:5])}" ) }) self.logger.info(f"\n [+] Valid: {len(valid_indicators)}") self.logger.info(f" [-] Removed: {len(removed_indicators)}") if not valid_indicators: raise ValueError("No valid indicators found after filtering!") original_count = len(self.df_clean) self.df_clean = self.df_clean[ self.df_clean['indicator_id'].isin(valid_indicators) ].copy() self.df_clean = self.df_clean.merge( indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left' ) self.df_clean = self.df_clean[ self.df_clean['year'] >= self.df_clean['max_start_year'] ].copy() self.df_clean = self.df_clean.drop('max_start_year', axis=1) self.logger.info(f"\n Rows before: {original_count:,}") self.logger.info(f" Rows after: {len(self.df_clean):,}") self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}") self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}") return self.df_clean # ------------------------------------------------------------------ # STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK # ------------------------------------------------------------------ def determine_sdg_start_year(self): """ Tentukan tahun mulai SDGs secara otomatis dari data aktual, lalu assign kolom 'framework' (MDGs/SDGs) ke setiap baris di df_clean. Logika penentuan SDG_START_YEAR: - Cari indikator yang namanya ada di SDG_INDICATOR_KEYWORDS (FIES, anaemia, dll.) dan yang diyakini HANYA ada di SDGs (bukan shared dengan MDGs). Proxy: indikator dengan keyword 'food insecurity' atau 'anemia'. - Ambil tahun pertama (min year) dari indikator-indikator tersebut di dataset ini. - Jika ditemukan -> sdg_start_year = tahun pertama itu. - Jika tidak ditemukan -> sdg_start_year = SDG_START_YEAR_DEFAULT (2016). Logika assign framework per indikator (assign_framework_dynamic): - Nama ada di SDG_INDICATOR_KEYWORDS + start_year >= sdg_start_year -> 'SDGs' - Nama ada di SDG_INDICATOR_KEYWORDS + start_year < sdg_start_year -> 'MDGs' (indikator seperti undernourishment sudah ada sebelum SDGs) - Nama TIDAK ada di SDG_INDICATOR_KEYWORDS -> 'MDGs' """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 6: DETERMINE SDG START YEAR & ASSIGN FRAMEWORK") self.logger.info("=" * 80) # --- 6a. Auto-detect SDG start year dari data aktual --- # Proxy SDGs-only: indikator yang pasti baru di SDGs (FIES & anaemia) sdg_proxy_keywords = [ 'food insecurity', 'anemia', 'anaemia', ] sdg_proxy_mask = self.df_clean['indicator_name'].str.lower().apply( lambda n: any(kw in n for kw in sdg_proxy_keywords) ) df_sdg_proxy = self.df_clean[sdg_proxy_mask] if len(df_sdg_proxy) > 0: detected_start = int(df_sdg_proxy['year'].min()) self.sdg_start_year = detected_start self.logger.info( f"\n [OK] SDG start year AUTO-DETECTED dari data: {self.sdg_start_year}" ) self.logger.info(f" Proxy indicators used (sample):") proxy_sample = ( df_sdg_proxy['indicator_name'] .drop_duplicates() .head(5) .tolist() ) for ind in proxy_sample: self.logger.info(f" - {ind}") else: self.sdg_start_year = SDG_START_YEAR_DEFAULT self.logger.warning( f"\n [WARN] SDG proxy indicators not found in dataset. " f"Using default: {self.sdg_start_year}" ) self.logger.info(f"\n SDG_START_YEAR = {self.sdg_start_year}") # --- 6b. Hitung start_year aktual per indikator di dataset ini --- indicator_start = ( self.df_clean .groupby(['indicator_id', 'indicator_name'])['year'] .min() .reset_index() ) indicator_start.columns = ['indicator_id', 'indicator_name', 'actual_start_year'] # --- 6c. Assign framework per indikator --- indicator_start['framework'] = indicator_start.apply( lambda row: assign_framework_dynamic( indicator_name = row['indicator_name'], indicator_start_year = int(row['actual_start_year']), sdg_start_year = self.sdg_start_year, ), axis=1 ) # --- 6d. Log hasil assignment --- self.logger.info(f"\n Framework assignment per indicator:") self.logger.info(f" {'-'*85}") self.logger.info( f" {'ID':<5} {'Framework':<10} {'Start Yr':<10} {'Indicator Name'}" ) self.logger.info(f" {'-'*85}") for _, row in indicator_start.sort_values( ['framework', 'actual_start_year', 'indicator_name'] ).iterrows(): is_in_sdg_list = any( kw in str(row['indicator_name']).lower() for kw in SDG_INDICATOR_KEYWORDS ) note = " [in SDG list]" if is_in_sdg_list else "" self.logger.info( f" {int(row['indicator_id']):<5} {row['framework']:<10} " f"{int(row['actual_start_year']):<10} {row['indicator_name'][:55]}{note}" ) fw_summary = indicator_start['framework'].value_counts() self.logger.info(f"\n Framework summary:") for fw, cnt in fw_summary.items(): self.logger.info(f" {fw}: {cnt} indicators") # --- 6e. Merge framework ke df_clean --- self.df_clean = self.df_clean.merge( indicator_start[['indicator_id', 'framework']], on='indicator_id', how='left' ) self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs') self.logger.info(f"\n [OK] Kolom 'framework' ditambahkan ke df_clean") self.logger.info( f" Row distribution — MDGs: " f"{(self.df_clean['framework'] == 'MDGs').sum():,} | " f"SDGs: {(self.df_clean['framework'] == 'SDGs').sum():,}" ) return self.df_clean # ------------------------------------------------------------------ # STEP 6b: VERIFY NO GAPS # ------------------------------------------------------------------ def verify_no_gaps(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 6c: VERIFY NO GAPS") self.logger.info("=" * 80) expected_countries = len(self.selected_country_ids) verification = self.df_clean.groupby( ['indicator_id', 'year'] )['country_id'].nunique().reset_index() verification.columns = ['indicator_id', 'year', 'country_count'] all_good = (verification['country_count'] == expected_countries).all() if all_good: self.logger.info( f" VERIFICATION PASSED — all combinations have {expected_countries} countries" ) else: bad = verification[verification['country_count'] != expected_countries] for _, row in bad.head(10).iterrows(): self.logger.error( f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: " f"{int(row['country_count'])} countries (expected {expected_countries})" ) raise ValueError("Gap verification failed!") return True # ------------------------------------------------------------------ # STEP 7: CALCULATE YOY # ------------------------------------------------------------------ def calculate_yoy(self): """ Hitung Year-over-Year (YoY) per indikator per negara. Kolom yang ditambahkan: yoy_change : selisih absolut -> value - value_tahun_sebelumnya yoy_pct : perubahan relatif -> (yoy_change / abs(value_prev)) * 100 Baris tahun pertama per kombinasi country-indicator bernilai NULL (intentional). """ self.logger.info("\n" + "=" * 80) self.logger.info("STEP 7: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY") self.logger.info("=" * 80) df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy() df['value_prev'] = df.groupby(['country_id', 'indicator_id'])['value'].shift(1) df['yoy_change'] = df['value'] - df['value_prev'] df['yoy_pct'] = np.where( df['value_prev'].notna() & (df['value_prev'] != 0), (df['yoy_change'] / df['value_prev'].abs()) * 100, np.nan ) df = df.drop(columns=['value_prev']) total_rows = len(df) valid_yoy = df['yoy_pct'].notna().sum() null_yoy = df['yoy_pct'].isna().sum() self.logger.info(f" Total rows : {total_rows:,}") self.logger.info(f" YoY calculated : {valid_yoy:,}") self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator") per_ind = ( df[df['yoy_pct'].notna()] .groupby(['indicator_id', 'indicator_name'])['yoy_pct'] .agg(['mean', 'std', 'min', 'max']) .reset_index() ) per_ind.columns = ['indicator_id', 'indicator_name', 'mean', 'std', 'min', 'max'] self.logger.info(f"\n YoY summary per indicator (top 10 by abs mean change):") self.logger.info(f" {'-'*100}") self.logger.info( f" {'ID':<5} {'Indicator Name':<52} {'Mean%':>8} {'Std%':>8} {'Min%':>8} {'Max%':>8}" ) self.logger.info(f" {'-'*100}") top_ind = per_ind.reindex( per_ind['mean'].abs().sort_values(ascending=False).index ).head(10) for _, row in top_ind.iterrows(): self.logger.info( f" {int(row['indicator_id']):<5} {row['indicator_name'][:50]:<52} " f"{row['mean']:>+8.2f} {row['std']:>8.2f} " f"{row['min']:>+8.2f} {row['max']:>+8.2f}" ) per_country = ( df[df['yoy_pct'].notna()] .groupby(['country_id', 'country_name'])['yoy_pct'] .agg(['mean', 'std']) .reset_index() ) per_country.columns = ['country_id', 'country_name', 'mean_yoy', 'std_yoy'] self.logger.info(f"\n YoY summary per country:") self.logger.info(f" {'-'*60}") self.logger.info(f" {'Country':<30} {'Mean YoY%':>10} {'Std YoY%':>10}") self.logger.info(f" {'-'*60}") for _, row in per_country.sort_values('mean_yoy', ascending=False).iterrows(): self.logger.info( f" {row['country_name']:<30} {row['mean_yoy']:>+10.2f} {row['std_yoy']:>10.2f}" ) self.df_clean = df self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct") return self.df_clean # ------------------------------------------------------------------ # STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR # ------------------------------------------------------------------ def analyze_indicator_availability_by_year(self): self.logger.info("\n" + "=" * 80) self.logger.info("STEP 8: ANALYZE INDICATOR AVAILABILITY BY YEAR") self.logger.info("=" * 80) year_stats = self.df_clean.groupby('year').agg({ 'indicator_id': 'nunique', 'country_id' : 'nunique' }).reset_index() year_stats.columns = ['year', 'indicator_count', 'country_count'] self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}") self.logger.info("-" * 50) for _, row in year_stats.iterrows(): year = int(row['year']) row_count = len(self.df_clean[self.df_clean['year'] == year]) self.logger.info( f"{year:<8} {int(row['indicator_count']):<15} " f"{int(row['country_count']):<12} {row_count:,}" ) indicator_details = self.df_clean.groupby([ 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework' ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index() indicator_details.columns = [ 'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework', 'start_year', 'end_year', 'country_count' ] indicator_details['year_range'] = ( indicator_details['start_year'].astype(int).astype(str) + '-' + indicator_details['end_year'].astype(int).astype(str) ) indicator_details = indicator_details.sort_values( ['framework', 'pillar_name', 'start_year', 'indicator_name'] ) self.logger.info(f"\nTotal Indicators: {len(indicator_details)}") for pillar, count in indicator_details.groupby('pillar_name').size().items(): self.logger.info(f" {pillar}: {count} indicators") self.logger.info(f"\nFramework breakdown:") for fw, count in indicator_details.groupby('framework').size().items(): self.logger.info(f" {fw}: {count} indicators") self.logger.info(f"\n{'-'*110}") self.logger.info( f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} " f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}" ) self.logger.info(f"{'-'*110}") for _, row in indicator_details.iterrows(): direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-' self.logger.info( f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} " f"{row['pillar_name'][:13]:<15} {row['framework']:<10} " f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}" ) return year_stats # ------------------------------------------------------------------ # STEP 9: SAVE ANALYTICAL TABLE # ------------------------------------------------------------------ def save_analytical_table(self): """ Simpan fact_asean_food_security_selected ke Gold layer. Kolom yang disimpan: country_id, country_name — dimensi negara indicator_id, indicator_name — dimensi indikator direction — arah penilaian (higher/lower_better) framework — MDGs/SDGs (ditentukan di Step 6) pillar_id, pillar_name — dimensi pilar time_id, year — dimensi waktu value — nilai indikator yoy_change — perubahan absolut YoY (NULL di tahun pertama) yoy_pct — perubahan relatif YoY dalam % (NULL di tahun pertama) """ table_name = 'fact_asean_food_security_selected' self.logger.info("\n" + "=" * 80) self.logger.info(f"STEP 9: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold") self.logger.info("=" * 80) try: # Pastikan kolom YoY tersedia — fallback jika calculate_yoy() tidak dipanggil if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns: self.logger.warning( " [WARN] Kolom YoY tidak ditemukan. Menjalankan calculate_yoy() sebagai fallback..." ) self.calculate_yoy() analytical_df = self.df_clean[[ 'country_id', 'country_name', 'indicator_id', 'indicator_name', 'direction', 'framework', 'pillar_id', 'pillar_name', 'time_id', 'year', 'value', 'yoy_change', 'yoy_pct', ]].copy() analytical_df = analytical_df.sort_values( ['year', 'country_name', 'pillar_name', 'indicator_name'] ).reset_index(drop=True) analytical_df['country_id'] = analytical_df['country_id'].astype(int) analytical_df['country_name'] = analytical_df['country_name'].astype(str) analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int) analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str) analytical_df['direction'] = analytical_df['direction'].astype(str) analytical_df['framework'] = analytical_df['framework'].astype(str) analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int) analytical_df['pillar_name'] = analytical_df['pillar_name'].astype(str) analytical_df['time_id'] = analytical_df['time_id'].astype(int) analytical_df['year'] = analytical_df['year'].astype(int) analytical_df['value'] = analytical_df['value'].astype(float) analytical_df['yoy_change'] = analytical_df['yoy_change'].astype(float) analytical_df['yoy_pct'] = analytical_df['yoy_pct'].astype(float) self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}") self.logger.info(f" Total rows: {len(analytical_df):,}") fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts() self.logger.info(f" Framework distribution (per indikator unik):") for fw, cnt in fw_dist.items(): self.logger.info(f" {fw}: {cnt} indicators") yoy_valid = analytical_df['yoy_pct'].notna().sum() yoy_null = analytical_df['yoy_pct'].isna().sum() self.logger.info(f" YoY rows (calculated): {yoy_valid:,}") self.logger.info(f" YoY rows (NULL/base) : {yoy_null:,}") schema = [ bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("direction", "STRING", mode="REQUIRED"), bigquery.SchemaField("framework", "STRING", mode="REQUIRED"), bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"), bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"), bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"), bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"), ] rows_loaded = load_to_bigquery( self.client, analytical_df, table_name, layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema ) self.pipeline_metadata['rows_loaded'] = rows_loaded log_update(self.client, 'DW', table_name, 'full_load', rows_loaded) metadata = { 'source_class' : self.__class__.__name__, 'table_name' : table_name, 'execution_timestamp': self.pipeline_start, 'duration_seconds' : (datetime.now() - self.pipeline_start).total_seconds(), 'rows_fetched' : self.pipeline_metadata['rows_fetched'], 'rows_transformed' : rows_loaded, 'rows_loaded' : rows_loaded, 'completeness_pct' : 100.0, 'config_snapshot' : json.dumps({ 'start_year' : self.start_year, 'end_year' : self.end_year, 'sdg_start_year' : self.sdg_start_year, 'fixed_countries' : len(self.selected_country_ids), 'no_gaps' : True, 'layer' : 'gold', 'framework_logic' : ( f"SDGs if in SDG_INDICATOR_KEYWORDS AND start_year >= {self.sdg_start_year}, " "else MDGs" ), }), 'validation_metrics' : json.dumps({ 'fixed_countries' : len(self.selected_country_ids), 'total_indicators': int(self.df_clean['indicator_id'].nunique()), 'sdg_start_year' : self.sdg_start_year, 'framework_dist' : fw_dist.to_dict(), 'yoy_rows_valid' : int(yoy_valid), 'yoy_rows_null' : int(yoy_null), }) } save_etl_metadata(self.client, metadata) self.logger.info( f" {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold" ) self.logger.info(f" Metadata -> [AUDIT] etl_metadata") return rows_loaded except Exception as e: self.logger.error(f"Error saving: {e}") raise # ------------------------------------------------------------------ # RUN # ------------------------------------------------------------------ def run(self): self.pipeline_start = datetime.now() self.pipeline_metadata['start_time'] = self.pipeline_start self.logger.info("\n" + "=" * 80) self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold") self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,") self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct") self.logger.info(f"Framework: ditentukan dinamis berdasarkan SDG_START_YEAR (auto-detect)") self.logger.info("=" * 80) self.load_source_data() self.determine_year_boundaries() self.filter_complete_indicators_per_country() self.select_countries_with_all_pillars() self.filter_indicators_consistent_across_fixed_countries() self.determine_sdg_start_year() # Step 6: auto-detect SDG year & assign framework self.verify_no_gaps() # Step 6c: verifikasi tidak ada gap self.calculate_yoy() # Step 7: hitung YoY self.analyze_indicator_availability_by_year() self.save_analytical_table() self.pipeline_end = datetime.now() duration = (self.pipeline_end - self.pipeline_start).total_seconds() self.logger.info("\n" + "=" * 80) self.logger.info("COMPLETED") self.logger.info("=" * 80) self.logger.info(f" Duration : {duration:.2f}s") self.logger.info(f" Year Range : {self.start_year}-{self.end_year}") self.logger.info(f" SDG Start Yr : {self.sdg_start_year}") self.logger.info(f" Countries : {len(self.selected_country_ids)}") self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}") self.logger.info(f" Rows Loaded : {self.pipeline_metadata['rows_loaded']:,}") # ============================================================================= # AIRFLOW TASK FUNCTION # ============================================================================= def run_analytical_layer(): """ Airflow task: Build fact_asean_food_security_selected dari fact_food_security + dims. Dipanggil setelah dimensional_model_to_gold selesai. """ from scripts.bigquery_config import get_bigquery_client client = get_bigquery_client() loader = AnalyticalLayerLoader(client) loader.run() print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows") # ============================================================================= # MAIN EXECUTION # ============================================================================= if __name__ == "__main__": print("=" * 80) print("BIGQUERY ANALYTICAL LAYER - DATA FILTERING") print("Output: fact_asean_food_security_selected -> fs_asean_gold") print("Framework: MDGs/SDGs ditentukan dinamis dari data (auto-detect SDG start year)") print("=" * 80) logger = setup_logging() client = get_bigquery_client() loader = AnalyticalLayerLoader(client) loader.run() print("\n" + "=" * 80) print("[OK] COMPLETED") print(f" SDG Start Year : {loader.sdg_start_year}") print(f" Rows Loaded : {loader.pipeline_metadata['rows_loaded']:,}") print("=" * 80)