code last done
This commit is contained in:
@@ -40,12 +40,12 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
|
||||
"""Load data dari staging_integrated (STAGING/Silver layer)."""
|
||||
print("\nLoading data from staging_integrated (fs_asean_silver)...")
|
||||
df_staging = read_from_bigquery(client, 'staging_integrated', layer='silver')
|
||||
print(f" Loaded : {len(df_staging):,} rows")
|
||||
print(f" Columns : {len(df_staging.columns)}")
|
||||
print(f" Sources : {df_staging['source'].nunique()}")
|
||||
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
|
||||
print(f" Countries : {df_staging['country'].nunique()}")
|
||||
print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
|
||||
print(f" ✓ Loaded : {len(df_staging):,} rows")
|
||||
print(f" Columns : {len(df_staging.columns)}")
|
||||
print(f" Sources : {df_staging['source'].nunique()}")
|
||||
print(f" Indicators : {df_staging['indicator_standardized'].nunique()}")
|
||||
print(f" Countries : {df_staging['country'].nunique()}")
|
||||
print(f" Year range : {int(df_staging['year'].min())}-{int(df_staging['year'].max())}")
|
||||
return df_staging
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ def load_staging_data(client: bigquery.Client) -> pd.DataFrame:
|
||||
# COLUMN CONSTRAINT HELPERS
|
||||
# =============================================================================
|
||||
|
||||
# Schema constraints — semua varchar max lengths
|
||||
COLUMN_CONSTRAINTS = {
|
||||
'source' : 20,
|
||||
'indicator_original' : 255,
|
||||
@@ -61,7 +62,7 @@ COLUMN_CONSTRAINTS = {
|
||||
'year_range' : 20,
|
||||
'unit' : 20,
|
||||
'pillar' : 20,
|
||||
'direction' : 15,
|
||||
'direction' : 15, # 'higher_better'=13, 'lower_better'=12
|
||||
}
|
||||
|
||||
|
||||
@@ -100,11 +101,11 @@ def apply_column_constraints(df: pd.DataFrame) -> pd.DataFrame:
|
||||
)
|
||||
|
||||
if truncation_report:
|
||||
print("\n Column Truncations Applied:")
|
||||
print("\n ⚠ Column Truncations Applied:")
|
||||
for column, info in truncation_report.items():
|
||||
print(f" - {column}: {info['count']} values truncated to {info['max_length']} chars")
|
||||
else:
|
||||
print("\n No truncations needed — all values within constraints")
|
||||
print("\n ✓ No truncations needed — all values within constraints")
|
||||
|
||||
return df_constrained
|
||||
|
||||
@@ -155,11 +156,11 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
|
||||
def map_country(country):
|
||||
if pd.isna(country):
|
||||
return country
|
||||
s = str(country).strip()
|
||||
s = str(country).strip()
|
||||
mapped = ASEAN_MAPPING.get(s.upper(), s)
|
||||
return mapped[:100] if len(mapped) > 100 else mapped
|
||||
|
||||
original = df_clean[country_column].copy()
|
||||
original = df_clean[country_column].copy()
|
||||
df_clean[country_column] = df_clean[country_column].apply(map_country)
|
||||
changes = {orig: new for orig, new in zip(original, df_clean[country_column]) if orig != new}
|
||||
|
||||
@@ -176,16 +177,16 @@ def standardize_country_names_asean(df: pd.DataFrame, country_column: str = 'cou
|
||||
def assign_pillar(indicator_name: str) -> str:
|
||||
"""
|
||||
Assign pillar berdasarkan keyword indikator.
|
||||
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Supporting'
|
||||
All <= 20 chars (varchar(20) constraint).
|
||||
Return values: 'Availability', 'Access', 'Utilization', 'Stability', 'Other'
|
||||
All ≤ 20 chars (varchar(20) constraint).
|
||||
"""
|
||||
if pd.isna(indicator_name):
|
||||
return 'Supporting'
|
||||
return 'Other'
|
||||
ind = str(indicator_name).lower()
|
||||
|
||||
for kw in ['requirement', 'coefficient', 'losses', 'fat supply']:
|
||||
if kw in ind:
|
||||
return 'Supporting'
|
||||
return 'Other'
|
||||
|
||||
if any(kw in ind for kw in [
|
||||
'adequacy', 'protein supply', 'supply of protein',
|
||||
@@ -209,13 +210,12 @@ def assign_pillar(indicator_name: str) -> str:
|
||||
|
||||
if any(kw in ind for kw in [
|
||||
'wasting', 'wasted', 'stunted', 'overweight', 'obese', 'obesity',
|
||||
'anemia', 'anaemia', 'birthweight', 'breastfeeding', 'drinking water',
|
||||
'sanitation', 'children under 5', 'newborns with low',
|
||||
'women of reproductive'
|
||||
'anemia', 'birthweight', 'breastfeeding', 'drinking water', 'sanitation',
|
||||
'children under 5', 'newborns with low', 'women of reproductive'
|
||||
]):
|
||||
return 'Utilization'
|
||||
|
||||
return 'Supporting'
|
||||
return 'Other'
|
||||
|
||||
|
||||
# =============================================================================
|
||||
@@ -226,15 +226,17 @@ def assign_direction(indicator_name: str) -> str:
|
||||
"""
|
||||
Assign direction berdasarkan indikator.
|
||||
Return values: 'higher_better' (13 chars) atau 'lower_better' (12 chars)
|
||||
Both <= 15 chars (varchar(15) constraint).
|
||||
Both ≤ 15 chars (varchar(15) constraint).
|
||||
"""
|
||||
if pd.isna(indicator_name):
|
||||
return 'higher_better'
|
||||
ind = str(indicator_name).lower()
|
||||
|
||||
# Spesifik lower_better
|
||||
if 'share of dietary energy supply derived from cereals' in ind:
|
||||
return 'lower_better'
|
||||
|
||||
# Higher_better exceptions — cek sebelum lower_better keywords
|
||||
for kw in [
|
||||
'exclusive breastfeeding',
|
||||
'dietary energy supply',
|
||||
@@ -246,6 +248,7 @@ def assign_direction(indicator_name: str) -> str:
|
||||
if kw in ind:
|
||||
return 'higher_better'
|
||||
|
||||
# Lower_better — masalah yang harus diminimalkan
|
||||
for kw in [
|
||||
'prevalence of undernourishment',
|
||||
'prevalence of severe food insecurity',
|
||||
@@ -256,7 +259,6 @@ def assign_direction(indicator_name: str) -> str:
|
||||
'prevalence of overweight',
|
||||
'prevalence of obesity',
|
||||
'prevalence of anemia',
|
||||
'prevalence of anaemia',
|
||||
'prevalence of low birthweight',
|
||||
'number of people undernourished',
|
||||
'number of severely food insecure',
|
||||
@@ -281,9 +283,6 @@ def assign_direction(indicator_name: str) -> str:
|
||||
'coefficient of variation',
|
||||
'incidence of caloric losses',
|
||||
'food losses',
|
||||
'indicator of food price anomalies',
|
||||
'proportion of local breeds classified as being at risk',
|
||||
'agricultural export subsidies',
|
||||
]:
|
||||
if kw in ind:
|
||||
return 'lower_better'
|
||||
@@ -300,18 +299,19 @@ class CleanedDataLoader:
|
||||
Loader untuk cleaned integrated data ke STAGING layer (Silver).
|
||||
|
||||
Kimball context:
|
||||
Input : staging_integrated -> STAGING (Silver) — fs_asean_silver
|
||||
Output : cleaned_integrated -> STAGING (Silver) — fs_asean_silver
|
||||
Audit : etl_logs, etl_metadata -> AUDIT — fs_asean_audit
|
||||
Input : staging_integrated → STAGING (Silver) — fs_asean_silver
|
||||
Output : cleaned_integrated → STAGING (Silver) — fs_asean_silver
|
||||
Audit : etl_logs, etl_metadata → AUDIT — fs_asean_audit
|
||||
|
||||
Pipeline steps:
|
||||
1. Standardize country names (ASEAN)
|
||||
2. Remove missing values
|
||||
3. Remove duplicates
|
||||
4. Add pillar & direction classification
|
||||
5. Apply column constraints
|
||||
6. Load ke BigQuery
|
||||
7. Log ke Audit layer
|
||||
4. Add pillar classification
|
||||
5. Add direction classification
|
||||
6. Apply column constraints
|
||||
7. Load ke BigQuery
|
||||
8. Log ke Audit layer
|
||||
"""
|
||||
|
||||
SCHEMA = [
|
||||
@@ -355,7 +355,7 @@ class CleanedDataLoader:
|
||||
def _step_standardize_countries(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
print("\n [Step 1/5] Standardize country names...")
|
||||
df, report = standardize_country_names_asean(df, country_column='country')
|
||||
print(f" ASEAN countries mapped : {report['countries_mapped']}")
|
||||
print(f" ✓ ASEAN countries mapped : {report['countries_mapped']}")
|
||||
unique_countries = sorted(df['country'].unique())
|
||||
print(f" Countries ({len(unique_countries)}) : {', '.join(unique_countries)}")
|
||||
log_update(self.client, 'STAGING', 'staging_integrated',
|
||||
@@ -377,9 +377,7 @@ class CleanedDataLoader:
|
||||
def _step_remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
print("\n [Step 3/5] Remove duplicates...")
|
||||
exact_dups = df.duplicated().sum()
|
||||
data_dups = df.duplicated(
|
||||
subset=['indicator_standardized', 'country', 'year', 'value']
|
||||
).sum()
|
||||
data_dups = df.duplicated(subset=['indicator_standardized', 'country', 'year', 'value']).sum()
|
||||
print(f" Exact duplicates : {exact_dups:,}")
|
||||
print(f" Data duplicates : {data_dups:,}")
|
||||
rows_before = len(df)
|
||||
@@ -393,21 +391,19 @@ class CleanedDataLoader:
|
||||
def _step_add_classifications(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
print("\n [Step 4/5] Add pillar & direction classification...")
|
||||
df = df.copy()
|
||||
|
||||
df['pillar'] = df['indicator_standardized'].apply(assign_pillar)
|
||||
df['direction'] = df['indicator_standardized'].apply(assign_direction)
|
||||
|
||||
pillar_counts = df['pillar'].value_counts()
|
||||
print(f" Pillar distribution:")
|
||||
print(f" ✓ Pillar distribution:")
|
||||
for pillar, count in pillar_counts.items():
|
||||
print(f" - {pillar}: {count:,}")
|
||||
|
||||
direction_counts = df['direction'].value_counts()
|
||||
print(f" Direction distribution:")
|
||||
print(f" ✓ Direction distribution:")
|
||||
for direction, count in direction_counts.items():
|
||||
pct = count / len(df) * 100
|
||||
print(f" - {direction}: {count:,} ({pct:.1f}%)")
|
||||
|
||||
return df
|
||||
|
||||
def _step_apply_constraints(self, df: pd.DataFrame) -> pd.DataFrame:
|
||||
@@ -442,6 +438,7 @@ class CleanedDataLoader:
|
||||
if 'country' in df.columns:
|
||||
validation['unique_countries'] = int(df['country'].nunique())
|
||||
|
||||
# Column length check
|
||||
column_length_check = {}
|
||||
for col, max_len in COLUMN_CONSTRAINTS.items():
|
||||
if col in df.columns:
|
||||
@@ -460,7 +457,7 @@ class CleanedDataLoader:
|
||||
|
||||
def run(self, df: pd.DataFrame) -> int:
|
||||
"""
|
||||
Execute full cleaning pipeline -> load ke STAGING (Silver).
|
||||
Execute full cleaning pipeline → load ke STAGING (Silver).
|
||||
|
||||
Returns:
|
||||
int: Rows loaded
|
||||
@@ -472,6 +469,7 @@ class CleanedDataLoader:
|
||||
print(" ERROR: DataFrame is empty, nothing to process.")
|
||||
return 0
|
||||
|
||||
# Pipeline steps
|
||||
df = self._step_standardize_countries(df)
|
||||
df = self._step_remove_missing(df)
|
||||
df = self._step_remove_duplicates(df)
|
||||
@@ -480,6 +478,7 @@ class CleanedDataLoader:
|
||||
|
||||
self.metadata['rows_transformed'] = len(df)
|
||||
|
||||
# Validate
|
||||
validation = self.validate_data(df)
|
||||
self.metadata['validation_metrics'] = validation
|
||||
|
||||
@@ -488,12 +487,13 @@ class CleanedDataLoader:
|
||||
for info in validation.get('column_length_check', {}).values()
|
||||
)
|
||||
if not all_within_limits:
|
||||
print("\n WARNING: Some columns still exceed length constraints!")
|
||||
print("\n ⚠ WARNING: Some columns still exceed length constraints!")
|
||||
for col, info in validation['column_length_check'].items():
|
||||
if not info['within_limit']:
|
||||
print(f" - {col}: {info['max_actual_length']} > {info['max_length_constraint']}")
|
||||
|
||||
print(f"\n Loading to [STAGING/Silver] {self.table_name} -> fs_asean_silver...")
|
||||
# Load ke Silver
|
||||
print(f"\n Loading to [STAGING/Silver] {self.table_name} → fs_asean_silver...")
|
||||
rows_loaded = load_to_bigquery(
|
||||
self.client, df, self.table_name,
|
||||
layer='silver',
|
||||
@@ -502,8 +502,10 @@ class CleanedDataLoader:
|
||||
)
|
||||
self.metadata['rows_loaded'] = rows_loaded
|
||||
|
||||
# Audit logs
|
||||
log_update(self.client, 'STAGING', self.table_name, 'full_refresh', rows_loaded)
|
||||
|
||||
# ETL metadata
|
||||
self.metadata['end_time'] = datetime.now()
|
||||
self.metadata['duration_seconds'] = (
|
||||
self.metadata['end_time'] - self.metadata['start_time']
|
||||
@@ -514,31 +516,33 @@ class CleanedDataLoader:
|
||||
self.metadata['validation_metrics'] = json.dumps(validation)
|
||||
save_etl_metadata(self.client, self.metadata)
|
||||
|
||||
print(f"\n Cleaned Integration completed: {rows_loaded:,} rows")
|
||||
# Summary
|
||||
print(f"\n ✓ Cleaned Integration completed: {rows_loaded:,} rows")
|
||||
print(f" Duration : {self.metadata['duration_seconds']:.2f}s")
|
||||
print(f" Completeness : {validation['completeness_pct']:.2f}%")
|
||||
if 'year_range' in validation:
|
||||
yr = validation['year_range']
|
||||
if yr['min'] and yr['max']:
|
||||
print(f" Year range : {yr['min']}-{yr['max']}")
|
||||
print(f" Year range : {yr['min']}–{yr['max']}")
|
||||
print(f" Indicators : {validation.get('unique_indicators', '-')}")
|
||||
print(f" Countries : {validation.get('unique_countries', '-')}")
|
||||
print(f"\n Schema Validation:")
|
||||
for col, info in validation.get('column_length_check', {}).items():
|
||||
status = "OK" if info['within_limit'] else "FAIL"
|
||||
print(f" [{status}] {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
|
||||
print(f"\n Metadata -> [AUDIT] etl_metadata")
|
||||
status = "✓" if info['within_limit'] else "✗"
|
||||
print(f" {status} {col}: {info['max_actual_length']}/{info['max_length_constraint']}")
|
||||
print(f"\n Metadata → [AUDIT] etl_metadata")
|
||||
|
||||
return rows_loaded
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# AIRFLOW TASK FUNCTIONS
|
||||
# AIRFLOW TASK FUNCTIONS ← sama polanya dengan raw layer
|
||||
# =============================================================================
|
||||
|
||||
def run_cleaned_integration():
|
||||
"""
|
||||
Airflow task: Load cleaned_integrated dari staging_integrated.
|
||||
|
||||
Dipanggil oleh DAG setelah task staging_integration_to_silver selesai.
|
||||
"""
|
||||
from scripts.bigquery_config import get_bigquery_client
|
||||
@@ -557,21 +561,21 @@ if __name__ == "__main__":
|
||||
print("=" * 60)
|
||||
print("BIGQUERY CLEANED LAYER ETL")
|
||||
print("Kimball DW Architecture")
|
||||
print(" Input : STAGING (Silver) -> staging_integrated")
|
||||
print(" Output : STAGING (Silver) -> cleaned_integrated")
|
||||
print(" Audit : AUDIT -> etl_logs, etl_metadata")
|
||||
print(" Input : STAGING (Silver) → staging_integrated")
|
||||
print(" Output : STAGING (Silver) → cleaned_integrated")
|
||||
print(" Audit : AUDIT → etl_logs, etl_metadata")
|
||||
print("=" * 60)
|
||||
|
||||
logger = setup_logging()
|
||||
client = get_bigquery_client()
|
||||
df_staging = load_staging_data(client)
|
||||
|
||||
print("\n[1/1] Cleaned Integration -> STAGING (Silver)...")
|
||||
print("\n[1/1] Cleaned Integration → STAGING (Silver)...")
|
||||
loader = CleanedDataLoader(client, load_mode='full_refresh')
|
||||
final_count = loader.run(df_staging)
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
print("[OK] CLEANED LAYER ETL COMPLETED")
|
||||
print(f" STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
|
||||
print(f" AUDIT : etl_logs, etl_metadata")
|
||||
print("✓ CLEANED LAYER ETL COMPLETED")
|
||||
print(f" 🥈 STAGING (Silver) : cleaned_integrated ({final_count:,} rows)")
|
||||
print(f" 📋 AUDIT : etl_logs, etl_metadata")
|
||||
print("=" * 60)
|
||||
Reference in New Issue
Block a user