"""
BIGQUERY RAW LAYER ETL
Kimball Data Warehouse Architecture

Kimball ETL flow executed by this file:
    FAODataSource       → EXTRACT & LOAD to RAW layer (Bronze) : raw_fao
    WorldBankDataSource → EXTRACT & LOAD to RAW layer (Bronze) : raw_worldbank
    UNICEFDataSource    → EXTRACT & LOAD to RAW layer (Bronze) : raw_unicef
    StagingDataIntegration:
        - READ from RAW layer (Bronze)   : raw_fao, raw_worldbank, raw_unicef
        - LOAD to STAGING layer (Silver) : staging_integrated
        - LOG to AUDIT layer (Audit)     : etl_logs, etl_metadata

Classes:
    IndicatorMatcher       — Fuzzy matching of indicators across data sources
    FAODataSource          — ETL for FAO Food Security data
    WorldBankDataSource    — ETL for World Bank data
    UNICEFDataSource       — ETL for UNICEF data
    StagingDataIntegration — Integration & standardization into the Staging layer

Usage:
    python bigquery_raw_layer.py
"""

import io
import json
import logging
import re
import time
import zipfile
from datetime import datetime
from difflib import SequenceMatcher
from functools import lru_cache
from pathlib import Path
from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
import requests
import wbgapi as wb
from google.cloud import bigquery

from scripts.bigquery_config import (
    CONFIG,
    EXPORTS_DIR,
    LOGS_DIR,
    get_bigquery_client,
    get_table_id,
)
from scripts.bigquery_datasource import DataSource
from scripts.bigquery_helpers import (
    get_staging_schema,
    load_to_bigquery,
    log_update,
    read_from_bigquery,
    save_etl_metadata,
    setup_logging,
)


# INDICATOR MATCHER

