Module soitool.database
Includes all database-related functionality.
Expand source code
"""Includes all database-related functionality."""
import os
import sqlite3
import json
from datetime import datetime
import soitool.coder
from soitool.enumerates import CodebookSort
from soitool.serialize_export_import_soi import serialize_soi
from soitool.compressor import compress
from soitool.dialog_wrappers import exec_warning_dialog
# Name of the default (future) database and its location next to this module
DBNAME = "database"
CURDIR = os.path.dirname(__file__)
DBPATH = os.path.join(CURDIR, DBNAME)

# Number of seconds in 24h
SECONDS_IN_24H = 60 * 60 * 24

# DDL-statements for creating tables, assembled from their column definitions
SOI = "CREATE TABLE SOI(" + ", ".join(
    (
        "Title VARCHAR NOT NULL CHECK(length(Title) > 0)",
        "Version VARCHAR NOT NULL CHECK(length(Version) > 0)",
        "SOI TEXT NOT NULL CHECK(length(SOI) > 0)",
        "Date DATETIME",
        "PRIMARY KEY(Title, Version)",
    )
) + ")"

CODEBOOK = "CREATE TABLE Codebook(" + ", ".join(
    (
        "Word VARCHAR PRIMARY KEY NOT NULL CHECK(length(Word) > 0)",
        "Category VARCHAR",
        "Type VARCHAR NOT NULL DEFAULT 'Stor' "
        "CHECK(Type='Stor' OR Type='Liten')",
        "Code VARCHAR UNIQUE",
    )
) + ")"

CATEGORYWORDS = "CREATE TABLE CategoryWords(" + ", ".join(
    (
        "Word VARCHAR PRIMARY KEY",
        "Category VARCHAR NOT NULL CHECK(length(Category) > 0)",
    )
) + ")"

