webshot2.py
import argparse
import asyncio
import csv
import json
import random
import re
from datetime import datetime
from pathlib import Path
import xml.etree.ElementTree as ET
from typing import List, Tuple, Dict, Optional, Set
from shutil import copyfile

from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError

# Defaults/assumptions
DEFAULT_CONCURRENCY = 6
DEFAULT_TIMEOUT_MS = 10_000  # per navigation
JITTER_MIN_S = 0.1
JITTER_MAX_S = 0.3

# Added: wait time (ms) between page navigation and the screenshot
POST_NAV_WAIT_MS = 5_000

CSV_FIELDS = [
    "timestamp",
    "ip",
    "port",
    "scheme_attempted",
    "outcome",  # success | timeout | error
    "http_status",
    "final_url",
    "page_title",
    "error_message",
]
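
# A sample NDJSON log record under this schema, as emitted by write_logs below
# (illustrative values only):
# {"timestamp": "2024-01-01T12:00:00", "ip": "203.0.113.10", "port": 8443,
#  "scheme_attempted": "https", "outcome": "success", "http_status": 200,
#  "final_url": "https://203.0.113.10:8443/", "page_title": "Login",
#  "error_message": null}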

def parse_nmap_xml(xml_path: Path, pairs: Set[Tuple[str, int]]) -> None:
    """
    Parse one Nmap XML file, updating 'pairs' set of (ip, port) for open TCP ports.
    """
    tree = ET.parse(xml_path)
    root = tree.getroot()
    for host in root.findall(".//host"):
        ip = None
        for addr in host.findall("address"):
            if addr.get("addrtype") == "ipv4":
                ip = addr.get("addr")
                break
        if not ip:
            continue
        for port_el in host.findall(".//ports/port"):
            if port_el.get("protocol") != "tcp":
                continue
            state_el = port_el.find("state")
            if state_el is None or state_el.get("state") != "open":
                continue
            portid = port_el.get("portid")
            if not portid or not portid.isdigit():
                continue
            pairs.add((ip, int(portid)))

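# For reference, the `nmap -oX` XML shape this parser expects (illustrative values):
# <host>
#   <address addr="203.0.113.10" addrtype="ipv4"/>
#   <ports>
#     <port protocol="tcp" portid="80"><state state="open"/></port>
#   </ports>
# </host>
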
def parse_nmap_text(txt_path: Path, pairs: Set[Tuple[str, int]]) -> None:
    """
    Parse Nmap normal text output and update 'pairs' set with (ip, open_tcp_port).
    Handles host lines like:
      - Nmap scan report for 103.139.84.64
      - Nmap scan report for ems1.example.org (203.0.113.10)
    And port lines under a host block like:
      - 80/tcp open http
      - 443/tcp open https
    """
    host_re = re.compile(r"^Nmap scan report for\s+(?P<host>.+?)(?:\s*\((?P<ip>\d+\.\d+\.\d+\.\d+)\))?\s*$")
    open_port_re = re.compile(r"^(?P<port>\d{1,5})/tcp\s+open\b", re.IGNORECASE)
    ip_lit_re = re.compile(r"^\d+\.\d+\.\d+\.\d+$")

    current_ip: Optional[str] = None
    in_ports_section = False

    with txt_path.open("r", encoding="utf-8", errors="ignore") as f:
        for raw in f:
            line = raw.rstrip("\n")

            m_host = host_re.match(line)
            if m_host:
                # A new host block starts
                host_str = m_host.group("host").strip()
                ip = m_host.group("ip")
                if ip:
                    current_ip = ip
                else:
                    current_ip = host_str if ip_lit_re.match(host_str) else None
                in_ports_section = False
                continue

            if line.startswith("PORT") and "STATE" in line and "SERVICE" in line:
                in_ports_section = True
                continue

            if not current_ip:
                continue

            if not line.strip():
                in_ports_section = False
                continue

            if in_ports_section:
                m_port = open_port_re.match(line.strip())
                if m_port:
                    port = int(m_port.group("port"))
                    pairs.add((current_ip, port))

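# For reference, a minimal normal-output block this parser handles (illustrative
# values); note the PORT/STATE/SERVICE header that opens the ports section:
# Nmap scan report for ems1.example.org (203.0.113.10)
# PORT    STATE SERVICE
# 80/tcp  open  http
# 443/tcp open  https
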
def parse_inputs(paths: List[Path]) -> List[Tuple[str, int]]:
    """
    Parse a mix of XML and text Nmap outputs; return a sorted, unique (ip, port) list.
    """
    pairs: Set[Tuple[str, int]] = set()
    for p in paths:
        suffix = p.suffix.lower()
        try:
            if suffix == ".xml":
                parse_nmap_xml(p, pairs)
            else:
                parse_nmap_text(p, pairs)
        except Exception as e:
            # Skip unreadable/invalid files; could log in the future
            print(f"[WARN] Failed to parse {p}: {e}")
    return sorted(pairs, key=lambda t: (t[0], t[1]))

def ensure_ip_dirs(base_dir: Path, pairs: List[Tuple[str, int]]) -> None:
    ips = sorted({ip for ip, _ in pairs})
    for ip in ips:
        (base_dir / ip).mkdir(parents=True, exist_ok=True)

def prepare_logs(base_dir: Path) -> Tuple[Path, Path]:
    logs_dir = base_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)
    return logs_dir / "webshot.csv", logs_dir / "webshot.json"

def write_logs(csv_path: Path, json_path: Path, records: List[Dict]) -> None:
    # CSV: append, writing the header only if the file is new
    is_new_csv = not csv_path.exists()
    with csv_path.open("a", newline="", encoding="utf-8") as fcsv:
        writer = csv.DictWriter(fcsv, fieldnames=CSV_FIELDS)
        if is_new_csv:
            writer.writeheader()
        for rec in records:
            writer.writerow(rec)
    # JSON: NDJSON (one object per line)
    with json_path.open("a", encoding="utf-8") as fjson:
        for rec in records:
            fjson.write(json.dumps(rec, ensure_ascii=False) + "\n")

async def attempt_and_screenshot(context, url: str, timeout_ms: int, screenshot_path: Path, progress_label: str) -> Tuple[str, Optional[int], Optional[str], Optional[str], Optional[str]]:
    """
    Try to navigate to the given URL and always save a screenshot to screenshot_path.
    Returns a tuple: (outcome, status, final_url, title, error_message)
    - outcome: "success" | "timeout" | "error"
    - status: HTTP status code if any
    - final_url: the page.url after navigation (if any)
    - title: page title if retrievable
    - error_message: string on failure

    Note: always waits an extra POST_NAV_WAIT_MS (default 5000 ms) before taking
    the screenshot, and also tries to reach the network-idle state if possible.
    """
    page = await context.new_page()
    print(f"[START] {progress_label}: {url}", flush=True)
    try:
        resp = await page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
        status = resp.status if resp else None
        final_url = page.url
        title = None

        # Extra wait: try to let the network settle, then wait a fixed 5 seconds
        try:
            # "networkidle" waits until all network activity quiets down
            # (can take a long time depending on the environment, so it is optional)
            await page.wait_for_load_state("networkidle", timeout=timeout_ms)
        except Exception:
            # Ignore networkidle failures/timeouts and perform only the fixed wait
            pass

        await page.wait_for_timeout(POST_NAV_WAIT_MS)

        try:
            title = await page.title()
        except Exception:
            title = None

        # Save a screenshot of the loaded page (any status is fine)
        screenshot_path.parent.mkdir(parents=True, exist_ok=True)
        await page.screenshot(path=str(screenshot_path), full_page=True)
        print(f"[OK] {progress_label}: status={status} saved={screenshot_path}", flush=True)
        return "success", status, final_url, title, None
    except PlaywrightTimeoutError as e:
        # Try to capture whatever is visible; if that fails, render an error HTML and screenshot it
        try:
            screenshot_path.parent.mkdir(parents=True, exist_ok=True)
            await page.screenshot(path=str(screenshot_path), full_page=True)
        except Exception:
            try:
                await page.set_content(
                    f"""
                    <html><body style='font-family:Segoe UI,Tahoma,Arial,sans-serif;padding:24px;'>
                      <h2 style='color:#b00;margin:0 0 8px'>Timeout</h2>
                      <div>URL: {url}</div>
                      <div>Reason: {str(e)}</div>
                    </body></html>
                    """
                )
                await page.screenshot(path=str(screenshot_path), full_page=True)
            except Exception:
                pass
        print(f"[TIME] {progress_label}: timeout saved={screenshot_path}", flush=True)
        return "timeout", None, None, None, str(e)
    except Exception as e:
        # Generic error: same fallback to ensure a screenshot exists
        try:
            screenshot_path.parent.mkdir(parents=True, exist_ok=True)
            await page.screenshot(path=str(screenshot_path), full_page=True)
        except Exception:
            try:
                await page.set_content(
                    f"""
                    <html><body style='font-family:Segoe UI,Tahoma,Arial,sans-serif;padding:24px;'>
                      <h2 style='color:#b00;margin:0 0 8px'>Connection Error</h2>
                      <div>URL: {url}</div>
                      <div>Reason: {str(e)}</div>
                    </body></html>
                    """
                )
                await page.screenshot(path=str(screenshot_path), full_page=True)
            except Exception:
                pass
        print(f"[ERR] {progress_label}: error saved={screenshot_path}", flush=True)
        return "error", None, None, None, str(e)
    finally:
        try:
            await page.close()
        except Exception:
            pass

async def process_target(ip: str, port: int, context, base_dir: Path, timeout_ms: int, sem: asyncio.Semaphore) -> List[Dict]:
    r"""
    For one (ip, port): try HTTP first; on failure/timeout, try HTTPS (ignoring HTTPS cert errors).
    Always saves per-attempt screenshots under <ip>\_attempts\ and ensures a final <ip>\<port>.png exists.
    Returns a list of 1–2 log records (one per attempt performed).
    """
    async with sem:
        # Jitter to reduce burstiness
        await asyncio.sleep(random.uniform(JITTER_MIN_S, JITTER_MAX_S))
        ts = datetime.now().isoformat(timespec="seconds")

        http_url = f"http://{ip}:{port}"
        https_url = f"https://{ip}:{port}"

        ip_dir = base_dir / ip
        final_screenshot = ip_dir / f"{port}.png"
        attempts_dir = ip_dir / "_attempts"
        attempts_dir.mkdir(parents=True, exist_ok=True)
        http_attempt_path = attempts_dir / f"{port}_http.png"
        https_attempt_path = attempts_dir / f"{port}_https.png"

        records: List[Dict] = []

        # Attempt HTTP first
        outcome, status, final_url, title, err_msg = await attempt_and_screenshot(
            context, http_url, timeout_ms, http_attempt_path, f"{ip}:{port} HTTP"
        )
        http_rec = {
            "timestamp": ts,
            "ip": ip,
            "port": port,
            "scheme_attempted": "http",
            "outcome": outcome,
            "http_status": status,
            "final_url": final_url,
            "page_title": title,
            "error_message": err_msg,
        }
        records.append(http_rec)

        if outcome == "success":
            # Promote the HTTP attempt to the final screenshot
            try:
                copyfile(http_attempt_path, final_screenshot)
            except Exception:
                pass
            print(f"[SAVE] {ip}:{port} -> {final_screenshot} (http)", flush=True)
            return records

        # HTTP failed -> attempt HTTPS
        outcome2, status2, final_url2, title2, err_msg2 = await attempt_and_screenshot(
            context, https_url, timeout_ms, https_attempt_path, f"{ip}:{port} HTTPS"
        )
        https_rec = {
            "timestamp": ts,
            "ip": ip,
            "port": port,
            "scheme_attempted": "https",
            "outcome": outcome2,
            "http_status": status2,
            "final_url": final_url2,
            "page_title": title2,
            "error_message": err_msg2,
        }
        records.append(https_rec)

        # Promote the HTTPS attempt to final if it exists; otherwise fall back to the HTTP attempt
        try:
            src = https_attempt_path if https_attempt_path.exists() else http_attempt_path
            copyfile(src, final_screenshot)
        except Exception:
            pass
        print(f"[SAVE] {ip}:{port} -> {final_screenshot} ({'https' if outcome2 == 'success' else 'error'})", flush=True)

        return records

async def run(input_paths: List[Path], concurrency: int, timeout_ms: int, base_dir: Path) -> None:
    csv_path, json_path = prepare_logs(base_dir)

    pairs = parse_inputs(input_paths)
    total = len(pairs)
    if not pairs:
        print("No open TCP ports found in the provided Nmap files.")
        return

    ensure_ip_dirs(base_dir, pairs)

    print(f"Targets: {total} open TCP ports across {len(set(ip for ip, _ in pairs))} hosts; Concurrency={concurrency}; Timeout={timeout_ms}ms", flush=True)

    async with async_playwright() as pw:
        # If the bundled browser is missing and you want to switch to the system Chrome:
        # browser = await pw.chromium.launch(executable_path=r"C:\Program Files\Google\Chrome\Application\chrome.exe", headless=True)
        browser = await pw.chromium.launch(headless=True)
        context = await browser.new_context(ignore_https_errors=True)

        sem = asyncio.Semaphore(concurrency)
        tasks = [
            asyncio.create_task(process_target(ip, port, context, base_dir, timeout_ms, sem))
            for ip, port in pairs
        ]

        results: List[Dict] = []
        done = 0
        for coro in asyncio.as_completed(tasks):
            recs = await coro
            results.extend(recs)
            done += 1
            # Progress line using the first record
            ip = recs[0].get("ip")
            port = recs[0].get("port")
            summary = ("success" if any(r.get("outcome") == "success" for r in recs)
                       else ("timeout" if any(r.get("outcome") == "timeout" for r in recs) else "error"))
            print(f"[PROG] {done}/{total} completed: {ip}:{port} => {summary}", flush=True)

        await context.close()
        await browser.close()

    # Write all attempt records
    write_logs(csv_path, json_path, results)
    print(f"[DONE] Logs written: {csv_path} and {json_path}", flush=True)

def cleanup_final_images(base_dir: Path) -> int:
    """
    Remove top-level <port>.png files under each IP folder, keeping _attempts intact.
    Returns the number of files deleted.
    """
    deleted = 0
    for ip_dir in base_dir.iterdir():
        try:
            if not ip_dir.is_dir():
                continue
            # Skip common system/profile directories when base_dir is a user profile root
            if ip_dir.name.lower() in {"application data", "appdata", "saved games", "documents", "downloads", "music", "pictures", "videos", "contacts", "links", "searches", "favorites", "onedrive"}:
                continue
            if ip_dir.name == "logs":
                continue
            for item in ip_dir.iterdir():
                try:
                    if item.is_dir():
                        continue
                    if item.suffix.lower() != ".png":
                        continue
                    # Only delete files named like "<digits>.png" (e.g., 80.png, 443.png)
                    if re.fullmatch(r"\d{1,5}\.png", item.name):
                        try:
                            item.unlink()
                            deleted += 1
                        except Exception:
                            pass
                except PermissionError:
                    continue
        except PermissionError:
            continue
    return deleted

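# Resulting on-disk layout per target (a sketch; IP and port are illustrative):
#   <base_dir>/203.0.113.10/80.png                  # final screenshot (removed by --cleanup-final)
#   <base_dir>/203.0.113.10/_attempts/80_http.png   # per-attempt screenshots (always kept)
#   <base_dir>/203.0.113.10/_attempts/80_https.png
#   <base_dir>/logs/webshot.csv                     # plus webshot.json (NDJSON)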

def main():
    parser = argparse.ArgumentParser(description="Screenshot web UIs from Nmap results (XML or text). HTTP→HTTPS fallback, via Playwright.")
    parser.add_argument("inputs", nargs="*", help="Nmap output files (XML or normal text), e.g., scan1.xml scan2.xml scan3.txt")
    parser.add_argument("--concurrency", type=int, default=DEFAULT_CONCURRENCY, help="Max parallel targets (default: 6)")
    parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT_MS, help="Navigation timeout in ms (default: 10000)")
    parser.add_argument("--cleanup-final", action="store_true", help="Remove top-level <port>.png files, keeping only _attempts images.")
    parser.add_argument("--base-dir", default=None, help="Directory where outputs/logs will be written (default: current working directory)")
    args = parser.parse_args()

    base_dir = Path(args.base_dir) if args.base_dir else Path.cwd()

    if not args.inputs and args.cleanup_final:
        deleted = cleanup_final_images(base_dir)
        print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)")
        return

    input_paths = [Path(p) for p in args.inputs]
    for p in input_paths:
        if not p.exists():
            raise FileNotFoundError(f"File not found: {p}")

    asyncio.run(run(input_paths, concurrency=args.concurrency, timeout_ms=args.timeout, base_dir=base_dir))

    if args.cleanup_final:
        deleted = cleanup_final_images(base_dir)
        print(f"[CLEANUP] Deleted {deleted} top-level port PNGs (kept _attempts)")

if __name__ == "__main__":
    main()
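
# Example invocations (a sketch; file names are hypothetical):
#   python webshot2.py scan1.xml scan2.xml scan3.txt
#   python webshot2.py scan1.xml --concurrency 8 --timeout 15000 --base-dir ./shots
#   python webshot2.py --cleanup-final   # no inputs: only delete top-level <port>.png files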