Update the food-security BigQuery ETL pipeline DAG
This commit is contained in:
@@ -1,93 +1,47 @@
|
|||||||
from airflow import DAG
|
from airflow import DAG
|
||||||
from airflow.operators.python import PythonOperator
|
from airflow.operators.python import PythonOperator
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import sys
|
|
||||||
|
|
||||||
sys.path.insert(0, '/opt/airflow/scripts')
|
# Import the task functions from the scripts folder
|
||||||
|
from scripts.bigquery_raw_layer import (
|
||||||
default_args = {
|
run_verify_connection,
|
||||||
'owner' : 'airflow',
|
run_load_fao,
|
||||||
'retries' : 1,
|
run_load_worldbank,
|
||||||
'email_on_failure': False,
|
run_load_unicef,
|
||||||
}
|
run_staging_integration,
|
||||||
|
)
|
||||||
def task_verify_connection():
    """Fail the task unless the BigQuery setup check succeeds.

    Calls ``bigquery_config.verify_setup()`` and treats any falsy result as
    a failed connection check.

    Raises:
        RuntimeError: If ``verify_setup()`` returns a falsy value.
    """
    # Imported inside the callable so the module is only needed when the
    # task actually runs, not when the DAG file is parsed.
    from bigquery_config import verify_setup

    if not verify_setup():
        # RuntimeError is still an Exception, so any existing
        # `except Exception` handling keeps working.
        raise RuntimeError("BigQuery connection failed!")

    print("BigQuery connection OK")
|
|
||||||
|
|
||||||
def task_load_fao():
    """Run the FAO data source and print how many rows it returned.

    Builds a BigQuery client with ``get_bigquery_client()``, hands it to
    ``FAODataSource`` and calls ``run()`` on it.
    """
    # Imported inside the callable so these modules are only needed when the
    # task actually runs, not when the DAG file is parsed.
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource

    bq_client = get_bigquery_client()
    # NOTE(review): run() presumably returns a DataFrame; only len() is
    # relied on here -- confirm against bigquery_raw_layer.
    fao_rows = FAODataSource(bq_client).run()
    print(f"FAO loaded: {len(fao_rows):,} rows")
|
|
||||||
|
|
||||||
def task_load_worldbank():
    """Run the World Bank data source and print how many rows it returned.

    The FAO source is run first; the distinct values of its ``'indicator'``
    column are passed to ``WorldBankDataSource`` together with the client.
    """
    # Imported inside the callable so these modules are only needed when the
    # task actually runs, not when the DAG file is parsed.
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource, WorldBankDataSource

    bq_client = get_bigquery_client()

    # NOTE(review): this re-runs the FAO source only to obtain the indicator
    # list. Whether FAODataSource.run() reloads or rewrites data is not
    # visible here -- confirm that repeating it in this task is intended.
    fao_frame = FAODataSource(bq_client).run()
    indicator_names = fao_frame['indicator'].unique().tolist()

    wb_rows = WorldBankDataSource(bq_client, indicator_names).run()
    print(f"World Bank loaded: {len(wb_rows):,} rows")
|
|
||||||
|
|
||||||
def task_load_unicef():
    """Run the UNICEF data source and print how many rows it returned.

    The FAO source is run first; the distinct values of its ``'indicator'``
    column are passed to ``UNICEFDataSource`` together with the client.
    """
    # Imported inside the callable so these modules are only needed when the
    # task actually runs, not when the DAG file is parsed.
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import FAODataSource, UNICEFDataSource

    bq_client = get_bigquery_client()

    # NOTE(review): this re-runs the FAO source only to obtain the indicator
    # list. Whether FAODataSource.run() reloads or rewrites data is not
    # visible here -- confirm that repeating it in this task is intended.
    fao_frame = FAODataSource(bq_client).run()
    indicator_names = fao_frame['indicator'].unique().tolist()

    unicef_rows = UNICEFDataSource(bq_client, indicator_names).run()
    print(f"UNICEF loaded: {len(unicef_rows):,} rows")
|
|
||||||
|
|
||||||
def task_staging_integration():
    """Run the staging integration step and print how many rows it returned.

    Builds a BigQuery client with ``get_bigquery_client()``, hands it to
    ``StagingDataIntegration`` and calls ``run()`` on it.
    """
    # Imported inside the callable so these modules are only needed when the
    # task actually runs, not when the DAG file is parsed.
    from bigquery_config import get_bigquery_client
    from bigquery_raw_layer import StagingDataIntegration

    bq_client = get_bigquery_client()
    # NOTE(review): run() presumably returns a DataFrame; only len() is
    # relied on here -- confirm against bigquery_raw_layer.
    staged_rows = StagingDataIntegration(bq_client).run()
    print(f"Staging integrated: {len(staged_rows):,} rows")
|
|
||||||
|
|
||||||
with DAG(
|
with DAG(
|
||||||
dag_id = "etl_food_security_bigquery",
|
dag_id = "etl_food_security_bigquery",
|
||||||
start_date = datetime(2026, 3, 1),
|
start_date = datetime(2026, 3, 1),
|
||||||
schedule_interval= None,
|
schedule_interval = "@daily",
|
||||||
catchup = False,
|
catchup = False,
|
||||||
default_args = default_args,
|
tags = ["food-security", "bigquery", "kimball"]
|
||||||
tags = ["food-security", "bigquery", "kimball"]
|
|
||||||
) as dag:
|
) as dag:
|
||||||
|
|
||||||
verify = PythonOperator(
|
task_verify = PythonOperator(
|
||||||
task_id = "verify_bigquery_connection",
|
task_id = "verify_bigquery_connection",
|
||||||
python_callable= task_verify_connection
|
python_callable = run_verify_connection
|
||||||
)
|
)
|
||||||
|
|
||||||
load_fao = PythonOperator(
|
task_fao = PythonOperator(
|
||||||
task_id = "load_fao_to_bronze",
|
task_id = "load_fao_to_bronze",
|
||||||
python_callable= task_load_fao
|
python_callable = run_load_fao
|
||||||
)
|
)
|
||||||
|
|
||||||
load_wb = PythonOperator(
|
task_worldbank = PythonOperator(
|
||||||
task_id = "load_worldbank_to_bronze",
|
task_id = "load_worldbank_to_bronze",
|
||||||
python_callable= task_load_worldbank
|
python_callable = run_load_worldbank
|
||||||
)
|
)
|
||||||
|
|
||||||
load_unicef = PythonOperator(
|
task_unicef = PythonOperator(
|
||||||
task_id = "load_unicef_to_bronze",
|
task_id = "load_unicef_to_bronze",
|
||||||
python_callable= task_load_unicef
|
python_callable = run_load_unicef
|
||||||
)
|
)
|
||||||
|
|
||||||
staging = PythonOperator(
|
task_staging = PythonOperator(
|
||||||
task_id = "staging_integration_to_silver",
|
task_id = "staging_integration_to_silver",
|
||||||
python_callable= task_staging_integration
|
python_callable = run_staging_integration
|
||||||
)
|
)
|
||||||
|
|
||||||
verify >> load_fao >> load_wb >> load_unicef >> staging
|
task_verify >> task_fao >> task_worldbank >> task_unicef >> task_staging
|
||||||
Reference in New Issue
Block a user