Files
Tools/backend/app.py
T
2025-06-17 10:43:22 +02:00

161 lines
5.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, request, render_template, jsonify, redirect, send_from_directory
from datenbankverbindung import lade_db_config, speichere_db_config, teste_verbindung, initialisiere_admin_user
from datetime import datetime, timedelta
from jwt import decode, ExpiredSignatureError, InvalidTokenError
import time
import os
import jwt
# Flask application object shared by every route in this module.
app = Flask(__name__)
app.template_folder = "templates"

# Key used to sign (login) and verify (verify_token) the JWTs of this service.
# It can be supplied through the SECRET_KEY environment variable so the real
# value does not have to live in the source; the historical placeholder is kept
# as a fallback so existing development setups keep working unchanged.
SECRET_KEY = os.environ.get("SECRET_KEY") or "bitte_hier_dein_geheimes_passwort_setzen"

# File whose existence marks the application as configured (see ist_konfiguriert).
CONFIG_PATH = "config/db_config.json"

# Retry budget for versuche_verbindung_mit_warten, both values in seconds:
# total time to keep waiting and the pause between two connection attempts.
MAX_WAIT = 30
WAIT_INTERVAL = 10
def ist_konfiguriert():
    """Tell whether a database configuration has been stored.

    Returns:
        bool: True when something exists at ``CONFIG_PATH``, False otherwise.
    """
    config_present = os.path.exists(CONFIG_PATH)
    return config_present
def versuche_verbindung_mit_warten():
    """Try to reach the database, retrying for up to ``MAX_WAIT`` seconds.

    The connection is probed immediately and then again after every
    ``WAIT_INTERVAL`` seconds until it succeeds or the accumulated waiting
    time reaches ``MAX_WAIT``.

    Returns:
        bool: True as soon as a probe succeeds; False when no configuration
        exists or every probe within the time budget failed.
    """
    if not ist_konfiguriert():
        return False

    config = lade_db_config()
    elapsed = 0
    while True:
        # Return the probe result itself. Deriving success from the elapsed
        # time would misreport a connection that only comes up on the very
        # last attempt (elapsed == MAX_WAIT) as a failure.
        if teste_verbindung(config):
            return True
        if elapsed >= MAX_WAIT:
            return False
        print(f"[INFO] DB nicht erreichbar warte {WAIT_INTERVAL}s...")
        time.sleep(WAIT_INTERVAL)
        elapsed += WAIT_INTERVAL
@app.route('/api/status')
def status():
    """Report the setup and database state as JSON.

    The ``status`` field is ``"init"`` while no configuration file exists,
    ``"ready"`` when the configured database answers and ``"error"`` when it
    does not; ``db_connected`` carries the matching boolean.
    """
    if not ist_konfiguriert():
        return jsonify({"status": "init", "db_connected": False})

    db_reachable = bool(teste_verbindung(lade_db_config()))
    state = "ready" if db_reachable else "error"
    return jsonify({"status": state, "db_connected": db_reachable})
@app.route('/setup', methods=['GET', 'POST'])
def setup():
    """Show the setup form (GET) or store and verify the DB settings (POST).

    On POST the form fields ``host``, ``port``, ``user``, ``password`` and
    ``database`` are read, saved via ``speichere_db_config`` and then tested.

    Returns:
        The rendered ``setup.html`` for GET; for POST a redirect to ``/`` when
        the connection works, a 400 text response when ``port`` is not an
        integer, or a 500 text response when the connection test fails.
    """
    if request.method != 'POST':
        return render_template('setup.html')

    form = request.form
    db_config = {
        "host": form['host'],
        "port": form['port'],
        "user": form['user'],
        "password": form['password'],
        "database": form['database']
    }

    # Validate the port before anything is persisted: a non-numeric value used
    # to raise an unhandled ValueError and surface as an internal server error.
    try:
        db_config["port"] = int(db_config["port"])
    except ValueError:
        return "Ungültiger Port. Bitte zurück und prüfen.", 400

    speichere_db_config(db_config)
    if not teste_verbindung(db_config):
        return "Verbindung fehlgeschlagen. Bitte zurück und prüfen.", 500

    # Runs only once the database is reachable.
    # NOTE(review): presumably creates the initial admin account - confirm in
    # datenbankverbindung.initialisiere_admin_user.
    initialisiere_admin_user(db_config)
    return redirect('/')
@app.route('/', defaults={'path': ''})
@app.route('/<path:path>')
def serve_react(path):
    """Catch-all route that delivers the built React frontend.

    Args:
        path: Request path without the leading slash ('' for the root URL).

    Returns:
        A redirect to ``/setup`` while the app is unconfigured or the database
        is unreachable; a JSON 404 for API paths no dedicated route handled;
        otherwise the requested file from ``frontend/dist`` or, for any other
        path, ``index.html``.
    """
    if not ist_konfiguriert() or not teste_verbindung(lade_db_config()):
        return redirect('/setup')

    # Setup and API must not be shadowed by React. A request only gets here
    # when no dedicated route matched it, so redirecting to the very same URL
    # (as was done before) sent the client into an endless redirect loop.
    if path == 'api' or path.startswith('api/'):
        return jsonify({"message": "Nicht gefunden"}), 404
    if path == 'setup' or path.startswith('setup/'):
        return redirect('/setup')

    # Resolve frontend/dist to an absolute path next to this backend folder.
    dist_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'frontend', 'dist'))

    # isfile rather than exists: a path that names a directory inside dist is
    # not a deliverable asset and should fall through to index.html.
    if path and os.path.isfile(os.path.join(dist_dir, path)):
        return send_from_directory(dist_dir, path)
    return send_from_directory(dist_dir, 'index.html')
