Compare commits
36 Commits
ddc9fb3b48
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cfb0df3a15 | ||
|
|
4bab746779 | ||
|
|
f9d013f8e6 | ||
|
|
40528766bd | ||
|
|
74be63226a | ||
|
|
76b451b2c1 | ||
|
|
fa2cf75634 | ||
|
|
f13a76756f | ||
|
|
e00e9c569d | ||
|
|
00cdf961a9 | ||
|
|
8aed670267 | ||
|
|
327768cc01 | ||
|
|
7ccc3ea35d | ||
|
|
933c370606 | ||
|
|
0384e62b01 | ||
|
|
cebb6b88eb | ||
|
|
5313039b50 | ||
|
|
f652f2f730 | ||
|
|
d4bee86331 | ||
|
|
47ea9c0492 | ||
|
|
6030268924 | ||
|
|
b54b276c63 | ||
|
|
ba4927f620 | ||
|
|
ffd8cdf65e | ||
|
|
189e8895c9 | ||
|
|
b7cab36bd9 | ||
|
|
d948819535 | ||
|
|
c3b7674001 | ||
|
|
6a55a91112 | ||
|
|
0f93ff6ecd | ||
|
|
db60e6e414 | ||
|
|
236d4b4dc8 | ||
|
|
64e3095e7a | ||
|
|
8ae5018a62 | ||
|
|
0d89c60b12 | ||
|
|
beb494f89c |
@@ -22,6 +22,8 @@ Kimball ETL Flow:
|
|||||||
│ agg_pillar_by_country │
|
│ agg_pillar_by_country │
|
||||||
│ agg_framework_by_country │
|
│ agg_framework_by_country │
|
||||||
│ agg_framework_asean │
|
│ agg_framework_asean │
|
||||||
|
│ ↓ │
|
||||||
|
│ agg_indicator_norm │
|
||||||
│ │
|
│ │
|
||||||
│ AUDIT : etl_logs, etl_metadata (setiap layer) │
|
│ AUDIT : etl_logs, etl_metadata (setiap layer) │
|
||||||
└──────────────────────────────────────────────────────────────────────────┘
|
└──────────────────────────────────────────────────────────────────────────┘
|
||||||
@@ -36,13 +38,15 @@ Task Order:
|
|||||||
→ dimensional_model_to_gold
|
→ dimensional_model_to_gold
|
||||||
→ analytical_layer_to_gold
|
→ analytical_layer_to_gold
|
||||||
→ aggregation_to_gold
|
→ aggregation_to_gold
|
||||||
|
→ indicator_norm_aggregation_to_gold
|
||||||
|
|
||||||
Scripts folder harus berisi:
|
Scripts folder harus berisi:
|
||||||
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
|
- bigquery_raw_layer.py (run_verify_connection, run_load_fao, ...)
|
||||||
- bigquery_cleaned_layer.py (run_cleaned_integration)
|
- bigquery_cleaned_layer.py (run_cleaned_integration)
|
||||||
- bigquery_dimensional_model.py (run_dimensional_model)
|
- bigquery_dimensional_model.py (run_dimensional_model)
|
||||||
- bigquery_analytical_layer.py (run_analytical_layer)
|
- bigquery_analytical_layer.py (run_analytical_layer)
|
||||||
- bigquery_analysis_aggregation.py (run_aggregation)
|
- bigquery_analysis_aggregation.py (run_aggregation)
|
||||||
|
- bigquery_aggraget_fact_selected_layer.py (run_indicator_norm_aggregation)
|
||||||
- bigquery_config.py
|
- bigquery_config.py
|
||||||
- bigquery_helpers.py
|
- bigquery_helpers.py
|
||||||
- bigquery_datasource.py
|
- bigquery_datasource.py
|
||||||
@@ -71,11 +75,14 @@ from scripts.bigquery_analytical_layer import (
|
|||||||
from scripts.bigquery_aggregate_layer import (
|
from scripts.bigquery_aggregate_layer import (
|
||||||
run_aggregation,
|
run_aggregation,
|
||||||
)
|
)
|
||||||
|
from scripts.bigquery_aggraget_fact_selected_layer import (
|
||||||
|
run_indicator_norm_aggregation,
|
||||||
|
)
|
||||||
|
|
||||||
# DEFAULT ARGS
|
# DEFAULT ARGS
|
||||||
|
|
||||||
default_args = {
|
default_args = {
|
||||||
'owner': 'data-engineering',
|
'owner': 'Debby Seftia',
|
||||||
'email': ['d1041221004@student.untan.ac.id'],
|
'email': ['d1041221004@student.untan.ac.id'],
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,7 +93,7 @@ with DAG(
|
|||||||
description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)",
|
description = "Kimball ETL: FAO, World Bank, UNICEF → BigQuery (Bronze → Silver → Gold)",
|
||||||
default_args = default_args,
|
default_args = default_args,
|
||||||
start_date = datetime(2026, 3, 1),
|
start_date = datetime(2026, 3, 1),
|
||||||
schedule_interval = "0 0 */3 * *",
|
schedule_interval = "0 0 1 */3 *",
|
||||||
catchup = False,
|
catchup = False,
|
||||||
tags = ["food-security", "bigquery", "kimball"],
|
tags = ["food-security", "bigquery", "kimball"],
|
||||||
) as dag:
|
) as dag:
|
||||||
@@ -136,5 +143,21 @@ with DAG(
|
|||||||
python_callable = run_aggregation
|
python_callable = run_aggregation
|
||||||
)
|
)
|
||||||
|
|
||||||
|
task_indicator_norm = PythonOperator(
|
||||||
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging >> task_cleaned >> task_dimensional >> task_analytical >> task_aggregation
|
task_id = "indicator_norm_aggregation_to_gold",
|
||||||
|
python_callable = run_indicator_norm_aggregation
|
||||||
|
)
|
||||||
|
|
||||||
|
# Task Dependencies
|
||||||
|
(
|
||||||
|
task_verify
|
||||||
|
>> task_fao
|
||||||
|
>> task_worldbank
|
||||||
|
>> task_unicef
|
||||||
|
>> task_staging
|
||||||
|
>> task_cleaned
|
||||||
|
>> task_dimensional
|
||||||
|
>> task_analytical
|
||||||
|
>> task_aggregation
|
||||||
|
>> task_indicator_norm
|
||||||
|
)
|
||||||
1402
scripts/bigquery_aggraget_fact_selected_layer.py
Normal file
1402
scripts/bigquery_aggraget_fact_selected_layer.py
Normal file
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -40,12 +40,12 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
|
|||||||
"""Load data dari staging_integrated (STAGING/Silver layer)."""
|
"""Load data dari staging_integrated (STAGING/Silver layer)."""
|
||||||
print("\nLoading data from staging_integrated (fs_asean_silver)...")
|
print("\nLoading data from staging_integrated (fs_asean_silver)...")
|
||||||
df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver')
|
df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver')
|
||||||
print(f" Loaded : {len(df_staging):,} rows")
|
print(f" ✓ Loaded : {len(df_staging):,} rows")
|
||||||
print(f" Columns : {len(df_staging.columns)}")
|
print(f" Columns : {len(df_staging.columns)}")
|
||||||
print(f" Sources : {df_staging['source'].nunique()}")
|
print(f" Sources : {df_staging['source'].nunique()}")
|
||||||
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
|
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
|
||||||
print(f" Countries : {df_staging['country'].nunique()}")
|
print(f" Countries : {df_staging['country'].nunique()}")
|
||||||
print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
|
print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
|
||||||
return df_staging
|
return df_staging
|
||||||
|
|
||||||
|
|
||||||
@@ -53,6 +53,7 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
|
|||||||
# COLUMN CONSTRAINT HELPERS
|
# COLUMN CONSTRAINT HELPERS
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
|
# Schema constraints — semua varchar max lengths
|
||||||
COLUMN_CONSTRAINTS = {
|
COLUMN_CONSTRAINTS = {
|
||||||
'source' : 20,
|
'source' : 20,
|
||||||
'indicator_original' : 255,
|
'indicator_original' : 255,
|
||||||
@@ -61,7 +62,7 @@ COLUMN_CONSTRAINTS = {
|
|||||||
'year_range' : 20,
|
'year_range' : 20,
|
||||||
'unit' : 20,
|
'unit' : 20,
|
||||||
'pillar' : 20,
|
'pillar' : 20,
|
||||||
'direction' : 15,
|
'direction' : 15, # 'higher_better'=13, 'lower_better'=12
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -100,11 +101,11 @@ def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
|
|||||||
)
|
)
|
||||||
|
|
||||||
if truncation_report:
|
if truncation_report:
|
||||||
print("\n Column Truncations Applied:")
|
print("\n ⚠ Column Truncations Applied:")
|
||||||
for column, info in truncation_report.items():
|
for column, info in truncation_report.items():
|
||||||
print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
|
print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
|
||||||
else:
|
else:
|
||||||
print("\n No truncations needed — all values within constraints")
|
print("\n ✓ No truncations needed — all values within constraints")
|
||||||
|
|
||||||
return df_constrained
|
return df_constrained
|
||||||
|
|
||||||
@@ -155,11 +156,11 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
|
|||||||
def map_country(country):
|
def map_country(country):
|
||||||
if pd.isna(country):
|
if pd.isna(country):
|
||||||
return country
|
return country
|
||||||
s = str(country).strip()
|
s = str(country).strip()
|
||||||
mapped = ASEAN_MAPPING.get(s.upper(), s)
|
mapped = ASEAN_MAPPING.get(s.upper(), s)
|
||||||
return mapped[:100] if len(mapped) > 100 else mapped
|
return mapped[:100] if len(mapped) > 100 else mapped
|
||||||
|
|
||||||
original = df_clean[country_column].copy()
|
original = df_clean[country_column].copy()
|
||||||
df_clean[country_column] = df_clean[country_column].apply(map_country)
|
df_clean[country_column] = df_clean[country_column].apply(map_country)
|
||||||
changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new}
|
changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new}
|
||||||
|
|
||||||
@@ -176,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
|
|||||||
def assign_pillar(indicator_name: str) -> str:
|
def assign_pillar(indicator_name: str) -> str:
|
||||||
"""
|
"""
|
||||||
Assign pillar berdasarkan keyword indikator.
|
Assign pillar berdasarkan keyword indikator.
|
||||||
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other'
|
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Sustainability'
|
||||||
All <= 20 chars (varchar(20) constraint).
|
All ≤ 20 chars (varchar(20) constraint).
|
||||||
"""
|
"""
|
||||||
if pd.isna(indicator_name):
|
if pd.isna(indicator_name):
|
||||||
return 'Other'
|
return 'Sustainability'
|
||||||
ind = str(indicator_name).lower()
|
ind = str(indicator_name).lower()
|
||||||
|
|
||||||
for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
|
for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
|
||||||
if kw in ind:
|
if kw in ind:
|
||||||
return 'Other'
|
return 'Sustainability'
|
||||||
|
|
||||||
if any(kw in ind for kw in [
|
if any(kw in ind for kw in [
|
||||||
'adequacy', 'protein supply', 'supply of protein',
|
'adequacy', 'protein supply', 'supply of protein',
|
||||||
@@ -209,13 +210,12 @@ def assign_pillar(indicator_name: str) -> str:
|
|||||||
|
|
||||||
if any(kw in ind for kw in [
|
if any(kw in ind for kw in [
|
||||||
'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
|
'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
|
||||||
'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water',
|
'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
|
||||||
'sanitation', 'children under 5', 'newborns with low',
|
'children under 5', 'newborns with low', 'women of reproductive'
|
||||||
'women of reproductive'
|
|
||||||
]):
|
]):
|
||||||
return 'Utilization'
|
return 'Utilization'
|
||||||
|
|
||||||
return 'Other'
|
return 'Sustainability'
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
@@ -226,15 +226,17 @@ def assign_direction(indicator_name: str) -> str:
|
|||||||
"""
|
"""
|
||||||
Assign direction berdasarkan indikator.
|
Assign direction berdasarkan indikator.
|
||||||
Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars)
|
Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars)
|
||||||
Both <= 15 chars (varchar(15) constraint).
|
Both ≤ 15 chars (varchar(15) constraint).
|
||||||
"""
|
"""
|
||||||
if pd.isna(indicator_name):
|
if pd.isna(indicator_name):
|
||||||
return 'higher_better'
|
return 'higher_better'
|
||||||
ind = str(indicator_name).lower()
|
ind = str(indicator_name).lower()
|
||||||
|
|
||||||
|
# Spesifik lower_better
|
||||||
if 'share of dietary energy supply derived from cereals' in ind:
|
if 'share of dietary energy supply derived from cereals' in ind:
|
||||||
return 'lower_better'
|
return 'lower_better'
|
||||||
|
|
||||||
|
# Higher_better exceptions — cek sebelum lower_better keywords
|
||||||
for kw in [
|
for kw in [
|
||||||
'exclusive breastfeeding',
|
'exclusive breastfeeding',
|
||||||
'dietary energy supply',
|
'dietary energy supply',
|
||||||
@@ -246,6 +248,7 @@ def assign_direction(indicator_name: str) -> str:
|
|||||||
if kw in ind:
|
if kw in ind:
|
||||||
return 'higher_better'
|
return 'higher_better'
|
||||||
|
|
||||||
|
# Lower_better — masalah yang harus diminimalkan
|
||||||
for kw in [
|
for kw in [
|
||||||
'prevalence of undernourishment',
|
'prevalence of undernourishment',
|
||||||
'prevalence of severe food insecurity',
|
'prevalence of severe food insecurity',
|
||||||
@@ -256,7 +259,6 @@ def assign_direction(indicator_name: str) -> str:
|
|||||||
'prevalence of overweight',
|
'prevalence of overweight',
|
||||||
'prevalence of obesity',
|
'prevalence of obesity',
|
||||||
'prevalence of anemia',
|
'prevalence of anemia',
|
||||||
'prevalence of anaemia',
|
|
||||||
'prevalence of low birthweight',
|
'prevalence of low birthweight',
|
||||||
'number of people undernourished',
|
'number of people undernourished',
|
||||||
'number of severely food insecure',
|
'number of severely food insecure',
|
||||||
@@ -281,9 +283,6 @@ def assign_direction(indicator_name: str) -> str:
|
|||||||
'coefficient of variation',
|
'coefficient of variation',
|
||||||
'incidence of caloric losses',
|
'incidence of caloric losses',
|
||||||
'food losses',
|
'food losses',
|
||||||
'indicator of food price anomalies',
|
|
||||||
'proportion of local breeds classified as being at risk',
|
|
||||||
'agricultural export subsidies',
|
|
||||||
]:
|
]:
|
||||||
if kw in ind:
|
if kw in ind:
|
||||||
return 'lower_better'
|
return 'lower_better'
|
||||||
@@ -300,18 +299,19 @@ class CleanedDataLoader:
|
|||||||
Loader untuk cleaned integrated data ke STAGING layer (Silver).
|
Loader untuk cleaned integrated data ke STAGING layer (Silver).
|
||||||
|
|
||||||
Kimball context:
|
Kimball context:
|
||||||
Input : staging_integrated -> STAGING (Silver) — fs_asean_silver
|
Input : staging_integrated → STAGING (Silver) — fs_asean_silver
|
||||||
Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
|
Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver
|
||||||
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit
|
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
|
||||||
|
|
||||||
Pipeline steps:
|
Pipeline steps:
|
||||||
1. Standardize country names (ASEAN)
|
1. Standardize country names (ASEAN)
|
||||||
2. Remove missing values
|
2. Remove missing values
|
||||||
3. Remove duplicates
|
3. Remove duplicates
|
||||||
4. Add pillar & direction classification
|
4. Add pillar classification
|
||||||
5. Apply column constraints
|
5. Add direction classification
|
||||||
6. Load ke BigQuery
|
6. Apply column constraints
|
||||||
7. Log ke Audit layer
|
7. Load ke BigQuery
|
||||||
|
8. Log ke Audit layer
|
||||||
"""
|
"""
|
||||||
|
|
||||||
SCHEMA = [
|
SCHEMA = [
|
||||||
@@ -355,7 +355,7 @@ class CleanedDataLoader:
|
|||||||
def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
|
def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
print("\n [Step 1/5] Standardize country names...")
|
print("\n [Step 1/5] Standardize country names...")
|
||||||
df, report = standardize_country_names_asean(df, country_column='country')
|
df, report = standardize_country_names_asean(df, country_column='country')
|
||||||
print(f" ASEAN countries mapped : {report['countries_mapped']}")
|
print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}")
|
||||||
unique_countries = sorted(df['country'].unique())
|
unique_countries = sorted(df['country'].unique())
|
||||||
print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
|
print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
|
||||||
log_update(self.client, 'STAGING', 'staging_integrated',
|
log_update(self.client, 'STAGING', 'staging_integrated',
|
||||||
@@ -377,9 +377,7 @@ class CleanedDataLoader:
|
|||||||
def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
|
def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
print("\n [Step 3/5] Remove duplicates...")
|
print("\n [Step 3/5] Remove duplicates...")
|
||||||
exact_dups = df.duplicated().sum()
|
exact_dups = df.duplicated().sum()
|
||||||
data_dups = df.duplicated(
|
data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
|
||||||
subset=['indicator_standardized', 'country', 'year', 'value']
|
|
||||||
).sum()
|
|
||||||
print(f" Exact duplicates : {exact_dups:,}")
|
print(f" Exact duplicates : {exact_dups:,}")
|
||||||
print(f" Data duplicates : {data_dups:,}")
|
print(f" Data duplicates : {data_dups:,}")
|
||||||
rows_before = len(df)
|
rows_before = len(df)
|
||||||
@@ -393,21 +391,19 @@ class CleanedDataLoader:
|
|||||||
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
|
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
print("\n [Step 4/5] Add pillar & direction classification...")
|
print("\n [Step 4/5] Add pillar & direction classification...")
|
||||||
df = df.copy()
|
df = df.copy()
|
||||||
|
|
||||||
df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
|
df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
|
||||||
df['direction'] = df['indicator_standardized'].apply(assign_direction)
|
df['direction'] = df['indicator_standardized'].apply(assign_direction)
|
||||||
|
|
||||||
pillar_counts = df['pillar'].value_counts()
|
pillar_counts = df['pillar'].value_counts()
|
||||||
print(f" Pillar distribution:")
|
print(f" ✓ Pillar distribution:")
|
||||||
for pillar, count in pillar_counts.items():
|
for pillar, count in pillar_counts.items():
|
||||||
print(f" - {pillar}: {count:,}")
|
print(f" - {pillar}: {count:,}")
|
||||||
|
|
||||||
direction_counts = df['direction'].value_counts()
|
direction_counts = df['direction'].value_counts()
|
||||||
print(f" Direction distribution:")
|
print(f" ✓ Direction distribution:")
|
||||||
for direction, count in direction_counts.items():
|
for direction, count in direction_counts.items():
|
||||||
pct = count / len(df) * 100
|
pct = count / len(df) * 100
|
||||||
print(f" - {direction}: {count:,} ({pct:.1f}%)")
|
print(f" - {direction}: {count:,} ({pct:.1f}%)")
|
||||||
|
|
||||||
return df
|
return df
|
||||||
|
|
||||||
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
|
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||||
@@ -442,6 +438,7 @@ class CleanedDataLoader:
|
|||||||
if 'country' in df.columns:
|
if 'country' in df.columns:
|
||||||
validation['unique_countries'] = int(df['country'].nunique())
|
validation['unique_countries'] = int(df['country'].nunique())
|
||||||
|
|
||||||
|
# Column length check
|
||||||
column_length_check = {}
|
column_length_check = {}
|
||||||
for col, max_len in COLUMN_CONSTRAINTS.items():
|
for col, max_len in COLUMN_CONSTRAINTS.items():
|
||||||
if col in df.columns:
|
if col in df.columns:
|
||||||
@@ -460,7 +457,7 @@ class CleanedDataLoader:
|
|||||||
|
|
||||||
def run(self, df: pd.DataFrame) -> int:
|
def run(self, df: pd.DataFrame) -> int:
|
||||||
"""
|
"""
|
||||||
Execute full cleaning pipeline -> load ke STAGING (Silver).
|
Execute full cleaning pipeline → load ke STAGING (Silver).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
int: Rows loaded
|
int: Rows loaded
|
||||||
@@ -472,6 +469,7 @@ class CleanedDataLoader:
|
|||||||
print(" ERROR: DataFrame is empty, nothing to process.")
|
print(" ERROR: DataFrame is empty, nothing to process.")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
# Pipeline steps
|
||||||
df = self._step_standardize_countries(df)
|
df = self._step_standardize_countries(df)
|
||||||
df = self._step_remove_missing(df)
|
df = self._step_remove_missing(df)
|
||||||
df = self._step_remove_duplicates(df)
|
df = self._step_remove_duplicates(df)
|
||||||
@@ -480,6 +478,7 @@ class CleanedDataLoader:
|
|||||||
|
|
||||||
self.metadata['rows_transformed'] = len(df)
|
self.metadata['rows_transformed'] = len(df)
|
||||||
|
|
||||||
|
# Validate
|
||||||
validation = self.validate_data(df)
|
validation = self.validate_data(df)
|
||||||
self.metadata['validation_metrics'] = validation
|
self.metadata['validation_metrics'] = validation
|
||||||
|
|
||||||
@@ -488,12 +487,13 @@ class CleanedDataLoader:
|
|||||||
for info in validation.get('column_length_check', {}).values()
|
for info in validation.get('column_length_check', {}).values()
|
||||||
)
|
)
|
||||||
if not all_within_limits:
|
if not all_within_limits:
|
||||||
print("\n WARNING: Some columns still exceed length constraints!")
|
print("\n ⚠ WARNING: Some columns still exceed length constraints!")
|
||||||
for col, info in validation['column_length_check'].items():
|
for col, info in validation['column_length_check'].items():
|
||||||
if not info['within_limit']:
|
if not info['within_limit']:
|
||||||
print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
|
print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
|
||||||
|
|
||||||
print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...")
|
# Load ke Silver
|
||||||
|
print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
|
||||||
rows_loaded = load_to_bigquery(
|
rows_loaded = load_to_bigquery(
|
||||||
self.client, df, self.table_name,
|
self.client, df, self.table_name,
|
||||||
layer='silver',
|
layer='silver',
|
||||||
@@ -502,8 +502,10 @@ class CleanedDataLoader:
|
|||||||
)
|
)
|
||||||
self.metadata['rows_loaded'] = rows_loaded
|
self.metadata['rows_loaded'] = rows_loaded
|
||||||
|
|
||||||
|
# Audit logs
|
||||||
log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
|
log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
|
||||||
|
|
||||||
|
# ETL metadata
|
||||||
self.metadata['end_time'] = datetime.now()
|
self.metadata['end_time'] = datetime.now()
|
||||||
self.metadata['duration_seconds'] = (
|
self.metadata['duration_seconds'] = (
|
||||||
self.metadata['end_time'] - self.metadata['start_time']
|
self.metadata['end_time'] - self.metadata['start_time']
|
||||||
@@ -514,31 +516,33 @@ class CleanedDataLoader:
|
|||||||
self.metadata['validation_metrics'] = json.dumps(validation)
|
self.metadata['validation_metrics'] = json.dumps(validation)
|
||||||
save_etl_metadata(self.client, self.metadata)
|
save_etl_metadata(self.client, self.metadata)
|
||||||
|
|
||||||
print(f"\n Cleaned Integration completed: {rows_loaded:,} rows")
|
# Summary
|
||||||
|
print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
|
||||||
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
|
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
|
||||||
print(f" Completeness : {validation['completeness_pct']:.2f}%")
|
print(f" Completeness : {validation['completeness_pct']:.2f}%")
|
||||||
if 'year_range' in validation:
|
if 'year_range' in validation:
|
||||||
yr = validation['year_range']
|
yr = validation['year_range']
|
||||||
if yr['min'] and yr['max']:
|
if yr['min'] and yr['max']:
|
||||||
print(f" Year range : {yr['min']}-{yr['max']}")
|
print(f" Year range : {yr['min']}–{yr['max']}")
|
||||||
print(f" Indicators : {validation.get('unique_indicators', '-')}")
|
print(f" Indicators : {validation.get('unique_indicators', '-')}")
|
||||||
print(f" Countries : {validation.get('unique_countries', '-')}")
|
print(f" Countries : {validation.get('unique_countries', '-')}")
|
||||||
print(f"\n Schema Validation:")
|
print(f"\n Schema Validation:")
|
||||||
for col, info in validation.get('column_length_check', {}).items():
|
for col, info in validation.get('column_length_check', {}).items():
|
||||||
status = "OK" if info['within_limit'] else "FAIL"
|
status = "✓" if info['within_limit'] else "✗"
|
||||||
print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
|
print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
|
||||||
print(f"\n Metadata -> [AUDIT] etl_metadata")
|
print(f"\n Metadata → [AUDIT] etl_metadata")
|
||||||
|
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# AIRFLOW TASK FUNCTIONS
|
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def run_cleaned_integration():
|
def run_cleaned_integration():
|
||||||
"""
|
"""
|
||||||
Airflow task: Load cleaned_integrated dari staging_integrated.
|
Airflow task: Load cleaned_integrated dari staging_integrated.
|
||||||
|
|
||||||
Dipanggil oleh DAG setelah task staging_integration_to_silver selesai.
|
Dipanggil oleh DAG setelah task staging_integration_to_silver selesai.
|
||||||
"""
|
"""
|
||||||
from scripts.bigquery_config import get_bigquery_client
|
from scripts.bigquery_config import get_bigquery_client
|
||||||
@@ -557,21 +561,21 @@ if __name__ == "__main__":
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
print("BIGQUERY CLEANED LAYER ETL")
|
print("BIGQUERY CLEANED LAYER ETL")
|
||||||
print("Kimball DW Architecture")
|
print("Kimball DW Architecture")
|
||||||
print(" Input : STAGING (Silver) -> staging_integrated")
|
print(" Input : STAGING (Silver) → staging_integrated")
|
||||||
print(" Output : STAGING (Silver) -> cleaned_integrated")
|
print(" Output : STAGING (Silver) → cleaned_integrated")
|
||||||
print(" Audit : AUDIT -> etl_logs, etl_metadata")
|
print(" Audit : AUDIT → etl_logs, etl_metadata")
|
||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
logger = setup_logging()
|
logger = setup_logging()
|
||||||
client = get_bigquery_client()
|
client = get_bigquery_client()
|
||||||
df_staging = load_staging_data(client)
|
df_staging = load_staging_data(client)
|
||||||
|
|
||||||
print("\n[1/1] Cleaned Integration -> STAGING (Silver)...")
|
print("\n[1/1] Cleaned Integration → STAGING (Silver)...")
|
||||||
loader = CleanedDataLoader(client, load_mode='full_refresh')
|
loader = CleanedDataLoader(client, load_mode='full_refresh')
|
||||||
final_count = loader.run(df_staging)
|
final_count = loader.run(df_staging)
|
||||||
|
|
||||||
print("\n" + "=" * 60)
|
print("\n" + "=" * 60)
|
||||||
print("[OK] CLEANED LAYER ETL COMPLETED")
|
print("✓ CLEANED LAYER ETL COMPLETED")
|
||||||
print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
|
print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
|
||||||
print(f" AUDIT : etl_logs, etl_metadata")
|
print(f" 📋 AUDIT : etl_logs, etl_metadata")
|
||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
@@ -46,9 +46,9 @@ class DimensionalModelLoader:
|
|||||||
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
|
Loader untuk dimensional model ke DW layer (Gold) — fs_asean_gold.
|
||||||
|
|
||||||
Kimball context:
|
Kimball context:
|
||||||
Input : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
|
Input : cleaned_integrated → STAGING (Silver) — fs_asean_silver
|
||||||
Output : dim_* + fact_* -> DW (Gold) — fs_asean_gold
|
Output : dim_* + fact_* → DW (Gold) — fs_asean_gold
|
||||||
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit
|
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
|
||||||
|
|
||||||
Pipeline steps:
|
Pipeline steps:
|
||||||
1. Load dim_country
|
1. Load dim_country
|
||||||
@@ -117,7 +117,7 @@ class DimensionalModelLoader:
|
|||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self.client.query(query).result()
|
self.client.query(query).result()
|
||||||
self.logger.info(f" [OK] FK: {table_name}.{fk_column} -> {ref_table}.{ref_column}")
|
self.logger.info(f" [OK] FK: {table_name}.{fk_column} → {ref_table}.{ref_column}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if "already exists" in str(e).lower():
|
if "already exists" in str(e).lower():
|
||||||
self.logger.info(f" [INFO] FK already exists: {constraint_name}")
|
self.logger.info(f" [INFO] FK already exists: {constraint_name}")
|
||||||
@@ -129,7 +129,7 @@ class DimensionalModelLoader:
|
|||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
def _save_table_metadata(self, table_name: str):
|
def _save_table_metadata(self, table_name: str):
|
||||||
meta = self.load_metadata[table_name]
|
meta = self.load_metadata[table_name]
|
||||||
metadata = {
|
metadata = {
|
||||||
'source_class' : self.__class__.__name__,
|
'source_class' : self.__class__.__name__,
|
||||||
'table_name' : table_name,
|
'table_name' : table_name,
|
||||||
@@ -145,7 +145,7 @@ class DimensionalModelLoader:
|
|||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
save_etl_metadata(self.client, metadata)
|
save_etl_metadata(self.client, metadata)
|
||||||
self.logger.info(f" Metadata -> [AUDIT] etl_metadata")
|
self.logger.info(f" Metadata → [AUDIT] etl_metadata")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
|
self.logger.warning(f" [WARN] Could not save metadata for {table_name}: {e}")
|
||||||
|
|
||||||
@@ -156,13 +156,13 @@ class DimensionalModelLoader:
|
|||||||
def load_dim_time(self):
|
def load_dim_time(self):
|
||||||
table_name = 'dim_time'
|
table_name = 'dim_time'
|
||||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||||
self.logger.info("Loading dim_time -> [DW/Gold] fs_asean_gold...")
|
self.logger.info("Loading dim_time → [DW/Gold] fs_asean_gold...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if 'year_range' in self.df_clean.columns:
|
if 'year_range' in self.df_clean.columns:
|
||||||
dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
|
dim_time = self.df_clean[['year', 'year_range']].drop_duplicates().copy()
|
||||||
else:
|
else:
|
||||||
dim_time = self.df_clean[['year']].drop_duplicates().copy()
|
dim_time = self.df_clean[['year']].drop_duplicates().copy()
|
||||||
dim_time['year_range'] = None
|
dim_time['year_range'] = None
|
||||||
|
|
||||||
dim_time['year'] = dim_time['year'].astype(int)
|
dim_time['year'] = dim_time['year'].astype(int)
|
||||||
@@ -194,10 +194,10 @@ class DimensionalModelLoader:
|
|||||||
pass
|
pass
|
||||||
return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})
|
return pd.Series({'year': year, 'start_year': start_year, 'end_year': end_year})
|
||||||
|
|
||||||
parsed = dim_time.apply(parse_year_range, axis=1)
|
parsed = dim_time.apply(parse_year_range, axis=1)
|
||||||
dim_time['year'] = parsed['year'].astype(int)
|
dim_time['year'] = parsed['year'].astype(int)
|
||||||
dim_time['start_year'] = parsed['start_year'].astype(int)
|
dim_time['start_year'] = parsed['start_year'].astype(int)
|
||||||
dim_time['end_year'] = parsed['end_year'].astype(int)
|
dim_time['end_year'] = parsed['end_year'].astype(int)
|
||||||
dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
|
dim_time['is_year_range'] = (dim_time['start_year'] != dim_time['end_year'])
|
||||||
dim_time['decade'] = (dim_time['year'] // 10) * 10
|
dim_time['decade'] = (dim_time['year'] // 10) * 10
|
||||||
dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
|
dim_time['is_range'] = (dim_time['start_year'] != dim_time['end_year']).astype(int)
|
||||||
@@ -229,7 +229,7 @@ class DimensionalModelLoader:
|
|||||||
)
|
)
|
||||||
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
||||||
self._save_table_metadata(table_name)
|
self._save_table_metadata(table_name)
|
||||||
self.logger.info(f" dim_time: {rows_loaded} rows\n")
|
self.logger.info(f" ✓ dim_time: {rows_loaded} rows\n")
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -240,11 +240,11 @@ class DimensionalModelLoader:
|
|||||||
def load_dim_country(self):
|
def load_dim_country(self):
|
||||||
table_name = 'dim_country'
|
table_name = 'dim_country'
|
||||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||||
self.logger.info("Loading dim_country -> [DW/Gold] fs_asean_gold...")
|
self.logger.info("Loading dim_country → [DW/Gold] fs_asean_gold...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
dim_country = self.df_clean[['country']].drop_duplicates().copy()
|
dim_country = self.df_clean[['country']].drop_duplicates().copy()
|
||||||
dim_country.columns = ['country_name']
|
dim_country.columns = ['country_name']
|
||||||
|
|
||||||
region_mapping = {
|
region_mapping = {
|
||||||
'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
|
'Brunei Darussalam': ('Southeast Asia', 'ASEAN'),
|
||||||
@@ -270,9 +270,7 @@ class DimensionalModelLoader:
|
|||||||
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
|
lambda x: region_mapping.get(x, ('Unknown', 'Unknown'))[1])
|
||||||
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
|
dim_country['iso_code'] = dim_country['country_name'].map(iso_mapping)
|
||||||
|
|
||||||
dim_country_final = dim_country[
|
dim_country_final = dim_country[['country_name', 'region', 'subregion', 'iso_code']].copy()
|
||||||
['country_name', 'region', 'subregion', 'iso_code']
|
|
||||||
].copy()
|
|
||||||
dim_country_final = dim_country_final.reset_index(drop=True)
|
dim_country_final = dim_country_final.reset_index(drop=True)
|
||||||
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
|
dim_country_final.insert(0, 'country_id', range(1, len(dim_country_final) + 1))
|
||||||
|
|
||||||
@@ -295,7 +293,7 @@ class DimensionalModelLoader:
|
|||||||
)
|
)
|
||||||
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
||||||
self._save_table_metadata(table_name)
|
self._save_table_metadata(table_name)
|
||||||
self.logger.info(f" dim_country: {rows_loaded} rows\n")
|
self.logger.info(f" ✓ dim_country: {rows_loaded} rows\n")
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -304,83 +302,58 @@ class DimensionalModelLoader:
|
|||||||
raise
|
raise
|
||||||
|
|
||||||
def load_dim_indicator(self):
|
def load_dim_indicator(self):
|
||||||
"""
|
|
||||||
Load dim_indicator ke Gold layer.
|
|
||||||
|
|
||||||
Kolom yang dimuat:
|
|
||||||
indicator_id — surrogate key
|
|
||||||
indicator_name — nama standar indikator
|
|
||||||
indicator_category — kategori (Health & Nutrition, dll.)
|
|
||||||
unit — satuan ukuran
|
|
||||||
direction — higher_better / lower_better
|
|
||||||
"""
|
|
||||||
table_name = 'dim_indicator'
|
table_name = 'dim_indicator'
|
||||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||||
self.logger.info("Loading dim_indicator -> [DW/Gold] fs_asean_gold...")
|
self.logger.info("Loading dim_indicator → [DW/Gold] fs_asean_gold...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
has_direction = 'direction' in self.df_clean.columns
|
has_direction = 'direction' in self.df_clean.columns
|
||||||
has_unit = 'unit' in self.df_clean.columns
|
has_unit = 'unit' in self.df_clean.columns
|
||||||
has_category = 'indicator_category' in self.df_clean.columns
|
has_category = 'indicator_category' in self.df_clean.columns
|
||||||
|
|
||||||
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
|
dim_indicator = self.df_clean[['indicator_standardized']].drop_duplicates().copy()
|
||||||
dim_indicator.columns = ['indicator_name']
|
dim_indicator.columns = ['indicator_name']
|
||||||
|
|
||||||
# Unit
|
|
||||||
if has_unit:
|
if has_unit:
|
||||||
unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
|
unit_map = self.df_clean[['indicator_standardized', 'unit']].drop_duplicates()
|
||||||
unit_map.columns = ['indicator_name', 'unit']
|
unit_map.columns = ['indicator_name', 'unit']
|
||||||
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
|
dim_indicator = dim_indicator.merge(unit_map, on='indicator_name', how='left')
|
||||||
else:
|
else:
|
||||||
dim_indicator['unit'] = None
|
dim_indicator['unit'] = None
|
||||||
|
|
||||||
# Direction
|
|
||||||
if has_direction:
|
if has_direction:
|
||||||
dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
|
dir_map = self.df_clean[['indicator_standardized', 'direction']].drop_duplicates()
|
||||||
dir_map.columns = ['indicator_name', 'direction']
|
dir_map.columns = ['indicator_name', 'direction']
|
||||||
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
|
dim_indicator = dim_indicator.merge(dir_map, on='indicator_name', how='left')
|
||||||
self.logger.info(" [OK] direction column from cleaned_integrated")
|
self.logger.info(" [OK] direction column from cleaned_integrated")
|
||||||
else:
|
else:
|
||||||
dim_indicator['direction'] = 'higher_better'
|
dim_indicator['direction'] = 'higher_better'
|
||||||
self.logger.warning(" [WARN] direction not found, default: higher_better")
|
self.logger.warning(" [WARN] direction not found, default: higher_better")
|
||||||
|
|
||||||
# Indicator category
|
|
||||||
if has_category:
|
if has_category:
|
||||||
cat_map = self.df_clean[
|
cat_map = self.df_clean[['indicator_standardized', 'indicator_category']].drop_duplicates()
|
||||||
['indicator_standardized', 'indicator_category']
|
cat_map.columns = ['indicator_name', 'indicator_category']
|
||||||
].drop_duplicates()
|
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
|
||||||
cat_map.columns = ['indicator_name', 'indicator_category']
|
|
||||||
dim_indicator = dim_indicator.merge(cat_map, on='indicator_name', how='left')
|
|
||||||
else:
|
else:
|
||||||
def categorize_indicator(name):
|
def categorize_indicator(name):
|
||||||
n = str(name).lower()
|
n = str(name).lower()
|
||||||
if any(w in n for w in [
|
if any(w in n for w in ['undernourishment', 'malnutrition', 'stunting',
|
||||||
'undernourishment', 'malnutrition', 'stunting',
|
'wasting', 'anemia', 'food security', 'food insecure', 'hunger']):
|
||||||
'wasting', 'anemia', 'anaemia', 'food security',
|
|
||||||
'food insecure', 'hunger'
|
|
||||||
]):
|
|
||||||
return 'Health & Nutrition'
|
return 'Health & Nutrition'
|
||||||
elif any(w in n for w in [
|
elif any(w in n for w in ['production', 'yield', 'cereal', 'crop',
|
||||||
'production', 'yield', 'cereal', 'crop',
|
'import dependency', 'share of dietary']):
|
||||||
'import dependency', 'share of dietary'
|
|
||||||
]):
|
|
||||||
return 'Agricultural Production'
|
return 'Agricultural Production'
|
||||||
elif any(w in n for w in ['import', 'export', 'trade']):
|
elif any(w in n for w in ['import', 'export', 'trade']):
|
||||||
return 'Trade'
|
return 'Trade'
|
||||||
elif any(w in n for w in ['gdp', 'income', 'economic']):
|
elif any(w in n for w in ['gdp', 'income', 'economic']):
|
||||||
return 'Economic'
|
return 'Economic'
|
||||||
elif any(w in n for w in [
|
elif any(w in n for w in ['water', 'sanitation', 'infrastructure', 'rail']):
|
||||||
'water', 'sanitation', 'infrastructure', 'rail'
|
|
||||||
]):
|
|
||||||
return 'Infrastructure'
|
return 'Infrastructure'
|
||||||
else:
|
else:
|
||||||
return 'Other'
|
return 'Sustainability'
|
||||||
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(
|
dim_indicator['indicator_category'] = dim_indicator['indicator_name'].apply(categorize_indicator)
|
||||||
categorize_indicator
|
|
||||||
)
|
|
||||||
|
|
||||||
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
|
|
||||||
|
|
||||||
|
dim_indicator = dim_indicator.drop_duplicates(subset=['indicator_name'], keep='first')
|
||||||
dim_indicator_final = dim_indicator[
|
dim_indicator_final = dim_indicator[
|
||||||
['indicator_name', 'indicator_category', 'unit', 'direction']
|
['indicator_name', 'indicator_category', 'unit', 'direction']
|
||||||
].copy()
|
].copy()
|
||||||
@@ -401,22 +374,17 @@ class DimensionalModelLoader:
|
|||||||
)
|
)
|
||||||
self._add_primary_key(table_name, 'indicator_id')
|
self._add_primary_key(table_name, 'indicator_id')
|
||||||
|
|
||||||
# Log distribusi
|
for label, col in [('Categories', 'indicator_category'), ('Direction', 'direction')]:
|
||||||
for label, col in [
|
|
||||||
('Categories', 'indicator_category'),
|
|
||||||
('Direction', 'direction'),
|
|
||||||
]:
|
|
||||||
self.logger.info(f" {label}:")
|
self.logger.info(f" {label}:")
|
||||||
for val, cnt in dim_indicator_final[col].value_counts().items():
|
for val, cnt in dim_indicator_final[col].value_counts().items():
|
||||||
pct = cnt / len(dim_indicator_final) * 100
|
self.logger.info(f" - {val}: {cnt} ({cnt/len(dim_indicator_final)*100:.1f}%)")
|
||||||
self.logger.info(f" - {val}: {cnt} ({pct:.1f}%)")
|
|
||||||
|
|
||||||
self.load_metadata[table_name].update(
|
self.load_metadata[table_name].update(
|
||||||
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
|
{'rows_loaded': rows_loaded, 'status': 'success', 'end_time': datetime.now()}
|
||||||
)
|
)
|
||||||
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
||||||
self._save_table_metadata(table_name)
|
self._save_table_metadata(table_name)
|
||||||
self.logger.info(f" dim_indicator: {rows_loaded} rows\n")
|
self.logger.info(f" ✓ dim_indicator: {rows_loaded} rows\n")
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -427,7 +395,7 @@ class DimensionalModelLoader:
|
|||||||
def load_dim_source(self):
|
def load_dim_source(self):
|
||||||
table_name = 'dim_source'
|
table_name = 'dim_source'
|
||||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||||
self.logger.info("Loading dim_source -> [DW/Gold] fs_asean_gold...")
|
self.logger.info("Loading dim_source → [DW/Gold] fs_asean_gold...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
source_details = {
|
source_details = {
|
||||||
@@ -487,7 +455,7 @@ class DimensionalModelLoader:
|
|||||||
)
|
)
|
||||||
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
||||||
self._save_table_metadata(table_name)
|
self._save_table_metadata(table_name)
|
||||||
self.logger.info(f" dim_source: {rows_loaded} rows\n")
|
self.logger.info(f" ✓ dim_source: {rows_loaded} rows\n")
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -498,15 +466,15 @@ class DimensionalModelLoader:
|
|||||||
def load_dim_pillar(self):
|
def load_dim_pillar(self):
|
||||||
table_name = 'dim_pillar'
|
table_name = 'dim_pillar'
|
||||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||||
self.logger.info("Loading dim_pillar -> [DW/Gold] fs_asean_gold...")
|
self.logger.info("Loading dim_pillar → [DW/Gold] fs_asean_gold...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
pillar_codes = {
|
pillar_codes = {
|
||||||
'Availability': 'AVL', 'Access' : 'ACC',
|
'Availability': 'AVL', 'Access' : 'ACC',
|
||||||
'Utilization' : 'UTL', 'Stability': 'STB', 'Other': 'OTH',
|
'Utilization' : 'UTL', 'Stability': 'STB', 'Sustainability': 'STN',
|
||||||
}
|
}
|
||||||
pillars_data = [
|
pillars_data = [
|
||||||
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'OTH')}
|
{'pillar_name': p, 'pillar_code': pillar_codes.get(p, 'STN')}
|
||||||
for p in self.df_clean['pillar'].unique()
|
for p in self.df_clean['pillar'].unique()
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -533,7 +501,7 @@ class DimensionalModelLoader:
|
|||||||
)
|
)
|
||||||
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
||||||
self._save_table_metadata(table_name)
|
self._save_table_metadata(table_name)
|
||||||
self.logger.info(f" dim_pillar: {rows_loaded} rows\n")
|
self.logger.info(f" ✓ dim_pillar: {rows_loaded} rows\n")
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -548,9 +516,10 @@ class DimensionalModelLoader:
|
|||||||
def load_fact_food_security(self):
|
def load_fact_food_security(self):
|
||||||
table_name = 'fact_food_security'
|
table_name = 'fact_food_security'
|
||||||
self.load_metadata[table_name]['start_time'] = datetime.now()
|
self.load_metadata[table_name]['start_time'] = datetime.now()
|
||||||
self.logger.info("Loading fact_food_security -> [DW/Gold] fs_asean_gold...")
|
self.logger.info("Loading fact_food_security → [DW/Gold] fs_asean_gold...")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Load dims dari Gold untuk FK resolution
|
||||||
dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
|
dim_country = read_from_bigquery(self.client, 'dim_country', layer='gold')
|
||||||
dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
|
dim_indicator = read_from_bigquery(self.client, 'dim_indicator', layer='gold')
|
||||||
dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
|
dim_time = read_from_bigquery(self.client, 'dim_time', layer='gold')
|
||||||
@@ -592,9 +561,9 @@ class DimensionalModelLoader:
|
|||||||
fact_table['start_year'] = fact_table['year'].astype(int)
|
fact_table['start_year'] = fact_table['year'].astype(int)
|
||||||
fact_table['end_year'] = fact_table['year'].astype(int)
|
fact_table['end_year'] = fact_table['year'].astype(int)
|
||||||
|
|
||||||
|
# Resolve FKs
|
||||||
fact_table = fact_table.merge(
|
fact_table = fact_table.merge(
|
||||||
dim_country[['country_id', 'country_name']].rename(
|
dim_country[['country_id', 'country_name']].rename(columns={'country_name': 'country'}),
|
||||||
columns={'country_name': 'country'}),
|
|
||||||
on='country', how='left'
|
on='country', how='left'
|
||||||
)
|
)
|
||||||
fact_table = fact_table.merge(
|
fact_table = fact_table.merge(
|
||||||
@@ -607,16 +576,15 @@ class DimensionalModelLoader:
|
|||||||
on=['start_year', 'end_year'], how='left'
|
on=['start_year', 'end_year'], how='left'
|
||||||
)
|
)
|
||||||
fact_table = fact_table.merge(
|
fact_table = fact_table.merge(
|
||||||
dim_source[['source_id', 'source_name']].rename(
|
dim_source[['source_id', 'source_name']].rename(columns={'source_name': 'source'}),
|
||||||
columns={'source_name': 'source'}),
|
|
||||||
on='source', how='left'
|
on='source', how='left'
|
||||||
)
|
)
|
||||||
fact_table = fact_table.merge(
|
fact_table = fact_table.merge(
|
||||||
dim_pillar[['pillar_id', 'pillar_name']].rename(
|
dim_pillar[['pillar_id', 'pillar_name']].rename(columns={'pillar_name': 'pillar'}),
|
||||||
columns={'pillar_name': 'pillar'}),
|
|
||||||
on='pillar', how='left'
|
on='pillar', how='left'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Filter hanya row dengan FK lengkap
|
||||||
fact_table = fact_table[
|
fact_table = fact_table[
|
||||||
fact_table['country_id'].notna() &
|
fact_table['country_id'].notna() &
|
||||||
fact_table['indicator_id'].notna() &
|
fact_table['indicator_id'].notna() &
|
||||||
@@ -653,6 +621,7 @@ class DimensionalModelLoader:
|
|||||||
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
|
layer='gold', write_disposition="WRITE_TRUNCATE", schema=schema
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Add PK + FKs
|
||||||
self._add_primary_key(table_name, 'fact_id')
|
self._add_primary_key(table_name, 'fact_id')
|
||||||
self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
|
self._add_foreign_key(table_name, 'country_id', 'dim_country', 'country_id')
|
||||||
self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
|
self._add_foreign_key(table_name, 'indicator_id', 'dim_indicator', 'indicator_id')
|
||||||
@@ -665,7 +634,7 @@ class DimensionalModelLoader:
|
|||||||
)
|
)
|
||||||
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
log_update(self.client, 'DW', table_name, 'full_load', rows_loaded)
|
||||||
self._save_table_metadata(table_name)
|
self._save_table_metadata(table_name)
|
||||||
self.logger.info(f" fact_food_security: {rows_loaded:,} rows\n")
|
self.logger.info(f" ✓ fact_food_security: {rows_loaded:,} rows\n")
|
||||||
return rows_loaded
|
return rows_loaded
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -748,15 +717,11 @@ class DimensionalModelLoader:
|
|||||||
FROM `{get_table_id('dim_indicator', layer='gold')}`
|
FROM `{get_table_id('dim_indicator', layer='gold')}`
|
||||||
GROUP BY direction ORDER BY direction
|
GROUP BY direction ORDER BY direction
|
||||||
"""
|
"""
|
||||||
df_dir = self.client.query(query_dir).result().to_dataframe(
|
df_dir = self.client.query(query_dir).result().to_dataframe(create_bqstorage_client=False)
|
||||||
create_bqstorage_client=False
|
|
||||||
)
|
|
||||||
if len(df_dir) > 0:
|
if len(df_dir) > 0:
|
||||||
self.logger.info(f"\n Direction Distribution:")
|
self.logger.info(f"\n Direction Distribution:")
|
||||||
for _, row in df_dir.iterrows():
|
for _, row in df_dir.iterrows():
|
||||||
self.logger.info(
|
self.logger.info(f" {row['direction']:15s}: {int(row['count']):>5,} indicators")
|
||||||
f" {row['direction']:15s}: {int(row['count']):>5,} indicators"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.logger.info("\n [OK] Validation completed")
|
self.logger.info("\n [OK] Validation completed")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -773,19 +738,22 @@ class DimensionalModelLoader:
|
|||||||
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
|
self.pipeline_metadata['rows_fetched'] = len(self.df_clean)
|
||||||
|
|
||||||
self.logger.info("\n" + "=" * 60)
|
self.logger.info("\n" + "=" * 60)
|
||||||
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) -> fs_asean_gold")
|
self.logger.info("DIMENSIONAL MODEL LOAD — DW (Gold) → fs_asean_gold")
|
||||||
self.logger.info("=" * 60)
|
self.logger.info("=" * 60)
|
||||||
|
|
||||||
self.logger.info("\nLOADING DIMENSION TABLES -> fs_asean_gold")
|
# Dimensions
|
||||||
|
self.logger.info("\nLOADING DIMENSION TABLES → fs_asean_gold")
|
||||||
self.load_dim_country()
|
self.load_dim_country()
|
||||||
self.load_dim_indicator()
|
self.load_dim_indicator()
|
||||||
self.load_dim_time()
|
self.load_dim_time()
|
||||||
self.load_dim_source()
|
self.load_dim_source()
|
||||||
self.load_dim_pillar()
|
self.load_dim_pillar()
|
||||||
|
|
||||||
self.logger.info("\nLOADING FACT TABLE -> fs_asean_gold")
|
# Fact
|
||||||
|
self.logger.info("\nLOADING FACT TABLE → fs_asean_gold")
|
||||||
self.load_fact_food_security()
|
self.load_fact_food_security()
|
||||||
|
|
||||||
|
# Validate
|
||||||
self.validate_constraints()
|
self.validate_constraints()
|
||||||
self.validate_data_load()
|
self.validate_data_load()
|
||||||
|
|
||||||
@@ -794,23 +762,22 @@ class DimensionalModelLoader:
|
|||||||
total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
|
total_loaded = sum(m['rows_loaded'] for m in self.load_metadata.values())
|
||||||
|
|
||||||
self.pipeline_metadata.update({
|
self.pipeline_metadata.update({
|
||||||
'end_time' : pipeline_end,
|
'end_time' : pipeline_end,
|
||||||
'duration_seconds' : duration,
|
'duration_seconds' : duration,
|
||||||
'rows_transformed' : total_loaded,
|
'rows_transformed' : total_loaded,
|
||||||
'rows_loaded' : total_loaded,
|
'rows_loaded' : total_loaded,
|
||||||
'execution_timestamp': self.pipeline_metadata['start_time'],
|
'execution_timestamp': self.pipeline_metadata['start_time'],
|
||||||
'completeness_pct' : 100.0,
|
'completeness_pct' : 100.0,
|
||||||
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
|
'config_snapshot' : json.dumps({'load_mode': 'full_refresh', 'layer': 'gold'}),
|
||||||
'validation_metrics' : json.dumps(
|
'validation_metrics': json.dumps({t: m['status'] for t, m in self.load_metadata.items()}),
|
||||||
{t: m['status'] for t, m in self.load_metadata.items()}
|
'table_name' : 'dimensional_model_pipeline',
|
||||||
),
|
|
||||||
'table_name' : 'dimensional_model_pipeline',
|
|
||||||
})
|
})
|
||||||
try:
|
try:
|
||||||
save_etl_metadata(self.client, self.pipeline_metadata)
|
save_etl_metadata(self.client, self.pipeline_metadata)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
|
self.logger.warning(f" [WARN] Could not save pipeline metadata: {e}")
|
||||||
|
|
||||||
|
# Summary
|
||||||
self.logger.info("\n" + "=" * 60)
|
self.logger.info("\n" + "=" * 60)
|
||||||
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
|
self.logger.info("DIMENSIONAL MODEL LOAD COMPLETED")
|
||||||
self.logger.info("=" * 60)
|
self.logger.info("=" * 60)
|
||||||
@@ -818,19 +785,20 @@ class DimensionalModelLoader:
|
|||||||
self.logger.info(f" Duration : {duration:.2f}s")
|
self.logger.info(f" Duration : {duration:.2f}s")
|
||||||
self.logger.info(f" Tables :")
|
self.logger.info(f" Tables :")
|
||||||
for tbl, meta in self.load_metadata.items():
|
for tbl, meta in self.load_metadata.items():
|
||||||
icon = "OK" if meta['status'] == 'success' else "FAIL"
|
icon = "✓" if meta['status'] == 'success' else "✗"
|
||||||
self.logger.info(f" [{icon}] {tbl:25s}: {meta['rows_loaded']:>10,} rows")
|
self.logger.info(f" {icon} {tbl:25s}: {meta['rows_loaded']:>10,} rows")
|
||||||
self.logger.info(f"\n Metadata -> [AUDIT] etl_metadata")
|
self.logger.info(f"\n Metadata → [AUDIT] etl_metadata")
|
||||||
self.logger.info("=" * 60)
|
self.logger.info("=" * 60)
|
||||||
|
|
||||||
|
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
# AIRFLOW TASK FUNCTIONS
|
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw & cleaned layer
|
||||||
# =============================================================================
|
# =============================================================================
|
||||||
|
|
||||||
def run_dimensional_model():
|
def run_dimensional_model():
|
||||||
"""
|
"""
|
||||||
Airflow task: Load dimensional model dari cleaned_integrated.
|
Airflow task: Load dimensional model dari cleaned_integrated.
|
||||||
|
|
||||||
Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai.
|
Dipanggil oleh DAG setelah task cleaned_integration_to_silver selesai.
|
||||||
"""
|
"""
|
||||||
from scripts.bigquery_config import get_bigquery_client
|
from scripts.bigquery_config import get_bigquery_client
|
||||||
@@ -849,9 +817,9 @@ if __name__ == "__main__":
|
|||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
print("BIGQUERY DIMENSIONAL MODEL LOAD")
|
print("BIGQUERY DIMENSIONAL MODEL LOAD")
|
||||||
print("Kimball DW Architecture")
|
print("Kimball DW Architecture")
|
||||||
print(" Input : STAGING (Silver) -> cleaned_integrated (fs_asean_silver)")
|
print(" Input : STAGING (Silver) → cleaned_integrated (fs_asean_silver)")
|
||||||
print(" Output : DW (Gold) -> dim_*, fact_* (fs_asean_gold)")
|
print(" Output : DW (Gold) → dim_*, fact_* (fs_asean_gold)")
|
||||||
print(" Audit : AUDIT -> etl_logs, etl_metadata (fs_asean_audit)")
|
print(" Audit : AUDIT → etl_logs, etl_metadata (fs_asean_audit)")
|
||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
|
|
||||||
logger = setup_logging()
|
logger = setup_logging()
|
||||||
@@ -859,22 +827,24 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
print("\nLoading cleaned_integrated (fs_asean_silver)...")
|
print("\nLoading cleaned_integrated (fs_asean_silver)...")
|
||||||
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
|
df_clean = read_from_bigquery(client, 'cleaned_integrated', layer='silver')
|
||||||
print(f" Loaded : {len(df_clean):,} rows")
|
print(f" ✓ Loaded : {len(df_clean):,} rows")
|
||||||
print(f" Columns : {len(df_clean.columns)}")
|
print(f" Columns : {len(df_clean.columns)}")
|
||||||
print(f" Sources : {df_clean['source'].nunique()}")
|
print(f" Sources : {df_clean['source'].nunique()}")
|
||||||
print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
|
print(f" Indicators : {df_clean['indicator_standardized'].nunique()}")
|
||||||
print(f" Countries : {df_clean['country'].nunique()}")
|
print(f" Countries : {df_clean['country'].nunique()}")
|
||||||
print(f" Year range : {int(df_clean['year'].min())}-{int(df_clean['year'].max())}")
|
print(f" Year range : {int(df_clean['year'].min())}–{int(df_clean['year'].max())}")
|
||||||
if 'direction' in df_clean.columns:
|
if 'direction' in df_clean.columns:
|
||||||
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
|
print(f" Direction : {df_clean['direction'].value_counts().to_dict()}")
|
||||||
|
else:
|
||||||
|
print(f" [WARN] direction column not found — run bigquery_cleaned_layer.py first")
|
||||||
|
|
||||||
print("\n[1/1] Dimensional Model Load -> DW (Gold)...")
|
print("\n[1/1] Dimensional Model Load → DW (Gold)...")
|
||||||
loader = DimensionalModelLoader(client, df_clean)
|
loader = DimensionalModelLoader(client, df_clean)
|
||||||
loader.run()
|
loader.run()
|
||||||
|
|
||||||
print("\n" + "=" * 60)
|
print("\n" + "=" * 60)
|
||||||
print("[OK] DIMENSIONAL MODEL ETL COMPLETED")
|
print("✓ DIMENSIONAL MODEL ETL COMPLETED")
|
||||||
print(" DW (Gold) : dim_country, dim_indicator, dim_time,")
|
print(" 🥇 DW (Gold) : dim_country, dim_indicator, dim_time,")
|
||||||
print(" dim_source, dim_pillar, fact_food_security")
|
print(" dim_source, dim_pillar, fact_food_security")
|
||||||
print(" AUDIT : etl_logs, etl_metadata")
|
print(" 📋 AUDIT : etl_logs, etl_metadata")
|
||||||
print("=" * 60)
|
print("=" * 60)
|
||||||
Reference in New Issue
Block a user