"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING

FIXED: fact_asean_food_security_selected is stored in fs_asean_gold (layer='gold').

Filtering order:
    1. Load data (single years only)
    2. Determine year boundaries (2013 - auto-detected end year)
    3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
    4. Filter countries with ALL pillars (FIXED SET)
    5. Filter indicators with consistent presence across the FIXED countries
    6. Calculate YoY per indicator per country
    7. Save the analytical table (full names/labels + framework column + YoY,
       ready for Looker Studio)

UPDATED:
    - The 'framework' column (MDGs/SDGs) is propagated from dim_indicator to
      the output table, so Looker Studio can filter/slice by framework without
      re-joining dim_indicator.
    - 'yoy_change' and 'yoy_pct' columns are added so Year-over-Year analysis
      per indicator per country can be done directly in Looker Studio.
"""

import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys

if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')

from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
    log_update,
    load_to_bigquery,
    read_from_bigquery,
    setup_logging,
    truncate_table,
    save_etl_metadata,
)
from google.cloud import bigquery


# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================

class AnalyticalLayerLoader:
    """
    Analytical layer loader for BigQuery.

    Key logic:
        1. Complete per country (no gaps from start_year to end_year)
        2. Keep only countries that have all pillars
        3. Ensure indicators have a consistent country count across all years
        4. Calculate YoY (year-over-year) change per indicator per country
        5. Save with the full column set (name + ID + framework + YoY) for
           Looker Studio

    Output:
        fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold

    Output columns:
        country_id, country_name, indicator_id, indicator_name, direction,
        framework, pillar_id, pillar_name, time_id, year, value,
        yoy_change, yoy_pct
    """

    def __init__(self, client: bigquery.Client):
        """
        Args:
            client: BigQuery client used for every query and load.
        """
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        # Records are kept out of ancestor loggers; without a handler of our
        # own every INFO message would be silently discarded (logging's
        # lastResort handler only emits WARNING and above). Attach one only if
        # nothing is configured yet, so repeated instantiation does not
        # duplicate output.
        self.logger.propagate = False
        if not self.logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(logging.Formatter('%(message)s'))
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

        # Working frames populated by load_source_data() and later steps.
        self.df_clean = None
        self.df_indicator = None
        self.df_country = None
        self.df_pillar = None
        # Country IDs that survive step 4 (the FIXED SET).
        self.selected_country_ids = None

        # Year window: start is fixed, end is detected in step 2.
        self.start_year = 2013
        self.end_year = None
        self.baseline_year = 2023
        # Minimum (end_year - start_year) an indicator series must cover.
        self.min_span_years = 4

        self.pipeline_metadata = {
            'source_class'      : self.__class__.__name__,
            'start_time'        : None,
            'end_time'          : None,
            'duration_seconds'  : None,
            'rows_fetched'      : 0,
            'rows_transformed'  : 0,
            'rows_loaded'       : 0,
            'validation_metrics': {}
        }
        self.pipeline_start = None
        self.pipeline_end = None

    def load_source_data(self):
        """
        STEP 1: load fact_food_security joined with its gold-layer dimensions.

        Populates ``self.df_clean`` with single-year rows only (rows flagged
        ``is_year_range`` are dropped, as the module contract requires), plus
        ``df_indicator``, ``df_country`` and ``df_pillar`` from the gold layer.

        Returns:
            True on success.

        Raises:
            ValueError: if the 'framework' column is missing.
            Any client/query error is logged and re-raised.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
        self.logger.info("=" * 80)

        try:
            # Include the framework column from dim_indicator in the query.
            query = f"""
                SELECT
                    f.country_id,
                    c.country_name,
                    f.indicator_id,
                    i.indicator_name,
                    i.direction,
                    i.framework,
                    f.pillar_id,
                    p.pillar_name,
                    f.time_id,
                    t.year,
                    t.start_year,
                    t.end_year,
                    t.is_year_range,
                    f.value,
                    f.source_id
                FROM `{get_table_id('fact_food_security', layer='gold')}` f
                JOIN `{get_table_id('dim_country', layer='gold')}` c
                    ON f.country_id = c.country_id
                JOIN `{get_table_id('dim_indicator', layer='gold')}` i
                    ON f.indicator_id = i.indicator_id
                JOIN `{get_table_id('dim_pillar', layer='gold')}` p
                    ON f.pillar_id = p.pillar_id
                JOIN `{get_table_id('dim_time', layer='gold')}` t
                    ON f.time_id = t.time_id
            """

            self.logger.info("Loading fact table with dimensions (incl. framework)...")
            self.df_clean = self.client.query(query).result().to_dataframe(
                create_bqstorage_client=False
            )
            rows_fetched = len(self.df_clean)
            self.logger.info(f" Loaded: {rows_fetched:,} rows")

            if 'is_year_range' in self.df_clean.columns:
                yr = self.df_clean['is_year_range'].value_counts()
                self.logger.info(f" Breakdown:")
                self.logger.info(
                    f" Single years (is_year_range=False): {yr.get(False, 0):,}"
                )
                self.logger.info(
                    f" Year ranges (is_year_range=True): {yr.get(True, 0):,}"
                )

                # Step 1 contract: single years only. Year-range rows may share
                # a `year` value with single-year rows, which would break the
                # per-year gap checks and the YoY calculation downstream.
                is_range = self.df_clean['is_year_range'].fillna(False).astype(bool)
                if is_range.any():
                    self.df_clean = self.df_clean[~is_range].copy()
                    self.logger.info(
                        f" Dropped {int(is_range.sum()):,} year-range rows "
                        f"(single years only): {len(self.df_clean):,} rows remain"
                    )

            # Make sure the framework column is available.
            if 'framework' not in self.df_clean.columns:
                raise ValueError(
                    "Kolom 'framework' tidak ditemukan di dim_indicator. "
                    "Pastikan bigquery_cleaned_layer.py dan bigquery_dimensional_model.py "
                    "sudah dijalankan dengan versi terbaru."
                )

            fw_dist = self.df_clean.drop_duplicates('indicator_id')['framework'].value_counts()
            self.logger.info(f" Framework distribution (per indikator unik):")
            for fw, cnt in fw_dist.items():
                self.logger.info(f" {fw}: {cnt} indicators")

            self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
            self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
            self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')

            self.logger.info(f" Indicators: {len(self.df_indicator)}")
            self.logger.info(f" Countries: {len(self.df_country)}")
            self.logger.info(f" Pillars: {len(self.df_pillar)}")

            # Rows as fetched from BigQuery, before any filtering.
            self.pipeline_metadata['rows_fetched'] = rows_fetched
            return True

        except Exception as e:
            self.logger.error(f"Error loading source data: {e}")
            raise

    def determine_year_boundaries(self):
        """
        STEP 2: pick the end year and trim df_clean to [start_year, end_year].

        The end year is the latest year >= baseline_year whose distinct
        indicator count is at least the baseline year's count; when no year
        qualifies, baseline_year itself is used.

        Returns:
            The trimmed df_clean.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
        self.logger.info("=" * 80)

        df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
        baseline_indicator_count = df_baseline['indicator_id'].nunique()

        self.logger.info(f"\nBaseline Year: {self.baseline_year}")
        self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")

        # dropna(): a missing year would otherwise scramble the descending sort.
        years_sorted = sorted(self.df_clean['year'].dropna().unique(), reverse=True)
        selected_end_year = None

        for year in years_sorted:
            if year >= self.baseline_year:
                df_year = self.df_clean[self.df_clean['year'] == year]
                year_indicator_count = df_year['indicator_id'].nunique()
                status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
                self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
                # Years are visited newest-first, so the first hit is the latest.
                if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
                    selected_end_year = int(year)

        if selected_end_year is None:
            selected_end_year = self.baseline_year
            self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
        else:
            self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")

        self.end_year = selected_end_year

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[
            (self.df_clean['year'] >= self.start_year) &
            (self.df_clean['year'] <= self.end_year)
        ].copy()

        self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_complete_indicators_per_country(self):
        """
        STEP 3: keep only country-indicator series that are complete.

        A series is complete when it has every year from its own first year
        through ``end_year`` (no gaps, reaches the end year) and spans at least
        ``min_span_years`` years.

        Returns:
            The filtered df_clean.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
        self.logger.info("=" * 80)

        grouped = self.df_clean.groupby([
            'country_id', 'country_name', 'indicator_id',
            'indicator_name', 'pillar_id', 'pillar_name'
        ])

        valid_combinations = []
        removed_combinations = []

        for (country_id, country_name, indicator_id, indicator_name,
             pillar_id, pillar_name), group in grouped:
            years_present = set(group['year'].unique())
            start_year = int(min(years_present))
            end_year_actual = int(max(years_present))
            span = self.end_year - start_year

            missing_years = [
                y for y in range(start_year, self.end_year + 1)
                if y not in years_present
            ]
            has_gap = len(missing_years) > 0

            is_complete = (
                end_year_actual >= self.end_year and
                not has_gap and
                span >= self.min_span_years
            )

            if is_complete:
                valid_combinations.append((country_id, indicator_id))
            else:
                reasons = []
                if end_year_actual < self.end_year:
                    reasons.append(f"ends {end_year_actual}")
                if has_gap:
                    gap_str = str(missing_years[:3])[1:-1]
                    if len(missing_years) > 3:
                        gap_str += "..."
                    reasons.append(f"gap:{gap_str}")
                if span < self.min_span_years:
                    reasons.append(f"span={span}")
                removed_combinations.append({
                    'country_name'  : country_name,
                    'indicator_name': indicator_name,
                    'reasons'       : ", ".join(reasons)
                })

        self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
        self.logger.info(f" [-] Removed: {len(removed_combinations):,}")

        # Row mask via a tuple set: no temporary string key column, and it
        # works when nothing is valid (an empty result instead of a KeyError).
        valid_keys = set(valid_combinations)
        keep_mask = np.array(
            [
                (cid, iid) in valid_keys
                for cid, iid in zip(self.df_clean['country_id'], self.df_clean['indicator_id'])
            ],
            dtype=bool,
        )

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean.loc[keep_mask].copy()

        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        return self.df_clean

    def select_countries_with_all_pillars(self):
        """
        STEP 4: keep countries covering every pillar present in df_clean.

        The surviving IDs are stored in ``selected_country_ids`` (the FIXED SET
        used by the remaining steps).

        Returns:
            The filtered df_clean.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
        self.logger.info("=" * 80)

        total_pillars = self.df_clean['pillar_id'].nunique()
        country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
            'pillar_id'   : 'nunique',
            'indicator_id': 'nunique',
            'year'        : lambda x: f"{int(x.min())}-{int(x.max())}"
        }).reset_index()
        country_pillar_count.columns = [
            'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
        ]

        for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
            status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
            self.logger.info(
                f" {status:<12} {row['country_name']:25s} "
                f"{row['pillar_count']}/{total_pillars} pillars"
            )

        selected_countries = country_pillar_count[
            country_pillar_count['pillar_count'] == total_pillars
        ]
        self.selected_country_ids = selected_countries['country_id'].tolist()
        self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[
            self.df_clean['country_id'].isin(self.selected_country_ids)
        ].copy()

        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_indicators_consistent_across_fixed_countries(self):
        """
        STEP 5: keep indicators reported by every FIXED country in every year.

        For each indicator the common start is the latest per-country start
        year. The indicator is kept when that start leaves at least
        ``min_span_years`` years before ``end_year`` and every year in the
        window has all FIXED countries. Kept rows earlier than the common
        start are trimmed.

        Returns:
            The filtered df_clean.

        Raises:
            ValueError: if no indicator survives.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
        self.logger.info("=" * 80)

        indicator_country_start = self.df_clean.groupby([
            'indicator_id', 'indicator_name', 'country_id'
        ])['year'].min().reset_index()
        indicator_country_start.columns = [
            'indicator_id', 'indicator_name', 'country_id', 'start_year'
        ]

        indicator_max_start = indicator_country_start.groupby([
            'indicator_id', 'indicator_name'
        ])['start_year'].max().reset_index()
        indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']

        expected_countries = len(self.selected_country_ids)
        valid_indicators = []
        removed_indicators = []

        for _, ind_row in indicator_max_start.iterrows():
            indicator_id = ind_row['indicator_id']
            indicator_name = ind_row['indicator_name']
            max_start = int(ind_row['max_start_year'])
            span = self.end_year - max_start

            if span < self.min_span_years:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason'        : f"span={span} < {self.min_span_years}"
                })
                continue

            ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
            problematic_years = []
            for year in range(max_start, self.end_year + 1):
                country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
                if country_count < expected_countries:
                    problematic_years.append(f"{int(year)}({country_count})")

            if not problematic_years:
                valid_indicators.append(indicator_id)
            else:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason'        : (
                        f"missing countries in years: {', '.join(problematic_years[:5])}"
                    )
                })

        self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
        self.logger.info(f" [-] Removed: {len(removed_indicators)}")

        if not valid_indicators:
            raise ValueError("No valid indicators found after filtering!")

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[
            self.df_clean['indicator_id'].isin(valid_indicators)
        ].copy()

        # Trim each kept indicator to its common start year.
        self.df_clean = self.df_clean.merge(
            indicator_max_start[['indicator_id', 'max_start_year']],
            on='indicator_id', how='left'
        )
        self.df_clean = self.df_clean[
            self.df_clean['year'] >= self.df_clean['max_start_year']
        ].copy()
        self.df_clean = self.df_clean.drop('max_start_year', axis=1)

        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
        return self.df_clean

    def verify_no_gaps(self):
        """
        STEP 6: assert every (indicator, year) has all FIXED countries.

        Duplicate (country, indicator, year) rows are also reported as a
        warning, because they would make YoY ambiguous.

        Returns:
            True when verification passes.

        Raises:
            ValueError: when any (indicator, year) has a different country count.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 6: VERIFY NO GAPS")
        self.logger.info("=" * 80)

        expected_countries = len(self.selected_country_ids)
        verification = self.df_clean.groupby(
            ['indicator_id', 'year']
        )['country_id'].nunique().reset_index()
        verification.columns = ['indicator_id', 'year', 'country_count']

        all_good = (verification['country_count'] == expected_countries).all()

        if all_good:
            self.logger.info(
                f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
            )
        else:
            bad = verification[verification['country_count'] != expected_countries]
            for _, row in bad.head(10).iterrows():
                self.logger.error(
                    f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
                    f"{int(row['country_count'])} countries (expected {expected_countries})"
                )
            raise ValueError("Gap verification failed!")

        # nunique() above cannot see duplicates (e.g. several source_id rows for
        # one country-indicator-year); surface them without failing the run.
        duplicate_rows = int(
            self.df_clean.duplicated(['country_id', 'indicator_id', 'year']).sum()
        )
        if duplicate_rows:
            self.logger.warning(
                f" [WARN] {duplicate_rows:,} duplicate country-indicator-year rows found; "
                f"YoY for those combinations may be unreliable"
            )

        return True

    def calculate_yoy(self):
        """
        STEP 6b: compute Year-over-Year change per indicator per country.

        Columns added to df_clean:
            yoy_change : absolute change -> value - previous year's value
            yoy_pct    : relative change -> (yoy_change / abs(value_prev)) * 100

        Notes:
            - The previous value is used only when the preceding row in the
              country-indicator series is exactly one year earlier, so the
              first year of each series (and any row after a gap) is NULL.
              This is intentional.
            - yoy_pct is NULL when the previous value is 0.
            - The helper column value_prev is dropped and not saved to BigQuery.
            - Intended to run AFTER verify_no_gaps(), on clean data.

        Returns:
            df_clean with the YoY columns added.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 6b: CALCULATE YEAR-OVER-YEAR (YoY) PER INDICATOR PER COUNTRY")
        self.logger.info("=" * 80)

        df = self.df_clean.sort_values(['country_id', 'indicator_id', 'year']).copy()
        series = df.groupby(['country_id', 'indicator_id'])

        # Previous row within each country-indicator series, accepted only
        # when it really is year - 1 (a positional shift alone would compare
        # across gaps or duplicates if called on unverified data).
        prev_year = series['year'].shift(1)
        is_consecutive = (df['year'] - prev_year).eq(1).fillna(False).astype(bool)
        df['value_prev'] = series['value'].shift(1).where(is_consecutive)

        # YoY absolute change: value(t) - value(t-1)
        df['yoy_change'] = df['value'] - df['value_prev']

        # YoY percentage change: (yoy_change / |value_prev|) * 100.
        # Avoid division by zero: NaN when value_prev is 0 or missing.
        df['yoy_pct'] = np.where(
            df['value_prev'].notna() & (df['value_prev'] != 0),
            (df['yoy_change'] / df['value_prev'].abs()) * 100,
            np.nan
        )

        # Drop the helper column; it is not stored in BigQuery.
        df = df.drop(columns=['value_prev'])

        # Summary
        total_rows = len(df)
        valid_yoy = df['yoy_pct'].notna().sum()
        null_yoy = df['yoy_pct'].isna().sum()

        self.logger.info(f" Total rows : {total_rows:,}")
        self.logger.info(f" YoY calculated : {valid_yoy:,}")
        self.logger.info(f" YoY NULL (base yr): {null_yoy:,} <- tahun pertama per country-indicator")

        # YoY distribution per indicator (top 10 by absolute mean).
        per_ind = (
            df[df['yoy_pct'].notna()]
            .groupby(['indicator_id', 'indicator_name'])['yoy_pct']
            .agg(['mean', 'std', 'min', 'max'])
            .reset_index()
        )
        per_ind.columns = ['indicator_id', 'indicator_name', 'mean', 'std', 'min', 'max']

        self.logger.info(f"\n YoY summary per indicator (top 10 by abs mean change):")
        self.logger.info(f" {'-'*100}")
        self.logger.info(
            f" {'ID':<5} {'Indicator Name':<52} {'Mean%':>8} {'Std%':>8} {'Min%':>8} {'Max%':>8}"
        )
        self.logger.info(f" {'-'*100}")

        top_ind = per_ind.reindex(
            per_ind['mean'].abs().sort_values(ascending=False).index
        ).head(10)
        for _, row in top_ind.iterrows():
            self.logger.info(
                f" {int(row['indicator_id']):<5} {row['indicator_name'][:50]:<52} "
                f"{row['mean']:>+8.2f} {row['std']:>8.2f} "
                f"{row['min']:>+8.2f} {row['max']:>+8.2f}"
            )

        # YoY distribution per country.
        per_country = (
            df[df['yoy_pct'].notna()]
            .groupby(['country_id', 'country_name'])['yoy_pct']
            .agg(['mean', 'std'])
            .reset_index()
        )
        per_country.columns = ['country_id', 'country_name', 'mean_yoy', 'std_yoy']

        self.logger.info(f"\n YoY summary per country:")
        self.logger.info(f" {'-'*60}")
        self.logger.info(f" {'Country':<30} {'Mean YoY%':>10} {'Std YoY%':>10}")
        self.logger.info(f" {'-'*60}")
        for _, row in per_country.sort_values('mean_yoy', ascending=False).iterrows():
            self.logger.info(
                f" {row['country_name']:<30} {row['mean_yoy']:>+10.2f} {row['std_yoy']:>10.2f}"
            )

        self.df_clean = df
        self.logger.info(f"\n [OK] YoY columns added: yoy_change, yoy_pct")
        return self.df_clean

    def analyze_indicator_availability_by_year(self):
        """
        STEP 7: log per-year coverage and a per-indicator catalogue.

        Returns:
            DataFrame with columns year, indicator_count, country_count.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
        self.logger.info("=" * 80)

        year_stats = self.df_clean.groupby('year').agg({
            'indicator_id': 'nunique',
            'country_id'  : 'nunique'
        }).reset_index()
        year_stats.columns = ['year', 'indicator_count', 'country_count']
        # One pass for row counts instead of re-filtering the frame per year.
        rows_per_year = self.df_clean.groupby('year').size()

        self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
        self.logger.info("-" * 50)
        for _, row in year_stats.iterrows():
            year = int(row['year'])
            row_count = int(rows_per_year.loc[row['year']])
            self.logger.info(
                f"{year:<8} {int(row['indicator_count']):<15} "
                f"{int(row['country_count']):<12} {row_count:,}"
            )

        indicator_details = self.df_clean.groupby([
            'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework'
        ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
        indicator_details.columns = [
            'indicator_id', 'indicator_name', 'pillar_name', 'direction', 'framework',
            'start_year', 'end_year', 'country_count'
        ]
        indicator_details['year_range'] = (
            indicator_details['start_year'].astype(int).astype(str) + '-' +
            indicator_details['end_year'].astype(int).astype(str)
        )
        indicator_details = indicator_details.sort_values(
            ['framework', 'pillar_name', 'start_year', 'indicator_name']
        )

        self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
        for pillar, count in indicator_details.groupby('pillar_name').size().items():
            self.logger.info(f" {pillar}: {count} indicators")

        self.logger.info(f"\nFramework breakdown:")
        for fw, count in indicator_details.groupby('framework').size().items():
            self.logger.info(f" {fw}: {count} indicators")

        self.logger.info(f"\n{'-'*110}")
        self.logger.info(
            f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} "
            f"{'Framework':<10} {'Years':<12} {'Dir':<8} {'Countries'}"
        )
        self.logger.info(f"{'-'*110}")
        for _, row in indicator_details.iterrows():
            direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
            self.logger.info(
                f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
                f"{row['pillar_name'][:13]:<15} {row['framework']:<10} "
                f"{row['year_range']:<12} {direction:<8} {int(row['country_count'])}"
            )

        return year_stats

    def save_analytical_table(self):
        """
        STEP 8: write fact_asean_food_security_selected to the Gold layer.

        Stored columns:
            country_id, country_name     — country dimension
            indicator_id, indicator_name — indicator dimension
            direction                    — scoring direction (higher/lower_better)
            framework                    — MDGs / SDGs (for Looker Studio filters)
            pillar_id, pillar_name       — pillar dimension
            time_id, year                — time dimension
            value                        — indicator value
            yoy_change                   — absolute YoY change (NULLABLE: NULL in first year)
            yoy_pct                      — relative YoY change in % (NULLABLE: NULL in first year)

        The framework column lets Looker Studio filter without joining
        dim_indicator; yoy_change/yoy_pct enable yearly trend analysis there.
        Falls back to joining dim_indicator for framework, and to calling
        calculate_yoy(), when those columns are missing.

        Returns:
            Number of rows loaded.

        Raises:
            Any load/metadata error is logged and re-raised.
        """
        table_name = 'fact_asean_food_security_selected'
        self.logger.info("\n" + "=" * 80)
        self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
        self.logger.info("=" * 80)

        try:
            # Make sure the framework column is present in df_clean.
            if 'framework' not in self.df_clean.columns:
                self.logger.warning(
                    " [WARN] Kolom 'framework' tidak ada di df_clean. "
                    "Melakukan join ke dim_indicator sebagai fallback..."
                )
                dim_ind = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
                if 'framework' in dim_ind.columns:
                    self.df_clean = self.df_clean.merge(
                        dim_ind[['indicator_id', 'framework']],
                        on='indicator_id', how='left'
                    )
                    self.df_clean['framework'] = self.df_clean['framework'].fillna('MDGs')
                    self.logger.info(" [OK] framework di-join dari dim_indicator")
                else:
                    self.df_clean['framework'] = 'MDGs'
                    self.logger.warning(
                        " [WARN] dim_indicator juga tidak punya kolom framework. "
                        "Default: MDGs. Jalankan ulang pipeline dari cleaned_layer."
                    )

            # Make sure the YoY columns exist — fallback if calculate_yoy() was skipped.
            if 'yoy_change' not in self.df_clean.columns or 'yoy_pct' not in self.df_clean.columns:
                self.logger.warning(
                    " [WARN] Kolom YoY tidak ditemukan. Menjalankan calculate_yoy() sebagai fallback..."
                )
                self.calculate_yoy()

            analytical_df = self.df_clean[[
                'country_id',
                'country_name',
                'indicator_id',
                'indicator_name',
                'direction',
                'framework',
                'pillar_id',
                'pillar_name',
                'time_id',
                'year',
                'value',
                'yoy_change',
                'yoy_pct',
            ]].copy()

            analytical_df = analytical_df.sort_values(
                ['year', 'country_name', 'pillar_name', 'indicator_name']
            ).reset_index(drop=True)

            # Enforce dtypes matching the BigQuery schema below.
            analytical_df['country_id']     = analytical_df['country_id'].astype(int)
            analytical_df['country_name']   = analytical_df['country_name'].astype(str)
            analytical_df['indicator_id']   = analytical_df['indicator_id'].astype(int)
            analytical_df['indicator_name'] = analytical_df['indicator_name'].astype(str)
            analytical_df['direction']      = analytical_df['direction'].astype(str)
            analytical_df['framework']      = analytical_df['framework'].astype(str)
            analytical_df['pillar_id']      = analytical_df['pillar_id'].astype(int)
            analytical_df['pillar_name']    = analytical_df['pillar_name'].astype(str)
            analytical_df['time_id']        = analytical_df['time_id'].astype(int)
            analytical_df['year']           = analytical_df['year'].astype(int)
            analytical_df['value']          = analytical_df['value'].astype(float)
            # yoy columns stay float; NaN (NULL) in a series' first year is intentional.
            analytical_df['yoy_change']     = analytical_df['yoy_change'].astype(float)
            analytical_df['yoy_pct']        = analytical_df['yoy_pct'].astype(float)

            self.pipeline_metadata['rows_transformed'] = len(analytical_df)

            self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
            self.logger.info(f" Total rows: {len(analytical_df):,}")

            # Framework distribution
            fw_dist = analytical_df.drop_duplicates('indicator_id')['framework'].value_counts()
            self.logger.info(f" Framework distribution (per indikator unik):")
            for fw, cnt in fw_dist.items():
                self.logger.info(f" {fw}: {cnt} indicators")

            # YoY statistics
            yoy_valid = analytical_df['yoy_pct'].notna().sum()
            yoy_null = analytical_df['yoy_pct'].isna().sum()
            self.logger.info(f" YoY rows (calculated): {yoy_valid:,}")
            self.logger.info(f" YoY rows (NULL/base) : {yoy_null:,}")

            schema = [
                bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("framework", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
                # NULLABLE: a series' first year has no previous value to compare.
                bigquery.SchemaField("yoy_change", "FLOAT", mode="NULLABLE"),
                bigquery.SchemaField("yoy_pct", "FLOAT", mode="NULLABLE"),
            ]

            rows_loaded = load_to_bigquery(
                self.client, analytical_df, table_name,
                layer='gold',
                write_disposition="WRITE_TRUNCATE",
                schema=schema
            )
            self.pipeline_metadata['rows_loaded'] = rows_loaded
            log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)

            # Guard for callers that invoke this method without run().
            started = self.pipeline_start or datetime.now()
            fixed_countries = len(self.selected_country_ids or [])
            # Native ints/strs: json.dumps rejects numpy scalars, which older
            # pandas versions return from Series.to_dict().
            framework_dist = {str(fw): int(cnt) for fw, cnt in fw_dist.items()}

            metadata = {
                'source_class'       : self.__class__.__name__,
                'table_name'         : table_name,
                'execution_timestamp': started,
                'duration_seconds'   : (datetime.now() - started).total_seconds(),
                'rows_fetched'       : self.pipeline_metadata['rows_fetched'],
                'rows_transformed'   : rows_loaded,
                'rows_loaded'        : rows_loaded,
                'completeness_pct'   : 100.0,
                'config_snapshot'    : json.dumps({
                    'start_year'     : self.start_year,
                    'end_year'       : self.end_year,
                    'fixed_countries': fixed_countries,
                    'no_gaps'        : True,
                    'layer'          : 'gold',
                    'columns'        : (
                        'id + name + direction + framework + value + '
                        'yoy_change + yoy_pct (Looker Studio ready)'
                    )
                }),
                'validation_metrics' : json.dumps({
                    'fixed_countries' : fixed_countries,
                    'total_indicators': int(self.df_clean['indicator_id'].nunique()),
                    'framework_dist'  : framework_dist,
                    'yoy_rows_valid'  : int(yoy_valid),
                    'yoy_rows_null'   : int(yoy_null),
                })
            }
            save_etl_metadata(self.client, metadata)

            self.logger.info(
                f" {table_name}: {rows_loaded:,} rows -> [DW/Gold] fs_asean_gold"
            )
            self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
            return rows_loaded

        except Exception as e:
            self.logger.error(f"Error saving: {e}")
            raise

    def run(self):
        """Execute steps 1-8 in order and log a completion summary."""
        self.pipeline_start = datetime.now()
        self.pipeline_metadata['start_time'] = self.pipeline_start

        self.logger.info("\n" + "=" * 80)
        self.logger.info("Output: fact_asean_food_security_selected -> fs_asean_gold")
        self.logger.info("Kolom: country_id/name, indicator_id/name, direction, framework,")
        self.logger.info(" pillar_id/name, time_id, year, value, yoy_change, yoy_pct")
        self.logger.info("=" * 80)

        self.load_source_data()
        self.determine_year_boundaries()
        self.filter_complete_indicators_per_country()
        self.select_countries_with_all_pillars()
        self.filter_indicators_consistent_across_fixed_countries()
        self.verify_no_gaps()
        self.calculate_yoy()  # Step 6b: YoY
        self.analyze_indicator_availability_by_year()
        self.save_analytical_table()

        self.pipeline_end = datetime.now()
        duration = (self.pipeline_end - self.pipeline_start).total_seconds()
        self.pipeline_metadata['end_time'] = self.pipeline_end
        self.pipeline_metadata['duration_seconds'] = duration

        self.logger.info("\n" + "=" * 80)
        self.logger.info("COMPLETED")
        self.logger.info("=" * 80)
        self.logger.info(f" Duration : {duration:.2f}s")
        self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
        self.logger.info(f" Countries : {len(self.selected_country_ids)}")
        self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")


# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================

def run_analytical_layer():
    """
    Airflow task: build fact_asean_food_security_selected from
    fact_food_security + dimensions.

    Called after dimensional_model_to_gold has finished.
    """
    client = get_bigquery_client()
    loader = AnalyticalLayerLoader(client)
    loader.run()
    print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows")


# =============================================================================
# MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    print("=" * 80)
    print("Output: fact_asean_food_security_selected -> fs_asean_gold")
    print("=" * 80)

    logger = setup_logging()
    client = get_bigquery_client()
    loader = AnalyticalLayerLoader(client)
    loader.run()

    print("\n" + "=" * 80)
    print("[OK] COMPLETED")
    print("=" * 80)