Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
6005CEM-CW-Teplate/app/views.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
450 lines (366 sloc)
13.4 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools | |
from urllib import request | |
from .meta import * | |
import datetime | |
#--* | |
import bcrypt | |
#--* | |
#--* | |
# decorator function for protecting different routes | |
def login_required(view): | |
# accept and wrap a given view | |
@functools.wraps(view) | |
# can have 0 or >=1 arguments | |
def wrapped_view(*args, **kwargs): | |
# check if user has logged in the current session | |
if "user" not in flask.session: | |
flask.flash("You need to be logged in for this feature.") | |
return flask.redirect(flask.url_for("login")) | |
# return to the view function if yes | |
return view(*args, **kwargs) | |
# call the wrapped function | |
return wrapped_view | |
#--* | |
@app.route("/") | |
def index(): | |
""" | |
Main Page. | |
""" | |
#Get data from the DB using meta function | |
rows = query_db("SELECT * FROM product") | |
app.logger.info(rows) | |
return flask.render_template("index.html", | |
bookList = rows) | |
@app.route("/products", methods=["GET","POST"]) | |
def products(): | |
""" | |
Single Page (ish) Application for Products | |
""" | |
theItem = flask.request.args.get("item") | |
if theItem: | |
#We Do A Query for It | |
#-- | |
theQry = "SELECT * FROM product WHERE id = ?" | |
args = (theItem,) | |
itemQry = query_db(theQry, args, True) | |
theQry1 = "SELECT * FROM review INNER JOIN user ON review.userID = user.id WHERE review.productID = ?;" | |
# add a , becasue args parameter is a tuple, i.e., accepts tuples | |
args = (itemQry['id'],) | |
reviewQry = query_db(theQry1, args) | |
#-- | |
#If there is form interaction and they put somehing in the basket | |
if flask.request.method == "POST": | |
quantity = flask.request.form.get("quantity") | |
try: | |
quantity = int(quantity) | |
except ValueError: | |
flask.flash("Error Buying Item") | |
return flask.render_template("product.html", | |
item = itemQry, | |
reviews=reviewQry) | |
app.logger.warning("Buy Clicked %s items", quantity) | |
#And we add something to the Session for the user to keep track | |
basket = flask.session.get("basket", {}) | |
basket[theItem] = quantity | |
flask.session["basket"] = basket | |
flask.flash("Item Added to Cart") | |
return flask.render_template("product.html", | |
item = itemQry, | |
reviews=reviewQry) | |
else: | |
books = query_db("SELECT * FROM product") | |
return flask.render_template("products.html", | |
books = books) | |
# ------------------ | |
# USER Level Stuff | |
# --------------------- | |
@app.route("/user/login", methods=["GET", "POST"]) | |
def login(): | |
""" | |
Login Page | |
""" | |
if flask.request.method == "POST": | |
#Get data | |
user = flask.request.form.get("email") | |
password = flask.request.form.get("password") | |
app.logger.info("Attempt to login as %s:%s", user, password) | |
#-- | |
theQry = "SELECT * FROM User WHERE email = ?" | |
args = (user,) | |
userQry = query_db(theQry, args, one=True) | |
#-- | |
if userQry is None: | |
flask.flash("No Such User") | |
else: | |
app.logger.info("User is Ok") | |
if bcrypt.checkpw(password.encode('utf-8'), userQry["password"]): | |
app.logger.info("Login as %s Success", userQry["email"]) | |
flask.session["user"] = userQry["id"] | |
flask.flash("Login Successful") | |
return (flask.redirect(flask.url_for("index"))) | |
else: | |
flask.flash("Password is Incorrect") | |
return flask.render_template("login.html") | |
@app.route("/user/create", methods=["GET","POST"]) | |
def create(): | |
""" Create a new account, | |
we will redirect to a homepage here | |
""" | |
if flask.request.method == "GET": | |
return flask.render_template("create_account.html") | |
#Get the form data | |
email = flask.request.form.get("email") | |
password = flask.request.form.get("password") | |
#--- | |
# hash the passwords before adding them to the database | |
# the hash is calculated using a byte string first and must be UTF-8 encoding | |
encodedPw = password.encode('utf-8') | |
salt = bcrypt.gensalt() | |
hashedPw = bcrypt.hashpw(encodedPw, salt) | |
#---- | |
#Sanity check do we have a name, email and password | |
if not email or not password: | |
flask.flash("Not all info supplied") | |
return flask.render_template("create_account.html", | |
email = email) | |
#Otherwise we can add the user | |
#-- | |
theQry = "Select * FROM User WHERE email = ?" | |
args = (email,) | |
userQry = query_db(theQry, args, one=True) | |
#-- | |
if userQry: | |
flask.flash("A User with that Email Exists") | |
return flask.render_template("create_account.html", | |
name = name, | |
email = email) | |
else: | |
#Crate the user | |
app.logger.info("Create New User") | |
#-- | |
# parametrized query will supply values at execution time, avoiding SQLi as well | |
# testing with bandit should drop the number of injectable queries | |
theQry = "INSERT INTO user (id, email, password) VALUES (NULL, ?, ?)" | |
args = (email, hashedPw) | |
userQry = write_db(theQry, args) | |
#-- | |
flask.flash("Account Created, you can now Login") | |
return flask.redirect(flask.url_for("login")) | |
@app.route("/user/<userId>/settings") | |
@login_required | |
def settings(userId): | |
""" | |
Update a users settings, | |
Allow them to make reviews | |
""" | |
#-- | |
theQry = "Select * FROM User WHERE id = ?" | |
args = (userId,) | |
thisUser = query_db(theQry, args, one=True) | |
# the user shouldn't be able to access other accounts' settings | |
if thisUser['id'] != flask.session['user']: | |
flask.flash("Access denied: Unauthorized access to another account") | |
return flask.redirect(flask.url_for("settings", userId=flask.session['user'])) | |
#-- | |
elif not thisUser: | |
flask.flash("No Such User") | |
return flask.redirect(flask.url_for("index")) | |
#Purchases | |
#-- | |
theSQL = "SELECT * FROM purchase WHERE userID = ?" | |
args = (userId,) | |
purchaces = query_db(theSQL, args) | |
theSQL = """ | |
SELECT productId, date, product.name | |
FROM purchase | |
INNER JOIN product ON purchase.productID = product.id | |
WHERE userID = ?; | |
""" | |
args = (userId,) | |
purchaces = query_db(theSQL, args) | |
#-- | |
return flask.render_template("usersettings.html", | |
user = thisUser, | |
purchaces = purchaces) | |
@app.route("/logout") | |
def logout(): | |
""" | |
Login Page | |
""" | |
flask.session.clear() | |
return flask.redirect(flask.url_for("index")) | |
@app.route("/user/<userId>/update", methods=["GET","POST"]) | |
def updateUser(userId): | |
""" | |
Process any chances from the user settings page | |
""" | |
#-- | |
theQry = "Select * FROM User WHERE id = ?" | |
args = (userId,) | |
thisUser = query_db(theQry, args, one=True) | |
#-- | |
if not thisUser: | |
flask.flash("No Such User") | |
return flask.redirect(flask.flask_url_for("index")) | |
#otherwise we want to do the checks | |
if flask.request.method == "POST": | |
current = flask.request.form.get("current") | |
password = flask.request.form.get("password") | |
app.logger.info("Attempt password update for %s from %s to %s", userId, current, password) | |
app.logger.info("%s == %s", current, thisUser["password"]) | |
if current: | |
if current == thisUser["password"]: | |
app.logger.info("Password OK, update") | |
#Update the Password | |
#-- | |
theSQL = "UPDATE user SET password = ? WHERE id = ?" | |
args = (password, userId) | |
app.logger.info("SQL %s", theSQL) | |
write_db(theSQL, args) | |
#-- | |
flask.flash("Password Updated") | |
else: | |
app.logger.info("Mismatch") | |
flask.flash("Current Password is incorrect") | |
return flask.redirect(flask.url_for("settings", | |
userId = thisUser['id'])) | |
flask.flash("Update Error") | |
return flask.redirect(flask.url_for("settings", userId=userId)) | |
# ------------------------------------- | |
# | |
# Functionality to allow user to review items | |
# | |
# ------------------------------------------ | |
@app.route("/review/<userId>/<itemId>", methods=["GET", "POST"]) | |
@login_required | |
def reviewItem(userId, itemId): | |
"""Add a Review""" | |
#-- | |
theQry = "Select * FROM User WHERE id = ?" | |
args = (userId,) | |
thisUser = query_db(theQry, args, one=True) | |
if thisUser['id'] != flask.session['user']: | |
flask.flash("Access denied: Unauthorized access to another account") | |
return flask.redirect(flask.url_for("settings", userId=flask.session['user'])) | |
#-- | |
#Handle input | |
if flask.request.method == "POST": | |
reviewStars = flask.request.form.get("rating") | |
reviewComment = flask.request.form.get("review") | |
#Clean up review whitespace | |
reviewComment = reviewComment.strip() | |
reviewId = flask.request.form.get("reviewId") | |
app.logger.info("Review Made %s", reviewId) | |
app.logger.info("Rating %s Text %s", reviewStars, reviewComment) | |
if reviewId: | |
#Update an existing oe | |
app.logger.info("Update Existing") | |
#-- | |
theSQL = "UPDATE review SET stars = ?, review = ? WHERE id = ?" | |
args = (reviewStars, reviewComment, reviewId) | |
app.logger.debug("%s", theSQL) | |
write_db(theSQL, args) | |
#-- | |
flask.flash("Review Updated") | |
else: | |
app.logger.info("New Review") | |
#-- | |
theSQL = """ | |
INSERT INTO review (userId, productId, stars, review) | |
VALUES (?, ?, ?, ?); | |
""" | |
args = (userId, itemId, reviewStars, reviewComment) | |
app.logger.info("%s", theSQL) | |
write_db(theSQL, args) | |
#-- | |
flask.flash("Review Made") | |
#Otherwise get the review | |
#-- | |
theQry = "SELECT * FROM product WHERE id = ?;" | |
args = (itemId,) | |
item = query_db(theQry, args, one=True) | |
theQry = "SELECT * FROM review WHERE userID = ? AND productID = ?;" | |
args = (userId, itemId) | |
review = query_db(theQry, args, one=True) | |
app.logger.debug("Review Exists %s", review) | |
#-- | |
return flask.render_template("reviewItem.html", | |
item = item, | |
review = review, | |
) | |
# --------------------------------------- | |
# | |
# BASKET AND PAYMEN | |
# | |
# ------------------------------------------ | |
@app.route("/basket", methods=["GET","POST"]) | |
@login_required | |
def basket(): | |
theBasket = [] | |
#Otherwise we need to work out the Basket | |
#Get it from the session | |
sessionBasket = flask.session.get("basket", None) | |
if not sessionBasket: | |
flask.flash("No items in basket") | |
return flask.redirect(flask.url_for("index")) | |
totalPrice = 0 | |
for key in sessionBasket: | |
#-- | |
theQry = "SELECT * FROM product WHERE id = ?" | |
args = (key,) | |
theItem = query_db(theQry, args, one=True) | |
#-- | |
quantity = int(sessionBasket[key]) | |
thePrice = theItem["price"] * quantity | |
totalPrice += thePrice | |
theBasket.append([theItem, quantity, thePrice]) | |
return flask.render_template("basket.html", | |
basket = theBasket, | |
total=totalPrice) | |
@app.route("/basket/payment", methods=["GET", "POST"]) | |
@login_required | |
def pay(): | |
""" | |
Fake paymeent. | |
YOU DO NOT NEED TO IMPLEMENT PAYMENT | |
""" | |
#Get the total cost | |
cost = flask.request.form.get("total") | |
#Fetch USer ID from Sssion | |
#-- | |
theQry = "Select * FROM User WHERE id = ?" | |
args = (flask.session["user"],) | |
theUser = query_db(theQry, args, one=True) | |
#-- | |
#Add products to the user | |
sessionBasket = flask.session.get("basket", None) | |
theDate = datetime.datetime.utcnow() | |
for key in sessionBasket: | |
#As we should have a trustworthy key in the basket. | |
#-- | |
theQry = "INSERT INTO PURCHASE (userID, productID, date) VALUES (?, ?, ?)" | |
args = (theUser['id'], key, theDate) | |
app.logger.debug(theQry) | |
write_db(theQry, args) | |
#-- | |
#Clear the Session | |
flask.session.pop("basket", None) | |
return flask.render_template("pay.html", | |
total=cost) | |
# --------------------------- | |
# HELPER FUNCTIONS | |
# --------------------------- | |
@app.route('/uploads/<name>') | |
def serve_image(name): | |
""" | |
Helper function to serve an uploaded image | |
""" | |
return flask.send_from_directory(app.config["UPLOAD_FOLDER"], name) | |
@app.route("/initdb") | |
def database_helper(): | |
""" | |
Helper / Debug Function to create the initial database | |
You are free to ignore scurity implications of this | |
""" | |
init_db() | |
return "Done" |