Files
Nirodan 9db922703b Add versioned DB migration system with automatic backup
migrations.py
- schema_migrations table tracks applied versions (version, description, applied_at)
- MIGRATIONS list is append-only; each entry is (version, description, sql)
- backup() dumps all user-data tables to a timestamped JSON file in backups/
  before any schema changes so data can be recovered if something goes wrong
- run_migrations() is idempotent: already-applied versions are skipped

Integration
- app.py calls _run_startup_migrations() at module load so every restart
  applies any pending migrations (no-op if schema is current)
- setup_routes.py calls run_migrations() after the initial setup form is
  submitted so all tables exist before the user hits the main page for the
  first time
- notes.py and admin.py: removed all per-request CREATE TABLE DDL; schema is
  now owned entirely by the migration system

Docker
- docker-compose.dev.yml: add backups-data volume so JSON backups survive
  container restarts and rebuilds
- Dockerfile: pre-create /app/backend/logs and /app/backend/backups so the
  directories exist even before volumes are mounted

Adding future schema changes
- Append a new (version, description, sql) tuple to MIGRATIONS in migrations.py
- The next restart will detect it as pending, back up first, then apply it

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-06 10:27:11 +02:00

162 lines
5.1 KiB
Python

"""
Schema migration system.
Rules for maintainers
---------------------
- NEVER modify or delete an existing entry in MIGRATIONS.
- ALWAYS append new entries at the end with an incremented version number.
- Use ALTER TABLE to change an existing table, not a new CREATE TABLE.
- Each entry must be a single, complete SQL statement.
How it works
------------
On startup (and after the initial setup form) run_migrations() is called.
It creates a `schema_migrations` table the first time, checks which versions
have already been applied, and runs any that are missing. A full JSON backup
of all user-data tables is written to the backups/ directory before any
schema changes are made so data can be recovered if something goes wrong.
"""
import json
import os
from datetime import datetime
from util.logger import logger
# ---------------------------------------------------------------------------
# Migration registry — append-only
# ---------------------------------------------------------------------------
# Registry of schema changes. Each entry is a 3-tuple
# (version, description, sql): run_migrations() unpacks entries in exactly
# that shape, executes the SQL, and records version + description in the
# schema_migrations table. The statements use MySQL-style DDL
# (AUTO_INCREMENT, ON UPDATE CURRENT_TIMESTAMP).
# Every statement below uses CREATE TABLE IF NOT EXISTS, so it is a no-op
# when the table is already present.
MIGRATIONS = [
# v1: user accounts; username is unique.
(1, "Create users table", """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
password VARCHAR(255) NOT NULL,
role VARCHAR(20) NOT NULL
)
"""),
# v2: named URLs with an optional description (defaults to empty string).
(2, "Create websites table", """
CREATE TABLE IF NOT EXISTS websites (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
url VARCHAR(255) NOT NULL,
description VARCHAR(255) DEFAULT ''
)
"""),
# v3: per-user notes. updated_at is refreshed by the database on every
# UPDATE. NOTE(review): user_id presumably refers to users.id, but no
# FOREIGN KEY constraint is declared here — confirm whether that is intended.
(3, "Create notes table", """
CREATE TABLE IF NOT EXISTS notes (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT NOT NULL,
title VARCHAR(255) NOT NULL,
content TEXT,
language VARCHAR(50) DEFAULT 'text',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
"""),
# ── add new migrations below this line ──────────────────────────────────
]
# ---------------------------------------------------------------------------
# Internals
# ---------------------------------------------------------------------------
# Directory that receives the JSON backups: "<directory of this file>/../backups".
# The ".." segment is kept as-is (the path is not normalised).
_BACKUP_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "backups")
def _ensure_migration_table(conn):
    """
    Create the `schema_migrations` bookkeeping table if it does not exist.

    The statement is CREATE TABLE IF NOT EXISTS, so calling this repeatedly
    is harmless. The change is committed on `conn` before returning.

    conn: open database connection exposing cursor() and commit().
    """
    cur = conn.cursor()
    try:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version INT PRIMARY KEY,
                description VARCHAR(255) NOT NULL,
                applied_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
    finally:
        # Release the cursor even when execute() or commit() raises.
        cur.close()
