Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
5001CEM_bookshop/database.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
365 lines (294 sloc)
13.6 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import os | |
import datetime | |
def register_user(form_data):
    """
    Register the user onto the users database
    Should be run after form validation.
    :param form_data: The data values from the registration form. Must provide the keys
        firstname, middlename, surname, gender, postcode, address, username, email and password_set.
    :return: Nothing
    :raises KeyError: if one of the expected form fields is missing
    :raises sqlite3.Error: if the insert fails; the connection is still closed
    """
    # Values are listed in the column order of the INSERT below.
    # NOTE(review): the password is written exactly as submitted - no hashing happens in this function.
    values = (
        form_data['username'],
        form_data['password_set'],
        form_data['email'],
        form_data['firstname'],
        form_data['middlename'],
        form_data['surname'],
        form_data['gender'],
        form_data['address'],
        form_data['postcode'],
    )
    conn = sqlite3.connect('db/USER_DB.sqlite3')
    try:
        # Bound parameters instead of string formatting: values such as "O'Brien" are stored
        # verbatim and can never be interpreted as SQL.
        # The leading NULL lets the database assign the first column; the trailing 0 is the is_admin flag.
        conn.execute("INSERT INTO user_info VALUES (NULL,?,?,?,?,?,?,?,?,?,0);", values)
        conn.commit()
    finally:
        conn.close()  # release the connection even when the insert fails
def get_access_type(uname, pwd):
    """
    Given a username and password, get the associated access level from the database
    :param uname: Username of the user
    :param pwd: Password of the user
    :return: 'admin' or 'normal' when the credentials match a user with is_admin 1 or 0 respectively,
        otherwise the string '401'
    """
    conn = sqlite3.connect('db/USER_DB.sqlite3')
    try:
        cur = conn.cursor()
        # One parameterized lookup replaces loading every username and then re-querying with
        # the name formatted into the SQL text.
        cur.execute("SELECT password, is_admin FROM user_info WHERE username=?;", (uname,))
        record = cur.fetchone()
    finally:
        conn.close()  # closed on every path, including the successful admin login

    if record is None:  # the name is not in the database
        return '401'
    stored_password, is_admin = record
    # NOTE(review): passwords are compared as stored, i.e. in plaintext.
    if pwd != stored_password:
        return '401'
    if is_admin == 1:
        return 'admin'
    if is_admin == 0:
        return 'normal'
    return '401'  # any other is_admin value is treated as not registered
def username_unique(username):
    """
    Tell whether a username is still free, i.e. not yet stored in the users database
    :param username: The username to look for in the user_info table
    :return: True if no stored username equals the given one, False if it is already taken.
    """
    connection = sqlite3.connect('db/USER_DB.sqlite3')
    cursor = connection.cursor()
    cursor.execute("SELECT username FROM user_info;")
    rows = cursor.fetchall()  # one single-element tuple per stored user
    cursor.close()
    connection.close()
    # Unwrap the tuples so the membership test compares plain values
    taken = [row[0] for row in rows]
    return username not in taken
# noinspection DuplicatedCode | |
def email_unique(email):
    """
    Tell whether an email address is still free, i.e. not yet stored in the users database
    :param email: the email address to look for in the user_info table
    :return: True if no stored email equals the given one, False if it is already registered.
    """
    connection = sqlite3.connect('db/USER_DB.sqlite3')
    cursor = connection.cursor()
    cursor.execute("SELECT email FROM user_info;")
    fetched = cursor.fetchall()  # one single-element tuple per stored user
    cursor.close()
    connection.close()
    known_emails = []
    for (stored_email,) in fetched:  # unwrap each single-column row
        known_emails.append(stored_email)
    if email not in known_emails:
        return True  # nobody has registered this address yet
    return False
def check_stock_from_isbn(code):
    """
    Returns a boolean indicating if the book has details for it in the books database
    :param code: The 13 digit ISBN code, as a string of digits or an integer
    :return: True if the book is in the database, False if not.
    :raises ValueError: if code cannot be converted to an integer
    """
    code = int(code)  # ISBNs are looked up as integers, while the form supplies a string
    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        # Let the database do the lookup instead of fetching every ISBN into a Python list.
        cur.execute("SELECT 1 FROM book_info WHERE ISBN=? LIMIT 1;", (code,))
        return cur.fetchone() is not None
    finally:
        conn.close()  # the connection used to be left open on every call
