Compare commits

..

No commits in common. "main" and "v1.0.2" have entirely different histories.
main ... v1.0.2

3 changed files with 461 additions and 648 deletions

View File

@ -1,26 +1,17 @@
# backend.py
import sqlite3
import csv
import logging
import html
from typing import Dict, Any, Optional
# Set up logging
logging.basicConfig(filename='anime_tracker.log', level=logging.ERROR,
format='%(asctime)s - %(levelname)s - %(message)s')
ALLOWED_STATUSES = {'unwatched', 'watching', 'completed'}
class AnimeBackend:
def __init__(self, db_path: str = 'anime_backlog.db'):
# Use autocommit mode (isolation_level=None). Keep commits explicit where needed.
self.db_path = db_path
self.db = sqlite3.connect(self.db_path, isolation_level=None, check_same_thread=False)
self.db.execute('PRAGMA journal_mode=WAL') # better concurrency
# Enable returning rows as tuples (default). Create table and useful indexes.
def __init__(self):
self.db = sqlite3.connect('anime_backlog.db', isolation_level=None) # Autocommit mode to prevent locks
self.db.execute('PRAGMA journal_mode=WAL') # Use WAL mode for better concurrency
self.create_table()
self.create_indexes()
def create_table(self):
try:
@ -37,51 +28,10 @@ class AnimeBackend:
url TEXT
)
""")
# No explicit commit required in autocommit mode, but keep for clarity
self.db.commit()
except Exception as e:
logging.error(f"Error creating table: {e}")
def create_indexes(self):
try:
cursor = self.db.cursor()
cursor.execute("CREATE INDEX IF NOT EXISTS idx_year_season ON anime(year, season)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_name_year_season ON anime(name, year, season)")
self.db.commit()
except Exception as e:
logging.error(f"Error creating indexes: {e}")
def sanitize_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Prepare and validate incoming data (from frontend or CSV) before writing to DB.
Backend is responsible for HTML-escaping text fields to avoid double-escaping problems.
"""
name = data.get('name') or ''
name = name.strip()
year = data.get('year') or 0
try:
year = int(year)
except Exception:
year = 0
season = (data.get('season') or '').strip()
status = (data.get('status') or 'unwatched').strip()
if status not in ALLOWED_STATUSES:
status = 'unwatched'
type_ = (data.get('type') or '').strip()
comment = (data.get('comment') or '').strip()
url = (data.get('url') or '').strip()
# HTML-escape user visible fields to prevent HTML injection in UI.
return {
'name': html.escape(name),
'year': year,
'season': season,
'status': status,
'type': html.escape(type_),
'comment': html.escape(comment),
'url': url
}
def get_pre_2010_entries(self):
try:
cursor = self.db.cursor()
@ -100,7 +50,7 @@ class AnimeBackend:
logging.error(f"Error getting years: {e}")
return []
def get_entries_for_season(self, year: int, season: str):
def get_entries_for_season(self, year, season):
try:
cursor = self.db.cursor()
cursor.execute("SELECT * FROM anime WHERE year = ? AND season = ? ORDER BY name", (year, season))
@ -109,7 +59,7 @@ class AnimeBackend:
logging.error(f"Error getting entries for season {season} in year {year}: {e}")
return []
def get_anime_by_id(self, anime_id: int) -> Optional[tuple]:
def get_anime_by_id(self, anime_id):
try:
cursor = self.db.cursor()
cursor.execute("SELECT * FROM anime WHERE id = ?", (anime_id,))
@ -118,52 +68,86 @@ class AnimeBackend:
logging.error(f"Error getting anime by id {anime_id}: {e}")
return None
def add_anime(self, data: Dict[str, Any]):
def add_anime(self, data):
try:
d = self.sanitize_data(data)
# Sanitize string inputs
sanitized_data = {
'name': html.escape(data['name'].strip()) if data['name'] else '',
'year': data['year'],
'season': data['season'].strip() if data['season'] else '',
'status': data['status'].strip() if data['status'] else 'unwatched',
'type': data['type'].strip() if data['type'] else '',
'comment': html.escape(data['comment'].strip()) if data['comment'] else '',
'url': data['url'].strip() if data['url'] else ''
}
cursor = self.db.cursor()
cursor.execute(
"INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
(d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
(
sanitized_data['name'],
sanitized_data['year'],
sanitized_data['season'],
sanitized_data['status'],
sanitized_data['type'],
sanitized_data['comment'],
sanitized_data['url']
)
)
# autocommit mode ensures immediate write, but call commit for clarity
self.db.commit()
return cursor.lastrowid
except Exception as e:
logging.error(f"Error adding anime: {e}")
self.db.rollback()
def edit_anime(self, anime_id: int, data: Dict[str, Any]):
def edit_anime(self, anime_id, data):
try:
d = self.sanitize_data(data)
# Sanitize string inputs
sanitized_data = {
'name': html.escape(data['name'].strip()) if data['name'] else '',
'year': data['year'],
'season': data['season'].strip() if data['season'] else '',
'status': data['status'].strip() if data['status'] else 'unwatched',
'type': data['type'].strip() if data['type'] else '',
'comment': html.escape(data['comment'].strip()) if data['comment'] else '',
'url': data['url'].strip() if data['url'] else ''
}
cursor = self.db.cursor()
cursor.execute(
"UPDATE anime SET name=?, year=?, season=?, status=?, type=?, comment=?, url=? WHERE id=?",
(d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'], anime_id)
(
sanitized_data['name'],
sanitized_data['year'],
sanitized_data['season'],
sanitized_data['status'],
sanitized_data['type'],
sanitized_data['comment'],
sanitized_data['url'],
anime_id
)
)
self.db.commit()
except Exception as e:
logging.error(f"Error editing anime id {anime_id}: {e}")
self.db.rollback()
def delete_anime(self, anime_id: int):
def delete_anime(self, anime_id):
try:
cursor = self.db.cursor()
cursor.execute("DELETE FROM anime WHERE id = ?", (anime_id,))
self.db.commit()
except Exception as e:
logging.error(f"Error deleting anime id {anime_id}: {e}")
self.db.rollback()
def change_status(self, anime_id: int, new_status: str):
def change_status(self, anime_id, new_status):
try:
if new_status not in ALLOWED_STATUSES:
logging.error(f"Attempt to set invalid status: {new_status}")
return
cursor = self.db.cursor()
cursor.execute("UPDATE anime SET status = ? WHERE id = ?", (new_status, anime_id))
self.db.commit()
except Exception as e:
logging.error(f"Error changing status for anime id {anime_id}: {e}")
self.db.rollback()
def add_placeholders_for_year(self, year: int):
def add_placeholders_for_year(self, year):
try:
cursor = self.db.cursor()
for season in ['winter', 'spring', 'summer', 'fall', '']:
@ -174,80 +158,69 @@ class AnimeBackend:
self.db.commit()
except Exception as e:
logging.error(f"Error adding placeholders for year {year}: {e}")
self.db.rollback()
def import_from_csv(self, file_name: str):
skipped = 0
inserted = 0
def import_from_csv(self, file_name):
try:
with open(file_name, 'r', newline='', encoding='utf-8') as f:
with open(file_name, 'r', newline='') as f:
reader = csv.reader(f)
header = next(reader, None)
cursor = self.db.cursor()
for row in reader:
# Accept either 7 columns (no id) or 8 columns (with id)
if len(row) == 7:
name, year_str, season, status, type_, comment, url = row
elif len(row) == 8:
_, name, year_str, season, status, type_, comment, url = row
else:
skipped += 1
logging.error(f"Skipping CSV row with unexpected length {len(row)}: {row}")
continue
try:
year = int(year_str)
except ValueError:
skipped += 1
logging.error(f"Skipping CSV row with invalid year: {row}")
continue
# Prepare and sanitize
data = {
'name': name,
'year': year,
'season': season,
'status': status,
'type': type_,
'comment': comment,
'url': url
}
d = self.sanitize_data(data)
# Avoid duplicates by name/year/season
cursor.execute(
"SELECT id FROM anime WHERE name = ? AND year = ? AND season = ?",
(d['name'], d['year'], d['season'])
)
if not cursor.fetchone():
if header:
cursor = self.db.cursor()
for row in reader:
if len(row) == 7:
name, year_str, season, status, type_, comment, url = row
elif len(row) == 8:
_, name, year_str, season, status, type_, comment, url = row
else:
continue
try:
year = int(year_str)
except ValueError:
continue
# Sanitize CSV inputs
name = html.escape(name.strip()) if name else ''
season = season.strip() if season else ''
status = status.strip() if status else 'unwatched'
type_ = type_.strip() if type_ else ''
comment = html.escape(comment.strip()) if comment else ''
url = url.strip() if url else ''
cursor.execute(
"INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
(d['name'], d['year'], d['season'], d['status'], d['type'], d['comment'], d['url'])
"SELECT id FROM anime WHERE name = ? AND year = ? AND season = ?",
(name, year, season)
)
inserted += 1
self.db.commit()
if not cursor.fetchone():
cursor.execute(
"INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
(name, year, season, status, type_, comment, url)
)
self.db.commit()
except Exception as e:
logging.error(f"Error importing from CSV {file_name}: {e}")
finally:
logging.info(f"CSV import finished: inserted={inserted}, skipped={skipped}")
self.db.rollback()
def export_to_csv(self, file_name: str):
def export_to_csv(self, file_name):
try:
cursor = self.db.cursor()
cursor.execute("SELECT * FROM anime")
rows = cursor.fetchall()
with open(file_name, 'w', newline='', encoding='utf-8') as f:
with open(file_name, 'w', newline='') as f:
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
writer.writerow(['id', 'name', 'year', 'season', 'status', 'type', 'comment', 'url'])
writer.writerows(rows)
except Exception as e:
logging.error(f"Error exporting to CSV {file_name}: {e}")
def delete_year(self, year: int):
def delete_year(self, year):
try:
cursor = self.db.cursor()
cursor.execute("DELETE FROM anime WHERE year = ?", (year,))
self.db.commit()
except Exception as e:
logging.error(f"Error deleting year {year}: {e}")
self.db.rollback()
def get_total_entries(self) -> int:
def get_total_entries(self):
try:
cursor = self.db.cursor()
cursor.execute("SELECT COUNT(*) FROM anime")
@ -256,7 +229,7 @@ class AnimeBackend:
logging.error(f"Error getting total entries: {e}")
return 0
def get_completed_entries(self) -> int:
def get_completed_entries(self):
try:
cursor = self.db.cursor()
cursor.execute("SELECT COUNT(*) FROM anime WHERE status = 'completed'")
@ -269,23 +242,7 @@ class AnimeBackend:
try:
cursor = self.db.cursor()
cursor.execute("SELECT type, COUNT(*) FROM anime GROUP BY type ORDER BY 2 DESC")
return cursor.fetchall() # list of (type, count)
return cursor.fetchall() # Returns list of tuples: (type, count)
except Exception as e:
logging.error(f"Error getting entries by type: {e}")
return []
def close(self):
try:
if self.db:
self.db.close()
self.db = None
except Exception as e:
logging.error(f"Error closing DB: {e}")
def __del__(self):
try:
if getattr(self, 'db', None):
self.db.close()
except Exception:
pass

File diff suppressed because it is too large Load Diff

View File

@ -11,7 +11,7 @@ Then run a binary:
`./dist/AnimeTracker/AnimeTracker`
## How to update the binary file
`rm -rf ~/Applications/AnimeTracker/_internal && cp -r ~/Documents/programs/python/anime-tracker/dist/AnimeTracker ~/Applications/`
cp ~/Documents/programs/python/anime-tracker/dist/AnimeTracker/AnimeTracker ~/Applications/AnimeTracker/AnimeTracker
## How to run this app without building:
`python frontend.py`