Skip to content
Permalink
main
Switch branches/tags

Name already in use

A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Go to file
 
 
Cannot retrieve contributors at this time
import json
import random
import string
from io import BytesIO

import paho.mqtt.client as mqtt
from flask import Flask, request, session, redirect, url_for, render_template, flash, make_response
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash

from models import User,db
# Flask application object; the /login/ and /register/ views in this file are
# registered on it.
app = Flask(__name__)
# Enable cross-origin support through flask_cors.
# NOTE(review): `home` is not defined or imported anywhere in this file —
# presumably a Blueprint declared elsewhere; confirm, otherwise this line
# raises NameError at import time.
CORS(home)
@home.after_request
def add_cors_headers(response):
    """Attach permissive CORS headers to an outgoing response and return it.

    Args:
        response: the response object about to be sent; its ``headers``
            mapping is updated in place.

    Returns:
        The same ``response`` object.
    """
    cors_headers = (
        # Allow cross-origin requests from every origin.
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Headers', 'Content-Type'),
        # Allow the POST and OPTIONS methods.
        ('Access-Control-Allow-Methods', 'POST, OPTIONS'),
    )
    for header_name, header_value in cors_headers:
        response.headers[header_name] = header_value
    return response
def rndColor():
    """Return a random RGB tuple whose channels each lie in 32..127 inclusive."""
    channel_low, channel_high = 32, 127
    return tuple(random.randint(channel_low, channel_high) for _ in range(3))
def gene_text():
    """Generate a 4-character verification code.

    Characters are sampled without replacement from the ASCII letters and
    digits, so a code never contains the same character twice.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.sample(alphabet, 4)
    return ''.join(picked)
def draw_lines(draw, num, width, height):
    """Draw ``num`` random black lines across a canvas.

    Each line starts in the top-left quadrant and ends anywhere horizontally
    in the lower half of the canvas.

    Args:
        draw: object exposing ``line(xy, fill=..., width=...)`` (presumably a
            PIL ``ImageDraw`` instance — confirm against the caller).
        num: number of lines to draw.
        width: canvas width in pixels.
        height: canvas height in pixels.
    """
    # Floor division keeps the bounds integral: random.randint rejects float
    # arguments (TypeError on Python 3.12+, ValueError for odd sizes before).
    half_width = width // 2
    half_height = height // 2
    for _ in range(num):
        x1 = random.randint(0, half_width)
        y1 = random.randint(0, half_height)
        x2 = random.randint(0, width)
        y2 = random.randint(half_height, height)
        draw.line(((x1, y1), (x2, y2)), fill='black', width=1)
def get_verify_code():
    """Render a fresh verification code onto an image.

    Returns:
        An ``(image, code)`` tuple: a 120x50 white RGB image with the four
        code characters drawn on it, and the code string itself.
    """
    code = gene_text()
    canvas_size = (120, 50)
    im = Image.new('RGB', canvas_size, 'white')
    font = ImageFont.truetype('app/static/images/fonts/arial.ttf', 40)
    draw = ImageDraw.Draw(im)
    for position in range(4):
        # Characters sit 23px apart, each jittered by up to 3px on both axes.
        x_pos = 5 + random.randint(-3, 3) + 23 * position
        y_pos = 5 + random.randint(-3, 3)
        draw.text((x_pos, y_pos), text=code[position], fill=rndColor(), font=font)
    return im, code
@home.route('/code')
def get_code():
    """Serve a new verification-code image and remember its code in the session.

    The image produced by ``get_verify_code`` is encoded as JPEG into an
    in-memory buffer and returned as the response body. The matching code is
    stored under ``session['image']``.

    Returns:
        The response carrying the JPEG bytes.
    """
    image, code = get_verify_code()
    buf = BytesIO()
    image.save(buf, 'jpeg')
    response = make_response(buf.getvalue())
    # The declared media type must match the encoder used above (JPEG).
    response.headers['Content-Type'] = 'image/jpeg'
    session['image'] = code
    return response
# MQTT connection settings and the topics that carry login / registration
# messages.
# NOTE(review): broker.emqx.io looks like a shared public broker and 1883 is
# conventionally the unencrypted MQTT port — confirm this is intended for the
# data published on these topics.
broker = "broker.emqx.io"
port = 1883
topic_login = "home/login"
topic_register = "home/register"
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: subscribe to the login and register topics.

    A result code ``rc`` of 0 is treated as success; any other value is only
    reported and no subscription is made.
    """
    if rc != 0:
        print(f"Connect failed with code {rc}")
        return
    print("Connected successfully")
    for topic in (topic_login, topic_register):
        client.subscribe(topic)
def on_message(client, userdata, msg):
    """MQTT message callback: decode a JSON payload and dispatch it by topic.

    Messages on ``topic_login`` go to ``handle_login`` and messages on
    ``topic_register`` go to ``handle_register``; other topics are ignored.
    Payloads that are not valid UTF-8 JSON objects are reported and dropped
    instead of raising inside the callback.

    Args:
        client: the MQTT client that received the message (unused).
        userdata: user data attached to the client (unused).
        msg: received message; its ``payload`` (bytes) and ``topic`` are read.
    """
    try:
        data = json.loads(msg.payload.decode())
    except ValueError as exc:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueError
        # subclasses, so this covers undecodable bytes and malformed JSON.
        print(f"Ignoring malformed payload on {msg.topic}: {exc}")
        return
    if not isinstance(data, dict):
        # The handlers call data.get(...), so only JSON objects are usable.
        print(f"Ignoring non-object payload on {msg.topic}")
        return
    if msg.topic == topic_login:
        handle_login(data)
    elif msg.topic == topic_register:
        handle_register(data)
