1
0
mirror of https://github.com/Tautulli/Tautulli.git synced 2025-03-12 04:35:40 -07:00
plexpy/plexpy/database.py
2024-09-04 13:28:54 -07:00

498 lines
19 KiB
Python

# This file is part of Tautulli.
#
# Tautulli is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Tautulli is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Tautulli. If not, see <http://www.gnu.org/licenses/>.
import os
import sqlite3
import shutil
import threading
import time
import plexpy
from plexpy import helpers
from plexpy import logger
# File name of the database; db_filename() joins it with plexpy.DATA_DIR.
FILENAME = "tautulli.db"
# Module-wide lock held by MonitorDatabase.action() while a query executes.
db_lock = threading.Lock()
# True while import_tautulli_db() is running; toggled via set_is_importing().
IS_IMPORTING = False
def set_is_importing(value):
    """Store ``value`` in the module-level ``IS_IMPORTING`` flag."""
    global IS_IMPORTING
    IS_IMPORTING = value
def validate_database(database=None):
    """Check that ``database`` can be opened and has a session_history table.

    Args:
        database: Path of the SQLite database file to validate.

    Returns:
        ``'success'`` when the probe query runs, otherwise one of the error
        strings ``'Invalid database specified'`` or ``'Uncaught exception'``.
    """
    try:
        connection = sqlite3.connect(database, timeout=20)
    except (sqlite3.OperationalError, sqlite3.DatabaseError, ValueError) as e:
        logger.error("Tautulli Database :: Invalid database specified: %s", e)
        return 'Invalid database specified'
    except Exception as e:
        logger.error("Tautulli Database :: Uncaught exception: %s", e)
        return 'Uncaught exception'

    try:
        # Probe a column that a Tautulli database is expected to have.
        connection.execute("SELECT started from session_history")
    except (sqlite3.OperationalError, sqlite3.DatabaseError, ValueError) as e:
        logger.error("Tautulli Database :: Invalid database specified: %s", e)
        return 'Invalid database specified'
    except Exception as e:
        logger.error("Tautulli Database :: Uncaught exception: %s", e)
        return 'Uncaught exception'
    finally:
        # Close on every path so a failed probe does not leak the connection.
        connection.close()

    return 'success'
