Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Qin-Xy-Auction/web.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
370 lines (329 sloc)
11.8 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, url_for, redirect, render_template, request, Response, stream_with_context,jsonify | |
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer | |
import json | |
import os | |
import time | |
import codecs | |
from flask_cors import * | |
import spider | |
from pydub import AudioSegment | |
import lsqlite3 | |
import lmysql | |
# Flask application object; the routes in this module are registered on it.
app = Flask(__name__)
# r'/*' is a wildcard: cross-origin requests are allowed for every URL served here.
CORS(app, resources=r'/*')
# Storage backend used by the data routes: 'sqlite3' or 'mysql'.
databaseUse = 'sqlite3'
# databaseUse = 'mysql'
# Create a directory
def mkdir(path):
    """Create directory ``path`` (including missing parents) if it is absent.

    Surrounding whitespace and trailing backslashes are stripped from
    ``path`` before it is used.

    Args:
        path: Directory path to create.

    Returns:
        True when the directory was created by this call, False when
        something already exists at that path.
    """
    path = path.strip().rstrip("\\")
    try:
        # EAFP: create directly instead of checking os.path.exists() first,
        # so there is no gap between the check and the creation.
        os.makedirs(path)
    except FileExistsError:
        print(path+' 目录已存在')
        return False
    print(path+' 创建成功')
    return True
# Translate Chinese into Japanese
@app.route("/fanyi",methods=['GET'])
def fanyi():
    """Translate the ``name`` query parameter from Chinese to Japanese.

    Returns:
        A JSON string ``{"data": <translated text>}``. When ``name`` is
        absent, the literal default ``'erro'`` is translated instead.
    """
    # Function-scope import: googletrans is loaded when the route is called.
    from googletrans import Translator
    source = request.args.get('name','erro')
    translator = Translator(service_urls=['translate.google.cn'])
    text = translator.translate(source,src='zh-cn',dest='ja').text
    print(text)
    return json.dumps({"data":text})
# Translate into English
@app.route("/fanyichengyingwen",methods=['GET'])
def fanyichengyingwen():
    """Translate the ``name`` query parameter with googletrans' defaults.

    No destination language is passed, so the library default applies
    (presumably English, per the route name -- confirm against googletrans).

    Returns:
        A JSON string ``{"data": <translated text>}``.
    """
    from googletrans import Translator
    text_in = request.args.get('name','erro')
    client = Translator(service_urls=['translate.google.cn'])
    translated = client.translate(text_in)
    return json.dumps({"data":translated.text})
# Translate into Chinese
@app.route("/fanyichengzhongwen",methods=['GET'])
def fanyichengzhongwen():
    """Translate the ``name`` query parameter into Chinese (``zh-CN``).

    Returns:
        A JSON string ``{"data": <translated text>}``.
    """
    from googletrans import Translator
    text_in = request.args.get('name','erro')
    client = Translator(service_urls=['translate.google.cn'])
    translated = client.translate(text_in, dest='zh-CN')
    return json.dumps({"data":translated.text})
# Update a record by id
@app.route("/updatedata",methods=['POST'])
def updatedata():
    """Parse the JSON request body and pass it to the active backend's update.

    The payload is handed unchanged to ``useSqliteUpdate`` of the module
    selected by ``databaseUse``.

    Returns:
        The JSON string ``{"msg": "save success"}`` in every non-raising case,
        including when ``databaseUse`` names neither backend.
    """
    payload = json.loads(request.get_data().decode('utf-8'))
    if databaseUse=='sqlite3':
        lsqlite3.useSqliteUpdate(payload)
    elif databaseUse=='mysql':
        lmysql.useSqliteUpdate(payload)
    return json.dumps({"msg":"save success"})
# Delete a record by id
@app.route("/deldata",methods=['GET'])
def deldata():
    """Delete the row identified by the ``database``/``table``/``id`` query args.

    Each argument falls back to the literal ``'erro'`` when missing.

    Returns:
        The JSON string ``{"msg": "delete success"}`` in every non-raising
        case, including when ``databaseUse`` names neither backend.
    """
    args = request.args
    target = {
        "database": args.get('database','erro'),
        'table': args.get('table','erro'),
        "id": args.get('id','erro'),
    }
    # Both backend modules expose the same function name.
    backend = {'sqlite3': lsqlite3, 'mysql': lmysql}.get(databaseUse)
    if backend is not None:
        backend.useSqliteDelete(target)
    return json.dumps({"msg":"delete success"})
