anime-tracker-qt/backend.py
2025-08-08 16:31:49 +05:00

292 lines
12 KiB
Python

# backend.py
import sqlite3
import csv
import logging
import html
from typing import Dict, Any, Optional
# Configure the root logger: ERROR and above are written to anime_tracker.log
# (a relative path, so it resolves against the current working directory).
logging.basicConfig(
    filename='anime_tracker.log',
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
ALLOWED_STATUSES = {'unwatched', 'watching', 'completed'}


class AnimeBackend:
    """SQLite-backed store for anime backlog entries.

    Apart from the constructor, every database-facing method catches its own
    exceptions, reports them through the root logger and returns a neutral
    value (``None``, ``0`` or ``[]``) instead of raising.
    """

    # Single INSERT statement shared by every code path that adds a row.
    _INSERT_SQL = (
        "INSERT INTO anime (name, year, season, status, type, comment, url) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )

    # Exactly the entities html.escape() produces. '&amp;' is undone last so
    # that an escaped '&lt;' ('&amp;lt;') is restored to the text '&lt;' and
    # not collapsed all the way down to '<'.
    _ESCAPED_ENTITIES = (
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&quot;', '"'),
        ('&#x27;', "'"),
        ('&amp;', '&'),
    )

    def __init__(self, db_path: str = 'anime_backlog.db'):
        """Open (or create) the database at *db_path* and ensure the schema exists.

        Raises whatever ``sqlite3.connect`` raises if the database cannot be opened.
        """
        # Use autocommit mode (isolation_level=None). Keep commits explicit where needed.
        self.db_path = db_path
        self.db = sqlite3.connect(self.db_path, isolation_level=None, check_same_thread=False)
        self.db.execute('PRAGMA journal_mode=WAL')  # better concurrency
        # Rows are returned as plain tuples (the sqlite3 default).
        self.create_table()
        self.create_indexes()

    def create_table(self):
        """Create the ``anime`` table if it does not exist yet."""
        try:
            cursor = self.db.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS anime (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    year INTEGER NOT NULL,
                    season TEXT,
                    status TEXT NOT NULL DEFAULT 'unwatched',
                    type TEXT,
                    comment TEXT,
                    url TEXT
                )
            """)
            # No explicit commit required in autocommit mode, but keep for clarity
            self.db.commit()
        except Exception as e:
            logging.error("Error creating table: %s", e)

    def create_indexes(self):
        """Create the indexes backing the season lookup and the CSV duplicate check."""
        try:
            cursor = self.db.cursor()
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_year_season ON anime(year, season)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_name_year_season ON anime(name, year, season)")
            self.db.commit()
        except Exception as e:
            logging.error("Error creating indexes: %s", e)

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return *value* as a stripped string; ``None`` and other falsy values become ''."""
        if not value:
            return ''
        # str() keeps truthy non-string input (e.g. the integer title 86) from
        # failing on .strip().
        return str(value).strip()

    @classmethod
    def _escape_once(cls, text: str) -> str:
        """HTML-escape *text* so that already-escaped input is not escaped again.

        Only the five entities emitted by ``html.escape`` are recognised, which
        makes the operation idempotent for values previously stored by this
        class. The trade-off: input that literally contains one of those
        entities (e.g. the characters ``&amp;``) is treated as the escaped form.
        """
        for entity, char in cls._ESCAPED_ENTITIES:
            text = text.replace(entity, char)
        return html.escape(text)

    def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and normalise one entry (from the frontend or a CSV row).

        Returns a new dict with the keys ``name``, ``year``, ``season``,
        ``status``, ``type``, ``comment`` and ``url``:

        * text fields are coerced to ``str`` and stripped; missing/falsy -> ``''``;
        * ``year`` is converted with ``int()``; anything unconvertible -> ``0``;
        * ``status`` is matched case-insensitively against ``ALLOWED_STATUSES``
          and falls back to ``'unwatched'``;
        * ``name``, ``type`` and ``comment`` are HTML-escaped exactly once, so
          feeding stored (already escaped) values back in does not double-escape
          them. ``season`` and ``url`` are stored unescaped.
        """
        try:
            year = int(data.get('year') or 0)
        except (TypeError, ValueError, OverflowError):
            year = 0

        status = self._clean_text(data.get('status')).lower()
        if status not in ALLOWED_STATUSES:
            status = 'unwatched'

        return {
            'name': self._escape_once(self._clean_text(data.get('name'))),
            'year': year,
            'season': self._clean_text(data.get('season')),
            'status': status,
            'type': self._escape_once(self._clean_text(data.get('type'))),
            'comment': self._escape_once(self._clean_text(data.get('comment'))),
            'url': self._clean_text(data.get('url')),
        }

    def get_pre_2010_entries(self) -> list:
        """Return all rows with ``year < 2010``, newest year first, then by name."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE year < 2010 ORDER BY year DESC, name")
            return cursor.fetchall()
        except Exception as e:
            logging.error("Error getting pre-2010 entries: %s", e)
            return []

    def get_years(self) -> list:
        """Return the distinct years (2010 and later) present in the table, newest first."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT DISTINCT year FROM anime WHERE year >= 2010 ORDER BY year DESC")
            return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            logging.error("Error getting years: %s", e)
            return []

    def get_entries_for_season(self, year: int, season: str) -> list:
        """Return the rows for exactly this *year* and *season*, ordered by name."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE year = ? AND season = ? ORDER BY name", (year, season))
            return cursor.fetchall()
        except Exception as e:
            logging.error("Error getting entries for season %s in year %s: %s", season, year, e)
            return []

    def get_anime_by_id(self, anime_id: int) -> Optional[tuple]:
        """Return the row with the given id, or ``None`` if absent or on error."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE id = ?", (anime_id,))
            return cursor.fetchone()
        except Exception as e:
            logging.error("Error getting anime by id %s: %s", anime_id, e)
            return None

    def add_anime(self, data: Dict[str, Any]) -> Optional[int]:
        """Sanitize *data*, insert it and return the new row id (``None`` on error)."""
        try:
            d = self.sanitize_data(data)
            cursor = self.db.cursor()
            cursor.execute(
                self._INSERT_SQL,
                (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
            )
            # autocommit mode ensures immediate write, but call commit for clarity
            self.db.commit()
            return cursor.lastrowid
        except Exception as e:
            logging.error("Error adding anime: %s", e)
            return None

    def edit_anime(self, anime_id: int, data: Dict[str, Any]):
        """Overwrite every editable column of row *anime_id* with sanitized *data*."""
        try:
            d = self.sanitize_data(data)
            cursor = self.db.cursor()
            cursor.execute(
                "UPDATE anime SET name=?, year=?, season=?, status=?, type=?, comment=?, url=? WHERE id=?",
                (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'], anime_id)
            )
            self.db.commit()
        except Exception as e:
            logging.error("Error editing anime id %s: %s", anime_id, e)

    def delete_anime(self, anime_id: int):
        """Delete the row with the given id (no-op if it does not exist)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("DELETE FROM anime WHERE id = ?", (anime_id,))
            self.db.commit()
        except Exception as e:
            logging.error("Error deleting anime id %s: %s", anime_id, e)

    def change_status(self, anime_id: int, new_status: str):
        """Set the status of row *anime_id*.

        *new_status* is matched case-insensitively (and ignoring surrounding
        whitespace) against ``ALLOWED_STATUSES``; anything else is logged and
        ignored.
        """
        try:
            # Same normalisation as sanitize_data, so both entry points agree
            # on what counts as a valid status.
            status = new_status.strip().lower() if isinstance(new_status, str) else new_status
            if status not in ALLOWED_STATUSES:
                logging.error("Attempt to set invalid status: %s", new_status)
                return
            cursor = self.db.cursor()
            cursor.execute("UPDATE anime SET status = ? WHERE id = ?", (status, anime_id))
            self.db.commit()
        except Exception as e:
            logging.error("Error changing status for anime id %s: %s", anime_id, e)

    def add_placeholders_for_year(self, year: int):
        """Insert one 'Placeholder' row per season (plus one with an empty season) for *year*."""
        try:
            cursor = self.db.cursor()
            cursor.executemany(
                self._INSERT_SQL,
                [
                    ('Placeholder', year, season, 'unwatched', '', 'Delete or edit me', '')
                    for season in ('winter', 'spring', 'summer', 'fall', '')
                ]
            )
            self.db.commit()
        except Exception as e:
            logging.error("Error adding placeholders for year %s: %s", year, e)

    def import_from_csv(self, file_name: str):
        """Import entries from a UTF-8 CSV file.

        The first row is always treated as a header and discarded. Data rows
        must have 7 columns (name, year, season, status, type, comment, url) or
        8 columns (a leading id, which is ignored). Blank lines are ignored;
        rows with another length or a non-integer year are logged and skipped.
        A row is inserted only if no existing row has the same sanitized
        name/year/season. Because values are escaped idempotently, a file
        written by ``export_to_csv`` can be re-imported without creating
        double-escaped duplicates.
        """
        skipped = 0
        inserted = 0
        try:
            with open(file_name, 'r', newline='', encoding='utf-8') as f:
                reader = csv.reader(f)
                next(reader, None)  # discard the header row
                cursor = self.db.cursor()
                for row in reader:
                    if not row:
                        # csv.reader yields [] for blank lines; not worth an error entry.
                        continue
                    # Accept either 7 columns (no id) or 8 columns (with id)
                    if len(row) not in (7, 8):
                        skipped += 1
                        logging.error("Skipping CSV row with unexpected length %d: %s", len(row), row)
                        continue
                    # Taking the last 7 fields drops the leading id when present.
                    name, year_str, season, status, type_, comment, url = row[-7:]
                    try:
                        year = int(year_str)
                    except ValueError:
                        skipped += 1
                        logging.error("Skipping CSV row with invalid year: %s", row)
                        continue
                    d = self.sanitize_data({
                        'name': name,
                        'year': year,
                        'season': season,
                        'status': status,
                        'type': type_,
                        'comment': comment,
                        'url': url,
                    })
                    # Avoid duplicates by name/year/season
                    cursor.execute(
                        "SELECT id FROM anime WHERE name = ? AND year = ? AND season = ?",
                        (d['name'], d['year'], d['season'])
                    )
                    if not cursor.fetchone():
                        cursor.execute(
                            self._INSERT_SQL,
                            (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
                        )
                        inserted += 1
                self.db.commit()
        except Exception as e:
            logging.error("Error importing from CSV %s: %s", file_name, e)
        finally:
            logging.info("CSV import finished: inserted=%d, skipped=%d", inserted, skipped)

    def export_to_csv(self, file_name: str):
        """Write every row to *file_name* as UTF-8 CSV with a header row.

        Text columns are written exactly as stored (i.e. HTML-escaped).
        """
        try:
            cursor = self.db.cursor()
            # Explicit column list keeps each row aligned with the header below
            # even if the table carries additional columns.
            cursor.execute(
                "SELECT id, name, year, season, status, type, comment, url FROM anime ORDER BY id"
            )
            rows = cursor.fetchall()
            with open(file_name, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
                writer.writerow(['id', 'name', 'year', 'season', 'status', 'type', 'comment', 'url'])
                writer.writerows(rows)
        except Exception as e:
            logging.error("Error exporting to CSV %s: %s", file_name, e)

    def delete_year(self, year: int):
        """Delete every row whose year equals *year*."""
        try:
            cursor = self.db.cursor()
            cursor.execute("DELETE FROM anime WHERE year = ?", (year,))
            self.db.commit()
        except Exception as e:
            logging.error("Error deleting year %s: %s", year, e)

    def get_total_entries(self) -> int:
        """Return the number of rows in the table (``0`` on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT COUNT(*) FROM anime")
            return cursor.fetchone()[0]
        except Exception as e:
            logging.error("Error getting total entries: %s", e)
            return 0

    def get_completed_entries(self) -> int:
        """Return the number of rows with status 'completed' (``0`` on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT COUNT(*) FROM anime WHERE status = 'completed'")
            return cursor.fetchone()[0]
        except Exception as e:
            logging.error("Error getting completed entries: %s", e)
            return 0

    def get_entries_by_type(self) -> list:
        """Return ``(type, count)`` pairs, most frequent type first."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT type, COUNT(*) FROM anime GROUP BY type ORDER BY 2 DESC")
            return cursor.fetchall()  # list of (type, count)
        except Exception as e:
            logging.error("Error getting entries by type: %s", e)
            return []

    def close(self):
        """Close the connection; calling it again afterwards is a no-op."""
        try:
            if self.db:
                self.db.close()
                self.db = None
        except Exception as e:
            logging.error("Error closing DB: %s", e)

    def __del__(self):
        # Best-effort cleanup. Deliberately silent: logging may already be torn
        # down when this runs, and 'db' may be missing if __init__ failed.
        try:
            if getattr(self, 'db', None):
                self.db.close()
        except Exception:
            pass