webshot2.py
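"""
Screenshot web UIs discovered in Nmap output (XML or normal text).

For each open TCP port found, this script tries http://<ip>:<port> first and
falls back to https://<ip>:<port> (certificate errors ignored). Every attempt
is screenshotted under <base_dir>/<ip>/_attempts/, the best result is promoted
to <base_dir>/<ip>/<port>.png, and one log record per attempt is appended to
logs/webshot.csv and logs/webshot.json (NDJSON).
"""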
import argparse
import asyncio
import csv
import json
import random
import re
from datetime import datetime
from pathlib import Path
import xml.etree.ElementTree as ET
from typing import List, Tuple, Dict, Optional, Set
from shutil import copyfile
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
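# Requires the Playwright package plus a browser build, e.g.:
#   pip install playwright
#   playwright install chromium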
# Defaults/assumptions
DEFAULT_CONCURRENCY = 6
DEFAULT_TIMEOUT_MS = 10_000 # per navigation
JITTER_MIN_S = 0.1
JITTER_MAX_S = 0.3
# Extra wait (ms) between navigation and taking the screenshot
POST_NAV_WAIT_MS = 5_000
CSV_FIELDS = [
"timestamp",
"ip",
"port",
"scheme_attempted",
"outcome", # success | timeout | error
"http_status",
"final_url",
"page_title",
"error_message",
]
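# For reference, write_logs() appends one NDJSON object per attempt to
# logs/webshot.json; an illustrative record (values are made up) looks like:
# {"timestamp": "2025-01-01T12:00:00", "ip": "203.0.113.10", "port": 8443,
#  "scheme_attempted": "https", "outcome": "success", "http_status": 200,
#  "final_url": "https://203.0.113.10:8443/", "page_title": "Login",
#  "error_message": null}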
def parse_nmap_xml(xml_path: Path, pairs: Set[Tuple[str, int]]) -> None:
"""
Parse one Nmap XML file, updating 'pairs' set of (ip, port) for open TCP ports.
"""
tree = ET.parse(xml_path)
root = tree.getroot()
for host in root.findall(".//host"):
ip = None
for addr in host.findall("address"):
if addr.get("addrtype") == "ipv4":
ip = addr.get("addr")
break
if not ip:
continue
for port_el in host.findall(".//ports/port"):
if port_el.get("protocol") != "tcp":
continue
state_el = port_el.find("state")
if state_el is None or state_el.get("state") != "open":
continue
portid = port_el.get("portid")
if not portid or not portid.isdigit():
continue
pairs.add((ip, int(portid)))
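# For reference, an abridged Nmap XML host entry that this parser matches
# (values are illustrative; real Nmap XML carries more attributes):
#   <host>
#     <address addr="203.0.113.10" addrtype="ipv4"/>
#     <ports>
#       <port protocol="tcp" portid="80"><state state="open"/></port>
#     </ports>
#   </host>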
def parse_nmap_text(txt_path: Path, pairs: Set[Tuple[str, int]]) -> None:
"""
Parse Nmap normal text output and update 'pairs' set with (ip, open_tcp_port).
Handles host lines like:
- Nmap scan report for 103.139.84.64
- Nmap scan report for ems1.example.org (203.0.113.10)
And port lines under a host block like:
- 80/tcp open http
- 443/tcp open https
"""
host_re = re.compile(r"^Nmap scan report for\s+(?P<host>.+?)(?:\s*\((?P<ip>\d+\.\d+\.\d+\.\d+)\))?\s*$")
open_port_re = re.compile(r"^(?P<port>\d{1,5})/tcp\s+open\b", re.IGNORECASE)
ip_lit_re = re.compile(r"^\d+\.\d+\.\d+\.\d+$")
current_ip: Optional[str] = None
in_ports_section = False
with txt_path.open("r", encoding="utf-8", errors="ignore") as f:
for raw in f:
line = raw.rstrip("\n")
m_host = host_re.match(line)
if m_host:
# New host starts
host_str = m_host.group("host").strip()
ip = m_host.group("ip")
if ip:
current_ip = ip
else:
current_ip = host_str if ip_lit_re.match(host_str) else None
in_ports_section = False
continue
if line.startswith("PORT") and "STATE" in line and "SERVICE" in line:
in_ports_section = True
continue
if not current_ip:
continue
if not line.strip():
in_ports_section = False
continue
if in_ports_section:
m_port = open_port_re.match(line.strip())
if m_port:
port = int(m_port.group("port"))
pairs.add((current_ip, port))
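# For reference, a minimal host block that this parser matches (mirroring the
# examples in the docstring above):
#   Nmap scan report for ems1.example.org (203.0.113.10)
#   PORT    STATE SERVICE
#   80/tcp  open  http
#   443/tcp open  https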
def parse_inputs(paths: List[Path]) -> List[Tuple[str, int]]:
"""
Parse a mix of XML and text Nmap outputs; return sorted unique (ip, port) list.
"""
pairs: Set[Tuple[str, int]] = set()
for p in paths:
suffix = p.suffix.lower()
try:
if suffix == ".xml":
parse_nmap_xml(p, pairs)
else:
parse_nmap_text(p, pairs)
except Exception as e:
# Skip unreadable/invalid files; could log in the future
print(f"[WARN] Failed to parse {p}: {e}")
return sorted(pairs, key=lambda t: (t[0], t[1]))
def ensure_ip_dirs(base_dir: Path, pairs: List[Tuple[str,int]]) -> None:
ips = sorted({ip for ip, _ in pairs})
for ip in ips:
(base_dir / ip).mkdir(parents=True, exist_ok=True)
def prepare_logs(base_dir: Path) -> Tuple[Path, Path]:
logs_dir = base_dir / "logs"
logs_dir.mkdir(parents=True, exist_ok=True)
return logs_dir / "webshot.csv", logs_dir / "webshot.json"
def write_logs(csv_path: Path, json_path: Path, records: List[Dict]) -> None:
# CSV: append with header if file is new
is_new_csv = not csv_path.exists()
with csv_path.open("a", newline="", encoding="utf-8") as fcsv:
writer = csv.DictWriter(fcsv, fieldnames=CSV_FIELDS)
if is_new_csv:
writer.writeheader()
for rec in records:
writer.writerow(rec)
# JSON: NDJSON (one object per line)
with json_path.open("a", encoding="utf-8") as fjson:
for rec in records:
fjson.write(json.dumps(rec, ensure_ascii=False) + "\n")
async def attempt_and_screenshot(context, url: str, timeout_ms: int, screenshot_path: Path, progress_label: str) -> Tuple[str, Optional[int], Optional[str], Optional[str], Optional[str]]:
r"""
Try to navigate to the given URL and always save a screenshot to screenshot_path.
Returns a tuple: (outcome, status, final_url, title, error_message)
- outcome: "success" | "timeout" | "error"
- status: HTTP status code if any
- final_url: the page.url after navigation (if any)
- title: page title if retrievable
- error_message: string on failure
    Note: always waits an extra POST_NAV_WAIT_MS (default 5000 ms) before taking
    the screenshot, and first tries to reach the network-idle state when possible.
"""
page = await context.new_page()
print(f"[START] {progress_label}: {url}", flush=True)
try:
resp = await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
status = resp.status if resp else None
final_url = page.url
title = None
        # Extra wait: try to let the network settle, then wait a fixed 5 s
        try:
            # "networkidle" waits until all network activity quiets down
            # (this can take a while depending on the environment, hence optional)
            await page.wait_for_load_state("networkidle", timeout=timeout_ms)
        except Exception:
            # Ignore networkidle failure/timeout and rely on the fixed wait only
            pass
await page.wait_for_timeout(POST_NAV_WAIT_MS)
try:
title = await page.title()
except Exception:
title = None
# Save screenshot of the loaded page (any status is fine)
screenshot_path.parent.mkdir(parents=True, exist_ok=True)
await page.screenshot(path=str(screenshot_path), full_page=True)
print(f"[OK] {progress_label}: status={status} saved={screenshot_path}", flush=True)
return "success", status, final_url, title, None
except PlaywrightTimeoutError as e:
# Try to capture whatever is visible; if not, render an error HTML and screenshot it
try:
screenshot_path.parent.mkdir(parents=True, exist_ok=True)
await page.screenshot(path=str(screenshot_path), full_page=True)
except Exception:
try:
await page.set_content(
f"""
<html><body style='font-family:Segoe UI,Tahoma,Arial,sans-serif;padding:24px;'>
<h2 style='color:#b00;margin:0 0 8px'>Timeout</h2>
<div>URL: {url}</div>
<div>Reason: {str(e)}</div>
</body></html>
"""
)
await page.screenshot(path=str(screenshot_path), full_page=True)
except Exception:
pass
print(f"[TIME] {progress_label}: timeout saved={screenshot_path}", flush=True)
return "timeout", None, None, None, str(e)
except Exception as e:
# Generic error: same fallback to ensure a screenshot
try:
screenshot_path.parent.mkdir(parents=True, exist_ok=True)
await page.screenshot(path=str(screenshot_path), full_page=True)
except Exception:
try:
await page.set_content(
f"""
<html><body style='font-family:Segoe UI,Tahoma,Arial,sans-serif;padding:24px;'>
<h2 style='color:#b00;margin:0 0 8px'>Connection Error</h2>
<div>URL: {url}</div>
<div>Reason: {str(e)}</div>
</body></html>
"""
)
await page.screenshot(path=str(screenshot_path), full_page=True)
except Exception:
pass
print(f"[ERR] {progress_label}: error saved={screenshot_path}", flush=True)
return "error", None, None, None, str(e)
finally:
try:
await page.close()
except Exception:
pass
async def process_target(ip: str, port: int, context, base_dir: Path, timeout_ms: int, sem: asyncio.Semaphore) -> List[Dict]:
r"""
For one (ip, port): try HTTP first; on failure/timeout, try HTTPS (ignore HTTPS cert errors).
Always save screenshots per attempt under <ip>\_attempts\ and ensure final <ip>\<port>.png exists.
Returns a list of 1–2 log records (one per attempt performed).
"""
async with sem:
# Jitter to reduce burstiness
await asyncio.sleep(random.uniform(JITTER_MIN_S, JITTER_MAX_S))
ts = datetime.now().isoformat(timespec="seconds")
http_url = f"http://{ip}:{port}"
https_url = f"https://{ip}:{port}"
ip_dir = base_dir / ip
final_screenshot = ip_dir / f"{port}.png"
attempts_dir = ip_dir / "_attempts"
attempts_dir.mkdir(parents=True, exist_ok=True)
http_attempt_path = attempts_dir / f"{port}_http.png"
https_attempt_path = attempts_dir / f"{port}_https.png"
records: List[Dict] = []
# Attempt HTTP first
outcome, status, final_url, title, err_msg = await attempt_and_screenshot(
context, http_url, timeout_ms, http_attempt_path, f"{ip}:{port} HTTP"
)
http_rec = {
"timestamp": ts,
"ip": ip,
"port": port,
"scheme_attempted": "http",
"outcome": outcome,
"http_status": status,
"final_url": final_url,
"page_title": title,
"error_message": err_msg,
}
records.append(http_rec)
if outcome == "success":
# Promote HTTP attempt to final screenshot
try:
copyfile(http_attempt_path, final_screenshot)
except Exception:
pass
print(f"[SAVE] {ip}:{port} -> {final_screenshot} (http)", flush=True)
return records
# HTTP failed -> Attempt HTTPS
outcome2, status2, final_url2, title2, err_msg2 = await attempt_and_screenshot(
context, https_url, timeout_ms, https_attempt_path, f"{ip}:{port} HTTPS"
)
https_rec = {
"timestamp": ts,
"ip": ip,
"port": port,
"scheme_attempted": "https",
"outcome": outcome2,
"http_status": status2,
"final_url": final_url2,
"page_title": title2,
"error_message": err_msg2,
}
records.append(https_rec)
# Promote HTTPS attempt to final if success; otherwise keep HTTPS error as final
try:
src = https_attempt_path if https_attempt_path.exists() else http_attempt_path
copyfile(src, final_screenshot)
except Exception:
pass
print(f"[SAVE] {ip}:{port} -> {final_screenshot} ({'https' if outcome2=='success' else 'error'})", flush=True)
return records
async def run(input_paths: List[Path], concurrency: int, timeout_ms: int, base_dir: Path) -> None:
csv_path, json_path = prepare_logs(base_dir)
pairs = parse_inputs(input_paths)
total = len(pairs)
if not pairs:
print("No open TCP ports found in the provided Nmap files.")
return
ensure_ip_dirs(base_dir, pairs)
print(f"Targets: {total} open TCP ports across {len(set(ip for ip,_ in pairs))} hosts; Concurrency={concurrency}; Timeout={timeout_ms}ms", flush=True)
async with async_playwright() as pw:
        # To fall back to the system Chrome when the bundled browser is unavailable:
        # browser = await pw.chromium.launch(executable_path=r"C:\Program Files\Google\Chrome\Application\chrome.exe", headless=True)
browser = await pw.chromium.launch(headless=True)
context = await browser.new_context(ignore_https_errors=True)
sem = asyncio.Semaphore(concurrency)
tasks = [
asyncio.create_task(process_target(ip, port, context, base_dir, timeout_ms, sem))
for ip, port in pairs
]
results: List[Dict] = []
done = 0
for coro in asyncio.as_completed(tasks):
recs = await coro
results.extend(recs)
done += 1
# Progress line using the first record
ip = recs[0].get("ip")
port = recs[0].get("port")
summary = ("success" if any(r.get("outcome") == "success" for r in recs)
else ("timeout" if any(r.get("outcome") == "timeout" for r in recs) else "error"))
print(f"[PROG] {done}/{total} completed: {ip}:{port} => {summary}", flush=True)
await context.close()
await browser.close()
# Write all attempt records
write_logs(csv_path, json_path, results)
print(f"[DONE] Logs written: {csv_path} and {json_path}", flush=True)
def cleanup_final_images(base_dir: Path) -> int:
r"""
Remove top-level <port>.png files under each IP folder, keeping _attempts intact.
Returns number of files deleted.
"""
deleted = 0
for ip_dir in base_dir.iterdir():
try:
if not ip_dir.is_dir():
continue
# Skip common system/profile directories when base_dir is a user profile root
            if ip_dir.name.lower() in {
                "application data", "appdata", "saved games", "documents",
                "downloads", "music", "pictures", "videos", "contacts",
                "links", "searches", "favorites", "onedrive",
            }:
continue
if ip_dir.name == "logs":
continue
for item in ip_dir.iterdir():
try:
if item.is_dir():
continue
if item.suffix.lower() != ".png":
continue
# Only delete files named like "<digits>.png" (e.g., 80.png, 443.png)
if re.fullmatch(r"\d{1,5}\.png", item.name):
try:
item.unlink()
deleted += 1
except Exception:
pass
except PermissionError:
continue
except PermissionError:
continue
return deleted
def main():
parser = argparse.ArgumentParser(description="Screenshot web UIs from Nmap results (XML or text). HTTP→HTTPS, Playwright.")
parser.add_argument("inputs", nargs="*", help="Nmap output files (XML or normal text), e.g., scan1.xml scan2.xml scan3.txt")
parser.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="Max parallel targets (default: 6)")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_MS, help="Navigation timeout in ms (default: 10000)")
parser.add_argument("--cleanup-final", action="store_true", help="Remove top-level <port>.png files, keep only _attempts images.")
parser.add_argument("--base-dir", default=None, help="Directory where outputs/logs will be written (default: current working directory)")
args = parser.parse_args()
base_dir = Path(args.base_dir) if args.base_dir else Path.cwd()
if not args.inputs and args.cleanup_final:
deleted = cleanup_final_images(base_dir)
print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)")
return
input_paths = [Path(p) for p in args.inputs]
for p in input_paths:
if not p.exists():
raise FileNotFoundError(f"File not found: {p}")
asyncio.run(run(input_paths, concurrency=args.concurrency, timeout_ms=args.timeout, base_dir=base_dir))
if args.cleanup_final:
deleted = cleanup_final_images(base_dir)
print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)")
if __name__ == "__main__":
main()
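# Example invocations (file names are illustrative):
#   python webshot2.py scan1.xml scan2.xml scan3.txt
#   python webshot2.py scan.xml --concurrency 8 --timeout 15000 --base-dir out
#   python webshot2.py --cleanup-final   # no inputs: only delete top-level <port>.png files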