# Upload (insert) a record
@app.route("/savedata",methods=['POST'])
def savedata():
    """Insert the JSON request body as a new record in the active backend.

    A ``creatTime`` field holding the current local time
    (``YYYY-MM-DD HH:MM:SS``) is added to the payload before it is stored.

    Returns:
        The JSON string ``{"msg": "save success"}`` in every non-raising case.
    """
    record = json.loads(request.get_data().decode('utf-8'))
    # With no time tuple, strftime formats the current local time.
    record['creatTime'] = time.strftime('%Y-%m-%d %H:%M:%S')
    print(record)
    backend = {'sqlite3': lsqlite3, 'mysql': lmysql}.get(databaseUse)
    if backend is not None:
        backend.useSqliteInsert(record)
    return json.dumps({"msg":"save success"})
# Fetch every row of a table
@app.route("/getdata",methods=['GET'])
def getdata():
    """Return all rows of the table named by the ``database``/``table`` args.

    Each argument falls back to the literal ``'erro'`` when missing.

    Returns:
        A JSON string ``{"msg": ..., "data": rows}``. On any backend or
        serialisation error ``data`` is an empty list; the response shape is
        unchanged for clients, and the failure is logged.
    """
    table = request.args.get('table','erro')
    database = request.args.get('database','erro')
    try:
        if databaseUse=='sqlite3':
            rows = lsqlite3.useSqliteSelect(database,table)
        elif databaseUse=='mysql':
            rows = lmysql.useSqliteSelect(database,table)
        else:
            rows = []
        return json.dumps({"msg":"获取成功","data":rows})
    except Exception:
        # Best-effort endpoint: still answer with an empty list, but record
        # the traceback so the failure can be diagnosed.
        app.logger.exception("getdata failed (database=%r, table=%r)", database, table)
        return json.dumps({"msg":"获取成功","data":[]})
@app.route("/getdatabykey",methods=['POST'])
def getdatabykey():
    """Query rows using the submitted form fields as the filter description.

    The form is converted to a plain dict and passed to the active backend's
    ``useSqliteSelectByKey``.

    Returns:
        A JSON string ``{"msg": ..., "data": rows}``. On any backend or
        serialisation error ``data`` is an empty list and the failure is
        logged.
    """
    # Only the parsed form is used. The raw body is deliberately not decoded
    # here: an unconditional UTF-8 decode can raise on non-UTF-8 bodies.
    data = request.form.to_dict()
    try:
        if databaseUse=='sqlite3':
            rows = lsqlite3.useSqliteSelectByKey(data)
        elif databaseUse=='mysql':
            rows = lmysql.useSqliteSelectByKey(data)
        else:
            rows = []
        return json.dumps({"msg":"获取成功","data":rows})
    except Exception:
        # Same best-effort answer for both backends, with the traceback logged.
        app.logger.exception("getdatabykey failed")
        return json.dumps({"msg":"获取成功","data":[]})
@app.route("/getdatabypage",methods=['POST'])
def getdatabypage():
    """Return one page of rows described by the submitted form fields.

    With the sqlite3 backend the response also carries ``total`` (taken from
    the ``count(id)`` column of ``lsqlite3.count``) and ``nowPage`` (the
    submitted ``page`` field). The mysql branch returns only ``msg`` and
    ``data``.

    Returns:
        A JSON string. On any error (including a missing ``page`` field with
        sqlite3) the body is ``{"msg": ..., "data": []}`` and the failure is
        logged.
    """
    # Only the parsed form is used. The raw body is deliberately not decoded
    # here: an unconditional UTF-8 decode can raise on non-UTF-8 bodies.
    data = request.form.to_dict()
    try:
        if databaseUse=='sqlite3':
            body = {
                "msg":"获取成功",
                "data":lsqlite3.useSqliteSelectByPage(data),
                'total':lsqlite3.count(data)[0]['count(id)'],
                'nowPage':data["page"],
            }
        elif databaseUse=='mysql':
            body = {"msg":"获取成功","data":lmysql.useSqliteSelectByPage(data)}
        else:
            body = {"msg":"获取成功","data":[]}
        return json.dumps(body)
    except Exception:
        # Same best-effort answer for both backends, with the traceback logged.
        app.logger.exception("getdatabypage failed")
        return json.dumps({"msg":"获取成功","data":[]})