LASTUPDATED = (
    "CREATE TABLE LastUpdated(" + "Timestamp DATETIME PRIMARY KEY" + ")"
)
class Database:
    """Holds database-connection and related functions.

    Uses default database unless parameter 'db_path', path to a
    database-file, is given. Connects to existing db if found, creates new
    db if not. If db is created, tables are created and filled.

    The class does not own a timer itself; 'update_codebook_auto' is handed
    the timer whose interval should be adjusted.

    Parameters
    ----------
    db_path : str, optional
        Path to DB, by default DBPATH.
    """

    # Format of every timestamp written to and read from table LastUpdated
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

    def __init__(self, db_path=DBPATH):
        self.db_path = db_path
        # Checked before connecting, so a missing file is noticed first
        db_exists = os.path.exists(self.db_path)
        self.conn = sqlite3.connect(self.db_path)
        if not db_exists:
            self.create_tables()
            self.fill_tables()
        self.conn.row_factory = sqlite3.Row  # Enables row["columnName"]

    def create_tables(self):
        """Create database-tables."""
        for stmt in (SOI, CODEBOOK, CATEGORYWORDS, LASTUPDATED):
            self.conn.execute(stmt)
        self.conn.commit()

    def fill_tables(self):
        """Fill tables with testdata and commit."""
        self.fill_codebook()
        self.fill_category_words()
        self.fill_last_updated()
        self.conn.commit()

    def fill_codebook(self):
        """Read data from codebook.json and fill DB-table Codebook.

        Does not commit; the caller is expected to do so.
        """
        file_path = os.path.join(CURDIR, "testdata/codebook.json")
        # Context manager closes the file even if parsing the JSON fails
        with open(file_path, "r", encoding="utf-8") as file:
            entries = json.load(file)
        # Generate one code per entry
        code_len = soitool.coder.get_code_length_needed(len(entries))
        codes = soitool.coder.get_code_set(len(entries), code_len)
        # Insert data in db
        stmt = (
            "INSERT INTO Codebook(Word, Category, Type, Code)"
            "VALUES(?,?,?,?)"
        )
        for word in entries:
            self.conn.execute(
                stmt,
                (word["word"], word["category"], word["type"], codes.pop()),
            )

    def fill_category_words(self):
        """Read data from CategoryWords.txt and fill DB-table CategoryWords.

        The file starts with the number of categories. Each category is a
        line "<category>, <number of words>" followed by one word per line.
        Does not commit; the caller is expected to do so.
        """
        file_path = os.path.join(CURDIR, "testdata/CategoryWords.txt")
        stmt = "INSERT INTO CategoryWords(Word, Category) VALUES(?, ?)"
        # Context manager closes the file even if a line fails to parse
        with open(file_path, "r", encoding="utf-8") as file:
            no_of_categories = int(file.readline().rstrip())
            for _ in range(no_of_categories):
                # Get category and number of words in category
                line = file.readline().split(", ")
                category = line[0]
                no_of_words = int(line[1].rstrip())
                # Add one row per word in the category
                for _ in range(no_of_words):
                    word = file.readline().rstrip()
                    self.conn.execute(stmt, (word, category))

    def fill_last_updated(self):
        """Fill table with current date and time.

        Does not commit; the caller is expected to do so.
        """
        stmt = "INSERT INTO LastUpdated(Timestamp) VALUES(?)"
        # strftime always writes the microseconds; str(datetime) omits them
        # when they are 0, which 'seconds_to_next_update' could not parse
        now = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        self.conn.execute(stmt, (now,))

    def update_last_updated(self):
        """Update Timestamp in LastUpdated to current time and commit."""
        stmt = "UPDATE LastUpdated SET Timestamp = ?"
        now = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        self.conn.execute(stmt, (now,))
        self.conn.commit()

    def get_categories(self):
        """Retrieve all categories from DB-table CategoryWords.

        Returns
        -------
        List
            Containing categories (string).
        """
        stmt = "SELECT DISTINCT Category FROM CategoryWords"
        return [row["Category"] for row in self.conn.execute(stmt)]

    def get_random_category_word(self):
        """Read a random word from database-table CategoryWords.

        Returns
        -------
        String
            Word (string).
        """
        stmt = "SELECT Word FROM CategoryWords ORDER BY RANDOM() LIMIT 1"
        return self.conn.execute(stmt).fetchone()[0]

    def get_categories_from_codebook(self, small=False):
        """Read categories from full or small codebook.

        Parameters
        ----------
        small : bool, optional
            Categories are from small codebook if True, full codebook if
            False, by default False.

        Returns
        -------
        List
            Containing categories (string).
        """
        stmt = "SELECT Category FROM Codebook"
        if small:
            stmt += " WHERE Type='Liten'"
        stmt += " GROUP BY Category"
        queried = self.conn.execute(stmt).fetchall()
        return [row["Category"] for row in queried]

    def get_category_words(self, category):
        """Retrieve words from DB-table CategoryWords in given category.

        Parameters
        ----------
        category : str
            Category of desired words.

        Returns
        -------
        List
            Containing words (string).
        """
        stmt = "SELECT DISTINCT Word FROM CategoryWords WHERE Category = ?"
        return [row["Word"] for row in self.conn.execute(stmt, (category,))]

    def get_codebook(self, small=False, sort=CodebookSort.WORD):
        """Retrieve the entries belonging to the full or small codebook.

        Parameters
        ----------
        small : Bool
            Full codebook will be returned if False (default).
            Small codebook will be returned if True.
        sort : int, optional
            Enumerate value from 'CodebookSort'.
            Data is sorted by Word if CodebookSort.WORD (default), by Code
            if CodebookSort.CODE.

        Returns
        -------
        codebook : list (of dicts)
            [{'word': str, 'type': str, 'category': str, 'code': str}]

        Raises
        ------
        ValueError
            If value for parameter sort is invalid
        """
        # The column name comes from this fixed choice, never from input
        if sort == CodebookSort.WORD:
            order_column = "Word"
        elif sort == CodebookSort.CODE:
            order_column = "Code"
        else:
            raise ValueError(
                "Invalid value for parameter 'sort': '{}'".format(sort)
            )
        stmt = "SELECT * FROM Codebook"
        params = ()
        # Get either full or small codebook
        if small:
            stmt += " WHERE Type = ?"
            params = ("Liten",)
        # Order the final result directly instead of relying on the order
        # of a subquery being kept by the outer query
        stmt += " ORDER BY " + order_column
        queried = self.conn.execute(stmt, params).fetchall()
        return [
            {
                "word": entry["Word"],
                "category": entry["Category"],
                "type": entry["Type"],
                "code": entry["Code"],
            }
            for entry in queried
        ]

    def get_codebook_expressions_in_category(self, category, small=False):
        """Read expressions, from full or small codebook, in a given category.

        Parameters
        ----------
        category : string
            Expressions in this category are returned.
        small : bool, optional
            Expressions are from small codebook if True, full codebook if
            False, by default False.

        Returns
        -------
        List
            Containing expressions (string).
        """
        stmt = "SELECT Word FROM Codebook WHERE Category=?"
        if small:
            stmt += " AND Type='Liten'"
        queried = self.conn.execute(stmt, (category,)).fetchall()
        return [row["Word"] for row in queried]

    def update_codebook(self):
        """Give every word in Codebook a new code and refresh LastUpdated."""
        # Get all the words (PK)
        words = self.conn.execute("SELECT Word FROM Codebook").fetchall()
        number_of_entries = len(words)
        # Generate new codes
        code_len = soitool.coder.get_code_length_needed(number_of_entries)
        codes = soitool.coder.get_code_set(number_of_entries, code_len)
        # Clear all codes first: with the UNIQUE constraint a new code could
        # otherwise collide with an old code that is not yet replaced
        self.conn.execute("UPDATE Codebook SET Code = ?", (None,))
        # Fill Code column with new codes
        stmt = "UPDATE Codebook SET Code = ? WHERE Word = ?"
        for row in words:
            self.conn.execute(stmt, (codes.pop(), row[0]))
        # Updates LastUpdated with current time
        self.update_last_updated()
        # Save changes in db
        self.conn.commit()

    def seconds_to_next_update(self, period):
        """Return time to next update of Codebook in seconds.

        Parameters
        ----------
        period : int
            The number of seconds between each update

        Returns
        -------
        seconds_to_update : float
            Time to next update in seconds, 0 if the update is due.
        """
        stmt = "SELECT Timestamp FROM LastUpdated"
        timestamp = self.conn.execute(stmt).fetchall()[0]["Timestamp"]
        # Convert datetime string to datetime object
        try:
            last_updated = datetime.strptime(
                timestamp, self._TIMESTAMP_FORMAT
            )
        except ValueError:
            # Timestamps stored with str(datetime) have no fractional part
            # when the microseconds were 0
            last_updated = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        # Calculate the number of seconds until next update
        elapsed = (datetime.now() - last_updated).total_seconds()
        seconds_to_update = period - elapsed
        # Since QTimer does not handle negative values
        if seconds_to_update <= 0:
            return 0
        return seconds_to_update

    def update_codebook_auto(self, timer):
        """Update Codebook if needed and update time for timer.

        Parameters
        ----------
        timer : QTimer
            Timer to set new interval on.
        """
        if self.seconds_to_next_update(SECONDS_IN_24H) <= 0:
            self.update_codebook()
        seconds = self.seconds_to_next_update(SECONDS_IN_24H)
        # The interval is a whole number of milliseconds
        timer.setInterval(int(seconds * 1000))

    def add_code_to(self, word, mode="ascii"):
        """Generate and insert a code for the new word in DB-table CodeBook.

        This function is especially designed for when a single word is added
        to the CodeBook table. A unique code is generated and inserted in
        the Code column, which is expected to be NULL for the word. If the
        existing codes are shorter than the length needed for the current
        number of entries, all codes are regenerated with
        'update_codebook' instead.

        Parameters
        ----------
        word : string
            The word to generate a code for.
        mode : string
            'ascii' for letters (default), 'digits' for digits and 'combo'
            for combination of letters and digits.

        Raises
        ------
        ValueError
            If code is added to empty DB.
        """
        stmt = "SELECT COUNT(*) FROM Codebook"
        number_of_entries = self.conn.execute(stmt).fetchone()[0]
        # In special case where table is empty
        if number_of_entries <= 0:
            raise ValueError("Can't add code to table with no words.")
        needed_code_len = soitool.coder.get_code_length_needed(
            number_of_entries
        )
        # Only real codes are read, so the result does not depend on which
        # row the database happens to return first
        stmt = "SELECT Code FROM Codebook WHERE Code IS NOT NULL"
        codes = {row[0] for row in self.conn.execute(stmt).fetchall()}
        if codes:
            actual_code_len = min(len(code) for code in codes)
        else:
            # No code to measure, e.g. when the table only has the new word
            actual_code_len = needed_code_len
        if actual_code_len < needed_code_len:
            # Gives all words, the new one included, a longer code
            self.update_codebook()
        else:
            # Get new unique code for param word
            # NOTE(review): uses needed_code_len even if existing codes are
            # longer - confirm whether mixed code lengths are acceptable
            code = soitool.coder.get_code(needed_code_len, mode)
            while code in codes:
                code = soitool.coder.get_code(needed_code_len, mode)
            # Insert code to the param word in db
            stmt = "UPDATE Codebook SET Code = ? WHERE Word = ?"
            self.conn.execute(stmt, (code, word))
        self.conn.commit()

    def insert_or_update_soi(self, soi):
        """Serialize, compress and insert/update SOI in database-table SOI.

        SOI's with the same title and version are overwritten. If the
        database rejects the row, a warning dialog is shown and nothing is
        written.

        Parameters
        ----------
        soi : soitool.soi.SOI
            The SOI-instance to insert or update.
        """
        # Serialize and compress SOI
        compressed_soi = compress(serialize_soi(soi))
        # Check if SOI with the same title and version exists
        stmt = "SELECT COUNT(*) FROM SOI WHERE Title=? AND Version=?"
        count = self.conn.execute(stmt, (soi.title, soi.version)).fetchone()[0]
        try:
            # If SOI exists, overwrite
            if count == 1:
                stmt = (
                    "UPDATE SOI SET SOI=?, Date=? WHERE Title=? AND Version=?"
                )
                self.conn.execute(
                    stmt, (compressed_soi, soi.date, soi.title, soi.version)
                )
            # SOI does not exist, insert
            else:
                stmt = (
                    "INSERT INTO SOI (Title, Version, SOI, Date) "
                    "VALUES(?,?,?,?)"
                )
                self.conn.execute(
                    stmt, (soi.title, soi.version, compressed_soi, soi.date)
                )
            self.conn.commit()
        except sqlite3.IntegrityError:
            # Close the transaction opened by the rejected write
            self.conn.rollback()
            exec_warning_dialog(
                "SOI ble ikke skrevet til database.",
                "Sørg for at SOI har en tittel og versjon.",
            )
