Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
BS/app.py
Go to file
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
453 lines (428 sloc)
17.3 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
The main code for server. Defines the interface and the logistics of the server. | |
""" | |
import os
import random
from datetime import datetime, date

import sqlalchemy
from flask import Flask, request, render_template, flash, session, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

# import the models, the database connection and the server from the db file
from db import Users, Books, Item
from db import db as db_connect
from db import app
# Secret key Flask uses to sign the session cookie. Keep this really secret!
# Supply it through the SECRET_KEY environment variable in any real deployment;
# the literal below is only a fallback so local development keeps working.
# NOTE(review): the fallback value is committed to source control and must not
# be relied on outside development.
app.secret_key = os.environ.get("SECRET_KEY") or b'_5#y2L"F4Q8z\n\xec]/'
@app.route("/")
def getBooks():
    """
    Landing page: list every book stored in the database.

    Renders ``home.html`` with the books when at least one exists,
    otherwise falls back to the bare ``layout.html``.
    """
    all_books = Books.query.all()
    if not all_books:
        # nothing to list, show the default layout only
        return render_template("layout.html")
    # at least one book is available: show the home page with the books
    return render_template("home.html", books=all_books)
@app.route("/index")
def index():
    """
    The main page.

    Shows ``home.html`` populated with all books, or the plain
    ``layout.html`` when the database holds no book.
    """
    catalogue = Books.query.all()
    has_books = len(catalogue) > 0
    if has_books:
        # home page with the full list of books
        return render_template("home.html", books=catalogue)
    # no book available: default layout
    return render_template("layout.html")
@app.route('/register', methods=['POST', 'GET'])
def register():
    """
    Register a new user.

    GET renders the empty registration form. POST reads ``username``,
    ``password`` and the repeated password ``pwd`` from the submitted form,
    checks them and inserts the new user with a hashed password.

    Returns:
        A redirect to the login page when the user was created, otherwise the
        registration page with the error flashed and passed as ``error``.
    """
    if request.method != "POST":
        return render_template('register.html')

    # get data
    username = request.form['username']
    password = request.form['password']
    pwd = request.form['pwd']

    error = None
    # check the completeness of data first, then that both passwords agree
    if not username:
        error = 'Username is required.'
    elif not password or not pwd:
        error = 'Password is required.'
    elif password != pwd:
        # the repeated password is not the same as the original password
        error = "Password is wrong."
    else:
        try:
            # try to insert the new user
            customer = Users(username=username,
                             password=generate_password_hash(password))
            db_connect.session.add(customer)
            db_connect.session.commit()
        except sqlalchemy.exc.IntegrityError:
            # A failed commit leaves the session unusable until it is rolled
            # back. The integrity error is reported as a duplicate username
            # (presumably a unique constraint on it; verify against the model).
            db_connect.session.rollback()
            error = f"User {username} is already registered."
        else:
            # the user was created, continue with the login page
            return redirect(url_for("login"))

    # flash error message to the frontend and show the form again
    flash(error)
    return render_template('register.html', error=error)
@app.route('/login', methods=['POST', 'GET'])
def login():
    """
    User login.

    GET renders the login form. POST reads ``username`` and ``password`` from
    the submitted form and verifies them against the stored password hash.

    On success the session is reset and filled with ``user_id``, ``username``
    and the totals of the user's shopping cart (``total_quantity`` and
    ``total_price``), then the user is redirected to the index page.
    On failure the login page is rendered again with the error flashed and
    passed as ``error``.
    """
    if request.method != 'POST':
        return render_template("login.html")

    # getting data
    username = request.form['username']
    password = request.form['password']

    error = None
    user = None
    # check the completeness of data
    if not username:
        error = 'Username is required.'
    elif not password:
        error = 'Password is required.'
    else:
        # Only look the user up once the form is complete; otherwise the
        # "required" messages above would be overwritten by the lookup result.
        user = Users.query.filter_by(username=username).first()
        if user is None:
            # no such user
            error = 'Incorrect username.'
        elif not check_password_hash(user.password, password):
            # password is invalid
            error = 'Incorrect password.'

    if error is not None:
        # return the error message
        flash(error)
        return render_template('login.html', error=error)

    # login successfully: record the user information in a fresh session
    session.clear()
    session['user_id'] = user.id
    session['username'] = user.username

    # load the shopping cart and store its totals (both are 0 for an empty cart)
    items = Item.query.filter_by(user_id=user.id).all()
    session['total_quantity'] = sum(int(item.quantity) for item in items)
    session['total_price'] = round(
        sum(int(item.quantity) * item.trade_price for item in items), 2)

    # render main page
    return redirect(url_for('index'))
@app.route('/stock', methods=['POST', 'GET'])
def stock():
    """
    Show the stock of all books.

    Only a GET request queries the books; ``stock.html`` is rendered when at
    least one book was found, in every other case ``layout.html`` is rendered.
    """
    # books are only looked up for GET requests
    inventory = Books.query.all() if request.method == 'GET' else []
    if not inventory:
        return render_template("layout.html")
    return render_template("stock.html", books=inventory)
@app.route('/add_stock', methods=['POST', 'GET'])
def add_stock():
    """
    Add a book to the stock, or update the book that already has the ISBN.

    GET renders the empty form. POST reads ``ISBN``, ``bookname``, ``author``,
    ``public_date`` (``YYYY-MM-DD``), ``description``, ``trade_price``,
    ``retail_price`` and ``quantity`` from the form and the cover image from
    the uploaded file ``picture_url``.

    All input is validated before anything is written: every field must be
    present, the image must have an allowed extension, the date must parse and
    prices and quantity must be integers. Only then is the image stored under
    ``./static/images/`` with a generated prefix and the book inserted or
    updated.

    Returns:
        A redirect to the stock page on success, otherwise the form rendered
        again with the error flashed and passed as ``error``.
    """
    if request.method != 'POST':
        return render_template("add_stock.html")

    # only allow images
    ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'jfif'}

    # get data from the form post
    form = request.form
    ISBN = form['ISBN']
    bookname = form['bookname']
    author = form['author']
    raw_public_date = form['public_date']
    description = form['description']
    raw_trade_price = form['trade_price']
    raw_retail_price = form['retail_price']
    raw_quantity = form['quantity']
    # getting the image file
    upload = request.files.get('picture_url')
    upload_name = upload.filename if upload is not None and upload.filename else ''
    extension = upload_name.rsplit('.', 1)[1].lower() if '.' in upload_name else ''

    error = None
    public_date = trade_price = retail_price = quantity = None
    required = (ISBN, bookname, author, raw_public_date, description,
                upload_name, raw_trade_price, raw_retail_price, raw_quantity)
    # check the completeness of data
    if not all(required):
        error = "Something is required."
    elif extension not in ALLOWED_EXTENSIONS:
        error = "Only png, jpg, jpeg, gif and jfif images are allowed."
    else:
        try:
            # strptime already yields midnight of the submitted day
            public_date = datetime.strptime(raw_public_date, '%Y-%m-%d')
            # change the type of price, quantity
            trade_price = int(raw_trade_price)
            retail_price = int(raw_retail_price)
            quantity = int(raw_quantity)
        except ValueError:
            error = ("The date must have the format YYYY-MM-DD and "
                     "prices and quantity must be whole numbers.")

    if error is not None:
        flash(error)
        return render_template('add_stock.html', error=error)

    # create a unique file name with time and a random number
    now_time = datetime.now().strftime("%Y%m%d%H%M%S")
    unique_num = now_time + str(random.randint(0, 100)).zfill(3)
    # The file name comes from the client: keep only a sanitised stem so it
    # cannot point outside the image directory. The extension was checked
    # against ALLOWED_EXTENSIONS above and is therefore safe to append.
    safe_stem = secure_filename(upload_name.rsplit('.', 1)[0])
    if safe_stem:
        picture_url = unique_num + "_" + safe_stem + "." + extension
    else:
        picture_url = unique_num + "." + extension
    # actually save the file to disk
    upload.save('./static/images/' + picture_url)

    # check whether there is a book with the same ISBN
    book = Books.query.filter_by(ISBN=ISBN).first()
    if book is None:
        # create a new book
        new_book = Books(ISBN=ISBN, bookname=bookname, author=author,
                         public_date=public_date, description=description,
                         picture_url=picture_url, trade_price=trade_price,
                         retail_price=retail_price, quantity=quantity)
        db_connect.session.add(new_book)
    else:
        # update the existing book data
        book.bookname = bookname
        book.author = author
        # same attribute name as the constructor keyword above
        book.public_date = public_date
        book.description = description
        book.picture_url = picture_url
        book.trade_price = trade_price
        book.retail_price = retail_price
        book.quantity = quantity
    db_connect.session.commit()
    return redirect(url_for("stock"))
def _refresh_cart_totals(user_id):
    """Store the cart totals of *user_id* in the session and return the cart items."""
    items = Item.query.filter_by(user_id=user_id).all()
    # both totals are 0 for an empty cart
    session['total_quantity'] = sum(int(item.quantity) for item in items)
    session['total_price'] = round(
        sum(int(item.quantity) * item.trade_price for item in items), 2)
    return items


def _render_cart(user_id):
    """Refresh the cart totals of *user_id* and render the cart page."""
    items = _refresh_cart_totals(user_id)
    if items:
        # render page with data
        return render_template("cart.html", items=items)
    return render_template("cart.html")


@app.route('/cart', methods=['POST', 'GET'])
def cart():
    """
    Get the cart data or add a book to the cart.

    Method GET: show the cart of the current user.
    Method POST: add ``quantity`` copies of the book ``ISBN`` (both taken from
    the form) to the cart. The resulting quantity in the cart never exceeds
    the stock of the book.

    A visitor who is not logged in is redirected to the login page. An invalid
    POST (missing ISBN, quantity that is not a positive integer, unknown book,
    no stock left) flashes an error and redirects to the index page.
    """
    # get current user
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))

    if request.method != 'POST':
        # getting shopping cart data
        try:
            return _render_cart(user_id)
        except Exception:
            # best effort: still show an (empty) cart page, but keep the traceback
            app.logger.exception("Could not load the shopping cart.")
            return render_template("cart.html")

    # add the book information
    ISBN = request.form['ISBN']
    try:
        # convert once so every later comparison is int against int
        quantity = int(request.form['quantity'])
    except ValueError:
        quantity = 0

    error = None
    if not ISBN or quantity <= 0:
        error = "Something wrong."
    else:
        book = Books.query.filter_by(ISBN=ISBN).first()
        if book is None:
            error = "The book is not exist."
        else:
            # the book may already be in the shopping cart
            item = Item.query.filter_by(ISBN=ISBN, user_id=user_id).first()
            in_cart = int(item.quantity) if item is not None else 0
            # never put more copies into the cart than are in stock
            wanted = min(in_cart + quantity, book.quantity)
            if wanted <= 0:
                error = "The stock of the book is not enough."
            elif item is not None:
                item.quantity = wanted
            else:
                # add a new book to the shopping cart
                new_item = Item(
                    ISBN=ISBN, user_id=user_id, quantity=wanted,
                    bookname=book.bookname, trade_price=book.trade_price,
                    picture_url=book.picture_url)
                db_connect.session.add(new_item)

    if error is not None:
        # surface the reason on the page the user is sent back to
        flash(error)
        return redirect(url_for('index'))

    db_connect.session.commit()
    # update the total price of the cart and show it
    return _render_cart(user_id)
@app.route('/delete_item', methods=['POST'])
def delete_item():
    """
    Delete a book from the cart.

    Reads ``item_id`` from the form and removes that item from the cart of the
    current user, then refreshes the cart totals stored in the session.

    Returns:
        The cart page while items remain in the cart; otherwise the home page
        (or the default layout when there is no book). A visitor who is not
        logged in is redirected to the login page.
    """
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))

    # get the item to delete; the user filter makes sure it is the user's own
    item_id = request.form['item_id']
    selection = Item.query.filter_by(item_id=item_id, user_id=user_id)
    if selection.first() is not None:
        selection.delete()
        db_connect.session.commit()

        # update the totals; both are 0 once the cart is empty
        items = Item.query.filter_by(user_id=user_id).all()
        session['total_quantity'] = sum(int(item.quantity) for item in items)
        session['total_price'] = round(
            sum(int(item.quantity) * item.trade_price for item in items), 2)
        if items:
            return render_template("cart.html", items=items)

    # nothing was deleted or the cart is empty now: back to the books
    books = Books.query.all()
    if books:
        return render_template("home.html", books=books)
    return render_template("layout.html")
@app.route('/empty_cart', methods=['POST', 'GET'])
def empty_cart():
    """
    Empty the cart, would delete all books from it.

    A GET request deletes every cart item of the current user and resets the
    cart totals in the session; a visitor who is not logged in is redirected
    to the login page instead. Afterwards (and for any other method) the home
    page is rendered, or the default layout when there is no book.
    """
    if request.method == 'GET':
        # get current user
        user_id = session.get('user_id')
        if user_id is None:
            return redirect(url_for('login'))
        # delete all books in the cart
        Item.query.filter_by(user_id=user_id).delete()
        db_connect.session.commit()
        # update the total price
        session['total_quantity'] = 0
        session['total_price'] = 0

    # render books
    books = Books.query.all()
    if books:
        return render_template("home.html", books=books)
    return render_template("layout.html")
@app.route('/checkout', methods=['POST', 'GET'])
def checkout():
    """
    Checkout page. Would check the cart and generate an order.

    For every item in the cart of the current user the quantity is limited to
    the stock of the book, and ``postage`` and ``total_price`` are set on the
    item for display. The sum of all item totals is stored in the session as
    ``total``. GET and POST are handled the same way.

    Returns:
        The checkout page with the items, or a redirect to the login page for
        a visitor who is not logged in.
    """
    # get current user
    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))

    # get all items in cart
    items = Item.query.filter_by(user_id=user_id).all()
    total = 0
    for item in items:
        # check the quantity of each book; a book that no longer exists
        # counts as out of stock
        book = Books.query.filter_by(ISBN=item.ISBN).first()
        available = book.quantity if book is not None else 0
        if item.quantity > available:
            item.quantity = available
        # computing postage and final price
        if item.quantity > 0:
            # postage is 3 for the first copy plus 1 for each further copy
            item.postage = 3 + int(item.quantity) - 1
            item.total_price = item.postage + int(item.quantity) * item.trade_price
        else:
            item.postage = 0
            item.total_price = 0
        total += item.total_price

    session['total'] = round(total, 2)
    return render_template('checkout.html', items=items)
@app.route('/pay', methods=['POST','GET'])
def pay():
    """
    Make a payment.

    GET renders the payment form. POST requires the form fields ``card``,
    ``month``, ``year`` and ``cvv`` (no actual payment is performed), reduces
    the stock of every purchased book, empties the cart of the current user
    and resets the totals stored in the session.

    Returns:
        The payment form for GET. After a POST the home page (or the default
        layout when there is no book); a visitor who is not logged in is
        redirected to the login page.
    """
    if request.method != 'POST':
        return render_template('pay.html')

    user_id = session.get('user_id')
    if user_id is None:
        return redirect(url_for('login'))

    # The payment information is not used because no actual payment is made,
    # but indexing the form still rejects a request that lacks a field.
    for field in ('card', 'month', 'year', 'cvv'):
        request.form[field]

    items = Item.query.filter_by(user_id=user_id).all()
    for item in items:
        # update the quantity of each purchased book, never below 0
        book = Books.query.filter_by(ISBN=item.ISBN).first()
        if book is None:
            # the book was removed from the stock in the meantime
            continue
        book.quantity = max(book.quantity - int(item.quantity), 0)

    # empty the shopping cart; one commit keeps stock and cart consistent
    Item.query.filter_by(user_id=user_id).delete()
    db_connect.session.commit()

    session['total'] = 0
    session['total_quantity'] = 0
    session['total_price'] = 0

    books = Books.query.all()
    if books:
        return render_template("home.html", books=books)
    return render_template("layout.html")
@app.route('/logout', methods=['POST', 'GET'])
def logout():
    """
    Log the user out: wipe the whole session and go back to the index page.
    """
    # drop everything stored for the user
    session.clear()
    index_url = url_for('index')
    return redirect(index_url)
if __name__ == '__main__':
    # run the server on the local machine when the file is executed directly
    app.run(host="127.0.0.1", port=5000)