refactor: parallelize base-URL modules, cap directory-listing body, modern type hints
This commit is contained in:
Binary file not shown.
@@ -74,6 +74,7 @@ async def list_all_scans():
|
||||
"target_url": j.target_url,
|
||||
"status": j.status.value,
|
||||
"findings_count": len(j.findings),
|
||||
"started_at": j.started_at,
|
||||
}
|
||||
for j in list_scans()
|
||||
]
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -2,7 +2,7 @@ import re
|
||||
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
from .models import MAX_RESPONSE_BYTES, Finding, Severity
|
||||
|
||||
_PATTERNS = [
|
||||
re.compile(r'<title>Index of /', re.IGNORECASE),
|
||||
@@ -18,7 +18,7 @@ class DirectoryListingDetector:
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
body = response.text
|
||||
body = response.text[:MAX_RESPONSE_BYTES]
|
||||
for pattern in _PATTERNS:
|
||||
if pattern.search(body):
|
||||
return [Finding(
|
||||
|
||||
+10
-8
@@ -32,17 +32,19 @@ async def run_scan(job: ScanJob, modules: list[str]) -> None:
|
||||
job.status = ScanStatus.RUNNING
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
|
||||
# The base-URL modules are independent and each hits only the target
|
||||
# URL, so run them concurrently rather than serially.
|
||||
base_tasks = []
|
||||
if "headers" in modules:
|
||||
findings = await HeaderAnalyzer().analyze(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
base_tasks.append(HeaderAnalyzer().analyze(client, job.target_url))
|
||||
if "secrets" in modules:
|
||||
findings = await ResponseInspector().inspect(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
base_tasks.append(ResponseInspector().inspect(client, job.target_url))
|
||||
if "directory" in modules:
|
||||
findings = await DirectoryListingDetector().detect(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
base_tasks.append(DirectoryListingDetector().detect(client, job.target_url))
|
||||
|
||||
if base_tasks:
|
||||
for findings in await asyncio.gather(*base_tasks):
|
||||
job.findings.extend(findings)
|
||||
|
||||
if "paths" in modules:
|
||||
prober = PathProber()
|
||||
|
||||
+7
-3
@@ -1,7 +1,11 @@
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
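# Maximum length of the response body text that a module scans. Call sites
|
||||
# apply it by slicing `response.text`, which keeps pattern scanning bounded
|
||||
# when a target serves a very large body within the request timeout.
|
||||
# NOTE(review): the slice is taken from the already-decoded text, so this looks
|
||||
# like a cap on scanned characters rather than on bytes held in memory;
|
||||
# confirm against how each module fetches the response.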
|
||||
MAX_RESPONSE_BYTES = 512 * 1024
|
||||
|
||||
|
||||
class Severity(str, Enum):
|
||||
@@ -36,6 +40,6 @@ class ScanJob:
|
||||
findings: list[Finding] = field(default_factory=list)
|
||||
progress: int = 0
|
||||
total: int = 0
|
||||
error: Optional[str] = None
|
||||
error: str | None = None
|
||||
started_at: float = field(default_factory=time.time)
|
||||
completed_at: Optional[float] = None
|
||||
completed_at: float | None = None
|
||||
|
||||
@@ -2,7 +2,7 @@ import re
|
||||
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
from .models import MAX_RESPONSE_BYTES, Finding, Severity
|
||||
|
||||
_PATTERNS: list[tuple[Severity, str, re.Pattern]] = [
|
||||
(Severity.CRITICAL, "aws_access_key",
|
||||
@@ -29,7 +29,7 @@ class ResponseInspector:
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
body = response.text[:524288]
|
||||
body = response.text[:MAX_RESPONSE_BYTES]
|
||||
findings: list[Finding] = []
|
||||
|
||||
for severity, finding_type, pattern in _PATTERNS:
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -64,3 +64,29 @@ async def test_run_scan_with_headers_module():
|
||||
assert job.status == ScanStatus.COMPLETED
|
||||
header_findings = [f for f in job.findings if f.module == "header_analyzer"]
|
||||
assert len(header_findings) > 0
|
||||
|
||||
|
||||
async def test_run_scan_base_modules_run_concurrently():
    """Run the three base-URL modules in one scan and expect findings from each.

    The target serves a single page crafted to trip every base-URL module:
    it carries no security headers (header_analyzer), embeds an AWS-style
    access key (response_inspector) and has a directory-listing title
    (directory_listing). The test checks that the scan completes and that
    each module contributed at least one finding; it does not measure
    whether the requests actually overlapped in time.
    """
    page = '<title>Index of /</title> key=AKIAIOSFODNN7EXAMPLE'
    responses = {"/": (200, page)}

    # Keep a handle on the genuine class before it is patched, so the
    # factory below can still build real clients.
    original_client_cls = httpx.AsyncClient

    def patched_client(*args, **kwargs):
        # Route traffic through the canned transport unless the caller
        # supplied a transport of its own.
        transport = make_target_transport(responses)
        kwargs.setdefault("transport", transport)
        return original_client_cls(*args, **kwargs)

    job = ScanJob(id="scan-3", target_url="http://target.com")
    register_scan(job)

    client_patch = mock.patch(
        "scanner.engine.httpx.AsyncClient",
        side_effect=patched_client,
    )
    with client_patch:
        await run_scan(job, modules=["headers", "secrets", "directory"])

    assert job.status == ScanStatus.COMPLETED

    reporting_modules = {finding.module for finding in job.findings}
    for expected in ("header_analyzer", "response_inspector", "directory_listing"):
        assert expected in reporting_modules
|
||||
|
||||
Reference in New Issue
Block a user