Files
airflow-coolify/scripts/bigquery_raw_layer.py
2026-03-12 14:57:30 +07:00

797 lines
34 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
BIGQUERY RAW LAYER ETL
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
FAODataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_fao
WorldBankDataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_worldbank
UNICEFDataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_unicef
StagingDataIntegration:
- READ dari RAW layer (Bronze) : raw_fao, raw_worldbank, raw_unicef
- LOAD ke STAGING layer (Silver) : staging_integrated
- LOG ke AUDIT layer (Audit) : etl_logs, etl_metadata
Classes:
IndicatorMatcher — Fuzzy matching indikator antar sumber data
FAODataSource — ETL data FAO Food Security
WorldBankDataSource — ETL data World Bank
UNICEFDataSource — ETL data UNICEF
StagingDataIntegration — Integrasi & standardisasi ke Staging layer
Usage:
python bigquery_raw_layer.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import List, Dict, Optional, Union
import json
from functools import lru_cache
from pathlib import Path
from difflib import SequenceMatcher
import re
from scripts.bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
save_etl_metadata,
get_staging_schema
)
from scripts.bigquery_datasource import DataSource
from google.cloud import bigquery
# INDICATOR MATCHER
class IndicatorMatcher:
    """Fuzzy matcher that aligns indicator names across FAO / World Bank / UNICEF.

    The similarity score blends three signals — shared domain keywords, raw
    string similarity, and word overlap — then applies multiplicative penalties
    whenever qualifiers (severity, target population, service level, ...)
    disagree between the two names. Thresholds/weights come from CONFIG.
    """

    # Canonical domain keywords: a hit on any variant maps the text to the key.
    CORE_KEYWORDS = {
        'fat'          : ['fat', 'lipid'],
        'protein'      : ['protein'],
        'calorie'      : ['calorie', 'caloric', 'kcal', 'energy intake'],
        'energy'       : ['energy', 'dietary energy consumption'],
        'stunting'     : ['stunting', 'stunted', 'height for age'],
        'wasting'      : ['wasting', 'wasted', 'weight for height'],
        'underweight'  : ['underweight', 'weight for age'],
        'overweight'   : ['overweight', 'overfed'],
        'obesity'      : ['obesity', 'obese'],
        'anemia'       : ['anemia', 'anaemia', 'hemoglobin', 'haemoglobin'],
        'malnutrition' : ['malnutrition', 'undernourishment', 'malnourished', 'undernourished'],
        'breastfeeding': ['breastfeeding', 'breast feeding'],
        'birthweight'  : ['birthweight', 'birth weight', 'low birth weight'],
        'immunization' : ['immunization', 'immunisation', 'vaccination', 'vaccine'],
        'gdp'          : ['gdp', 'gross domestic product'],
        'poverty'      : ['poverty', 'poor', 'poverty line'],
        'inequality'   : ['inequality', 'gini'],
        'water'        : ['water', 'drinking water', 'clean water', 'safe water'],
        'sanitation'   : ['sanitation', 'toilet', 'improved sanitation'],
        'electricity'  : ['electricity', 'electric', 'power'],
        'healthcare'   : ['healthcare', 'health facility', 'hospital'],
        'governance'   : ['governance', 'government effectiveness'],
        'corruption'   : ['corruption', 'transparency'],
        'stability'    : ['political stability', 'stability', 'conflict']
    }

    # Qualifier vocabulary; mismatches between two names trigger penalties.
    QUALIFIERS = {
        'at_least'      : ['at least', 'minimum', 'or more', 'or better'],
        'basic'         : ['basic'],
        'improved'      : ['improved'],
        'safely_managed': ['safely managed', 'safe'],
        'exclusive'     : ['exclusive', 'exclusively'],
        'severe'        : ['severe', 'severely'],
        'moderate'      : ['moderate'],
        'mild'          : ['mild'],
        'children'      : ['children', 'child', 'under 5', 'under five', 'u5'],
        'women'         : ['women', 'female', 'reproductive age'],
        'adults'        : ['adults', 'adult'],
        'population'    : ['population', 'people', 'persons'],
        'household'     : ['household', 'households']
    }

    def __init__(self):
        # Matching parameters come from the shared project configuration.
        self.threshold = CONFIG['matching']['threshold']
        self.weights = CONFIG['matching']['weights']
        self.penalties = CONFIG['matching']['penalties']
        self.logger = logging.getLogger(self.__class__.__name__)
        # Per-instance memo for calculate_similarity. Previously this method was
        # decorated with @lru_cache, which keys on `self` and keeps every
        # instance alive for the cache's lifetime (ruff B019).
        self._similarity_cache = {}

    @staticmethod
    @lru_cache(maxsize=1024)
    def clean_text(text: str) -> str:
        """Lowercase, keep only word chars/whitespace/parens, collapse spaces."""
        if pd.isna(text):
            return ""
        text = str(text).lower()
        text = re.sub(r'[^\w\s\(\)]', ' ', text)
        return ' '.join(text.split())

    @classmethod
    @lru_cache(maxsize=512)
    def extract_keywords(cls, text: str) -> tuple:
        """Return the canonical CORE_KEYWORDS keys detected in *text*."""
        text_clean = cls.clean_text(text)
        return tuple(key for key, variants in cls.CORE_KEYWORDS.items()
                     if any(v in text_clean for v in variants))

    @classmethod
    @lru_cache(maxsize=512)
    def detect_qualifiers(cls, text: str) -> frozenset:
        """Return the QUALIFIERS keys detected in *text*."""
        text_clean = cls.clean_text(text)
        return frozenset(q for q, variants in cls.QUALIFIERS.items()
                         if any(v in text_clean for v in variants))

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Score the similarity of two indicator names in [0, 1]."""
        # setdefault keeps this working even when __init__ was bypassed.
        cache = self.__dict__.setdefault('_similarity_cache', {})
        key = (text1, text2)
        if key not in cache:
            cache[key] = self._similarity_uncached(text1, text2)
        return cache[key]

    def _similarity_uncached(self, text1: str, text2: str) -> float:
        """Uncached similarity computation (see calculate_similarity)."""
        if text1 == text2:
            return 1.0
        clean1 = self.clean_text(text1)
        clean2 = self.clean_text(text2)
        keywords1 = set(self.extract_keywords(text1))
        keywords2 = set(self.extract_keywords(text2))
        # Hard veto: both sides have domain keywords but share none.
        if keywords1 and keywords2 and not (keywords1 & keywords2):
            return 0.0
        core_score = len(keywords1 & keywords2) / max(len(keywords1), len(keywords2), 1)
        base_score = SequenceMatcher(None, clean1, clean2).ratio()
        words1, words2 = set(clean1.split()), set(clean2.split())
        overlap = len(words1 & words2) / max(len(words1), len(words2), 1)
        w = self.weights
        final_score = (core_score * w['keyword'] +
                       base_score * w['string_similarity'] +
                       overlap * w['word_overlap'])
        # Penalize qualifier disagreements (only when one side has the qualifier
        # group and the other differs).
        quals1 = self.detect_qualifiers(text1)
        quals2 = self.detect_qualifiers(text2)
        p = self.penalties
        if ('at_least' in quals1) != ('at_least' in quals2):
            final_score *= p['qualifier_mismatch']
        if ('exclusive' in quals1) != ('exclusive' in quals2):
            final_score *= p['qualifier_mismatch']
        sev1 = {'severe', 'moderate', 'mild'} & quals1
        sev2 = {'severe', 'moderate', 'mild'} & quals2
        if sev1 != sev2 and (sev1 or sev2):
            final_score *= p['severity_mismatch']
        tgt1 = {'children', 'women', 'adults'} & quals1
        tgt2 = {'children', 'women', 'adults'} & quals2
        if tgt1 != tgt2 and (tgt1 or tgt2):
            final_score *= p['target_mismatch']
        lvl1 = {'basic', 'improved', 'safely_managed'} & quals1
        lvl2 = {'basic', 'improved', 'safely_managed'} & quals2
        if lvl1 != lvl2 and (lvl1 or lvl2):
            final_score *= p['service_level_mismatch']
        return final_score

    def match_indicators(self, source_indicators, target_indicators,
                         threshold=None, id_col='id', name_col='value', deduplicate=True):
        """Match each source indicator to its best target above *threshold*.

        Returns a list of dicts with source/target names, target code (when the
        targets are a DataFrame) and the similarity as a percentage.
        """
        if threshold is None:
            threshold = self.threshold
        all_matches = []
        for source in sorted(source_indicators):
            best = self._find_best_match(source, target_indicators, threshold, id_col, name_col)
            if best:
                all_matches.append({
                    'source_indicator': source,
                    'target_indicator': best['name'],
                    'target_code'     : best['code'],
                    'similarity_score': round(best['similarity'] * 100, 1)
                })
        if deduplicate and all_matches:
            all_matches = self._deduplicate_matches(all_matches)
        return all_matches

    def _find_best_match(self, source, targets, threshold, id_col, name_col):
        """Best-scoring target strictly above *threshold*, or None."""
        best = None
        best_score = threshold
        if isinstance(targets, pd.DataFrame):
            for _, row in targets.iterrows():
                score = self.calculate_similarity(source, row[name_col])
                if score > best_score:
                    best_score = score
                    best = {'code': row[id_col], 'name': row[name_col]}
        else:
            for target in targets:
                score = self.calculate_similarity(source, target)
                if score > best_score:
                    best_score = score
                    best = {'code': None, 'name': target}
        return None if best is None else {**best, 'similarity': best_score}

    def _deduplicate_matches(self, matches):
        """Keep only the highest-scoring match per target."""
        df = pd.DataFrame(matches).sort_values('similarity_score', ascending=False)
        dup_col = 'target_code' if df['target_code'].notna().any() else 'target_indicator'
        return df.drop_duplicates(subset=dup_col, keep='first').to_dict('records')
# FAO DATA SOURCE → RAW LAYER (Bronze)
class FAODataSource(DataSource):
    """FAO Food Security ETL → raw_fao (Bronze).

    Uses the FAOSTAT bulk-download ZIP because the faostat API requires
    authentication.
    """

    def __init__(self, client: bigquery.Client = None):
        super().__init__(client)
        self.table_name = 'raw_fao'
        self.domain_code = 'FS'
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False
        self.download_url = (
            "https://bulks-faostat.fao.org/production/"
            "Food_Security_Data_E_All_Data_(Normalized).zip"
        )

    def fetch_data(self) -> pd.DataFrame:
        """Download and unzip the FAO bulk dataset, keeping only ASEAN rows."""
        import requests
        import zipfile
        import io
        print(" Downloading FAO Food Security dataset...")
        response = requests.get(self.download_url, timeout=120)
        response.raise_for_status()
        payload = io.BytesIO(response.content)
        with zipfile.ZipFile(payload) as archive:
            csv_members = [name for name in archive.namelist() if name.endswith('.csv')]
            df = pd.read_csv(archive.open(csv_members[0]), encoding='latin-1')
        if 'Area' in df.columns:
            df = df[df['Area'].isin(self.asean_countries)].copy()
            print(f" Raw rows after ASEAN filter: {len(df):,}")
        return df

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Map FAO columns onto the canonical schema and sort the result."""
        if 'Element' in df.columns:
            df = df[df['Element'] == 'Value'].copy()
        renames = {
            'Area' : 'country',
            'Year' : 'year',
            'Item' : 'indicator',
            'Value': 'value',
            'Unit' : 'unit'
        }
        present = {old: new for old, new in renames.items() if old in df.columns}
        df = df.rename(columns=present)
        wanted = ['country', 'year', 'indicator', 'value', 'unit']
        df = df[[c for c in wanted if c in df.columns]].copy()
        sort_keys = ['indicator', 'country', 'year']
        if all(k in df.columns for k in sort_keys):
            df = df.sort_values(sort_keys).reset_index(drop=True)
        return df