# List all databases
@app.route("/getalldatabase",methods=['GET'])
def getalldatabase():
    """List entries of the ``database`` directory whose name contains 'sqlite3'.

    Returns:
        A JSON string with ``msg`` and, for the sqlite3 backend, ``data``
        holding the matching names. Other backends get ``msg`` only.
    """
    if databaseUse!='sqlite3':
        return json.dumps({"msg":"获取成功"})
    names = [entry for entry in os.listdir('database') if 'sqlite3' in entry]
    return json.dumps({"msg":"获取成功","data":names})
# List all tables of a database
@app.route("/getalltable",methods=['GET'])
def getalltable():
    """Return the tables of the database named by the ``database`` query arg.

    Returns:
        A JSON string with ``msg`` and, for the sqlite3 backend, ``data``
        holding the result of ``lsqlite3.useSqliteAllTable``. Other backends
        get ``msg`` only.
    """
    database = request.args.get('database','erro')
    response = {"msg":"获取成功"}
    if databaseUse=='sqlite3':
        response["data"] = lsqlite3.useSqliteAllTable({"database":database})
    return json.dumps(response)
# Query all field names of a table
@app.route("/getalltableDetail",methods=['GET'])
def getalltableDetail():
    """Return table details for the ``database``/``table`` query args.

    Returns:
        A JSON string with ``msg`` and, for the sqlite3 backend, ``data``
        holding the result of ``lsqlite3.userSqliteTabelDetail`` (presumably
        the column names -- confirm in lsqlite3). Other backends get ``msg``
        only.
    """
    lookup = {
        "table": request.args.get('table','erro'),
        "database": request.args.get('database','erro'),
    }
    response = {"msg":"获取成功"}
    if databaseUse=='sqlite3':
        response["data"] = lsqlite3.userSqliteTabelDetail(lookup)
    return json.dumps(response)
# POST request: upload a file
@app.route("/uploadfile",methods=['POST'])
def uploadfile():
    """Save the uploaded ``the_file`` part under ``static/upload/``.

    The stored name is a ``YYYYmmddHHMMSS`` timestamp followed by the final
    path component of the client-supplied filename.

    Returns:
        A JSON string with ``msg``, the saved ``path`` and the original
        client-supplied ``name``.
    """
    f = request.files['the_file']
    # The filename is untrusted input. Keep only its last path component
    # (treating both '/' and '\\' as separators) so directory parts such as
    # "../" cannot steer the write outside static/upload/.
    safe_name = os.path.basename((f.filename or '').replace('\\','/'))
    imgName = time.strftime('%Y%m%d%H%M%S') + safe_name
    saved_path = r'static/upload/' + imgName
    f.save(saved_path)
    return json.dumps({"msg":'上传成功',"path":saved_path,"name":f.filename})
# Download the synthesized media file
@app.route("/download.mp3",methods=['GET'])
def download_mp3():
    """Stream ``static/yinyue/result.mp3`` to the client as a download.

    Returns:
        A streaming ``Response`` with mimetype ``application/octet-stream``
        and a ``Content-Length`` header equal to the file size.
    """
    # Location of the file on the server.
    vf = 'static/yinyue/result.mp3'

    def generate(video_file):
        """Yield the file in 512 KiB chunks, closing it when streaming ends."""
        # The with-block closes the handle both on exhaustion and when the
        # generator is closed early (e.g. the client disconnects).
        with open(video_file, 'rb') as f:
            for chunk in iter(lambda: f.read(512*1024), b''):
                yield chunk

    return Response(
        stream_with_context(generate(vf)),
        # Controls how the browser treats the payload:
        # mimetype='video/mp4' would make it play inline;
        # application/octet-stream makes it download directly.
        mimetype='application/octet-stream',
        headers={"Content-Length": os.path.getsize(vf)})
@app.route("/")
def ladmin():
    """Serve the ``login.html`` template at the site root."""
    login_page = 'login.html'
    return render_template(login_page)
@app.route("/<name>")
def hello(name):
    """Render the template whose file name equals the single URL segment.

    NOTE(review): any template can be requested by name through this route;
    an unknown name is not handled here.
    """
    template_name = "{}".format(name)
    return render_template(template_name)
