"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING

FIXED: fact_asean_food_security_selected is stored in fs_asean_gold (layer='gold')

Filtering Order:
1. Load fact data joined with its dimensions
2. Determine year boundaries (2013 - auto-detected end year)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Save analytical table (with full names/labels for Looker Studio)

NOTE(review): the original header described step 1 as "single years only",
but the load query applies no ``is_year_range`` filter; year-range rows are
only counted in the log. Confirm how ``dim_time.year`` is populated for
year-range rows.
"""

import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys

if hasattr(sys.stdout, 'reconfigure'):
    sys.stdout.reconfigure(encoding='utf-8')

from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
    log_update,
    load_to_bigquery,
    read_from_bigquery,
    setup_logging,
    truncate_table,
    save_etl_metadata,
)
from google.cloud import bigquery


# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================

class AnalyticalLayerLoader:
    """
    Analytical Layer Loader for BigQuery.

    Key Logic:
    1. Complete per country (no gaps from start_year to end_year)
    2. Filter countries with all pillars
    3. Ensure indicators have consistent country count across all years
    4. Save with full columns (name + ID) for ease of use in Looker Studio

    Output: fact_asean_food_security_selected -> DW layer (Gold) -> fs_asean_gold
    """

    # Minimum value of (end_year - first_year) a series must reach to be kept.
    MIN_YEAR_SPAN = 4

    def __init__(self, client: bigquery.Client):
        """Store the BigQuery client and initialise the pipeline state.

        Args:
            client: BigQuery client used for every query and load.
        """
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        # NOTE(review): propagation is disabled and no handler is attached in
        # this file; presumably setup_logging() wires one up -- confirm,
        # otherwise INFO records from this logger are dropped.
        self.logger.propagate = False

        self.df_clean = None
        self.df_indicator = None
        self.df_country = None
        self.df_pillar = None
        self.selected_country_ids = None

        self.start_year = 2013
        self.end_year = None
        self.baseline_year = 2023

        self.pipeline_metadata = {
            'source_class': self.__class__.__name__,
            'start_time': None,
            'end_time': None,
            'duration_seconds': None,
            'rows_fetched': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'validation_metrics': {},
        }
        self.pipeline_start = None
        self.pipeline_end = None

    def load_source_data(self):
        """Step 1: load the fact table joined with its dimensions.

        Populates ``df_clean`` (fact rows with names), ``df_indicator``,
        ``df_country`` and ``df_pillar``, and records ``rows_fetched``.

        Returns:
            True on success.

        Raises:
            Exception: any error from BigQuery is logged and re-raised.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
        self.logger.info("=" * 80)

        try:
            query = f"""
                SELECT
                    f.country_id,
                    c.country_name,
                    f.indicator_id,
                    i.indicator_name,
                    i.direction,
                    f.pillar_id,
                    p.pillar_name,
                    f.time_id,
                    t.year,
                    t.start_year,
                    t.end_year,
                    t.is_year_range,
                    f.value,
                    f.source_id
                FROM `{get_table_id('fact_food_security', layer='gold')}` f
                JOIN `{get_table_id('dim_country', layer='gold')}` c
                    ON f.country_id = c.country_id
                JOIN `{get_table_id('dim_indicator', layer='gold')}` i
                    ON f.indicator_id = i.indicator_id
                JOIN `{get_table_id('dim_pillar', layer='gold')}` p
                    ON f.pillar_id = p.pillar_id
                JOIN `{get_table_id('dim_time', layer='gold')}` t
                    ON f.time_id = t.time_id
            """

            self.logger.info("Loading fact table with dimensions...")
            self.df_clean = (
                self.client.query(query)
                .result()
                .to_dataframe(create_bqstorage_client=False)
            )
            self.logger.info(f" Loaded: {len(self.df_clean):,} rows")

            if 'is_year_range' in self.df_clean.columns:
                yr = self.df_clean['is_year_range'].value_counts()
                self.logger.info(" Breakdown:")
                self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}")
                self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}")

            self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
            self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
            self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')

            self.logger.info(f" Indicators: {len(self.df_indicator)}")
            self.logger.info(f" Countries: {len(self.df_country)}")
            self.logger.info(f" Pillars: {len(self.df_pillar)}")

            self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
            return True

        except Exception as e:
            self.logger.error(f"Error loading source data: {e}")
            raise

    def determine_year_boundaries(self):
        """Step 2: pick the end year and trim ``df_clean`` to the year window.

        The end year is the latest year >= ``baseline_year`` whose distinct
        indicator count is at least that of the baseline year; if none
        qualifies, the baseline year itself is used. Rows outside
        ``start_year``..``end_year`` are dropped.

        Returns:
            The filtered ``df_clean``.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
        self.logger.info("=" * 80)

        df_baseline = self.df_clean[self.df_clean['year'] == self.baseline_year]
        baseline_indicator_count = df_baseline['indicator_id'].nunique()

        self.logger.info(f"\nBaseline Year: {self.baseline_year}")
        self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")

        # One groupby instead of re-filtering the frame per year. groupby also
        # drops rows whose year is missing, so missing years are never compared
        # or cast to int.
        indicators_per_year = (
            self.df_clean.groupby('year')['indicator_id']
            .nunique()
            .sort_index(ascending=False)
        )

        selected_end_year = None
        for year, year_indicator_count in indicators_per_year.items():
            if year < self.baseline_year:
                break  # years are sorted descending; nothing older qualifies
            meets_baseline = year_indicator_count >= baseline_indicator_count
            status = "OK" if meets_baseline else "X"
            self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
            if meets_baseline and selected_end_year is None:
                selected_end_year = int(year)

        if selected_end_year is None:
            selected_end_year = self.baseline_year
            self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
        else:
            self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")

        self.end_year = selected_end_year

        original_count = len(self.df_clean)
        in_window = (
            (self.df_clean['year'] >= self.start_year)
            & (self.df_clean['year'] <= self.end_year)
        )
        self.df_clean = self.df_clean[in_window].copy()

        self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_complete_indicators_per_country(self):
        """Step 3: keep only gap-free country/indicator series.

        A (country, indicator) series is kept when it reaches ``end_year``,
        has every year between its own first year and ``end_year``, and spans
        at least ``MIN_YEAR_SPAN`` years.

        Returns:
            The filtered ``df_clean``.

        Raises:
            ValueError: if no series satisfies the completeness rules.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
        self.logger.info("=" * 80)

        grouped = self.df_clean.groupby([
            'country_id', 'country_name',
            'indicator_id', 'indicator_name',
            'pillar_id', 'pillar_name',
        ])

        valid_combinations = []
        removed_combinations = []

        for group_key, group in grouped:
            country_id, country_name, indicator_id, indicator_name = group_key[:4]

            # A set makes the per-year membership test O(1).
            years_present = {int(y) for y in group['year'].unique()}
            start_year = min(years_present)
            end_year_actual = max(years_present)
            span = self.end_year - start_year

            missing_years = [
                y for y in range(start_year, self.end_year + 1)
                if y not in years_present
            ]
            has_gap = bool(missing_years)

            is_complete = (
                end_year_actual >= self.end_year
                and not has_gap
                and span >= self.MIN_YEAR_SPAN
            )

            if is_complete:
                valid_combinations.append(
                    {'country_id': country_id, 'indicator_id': indicator_id}
                )
                continue

            reasons = []
            if end_year_actual < self.end_year:
                reasons.append(f"ends {end_year_actual}")
            if has_gap:
                gap_str = ", ".join(str(y) for y in missing_years[:3])
                if len(missing_years) > 3:
                    gap_str += "..."
                reasons.append(f"gap:{gap_str}")
            if span < self.MIN_YEAR_SPAN:
                reasons.append(f"span={span}")
            removed_combinations.append({
                'country_name': country_name,
                'indicator_name': indicator_name,
                'reasons': ", ".join(reasons),
            })

        self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
        self.logger.info(f" [-] Removed: {len(removed_combinations):,}")

        if not valid_combinations:
            # An empty list yields a frame without columns; fail with a clear
            # message instead of a KeyError on 'country_id'.
            raise ValueError(
                "No complete country-indicator combinations found after filtering!"
            )

        df_valid = pd.DataFrame(valid_combinations)
        valid_keys = set(
            df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str)
        )
        # Keys are computed on the side so df_clean never carries a helper column.
        row_keys = (
            self.df_clean['country_id'].astype(str)
            + '_'
            + self.df_clean['indicator_id'].astype(str)
        )

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[row_keys.isin(valid_keys)].copy()

        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        return self.df_clean

    def select_countries_with_all_pillars(self):
        """Step 4: keep only countries that still cover every pillar.

        Sets ``selected_country_ids`` (the fixed country set) and drops rows
        of all other countries.

        Returns:
            The filtered ``df_clean``.

        Raises:
            ValueError: if no country covers all pillars.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
        self.logger.info("=" * 80)

        total_pillars = self.df_clean['pillar_id'].nunique()

        country_pillar_count = (
            self.df_clean.groupby(['country_id', 'country_name'])
            .agg(
                pillar_count=('pillar_id', 'nunique'),
                indicator_count=('indicator_id', 'nunique'),
                year_range=('year', lambda x: f"{int(x.min())}-{int(x.max())}"),
            )
            .reset_index()
        )

        ranked = country_pillar_count.sort_values('pillar_count', ascending=False)
        for row in ranked.itertuples(index=False):
            status = "[+] KEEP" if row.pillar_count == total_pillars else "[-] REMOVE"
            self.logger.info(
                f" {status:<12} {row.country_name:25s} "
                f"{row.pillar_count}/{total_pillars} pillars"
            )

        selected_countries = country_pillar_count[
            country_pillar_count['pillar_count'] == total_pillars
        ]
        self.selected_country_ids = selected_countries['country_id'].tolist()
        self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")

        if not self.selected_country_ids:
            # Without a fixed country set every later step works on an empty
            # frame; stop here with the real cause.
            raise ValueError("No country covers all pillars after filtering!")

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[
            self.df_clean['country_id'].isin(self.selected_country_ids)
        ].copy()

        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_indicators_consistent_across_fixed_countries(self):
        """Step 5: keep indicators reported by every fixed country each year.

        For each indicator the window starts at the latest first-year among
        the fixed countries. The indicator is kept when that window spans at
        least ``MIN_YEAR_SPAN`` years and every year in it has data for all
        fixed countries. Rows before the window start are dropped.

        Returns:
            The filtered ``df_clean``.

        Raises:
            ValueError: if no indicator survives.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
        self.logger.info("=" * 80)

        indicator_country_start = (
            self.df_clean
            .groupby(['indicator_id', 'indicator_name', 'country_id'])['year']
            .min()
            .reset_index()
        )
        indicator_country_start.columns = [
            'indicator_id', 'indicator_name', 'country_id', 'start_year'
        ]

        indicator_max_start = (
            indicator_country_start
            .groupby(['indicator_id', 'indicator_name'])['start_year']
            .max()
            .reset_index()
        )
        indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']

        required_countries = len(self.selected_country_ids)
        valid_indicators = []
        removed_indicators = []

        for _, ind_row in indicator_max_start.iterrows():
            indicator_id = ind_row['indicator_id']
            indicator_name = ind_row['indicator_name']
            max_start = int(ind_row['max_start_year'])
            span = self.end_year - max_start

            if span < self.MIN_YEAR_SPAN:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason': f"span={span} < {self.MIN_YEAR_SPAN}",
                })
                continue

            ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
            problematic_years = []
            for year in range(max_start, self.end_year + 1):
                country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
                if country_count < required_countries:
                    problematic_years.append(f"{int(year)}({country_count})")

            if not problematic_years:
                valid_indicators.append(indicator_id)
            else:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason': f"missing countries in years: {', '.join(problematic_years[:5])}",
                })

        self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
        self.logger.info(f" [-] Removed: {len(removed_indicators)}")

        if not valid_indicators:
            raise ValueError("No valid indicators found after filtering!")

        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[
            self.df_clean['indicator_id'].isin(valid_indicators)
        ].copy()
        # Trim each indicator to its common window start.
        self.df_clean = self.df_clean.merge(
            indicator_max_start[['indicator_id', 'max_start_year']],
            on='indicator_id',
            how='left',
        )
        self.df_clean = self.df_clean[
            self.df_clean['year'] >= self.df_clean['max_start_year']
        ].copy()
        self.df_clean = self.df_clean.drop('max_start_year', axis=1)

        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
        return self.df_clean

    def verify_no_gaps(self):
        """Step 6: assert every (indicator, year) has all fixed countries.

        Returns:
            True when the check passes.

        Raises:
            ValueError: if any (indicator, year) pair has a different number
                of countries than the fixed set; up to ten are logged first.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 6: VERIFY NO GAPS")
        self.logger.info("=" * 80)

        expected_countries = len(self.selected_country_ids)
        verification = (
            self.df_clean.groupby(['indicator_id', 'year'])['country_id']
            .nunique()
            .reset_index()
        )
        verification.columns = ['indicator_id', 'year', 'country_count']

        bad = verification[verification['country_count'] != expected_countries]
        if bad.empty:
            self.logger.info(
                f" VERIFICATION PASSED — all combinations have {expected_countries} countries"
            )
            return True

        for _, row in bad.head(10).iterrows():
            self.logger.error(
                f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
                f"{int(row['country_count'])} countries (expected {expected_countries})"
            )
        raise ValueError("Gap verification failed!")

    def analyze_indicator_availability_by_year(self):
        """Step 7: log per-year and per-indicator coverage of ``df_clean``.

        Returns:
            DataFrame with columns ``year``, ``indicator_count`` and
            ``country_count``.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
        self.logger.info("=" * 80)

        by_year = self.df_clean.groupby('year')
        year_stats = by_year.agg(
            indicator_count=('indicator_id', 'nunique'),
            country_count=('country_id', 'nunique'),
        ).reset_index()
        # Same group order as year_stats, so the two can be zipped instead of
        # re-filtering the frame once per year.
        rows_per_year = by_year.size().to_numpy()

        self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
        self.logger.info("-" * 50)
        for stats, row_count in zip(year_stats.itertuples(index=False), rows_per_year):
            self.logger.info(
                f"{int(stats.year):<8} {int(stats.indicator_count):<15} "
                f"{int(stats.country_count):<12} {int(row_count):,}"
            )

        indicator_details = (
            self.df_clean
            .groupby(['indicator_id', 'indicator_name', 'pillar_name', 'direction'])
            .agg(
                start_year=('year', 'min'),
                end_year=('year', 'max'),
                country_count=('country_id', 'nunique'),
            )
            .reset_index()
        )
        indicator_details['year_range'] = (
            indicator_details['start_year'].astype(int).astype(str)
            + '-'
            + indicator_details['end_year'].astype(int).astype(str)
        )
        indicator_details = indicator_details.sort_values(
            ['pillar_name', 'start_year', 'indicator_name']
        )

        self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
        for pillar, count in indicator_details.groupby('pillar_name').size().items():
            self.logger.info(f" {pillar}: {count} indicators")

        self.logger.info(f"\n{'-'*100}")
        self.logger.info(
            f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}"
        )
        self.logger.info(f"{'-'*100}")
        for row in indicator_details.itertuples(index=False):
            direction = 'higher+' if row.direction == 'higher_better' else 'lower-'
            self.logger.info(
                f"{int(row.indicator_id):<5} {row.indicator_name[:52]:<55} "
                f"{row.pillar_name[:13]:<15} {row.year_range:<12} "
                f"{direction:<8} {int(row.country_count)}"
            )

        return year_stats

    def save_analytical_table(self):
        """Step 8: write ``df_clean`` to the gold table and record metadata.

        The table carries both IDs and names so it can be filtered and sliced
        directly in Looker Studio without joining back to the dimensions.

        Returns:
            Number of rows reported by ``load_to_bigquery``.

        Raises:
            Exception: any error while loading or logging is logged and
                re-raised.
        """
        table_name = 'fact_asean_food_security_selected'

        self.logger.info("\n" + "=" * 80)
        self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
        self.logger.info("=" * 80)

        try:
            # Columns: ID + full name + value.
            analytical_df = self.df_clean[[
                'country_id',
                'country_name',
                'indicator_id',
                'indicator_name',
                'direction',
                'pillar_id',
                'pillar_name',
                'time_id',
                'year',
                'value',
            ]].copy()

            analytical_df = analytical_df.sort_values(
                ['year', 'country_name', 'pillar_name', 'indicator_name']
            ).reset_index(drop=True)

            # Make sure the data types are consistent with the schema below.
            analytical_df = analytical_df.astype({
                'country_id': int,
                'country_name': str,
                'indicator_id': int,
                'indicator_name': str,
                'direction': str,
                'pillar_id': int,
                'pillar_name': str,
                'time_id': int,
                'year': int,
                'value': float,
            })

            self.logger.info(f" Kolom yang disimpan: {list(analytical_df.columns)}")
            self.logger.info(f" Total rows: {len(analytical_df):,}")
            self.pipeline_metadata['rows_transformed'] = len(analytical_df)

            # BigQuery schema
            schema = [
                bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
            ]

            rows_loaded = load_to_bigquery(
                self.client,
                analytical_df,
                table_name,
                layer='gold',
                write_disposition="WRITE_TRUNCATE",
                schema=schema,
            )
            self.pipeline_metadata['rows_loaded'] = rows_loaded
            log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)

            # pipeline_start is only set by run(); fall back to "now" so a
            # direct call does not fail on None arithmetic.
            started_at = self.pipeline_start or datetime.now()
            fixed_countries = len(self.selected_country_ids)

            metadata = {
                'source_class': self.__class__.__name__,
                'table_name': table_name,
                'execution_timestamp': started_at,
                'duration_seconds': (datetime.now() - started_at).total_seconds(),
                'rows_fetched': self.pipeline_metadata['rows_fetched'],
                'rows_transformed': rows_loaded,
                'rows_loaded': rows_loaded,
                'completeness_pct': 100.0,
                'config_snapshot': json.dumps({
                    'start_year': self.start_year,
                    'end_year': self.end_year,
                    'fixed_countries': fixed_countries,
                    'no_gaps': True,
                    'layer': 'gold',
                    'columns': 'id + name + value (Looker Studio ready)',
                }),
                'validation_metrics': json.dumps({
                    'fixed_countries': fixed_countries,
                    'total_indicators': int(self.df_clean['indicator_id'].nunique()),
                }),
            }
            save_etl_metadata(self.client, metadata)

            self.logger.info(f" ✓ {table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold")
            self.logger.info(" Metadata → [AUDIT] etl_metadata")
            return rows_loaded

        except Exception as e:
            self.logger.error(f"Error saving: {e}")
            raise

    def run(self):
        """Execute all pipeline steps in order and log a summary.

        Timing fields of ``pipeline_metadata`` are filled even when a step
        raises; the exception itself is propagated unchanged.
        """
        self.pipeline_start = datetime.now()
        self.pipeline_metadata['start_time'] = self.pipeline_start

        self.logger.info("\n" + "=" * 80)
        self.logger.info("Output: fact_asean_food_security_selected → fs_asean_gold")
        self.logger.info("=" * 80)

        try:
            self.load_source_data()
            self.determine_year_boundaries()
            self.filter_complete_indicators_per_country()
            self.select_countries_with_all_pillars()
            self.filter_indicators_consistent_across_fixed_countries()
            self.verify_no_gaps()
            self.analyze_indicator_availability_by_year()
            self.save_analytical_table()
        finally:
            self.pipeline_end = datetime.now()
            duration = (self.pipeline_end - self.pipeline_start).total_seconds()
            self.pipeline_metadata['end_time'] = self.pipeline_end
            self.pipeline_metadata['duration_seconds'] = duration

        self.logger.info("\n" + "=" * 80)
        self.logger.info("COMPLETED")
        self.logger.info("=" * 80)
        self.logger.info(f" Duration : {duration:.2f}s")
        self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
        self.logger.info(f" Countries : {len(self.selected_country_ids)}")
        self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")


# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================

def run_analytical_layer():
    """
    Airflow task: build fact_asean_food_security_selected from
    fact_food_security + dims.

    Called after dimensional_model_to_gold has finished.
    """
    # NOTE(review): unlike the __main__ path, setup_logging() is not called
    # here -- confirm the loader's log output is visible in Airflow.
    from scripts.bigquery_config import get_bigquery_client

    client = get_bigquery_client()
    loader = AnalyticalLayerLoader(client)
    loader.run()
    print(f"Analytical layer loaded: {loader.pipeline_metadata['rows_loaded']:,} rows")


# =============================================================================
# MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    print("=" * 80)
    print("Output: fact_asean_food_security_selected → fs_asean_gold")
    print("=" * 80)

    logger = setup_logging()
    client = get_bigquery_client()
    loader = AnalyticalLayerLoader(client)
    loader.run()

    print("\n" + "=" * 80)
    print("[OK] COMPLETED")
    print("=" * 80)