def import_tautulli_db(database=None, method=None, backup=False):
    """Import another Tautulli database file into the current database.

    Args:
        database: Path of the SQLite database file to import. It is attached
            as ``import_db`` and deleted from disk after a completed import.
        method: ``'merge'`` to add the imported rows to the existing ones, or
            ``'overwrite'`` to clear each matching table before inserting.
        backup: When true, back up the current database with
            ``make_backup()`` before importing.

    Returns:
        ``False`` when the import is refused (another import is running, the
        database fails validation, the method is invalid, or the backup
        fails). ``None`` after a completed import.

    Raises:
        Exceptions raised while importing propagate to the caller. The
        importing flag is cleared first so later imports are not blocked.
    """
    if IS_IMPORTING:
        logger.warn("Tautulli Database :: Another Tautulli database is currently being imported. "
                    "Please wait until it is complete before importing another database.")
        return False

    db_validate = validate_database(database=database)
    if not db_validate == 'success':
        logger.error("Tautulli Database :: Failed to import Tautulli database: %s", db_validate)
        return False

    if method not in ('merge', 'overwrite'):
        logger.error("Tautulli Database :: Failed to import Tautulli database: invalid import method '%s'", method)
        return False

    if backup:
        # Make a backup of the current database first
        logger.info("Tautulli Database :: Creating a database backup before importing.")
        if not make_backup():
            logger.error("Tautulli Database :: Failed to import Tautulli database: failed to create database backup")
            return False

    logger.info("Tautulli Database :: Importing Tautulli database '%s' with import method '%s'...", database, method)
    set_is_importing(True)

    try:
        db = MonitorDatabase()
        db.connection.execute("BEGIN IMMEDIATE")
        db.connection.execute("ATTACH ? AS import_db", [database])

        try:
            version_info = db.select_single("SELECT * FROM import_db.version_info WHERE key = 'version'")
            import_db_version = version_info['value']
        except (sqlite3.OperationalError, KeyError):
            # No version_info table or no 'version' row: fall back to v2.6.10.
            import_db_version = 'v2.6.10'

        logger.info("Tautulli Database :: Import Tautulli database version: %s", import_db_version)
        import_db_version = helpers.version_to_tuple(import_db_version)

        # Get the current number of used ids in the session_history table
        session_history_seq = db.select_single("SELECT seq FROM sqlite_sequence WHERE name = 'session_history'")
        session_history_rows = session_history_seq.get('seq', 0)

        session_history_tables = ('session_history', 'session_history_metadata', 'session_history_media_info')

        if method == 'merge':
            logger.info("Tautulli Database :: Creating temporary database tables to re-index grouped session history.")
            for table_name in session_history_tables:
                db.action("CREATE TABLE {table}_copy AS SELECT * FROM import_db.{table}".format(table=table_name))
                # Shift the imported ids past the ids already used locally.
                db.action("UPDATE {table}_copy SET id = id + ?".format(table=table_name),
                          [session_history_rows])
                if table_name == 'session_history':
                    db.action("UPDATE {table}_copy SET reference_id = reference_id + ?".format(table=table_name),
                              [session_history_rows])

        # Migrate section_id from session_history_metadata to session_history
        if import_db_version < helpers.version_to_tuple('v2.7.0'):
            if method == 'merge':
                from_db_name = 'main'
                copy = '_copy'
            else:
                from_db_name = 'import_db'
                copy = ''
            db.action("ALTER TABLE {from_db}.session_history{copy} "
                      "ADD COLUMN section_id INTEGER".format(from_db=from_db_name,
                                                             copy=copy))
            db.action("UPDATE {from_db}.session_history{copy} SET section_id = ("
                      "SELECT section_id FROM {from_db}.session_history_metadata{copy} "
                      "WHERE {from_db}.session_history_metadata{copy}.id = "
                      "{from_db}.session_history{copy}.id)".format(from_db=from_db_name,
                                                                   copy=copy))

        # Keep track of all table columns so that duplicates can be removed after importing
        table_columns = {}

        tables = db.select("SELECT name FROM import_db.sqlite_master "
                           "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
                           "ORDER BY name")
        for table in tables:
            table_name = table['name']
            if table_name == 'sessions' or table_name == 'version_info':
                # Skip temporary sessions table
                continue

            current_table = db.select("PRAGMA main.table_info({table})".format(table=table_name))
            if not current_table:
                # Skip tables that do not exist in the current database
                continue

            logger.info("Tautulli Database :: Importing database table '%s'.", table_name)

            if method == 'overwrite':
                # Clear the table and reset the autoincrement ids
                db.action("DELETE FROM {table}".format(table=table_name))
                db.action("DELETE FROM sqlite_sequence WHERE name = ?", [table_name])

            if method == 'merge' and table_name in session_history_tables:
                # Session history is read from the re-indexed copies in main.
                from_db_name = 'main'
                from_table_name = table_name + '_copy'
            else:
                from_db_name = 'import_db'
                from_table_name = table_name

            # Get the list of columns to import
            current_columns = [c['name'] for c in current_table]
            import_table = db.select("PRAGMA {from_db}.table_info({from_table})".format(from_db=from_db_name,
                                                                                        from_table=from_table_name))

            if method == 'merge' and table_name not in session_history_tables:
                # Leave out primary key columns when merging other tables.
                import_columns = [c['name'] for c in import_table if c['name'] in current_columns and not c['pk']]
            else:
                import_columns = [c['name'] for c in import_table if c['name'] in current_columns]

            table_columns[table_name] = import_columns
            insert_columns = ', '.join(import_columns)

            # Insert the data with ignore instead of replace to be safe
            db.action("INSERT OR IGNORE INTO {table} ({columns}) "
                      "SELECT {columns} FROM {from_db}.{from_table}".format(table=table_name,
                                                                            columns=insert_columns,
                                                                            from_db=from_db_name,
                                                                            from_table=from_table_name))

        db.connection.execute("DETACH import_db")

        if method == 'merge':
            for table_name, columns in sorted(table_columns.items()):
                duplicate_columns = ', '.join([c for c in columns if c not in ('id', 'reference_id')])
                logger.info("Tautulli Database :: Removing duplicate rows from database table '%s'.", table_name)
                if table_name in session_history_tables[1:]:
                    # Metadata and media info rows must match a session_history id.
                    db.action("DELETE FROM {table} WHERE id NOT IN "
                              "(SELECT id FROM session_history)".format(table=table_name))
                else:
                    # Keep the lowest id of each group of otherwise identical rows.
                    db.action("DELETE FROM {table} WHERE id NOT IN "
                              "(SELECT MIN(id) FROM {table} GROUP BY {columns})".format(table=table_name,
                                                                                        columns=duplicate_columns))

            logger.info("Tautulli Database :: Deleting temporary database tables.")
            for table_name in session_history_tables:
                db.action("DROP TABLE {table}_copy".format(table=table_name))

        vacuum()

        logger.info("Tautulli Database :: Tautulli database import complete.")
    finally:
        # Always clear the flag; otherwise a failed import would block all
        # later imports until the process restarts.
        set_is_importing(False)

    logger.info("Tautulli Database :: Deleting cached database: %s", database)
    os.remove(database)
