# Source: airflow-coolify/scripts/bigquery_helpers.py
# Snapshot: 2026-03-08 10:47:56 +07:00 — 227 lines, 9.2 KiB, Python
"""
BIGQUERY HELPER FUNCTIONS
Kimball Data Warehouse Architecture
"""
import hashlib
import json
import logging
import os
import uuid
from datetime import datetime

import pandas as pd
import pytz
from google.cloud import bigquery

from scripts.bigquery_config import (
    get_bigquery_client,
    get_table_id,
    table_exists,
    CONFIG
)
# LOGGING SETUP
logger = logging.getLogger(__name__)
def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
    """Configure root logging to both a file and the console.

    Creates the log file's parent directory if it does not already exist,
    so ``logging.FileHandler`` cannot fail with ``FileNotFoundError`` on a
    fresh checkout (the default path lives under a ``logs/`` directory).

    Args:
        log_file: Path of the log file to write to.

    Returns:
        The module-level logger (named after ``__name__``).
    """
    # FileHandler opens the file eagerly; make sure its directory exists first.
    log_dir = os.path.dirname(log_file)
    if log_dir:
        os.makedirs(log_dir, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)
def ensure_etl_logs_table(client: bigquery.Client):
    """Create the audit-layer ``etl_logs`` table when it is missing.

    No-op if the table already exists.
    """
    if table_exists(client, 'etl_logs', layer='audit'):
        return
    # (name, type, mode) spec for the audit log schema.
    spec = [
        ("id", "STRING", "REQUIRED"),
        ("timestamp", "DATETIME", "REQUIRED"),
        ("layer", "STRING", "REQUIRED"),
        ("table_name", "STRING", "REQUIRED"),
        ("update_method", "STRING", "REQUIRED"),
        ("rows_affected", "INTEGER", "NULLABLE"),
        ("status", "STRING", "NULLABLE"),
        ("error_message", "STRING", "NULLABLE"),
    ]
    schema = [bigquery.SchemaField(name, typ, mode=mode) for name, typ, mode in spec]
    table_id = get_table_id('etl_logs', layer='audit')
    client.create_table(bigquery.Table(table_id, schema=schema))
    print(f" [AUDIT] Created table: etl_logs")
def log_update(client: bigquery.Client, layer: str, table_name: str,
               update_method: str, rows_affected: int,
               status: str = 'success', error_msg: str = None):
    """Append one row to the audit-layer ``etl_logs`` table.

    Best-effort: any failure is reported as a warning and swallowed so that
    audit logging never breaks the calling pipeline.

    Args:
        client: BigQuery client used for the load job.
        layer: Warehouse layer the update happened in (stored upper-cased).
        table_name: Name of the table that was updated.
        update_method: How the table was written (e.g. WRITE_APPEND).
        rows_affected: Row count affected by the update, or None.
        status: Outcome status, defaults to 'success'.
        error_msg: Optional error detail to persist.
    """
    try:
        ensure_etl_logs_table(client)
        log_data = pd.DataFrame([{
            # uuid4 is collision-safe; the previous id (a hash of the wall-clock
            # timestamp) repeated for two calls within the same instant.
            'id': str(uuid.uuid4()),
            'timestamp': datetime.now(pytz.timezone('Asia/Jakarta')),
            'layer': layer.upper(),
            'table_name': table_name,
            'update_method': update_method,
            'rows_affected': rows_affected,
            'status': status,
            'error_message': error_msg
        }])
        # BigQuery DATETIME columns are timezone-naive; drop the tz info.
        log_data['timestamp'] = pd.to_datetime(log_data['timestamp']).dt.tz_localize(None)
        log_data['id'] = log_data['id'].astype(str)
        table_id = get_table_id('etl_logs', layer='audit')
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
        client.load_table_from_dataframe(log_data, table_id, job_config=job_config).result()
    except Exception as e:
        # Report the actual layer (the message previously hard-coded "[STAGING]").
        print(f" Warning: Failed to write etl_logs [{layer.upper()}]: {e}")