# WORLD BANK DATA SOURCE → RAW LAYER (Bronze)
import wbgapi as wb
class WorldBankDataSource(DataSource):
    """World Bank ETL → raw_worldbank (Bronze).

    Fuzzy-matches the FAO indicator list against WB series names, downloads the
    matched series for the ASEAN ISO codes, and reshapes them to long format
    (indicator, country, year, value).
    """

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_worldbank'
        self.fao_indicators = fao_indicators
        self.asean_iso = CONFIG['asean_iso_codes']
        self.matching_results = []  # populated by fetch_data()
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False

    def fetch_data(self) -> Dict:
        """Match indicators and download each matched WB series.

        Returns a dict mapping FAO indicator name → wide WB DataFrame.
        """
        wb_indicators = pd.DataFrame(list(wb.series.list()))
        matches = self.matcher.match_indicators(
            self.fao_indicators, wb_indicators,
            threshold=CONFIG['matching']['threshold'],
            id_col='id', name_col='value', deduplicate=True
        )
        self.matching_results = [{
            'indikator_fao'    : m['source_indicator'],
            'indikator_wb'     : m['target_indicator'],
            'kode_wb'          : m['target_code'],
            'similarity_persen': m['similarity_score']
        } for m in matches]
        wb_data_dict = {}
        for item in self.matching_results:
            try:
                data = wb.data.DataFrame(item['kode_wb'], self.asean_iso, numericTimeKeys=True)
                wb_data_dict[item['indikator_fao']] = data
            except Exception as exc:
                # Best-effort: a series that fails to download is skipped, but
                # log it instead of silently swallowing the error.
                self.logger.warning("World Bank series %s failed: %s",
                                    item['kode_wb'], exc)
        return wb_data_dict

    def transform_data(self, wb_data_dict: Dict) -> pd.DataFrame:
        """Melt the wide per-indicator frames into one long DataFrame."""
        all_data = []
        for fao_indicator, df_wide in wb_data_dict.items():
            # fetch_data only stores keys that exist in matching_results,
            # so this lookup always succeeds.
            info = next(i for i in self.matching_results if i['indikator_fao'] == fao_indicator)
            temp = df_wide.reset_index()
            temp.insert(0, 'indicator_wb_original', info['indikator_wb'])
            temp.insert(1, 'indicator_fao', fao_indicator)
            temp.insert(2, 'wb_code', info['kode_wb'])
            all_data.append(temp)
        if not all_data:
            return pd.DataFrame()
        df_combined = pd.concat(all_data, ignore_index=True)
        id_vars = ['indicator_wb_original', 'indicator_fao', 'wb_code', 'economy']
        value_vars = [c for c in df_combined.columns if c not in id_vars]
        df_long = df_combined.melt(
            id_vars=id_vars, value_vars=value_vars,
            var_name='year', value_name='value'
        ).rename(columns={'economy': 'country'})
        df_long['year'] = df_long['year'].astype(int)
        df_long = df_long[['indicator_wb_original', 'indicator_fao', 'wb_code',
                           'country', 'year', 'value']]
        return df_long.sort_values(['indicator_fao', 'country', 'year']).reset_index(drop=True)
