Files
airflow-coolify/dags/etl_food_security.py
2026-03-07 14:46:02 +07:00

93 lines
3.0 KiB
Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import sys
sys.path.insert(0, '/opt/airflow/scripts')
# Defaults handed to the DAG below via its ``default_args`` argument.
default_args = dict(
    owner='airflow',
    retries=1,
    email_on_failure=False,
)
def task_verify_connection():
    """Fail the task unless ``verify_setup()`` reports a truthy result.

    Raises:
        Exception: when ``verify_setup()`` returns a falsy value.
    """
    # Imported lazily: the module is resolved through the sys.path entry
    # added at the top of this file, at task run time rather than parse time.
    from bigquery_config import verify_setup

    setup_ok = verify_setup()
    if setup_ok:
        print("BigQuery connection OK")
        return
    raise Exception("BigQuery connection failed!")
def task_load_fao():
    """Run the FAO data source and print how many rows it returned."""
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource

    # Build the source around a fresh client and execute it in one step.
    df = FAODataSource(get_bigquery_client()).run()
    print(f"FAO loaded: {len(df):,} rows")
def task_load_worldbank():
    """Run the World Bank data source and print how many rows it returned.

    The World Bank source is constructed with the list of distinct values
    found in the ``indicator`` column of the FAO result, so the FAO source
    is executed here first.
    """
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource, WorldBankDataSource

    client = get_bigquery_client()

    # NOTE(review): this re-runs the FAO source even though the DAG already
    # runs it in an upstream task — confirm whether run() has load side
    # effects that make the repeat costly or duplicate data.
    indicator_names = FAODataSource(client).run()['indicator'].unique().tolist()

    df = WorldBankDataSource(client, indicator_names).run()
    print(f"World Bank loaded: {len(df):,} rows")
def task_load_unicef():
    """Run the UNICEF data source and print how many rows it returned.

    The UNICEF source is constructed with the list of distinct values found
    in the ``indicator`` column of the FAO result, so the FAO source is
    executed here first.
    """
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource, UNICEFDataSource

    client = get_bigquery_client()

    # NOTE(review): this re-runs the FAO source even though the DAG already
    # runs it in an upstream task — confirm whether run() has load side
    # effects that make the repeat costly or duplicate data.
    indicator_names = FAODataSource(client).run()['indicator'].unique().tolist()

    df = UNICEFDataSource(client, indicator_names).run()
    print(f"UNICEF loaded: {len(df):,} rows")
def task_staging_integration():
    """Run the staging integration step and print how many rows it returned."""
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import StagingDataIntegration

    # Build the integration step around a fresh client and execute it.
    df = StagingDataIntegration(get_bigquery_client()).run()
    print(f"Staging integrated: {len(df):,} rows")
# DAG declaration: no schedule and no catchup, so runs only start when
# triggered explicitly.
with DAG(
    dag_id="etl_food_security_bigquery",
    default_args=default_args,
    start_date=datetime(2026, 3, 1),
    schedule_interval=None,
    catchup=False,
    tags=["food-security", "bigquery", "kimball"],
) as dag:
    # One PythonOperator per task function defined above.
    verify = PythonOperator(
        python_callable=task_verify_connection,
        task_id="verify_bigquery_connection",
    )
    load_fao = PythonOperator(
        python_callable=task_load_fao,
        task_id="load_fao_to_bronze",
    )
    load_wb = PythonOperator(
        python_callable=task_load_worldbank,
        task_id="load_worldbank_to_bronze",
    )
    load_unicef = PythonOperator(
        python_callable=task_load_unicef,
        task_id="load_unicef_to_bronze",
    )
    staging = PythonOperator(
        python_callable=task_staging_integration,
        task_id="staging_integration_to_silver",
    )

    # Strictly linear pipeline: each task runs only after the previous one.
    verify >> load_fao
    load_fao >> load_wb
    load_wb >> load_unicef
    load_unicef >> staging