@app.route('/favicon.ico')
def favicon():
    """Serve ``favicon.ico`` from the application's static folder."""
    icon_name = 'favicon.ico'
    return app.send_static_file(icon_name)
# Fetch content by tag name and attribute (e.g. class)
@app.route("/getarticle",methods=['GET'])
def getarticle():
    """Fetch a page and return the first element matching a tag/attribute pair.

    Query args: ``url`` (page to fetch), ``element`` (tag name), ``atable``
    (attribute name) and ``avalue`` (attribute value); each falls back to
    the literal ``'erro'`` when missing.

    Returns:
        A JSON string ``{"data": <str() of the match>}``.
    """
    query = request.args
    url = query.get('url','erro')
    element = query.get('element','erro')
    attr_name = query.get('atable','erro')
    attr_value = query.get('avalue','erro')
    html = spider.getHtmlContent(url)
    soup = spider.BeautifulSoup(html)
    match = soup.find(element,attrs={attr_name:attr_value})
    return json.dumps({"data":str(match)})
import requests | |
import random | |
import sys | |
@app.route("/shiping",methods=['GET'])
def shiping():
    """Report course progress to the remote SaveCourseInfo endpoint.

    Query args: ``uid`` (sent as ``user_id``), ``cid`` (sent as
    ``course_id``) and ``shijian`` (an integer, multiplied by 60 and sent as
    ``lesson_location`` -- presumably minutes converted to seconds).

    Returns:
        A JSON string ``{"data": ...}`` containing ``str()`` of the upstream
        response object followed by a success marker. If ``shijian`` is
        missing or not an integer, a JSON error body with HTTP status 400.
    """
    try:
        shijian = int(request.args.get('shijian'))
    except (TypeError, ValueError):
        # Missing (None) or non-numeric value: answer 400 rather than crash.
        return json.dumps({"data":"invalid or missing 'shijian' parameter"}), 400
    data={'_': random.random(),
        'user_id': request.args.get('uid'),
        'course_id': request.args.get('cid'),
        'lesson_location': shijian*60,
        'session_time': '00:01:31',
        'lesson_status': 'incomplete'}
    print(data)
    # NOTE(review): the Cookie below hard-codes an account identifier and a
    # session id in source; consider moving them to configuration.
    header = {
        "Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
        "Accept-Encoding":"gzip, deflate",
        "Accept-Language":"zh-CN,zh;q=0.9,en;q=0.8",
        "Cache-Control":"no-cache",
        "Connection":"keep-alive",
        "Cookie":"dypxGLTHEME=; GLTHEME=blue; GLX_ACCOUNT=510125199212073513; ASP.NET_SessionId=sr3npcjsti2r4dr2jq2o2efc",
        "Host":"rsj.deyang.gov.cn:81",
        "Pragma":"no-cache",
        "Referer":"http://rsj.deyang.gov.cn:81/dypx/Admin/Sysmgr/CourseMgr/CourseMgrUpload/zjwCourse/2019/dypx2019_18/study.html?userid=081616712AC44CEEA4F9D47A5B534099&courseid=13BF613CB58E48FB8955C66BB1E1E8F4&lessonlocation=0&lessonstatus=incomplete&returnurl=http://rsj.deyang.gov.cn/dypx/OnlineLearning/Chapter.aspx?kcid=1D9ACE87F9694265B9D3C8476DF412B4&kc_title=2019-2020%E5%B9%B4%E5%BA%A6%E2%80%9C%E4%BA%BA%E5%B7%A5%E6%99%BA%E8%83%BD%E4%B8%8E%E5%81%A5%E5%BA%B7%E2%80%9D%E5%85%AC%E9%9C%80%E7%A7%91%E7%9B%AE",
        "Upgrade-Insecure-Requests":"1",
        "User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36",
    }
    # Without a timeout an unresponsive upstream would block this worker
    # indefinitely.
    r = requests.get('http://rsj.deyang.gov.cn:81/dypx/OnlineLearning/ZJWCourse/SaveCourseInfo.aspx',params=data,headers=header,timeout=30)
    return json.dumps({"data":str(r)+'操作成功'})