# UNICEF DATA SOURCE → RAW LAYER (Bronze)
import requests
import time
class UNICEFDataSource(DataSource):
    """UNICEF SDMX ETL → raw_unicef (Bronze).

    Pulls the configured UNICEF SDMX datasets, flattens the SDMX-JSON series
    into rows, keeps ASEAN countries, and maps matched indicators back to their
    FAO names via fuzzy matching.
    """

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_unicef'
        self.fao_indicators = fao_indicators
        self.base_url = "https://sdmx.data.unicef.org/ws/public/sdmxapi/rest"
        self.datasets = CONFIG['unicef_datasets']
        # REF_AREA values are free-text country names; filter by substring.
        self.asean_keywords = ['Indonesia', 'Malaysia', 'Thailand', 'Vietnam', 'Viet Nam',
                               'Philippines', 'Singapore', 'Myanmar', 'Cambodia', 'Lao', 'Brunei']
        self.matching_results = []  # populated by transform_data()
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False

    def fetch_data(self) -> pd.DataFrame:
        """Fetch every configured dataset; failed datasets are logged and skipped."""
        all_data = []
        for dataset_code, dataset_name in self.datasets.items():
            try:
                url = f"{self.base_url}/data/{dataset_code}/all/?format=sdmx-json"
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                data_json = response.json()
                series_data = data_json['data']['dataSets'][0]['series']
                dimensions = data_json['data']['structure']['dimensions']
                data_list = []
                # Decode SDMX "a:b:c" series keys into named dimension values.
                for series_key, series_value in series_data.items():
                    indices = series_key.split(':')
                    row_data = {'dataset': dataset_code}
                    for i, dim in enumerate(dimensions['series']):
                        row_data[dim['id']] = dim['values'][int(indices[i])]['name']
                    for obs_key, obs_value in series_value.get('observations', {}).items():
                        obs_row = row_data.copy()
                        for i, dim in enumerate(dimensions['observation']):
                            obs_row[dim['id']] = dim['values'][int(obs_key.split(':')[i])]['id']
                        obs_row['value'] = obs_value[0]
                        data_list.append(obs_row)
                df_temp = pd.DataFrame(data_list)
                if 'REF_AREA' in df_temp.columns:
                    asean_found = [c for c in df_temp['REF_AREA'].unique()
                                   if any(k.lower() in c.lower() for k in self.asean_keywords)]
                    df_temp = df_temp[df_temp['REF_AREA'].isin(asean_found)]
                if len(df_temp) > 0:
                    all_data.append(df_temp)
                time.sleep(0.5)  # be polite to the public SDMX endpoint
            except Exception as exc:
                # Best-effort: one broken dataset must not abort the whole pull,
                # but log it instead of silently swallowing the error.
                self.logger.warning("UNICEF dataset %s failed: %s", dataset_code, exc)
        return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Keep only rows whose indicator fuzzy-matches a FAO indicator."""
        if df.empty:
            return df
        indicator_col = next((col for col in df.columns if 'indicator' in col.lower()), None)
        if not indicator_col:
            return pd.DataFrame()
        unicef_indicators = df[indicator_col].unique()
        matches = self.matcher.match_indicators(
            self.fao_indicators, list(unicef_indicators),
            threshold=CONFIG['matching']['threshold'], deduplicate=True
        )
        self.matching_results = []
        for match in matches:
            matched_rows = df[df[indicator_col] == match['target_indicator']]
            if len(matched_rows) > 0:
                self.matching_results.append({
                    'indikator_fao'    : match['source_indicator'],
                    'indikator_unicef' : match['target_indicator'],
                    'unicef_dataset'   : matched_rows['dataset'].iloc[0],
                    'similarity_persen': match['similarity_score']
                })
        if not self.matching_results:
            return pd.DataFrame()
        unicef_matched = [i['indikator_unicef'] for i in self.matching_results]
        df_filtered = df[df[indicator_col].isin(unicef_matched)].copy()
        df_filtered = df_filtered.rename(columns={
            indicator_col: 'indicator_unicef_original',
            'REF_AREA'   : 'country',
            'TIME_PERIOD': 'year',
            'value'      : 'value'
        })
        unicef_to_fao = {i['indikator_unicef']: i['indikator_fao'] for i in self.matching_results}
        df_filtered['indicator_fao'] = df_filtered['indicator_unicef_original'].map(unicef_to_fao)
        return df_filtered
# STAGING DATA INTEGRATION → STAGING LAYER (Silver)
class StagingDataIntegration:
    """
    Staging Data Integration (BigQuery version)
    Input : RAW layer (Bronze) — raw_fao, raw_worldbank, raw_unicef
    Output : STAGING layer (Silver) — staging_integrated
    Audit : etl_logs, etl_metadata (Audit → fs_asean_audit)
    """

    def __init__(self, client: "bigquery.Client"):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False
        self.staging_table = 'staging_integrated'
        # Run metadata, persisted to the audit layer at the end of run().
        self.metadata = {
            'source_class'      : self.__class__.__name__,
            'table_name'        : self.staging_table,
            'start_time'        : None,
            'end_time'          : None,
            'duration_seconds'  : None,
            'rows_fetched'      : 0,
            'rows_transformed'  : 0,
            'rows_loaded'       : 0,
            'validation_metrics': {}
        }

    def load_raw_data(self) -> Dict[str, pd.DataFrame]:
        """Read the three Bronze tables; an unreadable table becomes an empty frame."""
        raw_data = {}
        for key, table in (('fao', 'raw_fao'),
                           ('worldbank', 'raw_worldbank'),
                           ('unicef', 'raw_unicef')):
            try:
                raw_data[key] = read_from_bigquery(self.client, table, layer='bronze')
            except Exception as exc:
                # Best-effort: a missing source must not abort the integration,
                # but leave a trace instead of failing silently.
                self.logger.warning("Could not read %s: %s", table, exc)
                raw_data[key] = pd.DataFrame()
        return raw_data

    def clean_value(self, value) -> Optional[float]:
        """Coerce a raw value to float ('<'/'>' prefixes stripped); None if not numeric."""
        if pd.isna(value):
            return None
        value_str = str(value).strip().replace('<', '').replace('>', '').strip()
        try:
            return float(value_str)
        except (ValueError, TypeError):
            return None

    def process_year_range(self, year_value):
        """Parse a single year or a 'YYYY-YYYY' range.

        Returns (year, year_range_str): for a range the year is the midpoint;
        for a single year both carry that year. (None, ...) when unparseable.
        """
        if pd.isna(year_value):
            return None, None
        # Normalize en dash / em dash to an ASCII hyphen so ranges split
        # reliably. (The previous code's replace('', '-') was a corrupted
        # version of this: replacing the empty string interleaves '-' between
        # every character and mangles every year value.)
        year_str = (str(year_value).strip()
                    .replace('\u2013', '-')
                    .replace('\u2014', '-'))
        if '-' in year_str:
            try:
                parts = year_str.split('-')
                if len(parts) == 2:
                    start_year = int(parts[0].strip())
                    end_year = int(parts[1].strip())
                    return (start_year + end_year) // 2, year_str
                return int(float(year_str)), year_str
            except (ValueError, TypeError):
                return None, year_str
        try:
            single_year = int(float(year_str))
            return single_year, str(single_year)
        except (ValueError, TypeError):
            return None, year_str

    def truncate_string(self, value, max_length: int) -> str:
        """Stringify, strip, and hard-truncate to *max_length* chars ('' for NA)."""
        if pd.isna(value):
            return ''
        s = str(value).strip()
        return s[:max_length] if len(s) > max_length else s

    def standardize_dataframe(self, df: pd.DataFrame, source: str,
                              indicator_orig_col: str, indicator_std_col: str,
                              country_col: str, year_col: str, value_col: str,
                              unit_col: str = None) -> pd.DataFrame:
        """Project one source's raw frame onto the shared staging schema."""
        if df.empty:
            return pd.DataFrame()
        df_clean = df.copy().dropna(subset=[indicator_orig_col, country_col, year_col])
        year_data = df_clean[year_col].apply(self.process_year_range)
        # `units` is a Series when the source has a unit column, else a scalar ''.
        units = df_clean[unit_col].fillna('') if (unit_col and unit_col in df_clean.columns) else ''
        return pd.DataFrame({
            'source'                : [self.truncate_string(source, 20)] * len(df_clean),
            'indicator_original'    : df_clean[indicator_orig_col].apply(lambda x: self.truncate_string(x, 255)),
            'indicator_standardized': df_clean[indicator_std_col].apply(lambda x: self.truncate_string(x, 255)),
            'country'               : df_clean[country_col].apply(lambda x: self.truncate_string(x, 100)),
            'year'                  : [y[0] for y in year_data],
            'year_range'            : [self.truncate_string(y[1], 20) for y in year_data],
            'value'                 : df_clean[value_col].apply(self.clean_value),
            'unit'                  : [
                self.truncate_string(u, 20)
                for u in (units if isinstance(units, pd.Series)
                          else [units] * len(df_clean))
            ]
        })

    def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
        """Standardize all three sources and concatenate into one frame."""
        integrated_data = []
        # FAO: columns may be raw FAOSTAT names or already canonical.
        if not raw_data['fao'].empty:
            df = raw_data['fao'].copy()
            integrated_data.append(self.standardize_dataframe(
                df, 'FAO',
                indicator_orig_col='Item' if 'Item' in df.columns else 'indicator',
                indicator_std_col ='Item' if 'Item' in df.columns else 'indicator',
                country_col       ='Area' if 'Area' in df.columns else 'country',
                year_col          ='Year' if 'Year' in df.columns else 'year',
                value_col         ='Value' if 'Value' in df.columns else 'value',
                unit_col          ='Unit' if 'Unit' in df.columns else ('unit' if 'unit' in df.columns else None)
            ))
        # World Bank
        if not raw_data['worldbank'].empty:
            df = raw_data['worldbank'].copy()
            integrated_data.append(self.standardize_dataframe(
                df, 'World Bank',
                indicator_orig_col='indicator_wb_original',
                indicator_std_col ='indicator_fao',
                country_col       ='country',
                year_col          ='year',
                value_col         ='value',
                unit_col          ='unit' if 'unit' in df.columns else None
            ))
        # UNICEF
        if not raw_data['unicef'].empty:
            df = raw_data['unicef'].copy()
            integrated_data.append(self.standardize_dataframe(
                df, 'UNICEF',
                indicator_orig_col='indicator_unicef_original',
                indicator_std_col ='indicator_fao',
                country_col       ='country',
                year_col          ='year',
                value_col         ='value',
                unit_col          ='unit' if 'unit' in df.columns else None
            ))
        if not integrated_data:
            return pd.DataFrame()
        df_integrated = pd.concat(integrated_data, ignore_index=True)
        df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
        df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')
        # Enforce the staging table's column length limits.
        for col, max_len in [('source', 20), ('country', 100), ('indicator_original', 255),
                             ('indicator_standardized', 255), ('year_range', 20), ('unit', 20)]:
            df_integrated[col] = df_integrated[col].astype(str).apply(
                lambda x: self.truncate_string(x, max_len)
            )
        return df_integrated.sort_values(
            ['source', 'indicator_standardized', 'country', 'year']
        ).reset_index(drop=True)

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """Compute data-quality metrics for the integrated frame."""
        validation = {
            'total_rows'      : int(len(df)),
            'total_columns'   : int(len(df.columns)),
            'duplicate_count' : int(df.duplicated().sum()),
            'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
            'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
        }
        if 'year' in df.columns:
            validation['year_range'] = {
                'min'         : int(df['year'].min()) if not df['year'].isnull().all() else None,
                'max'         : int(df['year'].max()) if not df['year'].isnull().all() else None,
                'unique_years': int(df['year'].nunique())
            }
        if 'source' in df.columns:
            validation['source_breakdown'] = {
                str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
            }
        if 'indicator_standardized' in df.columns:
            validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
        if 'country' in df.columns:
            validation['unique_countries'] = int(df['country'].nunique())
        # Max observed lengths, to check against the schema limits above.
        validation['schema_validation'] = {
            'source_max_length'                : int(df['source'].str.len().max()) if 'source' in df.columns else 0,
            'indicator_original_max_length'    : int(df['indicator_original'].str.len().max()) if 'indicator_original' in df.columns else 0,
            'indicator_standardized_max_length': int(df['indicator_standardized'].str.len().max()) if 'indicator_standardized' in df.columns else 0,
            'country_max_length'               : int(df['country'].str.len().max()) if 'country' in df.columns else 0,
            'year_range_max_length'            : int(df['year_range'].str.len().max()) if 'year_range' in df.columns else 0,
            'unit_max_length'                  : int(df['unit'].str.len().max()) if 'unit' in df.columns else 0
        }
        return validation

    def save_to_staging(self, df: pd.DataFrame):
        """Full-refresh load into the Silver staging table; log success/failure."""
        try:
            schema = get_staging_schema()
            load_to_bigquery(
                self.client, df, self.staging_table,
                layer='silver',
                write_disposition="WRITE_TRUNCATE",
                schema=schema
            )
            log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))
        except Exception as e:
            print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
            log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
                       status='failed', error_msg=str(e))
            raise

    def run(self) -> pd.DataFrame:
        """Execute the full integration: read Bronze → standardize → load Silver → audit."""
        self.metadata['start_time'] = datetime.now()
        try:
            print("Integrating data from all sources...")
            raw_data = self.load_raw_data()
            total_fetched = sum(len(df) for df in raw_data.values())
            self.metadata['rows_fetched'] = total_fetched
            print(f" Total rows fetched: {total_fetched:,}")
            df_integrated = self.standardize_schema(raw_data)
            if df_integrated.empty:
                print("No data to integrate")
                return df_integrated
            self.metadata['rows_transformed'] = len(df_integrated)
            validation = self.validate_data(df_integrated)
            self.metadata['validation_metrics'] = validation
            self.save_to_staging(df_integrated)
            self.metadata['rows_loaded'] = len(df_integrated)
            self.metadata['end_time'] = datetime.now()
            self.metadata['duration_seconds'] = (
                self.metadata['end_time'] - self.metadata['start_time']
            ).total_seconds()
            self.metadata['execution_timestamp'] = self.metadata['start_time']
            self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
            self.metadata['config_snapshot'] = json.dumps(CONFIG['matching'])
            # Serialized for the audit table (overwrites the dict set above).
            self.metadata['validation_metrics'] = json.dumps(validation)
            save_etl_metadata(self.client, self.metadata)
            print(f" ✓ Staging Integration completed: {len(df_integrated):,} rows")
            print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
            if 'source_breakdown' in validation:
                for src, cnt in validation['source_breakdown'].items():
                    print(f" - {src}: {cnt:,} rows")
            print(f" Indicators : {validation.get('unique_indicators', '-')}")
            print(f" Countries : {validation.get('unique_countries', '-')}")
            if 'year_range' in validation:
                yr = validation['year_range']
                if yr['min'] is not None and yr['max'] is not None:
                    # FIX: the range separator was lost in a copy/paste
                    # (stripped Unicode dash); use an explicit ASCII hyphen.
                    print(f" Year range : {yr['min']}-{yr['max']}")
            print(f" Completeness: {validation['completeness_pct']:.2f}%")
            schema_val = validation['schema_validation']
            print(f"\n Schema Validation:")
            print(f" - source max length : {schema_val['source_max_length']}/20")
            print(f" - indicator_original max length : {schema_val['indicator_original_max_length']}/255")
            print(f" - indicator_std max length : {schema_val['indicator_standardized_max_length']}/255")
            print(f" - country max length : {schema_val['country_max_length']}/100")
            print(f" - year_range max length : {schema_val['year_range_max_length']}/20")
            print(f" - unit max length : {schema_val['unit_max_length']}/20")
            print(f"\n Metadata → [AUDIT] etl_metadata")
            return df_integrated
        except Exception as e:
            self.logger.error(f"Staging integration failed: {str(e)}")
            raise