def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
                     table_name: str, layer: str = "bronze",
                     write_disposition: str = "WRITE_TRUNCATE",
                     schema: list = None) -> int:
    """Load a DataFrame into ``<layer>.<table_name>`` and return its row count.

    Args:
        client: BigQuery client.
        df: Data to load.
        table_name: Destination table name.
        layer: Warehouse layer (dataset) to load into.
        write_disposition: BigQuery write disposition (truncates by default).
        schema: Explicit schema; when None, BigQuery autodetects one.

    Returns:
        Total number of rows in the destination table after the load.
    """
    table_id = get_table_id(table_name, layer)
    job_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
        # Autodetect only when no explicit schema was supplied
        # (idiomatic boolean instead of `True if ... else False`).
        autodetect=schema is None,
        schema=schema
    )
    client.load_table_from_dataframe(df, table_id, job_config=job_config).result()
    table = client.get_table(table_id)
    print(f" ✓ Loaded {table.num_rows:,} rows → [{layer.upper()}] {table_name}")
    return table.num_rows
def read_from_bigquery(client: bigquery.Client,
                       table_name: str = None,
                       layer: str = "bronze",
                       query: str = None) -> pd.DataFrame:
    """Run ``query``, or ``SELECT *`` from ``<layer>.<table_name>``, as a DataFrame.

    Raises:
        ValueError: If neither ``table_name`` nor ``query`` is provided.
    """
    # Guard clause first, then collapse both read paths into one query string.
    if not query and not table_name:
        raise ValueError("Either table_name or query must be provided")
    if not query:
        table_id = get_table_id(table_name, layer)
        query = f"SELECT * FROM `{table_id}`"
    rows = client.query(query).result()
    return rows.to_dataframe(create_bqstorage_client=False)
def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
    """Delete every row from ``<layer>.<table_name>``; the table itself is kept."""
    target = get_table_id(table_name, layer)
    # DELETE ... WHERE TRUE empties the table while preserving its schema.
    client.query(f"DELETE FROM `{target}` WHERE TRUE").result()
    print(f" Truncated [{layer.upper()}] table: {table_name}")
def drop_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
    """Drop ``<layer>.<table_name>``; silently succeeds if it does not exist."""
    client.delete_table(get_table_id(table_name, layer), not_found_ok=True)
    print(f" Dropped [{layer.upper()}] table: {table_name}")
def get_staging_schema() -> list:
    """Return the explicit BigQuery schema for the staging table."""
    # Key/identity columns are REQUIRED; measure/detail columns are NULLABLE.
    required = [
        ("source", "STRING"),
        ("indicator_original", "STRING"),
        ("indicator_standardized", "STRING"),
        ("country", "STRING"),
    ]
    nullable = [
        ("year", "INTEGER"),
        ("year_range", "STRING"),
        ("value", "FLOAT"),
        ("unit", "STRING"),
    ]
    return (
        [bigquery.SchemaField(name, typ, mode="REQUIRED") for name, typ in required]
        + [bigquery.SchemaField(name, typ, mode="NULLABLE") for name, typ in nullable]
    )
def get_etl_metadata_schema() -> list:
    """Return the explicit BigQuery schema for the audit ``etl_metadata`` table."""
    # (name, type, mode) spec, expanded into SchemaField objects below.
    spec = [
        ("id", "STRING", "REQUIRED"),
        ("source_class", "STRING", "REQUIRED"),
        ("table_name", "STRING", "REQUIRED"),
        ("execution_timestamp", "DATETIME", "REQUIRED"),
        ("duration_seconds", "FLOAT", "NULLABLE"),
        ("rows_fetched", "INTEGER", "NULLABLE"),
        ("rows_transformed", "INTEGER", "NULLABLE"),
        ("rows_loaded", "INTEGER", "NULLABLE"),
        ("completeness_pct", "FLOAT", "NULLABLE"),
        ("config_snapshot", "STRING", "NULLABLE"),
        ("validation_metrics", "STRING", "NULLABLE"),
        ("created_at", "TIMESTAMP", "REQUIRED"),
        ("updated_at", "TIMESTAMP", "REQUIRED"),
    ]
    return [bigquery.SchemaField(name, typ, mode=mode) for name, typ, mode in spec]
