import argparse import asyncio import csv import json import random import re from datetime import datetime from pathlib import Path import xml.etree.ElementTree as ET from typing import List, Tuple, Dict, Optional, Set from shutil import copyfile from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError # Defaults/assumptions DEFAULT_CONCURRENCY = 6 DEFAULT_TIMEOUT_MS = 10_000 # per navigation JITTER_MIN_S = 0.1 JITTER_MAX_S = 0.3 # 추가: 페이지 이동 후 스크린샷까지 대기 시간(ms) POST_NAV_WAIT_MS = 5_000 CSV_FIELDS = [ "timestamp", "ip", "port", "scheme_attempted", "outcome", # success | timeout | error "http_status", "final_url", "page_title", "error_message", ] def parse_nmap_xml(xml_path: Path, pairs: Set[Tuple[str, int]]) -> None: """ Parse one Nmap XML file, updating 'pairs' set of (ip, port) for open TCP ports. """ tree = ET.parse(xml_path) root = tree.getroot() for host in root.findall(".//host"): ip = None for addr in host.findall("address"): if addr.get("addrtype") == "ipv4": ip = addr.get("addr") break if not ip: continue for port_el in host.findall(".//ports/port"): if port_el.get("protocol") != "tcp": continue state_el = port_el.find("state") if state_el is None or state_el.get("state") != "open": continue portid = port_el.get("portid") if not portid or not portid.isdigit(): continue pairs.add((ip, int(portid))) def parse_nmap_text(txt_path: Path, pairs: Set[Tuple[str, int]]) -> None: """ Parse Nmap normal text output and update 'pairs' set with (ip, open_tcp_port). Handles host lines like: - Nmap scan report for 103.139.84.64 - Nmap scan report for ems1.example.org (203.0.113.10) And port lines under a host block like: - 80/tcp open http - 443/tcp open https """ host_re = re.compile(r"^Nmap scan report for\s+(?P.+?)(?:\s*\((?P\d+\.\d+\.\d+\.\d+)\))?\s*$") open_port_re = re.compile(r"^(?P\d{1,5})/tcp\s+open\b", re.IGNORECASE) ip_lit_re = re.compile(r"^\d+\.\d+\.\d+\.\d+$") current_ip: Optional[str] = None in_ports_section = False with txt_path.open("r", encoding="utf-8", errors="ignore") as f: for raw in f: line = raw.rstrip("\n") m_host = host_re.match(line) if m_host: # New host starts host_str = m_host.group("host").strip() ip = m_host.group("ip") if ip: current_ip = ip else: current_ip = host_str if ip_lit_re.match(host_str) else None in_ports_section = False continue if line.startswith("PORT") and "STATE" in line and "SERVICE" in line: in_ports_section = True continue if not current_ip: continue if not line.strip(): in_ports_section = False continue if in_ports_section: m_port = open_port_re.match(line.strip()) if m_port: port = int(m_port.group("port")) pairs.add((current_ip, port)) def parse_inputs(paths: List[Path]) -> List[Tuple[str, int]]: """ Parse a mix of XML and text Nmap outputs; return sorted unique (ip, port) list. """ pairs: Set[Tuple[str, int]] = set() for p in paths: suffix = p.suffix.lower() try: if suffix == ".xml": parse_nmap_xml(p, pairs) else: parse_nmap_text(p, pairs) except Exception as e: # Skip unreadable/invalid files; could log in the future print(f"[WARN] Failed to parse {p}: {e}") return sorted(pairs, key=lambda t: (t[0], t[1])) def ensure_ip_dirs(base_dir: Path, pairs: List[Tuple[str,int]]) -> None: ips = sorted({ip for ip, _ in pairs}) for ip in ips: (base_dir / ip).mkdir(parents=True, exist_ok=True) def prepare_logs(base_dir: Path) -> Tuple[Path, Path]: logs_dir = base_dir / "logs" logs_dir.mkdir(parents=True, exist_ok=True) return logs_dir / "webshot.csv", logs_dir / "webshot.json" def write_logs(csv_path: Path, json_path: Path, records: List[Dict]) -> None: # CSV: append with header if file is new is_new_csv = not csv_path.exists() with csv_path.open("a", newline="", encoding="utf-8") as fcsv: writer = csv.DictWriter(fcsv, fieldnames=CSV_FIELDS) if is_new_csv: writer.writeheader() for rec in records: writer.writerow(rec) # JSON: NDJSON (one object per line) with json_path.open("a", encoding="utf-8") as fjson: for rec in records: fjson.write(json.dumps(rec, ensure_ascii=False) + "\n") async def attempt_and_screenshot(context, url: str, timeout_ms: int, screenshot_path: Path, progress_label: str) -> Tuple[str, Optional[int], Optional[str], Optional[str], Optional[str]]: r""" Try to navigate to the given URL and always save a screenshot to screenshot_path. Returns a tuple: (outcome, status, final_url, title, error_message) - outcome: "success" | "timeout" | "error" - status: HTTP status code if any - final_url: the page.url after navigation (if any) - title: page title if retrievable - error_message: string on failure Note: Always attempts to wait an extra POST_NAV_WAIT_MS (default 5000 ms) before taking a screenshot, and also tries network idle state if possible. """ page = await context.new_page() print(f"[START] {progress_label}: {url}", flush=True) try: resp = await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms) status = resp.status if resp else None final_url = page.url title = None # 추가 대기: 네트워크 안정화 시도 후, 고정 5초 대기 try: # networkidle은 모든 네트워크가 잠잠해질 때까지 대기(환경에 따라 오래 걸릴 수 있어 Optional) await page.wait_for_load_state("networkidle", timeout=timeout_ms) except Exception: # networkidle 실패/시간초과는 무시하고 고정 대기만 수행 pass await page.wait_for_timeout(POST_NAV_WAIT_MS) try: title = await page.title() except Exception: title = None # Save screenshot of the loaded page (any status is fine) screenshot_path.parent.mkdir(parents=True, exist_ok=True) await page.screenshot(path=str(screenshot_path), full_page=True) print(f"[OK] {progress_label}: status={status} saved={screenshot_path}", flush=True) return "success", status, final_url, title, None except PlaywrightTimeoutError as e: # Try to capture whatever is visible; if not, render an error HTML and screenshot it try: screenshot_path.parent.mkdir(parents=True, exist_ok=True) await page.screenshot(path=str(screenshot_path), full_page=True) except Exception: try: await page.set_content( f"""

