Compare commits

..

26 Commits

Author SHA1 Message Date
Debby
2cab52a110 agg narative country list 2026-03-26 20:03:49 +07:00
Debby
e25ae0dfe7 agg narrative v2 2026-03-26 19:15:39 +07:00
Debby
1d732167f5 create naration 2026-03-26 17:27:59 +07:00
Debby
a4ff15677e replace sklearn with pure numpy 2026-03-15 00:15:53 +07:00
Debby
4b617a1e8f cron job 2026-03-15 00:07:47 +07:00
Debby
27ac14ad9b rename file 2026-03-15 00:06:51 +07:00
Debby
2f29e42e3f create analytical and agg 2026-03-14 23:54:24 +07:00
Debby
453fb3ef52 create dimensional model 2026-03-14 23:29:45 +07:00
Debby
46eda521d1 script folder 2026-03-14 23:18:52 +07:00
Debby
0441573b61 create cleaned layer 2026-03-14 23:04:11 +07:00
Debby
5fc498e771 rename path json 2026-03-12 15:53:51 +07:00
Debby
0235dfbc75 raw and staging data 2026-03-12 14:57:30 +07:00
Debby
847a6a9859 volume to bind mount 1 2026-03-12 14:06:10 +07:00
Debby
514e1bd009 docker compose 1v 2026-03-08 11:52:23 +07:00
Debby
7be65b44b1 test compose problem 2026-03-08 11:39:22 +07:00
Debby
b7660045f8 bine mount first 2026-03-08 11:34:50 +07:00
Debby
330dbf5df7 bine mount 3 2026-03-08 11:32:38 +07:00
Debby
9ac03c9acc bine mount 2 2026-03-08 11:29:36 +07:00
Debby
94cfe71c51 bind mount 2026-03-08 11:21:13 +07:00
Debby
6a28c01f44 hello world 2026-03-08 11:10:46 +07:00
Debby
c68532c013 remove module-level BigQuery 2026-03-08 10:47:56 +07:00
Debby
007e679349 test connect fao 2026-03-08 10:29:28 +07:00
Debby
45cee4cbd4 test connect to bigquery 2026-03-07 17:43:14 +07:00
Debby
e63d8ba69a test connect 2026-03-07 17:17:27 +07:00
Debby
ceccf69518 docker-compose-1V 2026-03-07 16:56:58 +07:00
Debby
3086e5ce50 first compose 2026-03-07 16:43:55 +07:00
12 changed files with 3522 additions and 418 deletions

View File