def _ensure_etl_metadata_table(client: bigquery.Client, table_id: str):
    """Create the audit ``etl_metadata`` table with its explicit schema if missing."""
    if not table_exists(client, 'etl_metadata', layer='audit'):
        client.create_table(bigquery.Table(table_id, schema=get_etl_metadata_schema()))
        print(f" [AUDIT] Created table: etl_metadata")


def _first_created_at(client: bigquery.Client, table_id: str, table_name: str):
    """Look up the earliest ``created_at`` recorded for *table_name*.

    Returns:
        Tuple ``(created_at, is_first_run)``. On any lookup failure (or when
        no prior row exists) this is ``(now, True)`` — best-effort, so a
        broken lookup never aborts the metadata write.
    """
    check_query = f"""
        SELECT MIN(created_at) AS first_created_at
        FROM `{table_id}`
        WHERE table_name = @table_name
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name)
        ]
    )
    try:
        rows = list(client.query(check_query, job_config=job_config).result())
        if rows and rows[0]['first_created_at'] is not None:
            return rows[0]['first_created_at'], False
        return datetime.now(), True
    except Exception:
        # Treat lookup failures as a first run rather than failing the caller.
        return datetime.now(), True


def save_etl_metadata(client: bigquery.Client, metadata: dict):
    """Append one execution record to the audit ``etl_metadata`` table.

    Preserves the ``created_at`` of the first recorded run for this table
    and stamps ``updated_at`` with the current time.

    Args:
        client: BigQuery client.
        metadata: Execution metrics; recognised keys: 'source_class',
            'table_name', 'execution_timestamp', 'duration_seconds',
            'rows_fetched', 'rows_transformed', 'rows_loaded',
            'completeness_pct', 'config_snapshot', 'validation_metrics'.
            Missing keys fall back to neutral defaults.
    """
    table_name = metadata.get('table_name', 'unknown')
    table_id = get_table_id('etl_metadata', layer='audit')
    _ensure_etl_metadata_table(client, table_id)
    created_at, is_first_run = _first_created_at(client, table_id, table_name)
    current_time = datetime.now()
    # Deterministic per-execution id (hashlib now imported at module level
    # instead of mid-function).
    record_id = hashlib.md5(
        f"{metadata.get('source_class')}_{table_name}_{current_time.isoformat()}".encode()
    ).hexdigest()
    meta_df = pd.DataFrame([{
        'id': record_id,
        'source_class': metadata.get('source_class', 'unknown'),
        'table_name': table_name,
        'execution_timestamp': metadata.get('execution_timestamp', current_time),
        'duration_seconds': float(metadata.get('duration_seconds', 0)),
        'rows_fetched': int(metadata.get('rows_fetched', 0)),
        'rows_transformed': int(metadata.get('rows_transformed', 0)),
        'rows_loaded': int(metadata.get('rows_loaded', 0)),
        'completeness_pct': float(metadata.get('completeness_pct', 0)),
        'config_snapshot': metadata.get('config_snapshot', '{}'),
        'validation_metrics': metadata.get('validation_metrics', '{}'),
        'created_at': created_at,
        'updated_at': current_time
    }])
    # BigQuery DATETIME/TIMESTAMP columns are loaded timezone-naive.
    for col in ('execution_timestamp', 'created_at', 'updated_at'):
        meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    client.load_table_from_dataframe(meta_df, table_id, job_config=job_config).result()
    if is_first_run:
        print(f"etl_metadata — first run | created_at : {created_at}")
    else:
        print(f"etl_metadata — preserved | created_at : {created_at}")
    print(f"etl_metadata — updated_at : {current_time}")