class IndicatorMatcher:
    """Fuzzy matcher that pairs indicator names coming from different sources.

    A similarity score is built from three components (shared core keywords,
    character-level similarity, word overlap) weighted by
    ``CONFIG['matching']['weights']`` and then multiplied by the penalties in
    ``CONFIG['matching']['penalties']`` whenever the two names disagree on a
    qualifier group (threshold wording, exclusivity, severity, target
    population, service level).
    """

    # Topic -> substrings whose presence in the cleaned text marks that topic.
    CORE_KEYWORDS = {
        'fat': ['fat', 'lipid'],
        'protein': ['protein'],
        'calorie': ['calorie', 'caloric', 'kcal', 'energy intake'],
        'energy': ['energy', 'dietary energy consumption'],
        'stunting': ['stunting', 'stunted', 'height for age'],
        'wasting': ['wasting', 'wasted', 'weight for height'],
        'underweight': ['underweight', 'weight for age'],
        'overweight': ['overweight', 'overfed'],
        'obesity': ['obesity', 'obese'],
        'anemia': ['anemia', 'anaemia', 'hemoglobin', 'haemoglobin'],
        'malnutrition': ['malnutrition', 'undernourishment', 'malnourished',
                         'undernourished'],
        'breastfeeding': ['breastfeeding', 'breast feeding'],
        'birthweight': ['birthweight', 'birth weight', 'low birth weight'],
        'immunization': ['immunization', 'immunisation', 'vaccination', 'vaccine'],
        'gdp': ['gdp', 'gross domestic product'],
        'poverty': ['poverty', 'poor', 'poverty line'],
        'inequality': ['inequality', 'gini'],
        'water': ['water', 'drinking water', 'clean water', 'safe water'],
        'sanitation': ['sanitation', 'toilet', 'improved sanitation'],
        'electricity': ['electricity', 'electric', 'power'],
        'healthcare': ['healthcare', 'health facility', 'hospital'],
        'governance': ['governance', 'government effectiveness'],
        'corruption': ['corruption', 'transparency'],
        'stability': ['political stability', 'stability', 'conflict'],
    }

    # Qualifier -> substrings whose presence in the cleaned text marks it.
    QUALIFIERS = {
        'at_least': ['at least', 'minimum', 'or more', 'or better'],
        'basic': ['basic'],
        'improved': ['improved'],
        'safely_managed': ['safely managed', 'safe'],
        'exclusive': ['exclusive', 'exclusively'],
        'severe': ['severe', 'severely'],
        'moderate': ['moderate'],
        'mild': ['mild'],
        'children': ['children', 'child', 'under 5', 'under five', 'u5'],
        'women': ['women', 'female', 'reproductive age'],
        'adults': ['adults', 'adult'],
        'population': ['population', 'people', 'persons'],
        'household': ['household', 'households'],
    }

    def __init__(self):
        """Read threshold, weights and penalties from ``CONFIG['matching']``."""
        self.threshold = CONFIG['matching']['threshold']
        self.weights = CONFIG['matching']['weights']
        self.penalties = CONFIG['matching']['penalties']
        self.logger = logging.getLogger(self.__class__.__name__)

    @staticmethod
    @lru_cache(maxsize=1024)
    def clean_text(text: str) -> str:
        """Lower-case *text*, replace punctuation with spaces, collapse whitespace.

        Word characters, whitespace and parentheses are kept. A missing value
        (per ``pd.isna``) yields the empty string.
        """
        if pd.isna(text):
            return ""
        text = str(text).lower()
        text = re.sub(r'[^\w\s\(\)]', ' ', text)
        return ' '.join(text.split())

    @classmethod
    @lru_cache(maxsize=512)
    def extract_keywords(cls, text: str) -> tuple:
        """Return the ``CORE_KEYWORDS`` topics found in *text*, in dict order."""
        text_clean = cls.clean_text(text)
        return tuple(
            key for key, variants in cls.CORE_KEYWORDS.items()
            if any(v in text_clean for v in variants)
        )

    @classmethod
    @lru_cache(maxsize=512)
    def detect_qualifiers(cls, text: str) -> frozenset:
        """Return the set of ``QUALIFIERS`` names found in *text*."""
        text_clean = cls.clean_text(text)
        return frozenset(
            q for q, variants in cls.QUALIFIERS.items()
            if any(v in text_clean for v in variants)
        )

    # NOTE(review): lru_cache on an instance method keys on ``self`` and keeps
    # each matcher alive for the lifetime of the cache (ruff B019). Kept as-is
    # because only a handful of matchers are created in this module.
    @lru_cache(maxsize=2048)
    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Score how likely *text1* and *text2* name the same indicator.

        Returns:
            ``1.0`` when the raw texts are equal; ``0.0`` when both texts have
            core keywords but share none; otherwise the weighted sum of keyword
            agreement, ``SequenceMatcher`` ratio and word overlap, multiplied by
            every applicable qualifier-mismatch penalty.
        """
        if text1 == text2:
            return 1.0

        clean1 = self.clean_text(text1)
        clean2 = self.clean_text(text2)

        keywords1 = set(self.extract_keywords(text1))
        keywords2 = set(self.extract_keywords(text2))
        shared_keywords = keywords1 & keywords2

        # Two indicators about different topics can never match.
        if keywords1 and keywords2 and not shared_keywords:
            return 0.0

        core_score = len(shared_keywords) / max(len(keywords1), len(keywords2), 1)
        base_score = SequenceMatcher(None, clean1, clean2).ratio()

        words1, words2 = set(clean1.split()), set(clean2.split())
        overlap = len(words1 & words2) / max(len(words1), len(words2), 1)

        w = self.weights
        final_score = (core_score * w['keyword']
                       + base_score * w['string_similarity']
                       + overlap * w['word_overlap'])

        quals1 = self.detect_qualifiers(text1)
        quals2 = self.detect_qualifiers(text2)
        p = self.penalties

        # Single-flag qualifiers: penalize when only one side carries the flag.
        if ('at_least' in quals1) != ('at_least' in quals2):
            final_score *= p['qualifier_mismatch']
        if ('exclusive' in quals1) != ('exclusive' in quals2):
            final_score *= p['qualifier_mismatch']

        # Grouped qualifiers: penalize when the subsets present differ.
        # (If the subsets differ, at least one of them is non-empty.)
        qualifier_groups = (
            ({'severe', 'moderate', 'mild'}, 'severity_mismatch'),
            ({'children', 'women', 'adults'}, 'target_mismatch'),
            ({'basic', 'improved', 'safely_managed'}, 'service_level_mismatch'),
        )
        for group, penalty_key in qualifier_groups:
            found1 = group & quals1
            found2 = group & quals2
            if found1 != found2 and (found1 or found2):
                final_score *= p[penalty_key]

        return final_score

    def match_indicators(self, source_indicators, target_indicators,
                         threshold=None, id_col='id', name_col='value',
                         deduplicate=True):
        """Find the best target for every source indicator.

        Args:
            source_indicators: Iterable of indicator names; processed in
                sorted order.
            target_indicators: Either a DataFrame (names in *name_col*, codes
                in *id_col*) or a plain iterable of names.
            threshold: Minimum score a match must exceed; defaults to the
                configured threshold.
            id_col: Code column used when targets are a DataFrame.
            name_col: Name column used when targets are a DataFrame.
            deduplicate: When true, keep only the highest-scoring source per
                target.

        Returns:
            List of dicts with keys ``source_indicator``, ``target_indicator``,
            ``target_code`` and ``similarity_score`` (percentage, one decimal).
        """
        if threshold is None:
            threshold = self.threshold

        all_matches = []
        for source in sorted(source_indicators):
            best = self._find_best_match(source, target_indicators, threshold,
                                         id_col, name_col)
            if best:
                all_matches.append({
                    'source_indicator': source,
                    'target_indicator': best['name'],
                    'target_code': best['code'],
                    'similarity_score': round(best['similarity'] * 100, 1),
                })

        if deduplicate and all_matches:
            all_matches = self._deduplicate_matches(all_matches)
        return all_matches

    def _find_best_match(self, source, targets, threshold, id_col, name_col):
        """Return the target scoring strictly above *threshold*, or ``None``.

        The result dict has keys ``code``, ``name`` and ``similarity``; ``code``
        is ``None`` when *targets* is not a DataFrame.
        """
        best = None
        best_score = threshold

        if isinstance(targets, pd.DataFrame):
            for _, row in targets.iterrows():
                score = self.calculate_similarity(source, row[name_col])
                if score > best_score:
                    best_score = score
                    best = {'code': row[id_col], 'name': row[name_col]}
        else:
            for target in targets:
                score = self.calculate_similarity(source, target)
                if score > best_score:
                    best_score = score
                    best = {'code': None, 'name': target}

        return None if best is None else {**best, 'similarity': best_score}

    def _deduplicate_matches(self, matches):
        """Keep, for each target, only the match with the highest score.

        Targets are identified by ``target_code`` when any code is present,
        otherwise by ``target_indicator``.
        """
        df = pd.DataFrame(matches).sort_values('similarity_score', ascending=False)
        dup_col = 'target_code' if df['target_code'].notna().any() else 'target_indicator'
        return df.drop_duplicates(subset=dup_col, keep='first').to_dict('records')


# FAO DATA SOURCE → RAW LAYER (Bronze)

class FAODataSource(DataSource):
    """
    FAO Food Security Data Source (BigQuery version)
    Uses the bulk download because the FAOSTAT API requires authentication
    """

    def __init__(self, client: bigquery.Client = None):
        super().__init__(client)
        self.table_name = 'raw_fao'
        self.domain_code = 'FS'
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False
        self.download_url = (
            "https://bulks-faostat.fao.org/production/"
            "Food_Security_Data_E_All_Data_(Normalized).zip"
        )

    def fetch_data(self) -> pd.DataFrame:
        """Download the FAO bulk zip and return its first CSV, ASEAN rows only.

        Raises:
            requests.HTTPError: If the download returns an error status.
            IndexError: If the archive contains no ``.csv`` member.
        """
        print(" Downloading FAO Food Security dataset...")
        response = requests.get(self.download_url, timeout=120)
        response.raise_for_status()

        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            csv_names = [f for f in z.namelist() if f.endswith('.csv')]
            if not csv_names:
                raise IndexError(
                    f"No CSV file found in FAO archive: {self.download_url}"
                )
            # Close the member explicitly instead of leaving it to the GC.
            with z.open(csv_names[0]) as csv_file:
                df = pd.read_csv(csv_file, encoding='latin-1')

        if 'Area' in df.columns:
            df = df[df['Area'].isin(self.asean_countries)].copy()
        print(f" Raw rows after ASEAN filter: {len(df):,}")
        return df

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Keep 'Value' elements, rename to the raw schema, and sort.

        The output holds whichever of ``country``, ``year``, ``indicator``,
        ``value`` and ``unit`` are available after renaming.
        """
        if 'Element' in df.columns:
            df = df[df['Element'] == 'Value'].copy()

        column_mapping = {
            'Area': 'country',
            'Year': 'year',
            'Item': 'indicator',
            'Value': 'value',
            'Unit': 'unit',
        }
        df = df.rename(columns={k: v for k, v in column_mapping.items()
                                if k in df.columns})

        keep_cols = [c for c in ['country', 'year', 'indicator', 'value', 'unit']
                     if c in df.columns]
        df = df[keep_cols].copy()

        if all(col in df.columns for col in ['indicator', 'country', 'year']):
            df = df.sort_values(['indicator', 'country', 'year']).reset_index(drop=True)
        return df


