import requests import time import random import re import urllib3 from datetime import datetime import pytz urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) TZ = pytz.timezone("Asia/Jakarta") CHECK_PAGE = "https://trustpositif.komdigi.go.id/check/" API_URL = "https://trustpositif.komdigi.go.id/Rest_server/getrecordsname_home" # Token fallback (punyamu yang lama). Akan dipakai kalau /check/ error (500/down) FALLBACK_CSRF = "d82d36271ea86cf9874f4042ef4e5918" HEADERS_BASE = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Origin": "https://trustpositif.komdigi.go.id", # ✅ Origin hanya domain "Referer": "https://trustpositif.komdigi.go.id/check/", "X-Requested-With": "XMLHttpRequest", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "Accept": "application/json, text/javascript, */*; q=0.01", } # ===== Telegram ===== def send_telegram_message(domain, status, list_category): #telegram_bot_token = "7782992742:AAHdX0vHwqXS8dJrjQJvr8tkFmWQ84v4sDg" #telegram_chat_id = "-4653669484" telegram_bot_token = '7720131646:AAESgu8wqTf6AjZfXiueeqgnf0VsUCsE2_E' telegram_chat_id = '-1002294633860' url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage" message = f"Domain: {domain}\nStatus: {status}\nKategori: {list_category}" try: r = requests.post(url, data={"chat_id": telegram_chat_id, "text": message}, timeout=20) r.raise_for_status() except requests.exceptions.RequestException as e: print(f"[TG ERROR] {e}") def now_time(): return datetime.now(TZ).strftime("%H:%M:%S") def now_datetime(): return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") # ===== CSRF Manager ===== class TrustPositifClient: def __init__(self): self.s = requests.Session() self.csrf = FALLBACK_CSRF self.csrf_last_ok = False def _extract_csrf(self, html: str): m = re.search(r'name="csrf_token"\s+value="([^"]+)"', html) if m: return m.group(1) m = re.search(r'csrf_token["\']?\s*[:=]\s*["\']([^"\']+)["\']', html) if m: return m.group(1) return None def refresh_csrf(self, max_retry=3): last_err = None for attempt in range(1, max_retry + 1): try: r = self.s.get(CHECK_PAGE, headers=HEADERS_BASE, timeout=25, verify=False) # kalau server lagi down (500), jangan maksa if r.status_code >= 500: last_err = f"HTTP {r.status_code}" time.sleep(2.0 * attempt + random.uniform(0.2, 0.9)) continue r.raise_for_status() token = self._extract_csrf(r.text) if not token: last_err = "CSRF token tidak ditemukan" time.sleep(1.5 * attempt + random.uniform(0.2, 0.9)) continue self.csrf = token self.csrf_last_ok = True return True except Exception as e: last_err = str(e) time.sleep(2.0 * attempt + random.uniform(0.2, 0.9)) # gagal refresh → tetap pakai token lama/fallback self.csrf_last_ok = False return False def check_domain(self, domain: str, max_retry=3): """ Return: - "Ada" / "Tidak Ada Data" / status lain dari JSON - atau "Error HTTP xxx (...)" / "Error Non-JSON (...)" / "Error Request (...)" """ payload = {"csrf_token": self.csrf, "name": domain} last_err = None for attempt in range(1, max_retry + 1): try: r = self.s.post(API_URL, headers=HEADERS_BASE, data=payload, timeout=25, verify=False) # Jika token invalid / blocked, sering 403/419/401 if r.status_code in (401, 403, 419): # coba refresh csrf 1x lalu ulang self.refresh_csrf(max_retry=2) payload["csrf_token"] = self.csrf time.sleep(1.0 * attempt + random.uniform(0.2, 0.8)) continue # rate limit if r.status_code == 429: time.sleep(3.0 * attempt + random.uniform(0.5, 1.5)) continue if r.status_code != 200: snippet = (r.text or "")[:90].replace("\n", " ") return f"Error HTTP {r.status_code} ({snippet})" ct = (r.headers.get("Content-Type") or "").lower() if "json" not in ct: # kemungkinan HTML challenge / server error page snippet = (r.text or "")[:90].replace("\n", " ") # refresh csrf lalu retry sekali self.refresh_csrf(max_retry=1) payload["csrf_token"] = self.csrf if attempt < max_retry: time.sleep(1.0 * attempt + random.uniform(0.2, 0.8)) continue return f"Error Non-JSON ({ct}) ({snippet})" data = r.json() if "values" in data and data["values"]: return data["values"][0].get("Status", "Tidak Diketahui") return "Tidak Ada Data" except Exception as e: last_err = e time.sleep(1.5 * attempt + random.uniform(0.2, 0.8)) return f"Error Request ({last_err})" # ===== Domain checker per file ===== def check_domains(client: TrustPositifClient, file_name: str, list_category: str): try: with open(file_name, "r", encoding="utf-8") as f: for idx, domain in enumerate(f, start=1): domain = domain.strip() if not domain: continue status = client.check_domain(domain, max_retry=3) print("{:<5} {:<30} {:<35} {:<10} {:<20}".format( idx, domain, status, now_time(), list_category )) if status == "Ada": send_telegram_message(domain, "Nawala", list_category) # delay kecil biar aman dari WAF time.sleep(random.uniform(0.35, 0.95)) except FileNotFoundError: print(f"❌ File '{file_name}' tidak ditemukan.") except Exception as e: print(f"⚠️ Terjadi Error pada file '{file_name}': {e}") # ===== Main loop ===== def main(): file_categories = { "Mainsite.txt": "Mainsite", "LP_Ads.txt": "LP Ads", "MS_Ads.txt": "Mainsite Ads", "RTP.txt": "RTP", "ipedi.txt": "IP Edi", "ipdoni.txt": "IP Pacik", "ipvive.txt": "IP Vive", "ipilham.txt": "IP Ilham", "ipdimas.txt": "IP Dimas", "Kritik.txt": "Kritik & Saran", "sosmed.txt": "Sosmed Kangtoto", "AMP.txt": "AMP", "spin.txt": "SPINNER", "wablast.txt": "WA Blast", "shortio.txt": "Short IO", "cloaking.txt": "Cloaking", "ramalan.txt": "Prediksi", "MasterMiror.txt": "Master Mirror MS", "MasterDomainMirror.txt": "Master Domain Mirror", } while True: print("=" * 100) print("{:^100}".format("MEMULAI PENGECEKAN DOMAIN")) print("=" * 100) print("{:<5} {:<30} {:<35} {:<10} {:<20}".format("No", "Domain", "Status", "Waktu", "Kategori")) print("-" * 100) client = TrustPositifClient() # coba refresh csrf dulu (kalau gagal, tetap jalan pakai fallback token) ok = client.refresh_csrf(max_retry=3) if not ok: print(f"⚠️ Gagal ambil CSRF dari /check/ (server mungkin 500/down). Pakai token fallback/terakhir dulu.") for file_name, list_category in file_categories.items(): check_domains(client, file_name, list_category) print("-" * 100) print("{:^100}".format(f"Selesai Pengecekan: {now_datetime()}")) print("{:^100}".format("Pengecekan berikutnya dalam 5 menit...")) print("=" * 100) time.sleep(5 * 60) if __name__ == "__main__": main()