def get_sl_details():
    """
    Gets stock details for the stock levels screen.\n
    These are to display the book's cover, title, ISBN and quantity.\n
    The cover column carries the stored FILENAME value of each book.
    :returns: A tuple (sl_data, sl_cols). sl_data is a flat list of (column_name, value) tuples,
        four consecutive entries per book in the order of sl_cols, or None when COUNT(ISBN) is 0.
        sl_cols is the list of the 4 column names.
    """
    sl_cols = ["Cover", "Title", "ISBN13", "Quantity Available"]
    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(ISBN) FROM book_info;")  # First check if there is any data in the database at all
        if cur.fetchone()[0] == 0:
            return None, sl_cols  # If there isn't, don't return any data and only return the column names.
        cur.execute("SELECT FILENAME,TITLE,ISBN,QUANTITY FROM book_info;")
        records = cur.fetchall()
    finally:
        conn.close()  # previously the connection was never closed, on either return path

    sl_data = []
    for record in records:
        # The selected columns are in the same order as sl_cols, so pair them up directly.
        sl_data.extend(zip(sl_cols, record))
    return sl_data, sl_cols  # return the data and column names as a tuple
def get_book_info(isbn13):
    """
    Return all details from the database for a given ISBN
    :param isbn13: The 13 digit ISBN code, as a string of digits or an integer
    :return: A list of (column_name, value) tuples for the matching record,
        e.g. [('ISBN', 978...), ('FILENAME', ...), ...]
    :raises ValueError: if isbn13 cannot be converted to an integer
    :raises IndexError: if no record has this ISBN
    """
    isbn = int(isbn13)  # only a real number may reach the query; anything else is rejected here
    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        # The ISBN is bound as a parameter rather than formatted into the SQL text.
        cur.execute("SELECT * FROM book_info WHERE ISBN=?;", (isbn,))
        selected_book = cur.fetchall()  # Check the database for the selected book via its isbn13
        # Get column names from the database - https://stackoverflow.com/a/7831685
        column_names = [description[0] for description in cur.description]
    finally:
        conn.close()
    # Make a list of the column names paired with the actual value needed.
    return list(zip(column_names, selected_book[0]))
def get_home_data():
    """
    Gets stock details for the home screen.\n
    These are to display the book's cover and title, but ISBN is included for identification.
    Quantity is also included as it is a useful property to check if the item is in stock\n
    The cover column carries the stored FILENAME value of each book.\n
    Only books with QUANTITY >= 1 are returned; see get_sl_details for the unfiltered variant.
    :returns: A tuple (hp_data, hp_cols). hp_data is a flat list of (column_name, value) tuples,
        four consecutive entries per in-stock book in the order of hp_cols. It is None when
        COUNT(ISBN) is 0 and an empty list when books exist but none is in stock.
    """
    hp_cols = ["Cover", "Title", "In Stock", "ID"]  # Columns needed for the homepage
    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(ISBN) FROM book_info;")
        if cur.fetchone()[0] == 0:
            return None, hp_cols  # If the database is empty, dont return data
        cur.execute("SELECT FILENAME,TITLE,QUANTITY,ISBN FROM book_info WHERE QUANTITY>=1;")
        records = cur.fetchall()
    finally:
        conn.close()  # previously the connection was never closed, on either return path

    hp_data = []
    for record in records:
        # The selected columns are in the same order as hp_cols, so pair them up directly.
        hp_data.extend(zip(hp_cols, record))
    return hp_data, hp_cols  # return the list of data-column pairs and the column names