def integrity_check():
    """Run ``PRAGMA integrity_check`` and return the first result row."""
    return MonitorDatabase().select_single("PRAGMA integrity_check")
def clear_table(table=None):
    """Delete every row from ``table`` and vacuum the database.

    Returns:
        ``True`` when the delete succeeds, ``False`` when it raises, and
        ``None`` when no table name is given.
    """
    if not table:
        return None

    database = MonitorDatabase()
    logger.debug("Tautulli Database :: Clearing database table '%s'." % table)

    try:
        database.action("DELETE FROM %s" % table)
        vacuum()
    except Exception as e:
        logger.error("Tautulli Database :: Failed to clear database table '%s': %s." % (table, e))
        return False
    return True
def delete_sessions():
    """Clear the ``sessions`` table and return the result of ``clear_table``."""
    logger.info("Tautulli Database :: Clearing temporary sessions from database.")
    cleared = clear_table('sessions')
    return cleared
def delete_recently_added():
    """Clear the ``recently_added`` table and return the result of ``clear_table``."""
    logger.info("Tautulli Database :: Clearing recently added items from database.")
    cleared = clear_table('recently_added')
    return cleared
def delete_exports():
    """Clear the ``exports`` table and return the result of ``clear_table``."""
    logger.info("Tautulli Database :: Clearing exported items from database.")
    cleared = clear_table('exports')
    return cleared
def delete_rows_from_table(table, row_ids):
    """Delete the rows with the given ids from ``table``.

    Args:
        table: Name of the table to delete from.
        row_ids: Row ids, either as a sequence or as a comma-separated string.

    Returns:
        ``False`` when a delete raises, otherwise ``True`` (including when
        there are no ids to delete).
    """
    if row_ids and isinstance(row_ids, str):
        row_ids = [helpers.cast_to_int(row_id) for row_id in row_ids.split(',')]

    if not row_ids:
        return True

    logger.info("Tautulli Database :: Deleting row ids %s from %s database table", row_ids, table)

    # SQlite versions prior to 3.32.0 (2020-05-22) have maximum variable limit of 999
    # https://sqlite.org/limits.html
    max_variables = 999

    database = MonitorDatabase()
    try:
        for id_group in helpers.chunk(row_ids, max_variables):
            placeholders = ','.join(['?'] * len(id_group))
            query = "DELETE FROM " + table + " WHERE id IN (%s) " % placeholders
            database.action(query, id_group)
        vacuum()
    except Exception as e:
        logger.error("Tautulli Database :: Failed to delete rows from %s database table: %s" % (table, e))
        return False

    return True
def delete_session_history_rows(row_ids=None):
    """Delete ``row_ids`` from the three session history tables.

    Returns:
        ``True`` only when every per-table delete reported success.
    """
    history_tables = ('session_history', 'session_history_media_info', 'session_history_metadata')
    # Build the whole list first so every table is processed even after a failure.
    outcomes = [delete_rows_from_table(table=name, row_ids=row_ids) for name in history_tables]
    return all(outcomes)
def delete_user_history(user_id=None):
    """Delete all session history rows recorded for ``user_id``.

    Returns:
        The result of ``delete_session_history_rows``, or ``None`` when
        ``user_id`` is not made up of digits.
    """
    if not str(user_id).isdigit():
        return None

    database = MonitorDatabase()

    # Get all history associated with the user_id
    history = database.select("SELECT id FROM session_history WHERE user_id = ?",
                              [user_id])
    history_ids = [entry['id'] for entry in history]

    logger.info("Tautulli Database :: Deleting all history for user_id %s from database." % user_id)
    return delete_session_history_rows(row_ids=history_ids)
def delete_library_history(section_id=None):
    """Delete all session history rows recorded for library ``section_id``.

    Returns:
        The result of ``delete_session_history_rows``, or ``None`` when
        ``section_id`` is not made up of digits.
    """
    if not str(section_id).isdigit():
        return None

    database = MonitorDatabase()

    # Get all history associated with the section_id
    history = database.select("SELECT id FROM session_history WHERE section_id = ?",
                              [section_id])
    history_ids = [entry['id'] for entry in history]

    logger.info("Tautulli Database :: Deleting all history for library section_id %s from database." % section_id)
    return delete_session_history_rows(row_ids=history_ids)
