test connect
This commit is contained in:
17
dags/trial.py
Normal file
17
dags/trial.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from airflow import DAG
|
||||
from airflow.operators.python import PythonOperator
|
||||
from datetime import datetime
|
||||
|
||||
from scripts.test_fao_load import run_fao_test
|
||||
|
||||
with DAG(
|
||||
dag_id="etl_fao_bigquery",
|
||||
start_date=datetime(2026, 3, 3),
|
||||
schedule_interval="@daily",
|
||||
catchup=False
|
||||
) as dag:
|
||||
|
||||
task_load_fao = PythonOperator(
|
||||
task_id="load_fao_to_bigquery",
|
||||
python_callable=run_fao_test
|
||||
)
|
||||
37
scripts/test_data.py
Normal file
37
scripts/test_data.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import requests
|
||||
import zipfile
|
||||
import io
|
||||
import pandas as pd
|
||||
|
||||
from scripts.bigquery_config import get_bigquery_client
|
||||
from scripts.bigquery_helpers import load_to_bigquery, read_from_bigquery
|
||||
|
||||
def run_fao_test():
|
||||
print("--- MEMULAI TEST LOAD FAO KE BIGQUERY ---")
|
||||
|
||||
# 1. Extract
|
||||
url = "https://bulks-faostat.fao.org/production/Food_Security_Data_E_All_Data_(Normalized).zip"
|
||||
response = requests.get(url, timeout=120)
|
||||
with zipfile.ZipFile(io.BytesIO(response.content)) as z:
|
||||
csv_name = [f for f in z.namelist() if f.endswith('.csv')][0]
|
||||
df = pd.read_csv(z.open(csv_name), encoding='latin-1')
|
||||
|
||||
# Ambil 5 baris teratas
|
||||
df_top5 = df.head(5)
|
||||
|
||||
print("HASIL 5 DATA TERATAS:")
|
||||
print("====================================================")
|
||||
print(df_top5.to_string(index=False))
|
||||
print("====================================================")
|
||||
|
||||
# 2. Load ke BigQuery
|
||||
client = get_bigquery_client()
|
||||
load_to_bigquery(client, df_top5, "raw_fao_test", layer="bronze", write_disposition="WRITE_TRUNCATE")
|
||||
print(f"Total data yang berhasil di-load: {len(df_top5)} baris.")
|
||||
|
||||
# 3. Verify
|
||||
df_check = read_from_bigquery(client, "raw_fao_test", layer="bronze")
|
||||
print(f"Verify dari BigQuery: {len(df_check)} baris tersimpan.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
run_fao_test()
|
||||
Reference in New Issue
Block a user