diff --git a/dags/main_pipeline.py b/dags/main_pipeline.py
index 2300921..025fb3c 100644
--- a/dags/main_pipeline.py
+++ b/dags/main_pipeline.py
@@ -2,16 +2,16 @@ from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
# Import fungsi dari folder scripts
-from scripts.scraper_pddikti import run_scraping_logic
+from scripts.scraper_pddikti import run_sync_logic
+# DAG with a single PythonOperator task that runs run_sync_logic
+# (imported from scripts.scraper_pddikti) on the schedule below.
with DAG(
- dag_id="etl_akademik_separated_v1",
+ dag_id="sync_dosen_spota_to_directus",
start_date=datetime(2026, 3, 3),
- schedule_interval="@daily",
+ schedule_interval="@monthly",
catchup=False
) as dag:
+# NOTE(review): task_id below still carries the old "scraping" name although
+# the callable is now run_sync_logic - confirm whether it should be renamed.
task_scraping = PythonOperator(
task_id="run_pddikti_scraping",
- python_callable=run_scraping_logic # Memanggil fungsi eksternal
+ python_callable=run_sync_logic # Call the external function
)
\ No newline at end of file
diff --git a/scripts/scraper_pddikti.py b/scripts/scraper_pddikti.py
index f5b5d68..79d9a02 100644
--- a/scripts/scraper_pddikti.py
+++ b/scripts/scraper_pddikti.py
@@ -1,31 +1,322 @@
-import requests
-import pandas as pd
+#!/usr/bin/env python3
+"""
+Sync dosen data from SPOTA to Directus.
+
+Usage:
+    python scripts/scraper_pddikti.py
+
+Setup:
+ - Set env vars (DIRECTUS_TOKEN required), or
+ - Copy .env.example to .env and fill DIRECTUS_TOKEN
+"""
+
+from __future__ import annotations
+
+import html
+import json
+import os
+import re
+import sys
+import urllib.error
+import urllib.parse
+import urllib.request
+from typing import Dict, List, Tuple
+
+
+def load_env_file(path: str) -> None:
+    """Load ``KEY=VALUE`` pairs from a dotenv-style file into ``os.environ``.
+
+    A missing file is silently ignored. Blank lines, ``#`` comment lines and
+    lines without ``=`` are skipped; a leading ``export `` is dropped; one
+    pair of matching surrounding quotes is removed from the value. Variables
+    that are already present in the environment are never overwritten.
+
+    Args:
+        path: Location of the env file to read (UTF-8).
+    """
+    if not os.path.isfile(path):
+        return
+
+    with open(path, "r", encoding="utf-8") as f:
+        for raw_line in f:
+            line = raw_line.strip()
+            if not line or line.startswith("#") or "=" not in line:
+                continue
+
+            # Accept shell-style "export KEY=VALUE" lines as well.
+            if line.startswith("export "):
+                line = line[len("export "):].lstrip()
+
+            key, value = line.split("=", 1)
+            key = key.strip()
+            value = value.strip()
+
+            # Require at least two characters so that a value consisting of a
+            # single quote character is kept instead of collapsing to "".
+            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
+                value = value[1:-1]
+
+            # Real environment variables win over the file.
+            if key and key not in os.environ:
+                os.environ[key] = value
+
+
+def http_request(
+ method: str,
+ url: str,
+ headers: Dict[str, str] | None = None,
+ json_body: Dict | List | None = None,
+ timeout: int = 30,
+) -> Tuple[int, str]:
+ """Send one HTTP request with urllib and return ``(status_code, body_text)``.
+
+ When ``json_body`` is given it is serialized to UTF-8 JSON and the
+ ``Content-Type`` header is set to ``application/json``. The caller's
+ ``headers`` mapping is copied, not mutated. The response body is decoded
+ as UTF-8 with undecodable bytes replaced.
+
+ An ``urllib.error.HTTPError`` is converted into a normal return value
+ (its code and body); any other exception raised by ``urlopen`` is not
+ caught here and propagates to the caller.
+ """
+ payload = None
+ req_headers = dict(headers or {})
+
+ if json_body is not None:
+ payload = json.dumps(json_body, ensure_ascii=False).encode("utf-8")
+ req_headers["Content-Type"] = "application/json"
+
+ request = urllib.request.Request(
+ url=url,
+ data=payload,
+ headers=req_headers,
+ method=method.upper(),
+ )
-def run_scraping_logic():
- print("--- MEMULAI PROSES PENARIKAN DATA DUMMY ---")
-
- # URL API Publik (Data Universitas)
- url = "http://universities.hipolabs.com/search?country=Indonesia"
-
try:
- # 1. Extract
- response = requests.get(url, timeout=10)
- data = response.json()
-
- # 2. Transform (Gunakan Pandas)
- df = pd.DataFrame(data)
-
- # Kita ambil 5 data teratas saja untuk ditampilkan di log
- preview_data = df[['name', 'web_pages']].head(5)
-
- # 3. Load (Simulasi: Tampilkan ke Log Airflow)
- print("HASIL PENARIKAN DATA (5 Teratas):")
- print("====================================================")
- print(preview_data.to_string(index=False))
- print("====================================================")
- print(f"Total data yang berhasil ditarik: {len(df)} baris.")
-
- return True
- except Exception as e:
- print(f"TERJADI KESALAHAN: {str(e)}")
- return False
\ No newline at end of file
+ with urllib.request.urlopen(request, timeout=timeout) as response:
+ body = response.read().decode("utf-8", errors="replace")
+ # Falls back to 200 when the response object has no ``status`` attribute.
+ return int(getattr(response, "status", 200)), body
+ except urllib.error.HTTPError as e:
+ # Read the error body only when the exception carries a response stream.
+ body = e.read().decode("utf-8", errors="replace") if e.fp else ""
+ return int(e.code), body
+
+
+def strip_tags(text: str) -> str:
+    """Return *text* without markup tags, with HTML entities decoded.
+
+    Everything from a ``<`` up to the next ``>`` is removed, entities such as
+    ``&amp;`` are unescaped, and leading/trailing whitespace is trimmed.
+    """
+    tag_pattern = re.compile(r"<[^>]*>", re.IGNORECASE | re.DOTALL)
+    without_tags = tag_pattern.sub("", text)
+    decoded = html.unescape(without_tags)
+    return decoded.strip()
+
+
+def normalize_kk(kk: str, mapping: Dict[str, str]) -> str:
+    """Map an expertise-group (KK) label to its canonical name.
+
+    The label is trimmed first. An empty or missing label yields ``""``; a
+    label that has no entry in *mapping* is returned trimmed but otherwise
+    unchanged.
+    """
+    label = kk.strip() if kk else ""
+    if label:
+        return mapping.get(label, label)
+    return ""
+
+
+def normalize_photo_url(url: str, base_url: str) -> str:
+    """Turn a photo ``src`` value into an absolute URL, or ``""`` if unusable.
+
+    Returns ``""`` for empty input and for any value containing
+    ``noimageprofile`` (case-insensitive). Protocol-relative values
+    (``//host/...``) get an ``https:`` scheme, absolute http(s) URLs are
+    returned as-is, and everything else is joined onto ``base_url + "/"``.
+    """
+    candidate = url.strip() if url else ""
+
+    if not candidate or "noimageprofile" in candidate.lower():
+        return ""
+
+    if candidate.startswith("//"):
+        return "https:" + candidate
+
+    is_absolute = re.match(r"^https?://", candidate, flags=re.IGNORECASE) is not None
+    if is_absolute:
+        return candidate
+
+    return urllib.parse.urljoin(base_url + "/", candidate)
+
+
+def parse_spota_table(html_doc: str, kk_mapping: Dict[str, str], spota_base: str) -> List[Dict[str, str]]:
+    """Extract lecturer (dosen) records from the SPOTA listing HTML.
+
+    Every ``<tr>`` containing at least six ``<td>``/``<th>`` cells is read
+    positionally: cell 1 is searched for the photo ``<img>``, cells 2-5 are
+    taken as name, NIP, email and expertise group (KK). Cell 0 is not used.
+    Rows whose name is empty or is the literal header text "nama" are skipped.
+
+    Args:
+        html_doc: Raw HTML of the listing page.
+        kk_mapping: Alias-to-canonical mapping passed to ``normalize_kk``.
+        spota_base: Base URL used to resolve relative photo paths.
+
+    Returns:
+        One dict per lecturer with keys ``nama``, ``nip``, ``email``, ``kk``
+        and ``foto`` (``foto`` is ``""`` when no usable image was found).
+    """
+    # ``\b`` keeps the tag names exact (``<tr`` must not match ``<track``,
+    # ``<th`` must not match ``<thead``).
+    row_pattern = re.compile(r"<tr\b[^>]*>(.*?)</tr>", re.IGNORECASE | re.DOTALL)
+    cell_pattern = re.compile(r"<t[dh]\b[^>]*>(.*?)</t[dh]>", re.IGNORECASE | re.DOTALL)
+    img_pattern = re.compile(r"<img\b[^>]+src=[\"']([^\"']+)[\"']", re.IGNORECASE)
+
+    dosen: List[Dict[str, str]] = []
+
+    for row_html in row_pattern.findall(html_doc):
+        cells = cell_pattern.findall(row_html)
+        if len(cells) < 6:
+            continue
+
+        nama = strip_tags(cells[2])
+        nip = strip_tags(cells[3])
+        email_addr = strip_tags(cells[4])
+        kk = normalize_kk(strip_tags(cells[5]), kk_mapping)
+
+        # Skip blank rows and the table header row.
+        if not nama or nama.lower() == "nama":
+            continue
+
+        img_match = img_pattern.search(cells[1])
+        foto = normalize_photo_url(img_match.group(1) if img_match else "", spota_base)
+
+        dosen.append(
+            {
+                "nama": nama,
+                "nip": nip,
+                "email": email_addr,
+                "kk": kk,
+                "foto": foto,
+            }
+        )
+
+    return dosen
+
+
+def parse_json(body: str, context: str) -> Dict:
+    """Decode *body* as JSON and require the top-level value to be an object.
+
+    Args:
+        body: Raw response text.
+        context: Label for the request, used as the prefix of error messages.
+
+    Raises:
+        RuntimeError: If *body* is not valid JSON or does not decode to a dict.
+    """
+    try:
+        parsed = json.loads(body)
+    except json.JSONDecodeError as e:
+        raise RuntimeError(f"{context} returned invalid JSON: {e}") from e
+
+    if isinstance(parsed, dict):
+        return parsed
+
+    raise RuntimeError(f"{context} returned non-object JSON")
+
+
+def summarize(text: str, limit: int = 300) -> str:
+    """Return a trimmed preview of *text* suitable for a log line.
+
+    Empty or missing text yields a fixed placeholder. Text longer than
+    *limit* characters is cut to *limit* and followed by an ellipsis.
+    """
+    trimmed = text.strip() if text else ""
+    if trimmed == "":
+        return "(empty response body)"
+
+    suffix = "…" if len(trimmed) > limit else ""
+    return f"{trimmed[:limit]}{suffix}"
+
+
+def _send_item(
+    method: str,
+    url: str,
+    headers: Dict[str, str],
+    body: Dict,
+    action: str,
+    nama: str,
+) -> bool:
+    """Send one write request to Directus and report whether it succeeded.
+
+    A non-2xx status or a transport error is logged to stderr, prefixed with
+    *action* ("Create" / "Update") and the lecturer name, and yields False.
+    """
+    try:
+        status, response_body = http_request(method, url, headers=headers, json_body=body)
+    except OSError as e:
+        # URLError and socket timeouts are OSError subclasses; one unreachable
+        # request must not abort the remaining items.
+        print(f"{action} failed ({nama}): {e}", file=sys.stderr)
+        return False
+
+    if 200 <= status < 300:
+        return True
+
+    print(f"{action} failed ({nama}) [{status}]: {summarize(response_body)}", file=sys.stderr)
+    return False
+
+
+def main() -> int:
+    """Sync the SPOTA lecturer list into the Directus ``dosen`` collection.
+
+    Steps: load ``.env`` from the current working directory, fetch and parse
+    the SPOTA page, fetch the existing Directus rows, match each lecturer by
+    NIP first and by lower-cased name second, then PATCH the matches and POST
+    the rest.
+
+    Environment:
+        SPOTA_URL: Listing page URL (has a default).
+        DIRECTUS_URL: Directus base URL (has a default).
+        DIRECTUS_TOKEN: Bearer token; required.
+
+    Returns:
+        0 when every request succeeded; 1 on missing token, any failed or
+        unreachable request, an empty parse result, an unusable Directus
+        response, or at least one failed create/update.
+    """
+    load_env_file(os.path.join(os.getcwd(), ".env"))
+
+    spota_url = os.getenv("SPOTA_URL", "https://spota.untan.ac.id/listdosen.php")
+    directus_url = os.getenv("DIRECTUS_URL", "https://api.ifuntanhub.dev").rstrip("/")
+    directus_token = os.getenv("DIRECTUS_TOKEN", "")
+
+    if not directus_token:
+        print("ERROR: DIRECTUS_TOKEN is required", file=sys.stderr)
+        print("Create .env from .env.example or set runtime env variables", file=sys.stderr)
+        return 1
+
+    kk_mapping = {
+        "Computation & Artificial Intelligence": "Computation & Artificial Intelligence",
+        "Networking & Security": "Networking & Security",
+        "Software Engineering & Mobile Computing": "Software Engineering & Mobile Computing",
+        "Information System & Data Spatial": "Information System & Data Spatial",
+        "Computing & AI": "Computation & Artificial Intelligence",
+        "Networks & Security": "Networking & Security",
+    }
+
+    # Resolve relative photo paths against the directory of the page that is
+    # actually fetched, so overriding SPOTA_URL keeps photo URLs consistent.
+    # For the default URL this is "https://spota.untan.ac.id".
+    spota_parts = urllib.parse.urlsplit(spota_url)
+    spota_base = urllib.parse.urlunsplit(
+        (spota_parts.scheme, spota_parts.netloc, spota_parts.path.rsplit("/", 1)[0], "", "")
+    )
+
+    print("Fetching data from SPOTA...")
+    try:
+        spota_status, spota_body = http_request(
+            "GET",
+            spota_url,
+            headers={
+                "User-Agent": "Mozilla/5.0 (compatible; Informatika-UNTAN-Sync/1.0)",
+                "Accept": "text/html,application/xhtml+xml",
+            },
+        )
+    except OSError as e:
+        print(f"SPOTA request failed: {e}", file=sys.stderr)
+        return 1
+
+    if not (200 <= spota_status < 300):
+        print(f"SPOTA request failed ({spota_status}): {summarize(spota_body)}", file=sys.stderr)
+        return 1
+
+    spota_data = parse_spota_table(spota_body, kk_mapping, spota_base)
+    print(f"Parsed {len(spota_data)} dosen from SPOTA")
+
+    if not spota_data:
+        print("No dosen data parsed from SPOTA. Aborting.", file=sys.stderr)
+        return 1
+
+    directus_headers = {
+        "Authorization": f"Bearer {directus_token}",
+        "Accept": "application/json",
+    }
+
+    print("Fetching existing dosen from Directus...")
+    try:
+        existing_status, existing_body = http_request(
+            "GET",
+            f"{directus_url}/items/dosen?limit=-1&fields=id,nama,nip",
+            headers=directus_headers,
+        )
+    except OSError as e:
+        print(f"Directus GET failed: {e}", file=sys.stderr)
+        return 1
+
+    if not (200 <= existing_status < 300):
+        print(
+            f"Directus GET failed ({existing_status}): {summarize(existing_body)}",
+            file=sys.stderr,
+        )
+        return 1
+
+    try:
+        existing_json = parse_json(existing_body, "Directus GET /items/dosen")
+    except RuntimeError as e:
+        print(f"ERROR: {e}", file=sys.stderr)
+        return 1
+
+    existing_rows = existing_json.get("data")
+    if not isinstance(existing_rows, list):
+        # Treating an unexpected payload as "no existing rows" would re-create
+        # every lecturer as a duplicate, so stop instead.
+        print(
+            "Directus GET /items/dosen returned no 'data' list. Aborting.",
+            file=sys.stderr,
+        )
+        return 1
+
+    # Lookup tables from the existing rows: NIP -> id and lower-cased name -> id.
+    existing_by_nip: Dict[str, str] = {}
+    existing_by_name: Dict[str, str] = {}
+
+    for row in existing_rows:
+        if not isinstance(row, dict):
+            continue
+        row_id = row.get("id")
+        if row_id is None:
+            continue
+
+        nip = str(row.get("nip") or "").strip()
+        nama = str(row.get("nama") or "").strip().lower()
+
+        if nip:
+            existing_by_nip[nip] = str(row_id)
+        if nama:
+            existing_by_name[nama] = str(row_id)
+
+    to_create = []
+    to_update = []
+
+    for d in spota_data:
+        payload = {
+            "nama": d.get("nama", ""),
+            "nip": d.get("nip", ""),
+            "email": d.get("email", ""),
+            "kelompok_keahlian": d.get("kk", ""),
+            # If you later add a dedicated URL field in Directus, map photo URL here.
+            # "foto_url": d.get("foto", ""),
+        }
+
+        nip = str(d.get("nip") or "").strip()
+        name_key = str(d.get("nama") or "").strip().lower()
+
+        # NIP is the preferred match key; the name is only a fallback.
+        if nip and nip in existing_by_nip:
+            to_update.append({"id": existing_by_nip[nip], "nama": payload["nama"], "data": payload})
+            continue
+
+        if name_key and name_key in existing_by_name:
+            to_update.append({"id": existing_by_name[name_key], "nama": payload["nama"], "data": payload})
+            continue
+
+        to_create.append(payload)
+
+    print(f"To create: {len(to_create)}, to update: {len(to_update)}")
+
+    created_ok = 0
+    for item in to_create:
+        if _send_item(
+            "POST",
+            f"{directus_url}/items/dosen",
+            directus_headers,
+            item,
+            "Create",
+            item.get("nama", "unknown"),
+        ):
+            created_ok += 1
+    created_fail = len(to_create) - created_ok
+
+    updated_ok = 0
+    for item in to_update:
+        if _send_item(
+            "PATCH",
+            f"{directus_url}/items/dosen/{urllib.parse.quote(item['id'], safe='')}",
+            directus_headers,
+            item["data"],
+            "Update",
+            item.get("nama", "unknown"),
+        ):
+            updated_ok += 1
+    updated_fail = len(to_update) - updated_ok
+
+    print(f"Created: {created_ok}, failed: {created_fail}")
+    print(f"Updated: {updated_ok}, failed: {updated_fail}")
+    print("Sync complete.")
+
+    return 1 if (created_fail > 0 or updated_fail > 0) else 0
+
+
+def run_sync_logic() -> bool:
+    """Run the sync and translate its exit code for a Python caller.
+
+    Returns:
+        True when ``main()`` returns 0.
+
+    Raises:
+        RuntimeError: When ``main()`` returns any non-zero code.
+    """
+    if main() == 0:
+        return True
+    raise RuntimeError("sync_dosen_spota failed")
+
+
+# Script entry point: exit the process with the code returned by main().
+if __name__ == "__main__":
+ raise SystemExit(main())