Classes
class Database (db_path='/builds/bachelor-paa-bittet/soitool/soitool/database')
-
Holds database-connection and related functions.
Uses default database unless parameter 'db_path', path to a database-file, is given. Connects to existing db if found, creates new db if not. If db is created, tables are created and filled.
Holds a QTimer that requests an update of CodeBook on every timeout.
Parameters
db_path : str, optional Path to DB, by default DBPATH.
Expand source code
class Database:
    """Holds database-connection and related functions.

    Uses default database unless parameter 'db_path', path to a
    database-file, is given. Connects to existing db if found, creates new
    db if not. If db is created, tables are created and filled.

    The class does not own a timer itself; 'update_codebook_auto' is handed
    the timer whose interval should be adjusted.

    Parameters
    ----------
    db_path : str, optional
        Path to DB, by default DBPATH.
    """

    # Format of every timestamp written to and read from table LastUpdated
    _TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

    def __init__(self, db_path=DBPATH):
        self.db_path = db_path
        # Checked before connecting, so a missing file is noticed first
        db_exists = os.path.exists(self.db_path)
        self.conn = sqlite3.connect(self.db_path)
        if not db_exists:
            self.create_tables()
            self.fill_tables()
        self.conn.row_factory = sqlite3.Row  # Enables row["columnName"]

    def create_tables(self):
        """Create database-tables."""
        for stmt in (SOI, CODEBOOK, CATEGORYWORDS, LASTUPDATED):
            self.conn.execute(stmt)
        self.conn.commit()

    def fill_tables(self):
        """Fill tables with testdata and commit."""
        self.fill_codebook()
        self.fill_category_words()
        self.fill_last_updated()
        self.conn.commit()

    def fill_codebook(self):
        """Read data from codebook.json and fill DB-table Codebook.

        Does not commit; the caller is expected to do so.
        """
        file_path = os.path.join(CURDIR, "testdata/codebook.json")
        # Context manager closes the file even if parsing the JSON fails
        with open(file_path, "r", encoding="utf-8") as file:
            entries = json.load(file)
        # Generate one code per entry
        code_len = soitool.coder.get_code_length_needed(len(entries))
        codes = soitool.coder.get_code_set(len(entries), code_len)
        # Insert data in db
        stmt = (
            "INSERT INTO Codebook(Word, Category, Type, Code)"
            "VALUES(?,?,?,?)"
        )
        for word in entries:
            self.conn.execute(
                stmt,
                (word["word"], word["category"], word["type"], codes.pop()),
            )

    def fill_category_words(self):
        """Read data from CategoryWords.txt and fill DB-table CategoryWords.

        The file starts with the number of categories. Each category is a
        line "<category>, <number of words>" followed by one word per line.
        Does not commit; the caller is expected to do so.
        """
        file_path = os.path.join(CURDIR, "testdata/CategoryWords.txt")
        stmt = "INSERT INTO CategoryWords(Word, Category) VALUES(?, ?)"
        # Context manager closes the file even if a line fails to parse
        with open(file_path, "r", encoding="utf-8") as file:
            no_of_categories = int(file.readline().rstrip())
            for _ in range(no_of_categories):
                # Get category and number of words in category
                line = file.readline().split(", ")
                category = line[0]
                no_of_words = int(line[1].rstrip())
                # Add one row per word in the category
                for _ in range(no_of_words):
                    word = file.readline().rstrip()
                    self.conn.execute(stmt, (word, category))

    def fill_last_updated(self):
        """Fill table with current date and time.

        Does not commit; the caller is expected to do so.
        """
        stmt = "INSERT INTO LastUpdated(Timestamp) VALUES(?)"
        # strftime always writes the microseconds; str(datetime) omits them
        # when they are 0, which 'seconds_to_next_update' could not parse
        now = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        self.conn.execute(stmt, (now,))

    def update_last_updated(self):
        """Update Timestamp in LastUpdated to current time and commit."""
        stmt = "UPDATE LastUpdated SET Timestamp = ?"
        now = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        self.conn.execute(stmt, (now,))
        self.conn.commit()

    def get_categories(self):
        """Retrieve all categories from DB-table CategoryWords.

        Returns
        -------
        List
            Containing categories (string).
        """
        stmt = "SELECT DISTINCT Category FROM CategoryWords"
        return [row["Category"] for row in self.conn.execute(stmt)]

    def get_random_category_word(self):
        """Read a random word from database-table CategoryWords.

        Returns
        -------
        String
            Word (string).
        """
        stmt = "SELECT Word FROM CategoryWords ORDER BY RANDOM() LIMIT 1"
        return self.conn.execute(stmt).fetchone()[0]

    def get_categories_from_codebook(self, small=False):
        """Read categories from full or small codebook.

        Parameters
        ----------
        small : bool, optional
            Categories are from small codebook if True, full codebook if
            False, by default False.

        Returns
        -------
        List
            Containing categories (string).
        """
        stmt = "SELECT Category FROM Codebook"
        if small:
            stmt += " WHERE Type='Liten'"
        stmt += " GROUP BY Category"
        queried = self.conn.execute(stmt).fetchall()
        return [row["Category"] for row in queried]

    def get_category_words(self, category):
        """Retrieve words from DB-table CategoryWords in given category.

        Parameters
        ----------
        category : str
            Category of desired words.

        Returns
        -------
        List
            Containing words (string).
        """
        stmt = "SELECT DISTINCT Word FROM CategoryWords WHERE Category = ?"
        return [row["Word"] for row in self.conn.execute(stmt, (category,))]

    def get_codebook(self, small=False, sort=CodebookSort.WORD):
        """Retrieve the entries belonging to the full or small codebook.

        Parameters
        ----------
        small : Bool
            Full codebook will be returned if False (default).
            Small codebook will be returned if True.
        sort : int, optional
            Enumerate value from 'CodebookSort'.
            Data is sorted by Word if CodebookSort.WORD (default), by Code
            if CodebookSort.CODE.

        Returns
        -------
        codebook : list (of dicts)
            [{'word': str, 'type': str, 'category': str, 'code': str}]

        Raises
        ------
        ValueError
            If value for parameter sort is invalid
        """
        # The column name comes from this fixed choice, never from input
        if sort == CodebookSort.WORD:
            order_column = "Word"
        elif sort == CodebookSort.CODE:
            order_column = "Code"
        else:
            raise ValueError(
                "Invalid value for parameter 'sort': '{}'".format(sort)
            )
        stmt = "SELECT * FROM Codebook"
        params = ()
        # Get either full or small codebook
        if small:
            stmt += " WHERE Type = ?"
            params = ("Liten",)
        # Order the final result directly instead of relying on the order
        # of a subquery being kept by the outer query
        stmt += " ORDER BY " + order_column
        queried = self.conn.execute(stmt, params).fetchall()
        return [
            {
                "word": entry["Word"],
                "category": entry["Category"],
                "type": entry["Type"],
                "code": entry["Code"],
            }
            for entry in queried
        ]

    def get_codebook_expressions_in_category(self, category, small=False):
        """Read expressions, from full or small codebook, in a given category.

        Parameters
        ----------
        category : string
            Expressions in this category are returned.
        small : bool, optional
            Expressions are from small codebook if True, full codebook if
            False, by default False.

        Returns
        -------
        List
            Containing expressions (string).
        """
        stmt = "SELECT Word FROM Codebook WHERE Category=?"
        if small:
            stmt += " AND Type='Liten'"
        queried = self.conn.execute(stmt, (category,)).fetchall()
        return [row["Word"] for row in queried]

    def update_codebook(self):
        """Give every word in Codebook a new code and refresh LastUpdated."""
        # Get all the words (PK)
        words = self.conn.execute("SELECT Word FROM Codebook").fetchall()
        number_of_entries = len(words)
        # Generate new codes
        code_len = soitool.coder.get_code_length_needed(number_of_entries)
        codes = soitool.coder.get_code_set(number_of_entries, code_len)
        # Clear all codes first: with the UNIQUE constraint a new code could
        # otherwise collide with an old code that is not yet replaced
        self.conn.execute("UPDATE Codebook SET Code = ?", (None,))
        # Fill Code column with new codes
        stmt = "UPDATE Codebook SET Code = ? WHERE Word = ?"
        for row in words:
            self.conn.execute(stmt, (codes.pop(), row[0]))
        # Updates LastUpdated with current time
        self.update_last_updated()
        # Save changes in db
        self.conn.commit()

    def seconds_to_next_update(self, period):
        """Return time to next update of Codebook in seconds.

        Parameters
        ----------
        period : int
            The number of seconds between each update

        Returns
        -------
        seconds_to_update : float
            Time to next update in seconds, 0 if the update is due.
        """
        stmt = "SELECT Timestamp FROM LastUpdated"
        timestamp = self.conn.execute(stmt).fetchall()[0]["Timestamp"]
        # Convert datetime string to datetime object
        try:
            last_updated = datetime.strptime(
                timestamp, self._TIMESTAMP_FORMAT
            )
        except ValueError:
            # Timestamps stored with str(datetime) have no fractional part
            # when the microseconds were 0
            last_updated = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
        # Calculate the number of seconds until next update
        elapsed = (datetime.now() - last_updated).total_seconds()
        seconds_to_update = period - elapsed
        # Since QTimer does not handle negative values
        if seconds_to_update <= 0:
            return 0
        return seconds_to_update

    def update_codebook_auto(self, timer):
        """Update Codebook if needed and update time for timer.

        Parameters
        ----------
        timer : QTimer
            Timer to set new interval on.
        """
        if self.seconds_to_next_update(SECONDS_IN_24H) <= 0:
            self.update_codebook()
        seconds = self.seconds_to_next_update(SECONDS_IN_24H)
        # The interval is a whole number of milliseconds
        timer.setInterval(int(seconds * 1000))

    def add_code_to(self, word, mode="ascii"):
        """Generate and insert a code for the new word in DB-table CodeBook.

        This function is especially designed for when a single word is added
        to the CodeBook table. A unique code is generated and inserted in
        the Code column, which is expected to be NULL for the word. If the
        existing codes are shorter than the length needed for the current
        number of entries, all codes are regenerated with
        'update_codebook' instead.

        Parameters
        ----------
        word : string
            The word to generate a code for.
        mode : string
            'ascii' for letters (default), 'digits' for digits and 'combo'
            for combination of letters and digits.

        Raises
        ------
        ValueError
            If code is added to empty DB.
        """
        stmt = "SELECT COUNT(*) FROM Codebook"
        number_of_entries = self.conn.execute(stmt).fetchone()[0]
        # In special case where table is empty
        if number_of_entries <= 0:
            raise ValueError("Can't add code to table with no words.")
        needed_code_len = soitool.coder.get_code_length_needed(
            number_of_entries
        )
        # Only real codes are read, so the result does not depend on which
        # row the database happens to return first
        stmt = "SELECT Code FROM Codebook WHERE Code IS NOT NULL"
        codes = {row[0] for row in self.conn.execute(stmt).fetchall()}
        if codes:
            actual_code_len = min(len(code) for code in codes)
        else:
            # No code to measure, e.g. when the table only has the new word
            actual_code_len = needed_code_len
        if actual_code_len < needed_code_len:
            # Gives all words, the new one included, a longer code
            self.update_codebook()
        else:
            # Get new unique code for param word
            # NOTE(review): uses needed_code_len even if existing codes are
            # longer - confirm whether mixed code lengths are acceptable
            code = soitool.coder.get_code(needed_code_len, mode)
            while code in codes:
                code = soitool.coder.get_code(needed_code_len, mode)
            # Insert code to the param word in db
            stmt = "UPDATE Codebook SET Code = ? WHERE Word = ?"
            self.conn.execute(stmt, (code, word))
        self.conn.commit()

    def insert_or_update_soi(self, soi):
        """Serialize, compress and insert/update SOI in database-table SOI.

        SOI's with the same title and version are overwritten. If the
        database rejects the row, a warning dialog is shown and nothing is
        written.

        Parameters
        ----------
        soi : soitool.soi.SOI
            The SOI-instance to insert or update.
        """
        # Serialize and compress SOI
        compressed_soi = compress(serialize_soi(soi))
        # Check if SOI with the same title and version exists
        stmt = "SELECT COUNT(*) FROM SOI WHERE Title=? AND Version=?"
        count = self.conn.execute(stmt, (soi.title, soi.version)).fetchone()[0]
        try:
            # If SOI exists, overwrite
            if count == 1:
                stmt = (
                    "UPDATE SOI SET SOI=?, Date=? WHERE Title=? AND Version=?"
                )
                self.conn.execute(
                    stmt, (compressed_soi, soi.date, soi.title, soi.version)
                )
            # SOI does not exist, insert
            else:
                stmt = (
                    "INSERT INTO SOI (Title, Version, SOI, Date) "
                    "VALUES(?,?,?,?)"
                )
                self.conn.execute(
                    stmt, (soi.title, soi.version, compressed_soi, soi.date)
                )
            self.conn.commit()
        except sqlite3.IntegrityError:
            # Close the transaction opened by the rejected write
            self.conn.rollback()
            exec_warning_dialog(
                "SOI ble ikke skrevet til database.",
                "Sørg for at SOI har en tittel og versjon.",
            )
