Files
Tools/backend/admin.py
T
Nirodan 98bb34f094 Fix bugs, add log rotation, and optimize hot paths
- Fix AttributeError crash on empty request body in md5, hasher, textdiff,
  jwtdecoder, timestamp, passwordgen (get_json without silent=True / or {})
- Fix memory exhaustion in ipcalc: replace list(network.hosts()) with direct
  arithmetic — safe for /8 and larger networks
- Fix O(1M) loop in cronexplainer.get_next_runs: rewrite to skip by
  month/day/hour instead of iterating every minute
- Fix connection leak in notes.ensure_table: add try/finally around conn.close
- Fix admin._ensure_tables / notes._ensure_table running DDL on every request:
  guard with module-level flags (_tables_initialized, _table_ready)
- Fix update_website returning 200 when no row matched; delete_website returning
  success when nothing was deleted; add rowcount checks for both
- Add role validation in admin create_user / update_user (_VALID_ROLES guard)
- Add delimiter length guard in csvviewer (csv.reader requires single char)
- Fix loremipsum: wrap int(count) in try/except ValueError → 400 response
- Fix auth/token: use auth_header[7:] instead of fragile .replace()
- Fix app.py: remove duplicate import sys; cache DB liveness check with 30s TTL
  to avoid a new TCP connection on every frontend page load; move api/setup
  path guard before DB check
- Replace FileHandler with RotatingFileHandler (5 MB / 3 backups) in logger;
  fix relative log paths to absolute paths anchored to __file__
- Wrap all DB connections in try/finally conn.close() throughout admin and notes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 10:06:29 +02:00

307 lines
10 KiB
Python

