248 lines
10 KiB
Python
248 lines
10 KiB
Python
import sqlite3
|
|
import csv
|
|
import logging
|
|
import html
|
|
|
|
# Set up logging
|
|
logging.basicConfig(filename='anime_tracker.log', level=logging.ERROR,
|
|
format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
class AnimeBackend:
|
|
def __init__(self):
|
|
self.db = sqlite3.connect('anime_backlog.db', isolation_level=None) # Autocommit mode to prevent locks
|
|
self.db.execute('PRAGMA journal_mode=WAL') # Use WAL mode for better concurrency
|
|
self.create_table()
|
|
|
|
def create_table(self):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS anime (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
name TEXT NOT NULL,
|
|
year INTEGER NOT NULL,
|
|
season TEXT,
|
|
status TEXT NOT NULL DEFAULT 'unwatched',
|
|
type TEXT,
|
|
comment TEXT,
|
|
url TEXT
|
|
)
|
|
""")
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error creating table: {e}")
|
|
|
|
def get_pre_2010_entries(self):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT * FROM anime WHERE year < 2010 ORDER BY year DESC, name")
|
|
return cursor.fetchall()
|
|
except Exception as e:
|
|
logging.error(f"Error getting pre-2010 entries: {e}")
|
|
return []
|
|
|
|
def get_years(self):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT DISTINCT year FROM anime WHERE year >= 2010 ORDER BY year DESC")
|
|
return [row[0] for row in cursor.fetchall()]
|
|
except Exception as e:
|
|
logging.error(f"Error getting years: {e}")
|
|
return []
|
|
|
|
def get_entries_for_season(self, year, season):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT * FROM anime WHERE year = ? AND season = ? ORDER BY name", (year, season))
|
|
return cursor.fetchall()
|
|
except Exception as e:
|
|
logging.error(f"Error getting entries for season {season} in year {year}: {e}")
|
|
return []
|
|
|
|
def get_anime_by_id(self, anime_id):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT * FROM anime WHERE id = ?", (anime_id,))
|
|
return cursor.fetchone()
|
|
except Exception as e:
|
|
logging.error(f"Error getting anime by id {anime_id}: {e}")
|
|
return None
|
|
|
|
def add_anime(self, data):
|
|
try:
|
|
# Sanitize string inputs
|
|
sanitized_data = {
|
|
'name': html.escape(data['name'].strip()) if data['name'] else '',
|
|
'year': data['year'],
|
|
'season': data['season'].strip() if data['season'] else '',
|
|
'status': data['status'].strip() if data['status'] else 'unwatched',
|
|
'type': data['type'].strip() if data['type'] else '',
|
|
'comment': html.escape(data['comment'].strip()) if data['comment'] else '',
|
|
'url': data['url'].strip() if data['url'] else ''
|
|
}
|
|
cursor = self.db.cursor()
|
|
cursor.execute(
|
|
"INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(
|
|
sanitized_data['name'],
|
|
sanitized_data['year'],
|
|
sanitized_data['season'],
|
|
sanitized_data['status'],
|
|
sanitized_data['type'],
|
|
sanitized_data['comment'],
|
|
sanitized_data['url']
|
|
)
|
|
)
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error adding anime: {e}")
|
|
self.db.rollback()
|
|
|
|
def edit_anime(self, anime_id, data):
|
|
try:
|
|
# Sanitize string inputs
|
|
sanitized_data = {
|
|
'name': html.escape(data['name'].strip()) if data['name'] else '',
|
|
'year': data['year'],
|
|
'season': data['season'].strip() if data['season'] else '',
|
|
'status': data['status'].strip() if data['status'] else 'unwatched',
|
|
'type': data['type'].strip() if data['type'] else '',
|
|
'comment': html.escape(data['comment'].strip()) if data['comment'] else '',
|
|
'url': data['url'].strip() if data['url'] else ''
|
|
}
|
|
cursor = self.db.cursor()
|
|
cursor.execute(
|
|
"UPDATE anime SET name=?, year=?, season=?, status=?, type=?, comment=?, url=? WHERE id=?",
|
|
(
|
|
sanitized_data['name'],
|
|
sanitized_data['year'],
|
|
sanitized_data['season'],
|
|
sanitized_data['status'],
|
|
sanitized_data['type'],
|
|
sanitized_data['comment'],
|
|
sanitized_data['url'],
|
|
anime_id
|
|
)
|
|
)
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error editing anime id {anime_id}: {e}")
|
|
self.db.rollback()
|
|
|
|
def delete_anime(self, anime_id):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("DELETE FROM anime WHERE id = ?", (anime_id,))
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error deleting anime id {anime_id}: {e}")
|
|
self.db.rollback()
|
|
|
|
def change_status(self, anime_id, new_status):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("UPDATE anime SET status = ? WHERE id = ?", (new_status, anime_id))
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error changing status for anime id {anime_id}: {e}")
|
|
self.db.rollback()
|
|
|
|
def add_placeholders_for_year(self, year):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
for season in ['winter', 'spring', 'summer', 'fall', '']:
|
|
cursor.execute(
|
|
"INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
('Placeholder', year, season, 'unwatched', '', 'Delete or edit me', '')
|
|
)
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error adding placeholders for year {year}: {e}")
|
|
self.db.rollback()
|
|
|
|
def import_from_csv(self, file_name):
|
|
try:
|
|
with open(file_name, 'r', newline='') as f:
|
|
reader = csv.reader(f)
|
|
header = next(reader, None)
|
|
if header:
|
|
cursor = self.db.cursor()
|
|
for row in reader:
|
|
if len(row) == 7:
|
|
name, year_str, season, status, type_, comment, url = row
|
|
elif len(row) == 8:
|
|
_, name, year_str, season, status, type_, comment, url = row
|
|
else:
|
|
continue
|
|
try:
|
|
year = int(year_str)
|
|
except ValueError:
|
|
continue
|
|
# Sanitize CSV inputs
|
|
name = html.escape(name.strip()) if name else ''
|
|
season = season.strip() if season else ''
|
|
status = status.strip() if status else 'unwatched'
|
|
type_ = type_.strip() if type_ else ''
|
|
comment = html.escape(comment.strip()) if comment else ''
|
|
url = url.strip() if url else ''
|
|
cursor.execute(
|
|
"SELECT id FROM anime WHERE name = ? AND year = ? AND season = ?",
|
|
(name, year, season)
|
|
)
|
|
if not cursor.fetchone():
|
|
cursor.execute(
|
|
"INSERT INTO anime (name, year, season, status, type, comment, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
|
(name, year, season, status, type_, comment, url)
|
|
)
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error importing from CSV {file_name}: {e}")
|
|
self.db.rollback()
|
|
|
|
def export_to_csv(self, file_name):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT * FROM anime")
|
|
rows = cursor.fetchall()
|
|
with open(file_name, 'w', newline='') as f:
|
|
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
|
|
writer.writerow(['id', 'name', 'year', 'season', 'status', 'type', 'comment', 'url'])
|
|
writer.writerows(rows)
|
|
except Exception as e:
|
|
logging.error(f"Error exporting to CSV {file_name}: {e}")
|
|
|
|
def delete_year(self, year):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("DELETE FROM anime WHERE year = ?", (year,))
|
|
self.db.commit()
|
|
except Exception as e:
|
|
logging.error(f"Error deleting year {year}: {e}")
|
|
self.db.rollback()
|
|
|
|
def get_total_entries(self):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM anime")
|
|
return cursor.fetchone()[0]
|
|
except Exception as e:
|
|
logging.error(f"Error getting total entries: {e}")
|
|
return 0
|
|
|
|
def get_completed_entries(self):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT COUNT(*) FROM anime WHERE status = 'completed'")
|
|
return cursor.fetchone()[0]
|
|
except Exception as e:
|
|
logging.error(f"Error getting completed entries: {e}")
|
|
return 0
|
|
|
|
def get_entries_by_type(self):
|
|
try:
|
|
cursor = self.db.cursor()
|
|
cursor.execute("SELECT type, COUNT(*) FROM anime GROUP BY type ORDER BY 2 DESC")
|
|
return cursor.fetchall() # Returns list of tuples: (type, count)
|
|
except Exception as e:
|
|
logging.error(f"Error getting entries by type: {e}")
|
|
return [] |