Compare commits
10 Commits
97b9bd2077
...
21eb89ab8f
| Author | SHA1 | Date | |
|---|---|---|---|
| 21eb89ab8f | |||
| 47bd93597d | |||
| e3d483422d | |||
| bfeb765610 | |||
| ef688491ab | |||
| 163a23d989 | |||
| 625af4c9a7 | |||
| 6992b8465a | |||
| 245abcea89 | |||
| 36bc937842 |
@@ -0,0 +1,82 @@
|
||||
import uuid
|
||||
|
||||
from fastapi import BackgroundTasks, FastAPI, HTTPException
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel, HttpUrl
|
||||
|
||||
from scanner.engine import clear_scans, get_scan, list_scans, register_scan, run_scan
|
||||
from scanner.models import ScanJob
|
||||
|
||||
app = FastAPI(title="InfoLeak Scanner", version="1.0.0")
|
||||
|
||||
|
||||
class ScanRequest(BaseModel):
|
||||
url: HttpUrl
|
||||
modules: list[str] = ["paths", "headers", "secrets", "directory"]
|
||||
|
||||
|
||||
def _serialize(job: ScanJob) -> dict:
|
||||
return {
|
||||
"id": job.id,
|
||||
"target_url": job.target_url,
|
||||
"status": job.status.value,
|
||||
"findings": [
|
||||
{
|
||||
"severity": f.severity.value,
|
||||
"type": f.type,
|
||||
"url": f.url,
|
||||
"evidence": f.evidence,
|
||||
"module": f.module,
|
||||
}
|
||||
for f in job.findings
|
||||
],
|
||||
"progress": job.progress,
|
||||
"total": job.total,
|
||||
"error": job.error,
|
||||
"started_at": job.started_at,
|
||||
"completed_at": job.completed_at,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/scan")
|
||||
async def start_scan(request: ScanRequest, background_tasks: BackgroundTasks):
|
||||
job = ScanJob(id=str(uuid.uuid4()), target_url=str(request.url))
|
||||
register_scan(job)
|
||||
background_tasks.add_task(run_scan, job, request.modules)
|
||||
return {"scan_id": job.id}
|
||||
|
||||
|
||||
@app.get("/scan/{scan_id}/export")
|
||||
async def export_scan(scan_id: str):
|
||||
job = get_scan(scan_id)
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Scan not found")
|
||||
return JSONResponse(
|
||||
content=_serialize(job),
|
||||
headers={"Content-Disposition": f"attachment; filename=scan-{scan_id[:8]}.json"},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/scan/{scan_id}")
|
||||
async def get_scan_status(scan_id: str):
|
||||
job = get_scan(scan_id)
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Scan not found")
|
||||
return _serialize(job)
|
||||
|
||||
|
||||
@app.get("/scans")
|
||||
async def list_all_scans():
|
||||
return [
|
||||
{
|
||||
"id": j.id,
|
||||
"target_url": j.target_url,
|
||||
"status": j.status.value,
|
||||
"findings_count": len(j.findings),
|
||||
}
|
||||
for j in list_scans()
|
||||
]
|
||||
|
||||
|
||||
app.mount("/", StaticFiles(directory="static", html=True), name="static")
|
||||
@@ -0,0 +1,31 @@
|
||||
import re
|
||||
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
|
||||
_PATTERNS = [
|
||||
re.compile(r'<title>Index of /', re.IGNORECASE),
|
||||
re.compile(r'Parent Directory</a>', re.IGNORECASE),
|
||||
re.compile(r'<pre>.*?<a href="\.\.">', re.IGNORECASE | re.DOTALL),
|
||||
]
|
||||
|
||||
|
||||
class DirectoryListingDetector:
|
||||
async def detect(self, client: httpx.AsyncClient, url: str) -> list[Finding]:
|
||||
try:
|
||||
response = await client.get(url)
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
body = response.text
|
||||
for pattern in _PATTERNS:
|
||||
if pattern.search(body):
|
||||
return [Finding(
|
||||
severity=Severity.MEDIUM,
|
||||
type="directory_listing",
|
||||
url=url,
|
||||
evidence="Directory listing is enabled",
|
||||
module="directory_listing",
|
||||
)]
|
||||
return []
|
||||
@@ -0,0 +1,69 @@
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
import httpx
|
||||
|
||||
from .directory_listing import DirectoryListingDetector
|
||||
from .header_analyzer import HeaderAnalyzer
|
||||
from .models import ScanJob, ScanStatus
|
||||
from .path_prober import PathProber
|
||||
from .response_inspector import ResponseInspector
|
||||
|
||||
_scans: dict[str, ScanJob] = {}
|
||||
|
||||
|
||||
def register_scan(job: ScanJob) -> None:
|
||||
_scans[job.id] = job
|
||||
|
||||
|
||||
def get_scan(scan_id: str) -> ScanJob | None:
|
||||
return _scans.get(scan_id)
|
||||
|
||||
|
||||
def list_scans() -> list[ScanJob]:
|
||||
return list(_scans.values())
|
||||
|
||||
|
||||
def clear_scans() -> None:
|
||||
_scans.clear()
|
||||
|
||||
|
||||
async def run_scan(job: ScanJob, modules: list[str]) -> None:
|
||||
job.status = ScanStatus.RUNNING
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
|
||||
if "headers" in modules:
|
||||
findings = await HeaderAnalyzer().analyze(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
if "secrets" in modules:
|
||||
findings = await ResponseInspector().inspect(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
if "directory" in modules:
|
||||
findings = await DirectoryListingDetector().detect(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
if "paths" in modules:
|
||||
prober = PathProber()
|
||||
paths = prober.load_wordlist()
|
||||
job.total = len(paths)
|
||||
semaphore = asyncio.Semaphore(10)
|
||||
|
||||
async def probe(path: str) -> None:
|
||||
async with semaphore:
|
||||
try:
|
||||
found = await prober.probe(client, job.target_url, path)
|
||||
job.findings.extend(found)
|
||||
except Exception:
|
||||
pass
|
||||
job.progress += 1
|
||||
|
||||
await asyncio.gather(*[probe(p) for p in paths])
|
||||
|
||||
job.status = ScanStatus.COMPLETED
|
||||
job.completed_at = time.time()
|
||||
|
||||
except Exception as exc:
|
||||
job.status = ScanStatus.ERROR
|
||||
job.error = str(exc)
|
||||
@@ -0,0 +1,51 @@
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
|
||||
_VERSION_HEADERS = [
|
||||
"server",
|
||||
"x-powered-by",
|
||||
"x-aspnet-version",
|
||||
"x-aspnetmvc-version",
|
||||
"x-generator",
|
||||
]
|
||||
|
||||
_SECURITY_HEADERS = [
|
||||
"X-Frame-Options",
|
||||
"Content-Security-Policy",
|
||||
"X-Content-Type-Options",
|
||||
"Strict-Transport-Security",
|
||||
]
|
||||
|
||||
|
||||
class HeaderAnalyzer:
|
||||
async def analyze(self, client: httpx.AsyncClient, url: str) -> list[Finding]:
|
||||
try:
|
||||
response = await client.get(url)
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
findings: list[Finding] = []
|
||||
headers = response.headers
|
||||
|
||||
for header in _VERSION_HEADERS:
|
||||
if header in headers:
|
||||
findings.append(Finding(
|
||||
severity=Severity.LOW,
|
||||
type="version_disclosure",
|
||||
url=url,
|
||||
evidence=f"{header.title()}: {headers[header]}",
|
||||
module="header_analyzer",
|
||||
))
|
||||
|
||||
for header in _SECURITY_HEADERS:
|
||||
if header.lower() not in headers:
|
||||
findings.append(Finding(
|
||||
severity=Severity.INFO,
|
||||
type="missing_security_header",
|
||||
url=url,
|
||||
evidence=f"Missing header: {header}",
|
||||
module="header_analyzer",
|
||||
))
|
||||
|
||||
return findings
|
||||
@@ -34,7 +34,7 @@ class PathProber:
|
||||
|
||||
severity = self._severity(path)
|
||||
if response.status_code == 200:
|
||||
snippet = response.text[:200].replace("\n", " ")
|
||||
snippet = response.content[:512].decode("utf-8", errors="replace")[:200].replace("\n", " ")
|
||||
evidence = f"HTTP 200 — {snippet}" if snippet else "HTTP 200"
|
||||
else:
|
||||
evidence = "HTTP 403 (resource exists but forbidden)"
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
import re
|
||||
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
|
||||
_PATTERNS: list[tuple[Severity, str, re.Pattern]] = [
|
||||
(Severity.CRITICAL, "aws_access_key",
|
||||
re.compile(r'AKIA[0-9A-Z]{16}')),
|
||||
(Severity.CRITICAL, "private_key",
|
||||
re.compile(r'-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----')),
|
||||
(Severity.HIGH, "password_in_comment",
|
||||
re.compile(r'<!--.*?(?:password|passwd|pwd)\s*[=:]\s*\S+.*?-->', re.IGNORECASE | re.DOTALL)),
|
||||
(Severity.HIGH, "php_stack_trace",
|
||||
re.compile(r'(?:Fatal error|Warning|Parse error).*?on line \d+', re.IGNORECASE)),
|
||||
(Severity.HIGH, "python_traceback",
|
||||
re.compile(r'Traceback \(most recent call last\)')),
|
||||
(Severity.HIGH, "java_exception",
|
||||
re.compile(r'at [a-zA-Z_$][a-zA-Z0-9_$]*(?:\.[a-zA-Z_$][a-zA-Z0-9_$]*)+\(.*?\.java:\d+\)')),
|
||||
(Severity.MEDIUM, "generic_api_key",
|
||||
re.compile(r'["\']?api[_-]?key["\']?\s*[:=]\s*["\']?([A-Za-z0-9_\-]{20,})', re.IGNORECASE)),
|
||||
]
|
||||
|
||||
|
||||
class ResponseInspector:
|
||||
async def inspect(self, client: httpx.AsyncClient, url: str) -> list[Finding]:
|
||||
try:
|
||||
response = await client.get(url)
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
body = response.text[:524288]
|
||||
findings: list[Finding] = []
|
||||
|
||||
for severity, finding_type, pattern in _PATTERNS:
|
||||
match = pattern.search(body)
|
||||
if match:
|
||||
findings.append(Finding(
|
||||
severity=severity,
|
||||
type=finding_type,
|
||||
url=url,
|
||||
evidence=match.group(0)[:200],
|
||||
module="response_inspector",
|
||||
))
|
||||
|
||||
return findings
|
||||
+146
@@ -0,0 +1,146 @@
|
||||
const API = '';
|
||||
|
||||
const views = {
|
||||
starter: document.getElementById('view-starter'),
|
||||
scanning: document.getElementById('view-scanning'),
|
||||
results: document.getElementById('view-results'),
|
||||
};
|
||||
|
||||
function showView(name) {
|
||||
Object.values(views).forEach(v => v.classList.add('hidden'));
|
||||
views[name].classList.remove('hidden');
|
||||
}
|
||||
|
||||
// ── Findings rendering ────────────────────────────────────────────────────────
|
||||
|
||||
function renderFinding(f) {
|
||||
const el = document.createElement('div');
|
||||
el.className = 'finding';
|
||||
el.dataset.severity = f.severity;
|
||||
el.innerHTML = `
|
||||
<div class="finding-header">
|
||||
<span class="badge badge-${f.severity}">${f.severity}</span>
|
||||
<span class="finding-type">${escapeHtml(f.type).replace(/_/g, ' ')}</span>
|
||||
</div>
|
||||
<div class="finding-url">${escapeHtml(f.url)}</div>
|
||||
<div class="finding-evidence">${escapeHtml(f.evidence)}</div>
|
||||
`;
|
||||
el.addEventListener('click', () => el.classList.toggle('open'));
|
||||
return el;
|
||||
}
|
||||
|
||||
function escapeHtml(str) {
|
||||
return str.replace(/&/g,'&').replace(/</g,'<').replace(/>/g,'>')
|
||||
.replace(/"/g,'"').replace(/'/g,''');
|
||||
}
|
||||
|
||||
// ── Scan flow ─────────────────────────────────────────────────────────────────
|
||||
|
||||
let currentScanId = null;
|
||||
let pollTimer = null;
|
||||
|
||||
async function startScan(url, modules) {
|
||||
const res = await fetch(`${API}/scan`, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ url, modules }),
|
||||
});
|
||||
if (!res.ok) throw new Error(await res.text());
|
||||
const { scan_id } = await res.json();
|
||||
return scan_id;
|
||||
}
|
||||
|
||||
function startPolling(scanId) {
|
||||
clearInterval(pollTimer);
|
||||
pollTimer = setInterval(() => pollScan(scanId), 2000);
|
||||
}
|
||||
|
||||
async function pollScan(scanId) {
|
||||
const res = await fetch(`${API}/scan/${scanId}`);
|
||||
if (!res.ok) return;
|
||||
const job = await res.json();
|
||||
|
||||
updateLiveView(job);
|
||||
|
||||
if (job.status === 'completed' || job.status === 'error') {
|
||||
clearInterval(pollTimer);
|
||||
showResults(job);
|
||||
}
|
||||
}
|
||||
|
||||
function updateLiveView(job) {
|
||||
const fill = document.getElementById('progress-fill');
|
||||
const text = document.getElementById('progress-text');
|
||||
const list = document.getElementById('live-findings');
|
||||
|
||||
if (job.total > 0) {
|
||||
const pct = Math.round((job.progress / job.total) * 100);
|
||||
fill.style.width = `${pct}%`;
|
||||
text.textContent = `${job.progress} / ${job.total} paths checked`;
|
||||
} else {
|
||||
fill.style.width = '100%';
|
||||
text.textContent = job.status === 'running' ? 'Scanning…' : job.status;
|
||||
}
|
||||
|
||||
const existing = list.children.length;
|
||||
job.findings.slice(existing).forEach(f => {
|
||||
list.prepend(renderFinding(f));
|
||||
});
|
||||
}
|
||||
|
||||
function showResults(job) {
|
||||
const summary = document.getElementById('results-summary');
|
||||
const list = document.getElementById('results-findings');
|
||||
|
||||
summary.textContent = `${job.findings.length} finding${job.findings.length !== 1 ? 's' : ''} — ${job.target_url}`;
|
||||
list.innerHTML = '';
|
||||
|
||||
const sorted = [...job.findings].sort((a, b) => {
|
||||
const order = { critical: 0, high: 1, medium: 2, low: 3, info: 4 };
|
||||
return (order[a.severity] ?? 5) - (order[b.severity] ?? 5);
|
||||
});
|
||||
|
||||
sorted.forEach(f => list.appendChild(renderFinding(f)));
|
||||
showView('results');
|
||||
}
|
||||
|
||||
// ── Event listeners ───────────────────────────────────────────────────────────
|
||||
|
||||
document.getElementById('scan-form').addEventListener('submit', async (e) => {
|
||||
e.preventDefault();
|
||||
const url = document.getElementById('url-input').value;
|
||||
const modules = [...document.querySelectorAll('input[name="modules"]:checked')]
|
||||
.map(el => el.value);
|
||||
|
||||
document.getElementById('scan-target').textContent = `Scanning ${url}`;
|
||||
document.getElementById('progress-fill').style.width = '0%';
|
||||
document.getElementById('progress-text').textContent = 'Starting…';
|
||||
document.getElementById('live-findings').innerHTML = '';
|
||||
showView('scanning');
|
||||
|
||||
try {
|
||||
currentScanId = await startScan(url, modules);
|
||||
startPolling(currentScanId);
|
||||
} catch (err) {
|
||||
alert(`Failed to start scan: ${err.message}`);
|
||||
showView('starter');
|
||||
}
|
||||
});
|
||||
|
||||
document.getElementById('new-scan-btn').addEventListener('click', () => {
|
||||
clearInterval(pollTimer);
|
||||
showView('starter');
|
||||
});
|
||||
|
||||
document.getElementById('export-btn').addEventListener('click', () => {
|
||||
if (currentScanId) {
|
||||
window.location.href = `${API}/scan/${currentScanId}/export`;
|
||||
}
|
||||
});
|
||||
|
||||
document.getElementById('filter-severity').addEventListener('change', (e) => {
|
||||
const value = e.target.value;
|
||||
document.querySelectorAll('#results-findings .finding').forEach(el => {
|
||||
el.style.display = (!value || el.dataset.severity === value) ? '' : 'none';
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,69 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||
<title>InfoLeak Scanner</title>
|
||||
<link rel="stylesheet" href="/style.css" />
|
||||
</head>
|
||||
<body>
|
||||
<header>
|
||||
<h1>InfoLeak Scanner</h1>
|
||||
<p class="subtitle">Web Application Information Leak Detector</p>
|
||||
</header>
|
||||
|
||||
<main>
|
||||
<!-- View: Starter -->
|
||||
<section id="view-starter">
|
||||
<form id="scan-form">
|
||||
<div class="field">
|
||||
<label for="url-input">Target URL</label>
|
||||
<input id="url-input" type="url" placeholder="https://example.com" required />
|
||||
</div>
|
||||
<div class="field">
|
||||
<label>Modules</label>
|
||||
<div class="checkboxes">
|
||||
<label><input type="checkbox" name="modules" value="paths" checked /> Path Prober</label>
|
||||
<label><input type="checkbox" name="modules" value="headers" checked /> Header Analyzer</label>
|
||||
<label><input type="checkbox" name="modules" value="secrets" checked /> Response Inspector</label>
|
||||
<label><input type="checkbox" name="modules" value="directory" checked /> Directory Listing</label>
|
||||
</div>
|
||||
</div>
|
||||
<button type="submit">Start Scan</button>
|
||||
</form>
|
||||
</section>
|
||||
|
||||
<!-- View: Scanning -->
|
||||
<section id="view-scanning" class="hidden">
|
||||
<div class="scan-status">
|
||||
<span id="scan-target"></span>
|
||||
<div class="progress-bar"><div id="progress-fill"></div></div>
|
||||
<span id="progress-text">Initializing...</span>
|
||||
</div>
|
||||
<div id="live-findings" class="findings-list"></div>
|
||||
</section>
|
||||
|
||||
<!-- View: Results -->
|
||||
<section id="view-results" class="hidden">
|
||||
<div class="results-header">
|
||||
<span id="results-summary"></span>
|
||||
<div class="results-actions">
|
||||
<select id="filter-severity">
|
||||
<option value="">All severities</option>
|
||||
<option value="critical">Critical</option>
|
||||
<option value="high">High</option>
|
||||
<option value="medium">Medium</option>
|
||||
<option value="low">Low</option>
|
||||
<option value="info">Info</option>
|
||||
</select>
|
||||
<button id="export-btn">Export JSON</button>
|
||||
<button id="new-scan-btn">New Scan</button>
|
||||
</div>
|
||||
</div>
|
||||
<div id="results-findings" class="findings-list"></div>
|
||||
</section>
|
||||
</main>
|
||||
|
||||
<script src="/app.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
@@ -0,0 +1,172 @@
|
||||
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
|
||||
|
||||
:root {
|
||||
--bg: #0f1117;
|
||||
--surface: #1a1d27;
|
||||
--border: #2a2d3a;
|
||||
--text: #e2e8f0;
|
||||
--muted: #6b7280;
|
||||
--accent: #6366f1;
|
||||
--critical: #ef4444;
|
||||
--high: #f97316;
|
||||
--medium: #eab308;
|
||||
--low: #3b82f6;
|
||||
--info: #6b7280;
|
||||
}
|
||||
|
||||
body {
|
||||
background: var(--bg);
|
||||
color: var(--text);
|
||||
font-family: 'Segoe UI', system-ui, sans-serif;
|
||||
min-height: 100vh;
|
||||
padding: 2rem;
|
||||
}
|
||||
|
||||
header {
|
||||
margin-bottom: 2rem;
|
||||
}
|
||||
|
||||
h1 {
|
||||
font-size: 1.75rem;
|
||||
font-weight: 700;
|
||||
color: var(--accent);
|
||||
}
|
||||
|
||||
.subtitle { color: var(--muted); margin-top: 0.25rem; font-size: 0.9rem; }
|
||||
|
||||
.hidden { display: none !important; }
|
||||
|
||||
/* Starter form */
|
||||
#scan-form {
|
||||
background: var(--surface);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 0.75rem;
|
||||
padding: 2rem;
|
||||
max-width: 640px;
|
||||
}
|
||||
|
||||
.field { margin-bottom: 1.25rem; }
|
||||
.field label { display: block; margin-bottom: 0.5rem; font-size: 0.875rem; color: var(--muted); }
|
||||
|
||||
input[type="url"] {
|
||||
width: 100%;
|
||||
padding: 0.625rem 0.875rem;
|
||||
background: var(--bg);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 0.5rem;
|
||||
color: var(--text);
|
||||
font-size: 1rem;
|
||||
}
|
||||
|
||||
input[type="url"]:focus { outline: none; border-color: var(--accent); }
|
||||
|
||||
.checkboxes { display: flex; gap: 1rem; flex-wrap: wrap; }
|
||||
.checkboxes label { display: flex; align-items: center; gap: 0.4rem; font-size: 0.875rem; cursor: pointer; }
|
||||
|
||||
button {
|
||||
padding: 0.625rem 1.25rem;
|
||||
background: var(--accent);
|
||||
color: white;
|
||||
border: none;
|
||||
border-radius: 0.5rem;
|
||||
font-size: 0.9rem;
|
||||
cursor: pointer;
|
||||
transition: opacity 0.15s;
|
||||
}
|
||||
|
||||
button:hover { opacity: 0.85; }
|
||||
button:disabled { opacity: 0.4; cursor: not-allowed; }
|
||||
|
||||
/* Scanning view */
|
||||
.scan-status {
|
||||
background: var(--surface);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 0.75rem;
|
||||
padding: 1.5rem;
|
||||
margin-bottom: 1.5rem;
|
||||
max-width: 640px;
|
||||
}
|
||||
|
||||
#scan-target { font-size: 0.875rem; color: var(--muted); display: block; margin-bottom: 0.75rem; }
|
||||
|
||||
.progress-bar {
|
||||
background: var(--border);
|
||||
border-radius: 999px;
|
||||
height: 6px;
|
||||
margin-bottom: 0.5rem;
|
||||
overflow: hidden;
|
||||
}
|
||||
|
||||
#progress-fill {
|
||||
background: var(--accent);
|
||||
height: 100%;
|
||||
width: 0%;
|
||||
transition: width 0.3s ease;
|
||||
}
|
||||
|
||||
#progress-text { font-size: 0.8rem; color: var(--muted); }
|
||||
|
||||
/* Results header */
|
||||
.results-header {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
justify-content: space-between;
|
||||
margin-bottom: 1.25rem;
|
||||
flex-wrap: wrap;
|
||||
gap: 0.75rem;
|
||||
}
|
||||
|
||||
#results-summary { font-weight: 600; }
|
||||
|
||||
.results-actions { display: flex; gap: 0.5rem; align-items: center; }
|
||||
|
||||
select {
|
||||
padding: 0.5rem 0.75rem;
|
||||
background: var(--surface);
|
||||
border: 1px solid var(--border);
|
||||
border-radius: 0.5rem;
|
||||
color: var(--text);
|
||||
font-size: 0.875rem;
|
||||
}
|
||||
|
||||
#new-scan-btn { background: var(--surface); border: 1px solid var(--border); color: var(--text); }
|
||||
|
||||
/* Findings */
|
||||
.findings-list { display: flex; flex-direction: column; gap: 0.5rem; max-width: 900px; }
|
||||
|
||||
.finding {
|
||||
background: var(--surface);
|
||||
border: 1px solid var(--border);
|
||||
border-left: 3px solid;
|
||||
border-radius: 0.5rem;
|
||||
padding: 0.75rem 1rem;
|
||||
cursor: pointer;
|
||||
}
|
||||
|
||||
.finding[data-severity="critical"] { border-left-color: var(--critical); }
|
||||
.finding[data-severity="high"] { border-left-color: var(--high); }
|
||||
.finding[data-severity="medium"] { border-left-color: var(--medium); }
|
||||
.finding[data-severity="low"] { border-left-color: var(--low); }
|
||||
.finding[data-severity="info"] { border-left-color: var(--info); }
|
||||
|
||||
.finding-header { display: flex; align-items: center; gap: 0.75rem; }
|
||||
|
||||
.badge {
|
||||
font-size: 0.7rem;
|
||||
font-weight: 700;
|
||||
text-transform: uppercase;
|
||||
padding: 0.2rem 0.5rem;
|
||||
border-radius: 0.25rem;
|
||||
letter-spacing: 0.05em;
|
||||
}
|
||||
|
||||
.badge-critical { background: var(--critical); color: white; }
|
||||
.badge-high { background: var(--high); color: white; }
|
||||
.badge-medium { background: var(--medium); color: #1a1a1a; }
|
||||
.badge-low { background: var(--low); color: white; }
|
||||
.badge-info { background: var(--surface); border: 1px solid var(--border); color: var(--muted); }
|
||||
|
||||
.finding-type { font-size: 0.875rem; font-weight: 500; }
|
||||
.finding-url { font-size: 0.8rem; color: var(--muted); word-break: break-all; }
|
||||
.finding-evidence { margin-top: 0.5rem; font-size: 0.8rem; color: var(--muted); font-family: monospace; background: var(--bg); padding: 0.4rem 0.6rem; border-radius: 0.25rem; display: none; word-break: break-all; }
|
||||
.finding.open .finding-evidence { display: block; }
|
||||
@@ -0,0 +1,70 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from scanner.engine import clear_scans
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset():
|
||||
clear_scans()
|
||||
yield
|
||||
clear_scans()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
from main import app
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def test_post_scan_returns_scan_id(client):
|
||||
response = client.post("/scan", json={"url": "http://example.com"})
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "scan_id" in data
|
||||
assert len(data["scan_id"]) > 0
|
||||
|
||||
|
||||
def test_post_scan_invalid_url_rejected(client):
|
||||
response = client.post("/scan", json={"url": "not-a-url"})
|
||||
assert response.status_code == 422
|
||||
|
||||
|
||||
def test_get_scan_returns_job(client):
|
||||
post = client.post("/scan", json={"url": "http://example.com", "modules": []})
|
||||
scan_id = post.json()["scan_id"]
|
||||
|
||||
response = client.get(f"/scan/{scan_id}")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == scan_id
|
||||
assert data["target_url"] == "http://example.com/"
|
||||
assert "findings" in data
|
||||
|
||||
|
||||
def test_get_scan_unknown_returns_404(client):
|
||||
response = client.get("/scan/nonexistent-id")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_list_scans(client):
|
||||
client.post("/scan", json={"url": "http://example.com", "modules": []})
|
||||
client.post("/scan", json={"url": "http://other.com", "modules": []})
|
||||
response = client.get("/scans")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 2
|
||||
|
||||
|
||||
def test_export_scan_returns_json_download(client):
|
||||
post = client.post("/scan", json={"url": "http://example.com", "modules": []})
|
||||
scan_id = post.json()["scan_id"]
|
||||
|
||||
response = client.get(f"/scan/{scan_id}/export")
|
||||
assert response.status_code == 200
|
||||
assert "attachment" in response.headers.get("content-disposition", "")
|
||||
data = response.json()
|
||||
assert data["id"] == scan_id
|
||||
|
||||
|
||||
def test_export_unknown_scan_returns_404(client):
|
||||
response = client.get("/scan/nope/export")
|
||||
assert response.status_code == 404
|
||||
@@ -0,0 +1,42 @@
|
||||
import pytest
|
||||
import httpx
|
||||
from scanner.directory_listing import DirectoryListingDetector
|
||||
from scanner.models import Severity
|
||||
|
||||
|
||||
def make_client(body: str) -> httpx.AsyncClient:
|
||||
def handler(request):
|
||||
return httpx.Response(200, text=body)
|
||||
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
|
||||
|
||||
|
||||
async def test_detects_apache_index_title():
|
||||
body = "<html><head><title>Index of /var/www</title></head></html>"
|
||||
async with make_client(body) as client:
|
||||
findings = await DirectoryListingDetector().detect(client, "http://target.com/files/")
|
||||
assert len(findings) == 1
|
||||
assert findings[0].type == "directory_listing"
|
||||
assert findings[0].severity == Severity.MEDIUM
|
||||
assert findings[0].module == "directory_listing"
|
||||
|
||||
|
||||
async def test_detects_parent_directory_link():
|
||||
body = '<a href="..">Parent Directory</a>'
|
||||
async with make_client(body) as client:
|
||||
findings = await DirectoryListingDetector().detect(client, "http://target.com/files/")
|
||||
assert len(findings) == 1
|
||||
|
||||
|
||||
async def test_normal_page_returns_empty():
|
||||
body = "<html><body><h1>Welcome</h1></body></html>"
|
||||
async with make_client(body) as client:
|
||||
findings = await DirectoryListingDetector().detect(client, "http://target.com")
|
||||
assert findings == []
|
||||
|
||||
|
||||
async def test_connection_error_returns_empty():
|
||||
def handler(request):
|
||||
raise httpx.ConnectError("refused")
|
||||
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
|
||||
findings = await DirectoryListingDetector().detect(client, "http://target.com")
|
||||
assert findings == []
|
||||
@@ -0,0 +1,66 @@
|
||||
import pytest
|
||||
import httpx
|
||||
import unittest.mock as mock
|
||||
from scanner.engine import register_scan, get_scan, list_scans, run_scan, clear_scans
|
||||
from scanner.models import ScanJob, ScanStatus
|
||||
|
||||
|
||||
def make_target_transport(responses: dict[str, tuple[int, str]]):
|
||||
def handler(request):
|
||||
path = request.url.path
|
||||
status, body = responses.get(path, (404, ""))
|
||||
return httpx.Response(status, text=body)
|
||||
return httpx.MockTransport(handler)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset():
|
||||
clear_scans()
|
||||
yield
|
||||
clear_scans()
|
||||
|
||||
|
||||
def test_register_and_get_scan():
|
||||
job = ScanJob(id="test-1", target_url="http://target.com")
|
||||
register_scan(job)
|
||||
retrieved = get_scan("test-1")
|
||||
assert retrieved is job
|
||||
|
||||
|
||||
def test_get_scan_unknown_returns_none():
|
||||
assert get_scan("nonexistent") is None
|
||||
|
||||
|
||||
def test_list_scans():
|
||||
job1 = ScanJob(id="a", target_url="http://a.com")
|
||||
job2 = ScanJob(id="b", target_url="http://b.com")
|
||||
register_scan(job1)
|
||||
register_scan(job2)
|
||||
assert len(list_scans()) == 2
|
||||
|
||||
|
||||
async def test_run_scan_completes():
|
||||
job = ScanJob(id="scan-1", target_url="http://target.com")
|
||||
register_scan(job)
|
||||
await run_scan(job, modules=[])
|
||||
assert job.status == ScanStatus.COMPLETED
|
||||
|
||||
|
||||
async def test_run_scan_with_headers_module():
|
||||
responses = {"/": (200, "<html>hello</html>")}
|
||||
|
||||
_real_AsyncClient = httpx.AsyncClient
|
||||
|
||||
def patched_client(*args, **kwargs):
|
||||
kwargs.setdefault("transport", make_target_transport(responses))
|
||||
return _real_AsyncClient(*args, **kwargs)
|
||||
|
||||
job = ScanJob(id="scan-2", target_url="http://target.com")
|
||||
register_scan(job)
|
||||
|
||||
with mock.patch("scanner.engine.httpx.AsyncClient", side_effect=patched_client):
|
||||
await run_scan(job, modules=["headers"])
|
||||
|
||||
assert job.status == ScanStatus.COMPLETED
|
||||
header_findings = [f for f in job.findings if f.module == "header_analyzer"]
|
||||
assert len(header_findings) > 0
|
||||
@@ -0,0 +1,50 @@
|
||||
import pytest
|
||||
import httpx
|
||||
from scanner.header_analyzer import HeaderAnalyzer
|
||||
from scanner.models import Severity
|
||||
|
||||
|
||||
def make_client(headers: dict) -> httpx.AsyncClient:
|
||||
def handler(request):
|
||||
return httpx.Response(200, headers=headers)
|
||||
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
|
||||
|
||||
|
||||
async def test_server_header_disclosure():
|
||||
async with make_client({"Server": "Apache/2.4.51 (Ubuntu)"}) as client:
|
||||
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
|
||||
version_findings = [f for f in findings if f.type == "version_disclosure"]
|
||||
assert len(version_findings) >= 1
|
||||
assert any("Apache/2.4.51" in f.evidence for f in version_findings)
|
||||
assert version_findings[0].severity == Severity.LOW
|
||||
|
||||
|
||||
async def test_missing_security_headers_reported():
|
||||
async with make_client({}) as client:
|
||||
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
|
||||
missing = [f for f in findings if f.type == "missing_security_header"]
|
||||
header_names = [f.evidence for f in missing]
|
||||
assert any("Content-Security-Policy" in h for h in header_names)
|
||||
assert any("X-Frame-Options" in h for h in header_names)
|
||||
assert all(f.severity == Severity.INFO for f in missing)
|
||||
|
||||
|
||||
async def test_present_security_header_not_reported():
|
||||
headers = {
|
||||
"X-Frame-Options": "DENY",
|
||||
"Content-Security-Policy": "default-src 'self'",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Strict-Transport-Security": "max-age=31536000",
|
||||
}
|
||||
async with make_client(headers) as client:
|
||||
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
|
||||
missing = [f for f in findings if f.type == "missing_security_header"]
|
||||
assert missing == []
|
||||
|
||||
|
||||
async def test_connection_error_returns_empty():
|
||||
def handler(request):
|
||||
raise httpx.ConnectError("refused")
|
||||
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
|
||||
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
|
||||
assert findings == []
|
||||
@@ -0,0 +1,54 @@
|
||||
import pytest
|
||||
import httpx
|
||||
from scanner.response_inspector import ResponseInspector
|
||||
from scanner.models import Severity
|
||||
|
||||
|
||||
def make_client(body: str) -> httpx.AsyncClient:
|
||||
def handler(request):
|
||||
return httpx.Response(200, text=body)
|
||||
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
|
||||
|
||||
|
||||
async def test_detects_aws_key():
|
||||
body = 'var key = "AKIAIOSFODNN7EXAMPLE";'
|
||||
async with make_client(body) as client:
|
||||
findings = await ResponseInspector().inspect(client, "http://target.com")
|
||||
assert any(f.type == "aws_access_key" for f in findings)
|
||||
assert any(f.severity == Severity.CRITICAL for f in findings)
|
||||
|
||||
|
||||
async def test_detects_private_key():
|
||||
body = "-----BEGIN RSA PRIVATE KEY-----\nMIIEo..."
|
||||
async with make_client(body) as client:
|
||||
findings = await ResponseInspector().inspect(client, "http://target.com")
|
||||
assert any(f.type == "private_key" for f in findings)
|
||||
|
||||
|
||||
async def test_detects_php_stack_trace():
|
||||
body = "Fatal error: Call to undefined function foo() on line 42"
|
||||
async with make_client(body) as client:
|
||||
findings = await ResponseInspector().inspect(client, "http://target.com")
|
||||
assert any(f.type == "php_stack_trace" for f in findings)
|
||||
assert any(f.severity == Severity.HIGH for f in findings)
|
||||
|
||||
|
||||
async def test_detects_python_traceback():
|
||||
body = "Traceback (most recent call last):\n File app.py, line 10"
|
||||
async with make_client(body) as client:
|
||||
findings = await ResponseInspector().inspect(client, "http://target.com")
|
||||
assert any(f.type == "python_traceback" for f in findings)
|
||||
|
||||
|
||||
async def test_clean_page_returns_empty():
|
||||
async with make_client("<html><body>Hello world</body></html>") as client:
|
||||
findings = await ResponseInspector().inspect(client, "http://target.com")
|
||||
assert findings == []
|
||||
|
||||
|
||||
async def test_connection_error_returns_empty():
|
||||
def handler(request):
|
||||
raise httpx.ConnectError("refused")
|
||||
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
|
||||
findings = await ResponseInspector().inspect(client, "http://target.com")
|
||||
assert findings == []
|
||||
Reference in New Issue
Block a user