# MAIN EXECUTION
if __name__ == "__main__":
    print("=" * 60)
    print("BIGQUERY RAW LAYER ETL")
    print("Kimball DW Architecture")
    print("=" * 60)
    logger = setup_logging()
    client = get_bigquery_client()

    # [1/4] FAO → RAW (Bronze); its indicator list seeds the other matchers.
    print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
    fao_source = FAODataSource(client)
    df_fao = fao_source.run()
    print(f" ✓ raw_fao: {len(df_fao):,} rows")
    print(f" Indicators : {df_fao['indicator'].nunique()}")
    print(f" Countries : {df_fao['country'].nunique()}")
    # FIX: the range separator was lost in a copy/paste (stripped Unicode
    # dash), which printed the two years concatenated; use an ASCII hyphen.
    print(f" Year range : {df_fao['year'].min()}-{df_fao['year'].max()}")
    fao_indicators = df_fao['indicator'].unique()

    # [2/4] World Bank → RAW (Bronze)
    print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
    wb_source = WorldBankDataSource(client, list(fao_indicators))
    df_wb = wb_source.run()
    print(f" ✓ raw_worldbank: {len(df_wb):,} rows")
    print(f" Matched indicators : {df_wb['indicator_fao'].nunique()}")
    print(f" Countries : {df_wb['country'].nunique()}")
    if len(df_wb) > 0:
        # FIX: same lost-separator repair as above.
        print(f" Year range : {df_wb['year'].min()}-{df_wb['year'].max()}")

    # [3/4] UNICEF → RAW (Bronze)
    print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
    unicef_source = UNICEFDataSource(client, list(fao_indicators))
    df_unicef = unicef_source.run()
    print(f" ✓ raw_unicef: {len(df_unicef):,} rows")
    if len(df_unicef) > 0:
        print(f" Matched indicators : {df_unicef['indicator_fao'].nunique()}")
        print(f" Countries : {df_unicef['country'].nunique()}")

    # [4/4] Integrate Bronze tables into the Silver staging table.
    print("\n[4/4] Staging Integration → STAGING (Silver)...")
    staging = StagingDataIntegration(client)
    df_staging = staging.run()

    print("\n" + "=" * 60)
    print("✓ ETL COMPLETED")
    print(f"RAW (Bronze) : raw_fao, raw_worldbank, raw_unicef")
    print(f"STAGING (Silver) : staging_integrated")
    print(f"AUDIT : etl_logs, etl_metadata")
    print("=" * 60)