# Signing key for tokens. Set the SECRET_KEY environment variable to override
# it; the fallback is a weak, well-known value kept only so existing
# deployments and previously issued tokens keep working.
SECRET_KEY = os.environ.get('SECRET_KEY') or '123456'
def generate_token(api_users):
    """Serialize and sign ``api_users`` into a time-limited token.

    Args:
        api_users: Payload to embed in the token (the caller in this file
            passes a dict describing the user).

    Returns:
        A ``(token, expiration)`` tuple: the token as an ASCII string and
        its lifetime in seconds (3600).
    """
    expiration = 3600  # token lifetime in seconds
    s = Serializer(SECRET_KEY, expires_in=expiration)
    token = s.dumps(api_users).decode('ascii')
    return token, expiration
@app.route('/getToken', methods=['GET'])
def getToken():
    """Issue a token for a fixed placeholder user.

    NOTE(review): the ``ZhangHao`` (account) and ``MiMa`` (password) query
    args have no effect on the result; every caller receives a token for the
    same hard-coded user.

    Returns:
        A JSON string ``{"msg": ..., "data": <token>}``.
    """
    user = {'id': '1', 'name': '张三'}
    issued, _lifetime = generate_token(user)
    return json.dumps({"msg":"获取成功","data":issued})
def verify_auth_token(token):
    """Decode ``token`` and report whether it is valid.

    Args:
        token: Token string as produced by ``generate_token``.

    Returns:
        On success, the decoded payload with a ``msg`` key added. On any
        decoding failure, a dict holding only a failure ``msg``.
    """
    serializer = Serializer(SECRET_KEY)
    try:
        payload = serializer.loads(token)
    except Exception:
        # Invalid or expired token.
        return {"msg":'身份校验失败或已过期'}
    payload["msg"] = '身份校验成功'
    return payload
@app.route('/token', methods=['GET', 'POST'])
def token():
    """Verify the ``token`` query argument and return the verification result.

    The token is read from the query string for both GET and POST.

    Returns:
        A JSON string ``{"msg": ..., "data": <verify_auth_token result>}``.
    """
    raw_token = request.args.get('token')
    verdict = verify_auth_token(raw_token)
    return json.dumps({"msg":"获取成功","data":verdict})
# Encrypt
def enctry(s):
    """Encode ``s`` by adding key code points to its characters.

    Each character's Unicode code point is added to the code point of the
    key character at the same position (the key repeats when ``s`` is longer
    than the key). Each sum is written in decimal followed by ``'_'``.

    Args:
        s: Text to encode.

    Returns:
        The encoded string, e.g. ``enctry("ab") == "197_204_"``. An empty
        input yields an empty string.
    """
    k = 'djq5cu#-jeq15abg$z9_i#_w=$o88m!*alpbedlbat8cr74sd'
    key_len = len(k)
    # Index the key modulo its length so input longer than the key is fully
    # encoded rather than cut off.
    return "".join(
        str(ord(ch) + ord(k[i % key_len])) + '_'
        for i, ch in enumerate(s)
    )
# Decrypt
def dectry(p):
    """Decode a string of ``'_'``-terminated decimal numbers back to text.

    Each number has the code point of the key character at the same position
    subtracted (the key repeats when there are more numbers than key
    characters), and the result is turned back into a character.

    Args:
        p: Encoded text in the ``"197_204_"`` form.

    Returns:
        The decoded string. An empty input yields an empty string.
    """
    k = 'djq5cu#-jeq15abg$z9_i#_w=$o88m!*alpbedlbat8cr74sd'
    key_len = len(k)
    # Every number is followed by '_', so the last split item is an empty
    # remainder and is dropped.
    codes = p.split("_")[:-1]
    # Index the key modulo its length so input longer than the key is fully
    # decoded rather than cut off.
    return "".join(
        chr(int(code) - ord(k[i % key_len]))
        for i, code in enumerate(codes)
    )
# Admin application
# http://localhost:8000/admin.html?database=admin&type=admin
if __name__ == '__main__':
    # Pass a real boolean for debug: any non-empty string, even "False",
    # would be truthy. Debug stays on by default; set FLASK_DEBUG=0 (or
    # false/no/off) to turn it off.
    # NOTE(review): debug mode combined with host 0.0.0.0 exposes the
    # interactive debugger to the network -- disable it outside development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '1').strip().lower() not in ('', '0', 'false', 'no', 'off')
    # Binding to 0.0.0.0 makes the server reachable from any IP.
    app.run(host='0.0.0.0', port=8000, debug=debug_enabled)