# WORLD BANK DATA SOURCE → RAW LAYER (Bronze)

class WorldBankDataSource(DataSource):
    """World Bank data source: pulls the WB series that best match FAO indicators."""

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_worldbank'
        self.fao_indicators = fao_indicators
        self.asean_iso = CONFIG['asean_iso_codes']
        self.matching_results = []
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False

    def fetch_data(self) -> Dict:
        """Match FAO indicators to World Bank series and download each match.

        Populates ``self.matching_results`` and returns a dict mapping each
        FAO indicator name to the wide DataFrame returned by ``wbgapi``.
        Series that fail to download are logged and skipped.
        """
        wb_indicators = pd.DataFrame(list(wb.series.list()))
        matches = self.matcher.match_indicators(
            self.fao_indicators,
            wb_indicators,
            threshold=CONFIG['matching']['threshold'],
            id_col='id',
            name_col='value',
            deduplicate=True,
        )
        self.matching_results = [{
            'indikator_fao': m['source_indicator'],
            'indikator_wb': m['target_indicator'],
            'kode_wb': m['target_code'],
            'similarity_persen': m['similarity_score'],
        } for m in matches]

        wb_data_dict = {}
        for item in self.matching_results:
            try:
                data = wb.data.DataFrame(item['kode_wb'], self.asean_iso,
                                         numericTimeKeys=True)
            except Exception as exc:
                # Best effort: one unavailable series must not abort the run,
                # but the failure should be visible rather than swallowed.
                self.logger.warning(
                    "World Bank download failed for %s (%s): %s",
                    item['kode_wb'], item['indikator_fao'], exc,
                )
                continue
            wb_data_dict[item['indikator_fao']] = data
        return wb_data_dict

    def transform_data(self, wb_data_dict: Dict) -> pd.DataFrame:
        """Reshape the per-indicator wide frames into one long table.

        Returns a frame with columns ``indicator_wb_original``,
        ``indicator_fao``, ``wb_code``, ``country``, ``year``, ``value``, or an
        empty DataFrame when nothing was downloaded.
        """
        all_data = []
        for fao_indicator, df_wide in wb_data_dict.items():
            info = next(i for i in self.matching_results
                        if i['indikator_fao'] == fao_indicator)
            temp = df_wide.reset_index()
            temp.insert(0, 'indicator_wb_original', info['indikator_wb'])
            temp.insert(1, 'indicator_fao', fao_indicator)
            temp.insert(2, 'wb_code', info['kode_wb'])
            all_data.append(temp)

        if not all_data:
            return pd.DataFrame()

        df_combined = pd.concat(all_data, ignore_index=True)
        id_vars = ['indicator_wb_original', 'indicator_fao', 'wb_code', 'economy']
        # Every non-identifier column is a year column.
        value_vars = [c for c in df_combined.columns if c not in id_vars]

        df_long = df_combined.melt(
            id_vars=id_vars,
            value_vars=value_vars,
            var_name='year',
            value_name='value',
        ).rename(columns={'economy': 'country'})
        df_long['year'] = df_long['year'].astype(int)
        df_long = df_long[['indicator_wb_original', 'indicator_fao', 'wb_code',
                           'country', 'year', 'value']]
        return df_long.sort_values(
            ['indicator_fao', 'country', 'year']
        ).reset_index(drop=True)