# AIRFLOW TASK FUNCTIONS
def run_verify_connection():
    """Airflow task: fail fast when the BigQuery connection is unusable."""
    from scripts.bigquery_config import verify_setup
    if not verify_setup():
        raise Exception("BigQuery connection failed!")
    print("BigQuery connection OK")
def run_load_fao():
    """Airflow task: run the FAO ETL into the RAW (Bronze) layer."""
    from scripts.bigquery_config import get_bigquery_client
    bq_client = get_bigquery_client()
    loaded = FAODataSource(bq_client).run()
    print(f"FAO loaded: {len(loaded):,} rows")
def run_load_worldbank():
    """Airflow task: run the World Bank ETL into the RAW (Bronze) layer.

    Re-runs the FAO ETL first because the World Bank source needs the FAO
    indicator list as its matching input.
    """
    from scripts.bigquery_config import get_bigquery_client
    bq_client = get_bigquery_client()
    fao_frame = FAODataSource(bq_client).run()
    indicator_list = fao_frame['indicator'].unique().tolist()
    loaded = WorldBankDataSource(bq_client, indicator_list).run()
    print(f"World Bank loaded: {len(loaded):,} rows")
def run_load_unicef():
    """Airflow task: run the UNICEF ETL into the RAW (Bronze) layer.

    Re-runs the FAO ETL first because the UNICEF source needs the FAO
    indicator list as its matching input.
    """
    from scripts.bigquery_config import get_bigquery_client
    bq_client = get_bigquery_client()
    fao_frame = FAODataSource(bq_client).run()
    indicator_list = fao_frame['indicator'].unique().tolist()
    loaded = UNICEFDataSource(bq_client, indicator_list).run()
    print(f"UNICEF loaded: {len(loaded):,} rows")
def run_staging_integration():
    """Airflow task: integrate the Bronze tables into the Silver staging table."""
    from scripts.bigquery_config import get_bigquery_client
    bq_client = get_bigquery_client()
    integrated = StagingDataIntegration(bq_client).run()
    print(f"Staging integrated: {len(integrated):,} rows")