# backend.py
import csv
import html
import logging
import sqlite3
from typing import Dict, Any, Optional

# Configure the root logger once, at import time: only records at ERROR level
# or above are emitted, and they go to 'anime_tracker.log' (a relative path,
# so it is resolved against the current working directory).
logging.basicConfig(
    level=logging.ERROR,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='anime_tracker.log',
)

# Closed set of values accepted for the ``status`` column.
ALLOWED_STATUSES = {'unwatched', 'watching', 'completed'}


class AnimeBackend:
    """SQLite-backed storage for the anime backlog.

    All rows live in a single ``anime`` table with the columns
    ``id, name, year, season, status, type, comment, url``.  Text fields
    ``name``, ``type`` and ``comment`` are stored HTML-escaped (see
    :meth:`sanitize_data`).

    Every public method catches database/IO errors, logs them through the
    ``logging`` module and returns a neutral value (``None``, ``[]`` or ``0``)
    instead of raising.
    """

    # Inverse of html.escape(): the five entities it can produce.  '&amp;' is
    # undone last so that '&amp;lt;' becomes the literal text '&lt;', not '<'.
    _ESCAPED_ENTITIES = (
        ('&lt;', '<'),
        ('&gt;', '>'),
        ('&quot;', '"'),
        ('&#x27;', "'"),
        ('&amp;', '&'),
    )

    def __init__(self, db_path: str = 'anime_backlog.db'):
        """Open (or create) the database at *db_path* and ensure the schema exists."""
        self.db_path = db_path
        # isolation_level=None puts the connection in autocommit mode;
        # check_same_thread=False allows use from threads other than the creator.
        self.db = sqlite3.connect(self.db_path, isolation_level=None, check_same_thread=False)
        self.db.execute('PRAGMA journal_mode=WAL')  # better concurrency
        self.create_table()
        self.create_indexes()

    def __enter__(self):
        """Allow ``with AnimeBackend(...) as backend:`` usage."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection when the ``with`` block ends; never suppresses errors."""
        self.close()
        return False

    def create_table(self):
        """Create the ``anime`` table if it does not exist yet."""
        try:
            self.db.execute("""
                CREATE TABLE IF NOT EXISTS anime (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    year INTEGER NOT NULL,
                    season TEXT,
                    status TEXT NOT NULL DEFAULT 'unwatched',
                    type TEXT,
                    comment TEXT,
                    url TEXT
                )
            """)
            # No explicit commit required in autocommit mode, but keep for clarity
            self.db.commit()
        except Exception as e:
            logging.error(f"Error creating table: {e}")

    def create_indexes(self):
        """Create the lookup indexes used by the season and duplicate queries."""
        try:
            self.db.execute("CREATE INDEX IF NOT EXISTS idx_year_season ON anime(year, season)")
            self.db.execute("CREATE INDEX IF NOT EXISTS idx_name_year_season ON anime(name, year, season)")
            self.db.commit()
        except Exception as e:
            logging.error(f"Error creating indexes: {e}")

    @staticmethod
    def _clean_text(value: Any) -> str:
        """Return *value* as a stripped string; ``None``/falsy values become ``''``."""
        return str(value).strip() if value else ''

    @classmethod
    def _escape_once(cls, text: str) -> str:
        """HTML-escape *text* idempotently.

        Entities previously produced by ``html.escape`` are undone first, so
        feeding an already-escaped value back in (edit round trip, CSV
        export followed by import) does not turn ``&amp;`` into ``&amp;amp;``.
        """
        for entity, char in cls._ESCAPED_ENTITIES:
            text = text.replace(entity, char)
        return html.escape(text)

    def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and normalise incoming data before it is written to the DB.

        Args:
            data: mapping that may contain the keys ``name``, ``year``,
                ``season``, ``status``, ``type``, ``comment`` and ``url``.
                Missing or falsy values are treated as empty.

        Returns:
            A new dict with exactly those seven keys.  Text values are
            stripped; ``year`` is an ``int`` (``0`` when it cannot be
            converted); ``status`` falls back to ``'unwatched'`` when it is
            not in ``ALLOWED_STATUSES``; ``name``, ``type`` and ``comment``
            are HTML-escaped.  ``season`` and ``url`` are stored unescaped.
        """
        try:
            year = int(data.get('year') or 0)
        except (TypeError, ValueError, OverflowError):
            year = 0

        status = self._clean_text(data.get('status')) or 'unwatched'
        if status not in ALLOWED_STATUSES:
            status = 'unwatched'

        # HTML-escape user visible fields to prevent HTML injection in UI.
        return {
            'name': self._escape_once(self._clean_text(data.get('name'))),
            'year': year,
            'season': self._clean_text(data.get('season')),
            'status': status,
            'type': self._escape_once(self._clean_text(data.get('type'))),
            'comment': self._escape_once(self._clean_text(data.get('comment'))),
            'url': self._clean_text(data.get('url')),
        }

    def get_pre_2010_entries(self) -> list:
        """Return all rows with ``year < 2010``, newest year first, then by name."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE year < 2010 ORDER BY year DESC, name")
            return cursor.fetchall()
        except Exception as e:
            logging.error(f"Error getting pre-2010 entries: {e}")
            return []

    def get_years(self) -> list:
        """Return the distinct years ``>= 2010`` present in the table, newest first."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT DISTINCT year FROM anime WHERE year >= 2010 ORDER BY year DESC")
            return [row[0] for row in cursor.fetchall()]
        except Exception as e:
            logging.error(f"Error getting years: {e}")
            return []

    def get_entries_for_season(self, year: int, season: str) -> list:
        """Return all rows matching *year* and *season* exactly, ordered by name."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE year = ? AND season = ? ORDER BY name", (year, season))
            return cursor.fetchall()
        except Exception as e:
            logging.error(f"Error getting entries for season {season} in year {year}: {e}")
            return []

    def get_anime_by_id(self, anime_id: int) -> Optional[tuple]:
        """Return the row with primary key *anime_id*, or ``None`` if absent or on error."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT * FROM anime WHERE id = ?", (anime_id,))
            return cursor.fetchone()
        except Exception as e:
            logging.error(f"Error getting anime by id {anime_id}: {e}")
            return None

    def add_anime(self, data: Dict[str, Any]) -> Optional[int]:
        """Insert a new entry built from *data* (passed through :meth:`sanitize_data`).

        Returns:
            The id of the inserted row, or ``None`` if the insert failed.
        """
        try:
            d = self.sanitize_data(data)
            cursor = self.db.cursor()
            cursor.execute(
                "INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
                (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
            )
            # autocommit mode ensures immediate write, but call commit for clarity
            self.db.commit()
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error adding anime: {e}")
            return None

    def edit_anime(self, anime_id: int, data: Dict[str, Any]):
        """Overwrite every editable column of row *anime_id* with sanitized *data*."""
        try:
            d = self.sanitize_data(data)
            cursor = self.db.cursor()
            cursor.execute(
                "UPDATE anime SET name=?, year=?, season=?, status=?, type=?, comment=?, url=? WHERE id=?",
                (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'], anime_id)
            )
            self.db.commit()
        except Exception as e:
            logging.error(f"Error editing anime id {anime_id}: {e}")

    def delete_anime(self, anime_id: int):
        """Delete the row with primary key *anime_id* (no-op if it does not exist)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("DELETE FROM anime WHERE id = ?", (anime_id,))
            self.db.commit()
        except Exception as e:
            logging.error(f"Error deleting anime id {anime_id}: {e}")

    def change_status(self, anime_id: int, new_status: str):
        """Set the status of row *anime_id*; values outside ``ALLOWED_STATUSES`` are rejected and logged."""
        try:
            if new_status not in ALLOWED_STATUSES:
                logging.error(f"Attempt to set invalid status: {new_status}")
                return
            cursor = self.db.cursor()
            cursor.execute("UPDATE anime SET status = ? WHERE id = ?", (new_status, anime_id))
            self.db.commit()
        except Exception as e:
            logging.error(f"Error changing status for anime id {anime_id}: {e}")

    def add_placeholders_for_year(self, year: int):
        """Insert five 'Placeholder' rows for *year*: one per season plus one with an empty season."""
        try:
            cursor = self.db.cursor()
            for season in ['winter', 'spring', 'summer', 'fall', '']:
                cursor.execute(
                    "INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    ('Placeholder', year, season, 'unwatched', '', 'Delete or edit me', '')
                )
            self.db.commit()
        except Exception as e:
            logging.error(f"Error adding placeholders for year {year}: {e}")

    def import_from_csv(self, file_name: str) -> tuple:
        """Import entries from the UTF-8 CSV file *file_name*.

        The first row is always treated as a header and skipped.  Data rows
        must have 7 columns (``name, year, season, status, type, comment,
        url``) or 8 columns (the same, preceded by an ``id`` that is ignored).
        Rows with another length or a non-integer year are skipped and
        logged; completely blank rows are ignored.  A row is inserted only
        when no existing row has the same sanitized name, year and season.

        Returns:
            ``(inserted, skipped)`` counts.  If the import aborts on an error
            (which is logged), the counts reached so far are returned.
        """
        skipped = 0
        inserted = 0
        try:
            with open(file_name, 'r', newline='', encoding='utf-8') as f:
                reader = csv.reader(f)
                next(reader, None)  # skip the header row
                cursor = self.db.cursor()
                for row in reader:
                    if not row:
                        # csv.reader yields [] for blank lines; nothing to import.
                        continue

                    # Accept either 7 columns (no id) or 8 columns (with id)
                    if len(row) == 7:
                        name, year_str, season, status, type_, comment, url = row
                    elif len(row) == 8:
                        _, name, year_str, season, status, type_, comment, url = row
                    else:
                        skipped += 1
                        logging.error(f"Skipping CSV row with unexpected length {len(row)}: {row}")
                        continue

                    try:
                        year = int(year_str)
                    except ValueError:
                        skipped += 1
                        logging.error(f"Skipping CSV row with invalid year: {row}")
                        continue

                    # Prepare and sanitize
                    d = self.sanitize_data({
                        'name': name,
                        'year': year,
                        'season': season,
                        'status': status,
                        'type': type_,
                        'comment': comment,
                        'url': url,
                    })

                    # Avoid duplicates by name/year/season
                    cursor.execute(
                        "SELECT id FROM anime WHERE name = ? AND year = ? AND season = ?",
                        (d['name'], d['year'], d['season'])
                    )
                    if cursor.fetchone():
                        continue
                    cursor.execute(
                        "INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
                        (d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
                    )
                    inserted += 1
                self.db.commit()
        except Exception as e:
            logging.error(f"Error importing from CSV {file_name}: {e}")
        finally:
            # INFO records are dropped when the root logger is at ERROR level,
            # which is why the counts are also returned to the caller.
            logging.info(f"CSV import finished: inserted={inserted}, skipped={skipped}")
        return inserted, skipped

    def export_to_csv(self, file_name: str):
        """Write every row to *file_name* as UTF-8 CSV with a header row.

        Values are written exactly as stored, i.e. ``name``, ``type`` and
        ``comment`` are HTML-escaped in the file.
        """
        try:
            cursor = self.db.cursor()
            # Explicit column list keeps the data aligned with the header below.
            cursor.execute("SELECT id, name, year, season, status, type, comment, url FROM anime")
            rows = cursor.fetchall()
            with open(file_name, 'w', newline='', encoding='utf-8') as f:
                writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
                writer.writerow(['id', 'name', 'year', 'season', 'status', 'type', 'comment', 'url'])
                writer.writerows(rows)
        except Exception as e:
            logging.error(f"Error exporting to CSV {file_name}: {e}")

    def delete_year(self, year: int):
        """Delete every row whose ``year`` equals *year*."""
        try:
            cursor = self.db.cursor()
            cursor.execute("DELETE FROM anime WHERE year = ?", (year,))
            self.db.commit()
        except Exception as e:
            logging.error(f"Error deleting year {year}: {e}")

    def get_total_entries(self) -> int:
        """Return the number of rows in the table (``0`` on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT COUNT(*) FROM anime")
            return cursor.fetchone()[0]
        except Exception as e:
            logging.error(f"Error getting total entries: {e}")
            return 0

    def get_completed_entries(self) -> int:
        """Return the number of rows whose status is ``'completed'`` (``0`` on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT COUNT(*) FROM anime WHERE status = 'completed'")
            return cursor.fetchone()[0]
        except Exception as e:
            logging.error(f"Error getting completed entries: {e}")
            return 0

    def get_entries_by_type(self) -> list:
        """Return ``(type, count)`` pairs, most frequent type first (``[]`` on error)."""
        try:
            cursor = self.db.cursor()
            cursor.execute("SELECT type, COUNT(*) FROM anime GROUP BY type ORDER BY 2 DESC")
            return cursor.fetchall()  # list of (type, count)
        except Exception as e:
            logging.error(f"Error getting entries by type: {e}")
            return []

    def close(self):
        """Close the connection if it is open; safe to call more than once."""
        try:
            if self.db:
                self.db.close()
                self.db = None
        except Exception as e:
            logging.error(f"Error closing DB: {e}")

    def __del__(self):
        """Best-effort close on garbage collection; errors are deliberately ignored here."""
        try:
            if getattr(self, 'db', None):
                self.db.close()
        except Exception:
            pass