Methods
def add_code_to(self, word, mode='ascii')
-
Generate and insert a code for the new word in DB-table CodeBook.
This function is especially designed for when a single word is added to the CodeBook table. A unique code is generated and inserted in the Code column, which was NULL for the given word. Depending on the number of entries in the table, various actions are performed. If the new word makes the number of entries pass 26^x and the current code length does not have the capacity, all the codes are updated to an appropriate length.
Parameters
word
:string
- The word to generate a code for.
mode
:string
- 'ascii' for letters (default), 'digits' for digits and 'combo' for combination of letters and digits.
Raises
ValueError
- If code is added to empty DB.
Expand source code
def add_code_to(self, word, mode="ascii"):
    """Generate and insert a code for the new word in DB-table CodeBook.

    This function is especially designed for when a single word is added
    to the CodeBook table. A unique code is generated and inserted in the
    Code column, which is expected to be NULL for the word. If the existing
    codes are shorter than the length needed for the current number of
    entries, all codes are regenerated with 'update_codebook' instead.

    Parameters
    ----------
    word : string
        The word to generate a code for.
    mode : string
        'ascii' for letters (default), 'digits' for digits and 'combo'
        for combination of letters and digits.

    Raises
    ------
    ValueError
        If code is added to empty DB.
    """
    stmt = "SELECT COUNT(*) FROM Codebook"
    number_of_entries = self.conn.execute(stmt).fetchone()[0]
    # In special case where table is empty
    if number_of_entries <= 0:
        raise ValueError("Can't add code to table with no words.")
    needed_code_len = soitool.coder.get_code_length_needed(number_of_entries)
    # Only real codes are read, so the result does not depend on which row
    # the database happens to return first
    stmt = "SELECT Code FROM Codebook WHERE Code IS NOT NULL"
    codes = {row[0] for row in self.conn.execute(stmt).fetchall()}
    if codes:
        actual_code_len = min(len(code) for code in codes)
    else:
        # No code to measure, e.g. when the table only has the new word
        actual_code_len = needed_code_len
    if actual_code_len < needed_code_len:
        # Gives all words, the new one included, a longer code
        self.update_codebook()
    else:
        # Get new unique code for param word
        # NOTE(review): uses needed_code_len even if existing codes are
        # longer - confirm whether mixed code lengths are acceptable
        code = soitool.coder.get_code(needed_code_len, mode)
        while code in codes:
            code = soitool.coder.get_code(needed_code_len, mode)
        # Insert code to the param word in db
        stmt = "UPDATE Codebook SET Code = ? WHERE Word = ?"
        self.conn.execute(stmt, (code, word))
    self.conn.commit()