from flask import Blueprint, request, jsonify
from werkzeug.security import generate_password_hash
from auth.token import verify_token
from util.db_pool import get_connection
from util.logger import logger
admin_bp = Blueprint("admin", __name__)
_VALID_ROLES = {"user", "admin"}
_tables_initialized = False
def _require_admin():
user = verify_token()
if not user:
return None, (jsonify({"message": "Nicht autorisiert"}), 401)
if user.get("role") != "admin":
logger.warning("🚫 Adminbereich verweigert (kein Admin)")
return None, (jsonify({"message": "Adminrechte erforderlich"}), 403)
return user, None
def _ensure_tables(cur):
global _tables_initialized
if _tables_initialized:
return
cur.execute(
"""
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
role VARCHAR(20) NOT NULL
)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS websites (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
url VARCHAR(255) NOT NULL,
description VARCHAR(255) DEFAULT ''
)
"""
)
_tables_initialized = True
@admin_bp.route("/api/admin/users", methods=["GET"])
def list_users():
_, err = _require_admin()
if err:
return err
try:
conn = get_connection()
try:
cur = conn.cursor(dictionary=True)
_ensure_tables(cur)
cur.execute("SELECT id, username, role FROM users ORDER BY username ASC")
users = cur.fetchall()
cur.close()
finally:
conn.close()
return jsonify(users)
except Exception as e:
logger.error(f"[Admin list_users] {e}")
return jsonify({"message": "Serverfehler"}), 500
@admin_bp.route("/api/admin/users", methods=["POST"])
def create_user():
admin, err = _require_admin()
if err:
return err
data = request.get_json(silent=True) or {}
username = data.get("username", "").strip()
password = data.get("password", "")
role = data.get("role", "user")
if not username or not password:
return jsonify({"message": "Username und Passwort erforderlich"}), 400
if role not in _VALID_ROLES:
return jsonify({"message": f"Ungültige Rolle. Erlaubt: {', '.join(_VALID_ROLES)}"}), 400
try:
conn = get_connection()
try:
cur = conn.cursor(dictionary=True)
_ensure_tables(cur)
cur.execute("SELECT id FROM users WHERE username=%s", (username,))
if cur.fetchone():
cur.close()
return jsonify({"message": "Nutzer existiert bereits"}), 409
cur.execute(
"INSERT INTO users (username, password, role) VALUES (%s, %s, %s)",
(username, generate_password_hash(password), role)
)
conn.commit()
new_id = cur.lastrowid
cur.close()
finally:
conn.close()
logger.info(f"✅ User erstellt: {username} durch {admin['username']}")
return jsonify({"id": new_id, "username": username, "role": role}), 201
except Exception as e:
logger.error(f"[Admin create_user] {e}")
return jsonify({"message": "Serverfehler"}), 500
@admin_bp.route("/api/admin/users/<int:user_id>", methods=["PUT"])
def update_user(user_id):
admin, err = _require_admin()
if err:
return err
data = request.get_json(silent=True) or {}
role = data.get("role")
password = data.get("password")
if role is None and password is None:
return jsonify({"message": "Nichts zu aktualisieren"}), 400
if role is not None and role not in _VALID_ROLES:
return jsonify({"message": f"Ungültige Rolle. Erlaubt: {', '.join(_VALID_ROLES)}"}), 400
try:
conn = get_connection()
try:
cur = conn.cursor()
_ensure_tables(cur)
if role:
cur.execute("UPDATE users SET role=%s WHERE id=%s", (role, user_id))
if password:
cur.execute(
"UPDATE users SET password=%s WHERE id=%s",
(generate_password_hash(password), user_id)
)
conn.commit()
cur.close()
finally:
conn.close()
logger.info(f"✏️ User aktualisiert (id={user_id}) durch {admin['username']}")
return jsonify({"message": "Aktualisiert"}), 200
except Exception as e:
logger.error(f"[Admin update_user] {e}")
return jsonify({"message": "Serverfehler"}), 500
@admin_bp.route("/api/admin/users/<int:user_id>", methods=["DELETE"])
def delete_user(user_id):
admin, err = _require_admin()
if err:
return err
try:
conn = get_connection()
try:
cur = conn.cursor()
_ensure_tables(cur)
cur.execute("SELECT username FROM users WHERE id=%s", (user_id,))
row = cur.fetchone()
if not row:
cur.close()
return jsonify({"message": "Nicht gefunden"}), 404
username = row[0]
if username == admin["username"]:
cur.close()
return jsonify({"message": "Du kannst dich nicht selbst löschen"}), 400
cur.execute("DELETE FROM users WHERE id=%s", (user_id,))
conn.commit()
cur.close()
finally:
conn.close()
logger.info(f"🗑️ User gelöscht (id={user_id}) durch {admin['username']}")
return jsonify({"message": "Gelöscht"}), 200
except Exception as e:
logger.error(f"[Admin delete_user] {e}")
return jsonify({"message": "Serverfehler"}), 500
# ---------- Websites (Admin CRUD) ----------
@admin_bp.route("/api/admin/websites", methods=["GET"])
def list_websites_admin():
_, err = _require_admin()
if err:
return err
try:
conn = get_connection()
try:
cur = conn.cursor(dictionary=True)
_ensure_tables(cur)
cur.execute("SELECT id, name, url, description FROM websites ORDER BY name ASC")
rows = cur.fetchall()
cur.close()
finally:
conn.close()
return jsonify(rows)
except Exception as e:
logger.error(f"[Admin list_websites] {e}")
return jsonify({"message": "Serverfehler"}), 500
@admin_bp.route("/api/admin/websites", methods=["POST"])
def create_website():
_, err = _require_admin()
if err:
return err
data = request.get_json(silent=True) or {}
name = data.get("name", "").strip()
url = data.get("url", "").strip()
description = data.get("description", "").strip()
if not name or not url:
return jsonify({"message": "Name und URL erforderlich"}), 400
try:
conn = get_connection()
try:
cur = conn.cursor()
_ensure_tables(cur)
cur.execute(
"INSERT INTO websites (name, url, description) VALUES (%s, %s, %s)",
(name, url, description),
)
conn.commit()
new_id = cur.lastrowid
cur.close()
finally:
conn.close()
return jsonify({"id": new_id, "name": name, "url": url, "description": description}), 201
except Exception as e:
logger.error(f"[Admin create_website] {e}")
return jsonify({"message": "Serverfehler"}), 500
@admin_bp.route("/api/admin/websites/<int:item_id>", methods=["PUT"])
def update_website(item_id):
_, err = _require_admin()
if err:
return err
data = request.get_json(silent=True) or {}
name = data.get("name", "").strip()
url = data.get("url", "").strip()
if not name or not url:
return jsonify({"message": "Name und URL erforderlich"}), 400
description = data.get("description", "").strip()
try:
conn = get_connection()
try:
cur = conn.cursor()
_ensure_tables(cur)
cur.execute(
"UPDATE websites SET name=%s, url=%s, description=%s WHERE id=%s",
(name, url, description, item_id),
)
conn.commit()
affected = cur.rowcount
cur.close()
finally:
conn.close()
if affected == 0:
return jsonify({"message": "Nicht gefunden"}), 404
return jsonify({"message": "Aktualisiert"}), 200
except Exception as e:
logger.error(f"[Admin update_website] {e}")
return jsonify({"message": "Serverfehler"}), 500
@admin_bp.route("/api/admin/websites/<int:item_id>", methods=["DELETE"])
def delete_website(item_id):
_, err = _require_admin()
if err:
return err
try:
conn = get_connection()
try:
cur = conn.cursor()
_ensure_tables(cur)
cur.execute("DELETE FROM websites WHERE id=%s", (item_id,))
conn.commit()
affected = cur.rowcount
cur.close()
finally:
conn.close()
if affected == 0:
return jsonify({"message": "Nicht gefunden"}), 404
return jsonify({"message": "Gelöscht"}), 200
except Exception as e:
logger.error(f"[Admin delete_website] {e}")
return jsonify({"message": "Serverfehler"}), 500
# ---------- Public (logged-in) endpoints ----------
@admin_bp.route("/api/websites", methods=["GET"])
def list_websites_public():
user = verify_token()
if not user:
return jsonify({"message": "Nicht autorisiert"}), 401
try:
conn = get_connection()
try:
cur = conn.cursor(dictionary=True)
_ensure_tables(cur)
cur.execute("SELECT id, name, url, description FROM websites ORDER BY name ASC")
rows = cur.fetchall()
cur.close()
finally:
conn.close()
return jsonify(rows)
except Exception as e:
logger.error(f"[Public list_websites] {e}")
return jsonify({"message": "Serverfehler"}), 500