Compare commits

...

2 Commits

Author SHA1 Message Date
InfoLeak c4f5b1cee7 chore: add .gitignore, untrack __pycache__ 2026-06-21 19:16:56 +02:00
InfoLeak 97b55487d6 refactor: parallelize base-URL modules, cap directory-listing body, modern type hints 2026-06-21 19:16:32 +02:00
7 changed files with 55 additions and 15 deletions
+7
View File
@@ -0,0 +1,7 @@
__pycache__/
*.py[cod]
.pytest_cache/
.venv/
venv/
*.egg-info/
.superpowers/
+1
View File
@@ -74,6 +74,7 @@ async def list_all_scans():
"target_url": j.target_url,
"status": j.status.value,
"findings_count": len(j.findings),
"started_at": j.started_at,
}
for j in list_scans()
]
+2 -2
View File
@@ -2,7 +2,7 @@ import re
import httpx
from .models import Finding, Severity
from .models import MAX_RESPONSE_BYTES, Finding, Severity
_PATTERNS = [
re.compile(r'<title>Index of /', re.IGNORECASE),
@@ -18,7 +18,7 @@ class DirectoryListingDetector:
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
return []
body = response.text
body = response.text[:MAX_RESPONSE_BYTES]
for pattern in _PATTERNS:
if pattern.search(body):
return [Finding(
+10 -8
View File
@@ -32,17 +32,19 @@ async def run_scan(job: ScanJob, modules: list[str]) -> None:
job.status = ScanStatus.RUNNING
try:
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
# The base-URL modules are independent and each hit only the target
# URL, so run them concurrently rather than serially.
base_tasks = []
if "headers" in modules:
findings = await HeaderAnalyzer().analyze(client, job.target_url)
job.findings.extend(findings)
base_tasks.append(HeaderAnalyzer().analyze(client, job.target_url))
if "secrets" in modules:
findings = await ResponseInspector().inspect(client, job.target_url)
job.findings.extend(findings)
base_tasks.append(ResponseInspector().inspect(client, job.target_url))
if "directory" in modules:
findings = await DirectoryListingDetector().detect(client, job.target_url)
job.findings.extend(findings)
base_tasks.append(DirectoryListingDetector().detect(client, job.target_url))
if base_tasks:
for findings in await asyncio.gather(*base_tasks):
job.findings.extend(findings)
if "paths" in modules:
prober = PathProber()
+7 -3
View File
@@ -1,7 +1,11 @@
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
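# Maximum length of response body text that a module scans. Modules slice
# response.text to this length, so the limit counts decoded characters rather
# than raw bytes, and it bounds the scanning work when a target serves a very
# large body within the request timeout.
# NOTE(review): the slice is taken after response.text has been built, so it
# presumably does not limit how much of the body is downloaded — confirm.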
MAX_RESPONSE_BYTES = 512 * 1024
class Severity(str, Enum):
@@ -36,6 +40,6 @@ class ScanJob:
findings: list[Finding] = field(default_factory=list)
progress: int = 0
total: int = 0
error: Optional[str] = None
error: str | None = None
started_at: float = field(default_factory=time.time)
completed_at: Optional[float] = None
completed_at: float | None = None
+2 -2
View File
@@ -2,7 +2,7 @@ import re
import httpx
from .models import Finding, Severity
from .models import MAX_RESPONSE_BYTES, Finding, Severity
_PATTERNS: list[tuple[Severity, str, re.Pattern]] = [
(Severity.CRITICAL, "aws_access_key",
@@ -29,7 +29,7 @@ class ResponseInspector:
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
return []
body = response.text[:524288]
body = response.text[:MAX_RESPONSE_BYTES]
findings: list[Finding] = []
for severity, finding_type, pattern in _PATTERNS:
+26
View File
@@ -64,3 +64,29 @@ async def test_run_scan_with_headers_module():
assert job.status == ScanStatus.COMPLETED
header_findings = [f for f in job.findings if f.module == "header_analyzer"]
assert len(header_findings) > 0
async def test_run_scan_base_modules_run_concurrently():
    """Scan with all three base-URL modules and expect a finding from each.

    The canned page carries a directory-listing title and an AWS-style access
    key, so it is meant to trip directory_listing and response_inspector;
    header_analyzer is expected to report as well (presumably because the
    fake transport sets no security headers).

    Note: this checks that every module contributed findings to the job. It
    does not measure timing or ordering of the module calls.
    """
    page = '<title>Index of /</title> key=AKIAIOSFODNN7EXAMPLE'
    canned = {"/": (200, page)}

    # Grab the genuine class before patching; the patch below swaps out the
    # attribute that this name would otherwise resolve to.
    original_client_cls = httpx.AsyncClient

    def client_with_fake_transport(*args, **kwargs):
        # Inject the fake transport unless the caller supplied its own.
        kwargs.setdefault("transport", make_target_transport(canned))
        return original_client_cls(*args, **kwargs)

    job = ScanJob(id="scan-3", target_url="http://target.com")
    register_scan(job)

    with mock.patch(
        "scanner.engine.httpx.AsyncClient",
        side_effect=client_with_fake_transport,
    ):
        await run_scan(job, modules=["headers", "secrets", "directory"])

    assert job.status == ScanStatus.COMPLETED

    reporting_modules = {finding.module for finding in job.findings}
    for expected in ("header_analyzer", "response_inspector", "directory_listing"):
        assert expected in reporting_modules