834 lines
36 KiB
Python
834 lines
36 KiB
Python
"""
|
||
BIGQUERY RAW LAYER ETL
|
||
Kimball Data Warehouse Architecture
|
||
|
||
Kimball ETL Flow yang dijalankan file ini:
|
||
FAODataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_fao
|
||
WorldBankDataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_worldbank
|
||
UNICEFDataSource → EXTRACT & LOAD ke RAW layer (Bronze) : raw_unicef
|
||
StagingDataIntegration:
|
||
- READ dari RAW layer (Bronze) : raw_fao, raw_worldbank, raw_unicef
|
||
- LOAD ke STAGING layer (Silver) : staging_integrated
|
||
- LOG ke AUDIT layer (Audit) : etl_logs, etl_metadata
|
||
|
||
Classes:
|
||
IndicatorMatcher — Fuzzy matching indikator antar sumber data
|
||
FAODataSource — ETL data FAO Food Security
|
||
WorldBankDataSource — ETL data World Bank
|
||
UNICEFDataSource — ETL data UNICEF
|
||
StagingDataIntegration — Integrasi & standardisasi ke Staging layer
|
||
|
||
Usage:
|
||
python bigquery_raw_layer.py
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from datetime import datetime
|
||
import logging
|
||
from typing import List, Dict, Optional, Union
|
||
import json
|
||
from functools import lru_cache
|
||
from pathlib import Path
|
||
from difflib import SequenceMatcher
|
||
import re
|
||
|
||
from bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
|
||
from bigquery_helpers import (
|
||
log_update,
|
||
load_to_bigquery,
|
||
read_from_bigquery,
|
||
setup_logging,
|
||
save_etl_metadata,
|
||
get_staging_schema
|
||
)
|
||
from bigquery_datasource import DataSource
|
||
from google.cloud import bigquery
|
||
|
||
# INDICATOR MATCHER
|
||
|
||
class IndicatorMatcher:
    """Fuzzy matcher that links indicator names across FAO / World Bank / UNICEF.

    Similarity blends three signals — domain-keyword overlap, raw string
    similarity (difflib) and word overlap — using weights from
    ``CONFIG['matching']['weights']``, then applies multiplicative penalties
    when qualifiers (severity, target population, service level, ...) disagree.
    """

    # Domain keyword groups. If both texts contain core keywords but share
    # none, they are considered unrelated (similarity forced to 0).
    CORE_KEYWORDS = {
        'fat'          : ['fat', 'lipid'],
        'protein'      : ['protein'],
        'calorie'      : ['calorie', 'caloric', 'kcal', 'energy intake'],
        'energy'       : ['energy', 'dietary energy consumption'],
        'stunting'     : ['stunting', 'stunted', 'height for age'],
        'wasting'      : ['wasting', 'wasted', 'weight for height'],
        'underweight'  : ['underweight', 'weight for age'],
        'overweight'   : ['overweight', 'overfed'],
        'obesity'      : ['obesity', 'obese'],
        'anemia'       : ['anemia', 'anaemia', 'hemoglobin', 'haemoglobin'],
        'malnutrition' : ['malnutrition', 'undernourishment', 'malnourished', 'undernourished'],
        'breastfeeding': ['breastfeeding', 'breast feeding'],
        'birthweight'  : ['birthweight', 'birth weight', 'low birth weight'],
        'immunization' : ['immunization', 'immunisation', 'vaccination', 'vaccine'],
        'gdp'          : ['gdp', 'gross domestic product'],
        'poverty'      : ['poverty', 'poor', 'poverty line'],
        'inequality'   : ['inequality', 'gini'],
        'water'        : ['water', 'drinking water', 'clean water', 'safe water'],
        'sanitation'   : ['sanitation', 'toilet', 'improved sanitation'],
        'electricity'  : ['electricity', 'electric', 'power'],
        'healthcare'   : ['healthcare', 'health facility', 'hospital'],
        'governance'   : ['governance', 'government effectiveness'],
        'corruption'   : ['corruption', 'transparency'],
        'stability'    : ['political stability', 'stability', 'conflict']
    }

    # Qualifier groups used for the penalty checks in calculate_similarity().
    QUALIFIERS = {
        'at_least'      : ['at least', 'minimum', 'or more', 'or better'],
        'basic'         : ['basic'],
        'improved'      : ['improved'],
        'safely_managed': ['safely managed', 'safe'],
        'exclusive'     : ['exclusive', 'exclusively'],
        'severe'        : ['severe', 'severely'],
        'moderate'      : ['moderate'],
        'mild'          : ['mild'],
        'children'      : ['children', 'child', 'under 5', 'under five', 'u5'],
        'women'         : ['women', 'female', 'reproductive age'],
        'adults'        : ['adults', 'adult'],
        'population'    : ['population', 'people', 'persons'],
        'household'     : ['household', 'households']
    }

    def __init__(self):
        self.threshold = CONFIG['matching']['threshold']
        self.weights = CONFIG['matching']['weights']
        self.penalties = CONFIG['matching']['penalties']
        self.logger = logging.getLogger(self.__class__.__name__)
        # Per-instance memo for calculate_similarity(). The previous
        # @lru_cache on the instance method keyed the cache on `self` and
        # kept every instance alive for the process lifetime (ruff B019).
        self._sim_cache = {}

    @staticmethod
    @lru_cache(maxsize=1024)
    def clean_text(text: str) -> str:
        """Lowercase, strip punctuation (keeping parentheses) and collapse whitespace."""
        if pd.isna(text):
            return ""
        text = str(text).lower()
        text = re.sub(r'[^\w\s\(\)]', ' ', text)
        return ' '.join(text.split())

    @classmethod
    @lru_cache(maxsize=512)
    def extract_keywords(cls, text: str) -> tuple:
        """Return the CORE_KEYWORDS group names whose variants appear in *text*."""
        text_clean = cls.clean_text(text)
        return tuple(key for key, variants in cls.CORE_KEYWORDS.items()
                     if any(v in text_clean for v in variants))

    @classmethod
    @lru_cache(maxsize=512)
    def detect_qualifiers(cls, text: str) -> frozenset:
        """Return the QUALIFIERS group names whose variants appear in *text*."""
        text_clean = cls.clean_text(text)
        return frozenset(q for q, variants in cls.QUALIFIERS.items()
                         if any(v in text_clean for v in variants))

    def calculate_similarity(self, text1: str, text2: str) -> float:
        """Return a similarity score in [0, 1] for two indicator names.

        Results are memoized per instance (see __init__) since the same pair
        can be compared many times during matching.
        """
        key = (text1, text2)
        cached = self._sim_cache.get(key)
        if cached is None:
            cached = self._similarity_uncached(text1, text2)
            self._sim_cache[key] = cached
        return cached

    def _similarity_uncached(self, text1: str, text2: str) -> float:
        """Compute the weighted, penalty-adjusted similarity (no caching)."""
        if text1 == text2:
            return 1.0
        clean1 = self.clean_text(text1)
        clean2 = self.clean_text(text2)
        keywords1 = set(self.extract_keywords(text1))
        keywords2 = set(self.extract_keywords(text2))
        # Both sides have domain keywords but share none → unrelated domains.
        if keywords1 and keywords2 and not (keywords1 & keywords2):
            return 0.0
        core_score = len(keywords1 & keywords2) / max(len(keywords1), len(keywords2), 1)
        base_score = SequenceMatcher(None, clean1, clean2).ratio()
        words1, words2 = set(clean1.split()), set(clean2.split())
        overlap = len(words1 & words2) / max(len(words1), len(words2), 1)
        w = self.weights
        final_score = (core_score * w['keyword'] +
                       base_score * w['string_similarity'] +
                       overlap * w['word_overlap'])
        # Penalize when only one side carries a given qualifier family.
        quals1 = self.detect_qualifiers(text1)
        quals2 = self.detect_qualifiers(text2)
        p = self.penalties
        if ('at_least' in quals1) != ('at_least' in quals2):
            final_score *= p['qualifier_mismatch']
        if ('exclusive' in quals1) != ('exclusive' in quals2):
            final_score *= p['qualifier_mismatch']
        sev1 = {'severe', 'moderate', 'mild'} & quals1
        sev2 = {'severe', 'moderate', 'mild'} & quals2
        if sev1 != sev2 and (sev1 or sev2):
            final_score *= p['severity_mismatch']
        tgt1 = {'children', 'women', 'adults'} & quals1
        tgt2 = {'children', 'women', 'adults'} & quals2
        if tgt1 != tgt2 and (tgt1 or tgt2):
            final_score *= p['target_mismatch']
        lvl1 = {'basic', 'improved', 'safely_managed'} & quals1
        lvl2 = {'basic', 'improved', 'safely_managed'} & quals2
        if lvl1 != lvl2 and (lvl1 or lvl2):
            final_score *= p['service_level_mismatch']
        return final_score

    def match_indicators(self, source_indicators, target_indicators,
                         threshold=None, id_col='id', name_col='value', deduplicate=True):
        """Match each source indicator to its best target above *threshold*.

        *target_indicators* may be a DataFrame (with *id_col*/*name_col*) or a
        plain iterable of names. Returns a list of dicts with keys
        ``source_indicator``, ``target_indicator``, ``target_code`` and
        ``similarity_score`` (percentage, one decimal).
        """
        if threshold is None:
            threshold = self.threshold
        all_matches = []
        for source in sorted(source_indicators):
            best = self._find_best_match(source, target_indicators, threshold, id_col, name_col)
            if best:
                all_matches.append({
                    'source_indicator': source,
                    'target_indicator': best['name'],
                    'target_code'     : best['code'],
                    'similarity_score': round(best['similarity'] * 100, 1)
                })
        if deduplicate and all_matches:
            all_matches = self._deduplicate_matches(all_matches)
        return all_matches

    def _find_best_match(self, source, targets, threshold, id_col, name_col):
        """Return the single best-scoring target above *threshold*, or None."""
        best = None
        best_score = threshold
        if isinstance(targets, pd.DataFrame):
            for _, row in targets.iterrows():
                score = self.calculate_similarity(source, row[name_col])
                if score > best_score:
                    best_score = score
                    best = {'code': row[id_col], 'name': row[name_col]}
        else:
            for target in targets:
                score = self.calculate_similarity(source, target)
                if score > best_score:
                    best_score = score
                    best = {'code': None, 'name': target}
        return None if best is None else {**best, 'similarity': best_score}

    def _deduplicate_matches(self, matches):
        """Keep only the highest-scoring match per target (by code if present)."""
        df = pd.DataFrame(matches).sort_values('similarity_score', ascending=False)
        dup_col = 'target_code' if df['target_code'].notna().any() else 'target_indicator'
        return df.drop_duplicates(subset=dup_col, keep='first').to_dict('records')
|
||
# FAO DATA SOURCE → RAW LAYER (Bronze)
|
||
|
||
class FAODataSource(DataSource):
    """FAO Food Security ETL source → RAW layer (Bronze) table ``raw_fao``.

    Uses the FAOSTAT bulk-download ZIP instead of the faostat API, because the
    API requires authentication.
    """

    def __init__(self, client: bigquery.Client = None):
        super().__init__(client)
        self.table_name = 'raw_fao'
        self.domain_code = 'FS'  # FAOSTAT "Food Security" domain code
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False  # avoid duplicate lines via the root logger
        self.download_url = (
            "https://bulks-faostat.fao.org/production/"
            "Food_Security_Data_E_All_Data_(Normalized).zip"
        )

    def fetch_data(self) -> pd.DataFrame:
        """Download the bulk ZIP, read its CSV, and keep only ASEAN rows."""
        import requests
        import zipfile
        import io

        print(" Downloading FAO Food Security dataset...")
        response = requests.get(self.download_url, timeout=120)
        response.raise_for_status()

        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            csv_members = [name for name in z.namelist() if name.endswith('.csv')]
            if not csv_members:
                # Previously an opaque IndexError on [0]; fail with a clear message.
                raise ValueError("FAO bulk archive contains no CSV file")
            # FAOSTAT normalized dumps are latin-1 encoded.
            df = pd.read_csv(z.open(csv_members[0]), encoding='latin-1')

        if 'Area' in df.columns:
            # self.asean_countries appears to come from DataSource — TODO confirm
            df = df[df['Area'].isin(self.asean_countries)].copy()

        print(f" Raw rows after ASEAN filter: {len(df):,}")
        return df

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Rename FAO columns to the standard schema, keep known columns, sort."""
        # FAOSTAT normalized files carry several Element types; keep 'Value' rows.
        if 'Element' in df.columns:
            df = df[df['Element'] == 'Value'].copy()

        column_mapping = {
            'Area' : 'country',
            'Year' : 'year',
            'Item' : 'indicator',
            'Value': 'value',
            'Unit' : 'unit'
        }
        df = df.rename(columns={k: v for k, v in column_mapping.items() if k in df.columns})

        keep_cols = [c for c in ['country', 'year', 'indicator', 'value', 'unit'] if c in df.columns]
        df = df[keep_cols].copy()

        if all(col in df.columns for col in ['indicator', 'country', 'year']):
            df = df.sort_values(['indicator', 'country', 'year']).reset_index(drop=True)

        return df
|
||
# WORLD BANK DATA SOURCE → RAW LAYER (Bronze)
|
||
|
||
import wbgapi as wb
|
||
|
||
class WorldBankDataSource(DataSource):
    """World Bank ETL source → RAW layer (Bronze) table ``raw_worldbank``.

    Fuzzy-matches the FAO indicator names against the World Bank series
    catalogue (via ``wbgapi``), downloads each matched series for the ASEAN
    ISO codes, and reshapes the wide country×year frames to long format.
    """

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_worldbank'
        self.fao_indicators = fao_indicators
        self.asean_iso = CONFIG['asean_iso_codes']
        self.matching_results = []  # filled by fetch_data()
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False  # avoid duplicate lines via the root logger

    def fetch_data(self) -> Dict:
        """Match FAO indicators to WB series and download each matched series.

        Returns a dict mapping FAO indicator name → wide DataFrame
        (economy × year) as produced by ``wb.data.DataFrame``.
        """
        wb_indicators = pd.DataFrame(list(wb.series.list()))
        matches = self.matcher.match_indicators(
            self.fao_indicators, wb_indicators,
            threshold=CONFIG['matching']['threshold'],
            id_col='id', name_col='value', deduplicate=True
        )
        self.matching_results = [{
            'indikator_fao'    : m['source_indicator'],
            'indikator_wb'     : m['target_indicator'],
            'kode_wb'          : m['target_code'],
            'similarity_persen': m['similarity_score']
        } for m in matches]

        wb_data_dict = {}
        for item in self.matching_results:
            try:
                data = wb.data.DataFrame(item['kode_wb'], self.asean_iso, numericTimeKeys=True)
                wb_data_dict[item['indikator_fao']] = data
            except Exception as exc:
                # Best-effort: a series the API cannot serve is skipped, but
                # log it instead of silently swallowing the error.
                self.logger.warning("World Bank fetch failed for %s: %s",
                                    item['kode_wb'], exc)
        return wb_data_dict

    def transform_data(self, wb_data_dict: Dict) -> pd.DataFrame:
        """Combine the wide per-indicator frames and melt to long format.

        Output columns: indicator_wb_original, indicator_fao, wb_code,
        country, year, value.
        """
        all_data = []
        for fao_indicator, df_wide in wb_data_dict.items():
            # Keys of wb_data_dict were built from matching_results, so a
            # matching entry always exists here.
            info = next(i for i in self.matching_results if i['indikator_fao'] == fao_indicator)
            temp = df_wide.reset_index()
            temp.insert(0, 'indicator_wb_original', info['indikator_wb'])
            temp.insert(1, 'indicator_fao', fao_indicator)
            temp.insert(2, 'wb_code', info['kode_wb'])
            all_data.append(temp)

        if not all_data:
            return pd.DataFrame()

        df_combined = pd.concat(all_data, ignore_index=True)
        # 'economy' is the wbgapi country index column; everything else that is
        # not an id column is a year column.
        id_vars = ['indicator_wb_original', 'indicator_fao', 'wb_code', 'economy']
        value_vars = [c for c in df_combined.columns if c not in id_vars]

        df_long = df_combined.melt(
            id_vars=id_vars, value_vars=value_vars,
            var_name='year', value_name='value'
        ).rename(columns={'economy': 'country'})

        df_long['year'] = df_long['year'].astype(int)
        df_long = df_long[['indicator_wb_original', 'indicator_fao', 'wb_code',
                           'country', 'year', 'value']]
        return df_long.sort_values(['indicator_fao', 'country', 'year']).reset_index(drop=True)
||
|
||
# UNICEF DATA SOURCE → RAW LAYER (Bronze)
|
||
|
||
import requests
|
||
import time
|
||
|
||
class UNICEFDataSource(DataSource):
    """UNICEF SDMX ETL source → RAW layer (Bronze) table ``raw_unicef``.

    Downloads the configured SDMX datasets, keeps ASEAN rows, then fuzzy-maps
    UNICEF indicator names onto the FAO indicator list.
    """

    def __init__(self, client: bigquery.Client, fao_indicators: List[str]):
        super().__init__(client)
        self.table_name = 'raw_unicef'
        self.fao_indicators = fao_indicators
        self.base_url = "https://sdmx.data.unicef.org/ws/public/sdmxapi/rest"
        self.datasets = CONFIG['unicef_datasets']
        # Substring keywords used to keep ASEAN reference areas (REF_AREA).
        self.asean_keywords = ['Indonesia', 'Malaysia', 'Thailand', 'Vietnam', 'Viet Nam',
                               'Philippines', 'Singapore', 'Myanmar', 'Cambodia', 'Lao', 'Brunei']
        self.matching_results = []  # filled by transform_data()
        self.matcher = IndicatorMatcher()
        self.logger.propagate = False  # avoid duplicate lines via the root logger

    def fetch_data(self) -> pd.DataFrame:
        """Download every configured SDMX dataset and concatenate ASEAN rows.

        Decodes the sdmx-json structure by resolving each series/observation
        key index against the declared dimensions.
        """
        all_data = []
        for dataset_code in self.datasets:  # only the code is used below
            try:
                url = f"{self.base_url}/data/{dataset_code}/all/?format=sdmx-json"
                response = requests.get(url, timeout=30)
                response.raise_for_status()
                data_json = response.json()
                series_data = data_json['data']['dataSets'][0]['series']
                dimensions = data_json['data']['structure']['dimensions']

                data_list = []
                for series_key, series_value in series_data.items():
                    # A series key like "0:3:1" indexes into the series dimensions.
                    indices = series_key.split(':')
                    row_data = {'dataset': dataset_code}
                    for i, dim in enumerate(dimensions['series']):
                        row_data[dim['id']] = dim['values'][int(indices[i])]['name']
                    for obs_key, obs_value in series_value.get('observations', {}).items():
                        obs_row = row_data.copy()
                        for i, dim in enumerate(dimensions['observation']):
                            obs_row[dim['id']] = dim['values'][int(obs_key.split(':')[i])]['id']
                        obs_row['value'] = obs_value[0]  # first element is the observation value
                        data_list.append(obs_row)

                df_temp = pd.DataFrame(data_list)
                if 'REF_AREA' in df_temp.columns:
                    asean_found = [c for c in df_temp['REF_AREA'].unique()
                                   if any(k.lower() in c.lower() for k in self.asean_keywords)]
                    df_temp = df_temp[df_temp['REF_AREA'].isin(asean_found)]
                if len(df_temp) > 0:
                    all_data.append(df_temp)
                time.sleep(0.5)  # polite rate-limiting between datasets
            except Exception as exc:
                # Best-effort: an unreachable/unparseable dataset is skipped,
                # but log it instead of silently swallowing the error.
                self.logger.warning("UNICEF dataset %s skipped: %s", dataset_code, exc)

        return pd.concat(all_data, ignore_index=True) if all_data else pd.DataFrame()

    def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Keep rows whose indicator fuzzy-matches a FAO indicator; standardize columns."""
        if df.empty:
            return df
        # The SDMX indicator column name varies by dataset; find it dynamically.
        indicator_col = next((col for col in df.columns if 'indicator' in col.lower()), None)
        if not indicator_col:
            return pd.DataFrame()

        unicef_indicators = df[indicator_col].unique()
        matches = self.matcher.match_indicators(
            self.fao_indicators, list(unicef_indicators),
            threshold=CONFIG['matching']['threshold'], deduplicate=True
        )

        self.matching_results = []
        for match in matches:
            matched_rows = df[df[indicator_col] == match['target_indicator']]
            if len(matched_rows) > 0:
                self.matching_results.append({
                    'indikator_fao'    : match['source_indicator'],
                    'indikator_unicef' : match['target_indicator'],
                    'unicef_dataset'   : matched_rows['dataset'].iloc[0],
                    'similarity_persen': match['similarity_score']
                })

        if not self.matching_results:
            return pd.DataFrame()

        unicef_matched = [i['indikator_unicef'] for i in self.matching_results]
        df_filtered = df[df[indicator_col].isin(unicef_matched)].copy()
        df_filtered = df_filtered.rename(columns={
            indicator_col: 'indicator_unicef_original',
            'REF_AREA'   : 'country',
            'TIME_PERIOD': 'year'
        })
        # Attach the standardized (FAO) indicator name to every row.
        unicef_to_fao = {i['indikator_unicef']: i['indikator_fao'] for i in self.matching_results}
        df_filtered['indicator_fao'] = df_filtered['indicator_unicef_original'].map(unicef_to_fao)
        return df_filtered
||
|
||
# STAGING DATA INTEGRATION → STAGING LAYER (Silver)
|
||
|
||
class StagingDataIntegration:
|
||
"""
|
||
Staging Data Integration (BigQuery version)
|
||
|
||
Input : RAW layer (Bronze) — raw_fao, raw_worldbank, raw_unicef
|
||
Output : STAGING layer (Silver) — staging_integrated
|
||
Audit : etl_logs, etl_metadata (Audit → fs_asean_audit)
|
||
"""
|
||
|
||
def __init__(self, client: bigquery.Client):
|
||
self.client = client
|
||
self.logger = logging.getLogger(self.__class__.__name__)
|
||
self.logger.propagate = False
|
||
self.staging_table = 'staging_integrated'
|
||
|
||
self.metadata = {
|
||
'source_class' : self.__class__.__name__,
|
||
'table_name' : self.staging_table,
|
||
'start_time' : None,
|
||
'end_time' : None,
|
||
'duration_seconds' : None,
|
||
'rows_fetched' : 0,
|
||
'rows_transformed' : 0,
|
||
'rows_loaded' : 0,
|
||
'validation_metrics': {}
|
||
}
|
||
|
||
def load_raw_data(self) -> Dict[str, pd.DataFrame]:
|
||
"""Load data dari semua tabel RAW layer (Bronze)"""
|
||
raw_data = {}
|
||
|
||
try:
|
||
raw_data['fao'] = read_from_bigquery(self.client, 'raw_fao', layer='bronze')
|
||
except Exception:
|
||
raw_data['fao'] = pd.DataFrame()
|
||
|
||
try:
|
||
raw_data['worldbank'] = read_from_bigquery(self.client, 'raw_worldbank', layer='bronze')
|
||
except Exception:
|
||
raw_data['worldbank'] = pd.DataFrame()
|
||
|
||
try:
|
||
raw_data['unicef'] = read_from_bigquery(self.client, 'raw_unicef', layer='bronze')
|
||
except Exception:
|
||
raw_data['unicef'] = pd.DataFrame()
|
||
|
||
return raw_data
|
||
|
||
def clean_value(self, value):
|
||
"""Clean dan convert value ke float"""
|
||
if pd.isna(value):
|
||
return None
|
||
value_str = str(value).strip().replace('<', '').replace('>', '').strip()
|
||
try:
|
||
return float(value_str)
|
||
except:
|
||
return None
|
||
|
||
def process_year_range(self, year_value):
|
||
"""
|
||
Process year range dan return (year_int, year_range_str)
|
||
Examples:
|
||
"2020" → (2020, "2020")
|
||
"2020-2021" → (2020, "2020-2021")
|
||
"2019–2021" → (2020, "2019-2021")
|
||
"""
|
||
if pd.isna(year_value):
|
||
return None, None
|
||
|
||
year_str = str(year_value).strip().replace('–', '-').replace('—', '-')
|
||
|
||
if '-' in year_str:
|
||
try:
|
||
parts = year_str.split('-')
|
||
if len(parts) == 2:
|
||
start_year = int(parts[0].strip())
|
||
end_year = int(parts[1].strip())
|
||
return (start_year + end_year) // 2, year_str
|
||
else:
|
||
return int(float(year_str)), year_str
|
||
except:
|
||
return None, year_str
|
||
else:
|
||
try:
|
||
single_year = int(float(year_str))
|
||
return single_year, str(single_year)
|
||
except:
|
||
return None, year_str
|
||
|
||
def truncate_string(self, value, max_length: int) -> str:
|
||
"""Truncate string sesuai varchar constraint"""
|
||
if pd.isna(value):
|
||
return ''
|
||
s = str(value).strip()
|
||
return s[:max_length] if len(s) > max_length else s
|
||
|
||
def standardize_dataframe(self, df: pd.DataFrame, source: str,
|
||
indicator_orig_col: str, indicator_std_col: str,
|
||
country_col: str, year_col: str, value_col: str,
|
||
unit_col: str = None) -> pd.DataFrame:
|
||
"""Standardize dataframe ke schema staging_integrated"""
|
||
if df.empty:
|
||
return pd.DataFrame()
|
||
|
||
df_clean = df.copy().dropna(subset=[indicator_orig_col, country_col, year_col])
|
||
year_data = df_clean[year_col].apply(self.process_year_range)
|
||
units = df_clean[unit_col].fillna('') if (unit_col and unit_col in df_clean.columns) else ''
|
||
|
||
return pd.DataFrame({
|
||
'source' : [self.truncate_string(source, 20)] * len(df_clean),
|
||
'indicator_original' : df_clean[indicator_orig_col].apply(lambda x: self.truncate_string(x, 255)),
|
||
'indicator_standardized': df_clean[indicator_std_col].apply(lambda x: self.truncate_string(x, 255)),
|
||
'country' : df_clean[country_col].apply(lambda x: self.truncate_string(x, 100)),
|
||
'year' : [y[0] for y in year_data],
|
||
'year_range' : [self.truncate_string(y[1], 20) for y in year_data],
|
||
'value' : df_clean[value_col].apply(self.clean_value),
|
||
'unit' : [
|
||
self.truncate_string(u, 20)
|
||
for u in (units if isinstance(units, pd.Series)
|
||
else [units] * len(df_clean))
|
||
]
|
||
})
|
||
|
||
def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
|
||
"""Standardize schema dari semua sumber data"""
|
||
integrated_data = []
|
||
|
||
# FAO — deteksi kolom (nama asli atau sudah di-rename)
|
||
if not raw_data['fao'].empty:
|
||
df = raw_data['fao'].copy()
|
||
integrated_data.append(self.standardize_dataframe(
|
||
df, 'FAO',
|
||
indicator_orig_col='Item' if 'Item' in df.columns else 'indicator',
|
||
indicator_std_col ='Item' if 'Item' in df.columns else 'indicator',
|
||
country_col ='Area' if 'Area' in df.columns else 'country',
|
||
year_col ='Year' if 'Year' in df.columns else 'year',
|
||
value_col ='Value' if 'Value' in df.columns else 'value',
|
||
unit_col ='Unit' if 'Unit' in df.columns else ('unit' if 'unit' in df.columns else None)
|
||
))
|
||
|
||
# World Bank
|
||
if not raw_data['worldbank'].empty:
|
||
df = raw_data['worldbank'].copy()
|
||
integrated_data.append(self.standardize_dataframe(
|
||
df, 'World Bank',
|
||
indicator_orig_col='indicator_wb_original',
|
||
indicator_std_col ='indicator_fao',
|
||
country_col ='country',
|
||
year_col ='year',
|
||
value_col ='value',
|
||
unit_col ='unit' if 'unit' in df.columns else None
|
||
))
|
||
|
||
# UNICEF
|
||
if not raw_data['unicef'].empty:
|
||
df = raw_data['unicef'].copy()
|
||
integrated_data.append(self.standardize_dataframe(
|
||
df, 'UNICEF',
|
||
indicator_orig_col='indicator_unicef_original',
|
||
indicator_std_col ='indicator_fao',
|
||
country_col ='country',
|
||
year_col ='year',
|
||
value_col ='value',
|
||
unit_col ='unit' if 'unit' in df.columns else None
|
||
))
|
||
|
||
if not integrated_data:
|
||
return pd.DataFrame()
|
||
|
||
df_integrated = pd.concat(integrated_data, ignore_index=True)
|
||
|
||
# Final type conversion
|
||
df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
|
||
df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')
|
||
|
||
# Enforce varchar constraints
|
||
for col, max_len in [('source', 20), ('country', 100), ('indicator_original', 255),
|
||
('indicator_standardized', 255), ('year_range', 20), ('unit', 20)]:
|
||
df_integrated[col] = df_integrated[col].astype(str).apply(
|
||
lambda x: self.truncate_string(x, max_len)
|
||
)
|
||
|
||
return df_integrated.sort_values(
|
||
['source', 'indicator_standardized', 'country', 'year']
|
||
).reset_index(drop=True)
|
||
|
||
def validate_data(self, df: pd.DataFrame) -> Dict:
|
||
"""Validate data dan return metrics"""
|
||
validation = {
|
||
'total_rows' : int(len(df)),
|
||
'total_columns' : int(len(df.columns)),
|
||
'duplicate_count' : int(df.duplicated().sum()),
|
||
'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
|
||
'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
|
||
}
|
||
|
||
if 'year' in df.columns:
|
||
validation['year_range'] = {
|
||
'min' : int(df['year'].min()) if not df['year'].isnull().all() else None,
|
||
'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
|
||
'unique_years': int(df['year'].nunique())
|
||
}
|
||
|
||
if 'source' in df.columns:
|
||
validation['source_breakdown'] = {
|
||
str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
|
||
}
|
||
|
||
if 'indicator_standardized' in df.columns:
|
||
validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
|
||
|
||
if 'country' in df.columns:
|
||
validation['unique_countries'] = int(df['country'].nunique())
|
||
|
||
validation['schema_validation'] = {
|
||
'source_max_length' : int(df['source'].str.len().max()) if 'source' in df.columns else 0,
|
||
'indicator_original_max_length' : int(df['indicator_original'].str.len().max()) if 'indicator_original' in df.columns else 0,
|
||
'indicator_standardized_max_length': int(df['indicator_standardized'].str.len().max()) if 'indicator_standardized' in df.columns else 0,
|
||
'country_max_length' : int(df['country'].str.len().max()) if 'country' in df.columns else 0,
|
||
'year_range_max_length' : int(df['year_range'].str.len().max()) if 'year_range' in df.columns else 0,
|
||
'unit_max_length' : int(df['unit'].str.len().max()) if 'unit' in df.columns else 0
|
||
}
|
||
|
||
return validation
|
||
|
||
def save_to_staging(self, df: pd.DataFrame):
|
||
"""Save data ke staging_integrated table di STAGING layer (Silver)"""
|
||
try:
|
||
schema = get_staging_schema()
|
||
|
||
load_to_bigquery(
|
||
self.client,
|
||
df,
|
||
self.staging_table,
|
||
layer='silver', # → fs_asean_silver
|
||
write_disposition="WRITE_TRUNCATE",
|
||
schema=schema
|
||
)
|
||
|
||
log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))
|
||
|
||
except Exception as e:
|
||
print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
|
||
log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
|
||
status='failed', error_msg=str(e))
|
||
raise
|
||
|
||
def run(self) -> pd.DataFrame:
|
||
"""Run staging integration process"""
|
||
self.metadata['start_time'] = datetime.now()
|
||
|
||
try:
|
||
print("Integrating data from all sources...")
|
||
|
||
raw_data = self.load_raw_data()
|
||
total_fetched = sum(len(df) for df in raw_data.values())
|
||
self.metadata['rows_fetched'] = total_fetched
|
||
print(f" Total rows fetched: {total_fetched:,}")
|
||
|
||
df_integrated = self.standardize_schema(raw_data)
|
||
|
||
if df_integrated.empty:
|
||
print("No data to integrate")
|
||
return df_integrated
|
||
|
||
self.metadata['rows_transformed'] = len(df_integrated)
|
||
|
||
validation = self.validate_data(df_integrated)
|
||
self.metadata['validation_metrics'] = validation
|
||
|
||
self.save_to_staging(df_integrated)
|
||
self.metadata['rows_loaded'] = len(df_integrated)
|
||
|
||
self.metadata['end_time'] = datetime.now()
|
||
self.metadata['duration_seconds'] = (
|
||
self.metadata['end_time'] - self.metadata['start_time']
|
||
).total_seconds()
|
||
self.metadata['execution_timestamp'] = self.metadata['start_time']
|
||
self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
|
||
self.metadata['config_snapshot'] = json.dumps(CONFIG['matching'])
|
||
self.metadata['validation_metrics'] = json.dumps(validation)
|
||
|
||
save_etl_metadata(self.client, self.metadata)
|
||
|
||
# Summary
|
||
print(f" ✓ Staging Integration completed: {len(df_integrated):,} rows")
|
||
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
|
||
if 'source_breakdown' in validation:
|
||
for src, cnt in validation['source_breakdown'].items():
|
||
print(f" - {src}: {cnt:,} rows")
|
||
print(f" Indicators : {validation.get('unique_indicators', '-')}")
|
||
print(f" Countries : {validation.get('unique_countries', '-')}")
|
||
if 'year_range' in validation:
|
||
yr = validation['year_range']
|
||
if yr['min'] and yr['max']:
|
||
print(f" Year range : {yr['min']}–{yr['max']}")
|
||
print(f" Completeness: {validation['completeness_pct']:.2f}%")
|
||
|
||
schema_val = validation['schema_validation']
|
||
print(f"\n Schema Validation:")
|
||
print(f" - source max length : {schema_val['source_max_length']}/20")
|
||
print(f" - indicator_original max length : {schema_val['indicator_original_max_length']}/255")
|
||
print(f" - indicator_std max length : {schema_val['indicator_standardized_max_length']}/255")
|
||
print(f" - country max length : {schema_val['country_max_length']}/100")
|
||
print(f" - year_range max length : {schema_val['year_range_max_length']}/20")
|
||
print(f" - unit max length : {schema_val['unit_max_length']}/20")
|
||
|
||
print(f"\n Metadata → [AUDIT] etl_metadata")
|
||
|
||
return df_integrated
|
||
|
||
except Exception as e:
|
||
self.logger.error(f"Staging integration failed: {str(e)}")
|
||
raise
|
||
|
||
# MAIN EXECUTION
|
||
|
||
# Entry point: runs the full Kimball ETL sequentially —
# RAW (Bronze) loads for FAO / World Bank / UNICEF, then the Silver
# staging integration. All progress is reported via print().
if __name__ == "__main__":
    print("=" * 60)
    print("BIGQUERY RAW LAYER ETL")
    print("Kimball DW Architecture")
    print("=" * 60)

    logger = setup_logging()
    client = get_bigquery_client()

    # ── FAO ──────────────────────────────────────────────────────────────────
    print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
    fao_source = FAODataSource(client)
    df_fao = fao_source.run()

    print(f" ✓ raw_fao: {len(df_fao):,} rows")
    print(f" Indicators : {df_fao['indicator'].nunique()}")
    print(f" Countries : {df_fao['country'].nunique()}")
    print(f" Year range : {df_fao['year'].min()}–{df_fao['year'].max()}")

    # FAO indicator names drive the fuzzy matching in the other two sources.
    fao_indicators = df_fao['indicator'].unique()

    # ── World Bank ────────────────────────────────────────────────────────────
    print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
    wb_source = WorldBankDataSource(client, list(fao_indicators))
    df_wb = wb_source.run()

    print(f" ✓ raw_worldbank: {len(df_wb):,} rows")
    # NOTE(review): these two prints run even when df_wb is empty and would
    # raise KeyError on the missing columns — only the year-range print is
    # guarded below. Consider moving them inside the len check.
    print(f" Matched indicators : {df_wb['indicator_fao'].nunique()}")
    print(f" Countries : {df_wb['country'].nunique()}")
    if len(df_wb) > 0:
        print(f" Year range : {df_wb['year'].min()}–{df_wb['year'].max()}")

    # ── UNICEF ────────────────────────────────────────────────────────────────
    print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
    unicef_source = UNICEFDataSource(client, list(fao_indicators))
    df_unicef = unicef_source.run()

    print(f" ✓ raw_unicef: {len(df_unicef):,} rows")
    if len(df_unicef) > 0:
        print(f" Matched indicators : {df_unicef['indicator_fao'].nunique()}")
        print(f" Countries : {df_unicef['country'].nunique()}")

    # ── Staging Integration ───────────────────────────────────────────────────
    print("\n[4/4] Staging Integration → STAGING (Silver)...")
    staging = StagingDataIntegration(client)
    df_staging = staging.run()

    print("\n" + "=" * 60)
    print("✓ ETL COMPLETED")
    print(f"RAW (Bronze) : raw_fao, raw_worldbank, raw_unicef")
    print(f"STAGING (Silver) : staging_integrated")
    print(f"AUDIT : etl_logs, etl_metadata")
    print("=" * 60)
|
||
# AIRFLOW TASK FUNCTIONS
|
||
|
||
def run_verify_connection():
    """Airflow task: fail fast if the BigQuery connection/setup is broken.

    Raises:
        RuntimeError: when verify_setup() reports a failed setup.
    """
    # Imported under the `scripts.` package path — presumably how the project
    # is mounted inside the Airflow environment; TODO confirm.
    from scripts.bigquery_config import verify_setup
    result = verify_setup()
    if not result:
        # RuntimeError (a subclass of Exception) keeps any existing broad
        # handlers working while avoiding raising the bare Exception base.
        raise RuntimeError("BigQuery connection failed!")
    print("BigQuery connection OK")
||
def run_load_fao():
    """Airflow task: run the FAO extract-and-load into the RAW (Bronze) layer."""
    from scripts.bigquery_config import get_bigquery_client

    bq_client = get_bigquery_client()
    loaded = FAODataSource(bq_client).run()
    print(f"FAO loaded: {len(loaded):,} rows")
||
def run_load_worldbank():
    """Airflow task: load World Bank data matched against the FAO indicators."""
    from scripts.bigquery_config import get_bigquery_client

    bq_client = get_bigquery_client()
    # The World Bank matcher needs the FAO indicator list, so FAO runs first.
    fao_frame = FAODataSource(bq_client).run()
    indicator_names = fao_frame['indicator'].unique().tolist()
    wb_frame = WorldBankDataSource(bq_client, indicator_names).run()
    print(f"World Bank loaded: {len(wb_frame):,} rows")
||
def run_load_unicef():
    """Airflow task: load UNICEF data matched against the FAO indicators."""
    from scripts.bigquery_config import get_bigquery_client

    bq_client = get_bigquery_client()
    # The UNICEF matcher needs the FAO indicator list, so FAO runs first.
    fao_frame = FAODataSource(bq_client).run()
    indicator_names = fao_frame['indicator'].unique().tolist()
    unicef_frame = UNICEFDataSource(bq_client, indicator_names).run()
    print(f"UNICEF loaded: {len(unicef_frame):,} rows")
||
def run_staging_integration():
    """Airflow task: integrate the RAW tables into the STAGING (Silver) layer."""
    from scripts.bigquery_config import get_bigquery_client

    integrated = StagingDataIntegration(get_bigquery_client()).run()
    print(f"Staging integrated: {len(integrated):,} rows")