# NOTE(review): removed scraped listing metadata ("226 lines / 9.2 KiB / Python")
# that preceded the module docstring — it was not valid Python.
"""
|
|
BIGQUERY HELPER FUNCTIONS
|
|
Kimball Data Warehouse Architecture
|
|
"""
|
|
|
|
import hashlib
import json
import logging
import uuid
from datetime import datetime
from pathlib import Path

import pandas as pd
import pytz
from google.cloud import bigquery

from scripts.bigquery_config import (
    get_bigquery_client,
    get_table_id,
    table_exists,
    CONFIG
)
# LOGGING SETUP
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def setup_logging(log_file: str = 'logs/etl_pipeline.log') -> logging.Logger:
    """Configure root logging to both a file and the console.

    Args:
        log_file: Path of the log file to append to. Its parent directory
            is created if missing — logging.FileHandler raises
            FileNotFoundError otherwise (the original bug this fixes).

    Returns:
        The module logger, wired to the root handlers set up here.
    """
    # FileHandler does not create directories; make sure the target exists.
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file),
            logging.StreamHandler()
        ]
    )
    return logging.getLogger(__name__)
def ensure_etl_logs_table(client: bigquery.Client):
    """Create the audit-layer etl_logs table if it does not already exist."""
    if table_exists(client, 'etl_logs', layer='audit'):
        return

    # Column spec as (name, type, mode) triples.
    columns = [
        ("id", "STRING", "REQUIRED"),
        ("timestamp", "DATETIME", "REQUIRED"),
        ("layer", "STRING", "REQUIRED"),
        ("table_name", "STRING", "REQUIRED"),
        ("update_method", "STRING", "REQUIRED"),
        ("rows_affected", "INTEGER", "NULLABLE"),
        ("status", "STRING", "NULLABLE"),
        ("error_message", "STRING", "NULLABLE"),
    ]
    schema = [bigquery.SchemaField(name, ftype, mode=mode)
              for name, ftype, mode in columns]

    target = get_table_id('etl_logs', layer='audit')
    client.create_table(bigquery.Table(target, schema=schema))
    print(f" [AUDIT] Created table: etl_logs")
def log_update(client: bigquery.Client, layer: str, table_name: str,
               update_method: str, rows_affected: int,
               status: str = 'success', error_msg: str = None):
    """Append one audit row to etl_logs (best effort — never raises).

    Args:
        client: BigQuery client used for the load job.
        layer: Warehouse layer name; stored upper-cased.
        table_name: Table the ETL step touched.
        update_method: How the table was updated (free-form label).
        rows_affected: Row count reported by the step.
        status: 'success' by default; callers may pass a failure status.
        error_msg: Optional error text stored in error_message.

    Failures here only print a warning so a broken audit trail cannot
    abort the ETL run itself.
    """
    try:
        ensure_etl_logs_table(client)

        # uuid4 is collision-free; the previous id (a hash of the current
        # timestamp's isoformat) collided for same-instant calls.
        log_data = pd.DataFrame([{
            'id':            str(uuid.uuid4()),
            'timestamp':     datetime.now(pytz.timezone('Asia/Jakarta')),
            'layer':         layer.upper(),
            'table_name':    table_name,
            'update_method': update_method,
            'rows_affected': rows_affected,
            'status':        status,
            'error_message': error_msg,
        }])

        # The DATETIME column is tz-naive: record Jakarta wall-clock time,
        # then strip the tz info before loading.
        log_data['timestamp'] = pd.to_datetime(log_data['timestamp']).dt.tz_localize(None)
        log_data['id'] = log_data['id'].astype(str)

        table_id = get_table_id('etl_logs', layer='audit')
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
        client.load_table_from_dataframe(log_data, table_id, job_config=job_config).result()

    except Exception as e:
        # Deliberate best-effort swallow: auditing must not break the pipeline.
        print(f" Warning: Failed to write etl_logs [AUDIT]: {e}")
