"""
BIGQUERY HELPER FUNCTIONS
Kimball Data Warehouse Architecture

Thin wrappers around the google-cloud-bigquery client used by the ETL
pipeline: logging setup, the audit-layer ``etl_logs`` / ``etl_metadata``
tables, DataFrame load/read, and table truncate/drop helpers.
"""
import hashlib
import json
import logging
import os
from datetime import datetime
from typing import Optional

import pandas as pd
import pytz
from google.cloud import bigquery

from scripts.bigquery_config import (
    get_bigquery_client,
    get_table_id,
    table_exists,
    CONFIG,
)

# LOGGING SETUP
logger = logging.getLogger(__name__)

# Timezone whose wall-clock time is written to etl_logs.timestamp (DATETIME).
_LOG_TIMEZONE = pytz.timezone('Asia/Jakarta')


def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
    """Configure root logging to write to *log_file* and to the console.

    The parent directory of *log_file* is created when missing, because
    ``logging.FileHandler`` raises ``FileNotFoundError`` otherwise.
    ``logging.basicConfig`` does nothing if the root logger already has
    handlers, so only the first call in a process takes effect.

    Returns:
        The logger for this module.
    """
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler(),
        ],
    )
    return logging.getLogger(__name__)


def ensure_etl_logs_table(client: bigquery.Client) -> None:
    """Create the audit-layer ``etl_logs`` table if it does not exist yet."""
    if table_exists(client, 'etl_logs', layer='audit'):
        return
    table_id = get_table_id('etl_logs', layer='audit')
    schema = [
        bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("timestamp", "DATETIME", mode="REQUIRED"),
        bigquery.SchemaField("layer", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("table_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("update_method", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("rows_affected", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("error_message", "STRING", mode="NULLABLE"),
    ]
    # exists_ok: another run may create the table between the check above
    # and this call; that must not raise a Conflict.
    client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
    print(" [AUDIT] Created table: etl_logs")


def log_update(client: bigquery.Client, layer: str, table_name: str,
               update_method: str, rows_affected: Optional[int],
               status: str = 'success', error_msg: Optional[str] = None) -> None:
    """Append one row describing a table update to the audit ``etl_logs`` table.

    This is best-effort: any failure (including creating the table) is
    printed as a warning and swallowed so auditing never breaks the pipeline.

    Args:
        client: BigQuery client.
        layer: Layer of the updated table; stored upper-cased.
        table_name: Name of the updated table.
        update_method: Free-text description of how the table was updated.
        rows_affected: Row count to record (nullable column).
        status: Outcome label, ``'success'`` by default.
        error_msg: Optional error text for failed updates.
    """
    try:
        ensure_etl_logs_table(client)

        # One clock reading so the id and the stored timestamp agree.
        now = datetime.now(_LOG_TIMEZONE)
        # Include layer and table so concurrent logs for different tables
        # in the same microsecond still get distinct ids.
        record_id = hashlib.md5(
            f"{layer}_{table_name}_{now.isoformat()}".encode()
        ).hexdigest()

        log_data = pd.DataFrame([{
            'id'           : record_id,
            # DATETIME column: keep the Jakarta wall-clock time, drop tzinfo.
            'timestamp'    : now.replace(tzinfo=None),
            'layer'        : layer.upper(),
            'table_name'   : table_name,
            'update_method': update_method,
            'rows_affected': rows_affected,
            'status'       : status,
            'error_message': error_msg,
        }])

        table_id = get_table_id('etl_logs', layer='audit')
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
        client.load_table_from_dataframe(log_data, table_id, job_config=job_config).result()
    except Exception as e:
        print(f" Warning: Failed to write etl_logs [AUDIT]: {e}")


def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame, table_name: str,
                     layer: str = "bronze", write_disposition: str = "WRITE_TRUNCATE",
                     schema: Optional[list] = None) -> int:
    """Load *df* into ``table_name`` in *layer* and wait for the job to finish.

    Args:
        client: BigQuery client.
        df: Data to load.
        table_name: Destination table name (resolved via ``get_table_id``).
        layer: Destination layer, ``"bronze"`` by default.
        write_disposition: BigQuery write disposition, ``"WRITE_TRUNCATE"``
            by default.
        schema: Explicit list of ``SchemaField``; when omitted BigQuery
            autodetects the schema.

    Returns:
        ``num_rows`` of the destination table after the load. With
        ``WRITE_APPEND`` this is the table's total, not only the rows in *df*.
    """
    table_id = get_table_id(table_name, layer)
    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
        autodetect=schema is None,
        schema=schema,
    )
    client.load_table_from_dataframe(df, table_id, job_config=job_config).result()

    table = client.get_table(table_id)
    print(f"  ✓ Loaded {table.num_rows:,} rows → [{layer.upper()}] {table_name}")
    return table.num_rows


