create cleaned layer

This commit is contained in:
Debby
2026-03-14 23:04:11 +07:00
parent 5fc498e771
commit 0441573b61
2 changed files with 594 additions and 1 deletions

View File

@@ -11,6 +11,11 @@ from scripts.bigquery_raw_layer import (
run_staging_integration,
)
from scripts.bigquery_cleaned_layer import (
run_cleaned_integration,
)
with DAG(
dag_id = "etl_food_security_bigquery",
start_date = datetime(2026, 3, 1),
@@ -43,5 +48,12 @@ with DAG(
task_id = "staging_integration_to_silver",
python_callable = run_staging_integration
)
task_cleaned = PythonOperator(
task_id = "cleaned_integration_to_silver",
python_callable = run_cleaned_integration
)
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned

View File

@@ -0,0 +1,581 @@
"""
BIGQUERY CLEANED LAYER ETL
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
Input : STAGING layer (Silver) — staging_integrated (fs_asean_silver)
Output : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver)
Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit)
Classes:
CleanedDataLoader — Cleaning, enrichment, & load ke Silver layer
Usage:
python bigquery_cleaned_layer.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict
import json
from bigquery_config import get_bigquery_client, CONFIG, get_table_id
from bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
save_etl_metadata
)
from google.cloud import bigquery
# =============================================================================
# LOAD STAGING DATA
# =============================================================================
def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
    """Fetch the staging_integrated table (STAGING/Silver layer) and print a summary.

    Args:
        client: Authenticated BigQuery client.

    Returns:
        DataFrame holding the full contents of staging_integrated.
    """
    print("\nLoading data from staging_integrated (fs_asean_silver)...")
    frame = read_from_bigquery(client, 'staging_integrated', layer='silver')
    year_lo = int(frame['year'].min())
    year_hi = int(frame['year'].max())
    print(f" ✓ Loaded : {len(frame):,} rows")
    print(f" Columns : {len(frame.columns)}")
    print(f" Sources : {frame['source'].nunique()}")
    print(f" Indicators : {frame['indicator_standardized'].nunique()}")
    print(f" Countries : {frame['country'].nunique()}")
    print(f" Year range : {year_lo}-{year_hi}")
    return frame
# =============================================================================
# COLUMN CONSTRAINT HELPERS
# =============================================================================
# Schema constraints — semua varchar max lengths
# Schema constraints — maximum varchar length per column.
COLUMN_CONSTRAINTS = {
    'source'                : 20,
    'indicator_original'    : 255,
    'indicator_standardized': 255,
    'country'               : 100,
    'year_range'            : 20,
    'unit'                  : 20,
    'pillar'                : 20,
    'direction'             : 15,  # longest value is 'higher_better' (13 chars)
}


def truncate_string(value, max_length: int) -> str:
    """Return *value* clipped to *max_length* characters; pass None/NaN through unchanged."""
    if pd.isna(value):
        return value
    return str(value)[:max_length]


def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
    """
    Enforce the per-column string length limits from COLUMN_CONSTRAINTS.

    Operates on a copy of *df*, prints which columns were clipped (with
    sample offending values collected internally), and returns the
    constrained frame.
    """
    constrained = df.copy()
    clipped = {}
    for col, limit in COLUMN_CONSTRAINTS.items():
        if col not in constrained.columns:
            continue
        over_limit = (
            constrained[col].notna()
            & (constrained[col].astype(str).str.len() > limit)
        )
        hits = int(over_limit.sum())
        if hits > 0:
            clipped[col] = {
                'count'     : hits,
                'max_length': limit,
                'examples'  : constrained[over_limit][col].head(3).tolist(),
            }
            # Bind the limit as a default arg so the lambda is loop-safe.
            constrained[col] = constrained[col].apply(
                lambda v, m=limit: truncate_string(v, m)
            )
    if clipped:
        print("\n ⚠ Column Truncations Applied:")
        for column, info in clipped.items():
            print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
    else:
        print("\n ✓ No truncations needed — all values within constraints")
    return constrained
# =============================================================================
# COUNTRY NAME STANDARDIZATION
# =============================================================================
# Canonical ASEAN country names keyed by upper-cased aliases
# (ISO-3 codes plus common spelling variants).
ASEAN_MAPPING = {
    'BRN'                               : 'Brunei Darussalam',
    'BRUNEI'                            : 'Brunei Darussalam',
    'BRUNEI DARUSSALAM'                 : 'Brunei Darussalam',
    'KHM'                               : 'Cambodia',
    'CAMBODIA'                          : 'Cambodia',
    'IDN'                               : 'Indonesia',
    'INDONESIA'                         : 'Indonesia',
    'LAO'                               : 'Laos',
    'LAOS'                              : 'Laos',
    "LAO PEOPLE'S DEMOCRATIC REPUBLIC"  : 'Laos',
    'LAO PDR'                           : 'Laos',
    'MYS'                               : 'Malaysia',
    'MALAYSIA'                          : 'Malaysia',
    'MMR'                               : 'Myanmar',
    'MYANMAR'                           : 'Myanmar',
    'BURMA'                             : 'Myanmar',
    'PHL'                               : 'Philippines',
    'PHILIPPINES'                       : 'Philippines',
    'SGP'                               : 'Singapore',
    'SINGAPORE'                         : 'Singapore',
    'THA'                               : 'Thailand',
    'THAILAND'                          : 'Thailand',
    'VNM'                               : 'Vietnam',
    'VIETNAM'                           : 'Vietnam',
    'VIET NAM'                          : 'Vietnam',
}


def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'country') -> tuple:
    """
    Standardize ASEAN country names to their canonical form.

    Values are matched case-insensitively (after stripping whitespace)
    against ASEAN_MAPPING; unmapped values pass through unchanged. Results
    are clipped to the varchar(100) column constraint.

    Args:
        df: Input frame.
        country_column: Name of the column holding country identifiers.

    Returns:
        tuple: (df_clean, report_dict) where report_dict contains
        'countries_mapped' (distinct originals that changed) and
        'changes' (original → new value pairs).
    """
    df_clean = df.copy()

    def map_country(country):
        # Nulls pass through untouched.
        if pd.isna(country):
            return country
        raw = str(country).strip()
        mapped = ASEAN_MAPPING.get(raw.upper(), raw)
        # Enforce the varchar(100) constraint for the country column.
        return mapped[:100] if len(mapped) > 100 else mapped

    original = df_clean[country_column].copy()
    df_clean[country_column] = df_clean[country_column].apply(map_country)
    # BUG FIX: NaN != NaN evaluates True, so null originals used to be
    # reported as spurious "changes" (inflating countries_mapped and
    # inserting NaN keys). Skip null originals when building the report.
    changes = {
        orig: new
        for orig, new in zip(original, df_clean[country_column])
        if not pd.isna(orig) and orig != new
    }
    return df_clean, {
        'countries_mapped': len(set(changes.keys())),
        'changes'         : changes,
    }
# =============================================================================
# PILLAR CLASSIFICATION
# =============================================================================
def assign_pillar(indicator_name: str) -> str:
    """
    Classify an indicator into a food-security pillar by keyword match.

    Return values: 'Availability', 'Access', 'Utilization', 'Stability',
    'Other' — all ≤ 20 chars (varchar(20) constraint). Null indicators
    map to 'Other'.
    """
    if pd.isna(indicator_name):
        return 'Other'
    ind = str(indicator_name).lower()
    # Ordered rule table: the first group containing a matching keyword
    # wins, so 'Other' exclusions are evaluated before everything else.
    rules = (
        (('requirement', 'coefficient', 'losses', 'fat supply'),
         'Other'),
        (('adequacy', 'protein supply', 'supply of protein',
          'dietary energy supply', 'share of dietary energy', 'derived from cereals'),
         'Availability'),
        (('variability', 'cereal import dependency', 'arable land equipped',
          'political stability', 'value of food imports in total'),
         'Stability'),
        (('gdp', 'gross domestic product', 'rail lines', 'road density',
          'number of moderately', 'number of severely',
          'number of people undernourished', 'prevalence of moderate',
          'prevalence of severe', 'prevalence of undernourishment', 'food insecure'),
         'Access'),
        (('wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
          'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
          'children under 5', 'newborns with low', 'women of reproductive'),
         'Utilization'),
    )
    for keywords, pillar in rules:
        if any(kw in ind for kw in keywords):
            return pillar
    return 'Other'
# =============================================================================
# DIRECTION CLASSIFICATION
# =============================================================================
def assign_direction(indicator_name: str) -> str:
    """
    Classify whether a higher or lower indicator value is desirable.

    Return values: 'higher_better' (13 chars) or 'lower_better' (12 chars),
    both within the varchar(15) constraint. Null indicators default to
    'higher_better'.
    """
    if pd.isna(indicator_name):
        return 'higher_better'
    ind = str(indicator_name).lower()
    # Specific lower_better case checked before the supply exceptions,
    # since it also contains 'dietary energy supply'.
    if 'share of dietary energy supply derived from cereals' in ind:
        return 'lower_better'
    # higher_better exceptions — must be tested before the lower_better
    # keyword list below.
    higher_exceptions = (
        'exclusive breastfeeding',
        'dietary energy supply',
        'dietary energy supply adequacy',
        'average fat supply',
        'average protein supply',
        'supply of protein of animal origin',
    )
    if any(kw in ind for kw in higher_exceptions):
        return 'higher_better'
    # lower_better — problems to be minimized.
    lower_keywords = (
        'prevalence of undernourishment',
        'prevalence of severe food insecurity',
        'prevalence of moderate or severe food insecurity',
        'prevalence of moderate food insecurity',
        'prevalence of wasting',
        'prevalence of stunting',
        'prevalence of overweight',
        'prevalence of obesity',
        'prevalence of anemia',
        'prevalence of low birthweight',
        'number of people undernourished',
        'number of severely food insecure',
        'number of moderately or severely food insecure',
        'number of children under 5 years affected by wasting',
        'number of children under 5 years of age who are overweight',
        'number of children under 5 years of age who are stunted',
        'number of newborns with low birthweight',
        'number of obese adults',
        'number of women of reproductive age',
        'percentage of children under 5 years affected by wasting',
        'percentage of children under 5 years of age who are overweight',
        'percentage of children under 5 years of age who are stunted',
        'cereal import dependency',
        'import dependency',
        'value of food imports in total merchandise exports',
        'value of food imports',
        'variability of food production',
        'variability of food supply',
        'per capita food production variability',
        'per capita food supply variability',
        'coefficient of variation',
        'incidence of caloric losses',
        'food losses',
    )
    if any(kw in ind for kw in lower_keywords):
        return 'lower_better'
    return 'higher_better'
# =============================================================================
# CLEANED DATA LOADER
# =============================================================================
class CleanedDataLoader:
    """
    Loader for cleaned integrated data into the STAGING layer (Silver).

    Kimball context:
        Input  : staging_integrated → STAGING (Silver) — fs_asean_silver
        Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver
        Audit  : etl_logs, etl_metadata → AUDIT — fs_asean_audit

    Pipeline steps:
        1. Standardize country names (ASEAN)
        2. Remove missing values
        3. Remove duplicates
        4. Add pillar classification
        5. Add direction classification
        6. Apply column constraints
        7. Load to BigQuery
        8. Log to the Audit layer
    """

    # Explicit load schema for cleaned_integrated. REQUIRED fields are
    # guaranteed non-null after the dropna step in _step_remove_missing.
    SCHEMA = [
        bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("country", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
    ]

    def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'):
        """Initialize the loader.

        Args:
            client: Authenticated BigQuery client.
            load_mode: Load strategy label recorded in the audit metadata
                (only 'full_refresh' is used by this module).
        """
        self.client = client
        self.load_mode = load_mode
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False  # keep messages out of the root logger
        self.table_name = 'cleaned_integrated'
        self.target_layer = 'silver'
        # Run bookkeeping; serialized and saved to the AUDIT layer in run().
        self.metadata = {
            'source_class' : self.__class__.__name__,
            'table_name' : self.table_name,
            'start_time' : None,
            'end_time' : None,
            'duration_seconds' : None,
            'rows_fetched' : 0,
            'rows_transformed' : 0,
            'rows_loaded' : 0,
            'load_mode' : load_mode,
            'validation_metrics': {}
        }

    # ------------------------------------------------------------------
    # STEP METHODS
    # ------------------------------------------------------------------
    def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 1: map country aliases to canonical ASEAN names; log the count to audit."""
        print("\n [Step 1/5] Standardize country names...")
        df, report = standardize_country_names_asean(df, country_column='country')
        print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}")
        unique_countries = sorted(df['country'].unique())
        print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
        log_update(self.client, 'STAGING', 'staging_integrated',
                   'standardize_asean', report['countries_mapped'])
        return df

    def _step_remove_missing(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 2: drop rows containing any null value and report retention."""
        print("\n [Step 2/5] Remove missing values...")
        rows_before = len(df)
        # subset=all columns: a null in ANY column drops the row, including
        # the NULLABLE year/value/unit columns — intentionally strict here.
        df_clean = df.dropna(subset=list(df.columns))
        rows_after = len(df_clean)
        removed = rows_before - rows_after
        print(f" Rows before : {rows_before:,}")
        print(f" Rows after : {rows_after:,}")
        print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)")
        print(f" Retention : {rows_after/rows_before*100:.1f}%")
        return df_clean

    def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 3: keep the first row per (indicator, country, year)."""
        print("\n [Step 3/5] Remove duplicates...")
        # Counted for reporting only; the dedup below uses a narrower key
        # (without 'value'), so conflicting values also collapse to one row.
        exact_dups = df.duplicated().sum()
        data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
        print(f" Exact duplicates : {exact_dups:,}")
        print(f" Data duplicates : {data_dups:,}")
        rows_before = len(df)
        df_clean = df.drop_duplicates(
            subset=['indicator_standardized', 'country', 'year'], keep='first'
        )
        removed = rows_before - len(df_clean)
        print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)")
        return df_clean

    def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 4: derive the 'pillar' and 'direction' columns from the indicator name."""
        print("\n [Step 4/5] Add pillar & direction classification...")
        df = df.copy()
        df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
        df['direction'] = df['indicator_standardized'].apply(assign_direction)
        pillar_counts = df['pillar'].value_counts()
        print(f" ✓ Pillar distribution:")
        for pillar, count in pillar_counts.items():
            print(f" - {pillar}: {count:,}")
        direction_counts = df['direction'].value_counts()
        print(f" ✓ Direction distribution:")
        for direction, count in direction_counts.items():
            pct = count / len(df) * 100
            print(f" - {direction}: {count:,} ({pct:.1f}%)")
        return df

    def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 5: clip string columns to their schema varchar limits."""
        print("\n [Step 5/5] Apply column constraints...")
        return apply_column_constraints(df)

    # ------------------------------------------------------------------
    # VALIDATION
    # ------------------------------------------------------------------
    def validate_data(self, df: pd.DataFrame) -> Dict:
        """Collect quality metrics for the cleaned frame.

        Returns:
            Dict with native-typed (JSON-serializable) metrics: row/column
            counts, completeness %, year range, per-column breakdowns, and
            a length check against COLUMN_CONSTRAINTS.
        """
        validation = {
            'total_rows' : int(len(df)),
            'total_columns' : int(len(df.columns)),
            'duplicate_count' : int(df.duplicated().sum()),
            'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
            'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
        }
        if 'year' in df.columns:
            validation['year_range'] = {
                'min' : int(df['year'].min()) if not df['year'].isnull().all() else None,
                'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
                'unique_years': int(df['year'].nunique())
            }
        # Casts to str/int keep the breakdowns JSON-serializable.
        for col in ('pillar', 'direction', 'source'):
            if col in df.columns:
                validation[f'{col}_breakdown'] = {
                    str(k): int(v) for k, v in df[col].value_counts().to_dict().items()
                }
        if 'indicator_standardized' in df.columns:
            validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
        if 'country' in df.columns:
            validation['unique_countries'] = int(df['country'].nunique())
        # Column length check — confirms truncation actually enforced limits.
        column_length_check = {}
        for col, max_len in COLUMN_CONSTRAINTS.items():
            if col in df.columns:
                max_actual = df[col].astype(str).str.len().max()
                column_length_check[col] = {
                    'max_length_constraint': max_len,
                    'max_actual_length' : int(max_actual),
                    'within_limit' : bool(max_actual <= max_len)
                }
        validation['column_length_check'] = column_length_check
        return validation

    # ------------------------------------------------------------------
    # RUN
    # ------------------------------------------------------------------
    def run(self, df: pd.DataFrame) -> int:
        """
        Execute the full cleaning pipeline → load into STAGING (Silver).

        Args:
            df: Raw staging_integrated frame.

        Returns:
            int: Rows loaded (0 if the input frame is empty).
        """
        self.metadata['start_time'] = datetime.now()
        self.metadata['rows_fetched'] = len(df)
        if df.empty:
            print(" ERROR: DataFrame is empty, nothing to process.")
            return 0
        # Pipeline steps (order matters: constraints run last so the
        # derived pillar/direction columns are clipped as well).
        df = self._step_standardize_countries(df)
        df = self._step_remove_missing(df)
        df = self._step_remove_duplicates(df)
        df = self._step_add_classifications(df)
        df = self._step_apply_constraints(df)
        self.metadata['rows_transformed'] = len(df)
        # Validate
        validation = self.validate_data(df)
        self.metadata['validation_metrics'] = validation
        all_within_limits = all(
            info['within_limit']
            for info in validation.get('column_length_check', {}).values()
        )
        if not all_within_limits:
            print("\n ⚠ WARNING: Some columns still exceed length constraints!")
            for col, info in validation['column_length_check'].items():
                if not info['within_limit']:
                    print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
        # Load to Silver (WRITE_TRUNCATE = full refresh of the target table).
        print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
        rows_loaded = load_to_bigquery(
            self.client, df, self.table_name,
            layer='silver',
            write_disposition="WRITE_TRUNCATE",
            schema=self.SCHEMA
        )
        self.metadata['rows_loaded'] = rows_loaded
        # Audit logs
        log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
        # ETL metadata
        self.metadata['end_time'] = datetime.now()
        self.metadata['duration_seconds'] = (
            self.metadata['end_time'] - self.metadata['start_time']
        ).total_seconds()
        self.metadata['execution_timestamp'] = self.metadata['start_time']
        self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
        self.metadata['config_snapshot'] = json.dumps({'load_mode': self.load_mode})
        # Serialized here; subsequent reads below use the local `validation` dict.
        self.metadata['validation_metrics'] = json.dumps(validation)
        save_etl_metadata(self.client, self.metadata)
        # Summary
        print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
        print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
        print(f" Completeness : {validation['completeness_pct']:.2f}%")
        if 'year_range' in validation:
            yr = validation['year_range']
            # NOTE(review): a min/max year of 0 would be falsy and skip the
            # line; also the separator between min and max appears lost in
            # this copy (likely a dash glyph) — confirm against the original.
            if yr['min'] and yr['max']:
                print(f" Year range : {yr['min']}{yr['max']}")
        print(f" Indicators : {validation.get('unique_indicators', '-')}")
        print(f" Countries : {validation.get('unique_countries', '-')}")
        print(f"\n Schema Validation:")
        for col, info in validation.get('column_length_check', {}).items():
            # NOTE(review): both branches are empty strings here — the
            # pass/fail marker glyphs (likely '✓'/'✗') appear to have been
            # lost in transit; confirm upstream before relying on output.
            status = "" if info['within_limit'] else ""
            print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
        print(f"\n Metadata → [AUDIT] etl_metadata")
        return rows_loaded
