Compare commits

...

36 Commits

Author SHA1 Message Date
Debby
cfb0df3a15 indonesian version column 2026-05-19 10:09:48 +07:00
Debby
4bab746779 create colomn indonesian text 2026-05-12 09:55:15 +07:00
Debby
f9d013f8e6 new narrative teks 2026-04-22 16:02:05 +07:00
Debby
40528766bd add metadata aggregate 2026-04-16 08:14:10 +07:00
Debby
74be63226a dag etl 2026-04-15 13:56:55 +07:00
Debby
76b451b2c1 schdule /3 bulan dan tambah metadata 2026-04-15 13:37:40 +07:00
Debby
fa2cf75634 ganti narasi 2026-04-07 23:10:34 +07:00
Debby
f13a76756f capek eror 2026-04-07 20:58:34 +07:00
Debby
e00e9c569d sum indicator problem solve 2026-04-07 20:04:52 +07:00
Debby
00cdf961a9 ulang 2026-04-07 19:14:10 +07:00
Debby
8aed670267 narrative indicator 2026-04-07 18:15:58 +07:00
Debby
327768cc01 delete mark 2026-04-07 11:48:41 +07:00
Debby
7ccc3ea35d rename pillar 2026-04-07 11:07:58 +07:00
Debby
933c370606 try again 2026-04-07 10:19:35 +07:00
Debby
0384e62b01 bismillah done 2026-04-07 10:00:45 +07:00
Debby
cebb6b88eb finish etl code 2026-04-06 16:37:05 +07:00
Debby
5313039b50 bismillah capekk 2026-04-03 08:40:30 +07:00
Debby
f652f2f730 agregate fact selected 2026-04-03 08:09:57 +07:00
Debby
d4bee86331 finish fact dan dim 2026-04-02 20:31:19 +07:00
Debby
47ea9c0492 code last done 2026-04-02 19:58:05 +07:00
Debby
6030268924 salah file 2026-04-02 17:43:31 +07:00
Debby
b54b276c63 done 1 2026-04-02 17:34:33 +07:00
Debby
ba4927f620 rename other to supporting 2026-04-02 07:54:23 +07:00
Debby
ffd8cdf65e year hardcode sdgs 2026-04-02 07:10:41 +07:00
Debby
189e8895c9 coba 1 2026-04-01 21:30:54 +07:00
Debby
b7cab36bd9 framework v2 2026-04-01 20:42:54 +07:00
Debby
d948819535 framework v1 2026-04-01 20:33:16 +07:00
Debby
c3b7674001 keawal 2026-04-01 15:58:59 +07:00
Debby
6a55a91112 code final 2026-04-01 15:46:20 +07:00
Debby
0f93ff6ecd try1 2026-04-01 08:29:18 +07:00
Debby
db60e6e414 sdgs era v5 2026-04-01 08:04:19 +07:00
Debby
236d4b4dc8 sdgs year v4 2026-04-01 07:43:31 +07:00
Debby
64e3095e7a sdgs year v3 2026-04-01 07:13:07 +07:00
Debby
8ae5018a62 sdgs year v2 2026-03-31 23:38:15 +07:00
Debby
0d89c60b12 sdgs year v1 2026-03-31 23:20:54 +07:00
Debby
beb494f89c sdg start year and label condition 2026-03-31 15:42:11 +07:00
6 changed files with 3132 additions and 1568 deletions

View File