def read_from_bigquery(client: bigquery.Client, table_name: Optional[str] = None,
                       layer: str = "bronze", query: Optional[str] = None) -> pd.DataFrame:
    """Return query results as a DataFrame.

    *query* takes precedence; otherwise every row of ``table_name`` in
    *layer* is selected.

    Raises:
        ValueError: If neither *table_name* nor *query* is given.
    """
    if query:
        return client.query(query).result().to_dataframe(create_bqstorage_client=False)
    if table_name:
        table_id = get_table_id(table_name, layer)
        return client.query(f"SELECT * FROM `{table_id}`").result().to_dataframe(create_bqstorage_client=False)
    raise ValueError("Either table_name or query must be provided")


def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze") -> None:
    """Delete every row of ``table_name`` in *layer*, keeping the table itself."""
    table_id = get_table_id(table_name, layer)
    client.query(f"DELETE FROM `{table_id}` WHERE TRUE").result()
    print(f"  Truncated [{layer.upper()}] table: {table_name}")


def drop_table(client: bigquery.Client, table_name: str, layer: str = "bronze") -> None:
    """Delete ``table_name`` in *layer*; a missing table is not an error."""
    table_id = get_table_id(table_name, layer)
    client.delete_table(table_id, not_found_ok=True)
    print(f"  Dropped [{layer.upper()}] table: {table_name}")


