diff --git a/dags/etl_food_security.py b/dags/etl_food_security.py index f3dda1a..d5010c7 100644 --- a/dags/etl_food_security.py +++ b/dags/etl_food_security.py @@ -11,6 +11,11 @@ from scripts.bigquery_raw_layer import ( run_staging_integration, ) +from scripts.bigquery_cleaned_layer import ( + run_cleaned_integration, +) + + with DAG( dag_id = "etl_food_security_bigquery", start_date = datetime(2026, 3, 1), @@ -43,5 +48,12 @@ with DAG( task_id = "staging_integration_to_silver", python_callable = run_staging_integration ) + + task_cleaned = PythonOperator( + task_id = "cleaned_integration_to_silver", + python_callable = run_cleaned_integration + ) + + - task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging \ No newline at end of file + task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned \ No newline at end of file diff --git a/scripts/bigquery_cleaned_layer.py b/scripts/bigquery_cleaned_layer.py new file mode 100644 index 0000000..5a1fe5f --- /dev/null +++ b/scripts/bigquery_cleaned_layer.py @@ -0,0 +1,581 @@ +""" +BIGQUERY CLEANED LAYER ETL +Kimball Data Warehouse Architecture + +Kimball ETL Flow yang dijalankan file ini: + Input : STAGING layer (Silver) — staging_integrated (fs_asean_silver) + Output : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver) + Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit) + +Classes: + CleanedDataLoader — Cleaning, enrichment, & load ke Silver layer + +Usage: + python bigquery_cleaned_layer.py +""" + +import pandas as pd +import numpy as np +from datetime import datetime +import logging +from typing import Dict +import json + +from bigquery_config import get_bigquery_client, CONFIG, get_table_id +from bigquery_helpers import ( + log_update, + load_to_bigquery, + read_from_bigquery, + setup_logging, + save_etl_metadata +) +from google.cloud import bigquery + + +# ============================================================================= +# LOAD STAGING 
# =============================================================================
# LOAD STAGING DATA
# =============================================================================

def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
    """Load data from staging_integrated (STAGING/Silver layer).

    Args:
        client: Authenticated BigQuery client.

    Returns:
        pd.DataFrame: Full contents of the staging_integrated table.
    """
    print("\nLoading data from staging_integrated (fs_asean_silver)...")
    df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver')
    print(f" ✓ Loaded : {len(df_staging):,} rows")
    print(f" Columns : {len(df_staging.columns)}")
    print(f" Sources : {df_staging['source'].nunique()}")
    print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
    print(f" Countries : {df_staging['country'].nunique()}")
    print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
    return df_staging


# =============================================================================
# COLUMN CONSTRAINT HELPERS
# =============================================================================

# Schema constraints — maximum varchar lengths per column.
COLUMN_CONSTRAINTS = {
    'source'                : 20,
    'indicator_original'    : 255,
    'indicator_standardized': 255,
    'country'               : 100,
    'year_range'            : 20,
    'unit'                  : 20,
    'pillar'                : 20,
    'direction'             : 15,   # 'higher_better'=13, 'lower_better'=12
}


def truncate_string(value, max_length: int) -> str:
    """Truncate a value to max_length characters; pass None/NaN through unchanged."""
    if pd.isna(value):
        return value
    # Slicing is a no-op when the string is already short enough.
    return str(value)[:max_length]


def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
    """
    Apply column length constraints matching the target table schema.

    Reports which columns were truncated, how many values, and examples.

    Args:
        df: Input frame; only columns listed in COLUMN_CONSTRAINTS are touched.

    Returns:
        pd.DataFrame: Copy of df with over-long string values truncated.
    """
    df_constrained = df.copy()
    truncation_report = {}

    for column, max_length in COLUMN_CONSTRAINTS.items():
        if column not in df_constrained.columns:
            continue
        mask = (
            df_constrained[column].notna() &
            (df_constrained[column].astype(str).str.len() > max_length)
        )
        truncated_count = mask.sum()
        if truncated_count > 0:
            truncation_report[column] = {
                'count'     : int(truncated_count),
                'max_length': max_length,
                'examples'  : df_constrained[mask][column].head(3).tolist()
            }
            df_constrained[column] = df_constrained[column].apply(
                lambda x: truncate_string(x, max_length)
            )

    if truncation_report:
        print("\n ⚠ Column Truncations Applied:")
        for column, info in truncation_report.items():
            print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
    else:
        print("\n ✓ No truncations needed — all values within constraints")

    return df_constrained


# =============================================================================
# COUNTRY NAME STANDARDIZATION
# =============================================================================

# Canonical ASEAN country names keyed by upper-cased aliases / ISO-3 codes.
ASEAN_MAPPING = {
    'BRN'                               : 'Brunei Darussalam',
    'BRUNEI'                            : 'Brunei Darussalam',
    'BRUNEI DARUSSALAM'                 : 'Brunei Darussalam',
    'KHM'                               : 'Cambodia',
    'CAMBODIA'                          : 'Cambodia',
    'IDN'                               : 'Indonesia',
    'INDONESIA'                         : 'Indonesia',
    'LAO'                               : 'Laos',
    'LAOS'                              : 'Laos',
    "LAO PEOPLE'S DEMOCRATIC REPUBLIC"  : 'Laos',
    'LAO PDR'                           : 'Laos',
    'MYS'                               : 'Malaysia',
    'MALAYSIA'                          : 'Malaysia',
    'MMR'                               : 'Myanmar',
    'MYANMAR'                           : 'Myanmar',
    'BURMA'                             : 'Myanmar',
    'PHL'                               : 'Philippines',
    'PHILIPPINES'                       : 'Philippines',
    'SGP'                               : 'Singapore',
    'SINGAPORE'                         : 'Singapore',
    'THA'                               : 'Thailand',
    'THAILAND'                          : 'Thailand',
    'VNM'                               : 'Vietnam',
    'VIETNAM'                           : 'Vietnam',
    'VIET NAM'                          : 'Vietnam',
}


def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'country') -> tuple:
    """
    Standardize country names to canonical ASEAN names.

    Ensures country names stay within the varchar(100) constraint.

    Args:
        df: Input frame.
        country_column: Name of the column holding country values.

    Returns:
        tuple: (df_clean, report_dict) where report_dict has
               'countries_mapped' (count of distinct originals changed)
               and 'changes' (original -> standardized mapping).
    """
    df_clean = df.copy()

    def map_country(country):
        if pd.isna(country):
            return country
        s = str(country).strip()
        mapped = ASEAN_MAPPING.get(s.upper(), s)
        return mapped[:100] if len(mapped) > 100 else mapped

    original = df_clean[country_column].copy()
    df_clean[country_column] = df_clean[country_column].apply(map_country)
    # FIX: NaN != NaN is always True, so missing values used to be reported
    # as spurious "changes"; exclude pairs where both sides are missing.
    changes = {
        orig: new
        for orig, new in zip(original, df_clean[country_column])
        if orig != new and not (pd.isna(orig) and pd.isna(new))
    }

    return df_clean, {
        'countries_mapped': len(set(changes.keys())),
        'changes'         : changes,
    }


