# backend.py
import sqlite3
import csv
import logging
import html
from typing import Dict, Any, Optional

# Set up logging
logging.basicConfig(filename='anime_tracker.log', level=logging.ERROR,
                    format='%(asctime)s - %(levelname)s - %(message)s')

ALLOWED_STATUSES = {'unwatched', 'watching', 'completed'}


class AnimeBackend:
    """SQLite-backed storage for an anime backlog.

    Rows live in a single ``anime`` table whose columns are, in order:
    ``id, name, year, season, status, type, comment, url``. Query methods
    return plain tuples in that column order.

    Every method catches its own exceptions, logs them through the root
    logger and returns a neutral value (``None``, ``[]`` or ``0``) instead
    of raising.
    """

    # Entities emitted by html.escape(), paired with the raw character.
    # '&amp;' is reversed last so that '&amp;lt;' becomes '&lt;' and not '<'.
    _ESCAPED_ENTITIES = (
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&quot;', '"'),
        ('&#x27;', "'"),
        ('&amp;', '&'),
    )

    def __init__(self, db_path: str = 'anime_backlog.db'):
        """Open (or create) the database at *db_path* and ensure the schema exists."""
        # Use autocommit mode (isolation_level=None). Keep commits explicit where needed.
        self.db_path = db_path
        self.db = sqlite3.connect(self.db_path, isolation_level=None, check_same_thread=False)
        self.db.execute('PRAGMA journal_mode=WAL')  # better concurrency
        # Rows are returned as tuples (default). Create table and useful indexes.
        self.create_table()
        self.create_indexes()

    def __enter__(self) -> 'AnimeBackend':
        """Allow ``with AnimeBackend(...) as backend:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the connection when the ``with`` block ends; exceptions propagate."""
        self.close()

    def create_table(self):
        """Create the ``anime`` table if it does not exist yet."""
        try:
            cursor = self.db.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS anime (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    year INTEGER NOT NULL,
                    season TEXT,
                    status TEXT NOT NULL DEFAULT 'unwatched',
                    type TEXT,
                    comment TEXT,
                    url TEXT
                )
            """)
            # No explicit commit required in autocommit mode, but keep for clarity
            self.db.commit()
        except Exception as e:
            logging.error(f"Error creating table: {e}")

    def create_indexes(self):
        """Create the indexes backing the season lookup and the import duplicate check."""
        try:
            cursor = self.db.cursor()
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_year_season ON anime(year, season)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_name_year_season ON anime(name, year, season)")
            self.db.commit()
        except Exception as e:
            logging.error(f"Error creating indexes: {e}")

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return *value* as a stripped string; any falsy value becomes ''."""
        if not value:
            return ''
        return str(value).strip()

    @classmethod
    def _escape_once(cls, text: str) -> str:
        """HTML-escape *text* without escaping already-escaped text again.

        The entities produced by ``html.escape`` are first turned back into
        their raw characters, then the result is escaped. Feeding a stored
        (already escaped) value through this function therefore returns it
        unchanged instead of producing ``&amp;amp;``.
        """
        for entity, char in cls._ESCAPED_ENTITIES:
            text = text.replace(entity, char)
        return html.escape(text)

    def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare and validate incoming data (from frontend or CSV) before writing to DB.

        Text fields are stripped; ``name``, ``type`` and ``comment`` are
        HTML-escaped idempotently, so values read back from the database or
        from an exported CSV can be passed in again without being
        double-escaped. ``year`` falls back to 0 when it cannot be converted
        to an int, and ``status`` falls back to 'unwatched' when it is not in
        ALLOWED_STATUSES. ``url`` is only stripped, not escaped or validated.

        Returns a new dict with the keys
        ``name, year, season, status, type, comment, url``.
        """
        try:
            year = int(data.get('year') or 0)
        except (TypeError, ValueError, OverflowError):
            year = 0

        status = self._clean_text(data.get('status')) or 'unwatched'
        if status not in ALLOWED_STATUSES:
            status = 'unwatched'

        return {
            'name': self._escape_once(self._clean_text(data.get('name'))),
            'year': year,
            'season': self._clean_text(data.get('season')),
            'status': status,
            'type': self._escape_once(self._clean_text(data.get('type'))),
            'comment': self._escape_once(self._clean_text(data.get('comment'))),
            'url': self._clean_text(data.get('url')),
        }

    def get_pre_2010_entries(self):
        """Return all rows with year < 2010, newest year first, then by name."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE year < 2010 ORDER BY year DESC, name")
            return cursor.fetchall()
        except Exception as e:
            logging.error(f"Error getting pre-2010 entries: {e}")
            return []

    def get_years(self):
        """Return the distinct years >= 2010 present in the table, newest first."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT DISTINCT year FROM anime WHERE year >= 2010 ORDER BY year DESC")
            return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            logging.error(f"Error getting years: {e}")
            return []

    def get_entries_for_season(self, year: int, season: str):
        """Return all rows for the given *year* and *season*, ordered by name."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE year = ? AND season = ? ORDER BY name", (year, season))
            return cursor.fetchall()
        except Exception as e:
            logging.error(f"Error getting entries for season {season} in year {year}: {e}")
            return []

    def get_anime_by_id(self, anime_id: int) -> Optional[tuple]:
        """Return the row with the given id, or None if absent or on error."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE id = ?", (anime_id,))
            return cursor.fetchone()
        except Exception as e:
            logging.error(f"Error getting anime by id {anime_id}: {e}")
            return None

    def add_anime(self, data: Dict[str, Any]) -> Optional[int]:
        """Insert a sanitized copy of *data* and return the new row id (None on error)."""
        try:
            d = self.sanitize_data(data)
            cursor = self.db.cursor()
            cursor.execute(
                "INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
            )
            # autocommit mode ensures immediate write, but call commit for clarity
            self.db.commit()
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error adding anime: {e}")
            return None

    def edit_anime(self, anime_id: int, data: Dict[str, Any]):
        """Overwrite every editable column of row *anime_id* with sanitized *data*."""
        try:
            d = self.sanitize_data(data)
            cursor = self.db.cursor()
            cursor.execute(
                "UPDATE anime SET name=?, year=?, season=?, status=?, type=?, comment=?, url=? WHERE id=?",
                (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'], anime_id)
            )
            self.db.commit()
        except Exception as e:
            logging.error(f"Error editing anime id {anime_id}: {e}")

    def delete_anime(self, anime_id: int):
        """Delete the row with the given id."""
        try:
            cursor = self.db.cursor()
            cursor.execute("DELETE FROM anime WHERE id = ?", (anime_id,))
            self.db.commit()
        except Exception as e:
            logging.error(f"Error deleting anime id {anime_id}: {e}")

    def change_status(self, anime_id: int, new_status: str):
        """Set the status of row *anime_id*; values outside ALLOWED_STATUSES are logged and ignored."""
        try:
            if new_status not in ALLOWED_STATUSES:
                logging.error(f"Attempt to set invalid status: {new_status}")
                return
            cursor = self.db.cursor()
            cursor.execute("UPDATE anime SET status = ? WHERE id = ?", (new_status, anime_id))
            self.db.commit()
        except Exception as e:
            logging.error(f"Error changing status for anime id {anime_id}: {e}")

    def add_placeholders_for_year(self, year: int):
        """Insert one 'Placeholder' row per season (plus one with an empty season) for *year*."""
        try:
            cursor = self.db.cursor()
            for season in ['winter', 'spring', 'summer', 'fall', '']:
                cursor.execute(
                    "INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    ('Placeholder', year, season, 'unwatched', '', 'Delete or edit me', '')
                )
            self.db.commit()
        except Exception as e:
            logging.error(f"Error adding placeholders for year {year}: {e}")

    def import_from_csv(self, file_name: str):
        """Import rows from the CSV file *file_name*.

        The first line is always treated as a header and skipped. Each
        following row must have 7 columns
        (``name, year, season, status, type, comment, url``) or 8 columns
        (the same preceded by an id, which is ignored). Rows with another
        length or a non-integer year are logged and skipped. A row is
        inserted only when no existing row has the same sanitized
        name/year/season.
        """
        skipped = 0
        inserted = 0
        try:
            with open(file_name, 'r', newline='', encoding='utf-8') as f:
                reader = csv.reader(f)
                next(reader, None)  # discard the header line
                cursor = self.db.cursor()
                for row in reader:
                    # Accept either 7 columns (no id) or 8 columns (with id)
                    if len(row) == 7:
                        name, year_str, season, status, type_, comment, url = row
                    elif len(row) == 8:
                        _, name, year_str, season, status, type_, comment, url = row
                    else:
                        skipped += 1
                        logging.error(f"Skipping CSV row with unexpected length {len(row)}: {row}")
                        continue
                    try:
                        year = int(year_str)
                    except ValueError:
                        skipped += 1
                        logging.error(f"Skipping CSV row with invalid year: {row}")
                        continue
                    # Prepare and sanitize
                    data = {
                        'name': name,
                        'year': year,
                        'season': season,
                        'status': status,
                        'type': type_,
                        'comment': comment,
                        'url': url
                    }
                    d = self.sanitize_data(data)
                    # Avoid duplicates by name/year/season. Because escaping is
                    # idempotent, names coming from a previous export compare
                    # equal to the stored ones.
                    cursor.execute(
                        "SELECT id FROM anime WHERE name = ? AND year = ? AND season = ?",
                        (d['name'], d['year'], d['season'])
                    )
                    if not cursor.fetchone():
                        cursor.execute(
                            "INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
                            (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
                        )
                        inserted += 1
                self.db.commit()
        except Exception as e:
            logging.error(f"Error importing from CSV {file_name}: {e}")
        finally:
            # NOTE: this module configures logging at ERROR level, so this
            # summary is only emitted if the level is lowered elsewhere.
            logging.info(f"CSV import finished: inserted={inserted}, skipped={skipped}")

    def export_to_csv(self, file_name: str):
        """Write every row, preceded by a header line, to the CSV file *file_name*."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime")
            rows = cursor.fetchall()
            with open(file_name, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
                writer.writerow(['id', 'name', 'year', 'season', 'status', 'type', 'comment', 'url'])
                writer.writerows(rows)
        except Exception as e:
            logging.error(f"Error exporting to CSV {file_name}: {e}")

    def delete_year(self, year: int):
        """Delete every row whose year equals *year*."""
        try:
            cursor = self.db.cursor()
            cursor.execute("DELETE FROM anime WHERE year = ?", (year,))
            self.db.commit()
        except Exception as e:
            logging.error(f"Error deleting year {year}: {e}")

    def get_total_entries(self) -> int:
        """Return the number of rows in the table (0 on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT COUNT(*) FROM anime")
            return cursor.fetchone()[0]
        except Exception as e:
            logging.error(f"Error getting total entries: {e}")
            return 0

    def get_completed_entries(self) -> int:
        """Return the number of rows whose status is 'completed' (0 on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT COUNT(*) FROM anime WHERE status = 'completed'")
            return cursor.fetchone()[0]
        except Exception as e:
            logging.error(f"Error getting completed entries: {e}")
            return 0

    def get_entries_by_type(self):
        """Return ``(type, count)`` pairs, most frequent type first."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT type, COUNT(*) FROM anime GROUP BY type ORDER BY 2 DESC")
            return cursor.fetchall()  # list of (type, count)
        except Exception as e:
            logging.error(f"Error getting entries by type: {e}")
            return []

    def close(self):
        """Close the connection and drop the reference; calling it again is a no-op."""
        try:
            if self.db:
                self.db.close()
                self.db = None
        except Exception as e:
            logging.error(f"Error closing DB: {e}")

    def __del__(self):
        """Best-effort close when the object is garbage-collected."""
        try:
            if getattr(self, 'db', None):
                self.db.close()
        except Exception:
            # Deliberately silent: a finalizer must not raise.
            pass