@@ -1,8 +1,57 @@
"""
AIRFLOW DAG — ETL Food Security BigQuery
Kimball Data Warehouse Architecture
Schedule : Setiap 3 hari sekali (timedelta(days=3))
Catchup : False
Kimball ETL Flow:
┌──────────────────────────────────────────────────────────────────────────┐
│ BRONZE (Raw) SILVER (Staging→Cleaned) GOLD (DW → Analytical) │
│ │
│ raw_fao ─┐ dim_country │
│ raw_worldbank ─┼→ staging_integrated dim_indicator │
│ raw_unicef ─┘ ↓ dim_time │
│ cleaned_integrated ───────→ dim_source │
│ dim_pillar │
│ fact_food_security │
│ ↓ │
│ analytical_food_security │
│ ↓ │
│ agg_pillar_composite │
│ agg_pillar_by_country │
│ agg_framework_by_country │
│ agg_framework_asean │
│ │
│ AUDIT : etl_logs, etl_metadata (setiap layer) │
└──────────────────────────────────────────────────────────────────────────┘
Task Order:
verify_bigquery_connection
→ load_fao_to_bronze
→ load_worldbank_to_bronze
→ load_unicef_to_bronze
→ staging_integration_to_silver
→ cleaned_integration_to_silver
→ dimensional_model_to_gold
→ analytical_layer_to_gold
→ aggregation_to_gold
Scripts folder harus berisi:
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
- bigquery_cleaned_layer.py (run_cleaned_integration)
- bigquery_dimensional_model.py (run_dimensional_model)
- bigquery_analytical_layer.py (run_analytical_layer)
- bigquery_analysis_aggregation.py (run_aggregation)
- bigquery_config.py
- bigquery_helpers.py
- bigquery_datasource.py
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# Import fungsi dari folder scripts
from scripts.bigquery_raw_layer import (
run_verify_connection,
run_load_fao,
@@ -10,13 +59,36 @@ from scripts.bigquery_raw_layer import (
run_load_unicef,
run_staging_integration,
)
from scripts.bigquery_cleaned_layer import (
run_cleaned_integration,
)
from scripts.bigquery_dimensional_model import (
run_dimensional_model,
)
from scripts.bigquery_analytical_layer import (
run_analytical_layer,
)
from scripts.bigquery_aggregate_layer import (
run_aggregation,
)
# DEFAULT ARGS
default_args = {
'owner': 'data-engineering',
'email': ['d1041221004@student.untan.ac.id'],
}
# DAG DEFINITION
with DAG(
dag_id = "etl_food_security_bigquery",
description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)",
default_args = default_args,
start_date = datetime(2026, 3, 1),
schedule_interval = "@daily",
schedule_interval = "0 0 */3 * *",
catchup = False,
tags = ["food-security", "bigquery", "kimball"]
tags = ["food-security", "bigquery", "kimball"],
) as dag:
task_verify = PythonOperator(
@@ -44,4 +116,25 @@ with DAG(
python_callable = run_staging_integration
)
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging
task_cleaned = PythonOperator(
task_id = "cleaned_integration_to_silver",
python_callable = run_cleaned_integration
)
task_dimensional = PythonOperator(
task_id = "dimensional_model_to_gold",
python_callable = run_dimensional_model
)
task_analytical = PythonOperator(
task_id = "analytical_layer_to_gold",
python_callable = run_analytical_layer
)
task_aggregation = PythonOperator(
task_id = "aggregation_to_gold",
python_callable = run_aggregation
)
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation

14
dags/test_simple.py Normal file
View File

@@ -0,0 +1,14 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def hello():
    """Smoke-test task callable: prints a greeting to the Airflow task log."""
    print("hello world")
# Minimal DAG used to verify that the scheduler picks up files from the dags folder.
with DAG(
    dag_id="test_simple",
    start_date=datetime(2026, 3, 1),
    schedule_interval="@daily",
    catchup=False
) as dag:
    PythonOperator(task_id="hello", python_callable=hello)

View File

@@ -19,13 +19,9 @@ services:
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- AIRFLOW__CORE__LOAD_EXAMPLES=False
- GOOGLE_APPLICATION_CREDENTIALS=/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json
volumes:
- airflow_dags:/opt/airflow/dags
- airflow_logs:/opt/airflow/logs
- airflow_plugins:/opt/airflow/plugins
- ./scripts:/opt/airflow/scripts
- ./secrets:/opt/airflow/secrets:ro
ports:
- "8081:8080"
command: bash -c "airflow db init && airflow webserver"
@@ -39,17 +35,12 @@ services:
- PYTHONPATH=/opt/airflow
- AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
- AIRFLOW__CORE__EXECUTOR=LocalExecutor
- GOOGLE_APPLICATION_CREDENTIALS=/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json
volumes:
- airflow_dags:/opt/airflow/dags
- airflow_logs:/opt/airflow/logs
- airflow_plugins:/opt/airflow/plugins
- ./scripts:/opt/airflow/scripts
- ./secrets:/opt/airflow/secrets:ro
command: scheduler
volumes:
postgres_data:
airflow_dags:
airflow_logs:
airflow_plugins:

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,557 @@
"""
BIGQUERY ANALYTICAL LAYER - DATA FILTERING
FIXED: analytical_food_security disimpan di fs_asean_gold (layer='gold')
Filtering Order:
1. Load data (single years only)
2. Determine year boundaries (2013 - auto-detected end year)
3. Filter complete indicators PER COUNTRY (auto-detect start year, no gaps)
4. Filter countries with ALL pillars (FIXED SET)
5. Filter indicators with consistent presence across FIXED countries
6. Save analytical table (value only, normalisasi & direction handled downstream)
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# ANALYTICAL LAYER CLASS
# =============================================================================
class AnalyticalLayerLoader:
    """
    Analytical Layer Loader for BigQuery - CORRECTED VERSION v4

    Key Logic:
    1. Complete per country (no gaps from start_year to end_year)
    2. Filter countries with all pillars
    3. Ensure indicators have consistent country count across all years
    4. Save raw value only (normalization & direction handled downstream)

    Output: analytical_food_security -> DW layer (Gold) -> fs_asean_gold
    """
    def __init__(self, client: bigquery.Client):
        self.client = client
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.propagate = False
        # Working DataFrames — populated by load_source_data().
        self.df_clean = None
        self.df_indicator = None
        self.df_country = None
        self.df_pillar = None
        # Fixed country set chosen in select_countries_with_all_pillars().
        self.selected_country_ids = None
        self.start_year = 2013      # earliest year kept in the analytical table
        self.end_year = None        # auto-detected in determine_year_boundaries()
        self.baseline_year = 2023   # reference year for the indicator-count comparison
        # Audit metadata persisted via save_etl_metadata() at the end of the run.
        self.pipeline_metadata = {
            'source_class'      : self.__class__.__name__,
            'start_time'        : None,
            'end_time'          : None,
            'duration_seconds'  : None,
            'rows_fetched'      : 0,
            'rows_transformed'  : 0,
            'rows_loaded'       : 0,
            'validation_metrics': {}
        }
        self.pipeline_start = None
        self.pipeline_end = None

    def load_source_data(self):
        """STEP 1 — Load fact_food_security joined with all dimensions from the Gold dataset.

        Populates self.df_clean (fact + dims) plus the three dimension DataFrames,
        records rows_fetched, and returns True. Re-raises on any query failure.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 1: LOADING SOURCE DATA from fs_asean_gold")
        self.logger.info("=" * 80)
        try:
            query = f"""
                SELECT
                    f.country_id,
                    c.country_name,
                    f.indicator_id,
                    i.indicator_name,
                    i.direction,
                    f.pillar_id,
                    p.pillar_name,
                    f.time_id,
                    t.year,
                    t.start_year,
                    t.end_year,
                    t.is_year_range,
                    f.value,
                    f.source_id
                FROM `{get_table_id('fact_food_security', layer='gold')}` f
                JOIN `{get_table_id('dim_country', layer='gold')}` c ON f.country_id = c.country_id
                JOIN `{get_table_id('dim_indicator', layer='gold')}` i ON f.indicator_id = i.indicator_id
                JOIN `{get_table_id('dim_pillar', layer='gold')}` p ON f.pillar_id = p.pillar_id
                JOIN `{get_table_id('dim_time', layer='gold')}` t ON f.time_id = t.time_id
            """
            self.logger.info("Loading fact table with dimensions...")
            # BigQuery Storage client disabled — plain REST download is sufficient here.
            self.df_clean = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
            self.logger.info(f" Loaded: {len(self.df_clean):,} rows")
            if 'is_year_range' in self.df_clean.columns:
                yr = self.df_clean['is_year_range'].value_counts()
                self.logger.info(f" Breakdown:")
                self.logger.info(f" Single years (is_year_range=False): {yr.get(False, 0):,}")
                self.logger.info(f" Year ranges (is_year_range=True): {yr.get(True, 0):,}")
            self.df_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
            self.df_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
            self.df_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
            self.logger.info(f" Indicators: {len(self.df_indicator)}")
            self.logger.info(f" Countries: {len(self.df_country)}")
            self.logger.info(f" Pillars: {len(self.df_pillar)}")
            self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
            return True
        except Exception as e:
            self.logger.error(f"Error loading source data: {e}")
            raise

    def determine_year_boundaries(self):
        """STEP 2 — Auto-detect the end year, then trim rows to [start_year, end_year].

        The end year is the latest year (>= baseline_year) whose distinct indicator
        count is at least the baseline year's; falls back to baseline_year itself.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 2: DETERMINE YEAR BOUNDARIES")
        self.logger.info("=" * 80)
        df_2023 = self.df_clean[self.df_clean['year'] == self.baseline_year]
        baseline_indicator_count = df_2023['indicator_id'].nunique()
        self.logger.info(f"\nBaseline Year: {self.baseline_year}")
        self.logger.info(f"Baseline Indicator Count: {baseline_indicator_count}")
        years_sorted = sorted(self.df_clean['year'].unique(), reverse=True)
        selected_end_year = None
        for year in years_sorted:
            if year >= self.baseline_year:
                df_year = self.df_clean[self.df_clean['year'] == year]
                year_indicator_count = df_year['indicator_id'].nunique()
                status = "OK" if year_indicator_count >= baseline_indicator_count else "X"
                self.logger.info(f" [{status}] Year {int(year)}: {year_indicator_count} indicators")
                # Descending scan: the first qualifying year is the latest one.
                if year_indicator_count >= baseline_indicator_count and selected_end_year is None:
                    selected_end_year = int(year)
        if selected_end_year is None:
            selected_end_year = self.baseline_year
            self.logger.warning(f" [!] No year found, using baseline: {selected_end_year}")
        else:
            self.logger.info(f"\n [OK] Selected End Year: {selected_end_year}")
        self.end_year = selected_end_year
        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[
            (self.df_clean['year'] >= self.start_year) &
            (self.df_clean['year'] <= self.end_year)
        ].copy()
        self.logger.info(f"\nFiltering {self.start_year}-{self.end_year}:")
        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_complete_indicators_per_country(self):
        """STEP 3 — Keep only (country, indicator) pairs with a gap-free yearly series.

        A pair is valid when its series reaches end_year, has no missing year
        between its own first year and end_year, and spans at least 5 years
        (end_year - start_year >= 4). All other pairs are dropped from df_clean.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 3: FILTER COMPLETE INDICATORS PER COUNTRY (NO GAPS)")
        self.logger.info("=" * 80)
        grouped = self.df_clean.groupby([
            'country_id', 'country_name', 'indicator_id', 'indicator_name',
            'pillar_id', 'pillar_name'
        ])
        valid_combinations = []
        removed_combinations = []
        for (country_id, country_name, indicator_id, indicator_name,
             pillar_id, pillar_name), group in grouped:
            years_present = sorted(group['year'].unique())
            start_year = int(min(years_present))
            end_year_actual = int(max(years_present))
            expected_years = list(range(start_year, self.end_year + 1))
            missing_years = [y for y in expected_years if y not in years_present]
            has_gap = len(missing_years) > 0
            is_complete = (
                end_year_actual >= self.end_year and
                not has_gap and
                (self.end_year - start_year) >= 4
            )
            if is_complete:
                valid_combinations.append({'country_id': country_id, 'indicator_id': indicator_id})
            else:
                # Collect human-readable rejection reasons for the log.
                reasons = []
                if end_year_actual < self.end_year:
                    reasons.append(f"ends {end_year_actual}")
                if has_gap:
                    # str(list)[1:-1] strips the surrounding brackets.
                    gap_str = str(missing_years[:3])[1:-1]
                    if len(missing_years) > 3:
                        gap_str += "..."
                    reasons.append(f"gap:{gap_str}")
                if (self.end_year - start_year) < 4:
                    reasons.append(f"span={self.end_year - start_year}")
                removed_combinations.append({
                    'country_name'  : country_name,
                    'indicator_name': indicator_name,
                    'reasons'       : ", ".join(reasons)
                })
        self.logger.info(f"\n [+] Valid: {len(valid_combinations):,}")
        self.logger.info(f" [-] Removed: {len(removed_combinations):,}")
        # Filter df_clean down to the valid pairs via a composite string key.
        df_valid = pd.DataFrame(valid_combinations)
        df_valid['key'] = df_valid['country_id'].astype(str) + '_' + df_valid['indicator_id'].astype(str)
        self.df_clean['key'] = (self.df_clean['country_id'].astype(str) + '_' +
                                self.df_clean['indicator_id'].astype(str))
        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[self.df_clean['key'].isin(df_valid['key'])].copy()
        self.df_clean = self.df_clean.drop('key', axis=1)
        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        return self.df_clean

    def select_countries_with_all_pillars(self):
        """STEP 4 — Keep only countries that still cover every pillar.

        Stores the surviving ids in self.selected_country_ids (the FIXED SET
        used by all later steps) and drops the other countries from df_clean.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 4: SELECT COUNTRIES WITH ALL PILLARS (FIXED SET)")
        self.logger.info("=" * 80)
        total_pillars = self.df_clean['pillar_id'].nunique()
        country_pillar_count = self.df_clean.groupby(['country_id', 'country_name']).agg({
            'pillar_id'   : 'nunique',
            'indicator_id': 'nunique',
            'year'        : lambda x: f"{int(x.min())}-{int(x.max())}"
        }).reset_index()
        country_pillar_count.columns = [
            'country_id', 'country_name', 'pillar_count', 'indicator_count', 'year_range'
        ]
        for _, row in country_pillar_count.sort_values('pillar_count', ascending=False).iterrows():
            status = "[+] KEEP" if row['pillar_count'] == total_pillars else "[-] REMOVE"
            self.logger.info(
                f" {status:<12} {row['country_name']:25s} "
                f"{row['pillar_count']}/{total_pillars} pillars"
            )
        selected_countries = country_pillar_count[country_pillar_count['pillar_count'] == total_pillars]
        self.selected_country_ids = selected_countries['country_id'].tolist()
        self.logger.info(f"\n FIXED SET: {len(self.selected_country_ids)} countries")
        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[self.df_clean['country_id'].isin(self.selected_country_ids)].copy()
        self.logger.info(f" Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        return self.df_clean

    def filter_indicators_consistent_across_fixed_countries(self):
        """STEP 5 — Keep indicators present for ALL fixed countries in every year.

        For each indicator the effective start is the latest first-year across
        countries (max_start_year); indicators spanning fewer than 5 years or
        missing any country in any expected year are removed. Rows earlier than
        an indicator's max_start_year are also trimmed so series stay aligned.
        Raises ValueError when no indicator survives.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 5: FILTER INDICATORS WITH CONSISTENT PRESENCE")
        self.logger.info("=" * 80)
        indicator_country_start = self.df_clean.groupby([
            'indicator_id', 'indicator_name', 'country_id'
        ])['year'].min().reset_index()
        indicator_country_start.columns = ['indicator_id', 'indicator_name', 'country_id', 'start_year']
        indicator_max_start = indicator_country_start.groupby([
            'indicator_id', 'indicator_name'
        ])['start_year'].max().reset_index()
        indicator_max_start.columns = ['indicator_id', 'indicator_name', 'max_start_year']
        valid_indicators = []
        removed_indicators = []
        for _, ind_row in indicator_max_start.iterrows():
            indicator_id = ind_row['indicator_id']
            indicator_name = ind_row['indicator_name']
            max_start = int(ind_row['max_start_year'])
            span = self.end_year - max_start
            if span < 4:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason'        : f"span={span} < 4"
                })
                continue
            expected_years = list(range(max_start, self.end_year + 1))
            ind_data = self.df_clean[self.df_clean['indicator_id'] == indicator_id]
            all_years_complete = True
            problematic_years = []
            for year in expected_years:
                country_count = ind_data[ind_data['year'] == year]['country_id'].nunique()
                if country_count < len(self.selected_country_ids):
                    all_years_complete = False
                    problematic_years.append(f"{int(year)}({country_count})")
            if all_years_complete:
                valid_indicators.append(indicator_id)
            else:
                removed_indicators.append({
                    'indicator_name': indicator_name,
                    'reason'        : f"missing countries in years: {', '.join(problematic_years[:5])}"
                })
        self.logger.info(f"\n [+] Valid: {len(valid_indicators)}")
        self.logger.info(f" [-] Removed: {len(removed_indicators)}")
        if not valid_indicators:
            raise ValueError("No valid indicators found after filtering!")
        original_count = len(self.df_clean)
        self.df_clean = self.df_clean[self.df_clean['indicator_id'].isin(valid_indicators)].copy()
        # Trim each indicator's rows to start at its max_start_year.
        self.df_clean = self.df_clean.merge(
            indicator_max_start[['indicator_id', 'max_start_year']], on='indicator_id', how='left'
        )
        self.df_clean = self.df_clean[self.df_clean['year'] >= self.df_clean['max_start_year']].copy()
        self.df_clean = self.df_clean.drop('max_start_year', axis=1)
        self.logger.info(f"\n Rows before: {original_count:,}")
        self.logger.info(f" Rows after: {len(self.df_clean):,}")
        self.logger.info(f" Countries: {self.df_clean['country_id'].nunique()}")
        self.logger.info(f" Indicators: {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Pillars: {self.df_clean['pillar_id'].nunique()}")
        return self.df_clean

    def verify_no_gaps(self):
        """STEP 6 — Assert every (indicator, year) covers the full fixed country set.

        Raises ValueError (after logging up to 10 offenders) if any combination
        has fewer countries than expected; returns True otherwise.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 6: VERIFY NO GAPS")
        self.logger.info("=" * 80)
        expected_countries = len(self.selected_country_ids)
        verification = self.df_clean.groupby(['indicator_id', 'year'])['country_id'].nunique().reset_index()
        verification.columns = ['indicator_id', 'year', 'country_count']
        all_good = (verification['country_count'] == expected_countries).all()
        if all_good:
            self.logger.info(f" VERIFICATION PASSED — all combinations have {expected_countries} countries")
        else:
            bad = verification[verification['country_count'] != expected_countries]
            for _, row in bad.head(10).iterrows():
                self.logger.error(
                    f" Indicator {int(row['indicator_id'])}, Year {int(row['year'])}: "
                    f"{int(row['country_count'])} countries (expected {expected_countries})"
                )
            raise ValueError("Gap verification failed!")
        return True

    def analyze_indicator_availability_by_year(self):
        """STEP 7 — Log per-year and per-indicator availability tables (reporting only).

        Produces no side effects on df_clean; returns the per-year stats frame.
        """
        self.logger.info("\n" + "=" * 80)
        self.logger.info("STEP 7: ANALYZE INDICATOR AVAILABILITY BY YEAR")
        self.logger.info("=" * 80)
        year_stats = self.df_clean.groupby('year').agg({
            'indicator_id': 'nunique',
            'country_id'  : 'nunique'
        }).reset_index()
        year_stats.columns = ['year', 'indicator_count', 'country_count']
        self.logger.info(f"\n{'Year':<8} {'Indicators':<15} {'Countries':<12} {'Rows'}")
        self.logger.info("-" * 50)
        for _, row in year_stats.iterrows():
            year = int(row['year'])
            row_count = len(self.df_clean[self.df_clean['year'] == year])
            self.logger.info(
                f"{year:<8} {int(row['indicator_count']):<15} "
                f"{int(row['country_count']):<12} {row_count:,}"
            )
        indicator_details = self.df_clean.groupby([
            'indicator_id', 'indicator_name', 'pillar_name', 'direction'
        ]).agg({'year': ['min', 'max'], 'country_id': 'nunique'}).reset_index()
        indicator_details.columns = [
            'indicator_id', 'indicator_name', 'pillar_name', 'direction',
            'start_year', 'end_year', 'country_count'
        ]
        indicator_details['year_range'] = (
            indicator_details['start_year'].astype(int).astype(str) + '-' +
            indicator_details['end_year'].astype(int).astype(str)
        )
        indicator_details = indicator_details.sort_values(['pillar_name', 'start_year', 'indicator_name'])
        self.logger.info(f"\nTotal Indicators: {len(indicator_details)}")
        for pillar, count in indicator_details.groupby('pillar_name').size().items():
            self.logger.info(f" {pillar}: {count} indicators")
        self.logger.info(f"\n{'-'*100}")
        self.logger.info(f"{'ID':<5} {'Indicator Name':<55} {'Pillar':<15} {'Years':<12} {'Dir':<8} {'Countries'}")
        self.logger.info(f"{'-'*100}")
        for _, row in indicator_details.iterrows():
            direction = 'higher+' if row['direction'] == 'higher_better' else 'lower-'
            self.logger.info(
                f"{int(row['indicator_id']):<5} {row['indicator_name'][:52]:<55} "
                f"{row['pillar_name'][:13]:<15} {row['year_range']:<12} "
                f"{direction:<8} {int(row['country_count'])}"
            )
        return year_stats

    def save_analytical_table(self):
        """STEP 8 — Write the filtered rows to analytical_food_security (Gold).

        Loads key columns + value with WRITE_TRUNCATE, logs the load in
        etl_logs, persists run metadata to etl_metadata, and returns the
        number of rows loaded. Re-raises on failure.
        """
        table_name = 'analytical_food_security'
        self.logger.info("\n" + "=" * 80)
        self.logger.info(f"STEP 8: SAVE TO [DW/Gold] {table_name} -> fs_asean_gold")
        self.logger.info("=" * 80)
        try:
            analytical_df = self.df_clean[[
                'country_id', 'indicator_id', 'pillar_id', 'time_id', 'value'
            ]].copy()
            analytical_df = analytical_df.sort_values(
                ['time_id', 'country_id', 'indicator_id']
            ).reset_index(drop=True)
            # Force exact dtypes to match the REQUIRED BigQuery schema below.
            analytical_df['country_id'] = analytical_df['country_id'].astype(int)
            analytical_df['indicator_id'] = analytical_df['indicator_id'].astype(int)
            analytical_df['pillar_id'] = analytical_df['pillar_id'].astype(int)
            analytical_df['time_id'] = analytical_df['time_id'].astype(int)
            analytical_df['value'] = analytical_df['value'].astype(float)
            self.logger.info(f" Total rows: {len(analytical_df):,}")
            schema = [
                bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("value", "FLOAT", mode="REQUIRED"),
            ]
            rows_loaded = load_to_bigquery(
                self.client, analytical_df, table_name,
                layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
            )
            self.pipeline_metadata['rows_loaded'] = rows_loaded
            log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
            metadata = {
                'source_class'       : self.__class__.__name__,
                'table_name'         : table_name,
                'execution_timestamp': self.pipeline_start,
                'duration_seconds'   : (datetime.now() - self.pipeline_start).total_seconds(),
                'rows_fetched'       : self.pipeline_metadata['rows_fetched'],
                'rows_transformed'   : rows_loaded,
                'rows_loaded'        : rows_loaded,
                'completeness_pct'   : 100.0,
                'config_snapshot'    : json.dumps({
                    'start_year'     : self.start_year,
                    'end_year'       : self.end_year,
                    'fixed_countries': len(self.selected_country_ids),
                    'no_gaps'        : True,
                    'layer'          : 'gold'
                }),
                'validation_metrics' : json.dumps({
                    'fixed_countries' : len(self.selected_country_ids),
                    'total_indicators': int(self.df_clean['indicator_id'].nunique())
                })
            }
            save_etl_metadata(self.client, metadata)
            self.logger.info(f"{table_name}: {rows_loaded:,} rows → [DW/Gold] fs_asean_gold")
            self.logger.info(f" Metadata → [AUDIT] etl_metadata")
            return rows_loaded
        except Exception as e:
            self.logger.error(f"Error saving: {e}")
            raise

    def run(self):
        """Execute the full pipeline (steps 1-8) in order and log a summary."""
        self.pipeline_start = datetime.now()
        self.pipeline_metadata['start_time'] = self.pipeline_start
        self.logger.info("\n" + "=" * 80)
        self.logger.info("Output: analytical_food_security → fs_asean_gold")
        self.logger.info("=" * 80)
        self.load_source_data()
        self.determine_year_boundaries()
        self.filter_complete_indicators_per_country()
        self.select_countries_with_all_pillars()
        self.filter_indicators_consistent_across_fixed_countries()
        self.verify_no_gaps()
        self.analyze_indicator_availability_by_year()
        self.save_analytical_table()
        self.pipeline_end = datetime.now()
        duration = (self.pipeline_end - self.pipeline_start).total_seconds()
        self.logger.info("\n" + "=" * 80)
        self.logger.info("COMPLETED")
        self.logger.info("=" * 80)
        self.logger.info(f" Duration : {duration:.2f}s")
        self.logger.info(f" Year Range : {self.start_year}-{self.end_year}")
        self.logger.info(f" Countries : {len(self.selected_country_ids)}")
        self.logger.info(f" Indicators : {self.df_clean['indicator_id'].nunique()}")
        self.logger.info(f" Rows Loaded: {self.pipeline_metadata['rows_loaded']:,}")
# =============================================================================
# AIRFLOW TASK FUNCTION
# =============================================================================
def run_analytical_layer():
    """
    Airflow task entry point: builds analytical_food_security from
    fact_food_security + dimensions. Runs after dimensional_model_to_gold.
    """
    # Imported lazily so the DAG file parses without BigQuery credentials.
    from scripts.bigquery_config import get_bigquery_client
    layer_loader = AnalyticalLayerLoader(get_bigquery_client())
    layer_loader.run()
    loaded = layer_loader.pipeline_metadata['rows_loaded']
    print(f"Analytical layer loaded: {loaded:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
    # Standalone execution (outside Airflow) for manual runs / debugging.
    banner = "=" * 80
    print(banner)
    print("Output: analytical_food_security → fs_asean_gold")
    print(banner)
    logger = setup_logging()
    client = get_bigquery_client()
    AnalyticalLayerLoader(client).run()
    print("\n" + banner)
    print("[OK] COMPLETED")
    print(banner)

View File

@@ -0,0 +1,581 @@
"""
BIGQUERY CLEANED LAYER ETL
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
Input : STAGING layer (Silver) — staging_integrated (fs_asean_silver)
Output : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver)
Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit)
Classes:
CleanedDataLoader — Cleaning, enrichment, & load ke Silver layer
Usage:
python bigquery_cleaned_layer.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict
import json
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
save_etl_metadata,
)
from google.cloud import bigquery
# =============================================================================
# LOAD STAGING DATA
# =============================================================================
def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
    """Read staging_integrated (STAGING/Silver layer) and print summary stats."""
    print("\nLoading data from staging_integrated (fs_asean_silver)...")
    df = read_from_bigquery(client, 'staging_integrated', layer='silver')
    year_lo, year_hi = int(df['year'].min()), int(df['year'].max())
    print(f" ✓ Loaded : {len(df):,} rows")
    print(f" Columns : {len(df.columns)}")
    print(f" Sources : {df['source'].nunique()}")
    print(f" Indicators : {df['indicator_standardized'].nunique()}")
    print(f" Countries : {df['country'].nunique()}")
    print(f" Year range : {year_lo}-{year_hi}")
    return df
# =============================================================================
# COLUMN CONSTRAINT HELPERS
# =============================================================================
# Schema constraints — varchar max length per column in the cleaned table.
# Values exceeding these limits are clipped by apply_column_constraints().
COLUMN_CONSTRAINTS = {
    'source'                : 20,
    'indicator_original'    : 255,
    'indicator_standardized': 255,
    'country'               : 100,
    'year_range'            : 20,
    'unit'                  : 20,
    'pillar'                : 20,
    'direction'             : 15,  # 'higher_better'=13, 'lower_better'=12
}
def truncate_string(value, max_length: int) -> str:
    """Return *value* as a string clipped to *max_length* characters.

    None/NaN inputs are passed through unchanged (despite the str annotation).
    """
    if pd.isna(value):
        return value
    text = str(value)
    if len(text) > max_length:
        return text[:max_length]
    return text
def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clip string columns to the varchar limits declared in COLUMN_CONSTRAINTS.

    Prints a report of which columns were truncated (with a few example
    values) or a confirmation that nothing exceeded its limit.

    Returns a new DataFrame; the input is not modified.
    """
    result = df.copy()
    report = {}
    for column, limit in COLUMN_CONSTRAINTS.items():
        if column not in result.columns:
            continue
        # Non-null values whose stringified length exceeds the schema limit.
        over_limit = (
            result[column].notna() &
            (result[column].astype(str).str.len() > limit)
        )
        n_over = over_limit.sum()
        if n_over > 0:
            report[column] = {
                'count'     : int(n_over),
                'max_length': limit,
                'examples'  : result[over_limit][column].head(3).tolist()
            }
            result[column] = result[column].apply(
                lambda x: truncate_string(x, limit)
            )
    if report:
        print("\n ⚠ Column Truncations Applied:")
        for column, info in report.items():
            print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
    else:
        print("\n ✓ No truncations needed — all values within constraints")
    return result
# =============================================================================
# COUNTRY NAME STANDARDIZATION
# =============================================================================
# Canonical ASEAN country names keyed by UPPER-CASE ISO-3 codes and common
# name variants. Used by standardize_country_names_asean().
ASEAN_MAPPING = {
    'BRN'                               : 'Brunei Darussalam',
    'BRUNEI'                            : 'Brunei Darussalam',
    'BRUNEI DARUSSALAM'                 : 'Brunei Darussalam',
    'KHM'                               : 'Cambodia',
    'CAMBODIA'                          : 'Cambodia',
    'IDN'                               : 'Indonesia',
    'INDONESIA'                         : 'Indonesia',
    'LAO'                               : 'Laos',
    'LAOS'                              : 'Laos',
    "LAO PEOPLE'S DEMOCRATIC REPUBLIC"  : 'Laos',
    'LAO PDR'                           : 'Laos',
    'MYS'                               : 'Malaysia',
    'MALAYSIA'                          : 'Malaysia',
    'MMR'                               : 'Myanmar',
    'MYANMAR'                           : 'Myanmar',
    'BURMA'                             : 'Myanmar',
    'PHL'                               : 'Philippines',
    'PHILIPPINES'                       : 'Philippines',
    'SGP'                               : 'Singapore',
    'SINGAPORE'                         : 'Singapore',
    'THA'                               : 'Thailand',
    'THAILAND'                          : 'Thailand',
    'VNM'                               : 'Vietnam',
    'VIETNAM'                           : 'Vietnam',
    'VIET NAM'                          : 'Vietnam',
}
def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'country') -> tuple:
    """
    Standardize ASEAN country names via ASEAN_MAPPING.

    Values are upper-cased for lookup; unmapped names pass through unchanged.
    Results are clipped to 100 chars to honor the varchar(100) constraint.

    Args:
        df: input frame (not modified).
        country_column: name of the column holding country names/codes.

    Returns:
        tuple: (df_clean, report_dict) where report_dict contains the number
        of distinct names changed and an original->new mapping.
    """
    df_clean = df.copy()
    def map_country(country):
        if pd.isna(country):
            return country
        s = str(country).strip()
        mapped = ASEAN_MAPPING.get(s.upper(), s)
        return mapped[:100] if len(mapped) > 100 else mapped
    original = df_clean[country_column].copy()
    df_clean[country_column] = df_clean[country_column].apply(map_country)
    # FIX: skip NaN originals — NaN != NaN is True in pandas/IEEE-754, so
    # untouched missing values were previously reported as spurious changes.
    changes = {
        orig: new
        for orig, new in zip(original, df_clean[country_column])
        if pd.notna(orig) and orig != new
    }
    return df_clean, {
        'countries_mapped': len(set(changes.keys())),
        'changes'         : changes,
    }
# =============================================================================
# PILLAR CLASSIFICATION
# =============================================================================
def assign_pillar(indicator_name: str) -> str:
    """
    Classify an indicator into a food-security pillar via keyword matching.

    Return values: 'Availability', 'Access', 'Utilization', 'Stability',
    'Other' — all within the varchar(20) schema constraint. Keyword groups
    are checked in a fixed order, so the first matching pillar wins.
    """
    if pd.isna(indicator_name):
        return 'Other'
    text = str(indicator_name).lower()
    # Indicators that belong to no pillar are bucketed out first.
    if any(kw in text for kw in ('requirement', 'coefficient', 'losses', 'fat supply')):
        return 'Other'
    pillar_keywords = (
        ('Availability', (
            'adequacy', 'protein supply', 'supply of protein',
            'dietary energy supply', 'share of dietary energy', 'derived from cereals',
        )),
        ('Stability', (
            'variability', 'cereal import dependency', 'arable land equipped',
            'political stability', 'value of food imports in total',
        )),
        ('Access', (
            'gdp', 'gross domestic product', 'rail lines', 'road density',
            'number of moderately', 'number of severely',
            'number of people undernourished', 'prevalence of moderate',
            'prevalence of severe', 'prevalence of undernourishment', 'food insecure',
        )),
        ('Utilization', (
            'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
            'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
            'children under 5', 'newborns with low', 'women of reproductive',
        )),
    )
    for pillar, keywords in pillar_keywords:
        if any(kw in text for kw in keywords):
            return pillar
    return 'Other'
# =============================================================================
# DIRECTION CLASSIFICATION
# =============================================================================
def assign_direction(indicator_name: str) -> str:
    """
    Classify an indicator as something to maximize or minimize.

    Evaluation order:
        1. The cereals-share indicator is explicitly lower_better.
        2. Known "supply" indicators are higher_better — checked before the
           generic lower_better keywords so they are not shadowed.
        3. Problem indicators (prevalence/number of undernourished, losses,
           variability, import dependency, ...) are lower_better.
        4. Anything else defaults to higher_better.

    Returns:
        'higher_better' (13 chars) or 'lower_better' (12 chars) — both
        within the varchar(15) column constraint.
    """
    if pd.isna(indicator_name):
        return 'higher_better'
    text = str(indicator_name).lower()

    # Explicit lower_better special case.
    if 'share of dietary energy supply derived from cereals' in text:
        return 'lower_better'

    # Supply-side exceptions that must win over the lower_better keywords.
    higher_exceptions = (
        'exclusive breastfeeding',
        'dietary energy supply',
        'dietary energy supply adequacy',
        'average fat supply',
        'average protein supply',
        'supply of protein of animal origin',
    )
    if any(keyword in text for keyword in higher_exceptions):
        return 'higher_better'

    # Problems to minimize.
    lower_keywords = (
        'prevalence of undernourishment',
        'prevalence of severe food insecurity',
        'prevalence of moderate or severe food insecurity',
        'prevalence of moderate food insecurity',
        'prevalence of wasting',
        'prevalence of stunting',
        'prevalence of overweight',
        'prevalence of obesity',
        'prevalence of anemia',
        'prevalence of low birthweight',
        'number of people undernourished',
        'number of severely food insecure',
        'number of moderately or severely food insecure',
        'number of children under 5 years affected by wasting',
        'number of children under 5 years of age who are overweight',
        'number of children under 5 years of age who are stunted',
        'number of newborns with low birthweight',
        'number of obese adults',
        'number of women of reproductive age',
        'percentage of children under 5 years affected by wasting',
        'percentage of children under 5 years of age who are overweight',
        'percentage of children under 5 years of age who are stunted',
        'cereal import dependency',
        'import dependency',
        'value of food imports in total merchandise exports',
        'value of food imports',
        'variability of food production',
        'variability of food supply',
        'per capita food production variability',
        'per capita food supply variability',
        'coefficient of variation',
        'incidence of caloric losses',
        'food losses',
    )
    if any(keyword in text for keyword in lower_keywords):
        return 'lower_better'
    return 'higher_better'
# =============================================================================
# CLEANED DATA LOADER
# =============================================================================
class CleanedDataLoader:
    """
    Clean the integrated staging data and load it to the STAGING (Silver) layer.

    Kimball context:
        Input  : staging_integrated -> STAGING (Silver) -- fs_asean_silver
        Output : cleaned_integrated -> STAGING (Silver) -- fs_asean_silver
        Audit  : etl_logs, etl_metadata -> AUDIT -- fs_asean_audit

    Pipeline steps (executed in order by run()):
        1. Standardize country names (ASEAN)
        2. Remove missing values
        3. Remove duplicates
        4. Add pillar classification
        5. Add direction classification
        6. Apply column constraints
        7. Load to BigQuery
        8. Log to the Audit layer
    """
    # Target BigQuery schema for cleaned_integrated. REQUIRED columns mirror
    # what survives _step_remove_missing(); year/value stay NULLABLE because
    # some indicators are reported as year ranges instead of single years.
    SCHEMA = [
        bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("country", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("pillar", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
    ]

    def __init__(self, client: bigquery.Client, load_mode: str = 'full_refresh'):
        """
        Args:
            client    : Authenticated BigQuery client.
            load_mode : Load strategy label recorded in the audit metadata;
                        only 'full_refresh' is used by this loader.
        """
        self.client = client
        self.load_mode = load_mode
        self.logger = logging.getLogger(self.__class__.__name__)
        # Prevent duplicate log lines through the root logger's handlers.
        self.logger.propagate = False
        self.table_name = 'cleaned_integrated'
        self.target_layer = 'silver'
        # ETL run metadata; filled during run() and persisted to the AUDIT
        # layer via save_etl_metadata().
        self.metadata = {
            'source_class' : self.__class__.__name__,
            'table_name' : self.table_name,
            'start_time' : None,
            'end_time' : None,
            'duration_seconds' : None,
            'rows_fetched' : 0,
            'rows_transformed' : 0,
            'rows_loaded' : 0,
            'load_mode' : load_mode,
            'validation_metrics': {}
        }

    # ------------------------------------------------------------------
    # STEP METHODS
    # ------------------------------------------------------------------
    def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 1: map country name variants to canonical ASEAN names and
        log how many were remapped to the audit layer."""
        print("\n [Step 1/5] Standardize country names...")
        df, report = standardize_country_names_asean(df, country_column='country')
        print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}")
        unique_countries = sorted(df['country'].unique())
        print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
        log_update(self.client, 'STAGING', 'staging_integrated',
                   'standardize_asean', report['countries_mapped'])
        return df

    def _step_remove_missing(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 2: drop rows containing any NaN and report the retention rate."""
        print("\n [Step 2/5] Remove missing values...")
        rows_before = len(df)
        # NOTE(review): subset=all columns is equivalent to plain dropna() —
        # this drops a row with NaN in ANY column, including the NULLABLE
        # year/year_range/value columns; confirm that is intended.
        df_clean = df.dropna(subset=list(df.columns))
        rows_after = len(df_clean)
        removed = rows_before - rows_after
        print(f" Rows before : {rows_before:,}")
        print(f" Rows after : {rows_after:,}")
        print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)")
        print(f" Retention : {rows_after/rows_before*100:.1f}%")
        return df_clean

    def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 3: deduplicate on (indicator, country, year).

        Exact and value-level duplicate counts are reported for visibility,
        but dedup keys exclude 'value' — when the same key carries
        conflicting values, the first occurrence wins.
        """
        print("\n [Step 3/5] Remove duplicates...")
        exact_dups = df.duplicated().sum()
        data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
        print(f" Exact duplicates : {exact_dups:,}")
        print(f" Data duplicates : {data_dups:,}")
        rows_before = len(df)
        df_clean = df.drop_duplicates(
            subset=['indicator_standardized', 'country', 'year'], keep='first'
        )
        removed = rows_before - len(df_clean)
        print(f" Rows removed : {removed:,} ({removed/rows_before*100:.1f}%)")
        return df_clean

    def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 4: derive the 'pillar' and 'direction' columns from the
        standardized indicator name and print their distributions."""
        print("\n [Step 4/5] Add pillar & direction classification...")
        df = df.copy()
        df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
        df['direction'] = df['indicator_standardized'].apply(assign_direction)
        pillar_counts = df['pillar'].value_counts()
        print(f" ✓ Pillar distribution:")
        for pillar, count in pillar_counts.items():
            print(f" - {pillar}: {count:,}")
        direction_counts = df['direction'].value_counts()
        print(f" ✓ Direction distribution:")
        for direction, count in direction_counts.items():
            pct = count / len(df) * 100
            print(f" - {direction}: {count:,} ({pct:.1f}%)")
        return df

    def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
        """Step 5: truncate string columns to their varchar limits
        (delegates to the module-level apply_column_constraints helper)."""
        print("\n [Step 5/5] Apply column constraints...")
        return apply_column_constraints(df)

    # ------------------------------------------------------------------
    # VALIDATION
    # ------------------------------------------------------------------
    def validate_data(self, df: pd.DataFrame) -> Dict:
        """
        Compute quality metrics for the cleaned frame.

        Returns a JSON-serializable dict: row/column counts, duplicates,
        completeness %, memory footprint, year range, per-column value
        breakdowns, and a per-column length check against COLUMN_CONSTRAINTS.
        """
        validation = {
            'total_rows' : int(len(df)),
            'total_columns' : int(len(df.columns)),
            'duplicate_count' : int(df.duplicated().sum()),
            'completeness_pct': float(round((1 - df.isnull().sum().sum() / df.size) * 100, 2)),
            'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
        }
        if 'year' in df.columns:
            # Guarded so an all-null year column yields None, not a NaN cast error.
            validation['year_range'] = {
                'min' : int(df['year'].min()) if not df['year'].isnull().all() else None,
                'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
                'unique_years': int(df['year'].nunique())
            }
        # Value-count breakdowns for the low-cardinality categorical columns.
        for col in ('pillar', 'direction', 'source'):
            if col in df.columns:
                validation[f'{col}_breakdown'] = {
                    str(k): int(v) for k, v in df[col].value_counts().to_dict().items()
                }
        if 'indicator_standardized' in df.columns:
            validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
        if 'country' in df.columns:
            validation['unique_countries'] = int(df['country'].nunique())
        # Column length check — verifies every constrained column actually
        # fits its varchar limit after _step_apply_constraints().
        column_length_check = {}
        for col, max_len in COLUMN_CONSTRAINTS.items():
            if col in df.columns:
                max_actual = df[col].astype(str).str.len().max()
                column_length_check[col] = {
                    'max_length_constraint': max_len,
                    'max_actual_length' : int(max_actual),
                    'within_limit' : bool(max_actual <= max_len)
                }
        validation['column_length_check'] = column_length_check
        return validation

    # ------------------------------------------------------------------
    # RUN
    # ------------------------------------------------------------------
    def run(self, df: pd.DataFrame) -> int:
        """
        Execute full cleaning pipeline → load ke STAGING (Silver).

        Steps 1–5 transform the frame, then the result is validated,
        loaded to BigQuery with WRITE_TRUNCATE (full refresh), and the
        run is recorded in the AUDIT layer (etl_logs + etl_metadata).

        Args:
            df: Raw staging_integrated data.

        Returns:
            int: Rows loaded (0 when the input frame is empty).
        """
        self.metadata['start_time'] = datetime.now()
        self.metadata['rows_fetched'] = len(df)
        if df.empty:
            print(" ERROR: DataFrame is empty, nothing to process.")
            return 0
        # Pipeline steps
        df = self._step_standardize_countries(df)
        df = self._step_remove_missing(df)
        df = self._step_remove_duplicates(df)
        df = self._step_add_classifications(df)
        df = self._step_apply_constraints(df)
        self.metadata['rows_transformed'] = len(df)
        # Validate
        validation = self.validate_data(df)
        self.metadata['validation_metrics'] = validation
        all_within_limits = all(
            info['within_limit']
            for info in validation.get('column_length_check', {}).values()
        )
        # Warn (but do not abort) if any column still exceeds its limit.
        if not all_within_limits:
            print("\n ⚠ WARNING: Some columns still exceed length constraints!")
            for col, info in validation['column_length_check'].items():
                if not info['within_limit']:
                    print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
        # Load ke Silver — full refresh with an explicit schema.
        print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
        rows_loaded = load_to_bigquery(
            self.client, df, self.table_name,
            layer='silver',
            write_disposition="WRITE_TRUNCATE",
            schema=self.SCHEMA
        )
        self.metadata['rows_loaded'] = rows_loaded
        # Audit logs
        log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
        # ETL metadata — nested dicts serialized to JSON strings before save.
        self.metadata['end_time'] = datetime.now()
        self.metadata['duration_seconds'] = (
            self.metadata['end_time'] - self.metadata['start_time']
        ).total_seconds()
        self.metadata['execution_timestamp'] = self.metadata['start_time']
        self.metadata['completeness_pct'] = validation.get('completeness_pct', 0)
        self.metadata['config_snapshot'] = json.dumps({'load_mode': self.load_mode})
        self.metadata['validation_metrics'] = json.dumps(validation)
        save_etl_metadata(self.client, self.metadata)
        # Summary
        print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
        print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
        print(f" Completeness : {validation['completeness_pct']:.2f}%")
        if 'year_range' in validation:
            yr = validation['year_range']
            if yr['min'] and yr['max']:
                print(f" Year range : {yr['min']}–{yr['max']}")
        print(f" Indicators : {validation.get('unique_indicators', '-')}")
        print(f" Countries : {validation.get('unique_countries', '-')}")
        print(f"\n Schema Validation:")
        for col, info in validation.get('column_length_check', {}).items():
            # NOTE(review): both branches are empty strings — the pass/fail
            # markers (likely ✓/✗) appear to have been lost; confirm.
            status = "" if info['within_limit'] else ""
            print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
        print(f"\n Metadata → [AUDIT] etl_metadata")
        return rows_loaded
# =============================================================================
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer
# =============================================================================
def run_cleaned_integration():
    """
    Airflow task: rebuild cleaned_integrated from staging_integrated.

    Invoked by the DAG after the staging_integration_to_silver task has
    finished. Creates its own BigQuery client, pulls the staging data,
    runs the full CleanedDataLoader pipeline and reports the row count.
    """
    # Local import keeps DAG file parsing free of BigQuery client setup.
    from scripts.bigquery_config import get_bigquery_client

    bq_client = get_bigquery_client()
    staging_df = load_staging_data(bq_client)
    cleaner = CleanedDataLoader(bq_client, load_mode='full_refresh')
    loaded_rows = cleaner.run(staging_df)
    print(f"Cleaned layer loaded: {loaded_rows:,} rows")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
    # Standalone entry point: run the cleaned-layer ETL outside Airflow.
    print("=" * 60)
    print("BIGQUERY CLEANED LAYER ETL")
    print("Kimball DW Architecture")
    print(" Input : STAGING (Silver) → staging_integrated")
    print(" Output : STAGING (Silver) → cleaned_integrated")
    print(" Audit : AUDIT → etl_logs, etl_metadata")
    print("=" * 60)
    # Shared logging and BigQuery client setup (module-level helpers).
    logger = setup_logging()
    client = get_bigquery_client()
    # Read the input: staging_integrated from the Silver dataset.
    df_staging = load_staging_data(client)
    print("\n[1/1] Cleaned Integration → STAGING (Silver)...")
    loader = CleanedDataLoader(client, load_mode='full_refresh')
    final_count = loader.run(df_staging)
    print("\n" + "=" * 60)
    print("✓ CLEANED LAYER ETL COMPLETED")
    print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
    print(f" 📋 AUDIT : etl_logs, etl_metadata")
    print("=" * 60)

View File

@@ -1,6 +1,6 @@
"""
BIGQUERY CONFIGURATION FOR FOOD SECURITY DATA INTEGRATION
Kimball Data Warehouse Architecture
Kimball Data Warehouse Architecture
Dataset Naming:
- Bronze (fs_asean_bronze) : Raw layer — data as-is dari sumber
@@ -21,6 +21,7 @@ Kimball ETL Flow:
"""
import os
import json
from pathlib import Path
from google.cloud import bigquery
from google.oauth2 import service_account
@@ -28,7 +29,7 @@ from google.oauth2 import service_account
# BIGQUERY CONFIGURATION
CREDENTIALS_PATH = os.environ.get(
"GOOGLE_APPLICATION_CREDENTIALS",
"/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json"
"/opt/airflow/secrets/food-security-asean-project-3c22d9247bcb.json"
)
PROJECT_ID = "food-security-asean-project"
LOCATION = "asia-southeast2"
@@ -88,25 +89,6 @@ KIMBALL_LAYER_MAP = {
"dw" : "gold",
}
# SETUP BIGQUERY CLIENT
def get_bigquery_client() -> bigquery.Client:
"""
Create BigQuery client dengan service account credentials
Returns:
bigquery.Client: Authenticated BigQuery client
"""
credentials = service_account.Credentials.from_service_account_file(
CREDENTIALS_PATH,
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
return bigquery.Client(
credentials=credentials,
project=PROJECT_ID,
location=LOCATION
)
# MATCHING CONFIGURATION
CONFIG = {
@@ -166,7 +148,6 @@ for directory in [EXPORTS_DIR, LOGS_DIR]:
# HELPER FUNCTIONS
def get_table_id(table_name: str, layer: str = "bronze") -> str:
# Resolve Kimball alias ke layer name
resolved = KIMBALL_LAYER_MAP.get(layer.lower(), layer.lower())
dataset = LAYER_DATASET_MAP.get(resolved, DATASET_BRONZE)
@@ -174,17 +155,6 @@ def get_table_id(table_name: str, layer: str = "bronze") -> str:
def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze") -> bool:
"""
Check apakah table ada di BigQuery
Args:
client : BigQuery client
table_name : Nama table
layer : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
Returns:
bool: True jika table ada
"""
try:
client.get_table(get_table_id(table_name, layer))
return True
@@ -193,14 +163,6 @@ def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze"
def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
"""
Delete table jika ada
Args:
client : BigQuery client
table_name : Nama table
layer : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
"""
table_id = get_table_id(table_name, layer)
try:
client.delete_table(table_id, not_found_ok=True)
@@ -210,13 +172,6 @@ def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"
def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
"""
Create dataset jika belum ada
Args:
client : BigQuery client
dataset_id : Dataset ID string
"""
full_id = f"{PROJECT_ID}.{dataset_id}"
try:
client.get_dataset(full_id)
@@ -229,7 +184,6 @@ def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
def create_all_datasets(client: bigquery.Client):
"""Create semua 3 dataset (Raw/Staging/DW) jika belum ada"""
print("Setting up BigQuery Datasets (Kimball DW)...")
for layer, dataset_id in LAYER_DATASET_MAP.items():
create_dataset_if_not_exists(client, dataset_id)
@@ -238,21 +192,14 @@ def create_all_datasets(client: bigquery.Client):
# VERIFICATION
def verify_setup() -> bool:
"""
Verify BigQuery setup untuk semua 3 layer (Raw / Staging / DW)
Checks:
1. Credentials file exists
2. Koneksi ke BigQuery berhasil
3. Semua dataset ada atau berhasil dibuat
"""
print("=" * 60)
print("BIGQUERY SETUP VERIFICATION")
print("Kimball DW Architecture")
print("=" * 60)
# 1. Credentials
if not os.path.exists(CREDENTIALS_PATH):
credentials_json = os.environ.get("GOOGLE_CREDENTIALS_JSON")
if not credentials_json and not os.path.exists(CREDENTIALS_PATH):
print(f"Credentials not found : {CREDENTIALS_PATH}")
return False
print(f"✓ Credentials found")
@@ -284,6 +231,7 @@ def verify_setup() -> bool:
print("=" * 60)
return True
# INITIALIZE ON IMPORT
if __name__ == "__main__":

View File

@@ -11,13 +11,6 @@ Subclass yang menggunakan DataSource:
FAODataSource → load ke RAW (Bronze) : raw_fao
WorldBankDataSource → load ke RAW (Bronze) : raw_worldbank
UNICEFDataSource → load ke RAW (Bronze) : raw_unicef
Changes from MySQL version:
1. Replace SQLAlchemy engine → BigQuery client
2. Replace to_sql() → load_table_from_dataframe()
3. load_to_database() default layer = 'bronze' (RAW layer)
4. log_update() menggunakan label 'RAW' sesuai Kimball terminology
5. save_metadata() → save_etl_metadata() ke STAGING layer (Silver)
"""
from abc import ABC, abstractmethod
@@ -27,8 +20,8 @@ from datetime import datetime
from typing import Dict
import json
from bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
from bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
from scripts.bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
from scripts.bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
from google.cloud import bigquery
@@ -42,7 +35,7 @@ class DataSource(ABC):
transform_data() → Transform ke format standar
validate_data() → Cek kualitas data
load_to_database() → Load ke RAW layer (Bronze)
save_metadata() → Simpan metadata ke STAGING layer (Silver)
save_metadata() → Simpan metadata ke AUDIT layer
Subclass wajib implement:
fetch_data()
@@ -50,22 +43,15 @@ class DataSource(ABC):
"""
def __init__(self, client: bigquery.Client = None):
"""
Initialize DataSource dengan BigQuery client.
Args:
client: BigQuery client (jika None, akan dibuat baru)
"""
self.client = client if client else get_bigquery_client()
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.propagate = False
self.data = None
self.table_name = None
self.target_layer = "bronze" # RAW layer — default untuk semua data sources
self.target_layer = "bronze"
self.asean_countries = CONFIG['asean_countries']
# Metadata untuk tracking reproducibility (disimpan ke STAGING/Silver)
self.metadata = {
'source_class' : self.__class__.__name__,
'table_name' : None,
@@ -84,35 +70,13 @@ class DataSource(ABC):
@abstractmethod
def fetch_data(self) -> pd.DataFrame:
"""
Extract data mentah dari sumber eksternal.
WAJIB diimplementasikan oleh subclass.
"""
pass
@abstractmethod
def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Transform data ke format standar sebelum load ke RAW layer.
WAJIB diimplementasikan oleh subclass.
"""
pass
def validate_data(self, df: pd.DataFrame) -> Dict:
"""
Validasi kualitas data hasil transform sebelum load ke RAW layer.
Metrics yang dihitung:
total_rows, total_columns — dimensi data
null_count, null_percentage — kelengkapan per kolom
duplicate_count — duplikasi data
completeness_pct — persentase kelengkapan keseluruhan
memory_usage_mb — ukuran data di memori
year_range — rentang tahun (jika ada kolom year)
Returns:
Dict: Validation metrics
"""
validation = {
'total_rows' : int(len(df)),
'total_columns' : int(len(df.columns)),
@@ -126,7 +90,6 @@ class DataSource(ABC):
'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
}
# Deteksi kolom year untuk year range info
year_cols = [col for col in df.columns if 'year' in col.lower() or 'tahun' in col.lower()]
if year_cols:
year_col = year_cols[0]
@@ -139,35 +102,18 @@ class DataSource(ABC):
return validation
def load_to_database(self, df: pd.DataFrame, table_name: str):
"""
Load data ke RAW layer (Bronze) dengan full refresh strategy.
Kimball context:
RAW layer adalah landing zone pertama untuk data mentah dari sumber.
Menggunakan WRITE_TRUNCATE (full refresh) karena data sumber
bisa berubah setiap kali pipeline dijalankan.
Args:
df : DataFrame hasil transform
table_name : Nama table tujuan di RAW layer (Bronze)
Audit:
Setiap load dicatat ke etl_logs di STAGING layer (Silver)
"""
try:
# Load ke RAW layer (Bronze) — full refresh
load_to_bigquery(
self.client,
df,
table_name,
layer='bronze', # RAW layer
write_disposition="WRITE_TRUNCATE" # Full refresh
layer='bronze',
write_disposition="WRITE_TRUNCATE"
)
# Audit log ke STAGING layer (Silver)
log_update(
self.client,
layer='RAW', # Label Kimball
layer='RAW',
table_name=table_name,
update_method='full_refresh',
rows_affected=len(df)
@@ -186,49 +132,20 @@ class DataSource(ABC):
raise
def save_metadata(self):
"""
Simpan metadata eksekusi ETL ke STAGING layer (Silver).
Kimball context:
ETL metadata (execution time, row counts, completeness, dll.)
disimpan di Staging layer sebagai operational/audit table,
bukan bagian dari Star Schema di DW layer.
Metadata yang disimpan:
source_class, table_name, execution_timestamp,
duration_seconds, rows_fetched/transformed/loaded,
completeness_pct, config_snapshot, validation_metrics
"""
try:
self.metadata['table_name'] = self.table_name
# Pastikan validation_metrics dalam format JSON string
if isinstance(self.metadata.get('validation_metrics'), dict):
self.metadata['validation_metrics'] = json.dumps(
self.metadata['validation_metrics']
)
# Save ke STAGING layer (Silver) via helper
save_etl_metadata(self.client, self.metadata)
except Exception as e:
# Silent fail — metadata tracking tidak boleh menghentikan proses ETL
self.logger.warning(f"Failed to save ETL metadata to STAGING: {str(e)}")
self.logger.warning(f"Failed to save ETL metadata to AUDIT: {str(e)}")
def run(self) -> pd.DataFrame:
"""
Jalankan full ETL pipeline: Extract → Transform → Validate → Load → Metadata.
Kimball ETL steps:
1. EXTRACT — fetch_data() : Ambil dari sumber eksternal
2. TRANSFORM — transform_data() : Standardize format
3. VALIDATE — validate_data() : Cek kualitas
4. LOAD — load_to_database() : Load ke RAW layer (Bronze)
5. METADATA — save_metadata() : Simpan ke STAGING layer (Silver)
Returns:
pd.DataFrame: Data yang sudah di-load ke RAW layer
"""
start_time = datetime.now()
self.metadata['execution_timestamp'] = start_time
@@ -254,7 +171,7 @@ class DataSource(ABC):
self.load_to_database(self.data, self.table_name)
self.metadata['rows_loaded'] = len(self.data)
# 5. METADATA → STAGING layer (Silver)
# 5. METADATA → AUDIT layer
end_time = datetime.now()
self.metadata['duration_seconds'] = (end_time - start_time).total_seconds()
self.save_metadata()
@@ -267,5 +184,5 @@ class DataSource(ABC):
print("DataSource base class loaded — Kimball DW Architecture")
print(" Default target layer : RAW (Bronze)")
print(" Audit logs : STAGING (Silver) via etl_logs")
print(" ETL metadata : STAGING (Silver) via etl_metadata")
print(" Audit logs : AUDIT via etl_logs")
print(" ETL metadata : AUDIT via etl_metadata")

View File

@@ -0,0 +1,850 @@
"""
BIGQUERY DIMENSIONAL MODEL LOAD
Kimball Data Warehouse Architecture
Kimball ETL Flow yang dijalankan file ini:
Input : STAGING layer (Silver) — cleaned_integrated (fs_asean_silver)
Output : DW layer (Gold) — dim_*, fact_* (fs_asean_gold)
Audit : AUDIT layer — etl_logs, etl_metadata (fs_asean_audit)
Classes:
DimensionalModelLoader — Build Star Schema & load ke Gold layer
Usage:
python bigquery_dimensional_model.py
"""
import pandas as pd
import numpy as np
from datetime import datetime
import logging
from typing import Dict, List
import json
import sys
from scripts.bigquery_config import get_bigquery_client, CONFIG, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
setup_logging,
truncate_table,
save_etl_metadata,
)
from google.cloud import bigquery
if hasattr(sys.stdout, 'reconfigure'):
sys.stdout.reconfigure(encoding='utf-8')
# =============================================================================
# DIMENSIONAL MODEL LOADER
# =============================================================================
class DimensionalModelLoader:
"""
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
Kimball context:
Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* → DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
Pipeline steps:
1. Load dim_country
2. Load dim_indicator
3. Load dim_time
4. Load dim_source
5. Load dim_pillar
6. Load fact_food_security (resolve FK dari Gold dims)
7. Validate constraints & data load
"""
    def __init__(self, client: bigquery.Client, df_clean: pd.DataFrame):
        """
        Args:
            client   : Authenticated BigQuery client.
            df_clean : cleaned_integrated data pulled from the Silver layer;
                       the source for every dimension and the fact table.
        """
        self.client = client
        self.df_clean = df_clean
        self.logger = logging.getLogger(self.__class__.__name__)
        # Prevent duplicate log lines through the root logger's handlers.
        self.logger.propagate = False
        self.target_layer = 'gold'
        # Per-table load tracking, keyed by target table name; each loader
        # method updates its own entry (timings, rows, status).
        self.load_metadata = {
            'dim_country' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
            'dim_indicator' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
            'dim_time' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
            'dim_source' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
            'dim_pillar' : {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
            'fact_food_security': {'start_time': None, 'end_time': None, 'rows_loaded': 0, 'status': 'pending'},
        }
        # Whole-pipeline metadata, persisted to the AUDIT layer at the end.
        self.pipeline_metadata = {
            'source_class' : self.__class__.__name__,
            'start_time' : None,
            'end_time' : None,
            'duration_seconds' : None,
            'rows_fetched' : 0,
            'rows_transformed' : 0,
            'rows_loaded' : 0,
            'validation_metrics': {}
        }
# ------------------------------------------------------------------
# CONSTRAINT HELPERS
# ------------------------------------------------------------------
    def _add_primary_key(self, table_name: str, column_name: str):
        """
        Attach a NOT ENFORCED PRIMARY KEY constraint to a Gold-layer table.

        BigQuery key constraints are informational only (NOT ENFORCED), so
        this documents the key rather than enforcing uniqueness. Failures
        are logged as warnings and never raised, so a constraint problem
        cannot abort the dimensional load.
        """
        table_id = get_table_id(table_name, layer='gold')
        query = f"ALTER TABLE `{table_id}` ADD PRIMARY KEY ({column_name}) NOT ENFORCED"
        try:
            self.client.query(query).result()  # block until the DDL job finishes
            self.logger.info(f" [OK] PRIMARY KEY: {table_name}({column_name})")
        except Exception as e:
            # Re-running the pipeline hits "already exists" — treat as success.
            if "already exists" in str(e).lower():
                self.logger.info(f" [INFO] PRIMARY KEY already exists: {table_name}({column_name})")
            else:
                self.logger.warning(f" [WARN] Could not add PRIMARY KEY to {table_name}.{column_name}: {e}")
    def _add_foreign_key(self, table_name: str, fk_column: str,
                         ref_table: str, ref_column: str):
        """
        Attach a NOT ENFORCED FOREIGN KEY from a fact column to a dimension.

        Args:
            table_name : Table receiving the constraint (Gold layer).
            fk_column  : Column in table_name referencing the dimension.
            ref_table  : Referenced dimension table (Gold layer).
            ref_column : Referenced key column in ref_table.

        Like _add_primary_key, this is best-effort: BigQuery does not
        enforce the constraint, and any failure is logged, never raised.
        """
        table_id = get_table_id(table_name, layer='gold')
        ref_table_id = get_table_id(ref_table, layer='gold')
        # Deterministic constraint name so reruns collide predictably.
        constraint_name = f"fk_{table_name}_{fk_column}"
        query = f"""
        ALTER TABLE `{table_id}`
        ADD CONSTRAINT {constraint_name}
        FOREIGN KEY ({fk_column})
        REFERENCES `{ref_table_id}`({ref_column})
        NOT ENFORCED
        """
        try:
            self.client.query(query).result()
            self.logger.info(f" [OK] FK: {table_name}.{fk_column} → {ref_table}.{ref_column}")
        except Exception as e:
            if "already exists" in str(e).lower():
                self.logger.info(f" [INFO] FK already exists: {constraint_name}")
            else:
                self.logger.warning(f" [WARN] Could not add FK {constraint_name}: {e}")
# ------------------------------------------------------------------
# METADATA HELPER
# ------------------------------------------------------------------
    def _save_table_metadata(self, table_name: str):
        """
        Persist one table's load stats to [AUDIT] etl_metadata.

        Reads the entry recorded in self.load_metadata by the table's
        loader method. Best-effort: a save failure is logged as a warning
        and never aborts the pipeline.
        """
        meta = self.load_metadata[table_name]
        metadata = {
            'source_class' : self.__class__.__name__,
            'table_name' : table_name,
            'execution_timestamp': meta['start_time'],
            # Guard against a missing end_time (e.g. load failed mid-way).
            'duration_seconds' : (meta['end_time'] - meta['start_time']).total_seconds()
            if meta['end_time'] else 0,
            # Dimensions are derived in-memory from df_clean; nothing fetched.
            'rows_fetched' : 0,
            'rows_transformed' : meta['rows_loaded'],
            'rows_loaded' : meta['rows_loaded'],
            # Binary completeness: 100 on success, 0 otherwise.
            'completeness_pct' : 100.0 if meta['status'] == 'success' else 0.0,
            'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
            'validation_metrics' : json.dumps({'status': meta['status'], 'rows': meta['rows_loaded']})
        }
        try:
            save_etl_metadata(self.client, metadata)
            self.logger.info(f" Metadata → [AUDIT] etl_metadata")
        except Exception as e:
            self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
# ------------------------------------------------------------------
# DIMENSION LOADERS
# ------------------------------------------------------------------
    def load_dim_time(self):
        """
        Build and load dim_time into the Gold layer (full refresh).

        Derives distinct (year, year_range) pairs from df_clean, parses
        "YYYY-YYYY" ranges into start/end years (using the midpoint as the
        representative year), dedupes on (start_year, end_year) preferring
        single years over ranges, assigns a sequential time_id, and loads
        with WRITE_TRUNCATE. Adds a NOT ENFORCED primary key and records
        the load in the audit tables.

        Returns:
            int: Rows loaded into dim_time.

        Raises:
            Re-raises any load failure after logging it to etl_logs.
        """
        table_name = 'dim_time'
        self.load_metadata[table_name]['start_time'] = datetime.now()
        self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...")
        try:
            # Source rows: distinct year / year_range combinations.
            if 'year_range' in self.df_clean.columns:
                dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
            else:
                dim_time = self.df_clean[['year']].drop_duplicates().copy()
                dim_time['year_range'] = None
            dim_time['year'] = dim_time['year'].astype(int)

            def parse_year_range(row):
                # Parse year_range strings like "2018-2020" into start/end
                # years; the representative 'year' becomes the midpoint.
                # Unparseable values silently fall back to the single year.
                year = row['year']
                year_range = row.get('year_range')
                start_year = year
                end_year = year
                if pd.notna(year_range) and year_range is not None:
                    yr_str = str(year_range).strip()
                    if yr_str and yr_str != 'nan':
                        if '-' in yr_str:
                            parts = yr_str.split('-')
                            if len(parts) == 2:
                                try:
                                    start_year = int(parts[0].strip())
                                    end_year = int(parts[1].strip())
                                    year = (start_year + end_year) // 2
                                except Exception:
                                    pass
                        else:
                            # A bare year stored in year_range overrides 'year'.
                            try:
                                single = int(yr_str)
                                start_year = single
                                end_year = single
                                year = single
                            except Exception:
                                pass
                return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})

            parsed = dim_time.apply(parse_year_range, axis=1)
            dim_time['year'] = parsed['year'].astype(int)
            dim_time['start_year'] = parsed['start_year'].astype(int)
            dim_time['end_year'] = parsed['end_year'].astype(int)
            dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
            dim_time['decade'] = (dim_time['year'] // 10) * 10
            # NOTE(review): 'is_range' duplicates 'is_year_range' as an int;
            # it exists only to sort single years before ranges so that
            # drop_duplicates(keep='first') prefers the single-year row.
            dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
            dim_time = dim_time.sort_values(['is_range', 'start_year'], ascending=[True, True])
            dim_time = dim_time.drop(['is_range', 'year_range'], axis=1, errors='ignore')
            dim_time = dim_time.drop_duplicates(subset=['start_year', 'end_year'], keep='first')
            dim_time_final = dim_time[['year', 'start_year', 'end_year', 'decade', 'is_year_range']].copy()
            dim_time_final = dim_time_final.reset_index(drop=True)
            # Surrogate key: sequential 1..N after the final ordering.
            dim_time_final.insert(0, 'time_id', range(1, len(dim_time_final) + 1))
            schema = [
                bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("start_year", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("end_year", "INTEGER", mode="REQUIRED"),
                bigquery.SchemaField("decade", "INTEGER", mode="NULLABLE"),
                bigquery.SchemaField("is_year_range", "BOOLEAN", mode="NULLABLE"),
            ]
            rows_loaded = load_to_bigquery(
                self.client, dim_time_final, table_name,
                layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
            )
            self._add_primary_key(table_name, 'time_id')
            self.load_metadata[table_name].update(
                {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
            )
            log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
            self._save_table_metadata(table_name)
            self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n")
            return rows_loaded
        except Exception as e:
            # Record the failure in both the tracker and the audit log,
            # then propagate so the pipeline stops.
            self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
            log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
            raise
def load_dim_country(self):
    """Populate the ``dim_country`` dimension in the Gold layer.

    Produces one row per distinct country in the cleaned dataset,
    enriched with a static region/sub-region pair and an ISO-3166
    alpha-3 code, then writes the result to BigQuery as a full refresh
    (WRITE_TRUNCATE) with a dense surrogate key.

    Returns:
        int: number of rows loaded into ``dim_country``.

    Raises:
        Exception: re-raised after the failure is recorded in the load
            metadata and the audit log.
    """
    table_name = 'dim_country'
    self.load_metadata[table_name]['start_time'] = datetime.now()
    self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...")
    try:
        # Static geography lookups — every expected country is an ASEAN member.
        geo_lookup = {
            'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
            'Cambodia'         : ('Southeast Asia', 'ASEAN'),
            'Indonesia'        : ('Southeast Asia', 'ASEAN'),
            'Laos'             : ('Southeast Asia', 'ASEAN'),
            'Malaysia'         : ('Southeast Asia', 'ASEAN'),
            'Myanmar'          : ('Southeast Asia', 'ASEAN'),
            'Philippines'      : ('Southeast Asia', 'ASEAN'),
            'Singapore'        : ('Southeast Asia', 'ASEAN'),
            'Thailand'         : ('Southeast Asia', 'ASEAN'),
            'Vietnam'          : ('Southeast Asia', 'ASEAN'),
        }
        iso3_lookup = {
            'Brunei Darussalam': 'BRN', 'Cambodia': 'KHM', 'Indonesia': 'IDN',
            'Laos': 'LAO', 'Malaysia': 'MYS', 'Myanmar': 'MMR',
            'Philippines': 'PHL', 'Singapore': 'SGP', 'Thailand': 'THA', 'Vietnam': 'VNM',
        }
        unknown_geo = ('Unknown', 'Unknown')
        countries = self.df_clean[['country']].drop_duplicates().copy()
        countries.columns = ['country_name']
        # Countries missing from the lookup fall back to 'Unknown'
        # geography and a missing (NaN) ISO code.
        countries['region'] = countries['country_name'].map(
            lambda name: geo_lookup.get(name, unknown_geo)[0])
        countries['subregion'] = countries['country_name'].map(
            lambda name: geo_lookup.get(name, unknown_geo)[1])
        countries['iso_code'] = countries['country_name'].map(iso3_lookup)
        dim_country_final = countries[['country_name', 'region', 'subregion', 'iso_code']].copy()
        dim_country_final = dim_country_final.reset_index(drop=True)
        # Dense surrogate key in source order (1..n).
        dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
        schema = [
            bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("country_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("region", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("subregion", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("iso_code", "STRING", mode="NULLABLE"),
        ]
        rows_loaded = load_to_bigquery(
            self.client, dim_country_final, table_name,
            layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
        )
        self._add_primary_key(table_name, 'country_id')
        self.load_metadata[table_name].update(
            {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
        )
        log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
        self._save_table_metadata(table_name)
        self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n")
        return rows_loaded
    except Exception as e:
        self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
        log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
        raise
def load_dim_indicator(self):
    """Populate the ``dim_indicator`` dimension in the Gold layer.

    Builds one row per distinct standardized indicator carrying its
    unit, optimization direction and category.  Unit, direction and
    category come from the cleaned dataset when those columns exist;
    otherwise the direction defaults to ``'higher_better'`` and the
    category is derived from keywords in the indicator name.

    Returns:
        int: number of rows loaded into ``dim_indicator``.

    Raises:
        Exception: re-raised after recording the failure in the load
            metadata and the audit log.
    """
    table_name = 'dim_indicator'
    self.load_metadata[table_name]['start_time'] = datetime.now()
    self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...")
    try:
        # Optional enrichment columns may or may not be present upstream.
        has_direction = 'direction' in self.df_clean.columns
        has_unit = 'unit' in self.df_clean.columns
        has_category = 'indicator_category' in self.df_clean.columns
        dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
        dim_indicator.columns = ['indicator_name']
        if has_unit:
            # Attach the observed unit per indicator via a left join.
            unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
            unit_map.columns = ['indicator_name', 'unit']
            dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
        else:
            dim_indicator['unit'] = None
        if has_direction:
            dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
            dir_map.columns = ['indicator_name', 'direction']
            dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
            self.logger.info(" [OK] direction column from cleaned_integrated")
        else:
            # Fallback: assume larger values are better for every indicator.
            dim_indicator['direction'] = 'higher_better'
            self.logger.warning(" [WARN] direction not found, default: higher_better")
        if has_category:
            cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
            cat_map.columns = ['indicator_name', 'indicator_category']
            dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
        else:
            def categorize_indicator(name):
                """Keyword-based fallback categorization of an indicator name."""
                n = str(name).lower()
                if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
                    'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
                    return 'Health & Nutrition'
                elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
                    'import dependency', 'share of dietary']):
                    return 'Agricultural Production'
                elif any(w in n for w in ['import', 'export', 'trade']):
                    return 'Trade'
                elif any(w in n for w in ['gdp', 'income', 'economic']):
                    return 'Economic'
                elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
                    return 'Infrastructure'
                else:
                    return 'Other'
            dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
        # The merges above can fan rows out (several units/directions per
        # indicator); keep only the first occurrence per indicator name.
        dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
        dim_indicator_final = dim_indicator[
            ['indicator_name', 'indicator_category', 'unit', 'direction']
        ].copy()
        dim_indicator_final = dim_indicator_final.reset_index(drop=True)
        # Dense surrogate key (1..n).
        dim_indicator_final.insert(0, 'indicator_id', range(1, len(dim_indicator_final) + 1))
        schema = [
            bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("indicator_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("indicator_category", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("direction", "STRING", mode="REQUIRED"),
        ]
        rows_loaded = load_to_bigquery(
            self.client, dim_indicator_final, table_name,
            layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
        )
        self._add_primary_key(table_name, 'indicator_id')
        # Log the category and direction distributions for QA review.
        for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
            self.logger.info(f" {label}:")
            for val, cnt in dim_indicator_final[col].value_counts().items():
                self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
        self.load_metadata[table_name].update(
            {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
        )
        log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
        self._save_table_metadata(table_name)
        self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n")
        return rows_loaded
    except Exception as e:
        self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
        log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
        raise
def load_dim_source(self):
    """Populate the ``dim_source`` dimension in the Gold layer.

    Emits one row per distinct data provider found in the cleaned
    dataset, enriched with static acquisition metadata (type,
    organization, access method, API endpoint), and loads it as a full
    refresh with a surrogate primary key.

    Returns:
        int: number of rows loaded into ``dim_source``.

    Raises:
        Exception: re-raised after recording the failure in the load
            metadata and the audit log.
    """
    table_name = 'dim_source'
    self.load_metadata[table_name]['start_time'] = datetime.now()
    self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...")
    try:
        # Static acquisition details for the known upstream providers.
        catalog = {
            'FAO': {
                'source_type' : 'International Organization',
                'organization' : 'Food and Agriculture Organization',
                'access_method': 'Python Library (faostat)',
                'api_endpoint' : None,
            },
            'World Bank': {
                'source_type' : 'International Organization',
                'organization' : 'The World Bank',
                'access_method': 'Python Library (wbgapi)',
                'api_endpoint' : None,
            },
            'UNICEF': {
                'source_type' : 'International Organization',
                'organization' : "United Nations Children's Fund",
                'access_method': 'SDMX API',
                'api_endpoint' : 'https://sdmx.data.unicef.org/ws/public/sdmxapi/rest',
            },
        }
        records = []
        for provider in self.df_clean['source'].unique():
            details = catalog.get(provider)
            if details is None:
                # Providers outside the catalog get a generic placeholder.
                details = {
                    'source_type' : 'International Organization',
                    'organization' : provider,
                    'access_method': 'Unknown',
                    'api_endpoint' : None,
                }
            record = {'source_name': provider}
            record.update(details)
            records.append(record)
        dim_source_final = pd.DataFrame(records)[
            ['source_name', 'source_type', 'organization', 'access_method', 'api_endpoint']
        ].copy()
        dim_source_final = dim_source_final.reset_index(drop=True)
        # Dense surrogate key in discovery order (1..n).
        dim_source_final.insert(0, 'source_id', range(1, len(dim_source_final) + 1))
        schema = [
            bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("source_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("source_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("organization", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("access_method", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("api_endpoint", "STRING", mode="NULLABLE"),
        ]
        rows_loaded = load_to_bigquery(
            self.client, dim_source_final, table_name,
            layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
        )
        self._add_primary_key(table_name, 'source_id')
        self.load_metadata[table_name].update(
            {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
        )
        log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
        self._save_table_metadata(table_name)
        self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n")
        return rows_loaded
    except Exception as e:
        self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
        log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
        raise
def load_dim_pillar(self):
    """Populate the ``dim_pillar`` dimension in the Gold layer.

    One row per distinct food-security pillar in the cleaned data,
    with a three-letter code per pillar ('OTH' for anything not in
    the known set), sorted by pillar name and keyed with a dense
    surrogate id.

    Returns:
        int: number of rows loaded into ``dim_pillar``.

    Raises:
        Exception: re-raised after recording the failure in the load
            metadata and the audit log.
    """
    table_name = 'dim_pillar'
    self.load_metadata[table_name]['start_time'] = datetime.now()
    self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...")
    try:
        # Three-letter code per known pillar; anything else maps to 'OTH'.
        abbrev = {
            'Availability': 'AVL', 'Access' : 'ACC',
            'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH',
        }
        rows = [
            {'pillar_name': name, 'pillar_code': abbrev.get(name, 'OTH')}
            for name in self.df_clean['pillar'].unique()
        ]
        dim_pillar_final = (
            pd.DataFrame(rows)
            .sort_values('pillar_name')[['pillar_name', 'pillar_code']]
            .copy()
        )
        dim_pillar_final = dim_pillar_final.reset_index(drop=True)
        # Dense surrogate key in sorted order (1..n).
        dim_pillar_final.insert(0, 'pillar_id', range(1, len(dim_pillar_final) + 1))
        schema = [
            bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("pillar_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("pillar_code", "STRING", mode="NULLABLE"),
        ]
        rows_loaded = load_to_bigquery(
            self.client, dim_pillar_final, table_name,
            layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
        )
        self._add_primary_key(table_name, 'pillar_id')
        self.load_metadata[table_name].update(
            {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
        )
        log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
        self._save_table_metadata(table_name)
        self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n")
        return rows_loaded
    except Exception as e:
        self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
        log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
        raise
# ------------------------------------------------------------------
# FACT LOADER
# ------------------------------------------------------------------
def load_fact_food_security(self):
    """Build and load the ``fact_food_security`` fact table (Gold).

    Resolves every business key in the cleaned dataset to its surrogate
    key by joining against the five dimension tables read back from the
    Gold layer, keeps only rows where all five foreign keys resolved,
    and loads the result as a full refresh with PK/FK constraints.

    Returns:
        int: number of fact rows loaded.

    Raises:
        Exception: re-raised after recording the failure in the load
            metadata and the audit log.
    """
    table_name = 'fact_food_security'
    self.load_metadata[table_name]['start_time'] = datetime.now()
    self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...")
    try:
        # Load dimensions back from Gold for FK resolution.
        dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
        dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
        dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
        dim_source = read_from_bigquery(self.client, 'dim_source', layer='gold')
        dim_pillar = read_from_bigquery(self.client, 'dim_pillar', layer='gold')
        fact_table = self.df_clean.copy()
        def parse_year_range_for_merge(row):
            """Derive (start_year, end_year) for one row.

            Defaults both to the row's single ``year`` unless
            ``year_range`` holds a parseable 'YYYY-YYYY' span or a lone
            year string; unparseable values fall back silently.
            """
            year = row['year']
            year_range = row.get('year_range')
            start_year = year
            end_year = year
            if pd.notna(year_range) and year_range is not None:
                yr_str = str(year_range).strip()
                if yr_str and yr_str != 'nan':
                    if '-' in yr_str:
                        parts = yr_str.split('-')
                        if len(parts) == 2:
                            try:
                                start_year = int(parts[0].strip())
                                end_year = int(parts[1].strip())
                            except Exception:
                                # Unparseable span: keep the plain year.
                                pass
                    else:
                        try:
                            single = int(yr_str)
                            start_year = single
                            end_year = single
                        except Exception:
                            pass
            return pd.Series({'start_year': start_year, 'end_year': end_year})
    # Match dim_time's grain: (start_year, end_year) per row.
        if 'year_range' in fact_table.columns:
            parsed = fact_table.apply(parse_year_range_for_merge, axis=1)
            fact_table['start_year'] = parsed['start_year'].astype(int)
            fact_table['end_year'] = parsed['end_year'].astype(int)
        else:
            fact_table['start_year'] = fact_table['year'].astype(int)
            fact_table['end_year'] = fact_table['year'].astype(int)
        # Resolve FKs — each merge renames the dimension's natural key to
        # the fact column name so the join can be done on that column.
        fact_table = fact_table.merge(
            dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
            on='country', how='left'
        )
        fact_table = fact_table.merge(
            dim_indicator[['indicator_id', 'indicator_name']].rename(
                columns={'indicator_name': 'indicator_standardized'}),
            on='indicator_standardized', how='left'
        )
        fact_table = fact_table.merge(
            dim_time[['time_id', 'start_year', 'end_year']],
            on=['start_year', 'end_year'], how='left'
        )
        fact_table = fact_table.merge(
            dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
            on='source', how='left'
        )
        fact_table = fact_table.merge(
            dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
            on='pillar', how='left'
        )
        # Keep only rows where every FK resolved (inner-join semantics).
        fact_table = fact_table[
            fact_table['country_id'].notna() &
            fact_table['indicator_id'].notna() &
            fact_table['time_id'].notna() &
            fact_table['source_id'].notna() &
            fact_table['pillar_id'].notna()
        ]
        fact_final = fact_table[
            ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id', 'value']
        ].copy()
        # NOTE(review): quality score is a flat constant for all rows —
        # presumably a placeholder; confirm intended scoring logic.
        fact_final['data_quality_score'] = 0.95
        for col in ['country_id', 'indicator_id', 'time_id', 'source_id', 'pillar_id']:
            fact_final[col] = fact_final[col].astype(int)
        fact_final['value'] = fact_final['value'].astype(float)
        fact_final = fact_final.reset_index(drop=True)
        # Dense surrogate key for the fact rows (1..n).
        fact_final.insert(0, 'fact_id', range(1, len(fact_final) + 1))
        schema = [
            bigquery.SchemaField("fact_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("country_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("indicator_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("time_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("source_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("pillar_id", "INTEGER", mode="REQUIRED"),
            bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("data_quality_score", "FLOAT", mode="NULLABLE"),
        ]
        rows_loaded = load_to_bigquery(
            self.client, fact_final, table_name,
            layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
        )
        # Add PK + FKs against each dimension.
        self._add_primary_key(table_name, 'fact_id')
        self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
        self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
        self._add_foreign_key(table_name, 'time_id', 'dim_time', 'time_id')
        self._add_foreign_key(table_name, 'source_id', 'dim_source', 'source_id')
        self._add_foreign_key(table_name, 'pillar_id', 'dim_pillar', 'pillar_id')
        self.load_metadata[table_name].update(
            {'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
        )
        log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
        self._save_table_metadata(table_name)
        self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n")
        return rows_loaded
    except Exception as e:
        self.load_metadata[table_name].update({'status': 'failed', 'end_time': datetime.now()})
        log_update(self.client, 'DW', table_name, 'full_load', 0, 'failed', str(e))
        raise
# ------------------------------------------------------------------
# VALIDATION
# ------------------------------------------------------------------
def validate_constraints(self):
    """Log every PK/FK constraint declared on the Gold star-schema tables.

    Queries INFORMATION_SCHEMA.TABLE_CONSTRAINTS for the five dimension
    tables and the fact table, logs each constraint, then logs summary
    counts.  Failures are logged but not re-raised — validation is
    best-effort and must not abort the pipeline.
    """
    self.logger.info("\n" + "=" * 60)
    self.logger.info("CONSTRAINT VALIDATION — fs_asean_gold")
    self.logger.info("=" * 60)
    try:
        gold_dataset = CONFIG['bigquery']['dataset_gold']
        # Order dimensions before the fact table via the CASE expression.
        query = f"""
        SELECT table_name, constraint_name, constraint_type
        FROM `{CONFIG['bigquery']['project_id']}.{gold_dataset}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS`
        WHERE table_name IN (
            'dim_country', 'dim_indicator', 'dim_time',
            'dim_source', 'dim_pillar', 'fact_food_security'
        )
        ORDER BY
            CASE WHEN table_name LIKE 'dim_%' THEN 1 ELSE 2 END,
            table_name, constraint_type
        """
        df = self.client.query(query).result().to_dataframe(create_bqstorage_client=False)
        if len(df) > 0:
            for _, row in df.iterrows():
                icon = "[PK]" if row['constraint_type'] == "PRIMARY KEY" else "[FK]"
                self.logger.info(
                    f" {icon} {row['table_name']:25s} | "
                    f"{row['constraint_type']:15s} | {row['constraint_name']}"
                )
            pk_count = len(df[df['constraint_type'] == 'PRIMARY KEY'])
            fk_count = len(df[df['constraint_type'] == 'FOREIGN KEY'])
            self.logger.info(f"\n Primary Keys : {pk_count}")
            self.logger.info(f" Foreign Keys : {fk_count}")
            self.logger.info(f" Total : {len(df)}")
        else:
            self.logger.warning(" [WARN] No constraints found!")
    except Exception as e:
        # Best-effort: do not propagate validation errors.
        self.logger.error(f"Error validating constraints: {e}")
def validate_data_load(self):
    """Log row counts and summary statistics for the loaded star schema.

    Reads each Gold table back to report its size, summarizes the fact
    table's distinct key counts, and reports the direction distribution
    in ``dim_indicator``.  Unlike :meth:`validate_constraints`, errors
    here are re-raised so the pipeline surfaces validation failures.
    """
    self.logger.info("\n" + "=" * 60)
    self.logger.info("DATA LOAD VALIDATION — fs_asean_gold")
    self.logger.info("=" * 60)
    try:
        # Per-table row counts.
        for table in ['dim_country', 'dim_indicator', 'dim_time',
                      'dim_source', 'dim_pillar', 'fact_food_security']:
            df = read_from_bigquery(self.client, table, layer='gold')
            self.logger.info(f" {table:25s}: {len(df):>10,} rows")
        # Distinct-key profile of the fact table.
        query = f"""
        SELECT
            COUNT(*) AS total_facts,
            COUNT(DISTINCT country_id) AS unique_countries,
            COUNT(DISTINCT indicator_id) AS unique_indicators,
            COUNT(DISTINCT time_id) AS unique_years,
            COUNT(DISTINCT source_id) AS unique_sources,
            COUNT(DISTINCT pillar_id) AS unique_pillars
        FROM `{get_table_id('fact_food_security', layer='gold')}`
        """
        stats = self.client.query(query).result().to_dataframe(
            create_bqstorage_client=False
        ).iloc[0]
        self.logger.info(f"\n Fact Table Summary:")
        self.logger.info(f" Total Facts : {int(stats['total_facts']):>10,}")
        self.logger.info(f" Unique Countries : {int(stats['unique_countries']):>10,}")
        self.logger.info(f" Unique Indicators : {int(stats['unique_indicators']):>10,}")
        self.logger.info(f" Unique Years : {int(stats['unique_years']):>10,}")
        self.logger.info(f" Unique Sources : {int(stats['unique_sources']):>10,}")
        self.logger.info(f" Unique Pillars : {int(stats['unique_pillars']):>10,}")
        # Direction distribution in the indicator dimension.
        query_dir = f"""
        SELECT direction, COUNT(*) AS count
        FROM `{get_table_id('dim_indicator', layer='gold')}`
        GROUP BY direction ORDER BY direction
        """
        df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
        if len(df_dir) > 0:
            self.logger.info(f"\n Direction Distribution:")
            for _, row in df_dir.iterrows():
                self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
        self.logger.info("\n [OK] Validation completed")
    except Exception as e:
        self.logger.error(f"Error during validation: {e}")
        raise
# ------------------------------------------------------------------
# RUN
# ------------------------------------------------------------------
def run(self):
    """Execute the full dimensional model load into the DW layer (Gold).

    Sequence: load the five dimension tables, then the fact table,
    validate constraints and row counts, and finally persist
    pipeline-level metadata to the audit dataset (best-effort).
    """
    self.pipeline_metadata['start_time'] = datetime.now()
    self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
    self.logger.info("\n" + "=" * 60)
    self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold")
    self.logger.info("=" * 60)
    # Dimensions — must exist before the fact load can resolve FKs.
    self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
    self.load_dim_country()
    self.load_dim_indicator()
    self.load_dim_time()
    self.load_dim_source()
    self.load_dim_pillar()
    # Fact
    self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
    self.load_fact_food_security()
    # Validate
    self.validate_constraints()
    self.validate_data_load()
    pipeline_end = datetime.now()
    duration = (pipeline_end - self.pipeline_metadata['start_time']).total_seconds()
    total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
    self.pipeline_metadata.update({
        'end_time' : pipeline_end,
        'duration_seconds' : duration,
        'rows_transformed' : total_loaded,
        'rows_loaded' : total_loaded,
        'execution_timestamp': self.pipeline_metadata['start_time'],
        'completeness_pct' : 100.0,
        'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
        'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
        'table_name' : 'dimensional_model_pipeline',
    })
    try:
        save_etl_metadata(self.client, self.pipeline_metadata)
    except Exception as e:
        # Best-effort: metadata failure must not fail a completed load.
        self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
    # Summary
    self.logger.info("\n" + "=" * 60)
    self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
    self.logger.info("=" * 60)
    self.logger.info(f" Dataset : fs_asean_gold")
    self.logger.info(f" Duration : {duration:.2f}s")
    self.logger.info(f" Tables :")
    for tbl, meta in self.load_metadata.items():
        # NOTE(review): both status-icon literals are empty strings here —
        # presumably check/cross glyphs were lost; confirm intended chars.
        icon = "" if meta['status'] == 'success' else ""
        self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
    self.logger.info(f"\n Metadata → [AUDIT] etl_metadata")
    self.logger.info("=" * 60)
# =============================================================================
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer
# =============================================================================
def run_dimensional_model():
    """Airflow task: load the dimensional model from cleaned_integrated.

    Invoked by the DAG once the cleaned_integration_to_silver task has
    finished.  Reads the Silver-layer table, runs the full star-schema
    load, and prints a short completion summary.
    """
    from scripts.bigquery_config import get_bigquery_client

    bq_client = get_bigquery_client()
    cleaned_rows = read_from_bigquery(bq_client, 'cleaned_integrated', layer='silver')
    DimensionalModelLoader(bq_client, cleaned_rows).run()
    print(f"Dimensional model loaded: {len(cleaned_rows):,} source rows processed")
# =============================================================================
# MAIN EXECUTION
# =============================================================================
if __name__ == "__main__":
    # Standalone entry point: run the Silver → Gold dimensional load
    # outside Airflow (e.g. for local testing / a one-off refresh).
    print("=" * 60)
    print("BIGQUERY DIMENSIONAL MODEL LOAD")
    print("Kimball DW Architecture")
    print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)")
    print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)")
    print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)")
    print("=" * 60)
    logger = setup_logging()
    client = get_bigquery_client()
    # Source data: the cleaned, integrated Silver-layer table.
    print("\nLoading cleaned_integrated (fs_asean_silver)...")
    df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
    print(f" ✓ Loaded : {len(df_clean):,} rows")
    print(f" Columns : {len(df_clean.columns)}")
    print(f" Sources : {df_clean['source'].nunique()}")
    print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
    print(f" Countries : {df_clean['country'].nunique()}")
    print(f" Year range : {int(df_clean['year'].min())}–{int(df_clean['year'].max())}")
    if 'direction' in df_clean.columns:
        print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
    else:
        # direction is produced by the cleaned layer; warn when absent.
        print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
    print("\n[1/1] Dimensional Model Load → DW (Gold)...")
    loader = DimensionalModelLoader(client, df_clean)
    loader.run()
    print("\n" + "=" * 60)
    print("✓ DIMENSIONAL MODEL ETL COMPLETED")
    print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
    print(" dim_source, dim_pillar, fact_food_security")
    print(" 📋 AUDIT : etl_logs, etl_metadata")
    print("=" * 60)

View File

@@ -1,23 +1,6 @@
"""
BIGQUERY HELPER FUNCTIONS
Kimball Data Warehouse Architecture
Layer Assignment (Kimball terminology):
RAW (Bronze) → raw_fao, raw_worldbank, raw_unicef
STAGING (Silver) → staging_integrated, cleaned_integrated
AUDIT (Audit) → etl_logs, etl_metadata
DW (Gold) → dim_*, fact_food_security, fact_food_security_eligible
Functions:
setup_logging() — Setup file & console logging
log_update() — Audit log ETL ke staging (Silver)
save_etl_metadata() — Save ETL metadata ke staging (Silver), preserve created_at
load_to_bigquery() — Load DataFrame ke layer tertentu
read_from_bigquery() — Read dari layer tertentu
truncate_table() — Hapus semua rows dari table
drop_table() — Drop table dari layer tertentu
get_staging_schema() — Schema staging_integrated
get_etl_metadata_schema() — Schema etl_metadata
"""
import pandas as pd
@@ -25,7 +8,7 @@ import logging
from datetime import datetime
import pytz
from google.cloud import bigquery
from bigquery_config import (
from scripts.bigquery_config import (
get_bigquery_client,
get_table_id,
table_exists,
@@ -35,16 +18,9 @@ import json
# LOGGING SETUP
logger = logging.getLogger(__name__)
def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
"""
Setup logging system untuk tracking eksekusi ETL
Args:
log_file: Path to log file
Returns:
logging.Logger: Configured logger
"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
@@ -55,27 +31,8 @@ def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
)
return logging.getLogger(__name__)
# ETL AUDIT LOG — STAGING LAYER (Silver)
def ensure_etl_logs_table(client: bigquery.Client):
"""
Buat table etl_logs di STAGING layer (Silver) jika belum ada.
Kimball context:
etl_logs adalah operational/audit table, bukan bagian dari Star Schema.
Disimpan di Staging layer karena merupakan output proses ETL,
bukan data warehouse final.
Schema:
id STRING — unique log ID
timestamp DATETIME — waktu log dibuat
layer STRING — layer yang diproses (RAW/STAGING/DW)
table_name STRING — nama table yang diproses
update_method STRING — full_refresh / incremental
rows_affected INTEGER — jumlah rows
status STRING — success / failed
error_message STRING — pesan error jika gagal
"""
if not table_exists(client, 'etl_logs', layer='audit'):
table_id = get_table_id('etl_logs', layer='audit')
schema = [
@@ -96,28 +53,6 @@ def ensure_etl_logs_table(client: bigquery.Client):
def log_update(client: bigquery.Client, layer: str, table_name: str,
update_method: str, rows_affected: int,
status: str = 'success', error_msg: str = None):
"""
Catat aktivitas ETL ke etl_logs (STAGING/Silver) untuk audit trail.
Args:
client : BigQuery client
layer : Layer yang diproses — 'RAW', 'STAGING', atau 'DW'
table_name : Nama table yang diproses
update_method : 'full_refresh' atau 'incremental'
rows_affected : Jumlah rows yang diproses
status : 'success' atau 'failed'
error_msg : Pesan error jika status='failed'
Examples:
# Log saat load raw data
log_update(client, 'RAW', 'raw_fao', 'full_refresh', 5000)
# Log saat proses staging
log_update(client, 'STAGING', 'staging_integrated', 'full_refresh', 12000)
# Log saat load ke DW
log_update(client, 'DW', 'fact_food_security', 'full_refresh', 8000)
"""
try:
ensure_etl_logs_table(client)
@@ -133,7 +68,6 @@ def log_update(client: bigquery.Client, layer: str, table_name: str,
'error_message': error_msg
}])
# Hapus timezone untuk BigQuery DATETIME
log_data['timestamp'] = pd.to_datetime(log_data['timestamp']).dt.tz_localize(None)
log_data['id'] = log_data['id'].astype(str)
@@ -143,42 +77,13 @@ def log_update(client: bigquery.Client, layer: str, table_name: str,
job.result()
except Exception as e:
print(f" Warning: Failed to write etl_logs [STAGING]: {e}")
print(f" Warning: Failed to write etl_logs [AUDIT]: {e}")
# DATA LOADING TO BIGQUERY
def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
table_name: str, layer: str = "bronze",
write_disposition: str = "WRITE_TRUNCATE",
schema: list = None) -> int:
"""
Load DataFrame ke BigQuery table pada layer tertentu.
Args:
client : BigQuery client
df : DataFrame yang akan di-load
table_name : Nama table tujuan
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
write_disposition : WRITE_TRUNCATE (replace) atau WRITE_APPEND (append)
schema : Optional schema (list of SchemaField)
Returns:
int: Jumlah rows yang berhasil di-load
Examples (Kimball flow):
# RAW layer — data mentah dari sumber
load_to_bigquery(client, df_fao, 'raw_fao', layer='bronze')
load_to_bigquery(client, df_wb, 'raw_worldbank', layer='bronze')
load_to_bigquery(client, df_unicef, 'raw_unicef', layer='bronze')
# STAGING layer — cleaned & integrated
load_to_bigquery(client, df_staging, 'staging_integrated', layer='silver')
# DW layer — Kimball Star Schema
load_to_bigquery(client, df_dim, 'dim_country', layer='gold')
load_to_bigquery(client, df_fact, 'fact_food_security', layer='gold')
load_to_bigquery(client, df_elig, 'fact_food_security_eligible', layer='gold')
"""
table_id = get_table_id(table_name, layer)
job_config = bigquery.LoadJobConfig(
write_disposition=write_disposition,
@@ -193,36 +98,11 @@ def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
print(f" ✓ Loaded {table.num_rows:,} rows → [{layer.upper()}] {table_name}")
return table.num_rows
# DATA READING FROM BIGQUERY
def read_from_bigquery(client: bigquery.Client,
table_name: str = None,
layer: str = "bronze",
query: str = None) -> pd.DataFrame:
"""
Read data dari BigQuery table atau jalankan custom query.
Args:
client : BigQuery client
table_name : Nama table yang akan dibaca
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
query : Custom SQL query (jika diisi, table_name diabaikan)
Returns:
pd.DataFrame: Hasil query
Examples (Kimball flow):
# Baca dari RAW layer
df = read_from_bigquery(client, 'raw_fao', layer='bronze')
# Baca dari STAGING layer
df = read_from_bigquery(client, 'staging_integrated', layer='silver')
# Baca dari DW layer
df = read_from_bigquery(client, 'fact_food_security', layer='gold')
df = read_from_bigquery(client, 'fact_food_security_eligible', layer='gold')
df = read_from_bigquery(client, 'dim_country', layer='gold')
"""
if query:
return client.query(query).result().to_dataframe(create_bqstorage_client=False)
elif table_name:
@@ -231,17 +111,8 @@ def read_from_bigquery(client: bigquery.Client,
else:
raise ValueError("Either table_name or query must be provided")
# TABLE MANAGEMENT
def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
"""
Hapus semua rows dari table (kosongkan table, struktur tetap ada).
Args:
client : BigQuery client
table_name : Nama table
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
"""
table_id = get_table_id(table_name, layer)
job = client.query(f"DELETE FROM `{table_id}` WHERE TRUE")
job.result()
@@ -249,30 +120,12 @@ def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronz
def drop_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
"""
Drop table dari BigQuery jika ada.
Args:
client : BigQuery client
table_name : Nama table
layer : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
"""
table_id = get_table_id(table_name, layer)
client.delete_table(table_id, not_found_ok=True)
print(f" Dropped [{layer.upper()}] table: {table_name}")
# SCHEMA DEFINITIONS — STAGING LAYER (Silver)
def get_staging_schema() -> list:
"""
Schema untuk staging_integrated table (STAGING/Silver layer).
Staging table adalah area integrasi data dari semua sumber (FAO, WB, UNICEF)
sebelum di-load ke DW layer sebagai Dim & Fact tables.
Returns:
list: List of SchemaField objects
"""
return [
bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
@@ -286,15 +139,6 @@ def get_staging_schema() -> list:
def get_etl_metadata_schema() -> list:
"""
Schema untuk etl_metadata table (STAGING/Silver layer).
ETL metadata disimpan di Staging layer karena merupakan operational table
untuk reproducibility & tracking, bukan bagian Star Schema DW.
Returns:
list: List of SchemaField objects
"""
return [
bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("source_class", "STRING", mode="REQUIRED"),
@@ -311,43 +155,17 @@ def get_etl_metadata_schema() -> list:
bigquery.SchemaField("updated_at", "TIMESTAMP", mode="REQUIRED"),
]
# ETL METADATA — STAGING LAYER (Silver)
# FIXED: Preserve created_at dari eksekusi pertama
def save_etl_metadata(client: bigquery.Client, metadata: dict):
"""
Save ETL metadata ke etl_metadata table (STAGING/Silver layer).
Logic created_at vs updated_at:
created_at : diambil dari record PERTAMA untuk table_name yang sama
(preserved across runs — untuk reproducibility)
updated_at : selalu diperbarui ke waktu eksekusi sekarang
Args:
client : BigQuery client
metadata : Dict berisi informasi eksekusi ETL:
table_name (required)
source_class (required)
execution_timestamp
duration_seconds
rows_fetched
rows_transformed
rows_loaded
completeness_pct
config_snapshot (JSON string)
validation_metrics (JSON string)
"""
table_name = metadata.get('table_name', 'unknown')
table_id = get_table_id('etl_metadata', layer='audit')
# Buat table jika belum ada
if not table_exists(client, 'etl_metadata', layer='audit'):
schema = get_etl_metadata_schema()
table = bigquery.Table(table_id, schema=schema)
client.create_table(table)
print(f" [AUDIT] Created table: etl_metadata")
# Ambil created_at pertama untuk table ini (preserve across runs)
check_query = f"""
SELECT MIN(created_at) AS first_created_at
FROM `{table_id}`
@@ -373,7 +191,6 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
current_time = datetime.now()
# Generate unique ID
import hashlib
record_id = hashlib.md5(
f"{metadata.get('source_class')}_{table_name}_{current_time.isoformat()}".encode()
@@ -391,15 +208,13 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
'completeness_pct' : float(metadata.get('completeness_pct', 0)),
'config_snapshot' : metadata.get('config_snapshot', '{}'),
'validation_metrics' : metadata.get('validation_metrics', '{}'),
'created_at' : created_at, # PRESERVED dari run pertama
'updated_at' : current_time # SELALU waktu sekarang
'created_at' : created_at,
'updated_at' : current_time
}])
# Hapus timezone untuk BigQuery
for col in ['execution_timestamp', 'created_at', 'updated_at']:
meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)
# APPEND ke STAGING layer (Silver)
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
job = client.load_table_from_dataframe(meta_df, table_id, job_config=job_config)
job.result()
@@ -409,14 +224,3 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
else:
print(f"etl_metadata — preserved | created_at : {created_at}")
print(f"etl_metadata — updated_at : {current_time}")
# INITIALIZE
# NOTE(review): module-level side effects — importing this module builds a
# logger AND a live BigQuery client, so importing requires valid GCP
# credentials. Presumably intentional (shared singletons for the helpers
# above), but worth confirming for unit-testability.
logger = setup_logging()
client = get_bigquery_client()
print("BigQuery Helpers Loaded — Kimball DW Architecture")
print(f"Project : {CONFIG['bigquery']['project_id']}")
print(f"Raw (Bronze) : {CONFIG['bigquery']['dataset_bronze']}")
print(f"Staging (Silver) : {CONFIG['bigquery']['dataset_silver']}")
print(f"DW (Gold) : {CONFIG['bigquery']['dataset_gold']}")

View File

@@ -33,8 +33,8 @@ from pathlib import Path
from difflib import SequenceMatcher
import re
from bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
from bigquery_helpers import (
from scripts.bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
from scripts.bigquery_helpers import (
log_update,
load_to_bigquery,
read_from_bigquery,
@@ -42,9 +42,10 @@ from bigquery_helpers import (
save_etl_metadata,
get_staging_schema
)
from bigquery_datasource import DataSource
from scripts.bigquery_datasource import DataSource
from google.cloud import bigquery
# INDICATOR MATCHER
class IndicatorMatcher:
@@ -200,7 +201,7 @@ class IndicatorMatcher:
class FAODataSource(DataSource):
"""
FAO Food Security Data Source (BigQuery version)
FIXED: Menggunakan bulk download karena faostat API butuh autentikasi
Menggunakan bulk download karena faostat API butuh autentikasi
"""
def __init__(self, client: bigquery.Client = None):
@@ -447,28 +448,22 @@ class StagingDataIntegration:
}
def load_raw_data(self) -> Dict[str, pd.DataFrame]:
    """
    Load data dari semua tabel RAW layer (Bronze).

    One table is read per source ('raw_fao', 'raw_worldbank', 'raw_unicef').
    A source whose table is missing or unreadable yields an empty DataFrame
    instead of aborting, so downstream integration proceeds best-effort.

    Returns:
        Dict[str, pd.DataFrame]: DataFrames keyed by 'fao', 'worldbank',
        'unicef'; a key maps to an empty DataFrame when its read failed.
    """
    raw_data: Dict[str, pd.DataFrame] = {}
    # Identical load-or-empty policy for every source; table the mapping
    # instead of copy-pasting three try/except blocks.
    for source_key, table_name in [('fao', 'raw_fao'),
                                   ('worldbank', 'raw_worldbank'),
                                   ('unicef', 'raw_unicef')]:
        try:
            raw_data[source_key] = read_from_bigquery(
                self.client, table_name, layer='bronze')
        except Exception:
            # Deliberate best-effort: missing table / transient BigQuery
            # error falls back to an empty frame rather than failing the run.
            raw_data[source_key] = pd.DataFrame()
    return raw_data
def clean_value(self, value):
"""Clean dan convert value ke float"""
if pd.isna(value):
return None
value_str = str(value).strip().replace('<', '').replace('>', '').strip()
@@ -478,18 +473,9 @@ class StagingDataIntegration:
return None
def process_year_range(self, year_value):
"""
Process year range dan return (year_int, year_range_str)
Examples:
"2020" → (2020, "2020")
"2020-2021" → (2020, "2020-2021")
"20192021" → (2020, "2019-2021")
"""
if pd.isna(year_value):
return None, None
year_str = str(year_value).strip().replace('', '-').replace('', '-')
if '-' in year_str:
try:
parts = year_str.split('-')
@@ -509,7 +495,6 @@ class StagingDataIntegration:
return None, year_str
def truncate_string(self, value, max_length: int) -> str:
"""Truncate string sesuai varchar constraint"""
if pd.isna(value):
return ''
s = str(value).strip()
@@ -519,7 +504,6 @@ class StagingDataIntegration:
indicator_orig_col: str, indicator_std_col: str,
country_col: str, year_col: str, value_col: str,
unit_col: str = None) -> pd.DataFrame:
"""Standardize dataframe ke schema staging_integrated"""
if df.empty:
return pd.DataFrame()
@@ -543,10 +527,9 @@ class StagingDataIntegration:
})
def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""Standardize schema dari semua sumber data"""
integrated_data = []
# FAO — deteksi kolom (nama asli atau sudah di-rename)
# FAO
if not raw_data['fao'].empty:
df = raw_data['fao'].copy()
integrated_data.append(self.standardize_dataframe(
@@ -590,11 +573,9 @@ class StagingDataIntegration:
df_integrated = pd.concat(integrated_data, ignore_index=True)
# Final type conversion
df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')
# Enforce varchar constraints
for col, max_len in [('source', 20), ('country', 100), ('indicator_original', 255),
('indicator_standardized', 255), ('year_range', 20), ('unit', 20)]:
df_integrated[col] = df_integrated[col].astype(str).apply(
@@ -606,7 +587,6 @@ class StagingDataIntegration:
).reset_index(drop=True)
def validate_data(self, df: pd.DataFrame) -> Dict:
"""Validate data dan return metrics"""
validation = {
'total_rows' : int(len(df)),
'total_columns' : int(len(df.columns)),
@@ -621,15 +601,12 @@ class StagingDataIntegration:
'max' : int(df['year'].max()) if not df['year'].isnull().all() else None,
'unique_years': int(df['year'].nunique())
}
if 'source' in df.columns:
validation['source_breakdown'] = {
str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
}
if 'indicator_standardized' in df.columns:
validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
if 'country' in df.columns:
validation['unique_countries'] = int(df['country'].nunique())
@@ -645,21 +622,15 @@ class StagingDataIntegration:
return validation
def save_to_staging(self, df: pd.DataFrame):
"""Save data ke staging_integrated table di STAGING layer (Silver)"""
try:
schema = get_staging_schema()
load_to_bigquery(
self.client,
df,
self.staging_table,
layer='silver', # → fs_asean_silver
self.client, df, self.staging_table,
layer='silver',
write_disposition="WRITE_TRUNCATE",
schema=schema
)
log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))
except Exception as e:
print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
@@ -667,7 +638,6 @@ class StagingDataIntegration:
raise
def run(self) -> pd.DataFrame:
"""Run staging integration process"""
self.metadata['start_time'] = datetime.now()
try:
@@ -703,7 +673,6 @@ class StagingDataIntegration:
save_etl_metadata(self.client, self.metadata)
# Summary
print(f" ✓ Staging Integration completed: {len(df_integrated):,} rows")
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
if 'source_breakdown' in validation:
@@ -725,7 +694,6 @@ class StagingDataIntegration:
print(f" - country max length : {schema_val['country_max_length']}/100")
print(f" - year_range max length : {schema_val['year_range_max_length']}/20")
print(f" - unit max length : {schema_val['unit_max_length']}/20")
print(f"\n Metadata → [AUDIT] etl_metadata")
return df_integrated
@@ -734,6 +702,7 @@ class StagingDataIntegration:
self.logger.error(f"Staging integration failed: {str(e)}")
raise
# MAIN EXECUTION
if __name__ == "__main__":
@@ -745,11 +714,9 @@ if __name__ == "__main__":
logger = setup_logging()
client = get_bigquery_client()
# ── FAO ──────────────────────────────────────────────────────────────────
print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
fao_source = FAODataSource(client)
df_fao = fao_source.run()
print(f" ✓ raw_fao: {len(df_fao):,} rows")
print(f" Indicators : {df_fao['indicator'].nunique()}")
print(f" Countries : {df_fao['country'].nunique()}")
@@ -757,28 +724,23 @@ if __name__ == "__main__":
fao_indicators = df_fao['indicator'].unique()
# ── World Bank ────────────────────────────────────────────────────────────
print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
wb_source = WorldBankDataSource(client, list(fao_indicators))
df_wb = wb_source.run()
print(f" ✓ raw_worldbank: {len(df_wb):,} rows")
print(f" Matched indicators : {df_wb['indicator_fao'].nunique()}")
print(f" Countries : {df_wb['country'].nunique()}")
if len(df_wb) > 0:
print(f" Year range : {df_wb['year'].min()}{df_wb['year'].max()}")
# ── UNICEF ────────────────────────────────────────────────────────────────
print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
unicef_source = UNICEFDataSource(client, list(fao_indicators))
df_unicef = unicef_source.run()
print(f" ✓ raw_unicef: {len(df_unicef):,} rows")
if len(df_unicef) > 0:
print(f" Matched indicators : {df_unicef['indicator_fao'].nunique()}")
print(f" Countries : {df_unicef['country'].nunique()}")
# ── Staging Integration ───────────────────────────────────────────────────
print("\n[4/4] Staging Integration → STAGING (Silver)...")
staging = StagingDataIntegration(client)
df_staging = staging.run()
@@ -790,6 +752,7 @@ if __name__ == "__main__":
print(f"AUDIT : etl_logs, etl_metadata")
print("=" * 60)
# AIRFLOW TASK FUNCTIONS
def run_verify_connection():

37
scripts/test_data.py Normal file
View File

@@ -0,0 +1,37 @@
import requests
import zipfile
import io
import pandas as pd
def run_fao_test():
    """
    Smoke test: download the FAO Food Security bulk CSV, load the first
    5 rows into BigQuery (bronze layer), then read them back to verify.

    Raises:
        requests.HTTPError: if the bulk download endpoint returns an
            HTTP error status.
        FileNotFoundError: if the downloaded archive contains no CSV.
    """
    # Imported lazily so this module can be imported without GCP credentials.
    from scripts.bigquery_config import get_bigquery_client
    from scripts.bigquery_helpers import load_to_bigquery, read_from_bigquery

    print("--- MEMULAI TEST LOAD FAO KE BIGQUERY ---")

    # 1. Extract — bulk download (the faostat API requires authentication)
    url = "https://bulks-faostat.fao.org/production/Food_Security_Data_E_All_Data_(Normalized).zip"
    response = requests.get(url, timeout=120)
    # Fail fast on an HTTP error page; otherwise ZipFile raises a
    # confusing BadZipFile much later.
    response.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(response.content)) as z:
        csv_members = [f for f in z.namelist() if f.endswith('.csv')]
        if not csv_members:
            raise FileNotFoundError("No CSV file found in FAO bulk archive")
        df = pd.read_csv(z.open(csv_members[0]), encoding='latin-1')

    # Ambil 5 baris teratas
    df_top5 = df.head(5)
    print("HASIL 5 DATA TERATAS:")
    print("====================================================")
    print(df_top5.to_string(index=False))
    print("====================================================")

    # 2. Load ke BigQuery
    client = get_bigquery_client()
    load_to_bigquery(client, df_top5, "raw_fao_test", layer="bronze", write_disposition="WRITE_TRUNCATE")
    print(f"Total data yang berhasil di-load: {len(df_top5)} baris.")

    # 3. Verify
    df_check = read_from_bigquery(client, "raw_fao_test", layer="bronze")
    print(f"Verify dari BigQuery: {len(df_check)} baris tersimpan.")
# Allow running this smoke test directly: python -m scripts.test_data
if __name__ == "__main__":
    run_fao_test()