def create_tables(self)
-
Create database-tables.
Expand source code
def create_tables(self):
    """Run the DDL-statement of every table, then commit."""
    run = self.conn.execute
    for ddl in (SOI, CODEBOOK, CATEGORYWORDS, LASTUPDATED):
        run(ddl)
    self.conn.commit()
def fill_category_words(self)
-
Read data from CategoryWords.txt and fill DB-table CategoryWords.
Expand source code
def fill_category_words(self):
    """Read data from CategoryWords.txt and fill DB-table CategoryWords.

    The file starts with the number of categories. Each category is a line
    "<category>, <number of words>" followed by one word per line.
    Does not commit; the caller is expected to do so.
    """
    file_path = os.path.join(CURDIR, "testdata/CategoryWords.txt")
    stmt = "INSERT INTO CategoryWords(Word, Category) VALUES(?, ?)"
    # Context manager closes the file even if a line fails to parse
    with open(file_path, "r", encoding="utf-8") as file:
        # Get number of categories on file
        no_of_categories = int(file.readline().rstrip())
        for _ in range(no_of_categories):
            # Get category and number of words in category
            line = file.readline().split(", ")
            category = line[0]
            no_of_words = int(line[1].rstrip())
            # Add one row per word in the category
            for _ in range(no_of_words):
                word = file.readline().rstrip()
                self.conn.execute(stmt, (word, category))