@app.route('/api/login', methods=['POST'])
def login():
    """Check username/password against the ``users`` table and issue a JWT.

    Expects a JSON object with the string fields ``username`` and ``password``.

    Returns:
        JSON with ``token`` and ``role`` on success; a 401 JSON message when
        the credentials are missing, malformed or wrong; a 500 JSON message
        when the database lookup or token creation fails.
    """
    from datetime import timezone
    from mysql.connector import connect
    from werkzeug.security import check_password_hash

    # silent=True yields None instead of raising on a missing or invalid JSON
    # body; anything that is not a JSON object is treated as "no credentials".
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')
    if not isinstance(username, str) or not isinstance(password, str):
        # Absent or non-string credentials can never match; answer like any
        # other failed login instead of crashing inside the hash check.
        return jsonify({"message": "Login fehlgeschlagen"}), 401

    try:
        conn = connect(**lade_db_config())
        try:
            cursor = conn.cursor(dictionary=True)
            try:
                cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
                user = cursor.fetchone()
            finally:
                cursor.close()
        finally:
            # Release the connection even when the query raised.
            conn.close()

        if user and check_password_hash(user['password'], password):
            # Build the JWT; it expires 60 minutes from now (timezone-aware UTC).
            payload = {
                "username": user['username'],
                "role": user['role'],
                "exp": datetime.now(timezone.utc) + timedelta(minutes=60)
            }
            token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")
            return jsonify({
                "token": token,
                "role": user['role']
            })
        return jsonify({"message": "Login fehlgeschlagen"}), 401
    except Exception as e:
        # Route boundary: log the cause and hide the details from the client.
        print("[Login-Fehler]", e)
        return jsonify({"message": "Serverfehler"}), 500
@app.route('/api/logout', methods=['POST'])
def logout():
    """Acknowledge a logout request.

    Nothing is changed on the server at the moment; the client merely
    receives a confirmation message.
    """
    confirmation = {"message": "Logout erfolgreich"}
    return jsonify(confirmation)
def verify_token():
    """Validate the bearer token of the current request.

    Reads the ``Authorization`` header, expects the form ``Bearer <token>``
    and decodes the token with ``SECRET_KEY`` using HS256.

    Returns:
        dict | None: The decoded payload (e.g. username, role, exp), or None
        when the header is missing or malformed, or the token is expired or
        invalid.
    """
    prefix = "Bearer "
    auth_header = request.headers.get("Authorization", "")
    if not auth_header.startswith(prefix):
        return None

    # Cut off only the leading scheme; str.replace would also delete any later
    # occurrence of "Bearer " inside the header value.
    token = auth_header[len(prefix):].strip()
    try:
        return decode(token, SECRET_KEY, algorithms=["HS256"])
    except ExpiredSignatureError:
        print("[JWT] Token abgelaufen")
        return None
    except InvalidTokenError:
        print("[JWT] Ungültiger Token")
        return None
@app.route('/api/hash/md5', methods=['POST'])
def hash_md5():
    """Return the MD5 hex digest of the submitted ``password`` string.

    Requires a valid bearer token (see ``verify_token``). Expects a JSON
    object; a missing ``password`` key is treated as the empty string.

    Returns:
        JSON with ``md5`` (hex digest) and ``by`` (username taken from the
        token); a 401 JSON message without a valid token; a 400 JSON message
        when the body is not a JSON object or ``password`` is not a string.
    """
    import hashlib

    user = verify_token()
    if not user:
        return jsonify({"message": "Nicht autorisiert"}), 401

    # silent=True returns None for a missing or invalid JSON body instead of
    # raising, so malformed requests get a clean 400 rather than a crash.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"message": "Ungültige Eingabe"}), 400
    password = data.get("password", "")
    if not isinstance(password, str):
        return jsonify({"message": "Ungültige Eingabe"}), 400

    result = hashlib.md5(password.encode()).hexdigest()
    return jsonify({
        "md5": result,
        "by": user.get('username')
    })
if __name__ == '__main__':
    # Make sure the directory that will hold CONFIG_PATH exists before the
    # first request tries to save a configuration there.
    config_dir = os.path.dirname(CONFIG_PATH)
    if config_dir:
        os.makedirs(config_dir, exist_ok=True)

    # Debug mode stays on by default (development behaviour as before), but can
    # now be switched off with FLASK_DEBUG=0/false/no/off. Never leave it on in
    # production: the server listens on all interfaces and the debugger would
    # be reachable from the network.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in ("0", "false", "no", "off")
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)