# =============================================================================
# PILLAR CLASSIFICATION
# =============================================================================

def assign_pillar(indicator_name: str) -> str:
    """
    Assign a food-security pillar based on indicator-name keywords.

    Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other'
    All ≤ 20 chars (varchar(20) constraint).
    """
    if pd.isna(indicator_name):
        return 'Other'
    ind = str(indicator_name).lower()

    # Checked first: these override any pillar keyword below.
    for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
        if kw in ind:
            return 'Other'

    if any(kw in ind for kw in [
        'adequacy', 'protein supply', 'supply of protein',
        'dietary energy supply', 'share of dietary energy', 'derived from cereals'
    ]):
        return 'Availability'

    if any(kw in ind for kw in [
        'variability', 'cereal import dependency', 'arable land equipped',
        'political stability', 'value of food imports in total'
    ]):
        return 'Stability'

    if any(kw in ind for kw in [
        'gdp', 'gross domestic product', 'rail lines', 'road density',
        'number of moderately', 'number of severely',
        'number of people undernourished', 'prevalence of moderate',
        'prevalence of severe', 'prevalence of undernourishment', 'food insecure'
    ]):
        return 'Access'

    if any(kw in ind for kw in [
        'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
        'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
        'children under 5', 'newborns with low', 'women of reproductive'
    ]):
        return 'Utilization'

    return 'Other'


