1 file changed, 427 insertions

webshot2.py (file created)

@@ -0,0 +1,427 @@
1 + import argparse
2 + import asyncio
3 + import csv
4 + import json
5 + import random
6 + import re
7 + from datetime import datetime
8 + from pathlib import Path
9 + import xml.etree.ElementTree as ET
10 + from typing import List, Tuple, Dict, Optional, Set
11 + from shutil import copyfile
12 +
13 + from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
14 +
15 + # Defaults/assumptions
16 + DEFAULT_CONCURRENCY = 6
17 + DEFAULT_TIMEOUT_MS = 10_000 # per navigation
18 + JITTER_MIN_S = 0.1
19 + JITTER_MAX_S = 0.3
20 +
21 + # Added: how long to wait (ms) after navigation before taking the screenshot
22 + POST_NAV_WAIT_MS = 5_000
23 +
24 + CSV_FIELDS = [
25 + "timestamp",
26 + "ip",
27 + "port",
28 + "scheme_attempted",
29 + "outcome", # success | timeout | error
30 + "http_status",
31 + "final_url",
32 + "page_title",
33 + "error_message",
34 + ]
35 +
36 + def parse_nmap_xml(xml_path: Path, pairs: Set[Tuple[str, int]]) -> None:
37 + """
38 + Parse one Nmap XML file, updating 'pairs' set of (ip, port) for open TCP ports.
39 + """
40 + tree = ET.parse(xml_path)
41 + root = tree.getroot()
42 + for host in root.findall(".//host"):
43 + ip = None
44 + for addr in host.findall("address"):
45 + if addr.get("addrtype") == "ipv4":
46 + ip = addr.get("addr")
47 + break
48 + if not ip:
49 + continue
50 + for port_el in host.findall(".//ports/port"):
51 + if port_el.get("protocol") != "tcp":
52 + continue
53 + state_el = port_el.find("state")
54 + if state_el is None or state_el.get("state") != "open":
55 + continue
56 + portid = port_el.get("portid")
57 + if not portid or not portid.isdigit():
58 + continue
59 + pairs.add((ip, int(portid)))
60 +
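+ # For reference, a minimal fragment of the `nmap -oX` XML shape matched above
+ # (illustrative values, not from a real scan):
+ #   <host>
+ #     <address addr="203.0.113.10" addrtype="ipv4"/>
+ #     <ports>
+ #       <port protocol="tcp" portid="80"><state state="open"/></port>
+ #     </ports>
+ #   </host>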
61 + def parse_nmap_text(txt_path: Path, pairs: Set[Tuple[str, int]]) -> None:
62 + """
63 + Parse Nmap normal text output and update 'pairs' set with (ip, open_tcp_port).
64 + Handles host lines like:
65 + - Nmap scan report for 103.139.84.64
66 + - Nmap scan report for ems1.example.org (203.0.113.10)
67 + And port lines under a host block like:
68 + - 80/tcp open http
69 + - 443/tcp open https
70 + """
71 + host_re = re.compile(r"^Nmap scan report for\s+(?P<host>.+?)(?:\s*\((?P<ip>\d+\.\d+\.\d+\.\d+)\))?\s*$")
72 + open_port_re = re.compile(r"^(?P<port>\d{1,5})/tcp\s+open\b", re.IGNORECASE)
73 + ip_lit_re = re.compile(r"^\d+\.\d+\.\d+\.\d+$")
74 +
75 + current_ip: Optional[str] = None
76 + in_ports_section = False
77 +
78 + with txt_path.open("r", encoding="utf-8", errors="ignore") as f:
79 + for raw in f:
80 + line = raw.rstrip("\n")
81 +
82 + m_host = host_re.match(line)
83 + if m_host:
84 + # New host starts
85 + host_str = m_host.group("host").strip()
86 + ip = m_host.group("ip")
87 + if ip:
88 + current_ip = ip
89 + else:
90 + current_ip = host_str if ip_lit_re.match(host_str) else None
91 + in_ports_section = False
92 + continue
93 +
94 + if line.startswith("PORT") and "STATE" in line and "SERVICE" in line:
95 + in_ports_section = True
96 + continue
97 +
98 + if not current_ip:
99 + continue
100 +
101 + if not line.strip():
102 + in_ports_section = False
103 + continue
104 +
105 + if in_ports_section:
106 + m_port = open_port_re.match(line.strip())
107 + if m_port:
108 + port = int(m_port.group("port"))
109 + pairs.add((current_ip, port))
110 +
111 +
112 + def parse_inputs(paths: List[Path]) -> List[Tuple[str, int]]:
113 + """
114 + Parse a mix of XML and text Nmap outputs; return sorted unique (ip, port) list.
115 + """
116 + pairs: Set[Tuple[str, int]] = set()
117 + for p in paths:
118 + suffix = p.suffix.lower()
119 + try:
120 + if suffix == ".xml":
121 + parse_nmap_xml(p, pairs)
122 + else:
123 + parse_nmap_text(p, pairs)
124 + except Exception as e:
125 + # Skip unreadable/invalid files; could log in the future
126 + print(f"[WARN] Failed to parse {p}: {e}")
127 + return sorted(pairs, key=lambda t: (t[0], t[1]))
128 +
129 + def ensure_ip_dirs(base_dir: Path, pairs: List[Tuple[str,int]]) -> None:
130 + ips = sorted({ip for ip, _ in pairs})
131 + for ip in ips:
132 + (base_dir / ip).mkdir(parents=True, exist_ok=True)
133 +
134 + def prepare_logs(base_dir: Path) -> Tuple[Path, Path]:
135 + logs_dir = base_dir / "logs"
136 + logs_dir.mkdir(parents=True, exist_ok=True)
137 + return logs_dir / "webshot.csv", logs_dir / "webshot.json"
138 +
139 + def write_logs(csv_path: Path, json_path: Path, records: List[Dict]) -> None:
140 + # CSV: append with header if file is new
141 + is_new_csv = not csv_path.exists()
142 + with csv_path.open("a", newline="", encoding="utf-8") as fcsv:
143 + writer = csv.DictWriter(fcsv, fieldnames=CSV_FIELDS)
144 + if is_new_csv:
145 + writer.writeheader()
146 + for rec in records:
147 + writer.writerow(rec)
148 + # JSON: NDJSON (one object per line)
149 + with json_path.open("a", encoding="utf-8") as fjson:
150 + for rec in records:
151 + fjson.write(json.dumps(rec, ensure_ascii=False) + "\n")
152 +
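+ # Illustrative NDJSON record as written by write_logs (values are made up):
+ # {"timestamp": "2025-01-01T12:00:00", "ip": "203.0.113.10", "port": 80,
+ #  "scheme_attempted": "http", "outcome": "success", "http_status": 200,
+ #  "final_url": "http://203.0.113.10/", "page_title": "Login", "error_message": null}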
153 + async def attempt_and_screenshot(context, url: str, timeout_ms: int, screenshot_path: Path, progress_label: str) -> Tuple[str, Optional[int], Optional[str], Optional[str], Optional[str]]:
154 + r"""
155 + Try to navigate to the given URL and always save a screenshot to screenshot_path.
156 + Returns a tuple: (outcome, status, final_url, title, error_message)
157 + - outcome: "success" | "timeout" | "error"
158 + - status: HTTP status code if any
159 + - final_url: the page.url after navigation (if any)
160 + - title: page title if retrievable
161 + - error_message: string on failure
162 +
163 +     Note: always waits an extra POST_NAV_WAIT_MS (default 5,000 ms) before taking
164 +     the screenshot, after first trying to reach the network-idle state when possible.
165 + """
166 + page = await context.new_page()
167 + print(f"[START] {progress_label}: {url}", flush=True)
168 + try:
169 + resp = await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
170 + status = resp.status if resp else None
171 + final_url = page.url
172 + title = None
173 +
174 +         # Extra wait: first try to let the network settle, then wait a fixed 5 seconds
175 + try:
176 +             # "networkidle" waits until all network activity is quiet (this can take a while in some environments, hence optional)
177 + await page.wait_for_load_state("networkidle", timeout=timeout_ms)
178 + except Exception:
179 +             # Ignore networkidle failures/timeouts and fall through to the fixed wait
180 + pass
181 +
182 + await page.wait_for_timeout(POST_NAV_WAIT_MS)
183 +
184 + try:
185 + title = await page.title()
186 + except Exception:
187 + title = None
188 +
189 + # Save screenshot of the loaded page (any status is fine)
190 + screenshot_path.parent.mkdir(parents=True, exist_ok=True)
191 + await page.screenshot(path=str(screenshot_path), full_page=True)
192 + print(f"[OK] {progress_label}: status={status} saved={screenshot_path}", flush=True)
193 + return "success", status, final_url, title, None
194 + except PlaywrightTimeoutError as e:
195 + # Try to capture whatever is visible; if not, render an error HTML and screenshot it
196 + try:
197 + screenshot_path.parent.mkdir(parents=True, exist_ok=True)
198 + await page.screenshot(path=str(screenshot_path), full_page=True)
199 + except Exception:
200 + try:
201 + await page.set_content(
202 + f"""
203 + <html><body style='font-family:Segoe UI,Tahoma,Arial,sans-serif;padding:24px;'>
204 + <h2 style='color:#b00;margin:0 0 8px'>Timeout</h2>
205 + <div>URL: {url}</div>
206 + <div>Reason: {str(e)}</div>
207 + </body></html>
208 + """
209 + )
210 + await page.screenshot(path=str(screenshot_path), full_page=True)
211 + except Exception:
212 + pass
213 + print(f"[TIME] {progress_label}: timeout saved={screenshot_path}", flush=True)
214 + return "timeout", None, None, None, str(e)
215 + except Exception as e:
216 + # Generic error: same fallback to ensure a screenshot
217 + try:
218 + screenshot_path.parent.mkdir(parents=True, exist_ok=True)
219 + await page.screenshot(path=str(screenshot_path), full_page=True)
220 + except Exception:
221 + try:
222 + await page.set_content(
223 + f"""
224 + <html><body style='font-family:Segoe UI,Tahoma,Arial,sans-serif;padding:24px;'>
225 + <h2 style='color:#b00;margin:0 0 8px'>Connection Error</h2>
226 + <div>URL: {url}</div>
227 + <div>Reason: {str(e)}</div>
228 + </body></html>
229 + """
230 + )
231 + await page.screenshot(path=str(screenshot_path), full_page=True)
232 + except Exception:
233 + pass
234 + print(f"[ERR] {progress_label}: error saved={screenshot_path}", flush=True)
235 + return "error", None, None, None, str(e)
236 + finally:
237 + try:
238 + await page.close()
239 + except Exception:
240 + pass
241 +
242 + async def process_target(ip: str, port: int, context, base_dir: Path, timeout_ms: int, sem: asyncio.Semaphore) -> List[Dict]:
243 + r"""
244 +     For one (ip, port): try HTTP first; on failure or timeout, fall back to HTTPS (certificate errors are ignored).
245 +     Always save a screenshot per attempt under <ip>/_attempts/ and ensure a final <ip>/<port>.png exists.
246 + Returns a list of 1–2 log records (one per attempt performed).
247 + """
248 + async with sem:
249 + # Jitter to reduce burstiness
250 + await asyncio.sleep(random.uniform(JITTER_MIN_S, JITTER_MAX_S))
251 + ts = datetime.now().isoformat(timespec="seconds")
252 +
253 + http_url = f"http://{ip}:{port}"
254 + https_url = f"https://{ip}:{port}"
255 +
256 + ip_dir = base_dir / ip
257 + final_screenshot = ip_dir / f"{port}.png"
258 + attempts_dir = ip_dir / "_attempts"
259 + attempts_dir.mkdir(parents=True, exist_ok=True)
260 + http_attempt_path = attempts_dir / f"{port}_http.png"
261 + https_attempt_path = attempts_dir / f"{port}_https.png"
262 +
263 + records: List[Dict] = []
264 +
265 + # Attempt HTTP first
266 + outcome, status, final_url, title, err_msg = await attempt_and_screenshot(
267 + context, http_url, timeout_ms, http_attempt_path, f"{ip}:{port} HTTP"
268 + )
269 + http_rec = {
270 + "timestamp": ts,
271 + "ip": ip,
272 + "port": port,
273 + "scheme_attempted": "http",
274 + "outcome": outcome,
275 + "http_status": status,
276 + "final_url": final_url,
277 + "page_title": title,
278 + "error_message": err_msg,
279 + }
280 + records.append(http_rec)
281 +
282 + if outcome == "success":
283 + # Promote HTTP attempt to final screenshot
284 + try:
285 + copyfile(http_attempt_path, final_screenshot)
286 + except Exception:
287 + pass
288 + print(f"[SAVE] {ip}:{port} -> {final_screenshot} (http)", flush=True)
289 + return records
290 +
291 + # HTTP failed -> Attempt HTTPS
292 + outcome2, status2, final_url2, title2, err_msg2 = await attempt_and_screenshot(
293 + context, https_url, timeout_ms, https_attempt_path, f"{ip}:{port} HTTPS"
294 + )
295 + https_rec = {
296 + "timestamp": ts,
297 + "ip": ip,
298 + "port": port,
299 + "scheme_attempted": "https",
300 + "outcome": outcome2,
301 + "http_status": status2,
302 + "final_url": final_url2,
303 + "page_title": title2,
304 + "error_message": err_msg2,
305 + }
306 + records.append(https_rec)
307 +
308 +         # Promote the HTTPS attempt screenshot if one was saved (success or error render); otherwise fall back to the HTTP attempt image
309 + try:
310 + src = https_attempt_path if https_attempt_path.exists() else http_attempt_path
311 + copyfile(src, final_screenshot)
312 + except Exception:
313 + pass
314 + print(f"[SAVE] {ip}:{port} -> {final_screenshot} ({'https' if outcome2=='success' else 'error'})", flush=True)
315 +
316 + return records
317 +
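+ # Resulting on-disk layout per target (sketch):
+ #   <base_dir>/<ip>/<port>.png                  # final screenshot (promoted copy)
+ #   <base_dir>/<ip>/_attempts/<port>_http.png   # raw HTTP attempt
+ #   <base_dir>/<ip>/_attempts/<port>_https.png  # raw HTTPS attempt (if reached)
+ #   <base_dir>/logs/webshot.csv, webshot.json   # per-attempt logs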
318 + async def run(input_paths: List[Path], concurrency: int, timeout_ms: int, base_dir: Path) -> None:
319 + csv_path, json_path = prepare_logs(base_dir)
320 +
321 + pairs = parse_inputs(input_paths)
322 + total = len(pairs)
323 + if not pairs:
324 + print("No open TCP ports found in the provided Nmap files.")
325 + return
326 +
327 + ensure_ip_dirs(base_dir, pairs)
328 +
329 + print(f"Targets: {total} open TCP ports across {len(set(ip for ip,_ in pairs))} hosts; Concurrency={concurrency}; Timeout={timeout_ms}ms", flush=True)
330 +
331 + async with async_playwright() as pw:
332 +         # If the bundled Chromium is missing, you can switch to the system Chrome instead:
333 + # browser = await pw.chromium.launch(executable_path=r"C:\Program Files\Google\Chrome\Application\chrome.exe", headless=True)
334 + browser = await pw.chromium.launch(headless=True)
335 + context = await browser.new_context(ignore_https_errors=True)
336 +
337 + sem = asyncio.Semaphore(concurrency)
338 + tasks = [
339 + asyncio.create_task(process_target(ip, port, context, base_dir, timeout_ms, sem))
340 + for ip, port in pairs
341 + ]
342 +
343 + results: List[Dict] = []
344 + done = 0
345 + for coro in asyncio.as_completed(tasks):
346 + recs = await coro
347 + results.extend(recs)
348 + done += 1
349 + # Progress line using the first record
350 + ip = recs[0].get("ip")
351 + port = recs[0].get("port")
352 + summary = ("success" if any(r.get("outcome") == "success" for r in recs)
353 + else ("timeout" if any(r.get("outcome") == "timeout" for r in recs) else "error"))
354 + print(f"[PROG] {done}/{total} completed: {ip}:{port} => {summary}", flush=True)
355 +
356 + await context.close()
357 + await browser.close()
358 +
359 + # Write all attempt records
360 + write_logs(csv_path, json_path, results)
361 + print(f"[DONE] Logs written: {csv_path} and {json_path}", flush=True)
362 +
363 + def cleanup_final_images(base_dir: Path) -> int:
364 + r"""
365 + Remove top-level <port>.png files under each IP folder, keeping _attempts intact.
366 + Returns number of files deleted.
367 + """
368 + deleted = 0
369 + for ip_dir in base_dir.iterdir():
370 + try:
371 + if not ip_dir.is_dir():
372 + continue
373 + # Skip common system/profile directories when base_dir is a user profile root
374 + if ip_dir.name.lower() in {"application data", "appdata", "saved games", "documents", "downloads", "music", "pictures", "videos", "contacts", "links", "searches", "favorites", "onedrive"}:
375 + continue
376 + if ip_dir.name == "logs":
377 + continue
378 + attempts_dir = ip_dir / "_attempts"
379 + for item in ip_dir.iterdir():
380 + try:
381 + if item.is_dir():
382 + continue
383 + if item.suffix.lower() != ".png":
384 + continue
385 + # Only delete files named like "<digits>.png" (e.g., 80.png, 443.png)
386 + if re.fullmatch(r"\d{1,5}\.png", item.name):
387 + try:
388 + item.unlink()
389 + deleted += 1
390 + except Exception:
391 + pass
392 + except PermissionError:
393 + continue
394 + except PermissionError:
395 + continue
396 + return deleted
397 +
398 +
399 + def main():
400 +     parser = argparse.ArgumentParser(description="Screenshot web UIs from Nmap results (XML or text). Tries HTTP, then falls back to HTTPS; rendered with Playwright.")
401 + parser.add_argument("inputs", nargs="*", help="Nmap output files (XML or normal text), e.g., scan1.xml scan2.xml scan3.txt")
402 + parser.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="Max parallel targets (default: 6)")
403 + parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_MS, help="Navigation timeout in ms (default: 10000)")
404 + parser.add_argument("--cleanup-final", action="store_true", help="Remove top-level <port>.png files, keep only _attempts images.")
405 + parser.add_argument("--base-dir", default=None, help="Directory where outputs/logs will be written (default: current working directory)")
406 + args = parser.parse_args()
407 +
408 + base_dir = Path(args.base_dir) if args.base_dir else Path.cwd()
409 +
410 + if not args.inputs and args.cleanup_final:
411 + deleted = cleanup_final_images(base_dir)
412 + print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)")
413 + return
414 +
415 + input_paths = [Path(p) for p in args.inputs]
416 + for p in input_paths:
417 + if not p.exists():
418 + raise FileNotFoundError(f"File not found: {p}")
419 +
420 + asyncio.run(run(input_paths, concurrency=args.concurrency, timeout_ms=args.timeout, base_dir=base_dir))
421 +
422 + if args.cleanup_final:
423 + deleted = cleanup_final_images(base_dir)
424 + print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)")
425 +
426 + if __name__ == "__main__":
427 + main()
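+ # Example usage (assumed environment: `pip install playwright`, then
+ # `playwright install chromium` for the bundled browser):
+ #   python webshot2.py scan1.xml scan2.txt
+ #   python webshot2.py scans.xml --concurrency 8 --timeout 15000 --base-dir C:\shots
+ #   python webshot2.py --cleanup-final    # delete top-level <port>.png, keep _attempts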