""" Schema migration system. Rules for maintainers --------------------- - NEVER modify or delete an existing entry in MIGRATIONS. - ALWAYS append new entries at the end with an incremented version number. - Use ALTER TABLE to change an existing table, not a new CREATE TABLE. - Each entry must be a single, complete SQL statement. How it works ------------ On startup (and after the initial setup form) run_migrations() is called. It creates a `schema_migrations` table the first time, checks which versions have already been applied, and runs any that are missing. A full JSON backup of all user-data tables is written to the backups/ directory before any schema changes are made so data can be recovered if something goes wrong. """ import json import os from datetime import datetime from util.logger import logger # --------------------------------------------------------------------------- # Migration registry — append-only # --------------------------------------------------------------------------- MIGRATIONS = [ (1, "Create users table", """ CREATE TABLE IF NOT EXISTS users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) NOT NULL UNIQUE, password VARCHAR(255) NOT NULL, role VARCHAR(20) NOT NULL ) """), (2, "Create websites table", """ CREATE TABLE IF NOT EXISTS websites ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(100) NOT NULL, url VARCHAR(255) NOT NULL, description VARCHAR(255) DEFAULT '' ) """), (3, "Create notes table", """ CREATE TABLE IF NOT EXISTS notes ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT NOT NULL, title VARCHAR(255) NOT NULL, content TEXT, language VARCHAR(50) DEFAULT 'text', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ) """), # ── add new migrations below this line ────────────────────────────────── ] # --------------------------------------------------------------------------- # Internals # --------------------------------------------------------------------------- _BACKUP_DIR = os.path.join( os.path.dirname(os.path.abspath(__file__)), "..", "backups" ) def _ensure_migration_table(conn): cur = conn.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( version INT PRIMARY KEY, description VARCHAR(255) NOT NULL, applied_at DATETIME DEFAULT CURRENT_TIMESTAMP ) """) conn.commit() cur.close() def _applied_versions(conn): cur = conn.cursor() cur.execute("SELECT version FROM schema_migrations") versions = {row[0] for row in cur.fetchall()} cur.close() return versions def backup(conn): """ Dump every user-data table to a timestamped JSON file. Returns the path of the written file. Datetime values are serialised as ISO-8601 strings. """ os.makedirs(_BACKUP_DIR, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") path = os.path.join(_BACKUP_DIR, f"backup_{ts}.json") cur = conn.cursor(dictionary=True) cur.execute("SHOW TABLES") tables = [list(row.values())[0] for row in cur.fetchall()] snapshot = {} for table in tables: if table == "schema_migrations": continue cur.execute(f"SELECT * FROM `{table}`") snapshot[table] = [ { k: v.isoformat() if hasattr(v, "isoformat") else v for k, v in row.items() } for row in cur.fetchall() ] cur.close() with open(path, "w", encoding="utf-8") as f: json.dump(snapshot, f, indent=2, ensure_ascii=False) logger.info(f"[migrations] Backup written → {path}") return path # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def run_migrations(conn): """ Apply every pending migration in version order. A backup is created automatically before the first schema change. Safe to call on every startup — already-applied migrations are skipped. """ _ensure_migration_table(conn) applied = _applied_versions(conn) pending = [(v, d, sql) for v, d, sql in MIGRATIONS if v not in applied] if not pending: logger.info("[migrations] Schema is up to date.") return backup_path = backup(conn) logger.info( f"[migrations] {len(pending)} pending migration(s). " f"Backup: {backup_path}" ) cur = conn.cursor() for version, description, sql in pending: logger.info(f"[migrations] Applying v{version}: {description}") cur.execute(sql.strip()) cur.execute( "INSERT INTO schema_migrations (version, description) VALUES (%s, %s)", (version, description), ) conn.commit() cur.close() logger.info("[migrations] All migrations applied.")