def vacuum():
    """Run ``VACUUM`` on the database; failures are logged, not raised."""
    database = MonitorDatabase()
    logger.info("Tautulli Database :: Vacuuming database.")
    try:
        database.action("VACUUM")
    except Exception as e:
        logger.error("Tautulli Database :: Failed to vacuum database: %s" % e)
def optimize():
    """Run ``PRAGMA optimize`` on the database; failures are logged, not raised."""
    database = MonitorDatabase()
    logger.info("Tautulli Database :: Optimizing database.")
    try:
        database.action("PRAGMA optimize")
    except Exception as e:
        logger.error("Tautulli Database :: Failed to optimize database: %s" % e)
def optimize_db():
    """Vacuum the database and then run ``PRAGMA optimize``."""
    for maintenance_step in (vacuum, optimize):
        maintenance_step()
def db_filename(filename=None):
    """Return ``filename``, or the default database path when it is ``None``."""
    if filename is not None:
        return filename
    return os.path.join(plexpy.DATA_DIR, FILENAME)
def make_backup(cleanup=False, scheduler=False):
    """Copy the database file into the configured backup folder.

    Args:
        cleanup: When true (and the integrity check passed), delete scheduled
            backups (``*.sched.db``) older than ``plexpy.CONFIG.BACKUP_DAYS``
            days.
        scheduler: When true, name the backup with the ``.sched.db`` suffix.

    Returns:
        ``True`` when the backup file is present in the backup folder
        afterwards, otherwise ``False``.
    """
    # Check the integrity of the database first
    integrity = (integrity_check()['integrity_check'] == 'ok')

    corrupt = ''
    if not integrity:
        corrupt = '.corrupt'
        plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_plexpydbcorrupt'})

    suffix = '.sched.db' if scheduler else '.db'
    backup_file = 'tautulli.backup-{}{}{}'.format(helpers.now(), corrupt, suffix)
    backup_folder = plexpy.CONFIG.BACKUP_DIR
    backup_file_fp = os.path.join(backup_folder, backup_file)

    # In case the user has deleted it manually
    if not os.path.exists(backup_folder):
        os.makedirs(backup_folder)

    db = MonitorDatabase()
    # Hold a transaction open while the file is being copied.
    db.connection.execute("BEGIN IMMEDIATE")
    try:
        shutil.copyfile(db_filename(), backup_file_fp)
    finally:
        # Release the transaction even when the copy fails.
        db.connection.rollback()

    # Only cleanup if the database integrity is okay
    if cleanup and integrity:
        now = time.time()
        cutoff = now - plexpy.CONFIG.BACKUP_DAYS * 86400
        # Delete all scheduled backup older than BACKUP_DAYS.
        for root, _dirs, files in os.walk(backup_folder):
            db_files = [os.path.join(root, f) for f in files if f.endswith('.sched.db')]
            for file_ in db_files:
                if os.stat(file_).st_mtime < cutoff:
                    try:
                        os.remove(file_)
                    except OSError as e:
                        logger.error("Tautulli Database :: Failed to delete %s from the backup folder: %s" % (file_, e))

    if backup_file in os.listdir(backup_folder):
        logger.debug("Tautulli Database :: Successfully backed up %s to %s" % (db_filename(), backup_file))
        return True
    else:
        logger.error("Tautulli Database :: Failed to backup %s to %s" % (db_filename(), backup_file))
        return False
def get_cache_size():
    """Return ``plexpy.CONFIG.CACHE_SIZEMB`` as an int, or 0 when it is unset."""
    configured = plexpy.CONFIG.CACHE_SIZEMB
    # This will protect against typecasting problems produced by empty string and None settings
    if configured:
        return int(configured)
    # sqlite will work with this (very slowly)
    return 0
def dict_factory(cursor, row):
    """Row factory that maps each column name in ``cursor.description`` to its value."""
    return {column[0]: row[position] for position, column in enumerate(cursor.description)}