# UNICEF DATA SOURCE → RAW LAYER (Bronze)

class UNICEFDataSource(DataSource):
    """UNICEF SDMX data source: keeps ASEAN rows whose indicator matches FAO."""

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_unicef'
        self.fao_indicators = fao_indicators
        self.base_url = "https://sdmx.data.unicef.org/ws/public/sdmxapi/rest"
        self.datasets = CONFIG['unicef_datasets']
        self.asean_keywords = ['Indonesia', 'Malaysia', 'Thailand', 'Vietnam',
                               'Viet Nam', 'Philippines', 'Singapore', 'Myanmar',
                               'Cambodia', 'Lao', 'Brunei']
        self.matching_results = []
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False

    def fetch_data(self) -> pd.DataFrame:
        """Download every configured dataset and flatten its SDMX-JSON series.

        Series dimensions are stored by their value ``name``, observation
        dimensions by their value ``id``, and the first element of each
        observation as ``value``. Datasets that fail to download or parse are
        logged and skipped.
        """
        all_data = []
        for dataset_code, dataset_name in self.datasets.items():
            try:
                url = f"{self.base_url}/data/{dataset_code}/all/?format=sdmx-json"
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                data_json = response.json()

                series_data = data_json['data']['dataSets'][0]['series']
                dimensions = data_json['data']['structure']['dimensions']

                data_list = []
                for series_key, series_value in series_data.items():
                    # Keys are colon-separated indices into each dimension's values.
                    indices = series_key.split(':')
                    row_data = {'dataset': dataset_code}
                    for i, dim in enumerate(dimensions['series']):
                        row_data[dim['id']] = dim['values'][int(indices[i])]['name']

                    observations = series_value.get('observations', {})
                    for obs_key, obs_value in observations.items():
                        obs_indices = obs_key.split(':')
                        obs_row = row_data.copy()
                        for i, dim in enumerate(dimensions['observation']):
                            obs_row[dim['id']] = dim['values'][int(obs_indices[i])]['id']
                        obs_row['value'] = obs_value[0]
                        data_list.append(obs_row)

                df_temp = pd.DataFrame(data_list)
                # Frames without REF_AREA cannot be restricted to ASEAN and
                # would lack the country column used downstream, so skip them.
                if 'REF_AREA' in df_temp.columns:
                    asean_found = [
                        c for c in df_temp['REF_AREA'].unique()
                        if any(k.lower() in c.lower() for k in self.asean_keywords)
                    ]
                    df_temp = df_temp[df_temp['REF_AREA'].isin(asean_found)]
                    if len(df_temp) > 0:
                        all_data.append(df_temp)
                # Pause between successful requests.
                time.sleep(0.5)
            except Exception as exc:
                # Best effort: skip the dataset but make the failure visible.
                self.logger.warning(
                    "UNICEF dataset %s (%s) skipped: %s: %s",
                    dataset_code, dataset_name, type(exc).__name__, exc,
                )

        return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Keep rows whose indicator matches an FAO indicator and rename columns.

        Populates ``self.matching_results``. Returns an empty DataFrame when
        there is no indicator column or no match.
        """
        if df.empty:
            return df

        # First column whose name mentions "indicator" (case-insensitive).
        indicator_col = next(
            (col for col in df.columns if 'indicator' in col.lower()), None
        )
        if not indicator_col:
            return pd.DataFrame()

        unicef_indicators = df[indicator_col].unique()
        matches = self.matcher.match_indicators(
            self.fao_indicators,
            list(unicef_indicators),
            threshold=CONFIG['matching']['threshold'],
            deduplicate=True,
        )

        self.matching_results = []
        for match in matches:
            matched_rows = df[df[indicator_col] == match['target_indicator']]
            if len(matched_rows) > 0:
                self.matching_results.append({
                    'indikator_fao': match['source_indicator'],
                    'indikator_unicef': match['target_indicator'],
                    'unicef_dataset': matched_rows['dataset'].iloc[0],
                    'similarity_persen': match['similarity_score'],
                })

        if not self.matching_results:
            return pd.DataFrame()

        unicef_matched = [i['indikator_unicef'] for i in self.matching_results]
        df_filtered = df[df[indicator_col].isin(unicef_matched)].copy()
        df_filtered = df_filtered.rename(columns={
            indicator_col: 'indicator_unicef_original',
            'REF_AREA': 'country',
            'TIME_PERIOD': 'year',
            'value': 'value',
        })

        unicef_to_fao = {i['indikator_unicef']: i['indikator_fao']
                         for i in self.matching_results}
        df_filtered['indicator_fao'] = (
            df_filtered['indicator_unicef_original'].map(unicef_to_fao)
        )
        return df_filtered


# STAGING DATA INTEGRATION → STAGING LAYER (Silver)

class StagingDataIntegration:
    """
    Staging Data Integration (BigQuery version)

    Input  : RAW layer (Bronze)     — raw_fao, raw_worldbank, raw_unicef
    Output : STAGING layer (Silver) — staging_integrated
    Audit  : etl_logs, etl_metadata (Audit → fs_asean_audit)
    """

    def __init__(self, client: bigquery.Client):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False
        self.staging_table = 'staging_integrated'
        self.metadata = {
            'source_class': self.__class__.__name__,
            'table_name': self.staging_table,
            'start_time': None,
            'end_time': None,
            'duration_seconds': None,
            'rows_fetched': 0,
            'rows_transformed': 0,
            'rows_loaded': 0,
            'validation_metrics': {},
        }

    def load_raw_data(self) -> Dict[str, pd.DataFrame]:
        """Read the three raw tables from the bronze layer.

        Returns a dict with keys ``fao``, ``worldbank`` and ``unicef``. A table
        that cannot be read is logged and replaced by an empty DataFrame.
        """
        raw_tables = (
            ('fao', 'raw_fao'),
            ('worldbank', 'raw_worldbank'),
            ('unicef', 'raw_unicef'),
        )
        raw_data = {}
        for key, table_name in raw_tables:
            try:
                raw_data[key] = read_from_bigquery(self.client, table_name,
                                                   layer='bronze')
            except Exception as exc:
                # Best effort: integrate whatever sources are available, but
                # leave a trace of the one that was not.
                self.logger.warning(
                    "Could not read %s from the bronze layer; treating it as "
                    "empty: %s: %s", table_name, type(exc).__name__, exc,
                )
                raw_data[key] = pd.DataFrame()
        return raw_data

    def clean_value(self, value):
        """Convert *value* to ``float``, ignoring ``<`` / ``>`` markers.

        Returns ``None`` for missing or non-numeric input.
        """
        if pd.isna(value):
            return None
        value_str = str(value).strip().replace('<', '').replace('>', '').strip()
        try:
            return float(value_str)
        except (TypeError, ValueError):
            return None

    def process_year_range(self, year_value):
        """Parse a year or a ``start-end`` range.

        Returns:
            ``(year, label)``: for a two-part range, the integer midpoint and
            the normalized range text; for a single year, the year and its
            string form; ``(None, label)`` when parsing fails; ``(None, None)``
            for a missing value. En and em dashes are treated as hyphens.
        """
        if pd.isna(year_value):
            return None, None

        year_str = str(year_value).strip().replace('–', '-').replace('—', '-')

        if '-' in year_str:
            try:
                parts = year_str.split('-')
                if len(parts) == 2:
                    start_year = int(parts[0].strip())
                    end_year = int(parts[1].strip())
                    return (start_year + end_year) // 2, year_str
                return int(float(year_str)), year_str
            except (ValueError, OverflowError):
                return None, year_str

        try:
            single_year = int(float(year_str))
        except (ValueError, OverflowError):
            # OverflowError covers int(float('inf')).
            return None, year_str
        return single_year, str(single_year)

    def truncate_string(self, value, max_length: int) -> str:
        """Return *value* as a stripped string cut to *max_length* characters.

        Missing values become the empty string.
        """
        if pd.isna(value):
            return ''
        s = str(value).strip()
        return s[:max_length] if len(s) > max_length else s

    def standardize_dataframe(self, df: pd.DataFrame, source: str,
                              indicator_orig_col: str, indicator_std_col: str,
                              country_col: str, year_col: str, value_col: str,
                              unit_col: str = None) -> pd.DataFrame:
        """Project one raw table onto the staging schema.

        Rows missing the original indicator, country or year are dropped.
        String columns are truncated to the staging lengths (source 20,
        indicators 255, country 100, year_range 20, unit 20).
        """
        if df.empty:
            return pd.DataFrame()

        df_clean = df.copy().dropna(subset=[indicator_orig_col, country_col, year_col])
        year_data = df_clean[year_col].apply(self.process_year_range)

        has_units = bool(unit_col) and unit_col in df_clean.columns
        units = df_clean[unit_col].fillna('') if has_units else ''
        unit_values = units if isinstance(units, pd.Series) else [units] * len(df_clean)

        return pd.DataFrame({
            'source': [self.truncate_string(source, 20)] * len(df_clean),
            'indicator_original': df_clean[indicator_orig_col].apply(
                lambda x: self.truncate_string(x, 255)),
            'indicator_standardized': df_clean[indicator_std_col].apply(
                lambda x: self.truncate_string(x, 255)),
            'country': df_clean[country_col].apply(
                lambda x: self.truncate_string(x, 100)),
            'year': [y[0] for y in year_data],
            'year_range': [self.truncate_string(y[1], 20) for y in year_data],
            'value': df_clean[value_col].apply(self.clean_value),
            'unit': [self.truncate_string(u, 20) for u in unit_values],
        })

    def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Standardize and concatenate the FAO, World Bank and UNICEF tables.

        Returns the integrated frame sorted by source, standardized indicator,
        country and year, or an empty DataFrame when every source is empty.
        """
        integrated_data = []

        # FAO — accepts either the original FAOSTAT headers or the raw-layer names.
        if not raw_data['fao'].empty:
            df = raw_data['fao'].copy()
            if 'Unit' in df.columns:
                fao_unit_col = 'Unit'
            elif 'unit' in df.columns:
                fao_unit_col = 'unit'
            else:
                fao_unit_col = None
            integrated_data.append(self.standardize_dataframe(
                df, 'FAO',
                indicator_orig_col='Item' if 'Item' in df.columns else 'indicator',
                indicator_std_col='Item' if 'Item' in df.columns else 'indicator',
                country_col='Area' if 'Area' in df.columns else 'country',
                year_col='Year' if 'Year' in df.columns else 'year',
                value_col='Value' if 'Value' in df.columns else 'value',
                unit_col=fao_unit_col,
            ))

        # World Bank
        if not raw_data['worldbank'].empty:
            df = raw_data['worldbank'].copy()
            integrated_data.append(self.standardize_dataframe(
                df, 'World Bank',
                indicator_orig_col='indicator_wb_original',
                indicator_std_col='indicator_fao',
                country_col='country',
                year_col='year',
                value_col='value',
                unit_col='unit' if 'unit' in df.columns else None,
            ))

        # UNICEF
        if not raw_data['unicef'].empty:
            df = raw_data['unicef'].copy()
            integrated_data.append(self.standardize_dataframe(
                df, 'UNICEF',
                indicator_orig_col='indicator_unicef_original',
                indicator_std_col='indicator_fao',
                country_col='country',
                year_col='year',
                value_col='value',
                unit_col='unit' if 'unit' in df.columns else None,
            ))

        if not integrated_data:
            return pd.DataFrame()

        df_integrated = pd.concat(integrated_data, ignore_index=True)
        df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
        df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')

        string_limits = [('source', 20), ('country', 100),
                         ('indicator_original', 255),
                         ('indicator_standardized', 255),
                         ('year_range', 20), ('unit', 20)]
        for col, max_len in string_limits:
            # The lambda is consumed inside this iteration, so late binding of
            # max_len is not an issue.
            df_integrated[col] = df_integrated[col].astype(str).apply(
                lambda x: self.truncate_string(x, max_len)
            )

        return df_integrated.sort_values(
            ['source', 'indicator_standardized', 'country', 'year']
        ).reset_index(drop=True)

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """Compute JSON-serializable quality metrics for the integrated frame.

        Always includes row/column counts, duplicate count, completeness
        percentage, memory usage and ``schema_validation`` (max string length
        per text column, 0 when the column is absent). Year, source, indicator
        and country summaries are added when those columns exist.
        """
        validation = {
            'total_rows': int(len(df)),
            'total_columns': int(len(df.columns)),
            'duplicate_count': int(df.duplicated().sum()),
            'completeness_pct': float(
                round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
            'memory_usage_mb': float(
                round(df.memory_usage(deep=True).sum() / 1024**2, 2)),
        }

        if 'year' in df.columns:
            has_years = not df['year'].isnull().all()
            validation['year_range'] = {
                'min': int(df['year'].min()) if has_years else None,
                'max': int(df['year'].max()) if has_years else None,
                'unique_years': int(df['year'].nunique()),
            }

        if 'source' in df.columns:
            validation['source_breakdown'] = {
                str(k): int(v)
                for k, v in df['source'].value_counts().to_dict().items()
            }

        if 'indicator_standardized' in df.columns:
            validation['unique_indicators'] = int(df['indicator_standardized'].nunique())

        if 'country' in df.columns:
            validation['unique_countries'] = int(df['country'].nunique())

        text_columns = ('source', 'indicator_original', 'indicator_standardized',
                        'country', 'year_range', 'unit')
        validation['schema_validation'] = {
            f'{col}_max_length': (int(df[col].str.len().max())
                                  if col in df.columns else 0)
            for col in text_columns
        }
        return validation

    def save_to_staging(self, df: pd.DataFrame):
        """Full-refresh the staging table and record the outcome in the audit log.

        Raises:
            Exception: Re-raises whatever the load raised, after logging the
                failure via ``log_update``.
        """
        try:
            schema = get_staging_schema()
            load_to_bigquery(
                self.client, df, self.staging_table,
                layer='silver',
                write_disposition="WRITE_TRUNCATE",
                schema=schema,
            )
            log_update(self.client, 'STAGING', self.staging_table,
                       'full_refresh', len(df))
        except Exception as e:
            print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
            log_update(self.client, 'STAGING', self.staging_table,
                       'full_refresh', 0, status='failed', error_msg=str(e))
            raise

    def run(self) -> pd.DataFrame:
        """Run the staging integration end to end and return the loaded frame.

        Loads the raw tables, standardizes them, validates, writes to staging,
        saves ETL metadata and prints a summary. Returns early with an empty
        DataFrame when there is nothing to integrate.
        """
        self.metadata['start_time'] = datetime.now()
        try:
            print("Integrating data from all sources...")
            raw_data = self.load_raw_data()
            total_fetched = sum(len(df) for df in raw_data.values())
            self.metadata['rows_fetched'] = total_fetched
            print(f" Total rows fetched: {total_fetched:,}")

            df_integrated = self.standardize_schema(raw_data)
            if df_integrated.empty:
                print("No data to integrate")
                return df_integrated

            self.metadata['rows_transformed'] = len(df_integrated)
            validation = self.validate_data(df_integrated)
            self.metadata['validation_metrics'] = validation

            self.save_to_staging(df_integrated)
            self.metadata['rows_loaded'] = len(df_integrated)

            self.metadata['end_time'] = datetime.now()
            self.metadata['duration_seconds'] = (
                self.metadata['end_time'] - self.metadata['start_time']
            ).total_seconds()
            self.metadata['execution_timestamp'] = self.metadata['start_time']
            self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
            self.metadata['config_snapshot'] = json.dumps(CONFIG['matching'])
            # Stored as a JSON string for the audit table.
            self.metadata['validation_metrics'] = json.dumps(validation)
            save_etl_metadata(self.client, self.metadata)

            print(f" ✓ Staging Integration completed: {len(df_integrated):,} rows")
            print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
            if 'source_breakdown' in validation:
                for src, cnt in validation['source_breakdown'].items():
                    print(f" - {src}: {cnt:,} rows")
            print(f" Indicators : {validation.get('unique_indicators', '-')}")
            print(f" Countries : {validation.get('unique_countries', '-')}")
            if 'year_range' in validation:
                yr = validation['year_range']
                if yr['min'] and yr['max']:
                    print(f" Year range : {yr['min']}–{yr['max']}")
            print(f" Completeness: {validation['completeness_pct']:.2f}%")

            schema_val = validation['schema_validation']
            print("\n Schema Validation:")
            print(f" - source max length : {schema_val['source_max_length']}/20")
            print(f" - indicator_original max length : {schema_val['indicator_original_max_length']}/255")
            print(f" - indicator_std max length : {schema_val['indicator_standardized_max_length']}/255")
            print(f" - country max length : {schema_val['country_max_length']}/100")
            print(f" - year_range max length : {schema_val['year_range_max_length']}/20")
            print(f" - unit max length : {schema_val['unit_max_length']}/20")
            print("\n Metadata → [AUDIT] etl_metadata")
            return df_integrated
        except Exception as e:
            self.logger.error(f"Staging integration failed: {str(e)}")
            raise


# AIRFLOW TASK FUNCTIONS

def run_verify_connection():
    """Airflow task: fail the task when the BigQuery setup check fails."""
    from scripts.bigquery_config import verify_setup
    result = verify_setup()
    if not result:
        raise RuntimeError("BigQuery connection failed!")
    print("BigQuery connection OK")


def run_load_fao():
    """Airflow task: run the FAO source."""
    from scripts.bigquery_config import get_bigquery_client
    client = get_bigquery_client()
    source = FAODataSource(client)
    df = source.run()
    print(f"FAO loaded: {len(df):,} rows")


def run_load_worldbank():
    """Airflow task: run the World Bank source.

    The FAO source is run first because its indicator names drive the matching.
    """
    from scripts.bigquery_config import get_bigquery_client
    client = get_bigquery_client()
    fao_source = FAODataSource(client)
    df_fao = fao_source.run()
    fao_indicators = df_fao['indicator'].unique().tolist()
    wb_source = WorldBankDataSource(client, fao_indicators)
    df = wb_source.run()
    print(f"World Bank loaded: {len(df):,} rows")


def run_load_unicef():
    """Airflow task: run the UNICEF source.

    The FAO source is run first because its indicator names drive the matching.
    """
    from scripts.bigquery_config import get_bigquery_client
    client = get_bigquery_client()
    fao_source = FAODataSource(client)
    df_fao = fao_source.run()
    fao_indicators = df_fao['indicator'].unique().tolist()
    unicef_source = UNICEFDataSource(client, fao_indicators)
    df = unicef_source.run()
    print(f"UNICEF loaded: {len(df):,} rows")


def run_staging_integration():
    """Airflow task: run the staging integration."""
    from scripts.bigquery_config import get_bigquery_client
    client = get_bigquery_client()
    staging = StagingDataIntegration(client)
    df = staging.run()
    print(f"Staging integrated: {len(df):,} rows")


# MAIN EXECUTION

def main():
    """Run the four ETL steps in order and print a summary for each."""
    print("=" * 60)
    print("BIGQUERY RAW LAYER ETL")
    print("Kimball DW Architecture")
    print("=" * 60)

    setup_logging()
    client = get_bigquery_client()

    print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
    fao_source = FAODataSource(client)
    df_fao = fao_source.run()
    print(f" ✓ raw_fao: {len(df_fao):,} rows")
    print(f" Indicators : {df_fao['indicator'].nunique()}")
    print(f" Countries : {df_fao['country'].nunique()}")
    print(f" Year range : {df_fao['year'].min()}–{df_fao['year'].max()}")
    fao_indicators = df_fao['indicator'].unique()

    print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
    wb_source = WorldBankDataSource(client, list(fao_indicators))
    df_wb = wb_source.run()
    print(f" ✓ raw_worldbank: {len(df_wb):,} rows")
    # transform_data returns a column-less frame when nothing matched, so the
    # column summaries are only printed for a non-empty result.
    if len(df_wb) > 0:
        print(f" Matched indicators : {df_wb['indicator_fao'].nunique()}")
        print(f" Countries : {df_wb['country'].nunique()}")
        print(f" Year range : {df_wb['year'].min()}–{df_wb['year'].max()}")

    print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
    unicef_source = UNICEFDataSource(client, list(fao_indicators))
    df_unicef = unicef_source.run()
    print(f" ✓ raw_unicef: {len(df_unicef):,} rows")
    if len(df_unicef) > 0:
        print(f" Matched indicators : {df_unicef['indicator_fao'].nunique()}")
        print(f" Countries : {df_unicef['country'].nunique()}")

    print("\n[4/4] Staging Integration → STAGING (Silver)...")
    staging = StagingDataIntegration(client)
    staging.run()

    print("\n" + "=" * 60)
    print("✓ ETL COMPLETED")
    print("RAW (Bronze) : raw_fao, raw_worldbank, raw_unicef")
    print("STAGING (Silver) : staging_integrated")
    print("AUDIT : etl_logs, etl_metadata")
    print("=" * 60)


if __name__ == "__main__":
    main()