From e63d8ba69a23650d32613a90e5921d8ee1767571 Mon Sep 17 00:00:00 2001
From: Debby
Date: Sat, 7 Mar 2026 17:17:27 +0700
Subject: [PATCH] test connect

---
 dags/trial.py        | 21 +++++++++++++++++++++
 scripts/test_data.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 73 insertions(+)
 create mode 100644 dags/trial.py
 create mode 100644 scripts/test_data.py

diff --git a/dags/trial.py b/dags/trial.py
new file mode 100644
index 0000000..52eda4f
--- /dev/null
+++ b/dags/trial.py
@@ -0,0 +1,21 @@
+"""Airflow DAG that runs the FAO-to-BigQuery test load once a day."""
+
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+
+# run_fao_test is defined in scripts/test_data.py, so import it from there.
+from scripts.test_data import run_fao_test
+
+with DAG(
+    dag_id="etl_fao_bigquery",
+    start_date=datetime(2026, 3, 3),
+    schedule_interval="@daily",
+    catchup=False,
+) as dag:
+
+    task_load_fao = PythonOperator(
+        task_id="load_fao_to_bigquery",
+        python_callable=run_fao_test,
+    )
diff --git a/scripts/test_data.py b/scripts/test_data.py
new file mode 100644
index 0000000..3089ac2
--- /dev/null
+++ b/scripts/test_data.py
@@ -0,0 +1,52 @@
+"""Test load: push a 5-row FAOSTAT sample to BigQuery and read it back."""
+
+import io
+import zipfile
+
+import pandas as pd
+import requests
+
+from scripts.bigquery_config import get_bigquery_client
+from scripts.bigquery_helpers import load_to_bigquery, read_from_bigquery
+
+
+def run_fao_test():
+    """Download the FAOSTAT archive, load 5 rows into BigQuery and verify.
+
+    Raises:
+        requests.HTTPError: If the download returns an HTTP error status.
+        ValueError: If the downloaded archive contains no CSV file.
+    """
+    print("--- MEMULAI TEST LOAD FAO KE BIGQUERY ---")
+
+    # 1. Extract
+    url = "https://bulks-faostat.fao.org/production/Food_Security_Data_E_All_Data_(Normalized).zip"
+    response = requests.get(url, timeout=120)
+    # Stop on 4xx/5xx rather than passing an error page to zipfile.
+    response.raise_for_status()
+    with zipfile.ZipFile(io.BytesIO(response.content)) as z:
+        csv_name = next((f for f in z.namelist() if f.endswith(".csv")), None)
+        if csv_name is None:
+            raise ValueError(f"No CSV file found in archive from {url}")
+        # Only the first 5 rows are used, so parse just those (dtypes are
+        # inferred from these rows); the with-block closes the member handle.
+        with z.open(csv_name) as csv_file:
+            df_top5 = pd.read_csv(csv_file, encoding="latin-1", nrows=5)
+
+    print("HASIL 5 DATA TERATAS:")
+    print("====================================================")
+    print(df_top5.to_string(index=False))
+    print("====================================================")
+
+    # 2. Load to BigQuery
+    client = get_bigquery_client()
+    load_to_bigquery(client, df_top5, "raw_fao_test", layer="bronze", write_disposition="WRITE_TRUNCATE")
+    print(f"Total data yang berhasil di-load: {len(df_top5)} baris.")
+
+    # 3. Verify
+    df_check = read_from_bigquery(client, "raw_fao_test", layer="bronze")
+    print(f"Verify dari BigQuery: {len(df_check)} baris tersimpan.")
+
+
+if __name__ == "__main__":
+    run_fao_test()