@@ -22,6 +22,8 @@ Kimball ETL Flow:
│ agg_pillar_by_country │
│ agg_framework_by_country │
│ agg_framework_asean │
│ ↓ │
│ agg_indicator_norm │
│ │
│ AUDIT : etl_logs, etl_metadata (setiap layer) │
└──────────────────────────────────────────────────────────────────────────┘
@@ -36,13 +38,15 @@ Task Order:
→ dimensional_model_to_gold
→ analytical_layer_to_gold
→ aggregation_to_gold
→ indicator_norm_aggregation_to_gold
Scripts folder harus berisi:
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
- bigquery_cleaned_layer.py (run_cleaned_integration)
- bigquery_dimensional_model.py (run_dimensional_model)
- bigquery_analytical_layer.py (run_analytical_layer)
- bigquery_analysis_aggregation.py (run_aggregation)
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
- bigquery_cleaned_layer.py (run_cleaned_integration)
- bigquery_dimensional_model.py (run_dimensional_model)
- bigquery_analytical_layer.py (run_analytical_layer)
- bigquery_aggregate_layer.py (run_aggregation)
- bigquery_aggraget_fact_selected_layer.py (run_indicator_norm_aggregation)
- bigquery_config.py
- bigquery_helpers.py
- bigquery_datasource.py
@@ -71,11 +75,14 @@ from scripts.bigquery_analytical_layer import (
from scripts.bigquery_aggregate_layer import (
run_aggregation,
)
from scripts.bigquery_aggraget_fact_selected_layer import (
run_indicator_norm_aggregation,
)
# DEFAULT ARGS
default_args = {
'owner': 'data-engineering',
'owner': 'Debby Seftia',
'email': ['d1041221004@student.untan.ac.id'],
}
@@ -86,7 +93,7 @@ with DAG(
description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)",
default_args = default_args,
start_date = datetime(2026, 3, 1),
schedule_interval = "0 0 */3 * *",
schedule_interval = "0 0 1 */3 *",
catchup = False,
tags = ["food-security", "bigquery", "kimball"],
) as dag:
@@ -136,5 +143,21 @@ with DAG(
python_callable = run_aggregation
)
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation
task_indicator_norm = PythonOperator(
task_id = "indicator_norm_aggregation_to_gold",
python_callable = run_indicator_norm_aggregation
)
# Task Dependencies
(
task_verify
>> task_fao
>> task_worldbank
>> task_unicef
>> task_staging
>> task_cleaned
>> task_dimensional
>> task_analytical
>> task_aggregation
>> task_indicator_norm
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -40,12 +40,12 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
"""Load data dari staging_integrated (STAGING/Silver layer)."""
print("\nLoading data from staging_integrated (fs_asean_silver)...")
df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver')
print(f" Loaded : {len(df_staging):,} rows")
print(f" Columns : {len(df_staging.columns)}")
print(f" Sources : {df_staging['source'].nunique()}")
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
print(f" Countries : {df_staging['country'].nunique()}")
print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
print(f" Loaded : {len(df_staging):,} rows")
print(f" Columns : {len(df_staging.columns)}")
print(f" Sources : {df_staging['source'].nunique()}")
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
print(f" Countries : {df_staging['country'].nunique()}")
print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
return df_staging
@@ -53,6 +53,7 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
# COLUMN CONSTRAINT HELPERS
# =============================================================================
# Schema constraints — semua varchar max lengths
COLUMN_CONSTRAINTS = {
'source' : 20,
'indicator_original' : 255,
@@ -61,7 +62,7 @@ COLUMN_CONSTRAINTS = {
'year_range' : 20,
'unit' : 20,
'pillar' : 20,
'direction' : 15,
'direction' : 15, # 'higher_better'=13, 'lower_better'=12
}
@@ -100,11 +101,11 @@ def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
)
if truncation_report:
print("\n Column Truncations Applied:")
print("\n Column Truncations Applied:")
for column, info in truncation_report.items():
print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
else:
print("\n No truncations needed — all values within constraints")
print("\n No truncations needed — all values within constraints")
return df_constrained
@@ -155,11 +156,11 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
def map_country(country):
if pd.isna(country):
return country
s = str(country).strip()
s = str(country).strip()
mapped = ASEAN_MAPPING.get(s.upper(), s)
return mapped[:100] if len(mapped) > 100 else mapped
original = df_clean[country_column].copy()
original = df_clean[country_column].copy()
df_clean[country_column] = df_clean[country_column].apply(map_country)
changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new}
@@ -176,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
def assign_pillar(indicator_name: str) -> str:
"""
Assign pillar berdasarkan keyword indikator.
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other'
All <= 20 chars (varchar(20) constraint).
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Sustainability'
All ≤ 20 chars (varchar(20) constraint).
"""
if pd.isna(indicator_name):
return 'Other'
return 'Sustainability'
ind = str(indicator_name).lower()
for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
if kw in ind:
return 'Other'
return 'Sustainability'
if any(kw in ind for kw in [
'adequacy', 'protein supply', 'supply of protein',
@@ -209,13 +210,12 @@ def assign_pillar(indicator_name: str) -> str:
if any(kw in ind for kw in [
'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water',
'sanitation', 'children under 5', 'newborns with low',
'women of reproductive'
'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
'children under 5', 'newborns with low', 'women of reproductive'
]):
return 'Utilization'
return 'Other'
return 'Sustainability'
# =============================================================================
@@ -226,15 +226,17 @@ def assign_direction(indicator_name: str) -> str:
"""
Assign direction berdasarkan indikator.
Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars)
Both <= 15 chars (varchar(15) constraint).
Both ≤ 15 chars (varchar(15) constraint).
"""
if pd.isna(indicator_name):
return 'higher_better'
ind = str(indicator_name).lower()
# Spesifik lower_better
if 'share of dietary energy supply derived from cereals' in ind:
return 'lower_better'
# Higher_better exceptions — cek sebelum lower_better keywords
for kw in [
'exclusive breastfeeding',
'dietary energy supply',
@@ -246,6 +248,7 @@ def assign_direction(indicator_name: str) -> str:
if kw in ind:
return 'higher_better'
# Lower_better — masalah yang harus diminimalkan
for kw in [
'prevalence of undernourishment',
'prevalence of severe food insecurity',
@@ -256,7 +259,6 @@ def assign_direction(indicator_name: str) -> str:
'prevalence of overweight',
'prevalence of obesity',
'prevalence of anemia',
'prevalence of anaemia',
'prevalence of low birthweight',
'number of people undernourished',
'number of severely food insecure',
@@ -281,9 +283,6 @@ def assign_direction(indicator_name: str) -> str:
'coefficient of variation',
'incidence of caloric losses',
'food losses',
'indicator of food price anomalies',
'proportion of local breeds classified as being at risk',
'agricultural export subsidies',
]:
if kw in ind:
return 'lower_better'
@@ -300,18 +299,19 @@ class CleanedDataLoader:
Loader untuk cleaned integrated data ke STAGING layer (Silver).
Kimball context:
Input : staging_integrated -> STAGING (Silver) — fs_asean_silver
Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit
Input : staging_integrated → STAGING (Silver) — fs_asean_silver
Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
Pipeline steps:
1. Standardize country names (ASEAN)
2. Remove missing values
3. Remove duplicates
4. Add pillar & direction classification
5. Apply column constraints
6. Load ke BigQuery
7. Log ke Audit layer
4. Add pillar classification
5. Add direction classification
6. Apply column constraints
7. Load ke BigQuery
8. Log ke Audit layer
"""
SCHEMA = [
@@ -355,7 +355,7 @@ class CleanedDataLoader:
def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 1/5] Standardize country names...")
df, report = standardize_country_names_asean(df, country_column='country')
print(f" ASEAN countries mapped : {report['countries_mapped']}")
print(f" ASEAN countries mapped : {report['countries_mapped']}")
unique_countries = sorted(df['country'].unique())
print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
log_update(self.client, 'STAGING', 'staging_integrated',
@@ -377,9 +377,7 @@ class CleanedDataLoader:
def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 3/5] Remove duplicates...")
exact_dups = df.duplicated().sum()
data_dups = df.duplicated(
subset=['indicator_standardized', 'country', 'year', 'value']
).sum()
data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
print(f" Exact duplicates : {exact_dups:,}")
print(f" Data duplicates : {data_dups:,}")
rows_before = len(df)
@@ -393,21 +391,19 @@ class CleanedDataLoader:
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
print("\n [Step 4/5] Add pillar & direction classification...")
df = df.copy()
df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
df['direction'] = df['indicator_standardized'].apply(assign_direction)
pillar_counts = df['pillar'].value_counts()
print(f" Pillar distribution:")
print(f" Pillar distribution:")
for pillar, count in pillar_counts.items():
print(f" - {pillar}: {count:,}")
direction_counts = df['direction'].value_counts()
print(f" Direction distribution:")
print(f" Direction distribution:")
for direction, count in direction_counts.items():
pct = count / len(df) * 100
print(f" - {direction}: {count:,} ({pct:.1f}%)")
return df
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
@@ -442,6 +438,7 @@ class CleanedDataLoader:
if 'country' in df.columns:
validation['unique_countries'] = int(df['country'].nunique())
# Column length check
column_length_check = {}
for col, max_len in COLUMN_CONSTRAINTS.items():
if col in df.columns:
@@ -460,7 +457,7 @@ class CleanedDataLoader:
def run(self, df: pd.DataFrame) -> int:
"""
Execute full cleaning pipeline -> load ke STAGING (Silver).
Execute full cleaning pipeline → load ke STAGING (Silver).
Returns:
int: Rows loaded
@@ -472,6 +469,7 @@ class CleanedDataLoader:
print(" ERROR: DataFrame is empty, nothing to process.")
return 0
# Pipeline steps
df = self._step_standardize_countries(df)
df = self._step_remove_missing(df)
df = self._step_remove_duplicates(df)
@@ -480,6 +478,7 @@ class CleanedDataLoader:
self.metadata['rows_transformed'] = len(df)
# Validate
validation = self.validate_data(df)
self.metadata['validation_metrics'] = validation
@@ -488,12 +487,13 @@ class CleanedDataLoader:
for info in validation.get('column_length_check', {}).values()
)
if not all_within_limits:
print("\n WARNING: Some columns still exceed length constraints!")
print("\n WARNING: Some columns still exceed length constraints!")
for col, info in validation['column_length_check'].items():
if not info['within_limit']:
print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...")
# Load ke Silver
print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
rows_loaded = load_to_bigquery(
self.client, df, self.table_name,
layer='silver',
@@ -502,8 +502,10 @@ class CleanedDataLoader:
)
self.metadata['rows_loaded'] = rows_loaded
# Audit logs
log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
# ETL metadata
self.metadata['end_time'] = datetime.now()
self.metadata['duration_seconds'] = (
self.metadata['end_time'] - self.metadata['start_time']
@@ -514,31 +516,33 @@ class CleanedDataLoader:
self.metadata['validation_metrics'] = json.dumps(validation)
save_etl_metadata(self.client, self.metadata)
print(f"\n Cleaned Integration completed: {rows_loaded:,} rows")
# Summary
print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
print(f" Completeness : {validation['completeness_pct']:.2f}%")
if 'year_range' in validation:
yr = validation['year_range']
if yr['min'] and yr['max']:
print(f" Year range : {yr['min']}-{yr['max']}")
print(f" Year range : {yr['min']}{yr['max']}")
print(f" Indicators : {validation.get('unique_indicators', '-')}")
print(f" Countries : {validation.get('unique_countries', '-')}")
print(f"\n Schema Validation:")
for col, info in validation.get('column_length_check', {}).items():
status = "OK" if info['within_limit'] else "FAIL"
print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
print(f"\n Metadata -> [AUDIT] etl_metadata")
status = "" if info['within_limit'] else ""
print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
print(f"\n Metadata [AUDIT] etl_metadata")
return rows_loaded
# =============================================================================
# AIRFLOW TASK FUNCTIONS
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer
# =============================================================================
def run_cleaned_integration():
"""
Airflow task: Load cleaned_integrated dari staging_integrated.
Dipanggil oleh DAG setelah task staging_integration_to_silver selesai.
"""
from scripts.bigquery_config import get_bigquery_client
@@ -557,21 +561,21 @@ if __name__ == "__main__":
print("=" * 60)
print("BIGQUERY CLEANED LAYER ETL")
print("Kimball DW Architecture")
print(" Input : STAGING (Silver) -> staging_integrated")
print(" Output : STAGING (Silver) -> cleaned_integrated")
print(" Audit : AUDIT -> etl_logs, etl_metadata")
print(" Input : STAGING (Silver) staging_integrated")
print(" Output : STAGING (Silver) cleaned_integrated")
print(" Audit : AUDIT etl_logs, etl_metadata")
print("=" * 60)
logger = setup_logging()
client = get_bigquery_client()
df_staging = load_staging_data(client)
print("\n[1/1] Cleaned Integration -> STAGING (Silver)...")
print("\n[1/1] Cleaned Integration STAGING (Silver)...")
loader = CleanedDataLoader(client, load_mode='full_refresh')
final_count = loader.run(df_staging)
print("\n" + "=" * 60)
print("[OK] CLEANED LAYER ETL COMPLETED")
print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
print(f" AUDIT : etl_logs, etl_metadata")
print(" CLEANED LAYER ETL COMPLETED")
print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
print(f" 📋 AUDIT : etl_logs, etl_metadata")
print("=" * 60)

View File

@@ -46,9 +46,9 @@ class DimensionalModelLoader:
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
Kimball context:
Input : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* -> DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit
Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver
Output : dim_* + fact_* → DW (Gold) — fs_asean_gold
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
Pipeline steps:
1. Load dim_country
@@ -117,7 +117,7 @@ class DimensionalModelLoader:
"""
try:
self.client.query(query).result()
self.logger.info(f" [OK] FK: {table_name}.{fk_column} -> {ref_table}.{ref_column}")
self.logger.info(f" [OK] FK: {table_name}.{fk_column} {ref_table}.{ref_column}")
except Exception as e:
if "already exists" in str(e).lower():
self.logger.info(f" [INFO] FK already exists: {constraint_name}")
@@ -129,7 +129,7 @@ class DimensionalModelLoader:
# ------------------------------------------------------------------
def _save_table_metadata(self, table_name: str):
meta = self.load_metadata[table_name]
meta = self.load_metadata[table_name]
metadata = {
'source_class' : self.__class__.__name__,
'table_name' : table_name,
@@ -145,7 +145,7 @@ class DimensionalModelLoader:
}
try:
save_etl_metadata(self.client, metadata)
self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
self.logger.info(f" Metadata [AUDIT] etl_metadata")
except Exception as e:
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
@@ -156,13 +156,13 @@ class DimensionalModelLoader:
def load_dim_time(self):
table_name = 'dim_time'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_time -> [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_time [DW/Gold] fs_asean_gold...")
try:
if 'year_range' in self.df_clean.columns:
dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
else:
dim_time = self.df_clean[['year']].drop_duplicates().copy()
dim_time = self.df_clean[['year']].drop_duplicates().copy()
dim_time['year_range'] = None
dim_time['year'] = dim_time['year'].astype(int)
@@ -194,10 +194,10 @@ class DimensionalModelLoader:
pass
return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})
parsed = dim_time.apply(parse_year_range, axis=1)
dim_time['year'] = parsed['year'].astype(int)
dim_time['start_year'] = parsed['start_year'].astype(int)
dim_time['end_year'] = parsed['end_year'].astype(int)
parsed = dim_time.apply(parse_year_range, axis=1)
dim_time['year'] = parsed['year'].astype(int)
dim_time['start_year'] = parsed['start_year'].astype(int)
dim_time['end_year'] = parsed['end_year'].astype(int)
dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
dim_time['decade'] = (dim_time['year'] // 10) * 10
dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
@@ -229,7 +229,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_time: {rows_loaded} rows\n")
self.logger.info(f" dim_time: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -240,11 +240,11 @@ class DimensionalModelLoader:
def load_dim_country(self):
table_name = 'dim_country'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_country -> [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_country [DW/Gold] fs_asean_gold...")
try:
dim_country = self.df_clean[['country']].drop_duplicates().copy()
dim_country.columns = ['country_name']
dim_country = self.df_clean[['country']].drop_duplicates().copy()
dim_country.columns = ['country_name']
region_mapping = {
'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
@@ -270,9 +270,7 @@ class DimensionalModelLoader:
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
dim_country_final = dim_country[
['country_name', 'region', 'subregion', 'iso_code']
].copy()
dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy()
dim_country_final = dim_country_final.reset_index(drop=True)
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
@@ -295,7 +293,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_country: {rows_loaded} rows\n")
self.logger.info(f" dim_country: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -304,83 +302,58 @@ class DimensionalModelLoader:
raise
def load_dim_indicator(self):
"""
Load dim_indicator ke Gold layer.
Kolom yang dimuat:
indicator_id — surrogate key
indicator_name — nama standar indikator
indicator_category — kategori (Health & Nutrition, dll.)
unit — satuan ukuran
direction — higher_better / lower_better
"""
table_name = 'dim_indicator'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_indicator -> [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_indicator [DW/Gold] fs_asean_gold...")
try:
has_direction = 'direction' in self.df_clean.columns
has_unit = 'unit' in self.df_clean.columns
has_category = 'indicator_category' in self.df_clean.columns
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name']
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
dim_indicator.columns = ['indicator_name']
# Unit
if has_unit:
unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
unit_map.columns = ['indicator_name', 'unit']
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
unit_map.columns = ['indicator_name', 'unit']
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
else:
dim_indicator['unit'] = None
# Direction
if has_direction:
dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
dir_map.columns = ['indicator_name', 'direction']
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
dir_map.columns = ['indicator_name', 'direction']
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
self.logger.info(" [OK] direction column from cleaned_integrated")
else:
dim_indicator['direction'] = 'higher_better'
self.logger.warning(" [WARN] direction not found, default: higher_better")
# Indicator category
if has_category:
cat_map = self.df_clean[
['indicator_standardized', 'indicator_category']
].drop_duplicates()
cat_map.columns = ['indicator_name', 'indicator_category']
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
cat_map.columns = ['indicator_name', 'indicator_category']
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
else:
def categorize_indicator(name):
n = str(name).lower()
if any(w in n for w in [
'undernourishment', 'malnutrition', 'stunting',
'wasting', 'anemia', 'anaemia', 'food security',
'food insecure', 'hunger'
]):
if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
return 'Health & Nutrition'
elif any(w in n for w in [
'production', 'yield', 'cereal', 'crop',
'import dependency', 'share of dietary'
]):
elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
'import dependency', 'share of dietary']):
return 'Agricultural Production'
elif any(w in n for w in ['import', 'export', 'trade']):
return 'Trade'
elif any(w in n for w in ['gdp', 'income', 'economic']):
return 'Economic'
elif any(w in n for w in [
'water', 'sanitation', 'infrastructure', 'rail'
]):
elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
return 'Infrastructure'
else:
return 'Other'
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(
categorize_indicator
)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
return 'Sustainability'
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
dim_indicator_final = dim_indicator[
['indicator_name', 'indicator_category', 'unit', 'direction']
].copy()
@@ -401,22 +374,17 @@ class DimensionalModelLoader:
)
self._add_primary_key(table_name, 'indicator_id')
# Log distribusi
for label, col in [
('Categories', 'indicator_category'),
('Direction', 'direction'),
]:
for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
self.logger.info(f" {label}:")
for val, cnt in dim_indicator_final[col].value_counts().items():
pct = cnt / len(dim_indicator_final) * 100
self.logger.info(f" - {val}: {cnt} ({pct:.1f}%)")
self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
self.load_metadata[table_name].update(
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_indicator: {rows_loaded} rows\n")
self.logger.info(f" dim_indicator: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -427,7 +395,7 @@ class DimensionalModelLoader:
def load_dim_source(self):
table_name = 'dim_source'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_source -> [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_source [DW/Gold] fs_asean_gold...")
try:
source_details = {
@@ -487,7 +455,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_source: {rows_loaded} rows\n")
self.logger.info(f" dim_source: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -498,15 +466,15 @@ class DimensionalModelLoader:
def load_dim_pillar(self):
table_name = 'dim_pillar'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading dim_pillar -> [DW/Gold] fs_asean_gold...")
self.logger.info("Loading dim_pillar [DW/Gold] fs_asean_gold...")
try:
pillar_codes = {
'Availability': 'AVL', 'Access' : 'ACC',
'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH',
'Utilization' : 'UTL', 'Stability': 'STB', 'Sustainability': 'STN',
}
pillars_data = [
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')}
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'STN')}
for p in self.df_clean['pillar'].unique()
]
@@ -533,7 +501,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" dim_pillar: {rows_loaded} rows\n")
self.logger.info(f" dim_pillar: {rows_loaded} rows\n")
return rows_loaded
except Exception as e:
@@ -548,9 +516,10 @@ class DimensionalModelLoader:
def load_fact_food_security(self):
table_name = 'fact_food_security'
self.load_metadata[table_name]['start_time'] = datetime.now()
self.logger.info("Loading fact_food_security -> [DW/Gold] fs_asean_gold...")
self.logger.info("Loading fact_food_security [DW/Gold] fs_asean_gold...")
try:
# Load dims dari Gold untuk FK resolution
dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
@@ -592,9 +561,9 @@ class DimensionalModelLoader:
fact_table['start_year'] = fact_table['year'].astype(int)
fact_table['end_year'] = fact_table['year'].astype(int)
# Resolve FKs
fact_table = fact_table.merge(
dim_country[['country_id', 'country_name']].rename(
columns={'country_name': 'country'}),
dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
on='country', how='left'
)
fact_table = fact_table.merge(
@@ -607,16 +576,15 @@ class DimensionalModelLoader:
on=['start_year', 'end_year'], how='left'
)
fact_table = fact_table.merge(
dim_source[['source_id', 'source_name']].rename(
columns={'source_name': 'source'}),
dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
on='source', how='left'
)
fact_table = fact_table.merge(
dim_pillar[['pillar_id', 'pillar_name']].rename(
columns={'pillar_name': 'pillar'}),
dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
on='pillar', how='left'
)
# Filter hanya row dengan FK lengkap
fact_table = fact_table[
fact_table['country_id'].notna() &
fact_table['indicator_id'].notna() &
@@ -653,6 +621,7 @@ class DimensionalModelLoader:
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
)
# Add PK + FKs
self._add_primary_key(table_name, 'fact_id')
self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
@@ -665,7 +634,7 @@ class DimensionalModelLoader:
)
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
self._save_table_metadata(table_name)
self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n")
self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n")
return rows_loaded
except Exception as e:
@@ -748,15 +717,11 @@ class DimensionalModelLoader:
FROM `{get_table_id('dim_indicator', layer='gold')}`
GROUP BY direction ORDER BY direction
"""
df_dir = self.client.query(query_dir).result().to_dataframe(
create_bqstorage_client=False
)
df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
if len(df_dir) > 0:
self.logger.info(f"\n Direction Distribution:")
for _, row in df_dir.iterrows():
self.logger.info(
f" {row['direction']:15s}: {int(row['count']):>5,} indicators"
)
self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
self.logger.info("\n [OK] Validation completed")
except Exception as e:
@@ -773,19 +738,22 @@ class DimensionalModelLoader:
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) -> fs_asean_gold")
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) fs_asean_gold")
self.logger.info("=" * 60)
self.logger.info("\nLOADING DIMENSION TABLES -> fs_asean_gold")
# Dimensions
self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
self.load_dim_country()
self.load_dim_indicator()
self.load_dim_time()
self.load_dim_source()
self.load_dim_pillar()
self.logger.info("\nLOADING FACT TABLE -> fs_asean_gold")
# Fact
self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
self.load_fact_food_security()
# Validate
self.validate_constraints()
self.validate_data_load()
@@ -794,23 +762,22 @@ class DimensionalModelLoader:
total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
self.pipeline_metadata.update({
'end_time' : pipeline_end,
'duration_seconds' : duration,
'rows_transformed' : total_loaded,
'rows_loaded' : total_loaded,
'end_time' : pipeline_end,
'duration_seconds' : duration,
'rows_transformed' : total_loaded,
'rows_loaded' : total_loaded,
'execution_timestamp': self.pipeline_metadata['start_time'],
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics' : json.dumps(
{t: m['status'] for t, m in self.load_metadata.items()}
),
'table_name' : 'dimensional_model_pipeline',
'completeness_pct' : 100.0,
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
'table_name' : 'dimensional_model_pipeline',
})
try:
save_etl_metadata(self.client, self.pipeline_metadata)
except Exception as e:
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
# Summary
self.logger.info("\n" + "=" * 60)
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
self.logger.info("=" * 60)
@@ -818,19 +785,20 @@ class DimensionalModelLoader:
self.logger.info(f" Duration : {duration:.2f}s")
self.logger.info(f" Tables :")
for tbl, meta in self.load_metadata.items():
icon = "OK" if meta['status'] == 'success' else "FAIL"
self.logger.info(f" [{icon}] {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata -> [AUDIT] etl_metadata")
icon = "" if meta['status'] == 'success' else ""
self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
self.logger.info(f"\n Metadata [AUDIT] etl_metadata")
self.logger.info("=" * 60)
# =============================================================================
# AIRFLOW TASK FUNCTIONS
# AIRFLOW TASK FUNCTIONS ← same pattern as the raw & cleaned layers
# =============================================================================
def run_dimensional_model():
"""
Airflow task: load the dimensional model from cleaned_integrated.
Called by the DAG after the cleaned_integration_to_silver task has finished.
"""
from scripts.bigquery_config import get_bigquery_client
@@ -849,9 +817,9 @@ if __name__ == "__main__":
print("=" * 60)
print("BIGQUERY DIMENSIONAL MODEL LOAD")
print("Kimball DW Architecture")
print(" Input : STAGING (Silver) -> cleaned_integrated (fs_asean_silver)")
print(" Output : DW (Gold) -> dim_*, fact_* (fs_asean_gold)")
print(" Audit : AUDIT -> etl_logs, etl_metadata (fs_asean_audit)")
print(" Input : STAGING (Silver) cleaned_integrated (fs_asean_silver)")
print(" Output : DW (Gold) dim_*, fact_* (fs_asean_gold)")
print(" Audit : AUDIT etl_logs, etl_metadata (fs_asean_audit)")
print("=" * 60)
logger = setup_logging()
@@ -859,22 +827,24 @@ if __name__ == "__main__":
print("\nLoading cleaned_integrated (fs_asean_silver)...")
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
print(f" Loaded : {len(df_clean):,} rows")
print(f" Loaded : {len(df_clean):,} rows")
print(f" Columns : {len(df_clean.columns)}")
print(f" Sources : {df_clean['source'].nunique()}")
print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
print(f" Countries : {df_clean['country'].nunique()}")
print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}")
print(f" Year range : {int(df_clean['year'].min())}{int(df_clean['year'].max())}")
if 'direction' in df_clean.columns:
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
else:
print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
print("\n[1/1] Dimensional Model Load -> DW (Gold)...")
print("\n[1/1] Dimensional Model Load DW (Gold)...")
loader = DimensionalModelLoader(client, df_clean)
loader.run()
print("\n" + "=" * 60)
print("[OK] DIMENSIONAL MODEL ETL COMPLETED")
print(" DW (Gold) : dim_country, dim_indicator, dim_time,")
print(" dim_source, dim_pillar, fact_food_security")
print(" AUDIT : etl_logs, etl_metadata")
print(" DIMENSIONAL MODEL ETL COMPLETED")
print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
print(" dim_source, dim_pillar, fact_food_security")
print(" 📋 AUDIT : etl_logs, etl_metadata")
print("=" * 60)