def fill_codebook(self)
-
Read data from codebook.json and fill DB-table Codebook.
Expand source code
def fill_codebook(self):
    """Read data from codebook.json and fill DB-table Codebook.

    Does not commit; the caller is expected to do so.
    """
    file_path = os.path.join(CURDIR, "testdata/codebook.json")
    # Context manager closes the file even if parsing the JSON fails
    with open(file_path, "r", encoding="utf-8") as file:
        entries = json.load(file)
    # Generate one code per entry
    code_len = soitool.coder.get_code_length_needed(len(entries))
    codes = soitool.coder.get_code_set(len(entries), code_len)
    # Insert data in db
    stmt = (
        "INSERT INTO Codebook(Word, Category, Type, Code)"
        "VALUES(?,?,?,?)"
    )
    for word in entries:
        self.conn.execute(
            stmt,
            (word["word"], word["category"], word["type"], codes.pop()),
        )
def fill_last_updated(self)
-
Fill table with current date and time.
Expand source code
def fill_last_updated(self):
    """Fill table with current date and time.

    The timestamp is written as "YYYY-MM-DD HH:MM:SS.ffffff".
    Does not commit; the caller is expected to do so.
    """
    stmt = "INSERT INTO LastUpdated(Timestamp) VALUES(?)"
    # strftime always writes the microseconds; str(datetime) omits them
    # when they are 0, which 'seconds_to_next_update' could not parse
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    self.conn.execute(stmt, (now,))
def fill_tables(self)
-
Fill tables with testdata.
Expand source code
def fill_tables(self):
    """Populate Codebook, CategoryWords and LastUpdated, then commit."""
    fillers = (
        self.fill_codebook,
        self.fill_category_words,
        self.fill_last_updated,
    )
    for fill in fillers:
        fill()
    self.conn.commit()
