From d36c9b663fa49b387618695b5091b15e1b9a801e Mon Sep 17 00:00:00 2001 From: Power BI Dev Date: Mon, 30 Mar 2026 22:01:42 +0700 Subject: [PATCH] new etl listdosen from spota --- dags/main_pipeline.py | 4 +- scripts/scraper_pddikti.py | 349 ++++++++++++++++++++++++++++++++++--- 2 files changed, 322 insertions(+), 31 deletions(-) diff --git a/dags/main_pipeline.py b/dags/main_pipeline.py index 2300921..4c883b3 100644 --- a/dags/main_pipeline.py +++ b/dags/main_pipeline.py @@ -5,9 +5,9 @@ from datetime import datetime from scripts.scraper_pddikti import run_scraping_logic with DAG( - dag_id="etl_akademik_separated_v1", + dag_id="sync_dosen_spota_to_directus", start_date=datetime(2026, 3, 3), - schedule_interval="@daily", + schedule_interval="@monthly", catchup=False ) as dag: diff --git a/scripts/scraper_pddikti.py b/scripts/scraper_pddikti.py index f5b5d68..79d9a02 100644 --- a/scripts/scraper_pddikti.py +++ b/scripts/scraper_pddikti.py @@ -1,31 +1,322 @@ -import requests -import pandas as pd +#!/usr/bin/env python3 +""" +Sync dosen data from SPOTA to Directus. + +Usage: + python tools/sync_dosen_spota.py + +Setup: + - Set env vars (DIRECTUS_TOKEN required), or + - Copy .env.example to .env and fill DIRECTUS_TOKEN +""" + +from __future__ import annotations + +import html +import json +import os +import re +import sys +import urllib.error +import urllib.parse +import urllib.request +from typing import Dict, List, Tuple + + +def load_env_file(path: str) -> None: + if not os.path.isfile(path): + return + + with open(path, "r", encoding="utf-8") as f: + for raw_line in f: + line = raw_line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + + key, value = line.split("=", 1) + key = key.strip() + value = value.strip() + + if (value.startswith('"') and value.endswith('"')) or ( + value.startswith("'") and value.endswith("'") + ): + value = value[1:-1] + + if key and key not in os.environ: + os.environ[key] = value + + +def http_request( + method: str, + url: str, + headers: Dict[str, str] | None = None, + json_body: Dict | List | None = None, + timeout: int = 30, +) -> Tuple[int, str]: + payload = None + req_headers = dict(headers or {}) + + if json_body is not None: + payload = json.dumps(json_body, ensure_ascii=False).encode("utf-8") + req_headers["Content-Type"] = "application/json" + + request = urllib.request.Request( + url=url, + data=payload, + headers=req_headers, + method=method.upper(), + ) -def run_scraping_logic(): - print("--- MEMULAI PROSES PENARIKAN DATA DUMMY ---") - - # URL API Publik (Data Universitas) - url = "http://universities.hipolabs.com/search?country=Indonesia" - try: - # 1. Extract - response = requests.get(url, timeout=10) - data = response.json() - - # 2. Transform (Gunakan Pandas) - df = pd.DataFrame(data) - - # Kita ambil 5 data teratas saja untuk ditampilkan di log - preview_data = df[['name', 'web_pages']].head(5) - - # 3. Load (Simulasi: Tampilkan ke Log Airflow) - print("HASIL PENARIKAN DATA (5 Teratas):") - print("====================================================") - print(preview_data.to_string(index=False)) - print("====================================================") - print(f"Total data yang berhasil ditarik: {len(df)} baris.") - - return True - except Exception as e: - print(f"TERJADI KESALAHAN: {str(e)}") - return False \ No newline at end of file + with urllib.request.urlopen(request, timeout=timeout) as response: + body = response.read().decode("utf-8", errors="replace") + return int(getattr(response, "status", 200)), body + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") if e.fp else "" + return int(e.code), body + + +def strip_tags(text: str) -> str: + clean = re.sub(r"<[^>]*>", "", text, flags=re.IGNORECASE | re.DOTALL) + return html.unescape(clean).strip() + + +def normalize_kk(kk: str, mapping: Dict[str, str]) -> str: + value = (kk or "").strip() + if not value: + return "" + return mapping.get(value, value) + + +def normalize_photo_url(url: str, base_url: str) -> str: + value = (url or "").strip() + if not value: + return "" + if "noimageprofile" in value.lower(): + return "" + if value.startswith("//"): + return "https:" + value + if re.match(r"^https?://", value, flags=re.IGNORECASE): + return value + return urllib.parse.urljoin(base_url + "/", value) + + +def parse_spota_table(html_doc: str, kk_mapping: Dict[str, str], spota_base: str) -> List[Dict[str, str]]: + row_pattern = re.compile(r"]*>(.*?)", re.IGNORECASE | re.DOTALL) + cell_pattern = re.compile(r"]*>(.*?)", re.IGNORECASE | re.DOTALL) + img_pattern = re.compile(r"]+src=[\"']([^\"']+)[\"']", re.IGNORECASE) + + dosen: List[Dict[str, str]] = [] + + for row_html in row_pattern.findall(html_doc): + cells = cell_pattern.findall(row_html) + if len(cells) < 6: + continue + + nama = strip_tags(cells[2]) + nip = strip_tags(cells[3]) + email_addr = strip_tags(cells[4]) + kk = normalize_kk(strip_tags(cells[5]), kk_mapping) + + if not nama or nama.lower() == "nama": + continue + + img_match = img_pattern.search(cells[1]) + foto = normalize_photo_url(img_match.group(1) if img_match else "", spota_base) + + dosen.append( + { + "nama": nama, + "nip": nip, + "email": email_addr, + "kk": kk, + "foto": foto, + } + ) + + return dosen + + +def parse_json(body: str, context: str) -> Dict: + try: + data = json.loads(body) + except json.JSONDecodeError as e: + raise RuntimeError(f"{context} returned invalid JSON: {e}") from e + + if not isinstance(data, dict): + raise RuntimeError(f"{context} returned non-object JSON") + + return data + + +def summarize(text: str, limit: int = 300) -> str: + value = (text or "").strip() + if not value: + return "(empty response body)" + return value[:limit] + ("…" if len(value) > limit else "") + + +def main() -> int: + load_env_file(os.path.join(os.getcwd(), ".env")) + + spota_url = os.getenv("SPOTA_URL", "https://spota.untan.ac.id/listdosen.php") + directus_url = os.getenv("DIRECTUS_URL", "https://api.ifuntanhub.dev").rstrip("/") + directus_token = os.getenv("DIRECTUS_TOKEN", "") + + if not directus_token: + print("ERROR: DIRECTUS_TOKEN is required", file=sys.stderr) + print("Create .env from .env.example or set runtime env variables", file=sys.stderr) + return 1 + + kk_mapping = { + "Computation & Artificial Intelligence": "Computation & Artificial Intelligence", + "Networking & Security": "Networking & Security", + "Software Engineering & Mobile Computing": "Software Engineering & Mobile Computing", + "Information System & Data Spatial": "Information System & Data Spatial", + "Computing & AI": "Computation & Artificial Intelligence", + "Networks & Security": "Networking & Security", + } + + print("Fetching data from SPOTA...") + spota_status, spota_body = http_request( + "GET", + spota_url, + headers={ + "User-Agent": "Mozilla/5.0 (compatible; Informatika-UNTAN-Sync/1.0)", + "Accept": "text/html,application/xhtml+xml", + }, + ) + + if not (200 <= spota_status < 300): + print(f"SPOTA request failed ({spota_status}): {summarize(spota_body)}", file=sys.stderr) + return 1 + + spota_data = parse_spota_table(spota_body, kk_mapping, "https://spota.untan.ac.id") + print(f"Parsed {len(spota_data)} dosen from SPOTA") + + if not spota_data: + print("No dosen data parsed from SPOTA. Aborting.", file=sys.stderr) + return 1 + + directus_headers = { + "Authorization": f"Bearer {directus_token}", + "Accept": "application/json", + } + + print("Fetching existing dosen from Directus...") + existing_status, existing_body = http_request( + "GET", + f"{directus_url}/items/dosen?limit=-1&fields=id,nama,nip", + headers=directus_headers, + ) + + if not (200 <= existing_status < 300): + print( + f"Directus GET failed ({existing_status}): {summarize(existing_body)}", + file=sys.stderr, + ) + return 1 + + existing_json = parse_json(existing_body, "Directus GET /items/dosen") + existing_rows = existing_json.get("data", []) + if not isinstance(existing_rows, list): + existing_rows = [] + + existing_by_nip: Dict[str, str] = {} + existing_by_name: Dict[str, str] = {} + + for row in existing_rows: + if not isinstance(row, dict): + continue + row_id = row.get("id") + if row_id is None: + continue + + nip = str(row.get("nip") or "").strip() + nama = str(row.get("nama") or "").strip().lower() + + if nip: + existing_by_nip[nip] = str(row_id) + if nama: + existing_by_name[nama] = str(row_id) + + to_create = [] + to_update = [] + + for d in spota_data: + payload = { + "nama": d.get("nama", ""), + "nip": d.get("nip", ""), + "email": d.get("email", ""), + "kelompok_keahlian": d.get("kk", ""), + # If you later add a dedicated URL field in Directus, map photo URL here. + # "foto_url": d.get("foto", ""), + } + + nip = str(d.get("nip") or "").strip() + name_key = str(d.get("nama") or "").strip().lower() + + if nip and nip in existing_by_nip: + to_update.append({"id": existing_by_nip[nip], "nama": payload["nama"], "data": payload}) + continue + + if name_key and name_key in existing_by_name: + to_update.append({"id": existing_by_name[name_key], "nama": payload["nama"], "data": payload}) + continue + + to_create.append(payload) + + print(f"To create: {len(to_create)}, to update: {len(to_update)}") + + created_ok = 0 + created_fail = 0 + for item in to_create: + status, body = http_request( + "POST", + f"{directus_url}/items/dosen", + headers=directus_headers, + json_body=item, + ) + if 200 <= status < 300: + created_ok += 1 + else: + created_fail += 1 + print( + f"Create failed ({item.get('nama', 'unknown')}) [{status}]: {summarize(body)}", + file=sys.stderr, + ) + + updated_ok = 0 + updated_fail = 0 + for item in to_update: + status, body = http_request( + "PATCH", + f"{directus_url}/items/dosen/{urllib.parse.quote(item['id'], safe='')}", + headers=directus_headers, + json_body=item["data"], + ) + if 200 <= status < 300: + updated_ok += 1 + else: + updated_fail += 1 + print( + f"Update failed ({item.get('nama', 'unknown')}) [{status}]: {summarize(body)}", + file=sys.stderr, + ) + + print(f"Created: {created_ok}, failed: {created_fail}") + print(f"Updated: {updated_ok}, failed: {updated_fail}") + print("Sync complete.") + + return 1 if (created_fail > 0 or updated_fail > 0) else 0 + + +def run_sync_logic() -> bool: + code = main() + if code != 0: + raise RuntimeError("sync_dosen_spota failed") + return True + + +if __name__ == "__main__": + raise SystemExit(main())