def get_staging_schema() -> list:
    """Return the BigQuery schema (list of ``SchemaField``) for staging tables."""
    return [
        bigquery.SchemaField("source", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_original", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("indicator_standardized", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("country", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("year", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("year_range", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("value", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("unit", "STRING", mode="NULLABLE"),
    ]


def get_etl_metadata_schema() -> list:
    """Return the BigQuery schema (list of ``SchemaField``) for audit ``etl_metadata``."""
    return [
        bigquery.SchemaField("id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("source_class", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("table_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("execution_timestamp", "DATETIME", mode="REQUIRED"),
        bigquery.SchemaField("duration_seconds", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("rows_fetched", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("rows_transformed", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("rows_loaded", "INTEGER", mode="NULLABLE"),
        bigquery.SchemaField("completeness_pct", "FLOAT", mode="NULLABLE"),
        bigquery.SchemaField("config_snapshot", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("validation_metrics", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
        bigquery.SchemaField("updated_at", "TIMESTAMP", mode="REQUIRED"),
    ]


def _metadata_number(metadata: dict, key: str, cast):
    """Return ``cast(metadata[key])``, treating a missing or None value as 0."""
    value = metadata.get(key)
    return cast(0 if value is None else value)


def _metadata_json_text(metadata: dict, key: str) -> Optional[str]:
    """Return ``metadata[key]`` in a form a STRING column accepts.

    Missing key -> ``'{}'``; ``None`` and strings pass through unchanged;
    any other value (e.g. a dict) is JSON-encoded. ``default=str`` is used
    deliberately so values JSON cannot encode (datetimes, paths) are
    stringified rather than failing the audit write.
    """
    if key not in metadata:
        return '{}'
    value = metadata[key]
    if value is None or isinstance(value, str):
        return value
    return json.dumps(value, default=str)


def _fetch_first_created_at(client: bigquery.Client, table_id: str, table_name: str):
    """Return the earliest ``created_at`` stored for *table_name*, or None.

    None is returned both when no row exists yet and when the lookup query
    fails (logged as a warning), so the caller treats either as a first run.
    """
    check_query = f"""
        SELECT MIN(created_at) AS first_created_at
        FROM `{table_id}`
        WHERE table_name = @table_name
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name)
        ]
    )
    try:
        rows = list(client.query(check_query, job_config=job_config).result())
    except Exception as exc:
        logger.warning("etl_metadata created_at lookup failed for %s: %s", table_name, exc)
        return None
    if rows and rows[0]['first_created_at'] is not None:
        return rows[0]['first_created_at']
    return None


def save_etl_metadata(client: bigquery.Client, metadata: dict) -> None:
    """Append one run record for ``metadata['table_name']`` to audit ``etl_metadata``.

    ``created_at`` is carried over from the earliest existing record for the
    same table name (so it marks the first run); ``updated_at`` is the time
    of this call. The table is created on first use.

    Args:
        client: BigQuery client.
        metadata: Run details. Keys read: ``source_class``, ``table_name``,
            ``execution_timestamp``, ``duration_seconds``, ``rows_fetched``,
            ``rows_transformed``, ``rows_loaded``, ``completeness_pct``,
            ``config_snapshot``, ``validation_metrics``. Missing or None
            numeric values are recorded as 0; a missing or None
            ``execution_timestamp`` becomes the current time; non-string
            ``config_snapshot`` / ``validation_metrics`` are JSON-encoded.
    """
    table_name = metadata.get('table_name', 'unknown')
    source_class = metadata.get('source_class', 'unknown')
    table_id = get_table_id('etl_metadata', layer='audit')

    if not table_exists(client, 'etl_metadata', layer='audit'):
        table = bigquery.Table(table_id, schema=get_etl_metadata_schema())
        # exists_ok: tolerate a concurrent run creating it first.
        client.create_table(table, exists_ok=True)
        print(" [AUDIT] Created table: etl_metadata")

    # NOTE(review): naive local time is written to TIMESTAMP columns, which
    # BigQuery presumably reads as UTC. Kept as before so new rows stay
    # comparable with the created_at values already stored — confirm intent.
    current_time = datetime.now()
    first_created_at = _fetch_first_created_at(client, table_id, table_name)
    is_first_run = first_created_at is None
    created_at = current_time if is_first_run else first_created_at

    record_id = hashlib.md5(
        f"{source_class}_{table_name}_{current_time.isoformat()}".encode()
    ).hexdigest()

    meta_df = pd.DataFrame([{
        'id'                 : record_id,
        'source_class'       : source_class,
        'table_name'         : table_name,
        'execution_timestamp': metadata.get('execution_timestamp') or current_time,
        'duration_seconds'   : _metadata_number(metadata, 'duration_seconds', float),
        'rows_fetched'       : _metadata_number(metadata, 'rows_fetched', int),
        'rows_transformed'   : _metadata_number(metadata, 'rows_transformed', int),
        'rows_loaded'        : _metadata_number(metadata, 'rows_loaded', int),
        'completeness_pct'   : _metadata_number(metadata, 'completeness_pct', float),
        'config_snapshot'    : _metadata_json_text(metadata, 'config_snapshot'),
        'validation_metrics' : _metadata_json_text(metadata, 'validation_metrics'),
        'created_at'         : created_at,
        'updated_at'         : current_time,
    }])

    # Strip any tz info (e.g. the tz-aware created_at read back from
    # BigQuery), keeping the wall-clock value.
    for col in ['execution_timestamp', 'created_at', 'updated_at']:
        meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)

    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    client.load_table_from_dataframe(meta_df, table_id, job_config=job_config).result()

    if is_first_run:
        print(f"etl_metadata — first run  | created_at : {created_at}")
    else:
        print(f"etl_metadata — preserved  | created_at : {created_at}")
    print(f"etl_metadata — updated_at : {current_time}")