def get_categories(self)
-
Retrieve all categories from DB-table CategoryWords.
Returns
List
- Containing categories (string).
Expand source code
def get_categories(self):
    """Collect the distinct categories found in DB-table CategoryWords.

    Returns
    -------
    List
        Containing categories (string).
    """
    cursor = self.conn.execute("SELECT DISTINCT Category FROM CategoryWords")
    categories = []
    for row in cursor:
        categories.append(row["Category"])
    return categories
def get_categories_from_codebook(self, small=False)
-
Read categories from full or small codebook.
Parameters
small
:bool, optional
- Categories are from small codebook if True, full codebook if False, by default False.
Returns
List
- Containing categories (string).
Expand source code
def get_categories_from_codebook(self, small=False):
    """Read the categories used in the full or the small codebook.

    Parameters
    ----------
    small : bool, optional
        Categories are from small codebook if True, full codebook if
        False, by default False.

    Returns
    -------
    List
        Containing categories (string).
    """
    type_filter = " WHERE Type='Liten'" if small else ""
    stmt = "SELECT Category FROM Codebook" + type_filter + " GROUP BY Category"
    rows = self.conn.execute(stmt).fetchall()
    return [row["Category"] for row in rows]
def get_category_words(self, category)
-
Retrieve words from DB-table CategoryWords in given category.
Parameters
category
:str
- Category of desired words.
Expand source code
def get_category_words(self, category):
    """Retrieve words from DB-table CategoryWords in given category.

    Parameters
    ----------
    category : str
        Category of desired words.

    Returns
    -------
    List
        Containing words (string).
    """
    cursor = self.conn.execute(
        "SELECT DISTINCT Word FROM CategoryWords WHERE Category = ?",
        (category,),
    )
    return [row["Word"] for row in cursor]
def get_codebook(self, small=False, sort=CodebookSort.WORD)
-
Retrieve the entries belonging to the full or small codebook.
Parameters
small
:Bool
- Full codebook will be returned if False (default). Small codebook will be returned if True.
sort
:int, optional
- 0 or 1 (enumerate value from 'CodebookSort'). Data is sorted by Word if 0 (default), by Code if 1.
Returns
codebook
:list (of dicts)
- [{'word': str, 'type': str, 'category': str, 'code': str}]
Raises
ValueError
- If value for parameter sort is invalid
Expand source code
def get_codebook(self, small=False, sort=CodebookSort.WORD):
    """Retrieve the entries belonging to the full or small codebook.

    Parameters
    ----------
    small : Bool
        Full codebook will be returned if False (default).
        Small codebook will be returned if True.
    sort : int, optional
        Enumerate value from 'CodebookSort'.
        Data is sorted by Word if CodebookSort.WORD (default), by Code if
        CodebookSort.CODE.

    Returns
    -------
    codebook : list (of dicts)
        [{'word': str, 'type': str, 'category': str, 'code': str}]

    Raises
    ------
    ValueError
        If value for parameter sort is invalid
    """
    # The column name comes from this fixed choice, never from input
    if sort == CodebookSort.WORD:
        order_column = "Word"
    elif sort == CodebookSort.CODE:
        order_column = "Code"
    else:
        raise ValueError(
            "Invalid value for parameter 'sort': '{}'".format(sort)
        )
    stmt = "SELECT * FROM Codebook"
    params = ()
    # Get either full or small codebook
    if small:
        stmt += " WHERE Type = ?"
        params = ("Liten",)
    # Order the final result directly instead of relying on the order of a
    # subquery being kept by the outer query
    stmt += " ORDER BY " + order_column
    queried = self.conn.execute(stmt, params).fetchall()
    return [
        {
            "word": entry["Word"],
            "category": entry["Category"],
            "type": entry["Type"],
            "code": entry["Code"],
        }
        for entry in queried
    ]
def get_codebook_expressions_in_category(self, category, small=False)
-
Read expressions, from full or small codebook, in a given category.
Parameters
category
:string
- Expressions in this category are returned.
small : bool, optional - Expressions are from small codebook if True, full codebook if False, by default False.
Returns
List
- Containing expressions (string).
Expand source code
def get_codebook_expressions_in_category(self, category, small=False):
    """List the codebook expressions that belong to one category.

    Parameters
    ----------
    category : string
        Only expressions with this category are returned.
    small : bool, optional
        If True, restrict the result to entries with Type 'Liten'
        (small codebook). If False (default), use the full codebook.

    Returns
    -------
    List
        Expressions (string) taken from column Word.
    """
    type_filter = " AND Type='Liten'" if small else ""
    query = "SELECT Word FROM Codebook WHERE Category=?" + type_filter
    rows = self.conn.execute(query, (category,)).fetchall()
    return [record["Word"] for record in rows]