# =============================================================================
# DIRECTION CLASSIFICATION
# =============================================================================

def assign_direction(indicator_name: str) -> str:
    """
    Assign an optimization direction to an indicator.

    Return values: 'higher_better' (13 chars) or 'lower_better' (12 chars)
    Both ≤ 15 chars (varchar(15) constraint).
    """
    if pd.isna(indicator_name):
        return 'higher_better'
    ind = str(indicator_name).lower()

    # Specific lower_better override — must precede the generic
    # 'dietary energy supply' higher_better match below.
    if 'share of dietary energy supply derived from cereals' in ind:
        return 'lower_better'

    # higher_better exceptions — checked before the lower_better keyword list.
    # NOTE: 'dietary energy supply adequacy' was removed as unreachable
    # (its substring 'dietary energy supply' already matches first).
    for kw in [
        'exclusive breastfeeding',
        'dietary energy supply',
        'average fat supply',
        'average protein supply',
        'supply of protein of animal origin',
    ]:
        if kw in ind:
            return 'higher_better'

    # lower_better — indicators measuring problems to be minimized.
    for kw in [
        'prevalence of undernourishment',
        'prevalence of severe food insecurity',
        'prevalence of moderate or severe food insecurity',
        'prevalence of moderate food insecurity',
        'prevalence of wasting',
        'prevalence of stunting',
        'prevalence of overweight',
        'prevalence of obesity',
        'prevalence of anemia',
        'prevalence of low birthweight',
        'number of people undernourished',
        'number of severely food insecure',
        'number of moderately or severely food insecure',
        'number of children under 5 years affected by wasting',
        'number of children under 5 years of age who are overweight',
        'number of children under 5 years of age who are stunted',
        'number of newborns with low birthweight',
        'number of obese adults',
        'number of women of reproductive age',
        'percentage of children under 5 years affected by wasting',
        'percentage of children under 5 years of age who are overweight',
        'percentage of children under 5 years of age who are stunted',
        'cereal import dependency',
        'import dependency',
        'value of food imports in total merchandise exports',
        'value of food imports',
        'variability of food production',
        'variability of food supply',
        'per capita food production variability',
        'per capita food supply variability',
        'coefficient of variation',
        'incidence of caloric losses',
        'food losses',
    ]:
        if kw in ind:
            return 'lower_better'

    return 'higher_better'


# =============================================================================
# CLEANED DATA LOADER
# =============================================================================

