Compare commits

...

10 Commits

14 changed files with 949 additions and 1 deletions
+82
View File
@@ -0,0 +1,82 @@
import uuid
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl
from scanner.engine import clear_scans, get_scan, list_scans, register_scan, run_scan
from scanner.models import ScanJob
app = FastAPI(title="InfoLeak Scanner", version="1.0.0")
class ScanRequest(BaseModel):
url: HttpUrl
modules: list[str] = ["paths", "headers", "secrets", "directory"]
def _serialize(job: ScanJob) -> dict:
return {
"id": job.id,
"target_url": job.target_url,
"status": job.status.value,
"findings": [
{
"severity": f.severity.value,
"type": f.type,
"url": f.url,
"evidence": f.evidence,
"module": f.module,
}
for f in job.findings
],
"progress": job.progress,
"total": job.total,
"error": job.error,
"started_at": job.started_at,
"completed_at": job.completed_at,
}
@app.post("/scan")
async def start_scan(request: ScanRequest, background_tasks: BackgroundTasks):
job = ScanJob(id=str(uuid.uuid4()), target_url=str(request.url))
register_scan(job)
background_tasks.add_task(run_scan, job, request.modules)
return {"scan_id": job.id}
@app.get("/scan/{scan_id}/export")
async def export_scan(scan_id: str):
job = get_scan(scan_id)
if not job:
raise HTTPException(status_code=404, detail="Scan not found")
return JSONResponse(
content=_serialize(job),
headers={"Content-Disposition": f"attachment; filename=scan-{scan_id[:8]}.json"},
)
@app.get("/scan/{scan_id}")
async def get_scan_status(scan_id: str):
job = get_scan(scan_id)
if not job:
raise HTTPException(status_code=404, detail="Scan not found")
return _serialize(job)
@app.get("/scans")
async def list_all_scans():
return [
{
"id": j.id,
"target_url": j.target_url,
"status": j.status.value,
"findings_count": len(j.findings),
}
for j in list_scans()
]
app.mount("/", StaticFiles(directory="static", html=True), name="static")
+31
View File
@@ -0,0 +1,31 @@
import re
import httpx
from .models import Finding, Severity
_PATTERNS = [
re.compile(r'<title>Index of /', re.IGNORECASE),
re.compile(r'Parent Directory</a>', re.IGNORECASE),
re.compile(r'<pre>.*?<a href="\.\.">', re.IGNORECASE | re.DOTALL),
]
class DirectoryListingDetector:
async def detect(self, client: httpx.AsyncClient, url: str) -> list[Finding]:
try:
response = await client.get(url)
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
return []
body = response.text
for pattern in _PATTERNS:
if pattern.search(body):
return [Finding(
severity=Severity.MEDIUM,
type="directory_listing",
url=url,
evidence="Directory listing is enabled",
module="directory_listing",
)]
return []
+69
View File
@@ -0,0 +1,69 @@
import asyncio
import time
import httpx
from .directory_listing import DirectoryListingDetector
from .header_analyzer import HeaderAnalyzer
from .models import ScanJob, ScanStatus
from .path_prober import PathProber
from .response_inspector import ResponseInspector
_scans: dict[str, ScanJob] = {}
def register_scan(job: ScanJob) -> None:
_scans[job.id] = job
def get_scan(scan_id: str) -> ScanJob | None:
return _scans.get(scan_id)
def list_scans() -> list[ScanJob]:
return list(_scans.values())
def clear_scans() -> None:
_scans.clear()
async def run_scan(job: ScanJob, modules: list[str]) -> None:
job.status = ScanStatus.RUNNING
try:
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
if "headers" in modules:
findings = await HeaderAnalyzer().analyze(client, job.target_url)
job.findings.extend(findings)
if "secrets" in modules:
findings = await ResponseInspector().inspect(client, job.target_url)
job.findings.extend(findings)
if "directory" in modules:
findings = await DirectoryListingDetector().detect(client, job.target_url)
job.findings.extend(findings)
if "paths" in modules:
prober = PathProber()
paths = prober.load_wordlist()
job.total = len(paths)
semaphore = asyncio.Semaphore(10)
async def probe(path: str) -> None:
async with semaphore:
try:
found = await prober.probe(client, job.target_url, path)
job.findings.extend(found)
except Exception:
pass
job.progress += 1
await asyncio.gather(*[probe(p) for p in paths])
job.status = ScanStatus.COMPLETED
job.completed_at = time.time()
except Exception as exc:
job.status = ScanStatus.ERROR
job.error = str(exc)
+51
View File
@@ -0,0 +1,51 @@
import httpx
from .models import Finding, Severity
_VERSION_HEADERS = [
"server",
"x-powered-by",
"x-aspnet-version",
"x-aspnetmvc-version",
"x-generator",
]
_SECURITY_HEADERS = [
"X-Frame-Options",
"Content-Security-Policy",
"X-Content-Type-Options",
"Strict-Transport-Security",
]
class HeaderAnalyzer:
async def analyze(self, client: httpx.AsyncClient, url: str) -> list[Finding]:
try:
response = await client.get(url)
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
return []
findings: list[Finding] = []
headers = response.headers
for header in _VERSION_HEADERS:
if header in headers:
findings.append(Finding(
severity=Severity.LOW,
type="version_disclosure",
url=url,
evidence=f"{header.title()}: {headers[header]}",
module="header_analyzer",
))
for header in _SECURITY_HEADERS:
if header.lower() not in headers:
findings.append(Finding(
severity=Severity.INFO,
type="missing_security_header",
url=url,
evidence=f"Missing header: {header}",
module="header_analyzer",
))
return findings
+1 -1
View File
@@ -34,7 +34,7 @@ class PathProber:
severity = self._severity(path)
if response.status_code == 200:
snippet = response.text[:200].replace("\n", " ")
snippet = response.content[:512].decode("utf-8", errors="replace")[:200].replace("\n", " ")
evidence = f"HTTP 200 — {snippet}" if snippet else "HTTP 200"
else:
evidence = "HTTP 403 (resource exists but forbidden)"
+46
View File
@@ -0,0 +1,46 @@
import re
import httpx
from .models import Finding, Severity
_PATTERNS: list[tuple[Severity, str, re.Pattern]] = [
(Severity.CRITICAL, "aws_access_key",
re.compile(r'AKIA[0-9A-Z]{16}')),
(Severity.CRITICAL, "private_key",
re.compile(r'-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----')),
(Severity.HIGH, "password_in_comment",
re.compile(r'<!--.*?(?:password|passwd|pwd)\s*[=:]\s*\S+.*?-->', re.IGNORECASE | re.DOTALL)),
(Severity.HIGH, "php_stack_trace",
re.compile(r'(?:Fatal error|Warning|Parse error).*?on line \d+', re.IGNORECASE)),
(Severity.HIGH, "python_traceback",
re.compile(r'Traceback \(most recent call last\)')),
(Severity.HIGH, "java_exception",
re.compile(r'at [a-zA-Z_$][a-zA-Z0-9_$]*(?:\.[a-zA-Z_$][a-zA-Z0-9_$]*)+\(.*?\.java:\d+\)')),
(Severity.MEDIUM, "generic_api_key",
re.compile(r'["\']?api[_-]?key["\']?\s*[:=]\s*["\']?([A-Za-z0-9_\-]{20,})', re.IGNORECASE)),
]
class ResponseInspector:
async def inspect(self, client: httpx.AsyncClient, url: str) -> list[Finding]:
try:
response = await client.get(url)
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
return []
body = response.text[:524288]
findings: list[Finding] = []
for severity, finding_type, pattern in _PATTERNS:
match = pattern.search(body)
if match:
findings.append(Finding(
severity=severity,
type=finding_type,
url=url,
evidence=match.group(0)[:200],
module="response_inspector",
))
return findings
+146
View File
@@ -0,0 +1,146 @@
const API = '';
const views = {
starter: document.getElementById('view-starter'),
scanning: document.getElementById('view-scanning'),
results: document.getElementById('view-results'),
};
function showView(name) {
Object.values(views).forEach(v => v.classList.add('hidden'));
views[name].classList.remove('hidden');
}
// ── Findings rendering ────────────────────────────────────────────────────────
function renderFinding(f) {
const el = document.createElement('div');
el.className = 'finding';
el.dataset.severity = f.severity;
el.innerHTML = `
<div class="finding-header">
<span class="badge badge-${f.severity}">${f.severity}</span>
<span class="finding-type">${escapeHtml(f.type).replace(/_/g, ' ')}</span>
</div>
<div class="finding-url">${escapeHtml(f.url)}</div>
<div class="finding-evidence">${escapeHtml(f.evidence)}</div>
`;
el.addEventListener('click', () => el.classList.toggle('open'));
return el;
}
function escapeHtml(str) {
return str.replace(/&/g,'&amp;').replace(/</g,'&lt;').replace(/>/g,'&gt;')
.replace(/"/g,'&quot;').replace(/'/g,'&#39;');
}
// ── Scan flow ─────────────────────────────────────────────────────────────────
let currentScanId = null;
let pollTimer = null;
async function startScan(url, modules) {
const res = await fetch(`${API}/scan`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ url, modules }),
});
if (!res.ok) throw new Error(await res.text());
const { scan_id } = await res.json();
return scan_id;
}
function startPolling(scanId) {
clearInterval(pollTimer);
pollTimer = setInterval(() => pollScan(scanId), 2000);
}
async function pollScan(scanId) {
const res = await fetch(`${API}/scan/${scanId}`);
if (!res.ok) return;
const job = await res.json();
updateLiveView(job);
if (job.status === 'completed' || job.status === 'error') {
clearInterval(pollTimer);
showResults(job);
}
}
function updateLiveView(job) {
const fill = document.getElementById('progress-fill');
const text = document.getElementById('progress-text');
const list = document.getElementById('live-findings');
if (job.total > 0) {
const pct = Math.round((job.progress / job.total) * 100);
fill.style.width = `${pct}%`;
text.textContent = `${job.progress} / ${job.total} paths checked`;
} else {
fill.style.width = '100%';
text.textContent = job.status === 'running' ? 'Scanning…' : job.status;
}
const existing = list.children.length;
job.findings.slice(existing).forEach(f => {
list.prepend(renderFinding(f));
});
}
function showResults(job) {
const summary = document.getElementById('results-summary');
const list = document.getElementById('results-findings');
summary.textContent = `${job.findings.length} finding${job.findings.length !== 1 ? 's' : ''}${job.target_url}`;
list.innerHTML = '';
const sorted = [...job.findings].sort((a, b) => {
const order = { critical: 0, high: 1, medium: 2, low: 3, info: 4 };
return (order[a.severity] ?? 5) - (order[b.severity] ?? 5);
});
sorted.forEach(f => list.appendChild(renderFinding(f)));
showView('results');
}
// ── Event listeners ───────────────────────────────────────────────────────────
document.getElementById('scan-form').addEventListener('submit', async (e) => {
e.preventDefault();
const url = document.getElementById('url-input').value;
const modules = [...document.querySelectorAll('input[name="modules"]:checked')]
.map(el => el.value);
document.getElementById('scan-target').textContent = `Scanning ${url}`;
document.getElementById('progress-fill').style.width = '0%';
document.getElementById('progress-text').textContent = 'Starting…';
document.getElementById('live-findings').innerHTML = '';
showView('scanning');
try {
currentScanId = await startScan(url, modules);
startPolling(currentScanId);
} catch (err) {
alert(`Failed to start scan: ${err.message}`);
showView('starter');
}
});
document.getElementById('new-scan-btn').addEventListener('click', () => {
clearInterval(pollTimer);
showView('starter');
});
document.getElementById('export-btn').addEventListener('click', () => {
if (currentScanId) {
window.location.href = `${API}/scan/${currentScanId}/export`;
}
});
document.getElementById('filter-severity').addEventListener('change', (e) => {
const value = e.target.value;
document.querySelectorAll('#results-findings .finding').forEach(el => {
el.style.display = (!value || el.dataset.severity === value) ? '' : 'none';
});
});
+69
View File
@@ -0,0 +1,69 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>InfoLeak Scanner</title>
<link rel="stylesheet" href="/style.css" />
</head>
<body>
<header>
<h1>InfoLeak Scanner</h1>
<p class="subtitle">Web Application Information Leak Detector</p>
</header>
<main>
<!-- View: Starter -->
<section id="view-starter">
<form id="scan-form">
<div class="field">
<label for="url-input">Target URL</label>
<input id="url-input" type="url" placeholder="https://example.com" required />
</div>
<div class="field">
<label>Modules</label>
<div class="checkboxes">
<label><input type="checkbox" name="modules" value="paths" checked /> Path Prober</label>
<label><input type="checkbox" name="modules" value="headers" checked /> Header Analyzer</label>
<label><input type="checkbox" name="modules" value="secrets" checked /> Response Inspector</label>
<label><input type="checkbox" name="modules" value="directory" checked /> Directory Listing</label>
</div>
</div>
<button type="submit">Start Scan</button>
</form>
</section>
<!-- View: Scanning -->
<section id="view-scanning" class="hidden">
<div class="scan-status">
<span id="scan-target"></span>
<div class="progress-bar"><div id="progress-fill"></div></div>
<span id="progress-text">Initializing...</span>
</div>
<div id="live-findings" class="findings-list"></div>
</section>
<!-- View: Results -->
<section id="view-results" class="hidden">
<div class="results-header">
<span id="results-summary"></span>
<div class="results-actions">
<select id="filter-severity">
<option value="">All severities</option>
<option value="critical">Critical</option>
<option value="high">High</option>
<option value="medium">Medium</option>
<option value="low">Low</option>
<option value="info">Info</option>
</select>
<button id="export-btn">Export JSON</button>
<button id="new-scan-btn">New Scan</button>
</div>
</div>
<div id="results-findings" class="findings-list"></div>
</section>
</main>
<script src="/app.js"></script>
</body>
</html>
+172
View File
@@ -0,0 +1,172 @@
*, *::before, *::after { box-sizing: border-box; margin: 0; padding: 0; }
:root {
--bg: #0f1117;
--surface: #1a1d27;
--border: #2a2d3a;
--text: #e2e8f0;
--muted: #6b7280;
--accent: #6366f1;
--critical: #ef4444;
--high: #f97316;
--medium: #eab308;
--low: #3b82f6;
--info: #6b7280;
}
body {
background: var(--bg);
color: var(--text);
font-family: 'Segoe UI', system-ui, sans-serif;
min-height: 100vh;
padding: 2rem;
}
header {
margin-bottom: 2rem;
}
h1 {
font-size: 1.75rem;
font-weight: 700;
color: var(--accent);
}
.subtitle { color: var(--muted); margin-top: 0.25rem; font-size: 0.9rem; }
.hidden { display: none !important; }
/* Starter form */
#scan-form {
background: var(--surface);
border: 1px solid var(--border);
border-radius: 0.75rem;
padding: 2rem;
max-width: 640px;
}
.field { margin-bottom: 1.25rem; }
.field label { display: block; margin-bottom: 0.5rem; font-size: 0.875rem; color: var(--muted); }
input[type="url"] {
width: 100%;
padding: 0.625rem 0.875rem;
background: var(--bg);
border: 1px solid var(--border);
border-radius: 0.5rem;
color: var(--text);
font-size: 1rem;
}
input[type="url"]:focus { outline: none; border-color: var(--accent); }
.checkboxes { display: flex; gap: 1rem; flex-wrap: wrap; }
.checkboxes label { display: flex; align-items: center; gap: 0.4rem; font-size: 0.875rem; cursor: pointer; }
button {
padding: 0.625rem 1.25rem;
background: var(--accent);
color: white;
border: none;
border-radius: 0.5rem;
font-size: 0.9rem;
cursor: pointer;
transition: opacity 0.15s;
}
button:hover { opacity: 0.85; }
button:disabled { opacity: 0.4; cursor: not-allowed; }
/* Scanning view */
.scan-status {
background: var(--surface);
border: 1px solid var(--border);
border-radius: 0.75rem;
padding: 1.5rem;
margin-bottom: 1.5rem;
max-width: 640px;
}
#scan-target { font-size: 0.875rem; color: var(--muted); display: block; margin-bottom: 0.75rem; }
.progress-bar {
background: var(--border);
border-radius: 999px;
height: 6px;
margin-bottom: 0.5rem;
overflow: hidden;
}
#progress-fill {
background: var(--accent);
height: 100%;
width: 0%;
transition: width 0.3s ease;
}
#progress-text { font-size: 0.8rem; color: var(--muted); }
/* Results header */
.results-header {
display: flex;
align-items: center;
justify-content: space-between;
margin-bottom: 1.25rem;
flex-wrap: wrap;
gap: 0.75rem;
}
#results-summary { font-weight: 600; }
.results-actions { display: flex; gap: 0.5rem; align-items: center; }
select {
padding: 0.5rem 0.75rem;
background: var(--surface);
border: 1px solid var(--border);
border-radius: 0.5rem;
color: var(--text);
font-size: 0.875rem;
}
#new-scan-btn { background: var(--surface); border: 1px solid var(--border); color: var(--text); }
/* Findings */
.findings-list { display: flex; flex-direction: column; gap: 0.5rem; max-width: 900px; }
.finding {
background: var(--surface);
border: 1px solid var(--border);
border-left: 3px solid;
border-radius: 0.5rem;
padding: 0.75rem 1rem;
cursor: pointer;
}
.finding[data-severity="critical"] { border-left-color: var(--critical); }
.finding[data-severity="high"] { border-left-color: var(--high); }
.finding[data-severity="medium"] { border-left-color: var(--medium); }
.finding[data-severity="low"] { border-left-color: var(--low); }
.finding[data-severity="info"] { border-left-color: var(--info); }
.finding-header { display: flex; align-items: center; gap: 0.75rem; }
.badge {
font-size: 0.7rem;
font-weight: 700;
text-transform: uppercase;
padding: 0.2rem 0.5rem;
border-radius: 0.25rem;
letter-spacing: 0.05em;
}
.badge-critical { background: var(--critical); color: white; }
.badge-high { background: var(--high); color: white; }
.badge-medium { background: var(--medium); color: #1a1a1a; }
.badge-low { background: var(--low); color: white; }
.badge-info { background: var(--surface); border: 1px solid var(--border); color: var(--muted); }
.finding-type { font-size: 0.875rem; font-weight: 500; }
.finding-url { font-size: 0.8rem; color: var(--muted); word-break: break-all; }
.finding-evidence { margin-top: 0.5rem; font-size: 0.8rem; color: var(--muted); font-family: monospace; background: var(--bg); padding: 0.4rem 0.6rem; border-radius: 0.25rem; display: none; word-break: break-all; }
.finding.open .finding-evidence { display: block; }
+70
View File
@@ -0,0 +1,70 @@
import pytest
from fastapi.testclient import TestClient
from scanner.engine import clear_scans
@pytest.fixture(autouse=True)
def reset():
clear_scans()
yield
clear_scans()
@pytest.fixture
def client():
from main import app
return TestClient(app)
def test_post_scan_returns_scan_id(client):
response = client.post("/scan", json={"url": "http://example.com"})
assert response.status_code == 200
data = response.json()
assert "scan_id" in data
assert len(data["scan_id"]) > 0
def test_post_scan_invalid_url_rejected(client):
response = client.post("/scan", json={"url": "not-a-url"})
assert response.status_code == 422
def test_get_scan_returns_job(client):
post = client.post("/scan", json={"url": "http://example.com", "modules": []})
scan_id = post.json()["scan_id"]
response = client.get(f"/scan/{scan_id}")
assert response.status_code == 200
data = response.json()
assert data["id"] == scan_id
assert data["target_url"] == "http://example.com/"
assert "findings" in data
def test_get_scan_unknown_returns_404(client):
response = client.get("/scan/nonexistent-id")
assert response.status_code == 404
def test_list_scans(client):
client.post("/scan", json={"url": "http://example.com", "modules": []})
client.post("/scan", json={"url": "http://other.com", "modules": []})
response = client.get("/scans")
assert response.status_code == 200
assert len(response.json()) == 2
def test_export_scan_returns_json_download(client):
post = client.post("/scan", json={"url": "http://example.com", "modules": []})
scan_id = post.json()["scan_id"]
response = client.get(f"/scan/{scan_id}/export")
assert response.status_code == 200
assert "attachment" in response.headers.get("content-disposition", "")
data = response.json()
assert data["id"] == scan_id
def test_export_unknown_scan_returns_404(client):
response = client.get("/scan/nope/export")
assert response.status_code == 404
+42
View File
@@ -0,0 +1,42 @@
import pytest
import httpx
from scanner.directory_listing import DirectoryListingDetector
from scanner.models import Severity
def make_client(body: str) -> httpx.AsyncClient:
def handler(request):
return httpx.Response(200, text=body)
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
async def test_detects_apache_index_title():
body = "<html><head><title>Index of /var/www</title></head></html>"
async with make_client(body) as client:
findings = await DirectoryListingDetector().detect(client, "http://target.com/files/")
assert len(findings) == 1
assert findings[0].type == "directory_listing"
assert findings[0].severity == Severity.MEDIUM
assert findings[0].module == "directory_listing"
async def test_detects_parent_directory_link():
body = '<a href="..">Parent Directory</a>'
async with make_client(body) as client:
findings = await DirectoryListingDetector().detect(client, "http://target.com/files/")
assert len(findings) == 1
async def test_normal_page_returns_empty():
body = "<html><body><h1>Welcome</h1></body></html>"
async with make_client(body) as client:
findings = await DirectoryListingDetector().detect(client, "http://target.com")
assert findings == []
async def test_connection_error_returns_empty():
def handler(request):
raise httpx.ConnectError("refused")
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
findings = await DirectoryListingDetector().detect(client, "http://target.com")
assert findings == []
+66
View File
@@ -0,0 +1,66 @@
import pytest
import httpx
import unittest.mock as mock
from scanner.engine import register_scan, get_scan, list_scans, run_scan, clear_scans
from scanner.models import ScanJob, ScanStatus
def make_target_transport(responses: dict[str, tuple[int, str]]):
def handler(request):
path = request.url.path
status, body = responses.get(path, (404, ""))
return httpx.Response(status, text=body)
return httpx.MockTransport(handler)
@pytest.fixture(autouse=True)
def reset():
clear_scans()
yield
clear_scans()
def test_register_and_get_scan():
job = ScanJob(id="test-1", target_url="http://target.com")
register_scan(job)
retrieved = get_scan("test-1")
assert retrieved is job
def test_get_scan_unknown_returns_none():
assert get_scan("nonexistent") is None
def test_list_scans():
job1 = ScanJob(id="a", target_url="http://a.com")
job2 = ScanJob(id="b", target_url="http://b.com")
register_scan(job1)
register_scan(job2)
assert len(list_scans()) == 2
async def test_run_scan_completes():
job = ScanJob(id="scan-1", target_url="http://target.com")
register_scan(job)
await run_scan(job, modules=[])
assert job.status == ScanStatus.COMPLETED
async def test_run_scan_with_headers_module():
responses = {"/": (200, "<html>hello</html>")}
_real_AsyncClient = httpx.AsyncClient
def patched_client(*args, **kwargs):
kwargs.setdefault("transport", make_target_transport(responses))
return _real_AsyncClient(*args, **kwargs)
job = ScanJob(id="scan-2", target_url="http://target.com")
register_scan(job)
with mock.patch("scanner.engine.httpx.AsyncClient", side_effect=patched_client):
await run_scan(job, modules=["headers"])
assert job.status == ScanStatus.COMPLETED
header_findings = [f for f in job.findings if f.module == "header_analyzer"]
assert len(header_findings) > 0
+50
View File
@@ -0,0 +1,50 @@
import pytest
import httpx
from scanner.header_analyzer import HeaderAnalyzer
from scanner.models import Severity
def make_client(headers: dict) -> httpx.AsyncClient:
def handler(request):
return httpx.Response(200, headers=headers)
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
async def test_server_header_disclosure():
async with make_client({"Server": "Apache/2.4.51 (Ubuntu)"}) as client:
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
version_findings = [f for f in findings if f.type == "version_disclosure"]
assert len(version_findings) >= 1
assert any("Apache/2.4.51" in f.evidence for f in version_findings)
assert version_findings[0].severity == Severity.LOW
async def test_missing_security_headers_reported():
async with make_client({}) as client:
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
missing = [f for f in findings if f.type == "missing_security_header"]
header_names = [f.evidence for f in missing]
assert any("Content-Security-Policy" in h for h in header_names)
assert any("X-Frame-Options" in h for h in header_names)
assert all(f.severity == Severity.INFO for f in missing)
async def test_present_security_header_not_reported():
headers = {
"X-Frame-Options": "DENY",
"Content-Security-Policy": "default-src 'self'",
"X-Content-Type-Options": "nosniff",
"Strict-Transport-Security": "max-age=31536000",
}
async with make_client(headers) as client:
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
missing = [f for f in findings if f.type == "missing_security_header"]
assert missing == []
async def test_connection_error_returns_empty():
def handler(request):
raise httpx.ConnectError("refused")
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
findings = await HeaderAnalyzer().analyze(client, "http://target.com")
assert findings == []
+54
View File
@@ -0,0 +1,54 @@
import pytest
import httpx
from scanner.response_inspector import ResponseInspector
from scanner.models import Severity
def make_client(body: str) -> httpx.AsyncClient:
def handler(request):
return httpx.Response(200, text=body)
return httpx.AsyncClient(transport=httpx.MockTransport(handler))
async def test_detects_aws_key():
body = 'var key = "AKIAIOSFODNN7EXAMPLE";'
async with make_client(body) as client:
findings = await ResponseInspector().inspect(client, "http://target.com")
assert any(f.type == "aws_access_key" for f in findings)
assert any(f.severity == Severity.CRITICAL for f in findings)
async def test_detects_private_key():
body = "-----BEGIN RSA PRIVATE KEY-----\nMIIEo..."
async with make_client(body) as client:
findings = await ResponseInspector().inspect(client, "http://target.com")
assert any(f.type == "private_key" for f in findings)
async def test_detects_php_stack_trace():
body = "Fatal error: Call to undefined function foo() on line 42"
async with make_client(body) as client:
findings = await ResponseInspector().inspect(client, "http://target.com")
assert any(f.type == "php_stack_trace" for f in findings)
assert any(f.severity == Severity.HIGH for f in findings)
async def test_detects_python_traceback():
body = "Traceback (most recent call last):\n File app.py, line 10"
async with make_client(body) as client:
findings = await ResponseInspector().inspect(client, "http://target.com")
assert any(f.type == "python_traceback" for f in findings)
async def test_clean_page_returns_empty():
async with make_client("<html><body>Hello world</body></html>") as client:
findings = await ResponseInspector().inspect(client, "http://target.com")
assert findings == []
async def test_connection_error_returns_empty():
def handler(request):
raise httpx.ConnectError("refused")
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
findings = await ResponseInspector().inspect(client, "http://target.com")
assert findings == []