Compare commits
2 Commits
a51be482e8
...
c4f5b1cee7
| Author | SHA1 | Date | |
|---|---|---|---|
| c4f5b1cee7 | |||
| 97b55487d6 |
@@ -0,0 +1,7 @@
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
.pytest_cache/
|
||||
.venv/
|
||||
venv/
|
||||
*.egg-info/
|
||||
.superpowers/
|
||||
@@ -74,6 +74,7 @@ async def list_all_scans():
|
||||
"target_url": j.target_url,
|
||||
"status": j.status.value,
|
||||
"findings_count": len(j.findings),
|
||||
"started_at": j.started_at,
|
||||
}
|
||||
for j in list_scans()
|
||||
]
|
||||
|
||||
@@ -2,7 +2,7 @@ import re
|
||||
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
from .models import MAX_RESPONSE_BYTES, Finding, Severity
|
||||
|
||||
_PATTERNS = [
|
||||
re.compile(r'<title>Index of /', re.IGNORECASE),
|
||||
@@ -18,7 +18,7 @@ class DirectoryListingDetector:
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
body = response.text
|
||||
body = response.text[:MAX_RESPONSE_BYTES]
|
||||
for pattern in _PATTERNS:
|
||||
if pattern.search(body):
|
||||
return [Finding(
|
||||
|
||||
+10
-8
@@ -32,17 +32,19 @@ async def run_scan(job: ScanJob, modules: list[str]) -> None:
|
||||
job.status = ScanStatus.RUNNING
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=5.0, follow_redirects=True) as client:
|
||||
# The base-URL modules are independent and each hit only the target
|
||||
# URL, so run them concurrently rather than serially.
|
||||
base_tasks = []
|
||||
if "headers" in modules:
|
||||
findings = await HeaderAnalyzer().analyze(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
base_tasks.append(HeaderAnalyzer().analyze(client, job.target_url))
|
||||
if "secrets" in modules:
|
||||
findings = await ResponseInspector().inspect(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
|
||||
base_tasks.append(ResponseInspector().inspect(client, job.target_url))
|
||||
if "directory" in modules:
|
||||
findings = await DirectoryListingDetector().detect(client, job.target_url)
|
||||
job.findings.extend(findings)
|
||||
base_tasks.append(DirectoryListingDetector().detect(client, job.target_url))
|
||||
|
||||
if base_tasks:
|
||||
for findings in await asyncio.gather(*base_tasks):
|
||||
job.findings.extend(findings)
|
||||
|
||||
if "paths" in modules:
|
||||
prober = PathProber()
|
||||
|
||||
+7
-3
@@ -1,7 +1,11 @@
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
# Maximum number of response body bytes any module reads into memory before
|
||||
# scanning. Caps memory use when a target serves a very large body within the
|
||||
# request timeout.
|
||||
MAX_RESPONSE_BYTES = 512 * 1024
|
||||
|
||||
|
||||
class Severity(str, Enum):
|
||||
@@ -36,6 +40,6 @@ class ScanJob:
|
||||
findings: list[Finding] = field(default_factory=list)
|
||||
progress: int = 0
|
||||
total: int = 0
|
||||
error: Optional[str] = None
|
||||
error: str | None = None
|
||||
started_at: float = field(default_factory=time.time)
|
||||
completed_at: Optional[float] = None
|
||||
completed_at: float | None = None
|
||||
|
||||
@@ -2,7 +2,7 @@ import re
|
||||
|
||||
import httpx
|
||||
|
||||
from .models import Finding, Severity
|
||||
from .models import MAX_RESPONSE_BYTES, Finding, Severity
|
||||
|
||||
_PATTERNS: list[tuple[Severity, str, re.Pattern]] = [
|
||||
(Severity.CRITICAL, "aws_access_key",
|
||||
@@ -29,7 +29,7 @@ class ResponseInspector:
|
||||
except (httpx.ConnectError, httpx.TimeoutException, httpx.RemoteProtocolError):
|
||||
return []
|
||||
|
||||
body = response.text[:524288]
|
||||
body = response.text[:MAX_RESPONSE_BYTES]
|
||||
findings: list[Finding] = []
|
||||
|
||||
for severity, finding_type, pattern in _PATTERNS:
|
||||
|
||||
@@ -64,3 +64,29 @@ async def test_run_scan_with_headers_module():
|
||||
assert job.status == ScanStatus.COMPLETED
|
||||
header_findings = [f for f in job.findings if f.module == "header_analyzer"]
|
||||
assert len(header_findings) > 0
|
||||
|
||||
|
||||
async def test_run_scan_base_modules_run_concurrently():
|
||||
# A response that triggers a finding in each of the three base-URL modules:
|
||||
# missing security headers (header_analyzer), an AWS key (response_inspector),
|
||||
# and a directory listing (directory_listing).
|
||||
body = '<title>Index of /</title> key=AKIAIOSFODNN7EXAMPLE'
|
||||
responses = {"/": (200, body)}
|
||||
|
||||
_real_AsyncClient = httpx.AsyncClient
|
||||
|
||||
def patched_client(*args, **kwargs):
|
||||
kwargs.setdefault("transport", make_target_transport(responses))
|
||||
return _real_AsyncClient(*args, **kwargs)
|
||||
|
||||
job = ScanJob(id="scan-3", target_url="http://target.com")
|
||||
register_scan(job)
|
||||
|
||||
with mock.patch("scanner.engine.httpx.AsyncClient", side_effect=patched_client):
|
||||
await run_scan(job, modules=["headers", "secrets", "directory"])
|
||||
|
||||
assert job.status == ScanStatus.COMPLETED
|
||||
modules_with_findings = {f.module for f in job.findings}
|
||||
assert "header_analyzer" in modules_with_findings
|
||||
assert "response_inspector" in modules_with_findings
|
||||
assert "directory_listing" in modules_with_findings
|
||||
|
||||
Reference in New Issue
Block a user