# =============================================================================
# AIRFLOW TASK FUNCTIONS ← same pattern as the raw layer
# =============================================================================
def run_cleaned_integration():
    """
    Airflow task: build cleaned_integrated from staging_integrated.

    Invoked by the DAG after the staging_integration_to_silver task
    finishes.
    """
    # Imported lazily so the DAG file can import this module without
    # requiring BigQuery credentials at parse time.
    from bigquery_config import get_bigquery_client

    bq_client = get_bigquery_client()
    staging_frame = load_staging_data(bq_client)
    row_count = CleanedDataLoader(bq_client, load_mode='full_refresh').run(staging_frame)
    print(f"Cleaned layer loaded: {row_count:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
    # Standalone entry point: runs the whole cleaned-layer pipeline once.
    divider = "=" * 60
    print(divider)
    for header_line in (
        "BIGQUERY CLEANED LAYER ETL",
        "Kimball DW Architecture",
        " Input : STAGING (Silver) → staging_integrated",
        " Output : STAGING (Silver) → cleaned_integrated",
        " Audit : AUDIT → etl_logs, etl_metadata",
    ):
        print(header_line)
    print(divider)
    logger = setup_logging()
    client = get_bigquery_client()
    df_staging = load_staging_data(client)
    print("\n[1/1] Cleaned Integration → STAGING (Silver)...")
    final_count = CleanedDataLoader(client, load_mode='full_refresh').run(df_staging)
    print("\n" + divider)
    print("✓ CLEANED LAYER ETL COMPLETED")
    print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
    print(f" 📋 AUDIT : etl_logs, etl_metadata")
    print(divider)