def load_to_bigquery(client: bigquery.Client, df: pd.DataFrame,
                     table_name: str, layer: str = "bronze",
                     write_disposition: str = "WRITE_TRUNCATE",
                     schema: list = None) -> int:
    """Load a DataFrame into <layer>.<table_name>; return the table's row count.

    When no explicit schema is given, BigQuery autodetects column types
    from the DataFrame.
    """
    destination = get_table_id(table_name, layer)

    load_config = bigquery.LoadJobConfig(
        write_disposition=write_disposition,
        autodetect=schema is None,
        schema=schema,
    )

    client.load_table_from_dataframe(df, destination, job_config=load_config).result()

    loaded = client.get_table(destination)
    print(f" ✓ Loaded {loaded.num_rows:,} rows → [{layer.upper()}] {table_name}")
    return loaded.num_rows
def read_from_bigquery(client: bigquery.Client,
                       table_name: str = None,
                       layer: str = "bronze",
                       query: str = None) -> pd.DataFrame:
    """Return query results as a DataFrame.

    Either a raw SQL `query` or a `table_name` (full-table SELECT) must be
    supplied; `query` wins when both are given.

    Raises:
        ValueError: when neither table_name nor query is provided.
    """
    if query is None and table_name is None:
        raise ValueError("Either table_name or query must be provided")

    sql = query if query else f"SELECT * FROM `{get_table_id(table_name, layer)}`"
    return client.query(sql).result().to_dataframe(create_bqstorage_client=False)
def truncate_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
    """Delete all rows from <layer>.<table_name>, keeping the table itself."""
    target = get_table_id(table_name, layer)
    # DELETE ... WHERE TRUE empties the table but preserves its schema.
    client.query(f"DELETE FROM `{target}` WHERE TRUE").result()
    print(f" Truncated [{layer.upper()}] table: {table_name}")
def drop_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
    """Drop <layer>.<table_name>; succeeds silently when it does not exist."""
    client.delete_table(get_table_id(table_name, layer), not_found_ok=True)
    print(f" Dropped [{layer.upper()}] table: {table_name}")
def get_staging_schema() -> list:
    """Return the BigQuery schema for the staging indicator table."""
    # (name, type, mode) triples keep the column list compact and scannable.
    spec = [
        ("source",                 "STRING",  "REQUIRED"),
        ("indicator_original",     "STRING",  "REQUIRED"),
        ("indicator_standardized", "STRING",  "REQUIRED"),
        ("country",                "STRING",  "REQUIRED"),
        ("year",                   "INTEGER", "NULLABLE"),
        ("year_range",             "STRING",  "NULLABLE"),
        ("value",                  "FLOAT",   "NULLABLE"),
        ("unit",                   "STRING",  "NULLABLE"),
    ]
    return [bigquery.SchemaField(name, ftype, mode=mode) for name, ftype, mode in spec]
def get_etl_metadata_schema() -> list:
    """Return the BigQuery schema for the audit etl_metadata table."""
    # (name, type, mode) triples; expanded to SchemaField objects below.
    spec = [
        ("id",                  "STRING",    "REQUIRED"),
        ("source_class",        "STRING",    "REQUIRED"),
        ("table_name",          "STRING",    "REQUIRED"),
        ("execution_timestamp", "DATETIME",  "REQUIRED"),
        ("duration_seconds",    "FLOAT",     "NULLABLE"),
        ("rows_fetched",        "INTEGER",   "NULLABLE"),
        ("rows_transformed",    "INTEGER",   "NULLABLE"),
        ("rows_loaded",         "INTEGER",   "NULLABLE"),
        ("completeness_pct",    "FLOAT",     "NULLABLE"),
        ("config_snapshot",     "STRING",    "NULLABLE"),
        ("validation_metrics",  "STRING",    "NULLABLE"),
        ("created_at",          "TIMESTAMP", "REQUIRED"),
        ("updated_at",          "TIMESTAMP", "REQUIRED"),
    ]
    return [bigquery.SchemaField(name, ftype, mode=mode) for name, ftype, mode in spec]