class MonitorDatabase(object):
    """Wrapper around a SQLite connection to the Tautulli database."""

    def __init__(self, filename=None):
        """Open the database and apply the configured PRAGMA settings.

        Args:
            filename: Path of the database file; ``db_filename()`` supplies
                the default path when it is ``None``.
        """
        self.filename = db_filename(filename)
        self.connection = sqlite3.connect(self.filename, timeout=20)
        # Set database synchronous mode (default NORMAL)
        self.connection.execute("PRAGMA synchronous = %s" % plexpy.CONFIG.SYNCHRONOUS_MODE)
        # Set database journal mode (default WAL)
        self.connection.execute("PRAGMA journal_mode = %s" % plexpy.CONFIG.JOURNAL_MODE)
        # Set database cache size (default 32MB)
        self.connection.execute("PRAGMA cache_size = -%s" % (get_cache_size() * 1024))
        self.connection.row_factory = dict_factory

    def action(self, query, args=None, return_last_id=False):
        """Execute ``query`` while holding the module-level ``db_lock``.

        The statement is attempted up to 5 times, one second apart, when
        SQLite reports "unable to open database file" or "database is locked".

        Args:
            query: SQL text to execute; ``None`` makes the call a no-op.
            args: Optional parameters bound to the statement.
            return_last_id: Not used by this method.

        Returns:
            The cursor returned by ``execute``, or ``None`` when ``query`` is
            ``None`` or every attempt failed.

        Raises:
            sqlite3.DatabaseError: For any database error other than the two
                retried ones.
        """
        if query is None:
            return

        with db_lock:
            sql_result = None

            for _attempt in range(5):
                try:
                    # The connection context manager commits on success and
                    # rolls back when the statement raises.
                    with self.connection as c:
                        if args is None:
                            sql_result = c.execute(query)
                        else:
                            sql_result = c.execute(query, args)
                    # Our transaction was successful, leave the loop
                    break

                except sqlite3.OperationalError as e:
                    message = str(e)
                    if "unable to open database file" in message or "database is locked" in message:
                        logger.warn("Tautulli Database :: Database Error: %s", message)
                        time.sleep(1)
                    else:
                        logger.error("Tautulli Database :: Database error: %s", message)
                        raise

                except sqlite3.DatabaseError as e:
                    logger.error("Tautulli Database :: Fatal Error executing %s :: %s", query, e)
                    raise
            else:
                # Every attempt hit a retryable error; make the failure visible.
                logger.error("Tautulli Database :: Failed to execute %s after 5 attempts", query)

            return sql_result

    def select(self, query, args=None):
        """Run ``query`` and return all rows, or ``[]`` when there are none."""
        cursor = self.action(query, args)
        if cursor is None:
            # action() gave up (or query was None); there is nothing to fetch.
            return []

        sql_results = cursor.fetchall()
        if sql_results is None or sql_results == [None]:
            return []

        return sql_results

    def select_single(self, query, args=None):
        """Run ``query`` and return the first row, or ``{}`` when there is none."""
        cursor = self.action(query, args)
        if cursor is None:
            # action() gave up (or query was None); there is nothing to fetch.
            return {}

        sql_results = cursor.fetchone()
        if sql_results is None or sql_results == "":
            return {}

        return sql_results

    def upsert(self, table_name, value_dict, key_dict):
        """Update the row matching ``key_dict``, inserting it when none changes.

        Args:
            table_name: Table to write to.
            value_dict: Column/value pairs to set.
            key_dict: Column/value pairs that identify the row.

        Returns:
            ``'update'`` when the UPDATE changed a row, otherwise ``'insert'``
            (also when the INSERT hit an ``IntegrityError``, which is logged).
        """
        trans_type = 'update'
        changes_before = self.connection.total_changes

        def gen_params(my_dict):
            return [x + " = ?" for x in my_dict]

        update_query = "UPDATE " + table_name + " SET " + ", ".join(gen_params(value_dict)) + \
                       " WHERE " + " AND ".join(gen_params(key_dict))

        self.action(update_query, list(value_dict.values()) + list(key_dict.values()))

        # An unchanged counter means the UPDATE matched no row.
        if self.connection.total_changes == changes_before:
            trans_type = 'insert'
            insert_query = (
                "INSERT INTO " + table_name + " (" + ", ".join(list(value_dict.keys()) + list(key_dict.keys())) + ")" +
                " VALUES (" + ", ".join(["?"] * len(list(value_dict.keys()) + list(key_dict.keys()))) + ")"
            )
            try:
                self.action(insert_query, list(value_dict.values()) + list(key_dict.values()))
            except sqlite3.IntegrityError:
                logger.info("Tautulli Database :: Queries failed: %s and %s", update_query, insert_query)

        # We want to know if it was an update or insert
        return trans_type

    def last_insert_id(self):
        """Return ``last_insert_rowid()`` for this connection, or ``None``."""
        # Get the last insert row id
        result = self.select_single(query="SELECT last_insert_rowid() AS last_id")
        if result:
            return result.get('last_id', None)

    def __del__(self):
        # Close the database connection when object is garbage collected.
        # The attribute is missing when __init__ failed before connecting.
        connection = getattr(self, 'connection', None)
        if connection is not None:
            connection.close()