From c68532c01349903fc31ce68f434cef777eda4030 Mon Sep 17 00:00:00 2001
From: Debby
Date: Sun, 8 Mar 2026 10:47:56 +0700
Subject: [PATCH] remove module-level BigQuery initialization and trim docstrings

---
 scripts/bigquery_helpers.py | 206 +-----------------------------------
 1 file changed, 5 insertions(+), 201 deletions(-)

diff --git a/scripts/bigquery_helpers.py b/scripts/bigquery_helpers.py
index b883b8b..5ad07b3 100644
--- a/scripts/bigquery_helpers.py
+++ b/scripts/bigquery_helpers.py
@@ -1,23 +1,6 @@
 """
 BIGQUERY HELPER FUNCTIONS
 Kimball Data Warehouse Architecture
-
-Layer Assignment (Kimball terminology):
-  RAW (Bronze)     → raw_fao, raw_worldbank, raw_unicef
-  STAGING (Silver) → staging_integrated, cleaned_integrated
-  AUDIT            → etl_logs, etl_metadata
-  DW (Gold)        → dim_*, fact_food_security, fact_food_security_eligible
-
-Functions:
-  setup_logging()           — Set up file & console logging
-  log_update()              — Write an ETL audit log entry to staging (Silver)
-  save_etl_metadata()       — Save ETL metadata to staging (Silver), preserving created_at
-  load_to_bigquery()        — Load a DataFrame into a given layer
-  read_from_bigquery()      — Read from a given layer
-  truncate_table()          — Delete all rows from a table
-  drop_table()              — Drop a table from a given layer
-  get_staging_schema()      — Schema for staging_integrated
-  get_etl_metadata_schema() — Schema for etl_metadata
 """
 
 import pandas as pd
@@ -25,7 +8,7 @@ import logging
 from datetime import datetime
 import pytz
 from google.cloud import bigquery
-from bigquery_config import (
+from scripts.bigquery_config import (
     get_bigquery_client,
     get_table_id,
     table_exists,
@@ -35,16 +18,9 @@
 import json
 
 # LOGGING SETUP
+logger = logging.getLogger(__name__)
+
 def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
-    """
-    Set up the logging system for tracking ETL runs.
-
-    Args:
-        log_file: Path to log file
-
-    Returns:
-        logging.Logger: Configured logger
-    """
     logging.basicConfig(
         level=logging.INFO,
         format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
@@ -55,27 +31,8 @@ def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
     )
     return logging.getLogger(__name__)
 
-# ETL AUDIT LOG — STAGING LAYER (Silver)
 
 def ensure_etl_logs_table(client: bigquery.Client):
-    """
-    Create the etl_logs table in the STAGING layer (Silver) if it does not exist yet.
-
-    Kimball context:
-      etl_logs is an operational/audit table, not part of the Star Schema.
-      It is kept in the Staging layer because it is an output of the ETL
-      process, not part of the final data warehouse.
-
-    Schema:
-      id             STRING   — unique log ID
-      timestamp      DATETIME — when the log entry was created
-      layer          STRING   — layer being processed (RAW/STAGING/DW)
-      table_name     STRING   — name of the processed table
-      update_method  STRING   — full_refresh / incremental
-      rows_affected  INTEGER  — number of rows
-      status         STRING   — success / failed
-      error_message  STRING   — error message on failure
-    """
     if not table_exists(client, 'etl_logs', layer='audit'):
         table_id = get_table_id('etl_logs', layer='audit')
         schema = [
@@ -96,28 +53,6 @@ def log_update(client: bigquery.Client, layer: str, table_name: str,
                update_method: str, rows_affected: int,
                status: str = 'success', error_msg: str = None):
-    """
-    Record ETL activity in etl_logs (STAGING/Silver) for the audit trail.
-
-    Args:
-        client        : BigQuery client
-        layer         : layer being processed — 'RAW', 'STAGING', or 'DW'
-        table_name    : name of the processed table
-        update_method : 'full_refresh' or 'incremental'
-        rows_affected : number of rows processed
-        status        : 'success' or 'failed'
-        error_msg     : error message when status='failed'
-
-    Examples:
-        # log when loading raw data
-        log_update(client, 'RAW', 'raw_fao', 'full_refresh', 5000)
-
-        # log when building staging
-        log_update(client, 'STAGING', 'staging_integrated', 'full_refresh', 12000)
-
-        # log when loading the DW
-        log_update(client, 'DW', 'fact_food_security', 'full_refresh', 8000)
-    """
     try:
         ensure_etl_logs_table(client)
 
@@ -133,7 +68,6 @@
             'error_message': error_msg
         }])
 
-        # drop the timezone for BigQuery DATETIME
         log_data['timestamp'] = pd.to_datetime(log_data['timestamp']).dt.tz_localize(None)
         log_data['id'] = log_data['id'].astype(str)
 
@@ -145,40 +79,11 @@
     except Exception as e:
         print(f" Warning: Failed to write etl_logs [STAGING]: {e}")
 
-# DATA LOADING TO BIGQUERY
 
 def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
                      table_name: str, layer: str = "bronze",
                      write_disposition: str = "WRITE_TRUNCATE",
                      schema: list = None) -> int:
-    """
-    Load a DataFrame into a BigQuery table in the given layer.
-
-    Args:
-        client            : BigQuery client
-        df                : DataFrame to load
-        table_name        : destination table name
-        layer             : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-        write_disposition : WRITE_TRUNCATE (replace) or WRITE_APPEND (append)
-        schema            : optional schema (list of SchemaField)
-
-    Returns:
-        int: number of rows successfully loaded
-
-    Examples (Kimball flow):
-        # RAW layer — raw data from the sources
-        load_to_bigquery(client, df_fao, 'raw_fao', layer='bronze')
-        load_to_bigquery(client, df_wb, 'raw_worldbank', layer='bronze')
-        load_to_bigquery(client, df_unicef, 'raw_unicef', layer='bronze')
-
-        # STAGING layer — cleaned & integrated
-        load_to_bigquery(client, df_staging, 'staging_integrated', layer='silver')
-
-        # DW layer — Kimball Star Schema
-        load_to_bigquery(client, df_dim, 'dim_country', layer='gold')
-        load_to_bigquery(client, df_fact, 'fact_food_security', layer='gold')
-        load_to_bigquery(client, df_elig, 'fact_food_security_eligible', layer='gold')
-    """
     table_id = get_table_id(table_name, layer)
     job_config = bigquery.LoadJobConfig(
         write_disposition=write_disposition,
@@ -193,36 +98,11 @@
     print(f" ✓ Loaded {table.num_rows:,} rows → [{layer.upper()}] {table_name}")
     return table.num_rows
 
-# DATA READING FROM BIGQUERY
 
 def read_from_bigquery(client: bigquery.Client,
                        table_name: str = None,
                        layer: str = "bronze",
                        query: str = None) -> pd.DataFrame:
-    """
-    Read data from a BigQuery table, or run a custom query.
-
-    Args:
-        client     : BigQuery client
-        table_name : name of the table to read
-        layer      : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-        query      : custom SQL query (if given, table_name is ignored)
-
-    Returns:
-        pd.DataFrame: query result
-
-    Examples (Kimball flow):
-        # read from the RAW layer
-        df = read_from_bigquery(client, 'raw_fao', layer='bronze')
-
-        # read from the STAGING layer
-        df = read_from_bigquery(client, 'staging_integrated', layer='silver')
-
-        # read from the DW layer
-        df = read_from_bigquery(client, 'fact_food_security', layer='gold')
-        df = read_from_bigquery(client, 'fact_food_security_eligible', layer='gold')
-        df = read_from_bigquery(client, 'dim_country', layer='gold')
-    """
     if query:
         return client.query(query).result().to_dataframe(create_bqstorage_client=False)
     elif table_name:
         table_id = get_table_id(table_name, layer)
         return client.query(f"SELECT * FROM `{table_id}`").result().to_dataframe(create_bqstorage_client=False)
@@ -231,17 +111,8 @@
     else:
         raise ValueError("Either table_name or query must be provided")
 
-# TABLE MANAGEMENT
 
 def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
-    """
-    Delete all rows from a table (empty the table; the structure remains).
-
-    Args:
-        client     : BigQuery client
-        table_name : table name
-        layer      : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-    """
     table_id = get_table_id(table_name, layer)
     job = client.query(f"DELETE FROM `{table_id}` WHERE TRUE")
     job.result()
@@ -249,30 +120,12 @@ def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronz
 
 def drop_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
-    """
-    Drop a table from BigQuery if it exists.
-
-    Args:
-        client     : BigQuery client
-        table_name : table name
-        layer      : 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
-    """
     table_id = get_table_id(table_name, layer)
     client.delete_table(table_id, not_found_ok=True)
     print(f" Dropped [{layer.upper()}] table: {table_name}")
 
-# SCHEMA DEFINITIONS — STAGING LAYER (Silver)
 
 def get_staging_schema() -> list:
-    """
-    Schema for the staging_integrated table (STAGING/Silver layer).
-
-    The staging table is the integration area for data from all sources
-    (FAO, WB, UNICEF) before it is loaded into the DW layer as Dim & Fact
-    tables.
-
-    Returns:
-        list: List of SchemaField objects
-    """
     return [
         bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
         bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
@@ -286,15 +139,6 @@
 
 def get_etl_metadata_schema() -> list:
-    """
-    Schema for the etl_metadata table (STAGING/Silver layer).
-
-    ETL metadata is kept in the Staging layer because it is an operational
-    table for reproducibility & tracking, not part of the DW Star Schema.
-
-    Returns:
-        list: List of SchemaField objects
-    """
     return [
         bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
         bigquery.SchemaField("source_class", "STRING", mode="REQUIRED"),
@@ -311,43 +155,17 @@ def get_etl_metadata_schema() -> list:
         bigquery.SchemaField("updated_at", "TIMESTAMP", mode="REQUIRED"),
     ]
 
-# ETL METADATA — STAGING LAYER (Silver)
-# FIXED: preserve created_at from the first run
 
 def save_etl_metadata(client: bigquery.Client, metadata: dict):
-    """
-    Save ETL metadata to the etl_metadata table (STAGING/Silver layer).
-
-    Logic for created_at vs updated_at:
-      created_at : taken from the FIRST record for the same table_name
-                   (preserved across runs — for reproducibility)
-      updated_at : always refreshed to the current execution time
-
-    Args:
-        client   : BigQuery client
-        metadata : dict describing the ETL run:
-                     table_name (required)
-                     source_class (required)
-                     execution_timestamp
-                     duration_seconds
-                     rows_fetched
-                     rows_transformed
-                     rows_loaded
-                     completeness_pct
-                     config_snapshot (JSON string)
-                     validation_metrics (JSON string)
-    """
     table_name = metadata.get('table_name', 'unknown')
     table_id = get_table_id('etl_metadata', layer='audit')
 
-    # create the table if it does not exist yet
     if not table_exists(client, 'etl_metadata', layer='audit'):
        schema = get_etl_metadata_schema()
        table = bigquery.Table(table_id, schema=schema)
        client.create_table(table)
        print(f" [AUDIT] Created table: etl_metadata")
 
-    # fetch the first created_at for this table (preserved across runs)
     check_query = f"""
         SELECT MIN(created_at) AS first_created_at
         FROM `{table_id}`
@@ -373,7 +191,6 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
 
     current_time = datetime.now()
 
-    # generate a unique ID
     import hashlib
     record_id = hashlib.md5(
         f"{metadata.get('source_class')}_{table_name}_{current_time.isoformat()}".encode()
@@ -391,15 +208,13 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
         'completeness_pct'    : float(metadata.get('completeness_pct', 0)),
         'config_snapshot'     : metadata.get('config_snapshot', '{}'),
         'validation_metrics'  : metadata.get('validation_metrics', '{}'),
-        'created_at'          : created_at,    # PRESERVED from the first run
-        'updated_at'          : current_time   # ALWAYS the current time
+        'created_at'          : created_at,
+        'updated_at'          : current_time
     }])
 
-    # drop timezones for BigQuery
     for col in ['execution_timestamp', 'created_at', 'updated_at']:
         meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)
 
-    # APPEND to the STAGING layer (Silver)
     job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
     job = client.load_table_from_dataframe(meta_df, table_id, job_config=job_config)
     job.result()
@@ -409,14 +224,3 @@ def save_etl_metadata(client: bigquery.Client, metadata: dict):
     else:
         print(f"etl_metadata — preserved | created_at : {created_at}")
     print(f"etl_metadata — updated_at : {current_time}")
-
-# INITIALIZE
-
-logger = setup_logging()
-client = get_bigquery_client()
-
-print("BigQuery Helpers Loaded — Kimball DW Architecture")
-print(f"Project          : {CONFIG['bigquery']['project_id']}")
-print(f"Raw (Bronze)     : {CONFIG['bigquery']['dataset_bronze']}")
-print(f"Staging (Silver) : {CONFIG['bigquery']['dataset_silver']}")
-print(f"DW (Gold)        : {CONFIG['bigquery']['dataset_gold']}")
\ No newline at end of file
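
Note for reviewers: after this patch, importing scripts.bigquery_helpers no
longer configures logging, creates a BigQuery client, or prints the dataset
banner as an import side effect; the module only defines a logger via
logging.getLogger(__name__). Call sites are now expected to initialize both
explicitly, and the scripts.bigquery_config import path assumes the package
is imported from the repository root. A minimal sketch of the assumed new
entry point (run_pipeline.py and df_fao are illustrative, not part of this
patch):

    # run_pipeline.py: hypothetical entry point, for illustration only
    from scripts.bigquery_helpers import setup_logging, load_to_bigquery
    from scripts.bigquery_config import get_bigquery_client

    logger = setup_logging('logs/etl_pipeline.log')  # explicit, no import side effect
    client = get_bigquery_client()                   # each entry point owns its client

    # call shapes are unchanged, e.g. loading raw FAO data into Bronze:
    # load_to_bigquery(client, df_fao, 'raw_fao', layer='bronze')  # df_fao: placeholder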
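
The usage examples deleted along with the docstrings were the clearest
statement of the intended Kimball flow, so they may be worth moving into
separate docs rather than dropping. A condensed version for reference (the
call shapes are taken verbatim from the removed docstrings; df_fao is a
placeholder DataFrame):

    # Bronze -> Silver round trip plus an audit entry
    rows = load_to_bigquery(client, df_fao, 'raw_fao', layer='bronze')
    df = read_from_bigquery(client, 'staging_integrated', layer='silver')
    log_update(client, 'STAGING', 'staging_integrated', 'full_refresh', len(df))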
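
Behavior of save_etl_metadata is unchanged by this patch: created_at is
re-read as MIN(created_at) for the same table_name, so repeated runs keep the
first run's timestamp while updated_at always advances, and each run appends
a new audit row. A hedged usage sketch (the key set comes from the removed
docstring; the values and the source_class name are made up):

    from datetime import datetime

    metadata = {
        'table_name': 'raw_fao',                # required
        'source_class': 'FAOFetcher',           # required; class name is illustrative
        'execution_timestamp': datetime.now(),  # defensive; default handling is elided above
        'rows_fetched': 5000,
        'rows_transformed': 5000,
        'rows_loaded': 5000,
        'completeness_pct': 98.7,
        'config_snapshot': '{}',                # JSON string, per the schema
        'validation_metrics': '{}',             # JSON string, per the schema
    }
    save_etl_metadata(client, metadata)         # appends one row; created_at preserved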