fix(security+perf): SSRF protection, timing-safe auth, proxy cache, submit error handling

This commit is contained in:
2026-04-13 17:59:29 +00:00
parent b34d4062a4
commit 54ae313563

View File

@@ -3,11 +3,14 @@ from __future__ import annotations
import base64 import base64
import hashlib import hashlib
import hmac
import html as html_mod import html as html_mod
import ipaddress
import json import json
import os import os
import re import re
import shlex import shlex
import socket
import subprocess import subprocess
import threading import threading
import time import time
@@ -106,7 +109,7 @@ def _check_basic_auth(req: Request) -> bool:
if ":" not in raw: if ":" not in raw:
return False return False
user, pw = raw.split(":", 1) user, pw = raw.split(":", 1)
return user == BASIC_AUTH_USER and pw == BASIC_AUTH_PASS return hmac.compare_digest(user, BASIC_AUTH_USER) and hmac.compare_digest(pw, BASIC_AUTH_PASS)
def _auth_challenge() -> HTMLResponse: def _auth_challenge() -> HTMLResponse:
return HTMLResponse( return HTMLResponse(
@@ -237,8 +240,26 @@ def is_demo_link(name: str) -> bool:
lower = name.lower().replace("-", "_").replace(".", " ") lower = name.lower().replace("-", "_").replace(".", " ")
return any(pat in lower for pat in DEMO_PATTERNS) return any(pat in lower for pat in DEMO_PATTERNS)
def _is_ssrf_target(url: str) -> bool:
"""Return True if the URL resolves to a private/loopback address (SSRF protection)."""
try:
host = urllib.parse.urlparse(url).hostname or ""
try:
addr = ipaddress.ip_address(host)
except ValueError:
try:
host = socket.gethostbyname(host)
addr = ipaddress.ip_address(host)
except Exception:
return False
return addr.is_private or addr.is_loopback or addr.is_link_local or addr.is_reserved
except Exception:
return False
def check_url_reachable(url: str) -> Optional[str]: def check_url_reachable(url: str) -> Optional[str]:
"""Try a HEAD request to verify the URL is reachable. Returns error string or None.""" """Try a HEAD request to verify the URL is reachable. Returns error string or None."""
if _is_ssrf_target(url):
return "URL zeigt auf eine interne/private Adresse (nicht erlaubt)"
try: try:
req = urllib.request.Request(url, method="HEAD") req = urllib.request.Request(url, method="HEAD")
req.add_header("User-Agent", "Mozilla/5.0") req.add_header("User-Agent", "Mozilla/5.0")
@@ -435,13 +456,24 @@ def format_proxy_lines(raw: str, scheme: str) -> str:
return "\n".join(dedup) return "\n".join(dedup)
_PROXY_FETCH_LIMIT = 2 * 1024 * 1024 # 2 MB cap to prevent memory exhaustion
_proxy_cache: Dict[str, Tuple[float, str]] = {}
_PROXY_CACHE_TTL = 300.0 # 5 minutes
def fetch_proxy_list(url: str) -> str: def fetch_proxy_list(url: str) -> str:
now = time.time()
cached_ts, cached_text = _proxy_cache.get(url, (0.0, ""))
if cached_text and now - cached_ts < _PROXY_CACHE_TTL:
return cached_text
req = urllib.request.Request(url) req = urllib.request.Request(url)
log_connection(f"HTTP GET {url} (no-proxy)") log_connection(f"HTTP GET {url} (no-proxy)")
with NO_PROXY_OPENER.open(req, timeout=20) as resp: with NO_PROXY_OPENER.open(req, timeout=20) as resp:
text = resp.read().decode("utf-8", "replace") text = resp.read(_PROXY_FETCH_LIMIT).decode("utf-8", "replace")
if "\n" not in text and re.search(r"\s", text): if "\n" not in text and re.search(r"\s", text):
return re.sub(r"\s+", "\n", text.strip()) text = re.sub(r"\s+", "\n", text.strip())
_proxy_cache[url] = (now, text)
return text return text
def build_jdproxies_payload(text: str) -> Dict[str, Any]: def build_jdproxies_payload(text: str) -> Dict[str, Any]:
@@ -1181,7 +1213,10 @@ def submit(url: str = Form(...), package_name: str = Form(""), library: str = Fo
log_connection(f"URL-Check fehlgeschlagen: {url} -> {url_err}") log_connection(f"URL-Check fehlgeschlagen: {url} -> {url_err}")
return HTMLResponse(render_page(f"Link nicht erreichbar: {url_err}"), status_code=400) return HTMLResponse(render_page(f"Link nicht erreichbar: {url_err}"), status_code=400)
try:
dev = get_device() dev = get_device()
except Exception as e:
return HTMLResponse(render_page(f"JDownloader nicht erreichbar: {e}"), status_code=503)
resp = dev.linkgrabber.add_links([{ resp = dev.linkgrabber.add_links([{
"links": url, "links": url,
"autostart": True, "autostart": True,