def handle_login(data):
    """Check a username/password pair and record the user in the session.

    Args:
        data: mapping read with ``.get`` for the "username" and "password"
            keys; any other keys are ignored.

    Returns:
        ``{"status": "success"}`` when the credentials match, otherwise
        ``{"status": "error", "message": ...}`` describing the failure.
    """
    username = data.get("username")
    password = data.get("password")
    # A missing, empty or non-string username cannot name an account. Answering
    # here also avoids querying with username=None.
    if not isinstance(username, str) or not username:
        return {"status": "error", "message": "Username does not exist"}
    user = User.query.filter_by(username=username).first()
    if not user:
        return {"status": "error", "message": "Username does not exist"}
    # check_password_hash needs a string; a missing or non-string password is
    # treated as a wrong password instead of raising.
    if not isinstance(password, str) or not check_password_hash(user.password, password):
        return {"status": "error", "message": "Wrong password"}
    # NOTE(review): `session` is Flask's request-bound session; when this
    # function is reached from the MQTT callback there is presumably no
    # request context — confirm that path is intended.
    session["user_id"] = user.id
    session["username"] = user.username
    return {"status": "success"}
def handle_register(data):
    """Create a new user from registration data.

    Args:
        data: mapping read with ``.get`` for the "username", "email",
            "password" and "phone" keys; any other keys are ignored.

    Returns:
        ``{"status": "success"}`` once the user has been committed, or
        ``{"status": "error", "message": ...}`` when the username or password
        is missing.

    Raises:
        Whatever the database commit raises; the session is rolled back before
        the exception propagates.
    """
    username = data.get("username")
    email = data.get("email")
    password = data.get("password")
    phone = data.get("phone")
    # generate_password_hash needs a string, and an account without a username
    # could never log in, so reject these before touching the database.
    has_username = isinstance(username, str) and bool(username)
    has_password = isinstance(password, str) and bool(password)
    if not (has_username and has_password):
        return {"status": "error", "message": "Username and password are required"}
    user = User(
        username=username,
        email=email,
        password=generate_password_hash(password),  # only the hash is stored
        phone=phone,
    )
    db.session.add(user)
    try:
        db.session.commit()
    except Exception:
        # Leave the session usable for later work, then let the caller see
        # the original failure.
        db.session.rollback()
        raise
    return {"status": "success"}
# Create the module-wide MQTT client, attach the callbacks defined in this
# file, connect to the configured broker and start the client's loop.
# All of this runs at import time.
# NOTE(review): newer paho-mqtt releases reportedly require a callback API
# version argument to mqtt.Client(); confirm against the installed version.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Third argument (60) is presumably the keepalive interval in seconds — confirm.
client.connect(broker, port, 60)
client.loop_start()
@app.route("/login/", methods=["GET", "POST"])
def login():
    """
    Log in.

    Already-authenticated visitors (``user_id`` in the session) are redirected
    to ``home.index``. Otherwise the login form is shown; on a valid
    submission the form data is published to the login topic and checked with
    ``handle_login``. A failure is flashed and the form is shown again; a
    success redirects to ``home.index``.
    """
    if "user_id" in session:
        return redirect(url_for("home.index"))
    form = LoginForm()
    if not form.validate_on_submit():
        return render_template("home/login.html", form=form)
    credentials = form.data
    # NOTE(review): this sends the submitted form data (presumably including
    # the password) to the broker, and this client is itself subscribed to the
    # login topic, so the message may come back through on_message and run
    # handle_login a second time — confirm this is intended.
    client.publish(topic_login, json.dumps(credentials))
    # The result is computed locally and synchronously rather than awaited
    # from the broker.
    outcome = handle_login(credentials)
    if outcome["status"] != "error":
        return redirect(url_for("home.index"))
    flash(outcome["message"], "err")
    return render_template("home/login.html", form=form)
@app.route("/register/", methods=["GET", "POST"])
def register():
    """
    Registration function.

    Already-authenticated visitors (``user_id`` in the session) are redirected
    to ``home.index``. Otherwise the registration form is shown; on a valid
    submission the form data is published to the register topic and passed to
    ``handle_register``. A failure is flashed and the form is shown again; a
    success redirects to ``home.login``.
    """
    if "user_id" in session:
        return redirect(url_for("home.index"))
    form = RegisterForm()
    if not form.validate_on_submit():
        return render_template("home/register.html", form=form)
    submitted = form.data
    # NOTE(review): this sends the submitted form data (presumably including
    # the password) to the broker, and this client is itself subscribed to the
    # register topic, so the message may come back through on_message and run
    # handle_register a second time — confirm this is intended.
    client.publish(topic_register, json.dumps(submitted))
    # The result is computed locally and synchronously rather than awaited
    # from the broker.
    outcome = handle_register(submitted)
    if outcome["status"] != "error":
        # NOTE(review): the login view in this file is registered on `app`;
        # confirm that a "home.login" endpoint exists.
        return redirect(url_for("home.login"))
    flash(outcome["message"], "err")
    return render_template("home/register.html", form=form)
# Start Flask's built-in server only when this file is executed directly.
# NOTE(review): debug=True turns on Flask's debug mode; presumably meant for
# local development only — confirm before deploying.
if __name__ == "__main__":
    app.run(debug=True)