feat: FastAPI backend with all routes
This commit is contained in:
@@ -0,0 +1,83 @@
|
||||
import uuid
|
||||
from dataclasses import asdict
|
||||
|
||||
from fastapi import BackgroundTasks, FastAPI, HTTPException
|
||||
from fastapi.responses import JSONResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel, HttpUrl
|
||||
|
||||
from scanner.engine import clear_scans, get_scan, list_scans, register_scan, run_scan
|
||||
from scanner.models import ScanJob
|
||||
|
||||
app = FastAPI(title="InfoLeak Scanner", version="1.0.0")
|
||||
|
||||
|
||||
class ScanRequest(BaseModel):
|
||||
url: HttpUrl
|
||||
modules: list[str] = ["paths", "headers", "secrets", "directory"]
|
||||
|
||||
|
||||
def _serialize(job: ScanJob) -> dict:
|
||||
return {
|
||||
"id": job.id,
|
||||
"target_url": job.target_url,
|
||||
"status": job.status.value,
|
||||
"findings": [
|
||||
{
|
||||
"severity": f.severity.value,
|
||||
"type": f.type,
|
||||
"url": f.url,
|
||||
"evidence": f.evidence,
|
||||
"module": f.module,
|
||||
}
|
||||
for f in job.findings
|
||||
],
|
||||
"progress": job.progress,
|
||||
"total": job.total,
|
||||
"error": job.error,
|
||||
"started_at": job.started_at,
|
||||
"completed_at": job.completed_at,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/scan")
|
||||
async def start_scan(request: ScanRequest, background_tasks: BackgroundTasks):
|
||||
job = ScanJob(id=str(uuid.uuid4()), target_url=str(request.url))
|
||||
register_scan(job)
|
||||
background_tasks.add_task(run_scan, job, request.modules)
|
||||
return {"scan_id": job.id}
|
||||
|
||||
|
||||
@app.get("/scan/{scan_id}/export")
|
||||
async def export_scan(scan_id: str):
|
||||
job = get_scan(scan_id)
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Scan not found")
|
||||
return JSONResponse(
|
||||
content=_serialize(job),
|
||||
headers={"Content-Disposition": f"attachment; filename=scan-{scan_id[:8]}.json"},
|
||||
)
|
||||
|
||||
|
||||
@app.get("/scan/{scan_id}")
|
||||
async def get_scan_status(scan_id: str):
|
||||
job = get_scan(scan_id)
|
||||
if not job:
|
||||
raise HTTPException(status_code=404, detail="Scan not found")
|
||||
return _serialize(job)
|
||||
|
||||
|
||||
@app.get("/scans")
|
||||
async def list_all_scans():
|
||||
return [
|
||||
{
|
||||
"id": j.id,
|
||||
"target_url": j.target_url,
|
||||
"status": j.status.value,
|
||||
"findings_count": len(j.findings),
|
||||
}
|
||||
for j in list_scans()
|
||||
]
|
||||
|
||||
|
||||
app.mount("/", StaticFiles(directory="static", html=True), name="static")
|
||||
@@ -0,0 +1 @@
|
||||
<!DOCTYPE html><html><body>loading...</body></html>
|
||||
@@ -0,0 +1,70 @@
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from scanner.engine import clear_scans
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset():
|
||||
clear_scans()
|
||||
yield
|
||||
clear_scans()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
from main import app
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
def test_post_scan_returns_scan_id(client):
|
||||
response = client.post("/scan", json={"url": "http://example.com"})
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert "scan_id" in data
|
||||
assert len(data["scan_id"]) > 0
|
||||
|
||||
|
||||
def test_post_scan_invalid_url_rejected(client):
|
||||
response = client.post("/scan", json={"url": "not-a-url"})
|
||||
assert response.status_code == 422
|
||||
|
||||
|
||||
def test_get_scan_returns_job(client):
|
||||
post = client.post("/scan", json={"url": "http://example.com", "modules": []})
|
||||
scan_id = post.json()["scan_id"]
|
||||
|
||||
response = client.get(f"/scan/{scan_id}")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == scan_id
|
||||
assert data["target_url"] == "http://example.com/"
|
||||
assert "findings" in data
|
||||
|
||||
|
||||
def test_get_scan_unknown_returns_404(client):
|
||||
response = client.get("/scan/nonexistent-id")
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
def test_list_scans(client):
|
||||
client.post("/scan", json={"url": "http://example.com", "modules": []})
|
||||
client.post("/scan", json={"url": "http://other.com", "modules": []})
|
||||
response = client.get("/scans")
|
||||
assert response.status_code == 200
|
||||
assert len(response.json()) == 2
|
||||
|
||||
|
||||
def test_export_scan_returns_json_download(client):
|
||||
post = client.post("/scan", json={"url": "http://example.com", "modules": []})
|
||||
scan_id = post.json()["scan_id"]
|
||||
|
||||
response = client.get(f"/scan/{scan_id}/export")
|
||||
assert response.status_code == 200
|
||||
assert "attachment" in response.headers.get("content-disposition", "")
|
||||
data = response.json()
|
||||
assert data["id"] == scan_id
|
||||
|
||||
|
||||
def test_export_unknown_scan_returns_404(client):
|
||||
response = client.get("/scan/nope/export")
|
||||
assert response.status_code == 404
|
||||
Reference in New Issue
Block a user