class CleanedDataLoader:
    """
    Loader for cleaned integrated data into the STAGING layer (Silver).

    Kimball context:
        Input  : staging_integrated → STAGING (Silver) — fs_asean_silver
        Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver
        Audit  : etl_logs, etl_metadata → AUDIT — fs_asean_audit

    Pipeline steps:
        1. Standardize country names (ASEAN)
        2. Remove missing values
        3. Remove duplicates
        4. Add pillar & direction classification
        5. Apply column constraints
        6. Load to BigQuery
        7. Log to Audit layer
    """

    # Target table schema; also used by load_to_bigquery for validation.
    SCHEMA = [
        bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("country", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
    ]

    def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'):
        """
        Args:
            client: Authenticated BigQuery client.
            load_mode: Load strategy recorded in the audit metadata
                       (only 'full_refresh' is exercised by run()).
        """
        self.client = client
        self.load_mode = load_mode
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False
        self.table_name = 'cleaned_integrated'
        self.target_layer = 'silver'

        # Audit record accumulated during run() and persisted via save_etl_metadata.
        self.metadata = {
            'source_class'      : self.__class__.__name__,
            'table_name'        : self.table_name,
            'start_time'        : None,
            'end_time'          : None,
            'duration_seconds'  : None,
            'rows_fetched'      : 0,
            'rows_transformed'  : 0,
            'rows_loaded'       : 0,
            'load_mode'         : load_mode,
            'validation_metrics': {}
        }

    # ------------------------------------------------------------------
    # STEP METHODS
    # ------------------------------------------------------------------

    def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 1: map country aliases/codes to canonical ASEAN names."""
        print("\n [Step 1/5] Standardize country names...")
        df, report = standardize_country_names_asean(df, country_column='country')
        print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}")
        unique_countries = sorted(df['country'].unique())
        print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
        # NOTE(review): this logs against 'staging_integrated' although the
        # loader targets cleaned_integrated — confirm which table is intended.
        log_update(self.client, 'STAGING', 'staging_integrated',
                   'standardize_asean', report['countries_mapped'])
        return df

    def _step_remove_missing(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 2: drop any row containing a missing value in any column."""
        print("\n [Step 2/5] Remove missing values...")
        rows_before = len(df)
        # dropna() with no arguments already considers every column;
        # the previous subset=list(df.columns) was a redundant equivalent.
        df_clean = df.dropna()
        rows_after = len(df_clean)
        removed = rows_before - rows_after
        print(f" Rows before : {rows_before:,}")
        print(f" Rows after : {rows_after:,}")
        if rows_before > 0:  # guard: avoid ZeroDivisionError on an empty frame
            print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)")
            print(f" Retention : {rows_after/rows_before*100:.1f}%")
        return df_clean

    def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 3: keep the first row per (indicator, country, year) key."""
        print("\n [Step 3/5] Remove duplicates...")
        exact_dups = df.duplicated().sum()
        data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
        print(f" Exact duplicates : {exact_dups:,}")
        print(f" Data duplicates : {data_dups:,}")
        rows_before = len(df)
        df_clean = df.drop_duplicates(
            subset=['indicator_standardized', 'country', 'year'], keep='first'
        )
        removed = rows_before - len(df_clean)
        if rows_before > 0:  # guard: avoid ZeroDivisionError on an empty frame
            print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)")
        return df_clean

    def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 4: derive 'pillar' and 'direction' columns from the indicator name."""
        print("\n [Step 4/5] Add pillar & direction classification...")
        df = df.copy()
        df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
        df['direction'] = df['indicator_standardized'].apply(assign_direction)

        pillar_counts = df['pillar'].value_counts()
        print(" ✓ Pillar distribution:")
        for pillar, count in pillar_counts.items():
            print(f" - {pillar}: {count:,}")

        direction_counts = df['direction'].value_counts()
        print(" ✓ Direction distribution:")
        for direction, count in direction_counts.items():
            pct = count / len(df) * 100
            print(f" - {direction}: {count:,} ({pct:.1f}%)")
        return df

    def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 5: enforce varchar length constraints on string columns."""
        print("\n [Step 5/5] Apply column constraints...")
        return apply_column_constraints(df)

    # ------------------------------------------------------------------
    # VALIDATION
    # ------------------------------------------------------------------

    def validate_data(self, df: pd.DataFrame) -> Dict:
        """
        Compute post-pipeline quality metrics for the audit record.

        Returns:
            Dict: row/column counts, completeness %, per-column breakdowns,
                  and a length check per constrained column.
        """
        validation = {
            'total_rows'      : int(len(df)),
            'total_columns'   : int(len(df.columns)),
            'duplicate_count' : int(df.duplicated().sum()),
            'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
            'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
        }
        if 'year' in df.columns:
            validation['year_range'] = {
                'min'         : int(df['year'].min()) if not df['year'].isnull().all() else None,
                'max'         : int(df['year'].max()) if not df['year'].isnull().all() else None,
                'unique_years': int(df['year'].nunique())
            }
        for col in ('pillar', 'direction', 'source'):
            if col in df.columns:
                validation[f'{col}_breakdown'] = {
                    str(k): int(v) for k, v in df[col].value_counts().to_dict().items()
                }
        if 'indicator_standardized' in df.columns:
            validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
        if 'country' in df.columns:
            validation['unique_countries'] = int(df['country'].nunique())

        # Verify every constrained column actually fits its varchar limit.
        column_length_check = {}
        for col, max_len in COLUMN_CONSTRAINTS.items():
            if col in df.columns:
                max_actual = df[col].astype(str).str.len().max()
                column_length_check[col] = {
                    'max_length_constraint': max_len,
                    'max_actual_length'    : int(max_actual),
                    'within_limit'         : bool(max_actual <= max_len)
                }
        validation['column_length_check'] = column_length_check
        return validation

    # ------------------------------------------------------------------
    # RUN
    # ------------------------------------------------------------------

    def run(self, df: pd.DataFrame) -> int:
        """
        Execute the full cleaning pipeline and load to STAGING (Silver).

        Args:
            df: Raw staging_integrated frame (see load_staging_data).

        Returns:
            int: Number of rows loaded (0 if df is empty).
        """
        self.metadata['start_time'] = datetime.now()
        self.metadata['rows_fetched'] = len(df)

        if df.empty:
            print(" ERROR: DataFrame is empty, nothing to process.")
            return 0

        # Pipeline steps
        df = self._step_standardize_countries(df)
        df = self._step_remove_missing(df)
        df = self._step_remove_duplicates(df)
        df = self._step_add_classifications(df)
        df = self._step_apply_constraints(df)

        self.metadata['rows_transformed'] = len(df)

        # Validate (metadata gets the JSON-serialized copy further below)
        validation = self.validate_data(df)

        all_within_limits = all(
            info['within_limit']
            for info in validation.get('column_length_check', {}).values()
        )
        if not all_within_limits:
            print("\n ⚠ WARNING: Some columns still exceed length constraints!")
            for col, info in validation['column_length_check'].items():
                if not info['within_limit']:
                    print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")

        # Load to Silver
        print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
        rows_loaded = load_to_bigquery(
            self.client, df, self.table_name,
            layer='silver',
            write_disposition="WRITE_TRUNCATE",
            schema=self.SCHEMA
        )
        self.metadata['rows_loaded'] = rows_loaded

        # Audit logs
        log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)

        # ETL metadata
        self.metadata['end_time'] = datetime.now()
        self.metadata['duration_seconds'] = (
            self.metadata['end_time'] - self.metadata['start_time']
        ).total_seconds()
        self.metadata['execution_timestamp'] = self.metadata['start_time']
        self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
        self.metadata['config_snapshot'] = json.dumps({'load_mode': self.load_mode})
        self.metadata['validation_metrics'] = json.dumps(validation)
        save_etl_metadata(self.client, self.metadata)

        # Summary
        print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
        print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
        print(f" Completeness : {validation['completeness_pct']:.2f}%")
        if 'year_range' in validation:
            yr = validation['year_range']
            # 'is not None' instead of truthiness: year 0 would be falsy.
            if yr['min'] is not None and yr['max'] is not None:
                print(f" Year range : {yr['min']}–{yr['max']}")
        print(f" Indicators : {validation.get('unique_indicators', '-')}")
        print(f" Countries : {validation.get('unique_countries', '-')}")
        print("\n Schema Validation:")
        for col, info in validation.get('column_length_check', {}).items():
            status = "✓" if info['within_limit'] else "✗"
            print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
        print("\n Metadata → [AUDIT] etl_metadata")

        return rows_loaded


# =============================================================================
# AIRFLOW TASK FUNCTIONS — same pattern as the raw layer
# =============================================================================

def run_cleaned_integration():
    """
    Airflow task: load cleaned_integrated from staging_integrated.

    Called by the DAG after the staging_integration_to_silver task completes.
    """
    # FIX: removed redundant local `from bigquery_config import
    # get_bigquery_client` — the name is already imported at module level.
    client = get_bigquery_client()
    df_staging = load_staging_data(client)
    loader = CleanedDataLoader(client, load_mode='full_refresh')
    rows = loader.run(df_staging)
    print(f"Cleaned layer loaded: {rows:,} rows")


# =============================================================================
# MAIN EXECUTION
# =============================================================================

if __name__ == "__main__":
    print("=" * 60)
    print("BIGQUERY CLEANED LAYER ETL")
    print("Kimball DW Architecture")
    print(" Input : STAGING (Silver) → staging_integrated")
    print(" Output : STAGING (Silver) → cleaned_integrated")
    print(" Audit : AUDIT → etl_logs, etl_metadata")
    print("=" * 60)

    logger = setup_logging()
    client = get_bigquery_client()
    df_staging = load_staging_data(client)

    print("\n[1/1] Cleaned Integration → STAGING (Silver)...")
    loader = CleanedDataLoader(client, load_mode='full_refresh')
    final_count = loader.run(df_staging)

    print("\n" + "=" * 60)
    print("✓ CLEANED LAYER ETL COMPLETED")
    print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
    print(f" 📋 AUDIT : etl_logs, etl_metadata")
    print("=" * 60)