def _ensure_etl_metadata_table(client: bigquery.Client, table_id: str) -> None:
    """Create the audit etl_metadata table when it is missing."""
    if not table_exists(client, 'etl_metadata', layer='audit'):
        client.create_table(bigquery.Table(table_id, schema=get_etl_metadata_schema()))
        print(f" [AUDIT] Created table: etl_metadata")


def _first_created_at(client: bigquery.Client, table_id: str, table_name: str):
    """Return (created_at, is_first_run) for table_name in etl_metadata.

    Reuses the earliest recorded created_at so that it is preserved across
    runs; falls back to "now / first run" when the lookup fails (e.g. the
    table was just created and is not yet queryable).
    """
    check_query = f"""
        SELECT MIN(created_at) AS first_created_at
        FROM `{table_id}`
        WHERE table_name = @table_name
    """
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table_name)
        ]
    )
    try:
        rows = list(client.query(check_query, job_config=job_config).result())
        if rows and rows[0]['first_created_at'] is not None:
            return rows[0]['first_created_at'], False
        return datetime.now(), True
    except Exception:
        # Best-effort lookup: treat any failure as a first run.
        return datetime.now(), True


def save_etl_metadata(client: bigquery.Client, metadata: dict):
    """Append one ETL execution record to the audit etl_metadata table.

    Args:
        client: BigQuery client used for the query and load jobs.
        metadata: Dict with keys such as source_class, table_name,
            execution_timestamp, duration_seconds, rows_fetched,
            rows_transformed, rows_loaded, completeness_pct,
            config_snapshot, validation_metrics. Missing keys default
            to 'unknown' / 0 / '{{}}' as appropriate.

    created_at is carried over from the earliest record for this
    table_name; updated_at is always the current time.
    """
    table_name = metadata.get('table_name', 'unknown')
    table_id = get_table_id('etl_metadata', layer='audit')

    _ensure_etl_metadata_table(client, table_id)
    created_at, is_first_run = _first_created_at(client, table_id, table_name)

    current_time = datetime.now()

    # Digest of source/table/timestamp as the record id (hashlib is a
    # top-level import now; it used to be imported mid-function).
    record_id = hashlib.md5(
        f"{metadata.get('source_class')}_{table_name}_{current_time.isoformat()}".encode()
    ).hexdigest()

    meta_df = pd.DataFrame([{
        'id':                  record_id,
        'source_class':        metadata.get('source_class', 'unknown'),
        'table_name':          table_name,
        'execution_timestamp': metadata.get('execution_timestamp', current_time),
        'duration_seconds':    float(metadata.get('duration_seconds', 0)),
        'rows_fetched':        int(metadata.get('rows_fetched', 0)),
        'rows_transformed':    int(metadata.get('rows_transformed', 0)),
        'rows_loaded':         int(metadata.get('rows_loaded', 0)),
        'completeness_pct':    float(metadata.get('completeness_pct', 0)),
        'config_snapshot':     metadata.get('config_snapshot', '{}'),
        'validation_metrics':  metadata.get('validation_metrics', '{}'),
        'created_at':          created_at,
        'updated_at':          current_time,
    }])

    # Timestamp columns are loaded tz-naive; strip any tz info uniformly.
    for col in ['execution_timestamp', 'created_at', 'updated_at']:
        meta_df[col] = pd.to_datetime(meta_df[col]).dt.tz_localize(None)

    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_APPEND")
    client.load_table_from_dataframe(meta_df, table_id, job_config=job_config).result()

    if is_first_run:
        print(f"etl_metadata — first run | created_at : {created_at}")
    else:
        print(f"etl_metadata — preserved | created_at : {created_at}")
    print(f"etl_metadata — updated_at : {current_time}")