# airflow-coolify/scripts/bigquery_config.py
"""
BIGQUERY CONFIGURATION FOR FOOD SECURITY DATA INTEGRATION
Kimball Data Warehouse Architecture
Dataset Naming:
- Bronze (fs_asean_bronze) : Raw layer — data as-is dari sumber
- Silver (fs_asean_silver) : Staging layer — staging_integrated, cleaned_integrated
- Audit (fs_asean_audit) : Audit layer — etl_logs, etl_metadata
- Gold (fs_asean_gold) : DW layer — Dim & Fact tables (Kimball Star Schema)
Kimball ETL Flow:
Source Data
RAW (Bronze) → raw_fao, raw_worldbank, raw_unicef
STAGING (Silver) → staging_integrated, cleaned_integrated
DATA WAREHOUSE (Gold) → dim_*, fact_food_security, fact_food_security_eligible
AUDIT (fs_asean_audit) → etl_logs, etl_metadata [semua layer log ke sini]
"""
import json
import os
from pathlib import Path

from google.cloud import bigquery
from google.oauth2 import service_account

# BIGQUERY CONFIGURATION
CREDENTIALS_PATH = os.environ.get(
    "GOOGLE_APPLICATION_CREDENTIALS",
    "/opt/airflow/secrets/food-security-asean-project-826a4d7b302a.json"
)
PROJECT_ID = "food-security-asean-project"
LOCATION = "asia-southeast2"
# SETUP BIGQUERY CLIENT
def get_bigquery_client() -> bigquery.Client:
    """
    Create a BigQuery client.
    Priority:
    1. GOOGLE_CREDENTIALS_JSON env variable (Coolify/production)
    2. GOOGLE_APPLICATION_CREDENTIALS file path (local/development)
    """
    credentials_json = os.environ.get("GOOGLE_CREDENTIALS_JSON")
    if credentials_json:
        credentials_dict = json.loads(credentials_json)
        credentials = service_account.Credentials.from_service_account_info(
            credentials_dict,
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
    else:
        credentials = service_account.Credentials.from_service_account_file(
            CREDENTIALS_PATH,
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
    return bigquery.Client(
        credentials=credentials,
        project=PROJECT_ID,
        location=LOCATION
    )
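# Usage sketch, as a minimal smoke test of the client (the query text is
# illustrative, not part of the pipeline):
#   client = get_bigquery_client()
#   rows = list(client.query("SELECT 1 AS ok").result())
#   print(rows[0].ok)  # -> 1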
# DATASET IDs
# Bronze = Raw Layer | Silver = Staging Layer | Gold = DW Layer (Kimball)
DATASET_BRONZE = "fs_asean_bronze"  # Raw layer — raw data from the sources
DATASET_SILVER = "fs_asean_silver"  # Staging layer — staging_integrated, cleaned_integrated
DATASET_AUDIT  = "fs_asean_audit"   # Audit layer — etl_logs, etl_metadata
DATASET_GOLD   = "fs_asean_gold"    # DW layer — Dim & Fact (Star Schema)
# Mapping: layer name → dataset id
LAYER_DATASET_MAP = {
    "bronze": DATASET_BRONZE,  # Raw
    "silver": DATASET_SILVER,  # Staging, Cleaned
    "audit" : DATASET_AUDIT,   # Audit/Logs
    "gold"  : DATASET_GOLD,    # DW
}
# Aliases: Kimball terminology → layer (for readability in other files)
KIMBALL_LAYER_MAP = {
    "raw"    : "bronze",
    "staging": "silver",
    "logs"   : "audit",
    "dw"     : "gold",
}
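# Example resolution (values follow directly from the two maps above):
#   KIMBALL_LAYER_MAP["dw"]     -> "gold"
#   LAYER_DATASET_MAP["gold"]   -> "fs_asean_gold"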
# PIPELINE CONFIGURATION (BigQuery targets, matching parameters, scope)
CONFIG = {
    "bigquery": {
        "project_id"      : PROJECT_ID,
        "dataset_bronze"  : DATASET_BRONZE,
        "dataset_silver"  : DATASET_SILVER,
        "dataset_audit"   : DATASET_AUDIT,
        "dataset_gold"    : DATASET_GOLD,
        "location"        : LOCATION,
        "credentials_path": CREDENTIALS_PATH
    },
    "matching": {
        "threshold": 0.70,
        "weights": {
            "keyword"          : 0.50,
            "string_similarity": 0.30,
            "word_overlap"     : 0.20
        },
        "penalties": {
            "qualifier_mismatch"    : 0.85,
            "severity_mismatch"     : 0.80,
            "target_mismatch"       : 0.90,
            "service_level_mismatch": 0.88
        }
    },
    "asean_countries": [
        "Brunei Darussalam",
        "Cambodia",
        "Indonesia",
        "Lao People's Democratic Republic",
        "Malaysia",
        "Myanmar",
        "Philippines",
        "Singapore",
        "Thailand",
        "Viet Nam"
    ],
    "asean_iso_codes": ["BRN", "KHM", "IDN", "LAO", "MYS", "MMR", "PHL", "SGP", "THA", "VNM"],
    "unicef_datasets": {
        "WASH_HOUSEHOLDS": "Water, Sanitation & Hygiene",
        "NUTRITION"      : "Child Nutrition",
        "EDUCATION"      : "Education",
        "HIV_AIDS"       : "HIV/AIDS"
    }
}
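# How the matching block is presumably consumed by the matcher (illustrative
# sketch only; the actual matching code lives in another module). The weights
# sum to 1.0, and the sub-1.0 penalties suggest multiplicative discounts:
#   w = CONFIG["matching"]["weights"]
#   score = (w["keyword"] * keyword_score
#            + w["string_similarity"] * similarity_score
#            + w["word_overlap"] * overlap_score)
#   score *= CONFIG["matching"]["penalties"]["qualifier_mismatch"]  # if qualifiers differ
#   is_match = score >= CONFIG["matching"]["threshold"]             # 0.70 cutoff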
# DIRECTORY SETUP
BASE_DIR = Path.cwd()
EXPORTS_DIR = BASE_DIR / 'exports'
LOGS_DIR = BASE_DIR / 'logs'
for directory in [EXPORTS_DIR, LOGS_DIR]:
    directory.mkdir(exist_ok=True)
# HELPER FUNCTIONS
def get_table_id(table_name: str, layer: str = "bronze") -> str:
    """Build a fully-qualified table id; accepts Kimball aliases as layer names."""
    # Resolve Kimball alias to the layer name
    resolved = KIMBALL_LAYER_MAP.get(layer.lower(), layer.lower())
    dataset = LAYER_DATASET_MAP.get(resolved, DATASET_BRONZE)
    return f"{PROJECT_ID}.{dataset}.{table_name}"
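# Examples (values follow from the maps above):
#   get_table_id("raw_fao", "raw")     -> "food-security-asean-project.fs_asean_bronze.raw_fao"
#   get_table_id("dim_country", "dw")  -> "food-security-asean-project.fs_asean_gold.dim_country"
#   get_table_id("etl_logs", "logs")   -> "food-security-asean-project.fs_asean_audit.etl_logs"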
def table_exists(client: bigquery.Client, table_name: str, layer: str = "bronze") -> bool:
    """
    Check whether a table exists in BigQuery.
    Args:
        client     : BigQuery client
        table_name : Table name
        layer      : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
    Returns:
        bool: True if the table exists
    """
    try:
        client.get_table(get_table_id(table_name, layer))
        return True
    except Exception:
        return False
def delete_table(client: bigquery.Client, table_name: str, layer: str = "bronze"):
    """
    Delete a table if it exists.
    Args:
        client     : BigQuery client
        table_name : Table name
        layer      : Layer — 'bronze'/'raw', 'silver'/'staging', 'gold'/'dw'
    """
    table_id = get_table_id(table_name, layer)
    try:
        client.delete_table(table_id, not_found_ok=True)
        print(f"   Deleted [{layer.upper()}] table: {table_name}")
    except Exception as e:
        print(f"   Error deleting [{layer.upper()}] table {table_name}: {e}")
def create_dataset_if_not_exists(client: bigquery.Client, dataset_id: str):
    """
    Create a dataset if it does not exist yet.
    Args:
        client     : BigQuery client
        dataset_id : Dataset ID string
    """
    full_id = f"{PROJECT_ID}.{dataset_id}"
    try:
        client.get_dataset(full_id)
        print(f"   ✓ Exists  : {dataset_id}")
    except Exception:
        ds = bigquery.Dataset(full_id)
        ds.location = LOCATION
        client.create_dataset(ds, timeout=30)
        print(f"   ✓ Created : {dataset_id}")
def create_all_datasets(client: bigquery.Client):
    """Create all four datasets (Raw/Staging/Audit/DW) if they do not exist yet"""
    print("Setting up BigQuery Datasets (Kimball DW)...")
    for dataset_id in LAYER_DATASET_MAP.values():
        create_dataset_if_not_exists(client, dataset_id)
# VERIFICATION
def verify_setup() -> bool:
    """
    Verify the BigQuery setup for all four datasets (Raw / Staging / Audit / DW).
    Checks:
    1. Credentials are available (env JSON or key file)
    2. Connection to BigQuery succeeds
    3. All datasets exist or are created successfully
    """
    print("=" * 60)
    print("BIGQUERY SETUP VERIFICATION")
    print("Kimball DW Architecture")
    print("=" * 60)
    # 1. Credentials — accept either the inline JSON env var or the key file
    if not os.environ.get("GOOGLE_CREDENTIALS_JSON") and not os.path.exists(CREDENTIALS_PATH):
        print(f"Credentials not found : {CREDENTIALS_PATH}")
        return False
    print("✓ Credentials found")
    # 2. Connection
    try:
        client = get_bigquery_client()
        print("✓ Connected to BigQuery")
        print(f"  Project  : {PROJECT_ID}")
        print(f"  Location : {LOCATION}")
    except Exception as e:
        print(f"Connection failed: {e}")
        return False
    # 3. Datasets
    try:
        print()
        create_all_datasets(client)
    except Exception as e:
        print(f"Dataset setup failed: {e}")
        return False
    print("\n" + "=" * 60)
    print("✓ SETUP SUCCESSFUL")
    print(f"  Raw (Bronze)     : {DATASET_BRONZE}")
    print(f"  Staging (Silver) : {DATASET_SILVER}")
    print(f"  DW (Gold)        : {DATASET_GOLD}")
    print(f"  Audit            : {DATASET_AUDIT}")
    print("=" * 60)
    return True
# ENTRY POINT: run verification when executed directly; print a banner on import
if __name__ == "__main__":
    verify_setup()
else:
    print("BigQuery Config Loaded — Kimball DW Architecture")
    print(f"  Project          : {PROJECT_ID}")
    print(f"  Raw (Bronze)     : {DATASET_BRONZE}")
    print(f"  Staging (Silver) : {DATASET_SILVER}")
    print(f"  DW (Gold)        : {DATASET_GOLD}")
    print(f"  Audit            : {DATASET_AUDIT}")
    print(f"  Location         : {LOCATION}")