From 0235dfbc75ae3b5adbeb73aca40136c5d3421a9e Mon Sep 17 00:00:00 2001
From: Debby
Date: Thu, 12 Mar 2026 14:57:30 +0700
Subject: [PATCH] raw and staging data

---
 dags/trial.py                  |  17 ------
 scripts/bigquery_config.py     |  64 ++------------------
 scripts/bigquery_datasource.py | 105 ++++-----------------------------
 scripts/bigquery_helpers.py    |   4 +-
 scripts/bigquery_raw_layer.py  |  59 ++++--------------
 5 files changed, 30 insertions(+), 219 deletions(-)
 delete mode 100644 dags/trial.py

diff --git a/dags/trial.py b/dags/trial.py
deleted file mode 100644
index 0ac7d93..0000000
--- a/dags/trial.py
+++ /dev/null
@@ -1,17 +0,0 @@
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from datetime import datetime
-
-from scripts.test_data import run_fao_test
-
-with DAG(
-    dag_id="etl_fao_bigquery",
-    start_date=datetime(2026, 3, 3),
-    schedule_interval="@daily",
-    catchup=False
-) as dag:
-
-    task_load_fao = PythonOperator(
-        task_id="load_fao_to_bigquery",
-        python_callable=run_fao_test
-    )
\ No newline at end of file

diff --git a/scripts/bigquery_config.py b/scripts/bigquery_config.py
index 31685f0..8b29122 100644
--- a/scripts/bigquery_config.py
+++ b/scripts/bigquery_config.py
@@ -21,6 +21,7 @@ Kimball ETL Flow:
 """
 
 import os
+import json
 from pathlib import Path
 from google.cloud import bigquery
 from google.oauth2 import service_account
@@ -88,25 +89,6 @@ KIMBALL_LAYER_MAP = {
     "dw"      : "gold",
 }
 
-# SETUP BIGQUERY CLIENT
-
-def get_bigquery_client() -> bigquery.Client:
-    """
-    Create BigQuery client dengan service account credentials
-
-    Returns:
-        bigquery.Client: Authenticated BigQuery client
-    """
-    credentials = service_account.Credentials.from_service_account_file(
-        CREDENTIALS_PATH,
-        scopes=["https://www.googleapis.com/auth/cloud-platform"]
-    )
-    return bigquery.Client(
-        credentials=credentials,
-        project=PROJECT_ID,
-        location=LOCATION
-    )
-
 # MATCHING CONFIGURATION
 
 CONFIG = {
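Note that this hunk deletes get_bigquery_client() from the config module even though bigquery_datasource.py and bigquery_raw_layer.py below still import it from scripts.bigquery_config, and verify_setup() further down now checks a GOOGLE_CREDENTIALS_JSON environment variable (matching the new `import json`). A minimal sketch of an env-first factory consistent with those changes, reusing the existing CREDENTIALS_PATH, PROJECT_ID, and LOCATION config globals; the fallback logic and placement are assumptions, not part of this patch:

    import json
    import os

    from google.cloud import bigquery
    from google.oauth2 import service_account

    def get_bigquery_client() -> bigquery.Client:
        """Build a client from GOOGLE_CREDENTIALS_JSON, else the key file."""
        scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        raw = os.environ.get("GOOGLE_CREDENTIALS_JSON")
        if raw:
            # Credentials shipped as a JSON string in the environment.
            credentials = service_account.Credentials.from_service_account_info(
                json.loads(raw), scopes=scopes
            )
        else:
            # Fall back to the on-disk service-account key used previously.
            credentials = service_account.Credentials.from_service_account_file(
                CREDENTIALS_PATH, scopes=scopes
            )
        return bigquery.Client(credentials=credentials, project=PROJECT_ID,
                               location=LOCATION)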
@@ -166,7 +148,6 @@ for directory in [EXPORTS_DIR, LOGS_DIR]:
 # HELPER FUNCTIONS
 
 def get_table_id(table_name: str, layer: str = "bronze") -> str:
-    # Resolve Kimball alias ke layer name
     resolved = KIMBALL_LAYER_MAP.get(layer.lower(), layer.lower())
     dataset = LAYER_DATASET_MAP.get(resolved, DATASET_BRONZE)
 
@@ -174,17 +155,6 @@ def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze") -> bool:
-    """
-    Check apakah table ada di BigQuery
-
-    Args:
-        client      : BigQuery client
-        table_name  : Nama table
-        layer       : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-
-    Returns:
-        bool: True jika table ada
-    """
     try:
         client.get_table(get_table_id(table_name, layer))
         return True
@@ -193,14 +163,6 @@ def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
-    """
-    Delete table jika ada
-
-    Args:
-        client      : BigQuery client
-        table_name  : Nama table
-        layer       : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-    """
     table_id = get_table_id(table_name, layer)
     try:
         client.delete_table(table_id, not_found_ok=True)
@@ -210,13 +172,6 @@ def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
-    """
-    Create dataset jika belum ada
-
-    Args:
-        client      : BigQuery client
-        dataset_id  : Dataset ID string
-    """
     full_id = f"{PROJECT_ID}.{dataset_id}"
     try:
         client.get_dataset(full_id)
@@ -229,7 +184,6 @@ def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
 
 def create_all_datasets(client: bigquery.Client):
-    """Create semua 3 dataset (Raw/Staging/DW) jika belum ada"""
     print("Setting up BigQuery Datasets (Kimball DW)...")
     for layer, dataset_id in LAYER_DATASET_MAP.items():
         create_dataset_if_not_exists(client, dataset_id)
@@ -238,21 +192,14 @@ def create_all_datasets(client: bigquery.Client):
 # VERIFICATION
 
 def verify_setup() -> bool:
-    """
-    Verify BigQuery setup untuk semua 3 layer (Raw / Staging / DW)
-
-    Checks:
-        1. Credentials file exists
-        2. Koneksi ke BigQuery berhasil
-        3. Semua dataset ada atau berhasil dibuat
-    """
     print("=" * 60)
     print("BIGQUERY SETUP VERIFICATION")
     print("Kimball DW Architecture")
     print("=" * 60)
 
     # 1. Credentials
-    if not os.path.exists(CREDENTIALS_PATH):
+    credentials_json = os.environ.get("GOOGLE_CREDENTIALS_JSON")
+    if not credentials_json and not os.path.exists(CREDENTIALS_PATH):
         print(f"Credentials not found : {CREDENTIALS_PATH}")
         return False
     print(f"✓ Credentials found")
@@ -284,15 +231,16 @@ def verify_setup() -> bool:
     print("=" * 60)
     return True
 
+
 # INITIALIZE ON IMPORT
 
 if __name__ == "__main__":
     verify_setup()
 else:
     print("BigQuery Config Loaded — Kimball DW Architecture")
-    print(f"  Project : {PROJECT_ID}")
+    print(f"  Project          : {PROJECT_ID}")
     print(f"  Raw (Bronze)     : {DATASET_BRONZE}")
     print(f"  Staging (Silver) : {DATASET_SILVER}")
     print(f"  DW (Gold)        : {DATASET_GOLD}")
     print(f"  Audit            : {DATASET_AUDIT}")
-    print(f"  Location : {LOCATION}")
\ No newline at end of file
+    print(f"  Location         : {LOCATION}")
\ No newline at end of file
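With the Kimball aliases above, get_table_id() accepts either naming scheme and resolves it through KIMBALL_LAYER_MAP before looking up the dataset. A quick illustration, assuming the bronze dataset is named fs_asean_bronze (only the silver name, fs_asean_silver, is confirmed later in this patch):

    get_table_id("raw_fao")                        # "<project>.fs_asean_bronze.raw_fao"
    get_table_id("raw_fao", layer="raw")           # alias "raw" resolves to "bronze", same id
    get_table_id("staging_integrated", "staging")  # alias "staging" resolves to "silver"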
diff --git a/scripts/bigquery_datasource.py b/scripts/bigquery_datasource.py
index 859e521..2cddfb9 100644
--- a/scripts/bigquery_datasource.py
+++ b/scripts/bigquery_datasource.py
@@ -11,13 +11,6 @@ Subclasses that use DataSource:
     FAODataSource       → loads to RAW (Bronze) : raw_fao
     WorldBankDataSource → loads to RAW (Bronze) : raw_worldbank
     UNICEFDataSource    → loads to RAW (Bronze) : raw_unicef
-
-Changes from MySQL version:
-    1. Replace SQLAlchemy engine → BigQuery client
-    2. Replace to_sql() → load_table_from_dataframe()
-    3. load_to_database() default layer = 'bronze' (RAW layer)
-    4. log_update() menggunakan label 'RAW' sesuai Kimball terminology
-    5. save_metadata() → save_etl_metadata() ke STAGING layer (Silver)
 """
 
 from abc import ABC, abstractmethod
@@ -27,8 +20,8 @@ from datetime import datetime
 from typing import Dict
 import json
 
-from bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
-from bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
+from scripts.bigquery_config import get_bigquery_client, get_table_id, table_exists, CONFIG
+from scripts.bigquery_helpers import log_update, load_to_bigquery, read_from_bigquery, save_etl_metadata
 from google.cloud import bigquery
 
 
@@ -42,7 +35,7 @@ class DataSource(ABC):
     transform_data()   → Transform to the standard format
     validate_data()    → Check data quality
     load_to_database() → Load to the RAW layer (Bronze)
-    save_metadata()    → Simpan metadata ke STAGING layer (Silver)
+    save_metadata()    → Save metadata to the AUDIT layer
 
     Subclasses must implement:
         fetch_data()
         transform_data()
     """
 
     def __init__(self, client: bigquery.Client = None):
-        """
-        Initialize DataSource dengan BigQuery client.
-
-        Args:
-            client: BigQuery client (jika None, akan dibuat baru)
-        """
         self.client = client if client else get_bigquery_client()
         self.logger = logging.getLogger(self.__class__.__name__)
         self.logger.propagate = False
 
         self.data = None
         self.table_name = None
-        self.target_layer = "bronze"  # RAW layer — default untuk semua data sources
+        self.target_layer = "bronze"
         self.asean_countries = CONFIG['asean_countries']
 
-        # Metadata untuk tracking reproducibility (disimpan ke STAGING/Silver)
        self.metadata = {
             'source_class' : self.__class__.__name__,
             'table_name'   : None,
@@ -84,35 +70,13 @@ class DataSource(ABC):
 
     @abstractmethod
     def fetch_data(self) -> pd.DataFrame:
-        """
-        Extract data mentah dari sumber eksternal.
-        WAJIB diimplementasikan oleh subclass.
-        """
         pass
 
     @abstractmethod
     def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
-        """
-        Transform data ke format standar sebelum load ke RAW layer.
-        WAJIB diimplementasikan oleh subclass.
-        """
         pass
 
     def validate_data(self, df: pd.DataFrame) -> Dict:
-        """
-        Validasi kualitas data hasil transform sebelum load ke RAW layer.
-
-        Metrics yang dihitung:
-            total_rows, total_columns — dimensi data
-            null_count, null_percentage — kelengkapan per kolom
-            duplicate_count — duplikasi data
-            completeness_pct — persentase kelengkapan keseluruhan
-            memory_usage_mb — ukuran data di memori
-            year_range — rentang tahun (jika ada kolom year)
-
-        Returns:
-            Dict: Validation metrics
-        """
         validation = {
             'total_rows'    : int(len(df)),
             'total_columns' : int(len(df.columns)),
@@ -126,7 +90,6 @@ class DataSource(ABC):
             'memory_usage_mb' : float(round(df.memory_usage(deep=True).sum() / 1024**2, 2))
         }
 
-        # Deteksi kolom year untuk year range info
         year_cols = [col for col in df.columns if 'year' in col.lower() or 'tahun' in col.lower()]
         if year_cols:
             year_col = year_cols[0]
@@ -139,35 +102,18 @@ class DataSource(ABC):
         return validation
 
     def load_to_database(self, df: pd.DataFrame, table_name: str):
-        """
-        Load data ke RAW layer (Bronze) dengan full refresh strategy.
-
-        Kimball context:
-            RAW layer adalah landing zone pertama untuk data mentah dari sumber.
-            Menggunakan WRITE_TRUNCATE (full refresh) karena data sumber
-            bisa berubah setiap kali pipeline dijalankan.
-
-        Args:
-            df          : DataFrame hasil transform
-            table_name  : Nama table tujuan di RAW layer (Bronze)
-
-        Audit:
-            Setiap load dicatat ke etl_logs di STAGING layer (Silver)
-        """
         try:
-            # Load ke RAW layer (Bronze) — full refresh
             load_to_bigquery(
                 self.client,
                 df,
                 table_name,
-                layer='bronze',                       # RAW layer
-                write_disposition="WRITE_TRUNCATE"    # Full refresh
+                layer='bronze',
+                write_disposition="WRITE_TRUNCATE"
             )
 
-            # Audit log ke STAGING layer (Silver)
             log_update(
                 self.client,
-                layer='RAW',                          # Label Kimball
+                layer='RAW',
                 table_name=table_name,
                 update_method='full_refresh',
                 rows_affected=len(df)
             )
@@ -186,49 +132,20 @@ class DataSource(ABC):
             raise
 
     def save_metadata(self):
-        """
-        Simpan metadata eksekusi ETL ke STAGING layer (Silver).
-
-        Kimball context:
-            ETL metadata (execution time, row counts, completeness, dll.)
-            disimpan di Staging layer sebagai operational/audit table,
-            bukan bagian dari Star Schema di DW layer.
-
-        Metadata yang disimpan:
-            source_class, table_name, execution_timestamp,
-            duration_seconds, rows_fetched/transformed/loaded,
-            completeness_pct, config_snapshot, validation_metrics
-        """
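For orientation, here is a minimal hypothetical subclass of the ABC above. DemoDataSource, its table name, and its toy frame are illustrative only; the point is that a subclass supplies fetch_data() and transform_data() plus a table_name, and inherits the validate, load, and metadata steps from run():

    import pandas as pd

    from scripts.bigquery_datasource import DataSource

    class DemoDataSource(DataSource):
        """Hypothetical source that lands a tiny frame in the RAW layer."""

        def __init__(self, client=None):
            super().__init__(client)
            self.table_name = "raw_demo"  # destination table in the bronze dataset

        def fetch_data(self) -> pd.DataFrame:
            # A real source would call an external API or read a bulk file here.
            return pd.DataFrame({"country": ["Indonesia"], "year": [2024], "value": [7.5]})

        def transform_data(self, df: pd.DataFrame) -> pd.DataFrame:
            # Standardize types/columns before landing in RAW.
            out = df.copy()
            out["year"] = out["year"].astype(int)
            return out

    # DemoDataSource().run() then executes:
    # fetch → transform → validate → WRITE_TRUNCATE load to raw_demo → etl_metadata.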
         try:
             self.metadata['table_name'] = self.table_name
 
-            # Pastikan validation_metrics dalam format JSON string
             if isinstance(self.metadata.get('validation_metrics'), dict):
                 self.metadata['validation_metrics'] = json.dumps(
                     self.metadata['validation_metrics']
                 )
 
-            # Save ke STAGING layer (Silver) via helper
             save_etl_metadata(self.client, self.metadata)
 
         except Exception as e:
-            # Silent fail — metadata tracking tidak boleh menghentikan proses ETL
-            self.logger.warning(f"Failed to save ETL metadata to STAGING: {str(e)}")
+            self.logger.warning(f"Failed to save ETL metadata to AUDIT: {str(e)}")
 
     def run(self) -> pd.DataFrame:
-        """
-        Jalankan full ETL pipeline: Extract → Transform → Validate → Load → Metadata.
-
-        Kimball ETL steps:
-            1. EXTRACT   — fetch_data()       : Ambil dari sumber eksternal
-            2. TRANSFORM — transform_data()   : Standardize format
-            3. VALIDATE  — validate_data()    : Cek kualitas
-            4. LOAD      — load_to_database() : Load ke RAW layer (Bronze)
-            5. METADATA  — save_metadata()    : Simpan ke STAGING layer (Silver)
-
-        Returns:
-            pd.DataFrame: Data yang sudah di-load ke RAW layer
-        """
         start_time = datetime.now()
         self.metadata['execution_timestamp'] = start_time
 
@@ -254,7 +171,7 @@ class DataSource(ABC):
         self.load_to_database(self.data, self.table_name)
         self.metadata['rows_loaded'] = len(self.data)
 
-        # 5. METADATA → STAGING layer (Silver)
+        # 5. METADATA → AUDIT layer
         end_time = datetime.now()
         self.metadata['duration_seconds'] = (end_time - start_time).total_seconds()
         self.save_metadata()
@@ -267,5 +184,5 @@ class DataSource(ABC):
 
 print("DataSource base class loaded — Kimball DW Architecture")
 print("  Default target layer : RAW (Bronze)")
-print("  Audit logs           : STAGING (Silver) via etl_logs")
-print("  ETL metadata         : STAGING (Silver) via etl_metadata")
\ No newline at end of file
+print("  Audit logs           : AUDIT via etl_logs")
+print("  ETL metadata         : AUDIT via etl_metadata")
\ No newline at end of file
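diff --git a/scripts/bigquery_helpers.py b/scripts/bigquery_helpers.py
index 5ad07b3..b9f2837 100644
--- a/scripts/bigquery_helpers.py
+++ b/scripts/bigquery_helpers.py
@@ -77,7 +77,7 @@ def log_update(client: bigquery.Client, layer: str, table_name: str,
         job.result()
 
     except Exception as e:
-        print(f"  Warning: Failed to write etl_logs [STAGING]: {e}")
+        print(f"  Warning: Failed to write etl_logs [AUDIT]: {e}")
 
 def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
@@ -223,4 +223,4 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
         print(f"etl_metadata — first run | created_at : {created_at}")
     else:
         print(f"etl_metadata — preserved | created_at : {created_at}")
-    print(f"etl_metadata — updated_at : {current_time}")
+    print(f"etl_metadata — updated_at : {current_time}")
\ No newline at end of file

The helper bodies themselves are unchanged here; per the "Changes from MySQL version" note deleted above, load_to_bigquery() is built on load_table_from_dataframe(). The core call reduces to something like the following sketch (the helper name and argument passing are assumed, not taken from this patch):

    from google.cloud import bigquery

    def load_dataframe(client: bigquery.Client, df, table_id: str,
                       write_disposition: str = "WRITE_TRUNCATE"):
        # WRITE_TRUNCATE gives the full-refresh semantics used for the RAW layer:
        # the destination table is replaced on every run.
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
        return job.result()  # block until the load job completes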
diff --git a/scripts/bigquery_raw_layer.py b/scripts/bigquery_raw_layer.py
index 8f7596e..fafa649 100644
--- a/scripts/bigquery_raw_layer.py
+++ b/scripts/bigquery_raw_layer.py
@@ -33,8 +33,8 @@ from pathlib import Path
 from difflib import SequenceMatcher
 import re
 
-from bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
-from bigquery_helpers import (
+from scripts.bigquery_config import get_bigquery_client, CONFIG, EXPORTS_DIR, LOGS_DIR, get_table_id
+from scripts.bigquery_helpers import (
     log_update,
     load_to_bigquery,
     read_from_bigquery,
@@ -42,9 +42,10 @@ from bigquery_helpers import (
     save_etl_metadata,
     get_staging_schema
 )
-from bigquery_datasource import DataSource
+from scripts.bigquery_datasource import DataSource
 from google.cloud import bigquery
 
+
 # INDICATOR MATCHER
 
 class IndicatorMatcher:
@@ -200,7 +201,7 @@ class IndicatorMatcher:
 class FAODataSource(DataSource):
     """
     FAO Food Security Data Source (BigQuery version)
-    FIXED: Menggunakan bulk download karena faostat API butuh autentikasi
+    Uses bulk download because the faostat API requires authentication
     """
 
     def __init__(self, client: bigquery.Client = None):
@@ -447,28 +448,22 @@ class StagingDataIntegration:
     }
 
     def load_raw_data(self) -> Dict[str, pd.DataFrame]:
-        """Load data dari semua tabel RAW layer (Bronze)"""
         raw_data = {}
-
         try:
             raw_data['fao'] = read_from_bigquery(self.client, 'raw_fao', layer='bronze')
         except Exception:
             raw_data['fao'] = pd.DataFrame()
-
         try:
             raw_data['worldbank'] = read_from_bigquery(self.client, 'raw_worldbank', layer='bronze')
         except Exception:
             raw_data['worldbank'] = pd.DataFrame()
-
         try:
             raw_data['unicef'] = read_from_bigquery(self.client, 'raw_unicef', layer='bronze')
         except Exception:
             raw_data['unicef'] = pd.DataFrame()
-
         return raw_data
 
     def clean_value(self, value):
-        """Clean dan convert value ke float"""
         if pd.isna(value):
             return None
         value_str = str(value).strip().replace('<', '').replace('>', '').strip()
@@ -478,18 +473,9 @@ class StagingDataIntegration:
         return None
 
     def process_year_range(self, year_value):
-        """
-        Process year range dan return (year_int, year_range_str)
-        Examples:
-            "2020"      → (2020, "2020")
-            "2020-2021" → (2020, "2020-2021")
-            "2019–2021" → (2020, "2019-2021")
-        """
         if pd.isna(year_value):
             return None, None
-
         year_str = str(year_value).strip().replace('–', '-').replace('—', '-')
-
         if '-' in year_str:
             try:
                 parts = year_str.split('-')
@@ -509,7 +495,6 @@ class StagingDataIntegration:
         return None, year_str
 
     def truncate_string(self, value, max_length: int) -> str:
-        """Truncate string sesuai varchar constraint"""
         if pd.isna(value):
             return ''
         s = str(value).strip()
@@ -519,7 +504,6 @@ class StagingDataIntegration:
                               indicator_orig_col: str, indicator_std_col: str,
                               country_col: str, year_col: str,
                               value_col: str, unit_col: str = None) -> pd.DataFrame:
-        """Standardize dataframe ke schema staging_integrated"""
         if df.empty:
             return pd.DataFrame()
 
@@ -543,10 +527,9 @@ class StagingDataIntegration:
         })
 
     def standardize_schema(self, raw_data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
-        """Standardize schema dari semua sumber data"""
         integrated_data = []
 
-        # FAO — deteksi kolom (nama asli atau sudah di-rename)
+        # FAO
         if not raw_data['fao'].empty:
             df = raw_data['fao'].copy()
             integrated_data.append(self.standardize_dataframe(
@@ -590,11 +573,9 @@ class StagingDataIntegration:
 
         df_integrated = pd.concat(integrated_data, ignore_index=True)
 
-        # Final type conversion
         df_integrated['year'] = pd.to_numeric(df_integrated['year'], errors='coerce')
         df_integrated['value'] = pd.to_numeric(df_integrated['value'], errors='coerce')
 
-        # Enforce varchar constraints
         for col, max_len in [('source', 20), ('country', 100), ('indicator_original', 255),
                              ('indicator_standardized', 255), ('year_range', 20), ('unit', 20)]:
             df_integrated[col] = df_integrated[col].astype(str).apply(
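The docstring removed from process_year_range() above held its only worked examples, so they are restated here. The midpoint rule is inferred from the "2019–2021" → 2020 example (floor division of the two endpoints), and `staging` stands for any constructed StagingDataIntegration instance:

    # "2020"      -> (2020, "2020")        single year passes through
    # "2020-2021" -> (2020, "2020-2021")   (2020 + 2021) // 2 == 2020
    # "2019–2021" -> (2020, "2019-2021")   en dash normalized, midpoint taken
    year_int, year_range = staging.process_year_range("2019–2021")
    assert (year_int, year_range) == (2020, "2019-2021")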
@@ -606,7 +587,6 @@ class StagingDataIntegration:
         ).reset_index(drop=True)
 
     def validate_data(self, df: pd.DataFrame) -> Dict:
-        """Validate data dan return metrics"""
         validation = {
             'total_rows'    : int(len(df)),
             'total_columns' : int(len(df.columns)),
@@ -621,15 +601,12 @@ class StagingDataIntegration:
             'max'         : int(df['year'].max()) if not df['year'].isnull().all() else None,
             'unique_years': int(df['year'].nunique())
         }
-
         if 'source' in df.columns:
             validation['source_breakdown'] = {
                 str(k): int(v) for k, v in df['source'].value_counts().to_dict().items()
             }
-
         if 'indicator_standardized' in df.columns:
             validation['unique_indicators'] = int(df['indicator_standardized'].nunique())
-
         if 'country' in df.columns:
             validation['unique_countries'] = int(df['country'].nunique())
 
@@ -645,21 +622,15 @@ class StagingDataIntegration:
         return validation
 
     def save_to_staging(self, df: pd.DataFrame):
-        """Save data ke staging_integrated table di STAGING layer (Silver)"""
         try:
             schema = get_staging_schema()
-
             load_to_bigquery(
-                self.client,
-                df,
-                self.staging_table,
-                layer='silver',    # → fs_asean_silver
+                self.client, df, self.staging_table,
+                layer='silver',
                 write_disposition="WRITE_TRUNCATE",
                 schema=schema
             )
-
             log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', len(df))
-
         except Exception as e:
             print(f"save_to_staging FAILED: {type(e).__name__}: {e}")
             log_update(self.client, 'STAGING', self.staging_table, 'full_refresh', 0,
@@ -667,7 +638,6 @@ class StagingDataIntegration:
             raise
 
     def run(self) -> pd.DataFrame:
-        """Run staging integration process"""
         self.metadata['start_time'] = datetime.now()
 
         try:
@@ -703,7 +673,6 @@ class StagingDataIntegration:
 
             save_etl_metadata(self.client, self.metadata)
 
-            # Summary
             print(f"  ✓ Staging Integration completed: {len(df_integrated):,} rows")
             print(f"    Duration : {self.metadata['duration_seconds']:.2f}s")
             if 'source_breakdown' in validation:
@@ -725,7 +694,6 @@ class StagingDataIntegration:
             print(f"    - country max length    : {schema_val['country_max_length']}/100")
             print(f"    - year_range max length : {schema_val['year_range_max_length']}/20")
             print(f"    - unit max length       : {schema_val['unit_max_length']}/20")
-            print(f"\n  Metadata → [AUDIT] etl_metadata")
 
             return df_integrated
 
@@ -734,6 +702,7 @@ class StagingDataIntegration:
             self.logger.error(f"Staging integration failed: {str(e)}")
             raise
 
+
 # MAIN EXECUTION
 
 if __name__ == "__main__":
@@ -745,11 +714,9 @@ if __name__ == "__main__":
     logger = setup_logging()
     client = get_bigquery_client()
 
-    # ── FAO ──────────────────────────────────────────────────────────────────
     print("\n[1/4] Loading FAO Food Security Data → RAW (Bronze)...")
     fao_source = FAODataSource(client)
     df_fao = fao_source.run()
-
     print(f"  ✓ raw_fao: {len(df_fao):,} rows")
     print(f"    Indicators : {df_fao['indicator'].nunique()}")
     print(f"    Countries  : {df_fao['country'].nunique()}")
@@ -757,28 +724,23 @@ if __name__ == "__main__":
 
     fao_indicators = df_fao['indicator'].unique()
 
-    # ── World Bank ───────────────────────────────────────────────────────────
     print("\n[2/4] Loading World Bank Data → RAW (Bronze)...")
     wb_source = WorldBankDataSource(client, list(fao_indicators))
     df_wb = wb_source.run()
-
     print(f"  ✓ raw_worldbank: {len(df_wb):,} rows")
     print(f"    Matched indicators : {df_wb['indicator_fao'].nunique()}")
     print(f"    Countries          : {df_wb['country'].nunique()}")
     if len(df_wb) > 0:
         print(f"    Year range : {df_wb['year'].min()}–{df_wb['year'].max()}")
 
-    # ── UNICEF ───────────────────────────────────────────────────────────────
     print("\n[3/4] Loading UNICEF Data → RAW (Bronze)...")
     unicef_source = UNICEFDataSource(client, list(fao_indicators))
    df_unicef = unicef_source.run()
-
     print(f"  ✓ raw_unicef: {len(df_unicef):,} rows")
     if len(df_unicef) > 0:
         print(f"    Matched indicators : {df_unicef['indicator_fao'].nunique()}")
         print(f"    Countries          : {df_unicef['country'].nunique()}")
 
-    # ── Staging Integration ──────────────────────────────────────────────────
     print("\n[4/4] Staging Integration → STAGING (Silver)...")
     staging = StagingDataIntegration(client)
     df_staging = staging.run()
@@ -789,7 +751,8 @@ if __name__ == "__main__":
     print(f"STAGING (Silver) : staging_integrated")
     print(f"AUDIT            : etl_logs, etl_metadata")
     print("=" * 60)
-    
+
+
 # AIRFLOW TASK FUNCTIONS
 
 def run_verify_connection():
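Since dags/trial.py is deleted and bigquery_raw_layer.py now ends with Airflow task functions (run_verify_connection() is the first; the rest are cut off here), a replacement DAG presumably wires those callables into PythonOperator tasks, mirroring the deleted trial.py. A minimal sketch under that assumption; any task beyond run_verify_connection is hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    from scripts.bigquery_raw_layer import run_verify_connection

    with DAG(
        dag_id="etl_fao_bigquery",
        start_date=datetime(2026, 3, 3),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        # One PythonOperator per task function, as in the deleted trial.py.
        verify_connection = PythonOperator(
            task_id="verify_bigquery_connection",
            python_callable=run_verify_connection,
        )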