def add_book_to_db(form_data, files):
    """
    Insert a book into the books database, or update the existing record with the same ISBN,
    then save its cover image as static/img/<isbn13>.png
    :param form_data: The data values from the add-book form. Must provide the keys title,
        author_firstname, author_middlename, author_surname, publisher, pub_date, isbn13,
        description, trade_price, retail_price and quantity. pub_date must offer strftime().
    :param files: The uploaded cover; only its save(path) method is used
    :return: Nothing
    :raises ValueError: if isbn13 or one of the prices cannot be converted to a number
    """
    # Correct the data types and formats. Strings are NOT escaped by hand: every value is bound
    # as a query parameter, so quotes are stored exactly as typed instead of being doubled.
    isbn13 = int(form_data['isbn13'])
    filename = f"{isbn13}.png"  # built from the validated integer, never from raw form text
    # Datetime formatting - https://stackoverflow.com/a/35780962
    pub_date = form_data['pub_date'].strftime("%d-%m-%Y")
    # insert syntax https://stackoverflow.com/a/45575666
    record = (
        isbn13,
        filename,
        form_data['title'],
        form_data['author_firstname'],
        form_data['author_middlename'],
        form_data['author_surname'],
        form_data['publisher'],
        pub_date,
        form_data['description'],
        form_data['quantity'],
        float(form_data['retail_price']),
        float(form_data['trade_price']),
    )
    insert_string = "INSERT INTO book_info VALUES (?,?,?,?,?,?,?,?,?,?,?,?,0);"
    # Same columns in the same order as `record`, minus the leading ISBN which becomes the key.
    update_string = (
        "UPDATE book_info SET FILENAME=?, TITLE=?, AUTHOR_FIRSTNAME=?, AUTHOR_MIDDLENAME=?, "
        "AUTHOR_SURNAME=?, PUBLISHER=?, PUB_DATE=?, FULL_DESC=?, QUANTITY=?, RETAIL_PRICE=?, "
        "TRADE_PRICE=? WHERE ISBN=?;"
    )

    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        try:  # to insert into db, e.g. record does not exist already.
            cur.execute(insert_string, record)
            print("Inserted record")
        except sqlite3.IntegrityError as e:  # e.g. unique constraint failed - update existing record.
            print(e)
            cur.execute(update_string, record[1:] + (isbn13,))
            print("Updated record")
        # absolute path https://stackoverflow.com/a/51523
        savepath = os.path.join(os.path.abspath("static/img"), filename)
        # The cover is saved before committing, so a failed save leaves the database unchanged.
        files.save(savepath)
        conn.commit()
    finally:
        conn.close()  # release the connection even when the query or the save fails
def escape(string):
    """
    Double every single and double quote character in a string, for embedding the string
    in sqlite3 TEXT values.\n
    The idea is from https://stackoverflow.com/a/58138810
    :param string: String to be escaped
    :return: The string with each ' replaced by '' and each " replaced by ""
    """
    # escaping in sqlite3 https://stackoverflow.com/a/58138810
    single_quotes_doubled = string.replace("'", "''")
    all_quotes_doubled = single_quotes_doubled.replace('"', '""')
    return all_quotes_doubled
def get_filename(path):
    """
    Given a full path, gets the filename.\n
    Only for the img folder in static
    :param path: An absolute path to a file in the static/img/ folder
    :return: The part of the path after the static/img/ folder, i.e. the filename.
        A path that does not start with that folder is returned unchanged.
    """
    dbpath = os.path.abspath("static/img/")  # absolute path https://stackoverflow.com/a/51523
    dbpath = dbpath.replace("\\", "/")  # change direction of slashes to be only one way
    prefix = dbpath.rstrip("/") + "/"
    normalised = path.replace("\\", "/")  # compare both sides with the same slash direction
    # The folder is a prefix of the path, not a suffix (and str has no remove_suffix method,
    # so the previous implementation always raised AttributeError).
    if normalised.startswith(prefix):
        return normalised[len(prefix):]
    return path
def get_quantity(isbn):
    """
    Get the QUANTITY value given the 13-digit ISBN
    :param isbn: ISBN13, which should be checked beforehand (string of digits or integer)
    :return: The quantity value
    :raises ValueError: if isbn cannot be converted to an integer
    :raises TypeError: if no record has this ISBN (the lookup yields no row)
    """
    isbn_number = int(isbn)  # only a real number may reach the query
    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        # The ISBN is bound as a parameter rather than formatted into the SQL text.
        cur.execute("SELECT QUANTITY FROM book_info WHERE ISBN=?", (isbn_number,))
        row = cur.fetchone()
    finally:
        conn.close()
    return row[0]
def get_price(isbn):
    """
    Get the RETAIL_PRICE value given the 13-digit ISBN
    :param isbn: ISBN13, which should be checked beforehand (string of digits or integer)
    :return: The retail price value
    :raises ValueError: if isbn cannot be converted to an integer
    :raises TypeError: if no record has this ISBN (the lookup yields no row)
    """
    isbn_number = int(isbn)  # only a real number may reach the query
    conn = sqlite3.connect('db/BOOK_DB.sqlite3')
    try:
        cur = conn.cursor()
        # The ISBN is bound as a parameter rather than formatted into the SQL text.
        cur.execute("SELECT RETAIL_PRICE FROM book_info WHERE ISBN=?", (isbn_number,))
        row = cur.fetchone()
    finally:
        conn.close()
    return row[0]