def get_random_category_word(self)
-
Read a random word from database-table CategoryWords.
Returns
String
- Word (string).
Expand source code
def get_random_category_word(self):
    """Pick one word at random from database-table CategoryWords.

    Returns
    -------
    String
        Value of column Word from the randomly selected row.
    """
    cursor = self.conn.execute(
        "SELECT Word FROM CategoryWords ORDER BY RANDOM() LIMIT 1"
    )
    # At most one row is selected; its only column is the word.
    picked_row = cursor.fetchone()
    return picked_row[0]
def insert_or_update_soi(self, soi)
-
Serialize, compress and insert/update SOI in database-table SOI.
SOI's with the same title and version are overwritten.
Parameters
soi
:SOI
- The SOI-instance to insert or update.
Expand source code
def insert_or_update_soi(self, soi):
    """Store a serialized and compressed SOI in database-table SOI.

    A row with the same title and version is overwritten; otherwise a
    new row is inserted. If the write violates a table constraint, a
    warning dialog is shown instead of raising.

    Parameters
    ----------
    soi : soitool.soi.SOI
        The SOI-instance to insert or update.
    """
    payload = compress(serialize_soi(soi))

    # Decide between UPDATE and INSERT by looking for an existing row
    lookup = "SELECT COUNT(*) FROM SOI WHERE Title=? AND Version=?"
    existing = self.conn.execute(
        lookup, (soi.title, soi.version)
    ).fetchone()[0]

    if existing == 1:
        write_stmt = "UPDATE SOI SET SOI=?, Date=? WHERE Title=? AND Version=?"
        write_args = (payload, soi.date, soi.title, soi.version)
    else:
        write_stmt = (
            "INSERT INTO SOI (Title, Version, SOI, Date) VALUES(?,?,?,?)"
        )
        write_args = (soi.title, soi.version, payload, soi.date)

    try:
        self.conn.execute(write_stmt, write_args)
        self.conn.commit()
    except sqlite3.IntegrityError:
        exec_warning_dialog(
            "SOI ble ikke skrevet til database.",
            "Sørg for at SOI har en tittel og versjon.",
        )
def seconds_to_next_update(self, period)
-
Return time to next update of Codebook in seconds.
Parameters
period
:int
- The number of seconds between each update
Returns
seconds_to_update
:float
- Time to next update in seconds
Expand source code
def seconds_to_next_update(self, period):
    """Return time to next update of Codebook in seconds.

    Parameters
    ----------
    period : int
        The number of seconds between each update

    Returns
    -------
    seconds_to_update : float
        Time to next update in seconds, or 0 if that time has already
        passed.

    Raises
    ------
    ValueError
        If the stored timestamp matches neither
        'YYYY-MM-DD HH:MM:SS.ffffff' nor 'YYYY-MM-DD HH:MM:SS'.
    """
    stmt = "SELECT Timestamp FROM LastUpdated"
    stored = self.conn.execute(stmt).fetchall()[0]["Timestamp"]

    # Convert datetime string to datetime object. str(datetime) leaves out
    # the fractional part when microsecond is 0, so accept both layouts.
    try:
        last_updated = datetime.strptime(stored, "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        last_updated = datetime.strptime(stored, "%Y-%m-%d %H:%M:%S")

    # Calculate the number of seconds until next update
    seconds_to_update = (
        period - (datetime.now() - last_updated).total_seconds()
    )

    # Since QTimer does not handle negative values
    if seconds_to_update <= 0:
        return 0

    return seconds_to_update
def update_codebook(self)
-
Update codes in DB.
Expand source code
def update_codebook(self):
    """Update codes in DB.

    Generates one new code per Codebook entry, writes the codes to
    column Code, refreshes the timestamp in LastUpdated and commits.
    """
    # Get all the words (PK)
    words = self.conn.execute("SELECT Word FROM Codebook").fetchall()
    number_of_entries = len(words)

    # Generate new codes
    code_len = soitool.coder.get_code_length_needed(number_of_entries)
    codes = soitool.coder.get_code_set(number_of_entries, code_len)

    # Inserting NULL into Code column because of UNIQUE constraint: a new
    # code may equal the old code of an entry that is not yet rewritten.
    self.conn.execute("UPDATE Codebook SET Code = ?", (None,))

    # Fill Code column with new codes
    stmt = "UPDATE Codebook SET Code = ? WHERE Word = ?"
    for row in words:
        self.conn.execute(stmt, (codes.pop(), row[0]))

    # Updates LastUpdated with current time
    self.update_last_updated()

    # Save changes in db
    self.conn.commit()
def update_codebook_auto(self, timer)
-
Update Codebook if needed and update time for timer.
Parameters
timer
:QTimer
- Timer to set new interval on.
Expand source code
def update_codebook_auto(self, timer):
    """Update Codebook if needed and update time for timer.

    Parameters
    ----------
    timer : QTimer
        Timer to set new interval on.
    """
    if self.seconds_to_next_update(SECONDS_IN_24H) <= 0:
        self.update_codebook()

    # Queried again because update_codebook() moves the timestamp.
    # seconds_to_next_update() may return a float, while the timer
    # interval is a whole number of milliseconds.
    timer.setInterval(
        int(self.seconds_to_next_update(SECONDS_IN_24H) * 1000)
    )
def update_last_updated(self)
-
Update Timestamp in LastUpdated to current time.
Expand source code
def update_last_updated(self):
    """Update Timestamp in LastUpdated to current time.

    The timestamp is stored as 'YYYY-MM-DD HH:MM:SS.ffffff' and the
    change is committed.
    """
    # Format explicitly instead of using str(): str(datetime) drops the
    # fractional part when microsecond is 0, which a reader expecting
    # '.%f' cannot parse.
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
    self.conn.execute("UPDATE LastUpdated SET Timestamp = ?", (timestamp,))
    self.conn.commit()