Timeout

URL: {url}
Reason: {str(e)}
""" ) await page.screenshot(path=str(screenshot_path), full_page=True) except Exception: pass print(f"[TIME] {progress_label}: timeout saved={screenshot_path}", flush=True) return "timeout", None, None, None, str(e) except Exception as e: # Generic error: same fallback to ensure a screenshot try: screenshot_path.parent.mkdir(parents=True, exist_ok=True) await page.screenshot(path=str(screenshot_path), full_page=True) except Exception: try: await page.set_content( f"""

Connection Error

URL: {url}
Reason: {str(e)}
""" ) await page.screenshot(path=str(screenshot_path), full_page=True) except Exception: pass print(f"[ERR] {progress_label}: error saved={screenshot_path}", flush=True) return "error", None, None, None, str(e) finally: try: await page.close() except Exception: pass async def process_target(ip: str, port: int, context, base_dir: Path, timeout_ms: int, sem: asyncio.Semaphore) -> List[Dict]: r""" For one (ip, port): try HTTP first; on failure/timeout, try HTTPS (ignore HTTPS cert errors). Always save screenshots per attempt under \_attempts\ and ensure final \.png exists. Returns a list of 1–2 log records (one per attempt performed). """ async with sem: # Jitter to reduce burstiness await asyncio.sleep(random.uniform(JITTER_MIN_S, JITTER_MAX_S)) ts = datetime.now().isoformat(timespec="seconds") http_url = f"http://{ip}:{port}" https_url = f"https://{ip}:{port}" ip_dir = base_dir / ip final_screenshot = ip_dir / f"{port}.png" attempts_dir = ip_dir / "_attempts" attempts_dir.mkdir(parents=True, exist_ok=True) http_attempt_path = attempts_dir / f"{port}_http.png" https_attempt_path = attempts_dir / f"{port}_https.png" records: List[Dict] = [] # Attempt HTTP first outcome, status, final_url, title, err_msg = await attempt_and_screenshot( context, http_url, timeout_ms, http_attempt_path, f"{ip}:{port} HTTP" ) http_rec = { "timestamp": ts, "ip": ip, "port": port, "scheme_attempted": "http", "outcome": outcome, "http_status": status, "final_url": final_url, "page_title": title, "error_message": err_msg, } records.append(http_rec) if outcome == "success": # Promote HTTP attempt to final screenshot try: copyfile(http_attempt_path, final_screenshot) except Exception: pass print(f"[SAVE] {ip}:{port} -> {final_screenshot} (http)", flush=True) return records # HTTP failed -> Attempt HTTPS outcome2, status2, final_url2, title2, err_msg2 = await attempt_and_screenshot( context, https_url, timeout_ms, https_attempt_path, f"{ip}:{port} HTTPS" ) https_rec = { "timestamp": ts, "ip": ip, "port": port, "scheme_attempted": "https", "outcome": outcome2, "http_status": status2, "final_url": final_url2, "page_title": title2, "error_message": err_msg2, } records.append(https_rec) # Promote HTTPS attempt to final if success; otherwise keep HTTPS error as final try: src = https_attempt_path if https_attempt_path.exists() else http_attempt_path copyfile(src, final_screenshot) except Exception: pass print(f"[SAVE] {ip}:{port} -> {final_screenshot} ({'https' if outcome2=='success' else 'error'})", flush=True) return records async def run(input_paths: List[Path], concurrency: int, timeout_ms: int, base_dir: Path) -> None: csv_path, json_path = prepare_logs(base_dir) pairs = parse_inputs(input_paths) total = len(pairs) if not pairs: print("No open TCP ports found in the provided Nmap files.") return ensure_ip_dirs(base_dir, pairs) print(f"Targets: {total} open TCP ports across {len(set(ip for ip,_ in pairs))} hosts; Concurrency={concurrency}; Timeout={timeout_ms}ms", flush=True) async with async_playwright() as pw: # 번들 브라우저가 없을 경우를 대비하여, 시스템 Chrome 사용으로 바꾸고 싶다면: # browser = await pw.chromium.launch(executable_path=r"C:\Program Files\Google\Chrome\Application\chrome.exe", headless=True) browser = await pw.chromium.launch(headless=True) context = await browser.new_context(ignore_https_errors=True) sem = asyncio.Semaphore(concurrency) tasks = [ asyncio.create_task(process_target(ip, port, context, base_dir, timeout_ms, sem)) for ip, port in pairs ] results: List[Dict] = [] done = 0 for coro in asyncio.as_completed(tasks): recs = await coro results.extend(recs) done += 1 # Progress line using the first record ip = recs[0].get("ip") port = recs[0].get("port") summary = ("success" if any(r.get("outcome") == "success" for r in recs) else ("timeout" if any(r.get("outcome") == "timeout" for r in recs) else "error")) print(f"[PROG] {done}/{total} completed: {ip}:{port} => {summary}", flush=True) await context.close() await browser.close() # Write all attempt records write_logs(csv_path, json_path, results) print(f"[DONE] Logs written: {csv_path} and {json_path}", flush=True) def cleanup_final_images(base_dir: Path) -> int: r""" Remove top-level .png files under each IP folder, keeping _attempts intact. Returns number of files deleted. """ deleted = 0 for ip_dir in base_dir.iterdir(): try: if not ip_dir.is_dir(): continue # Skip common system/profile directories when base_dir is a user profile root if ip_dir.name.lower() in {"application data", "appdata", "saved games", "documents", "downloads", "music", "pictures", "videos", "contacts", "links", "searches", "favorites", "onedrive"}: continue if ip_dir.name == "logs": continue attempts_dir = ip_dir / "_attempts" for item in ip_dir.iterdir(): try: if item.is_dir(): continue if item.suffix.lower() != ".png": continue # Only delete files named like ".png" (e.g., 80.png, 443.png) if re.fullmatch(r"\d{1,5}\.png", item.name): try: item.unlink() deleted += 1 except Exception: pass except PermissionError: continue except PermissionError: continue return deleted def main(): parser = argparse.ArgumentParser(description="Screenshot web UIs from Nmap results (XML or text). HTTP→HTTPS, Playwright.") parser.add_argument("inputs", nargs="*", help="Nmap output files (XML or normal text), e.g., scan1.xml scan2.xml scan3.txt") parser.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="Max parallel targets (default: 6)") parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_MS, help="Navigation timeout in ms (default: 10000)") parser.add_argument("--cleanup-final", action="store_true", help="Remove top-level .png files, keep only _attempts images.") parser.add_argument("--base-dir", default=None, help="Directory where outputs/logs will be written (default: current working directory)") args = parser.parse_args() base_dir = Path(args.base_dir) if args.base_dir else Path.cwd() if not args.inputs and args.cleanup_final: deleted = cleanup_final_images(base_dir) print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)") return input_paths = [Path(p) for p in args.inputs] for p in input_paths: if not p.exists(): raise FileNotFoundError(f"File not found: {p}") asyncio.run(run(input_paths, concurrency=args.concurrency, timeout_ms=args.timeout, base_dir=base_dir)) if args.cleanup_final: deleted = cleanup_final_images(base_dir) print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)") if __name__ == "__main__": main()