def _applied_versions(conn):
    """
    Return the set of version numbers recorded in `schema_migrations`.

    conn: open database connection exposing cursor(). The table must already
    exist; this function only reads from it.
    """
    cur = conn.cursor()
    try:
        cur.execute("SELECT version FROM schema_migrations")
        # Rows are sequences; the version is the only selected column.
        return {row[0] for row in cur.fetchall()}
    finally:
        # Release the cursor even when the query or fetch raises.
        cur.close()
def backup(conn):
    """
    Dump every table except `schema_migrations` to a timestamped JSON file.

    The file is named ``backup_<YYYYmmdd_HHMMSS>.json`` and is placed in
    ``_BACKUP_DIR`` (created on demand). Its content maps each table name to
    a list of row dicts. Values exposing ``isoformat()`` (e.g. datetimes) are
    stored as ISO-8601 strings; every other value is handed to ``json``
    unchanged, so a value JSON cannot represent makes ``json.dump`` raise
    ``TypeError``.

    The JSON is first written to a temporary ``.tmp`` file and then moved
    into place, so a failed dump never leaves a truncated file under the
    final backup name.

    conn: open database connection whose ``cursor(dictionary=True)`` yields
    rows as dicts.

    Returns the path of the written file.
    """
    os.makedirs(_BACKUP_DIR, exist_ok=True)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    path = os.path.join(_BACKUP_DIR, f"backup_{ts}.json")

    snapshot = {}
    cur = conn.cursor(dictionary=True)
    try:
        cur.execute("SHOW TABLES")
        # Each row is a one-entry dict; take its only value instead of
        # relying on the name of the result column.
        tables = [next(iter(row.values())) for row in cur.fetchall()]
        for table in tables:
            # The bookkeeping table holds no user data.
            if table == "schema_migrations":
                continue
            # Identifiers cannot be bound as query parameters; double any
            # backtick so an unusual table name cannot break the quoting.
            quoted = table.replace("`", "``")
            cur.execute(f"SELECT * FROM `{quoted}`")
            snapshot[table] = [
                {
                    k: v.isoformat() if hasattr(v, "isoformat") else v
                    for k, v in row.items()
                }
                for row in cur.fetchall()
            ]
    finally:
        # Release the cursor even when one of the queries raises.
        cur.close()

    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(snapshot, f, indent=2, ensure_ascii=False)
        # Publish the finished file under its final name in a single step.
        os.replace(tmp_path, path)
    except Exception:
        # Do not leave a partial dump behind; the original error is re-raised.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise

    logger.info(f"[migrations] Backup written → {path}")
    return path
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def run_migrations(conn):
    """
    Apply every pending migration in ascending version order.

    Steps:
    1. Make sure the `schema_migrations` table exists.
    2. Collect the entries of MIGRATIONS whose version is not recorded yet.
    3. If there are none, log that the schema is current and return.
    4. Otherwise write a JSON backup, then for each pending entry execute its
       SQL, record the version, and commit.

    Safe to call on every startup — already-applied migrations are skipped.

    conn: open database connection exposing cursor(), commit() and
    rollback().

    Raises whatever the database driver raises when a migration fails; the
    open transaction is rolled back and the cursor closed before the error
    propagates. Migrations committed earlier in the same call stay applied.
    NOTE(review): on MySQL, DDL statements commit implicitly, so the rollback
    presumably only discards the bookkeeping INSERT — confirm for the target
    database.
    """
    _ensure_migration_table(conn)
    applied = _applied_versions(conn)
    # Sort explicitly so "version order" holds even if an entry was appended
    # to MIGRATIONS out of sequence.
    pending = sorted(
        ((v, d, sql) for v, d, sql in MIGRATIONS if v not in applied),
        key=lambda entry: entry[0],
    )
    if not pending:
        logger.info("[migrations] Schema is up to date.")
        return

    # Take the backup before the first schema change.
    backup_path = backup(conn)
    logger.info(
        f"[migrations] {len(pending)} pending migration(s). "
        f"Backup: {backup_path}"
    )

    cur = conn.cursor()
    try:
        for version, description, sql in pending:
            logger.info(f"[migrations] Applying v{version}: {description}")
            cur.execute(sql.strip())
            cur.execute(
                "INSERT INTO schema_migrations (version, description) VALUES (%s, %s)",
                (version, description),
            )
            # Commit per migration so earlier successes are kept if a later
            # one fails.
            conn.commit()
    except Exception:
        # Leave the connection without a dangling transaction, then re-raise.
        conn.rollback()
        raise
    finally:
        cur.close()
    logger.info("[migrations] All migrations applied.")