tinyl-shortener/__init__.py
import hashlib
import json
import os
import random
import secrets
import socket
from datetime import datetime, timedelta
from enum import unique
from urllib.parse import urlsplit

import requests
import validators
import whois
from flask import Flask, request, session, render_template, redirect
from flask_sqlalchemy import SQLAlchemy
# Flask application object; every route in this module is registered on it.
app = Flask(__name__)

# Persist data in the SQLite database file "tinyl.db".
app.config.update(SQLALCHEMY_DATABASE_URI="sqlite:///tinyl.db")

# SQLAlchemy handle bound to the app; the models and queries below use it.
db = SQLAlchemy(app)
class Links(db.Model):
    """One shortened URL: the original link and the short code that maps to it."""

    # Surrogate key assigned by the database.
    id = db.Column(db.Integer, primary_key=True)
    # Destination URL as submitted (declared length 500).
    link = db.Column(db.String(length=500), nullable=False)
    # Hash string stored alongside the link (declared length 50).
    uhash = db.Column(db.String(length=50), nullable=False)
    # Short code looked up by the redirect route; must be unique across rows.
    shortu = db.Column(db.String(length=20), nullable=False, unique=True)
class Report(db.Model):
    """A URL that was flagged, together with the human-readable reason."""

    # Surrogate key assigned by the database.
    id = db.Column(db.Integer, primary_key=True)
    # The flagged URL (declared length 500). Note there is no hash column
    # on this model; lookups have to go through ``link``.
    link = db.Column(db.String(length=500), nullable=False)
    # Explanation shown to users when the URL is looked up again.
    reason = db.Column(db.String(length=700), nullable=False)
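@app.route("/<path>", methods=["GET"])
def redir(path):
    """Resolve a short code and redirect to the stored URL if it is not flagged.

    The stored URL is submitted to the Google Safe Browsing v4
    ``threatMatches:find`` endpoint before the redirect is issued. Any match
    (the request asks for MALWARE and SOCIAL_ENGINEERING) blocks the
    redirect. If the lookup cannot be completed the request fails closed
    with a 503 instead of redirecting to an unchecked destination.

    The API key is read from the ``SAFE_BROWSING_API_KEY`` environment
    variable.

    Args:
        path: The candidate short code taken from the URL path.

    Returns:
        A redirect to the stored URL, the plain-text block notice, a 404 for
        an unknown or malformed code, or a 503 when the lookup is unavailable.
    """
    # Short codes are exactly 7 alphanumeric characters; anything else
    # cannot be in the table, so answer 404 instead of falling off the end
    # of the view (which Flask turns into a 500).
    if len(path) != 7 or not path.isalnum():
        return "Not found", 404

    entry = db.session.query(Links).filter_by(shortu=path).first()
    if entry is None:
        # Unknown code: previously this dereferenced None and raised.
        return "Not found", 404
    target = str(entry.link)

    # Keep the key out of the source tree. A key that has ever been
    # committed should be treated as exposed and rotated.
    api_key = os.environ.get("SAFE_BROWSING_API_KEY")
    if not api_key:
        app.logger.error("SAFE_BROWSING_API_KEY is not set; refusing to redirect unchecked")
        return "Link check is unavailable, try again later", 503

    # NOTE(review): only the WINDOWS platform is queried; confirm whether
    # ANY_PLATFORM is what is actually wanted for a public shortener.
    googleData = {
        "client": {
            "clientId": "yourcompanyname",
            "clientVersion": "1.5.2",
        },
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING"],
            "platformTypes": ["WINDOWS"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [
                {"url": target},
            ],
        },
    }

    try:
        reply = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            params={"key": api_key},
            json=googleData,
            timeout=5,  # never let a stalled upstream hang the worker
        )
        reply.raise_for_status()
        verdict = reply.json()
    except (requests.RequestException, ValueError) as e:
        # Log only the exception type: the message of a requests error can
        # contain the full request URL, which includes the API key.
        app.logger.error("Safe Browsing lookup failed: %s", type(e).__name__)
        return "Link check is unavailable, try again later", 503

    # An empty JSON object means "no match". Any listed match blocks the
    # redirect; checking only the first entry for SOCIAL_ENGINEERING would
    # let a MALWARE hit through.
    if isinstance(verdict, dict) and verdict.get("matches"):
        return "This link has been blocked for abuse"

    return redirect(target)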
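@app.route("/", methods=["GET", "POST"])
def hello():
    """Serve the landing page (GET) or shorten a submitted URL (POST).

    On POST the ``url`` form field is validated, checked against existing
    reports, stored, and given a 7-character short code derived from the
    new row's id.

    Returns:
        The rendered index page, ``"Invalid URL"`` for a missing or malformed
        URL, ``"found"`` when the URL already has a report, a success message
        containing the short link, or an error message with status 500 when
        the row could not be stored.
    """
    if request.method == "GET":
        return render_template("index.html")

    # A POST without a "url" field used to fall off the end of the view;
    # treat it the same as a malformed URL.
    url = str(request.form.get("url", ""))
    if not validators.url(url):
        return "Invalid URL"

    # MD5 is used here only as a fixed-length fingerprint of the URL, not
    # as a security control.
    uhash = hashlib.md5(str.encode(url)).hexdigest()

    # The Report model has no ``uhash`` column, so filtering on it always
    # raised and the check never took effect. Look the URL up by ``link``.
    # NOTE(review): presumably the intent is to refuse URLs that already
    # have a report - confirm.
    try:
        reported = db.session.query(Report).filter_by(link=url).first()
    except Exception as e:
        print("Exception occured: " + str(e))
        reported = None
    if reported is not None:
        return "found"

    # The short code depends on the row id, which only exists after the
    # INSERT. Insert with a per-request random placeholder, flush to obtain
    # the id, set the real code, and commit once. A fixed placeholder that
    # is committed first can collide on the unique constraint if two
    # requests overlap or an earlier request died between the two commits.
    record = Links(link=url, uhash=uhash, shortu=secrets.token_hex(10))
    try:
        db.session.add(record)
        db.session.flush()
        # NOTE(review): MD5 of a sequential id, cut to 7 hex characters, is
        # guessable by anyone who knows the scheme and can collide between
        # ids; a collision is rejected by the unique constraint and ends up
        # in the except branch below.
        shortu = hashlib.md5(str.encode(str(record.id))).hexdigest()[0:7]
        record.shortu = shortu
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        print("Exception occured: " + str(e))
        return "Could not shorten this URL, please try again", 500

    return "Successfully shortened your URL: https://tinyl.uk/" + shortu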
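@app.route("/check", methods=["GET", "POST"])
def check():
    """Render the URL checker page (GET) or vet a submitted URL (POST).

    A submitted URL passes through these stages in order, and the first
    stage with a finding produces the response:

    1. syntax validation (http/https only),
    2. DNS resolution of the host,
    3. lookup in the local ``Report`` table,
    4. Google Safe Browsing v4 ``threatMatches:find`` lookup,
    5. WHOIS creation date: domains registered in the last 7 days are flagged.

    Safe Browsing hits and recent registrations are stored as ``Report``
    rows. The Safe Browsing API key is read from the
    ``SAFE_BROWSING_API_KEY`` environment variable.

    Returns:
        The rendered ``check.html`` page carrying the verdict in ``second``,
        a 503 page when Safe Browsing cannot be queried, ``"Error db"`` when
        a report cannot be stored, or a redirect to ``/error1`` when the URL
        cannot be parsed.
    """
    # NOTE(review): ``second`` carries markup, so check.html presumably
    # renders it unescaped - confirm. Only fixed strings and stored report
    # reasons are concatenated into it here; nothing taken from the request
    # should ever be added without escaping.
    resp = '<h1 class="resp">'
    if request.method == "GET":
        return render_template("check.html")

    # A POST without a "url" field used to fall off the end of the view.
    url = str(request.form.get("url", ""))

    ## INPUT VALIDATION ##
    if not validators.url(url):
        return render_template("check.html", second=resp + "Invalid URL</h1>")
    try:
        # Parse instead of stripping "http://" by substring: the old code
        # kept userinfo and port in the "domain" and matched the scheme
        # anywhere in the URL.
        parts = urlsplit(url)
        domain = parts.hostname
    except Exception as e:
        print("Erorr occured: " + str(e))
        return redirect("/error1")
    if parts.scheme.lower() not in ("http", "https") or not domain:
        return render_template("check.html", second=resp + "Invalid URL</h1>")

    ## domain active check ##
    try:
        print(domain)
        socket.gethostbyname(domain)
    except Exception as e:
        print(str(e))
        return render_template("check.html", second=resp + " The provided domain does not resolve correctly</h1>")

    ## db check ##
    try:
        known = db.session.query(Report).filter_by(link=url).first()
    except Exception as e:
        # Best effort: a failed lookup must not stop the remaining checks.
        print("Exception occured on DB lookup: " + str(e))
        known = None
    if known is not None:
        return render_template("check.html", second=resp + known.reason + " was found in the databases</h1>")

    ## GOOGLE SAFEBRBROWSING CHECK ##
    unavailable = resp + "This URL could not be checked right now, please try again later</h1>"
    # Keep the key out of the source tree. A key that has ever been
    # committed should be treated as exposed and rotated.
    api_key = os.environ.get("SAFE_BROWSING_API_KEY")
    if not api_key:
        app.logger.error("SAFE_BROWSING_API_KEY is not set; cannot vet URLs")
        return render_template("check.html", second=unavailable), 503

    # NOTE(review): only the WINDOWS platform is queried; confirm whether
    # ANY_PLATFORM is what is actually wanted.
    googleData = {
        "client": {
            "clientId": "yourcompanyname",
            "clientVersion": "1.5.2",
        },
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING"],
            "platformTypes": ["WINDOWS"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [
                {"url": url},
            ],
        },
    }
    try:
        reply = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            params={"key": api_key},
            json=googleData,
            timeout=5,  # never let a stalled upstream hang the worker
        )
        reply.raise_for_status()
        verdict = reply.json()
    except (requests.RequestException, ValueError) as e:
        # Log only the exception type: the message of a requests error can
        # contain the full request URL, which includes the API key.
        app.logger.error("Safe Browsing lookup failed: %s", type(e).__name__)
        # Do not fall through to the "clean" verdict on an unchecked URL.
        return render_template("check.html", second=unavailable), 503

    matches = verdict.get("matches") if isinstance(verdict, dict) else None
    if matches:
        # Look at every match, not just the first, and report malware hits
        # as a finding; they used to surface as the opaque "Error 2".
        threat_types = {m.get("threatType") for m in matches}
        if "SOCIAL_ENGINEERING" in threat_types:
            reason = "This url was previously listed in the google safebrowsing database for phishing"
            notice = "This URL was listed in google saferbrowsing database for phishng</h1>"
        else:
            reason = "This url was previously listed in the google safebrowsing database for malware"
            notice = "This URL was listed in google safebrowsing database for malware</h1>"
        print("Adding to database")
        try:
            record = Report(link=str(url), reason=reason)
            db.session.add(record)
            db.session.commit()
            app.logger.info(record.id)
        except Exception as e:
            db.session.rollback()
            print("error with db" + str(e))
            return "Error db"
        return render_template("check.html", second=resp + notice)

    ## DNS CHECK AND DOMAIN REP CHECK ##
    # One WHOIS query is enough (the old code issued two), and a failing
    # lookup is treated as "creation date unknown" instead of a 500.
    try:
        created = whois.whois(domain).creation_date
    except Exception as e:
        print("WHOIS lookup failed: " + str(e))
        created = None
    if isinstance(created, list):
        # Some registries return several dates; use the first one.
        created = created[0] if created else None

    recent = False
    if isinstance(created, datetime):
        # Build the cutoff in the same (naive or aware) flavour as the
        # WHOIS value so the comparison cannot raise TypeError.
        cutoff = datetime.now(created.tzinfo) - timedelta(days=7)
        recent = cutoff < created
    else:
        print("The domain creation date was not found")

    if recent:
        record = Report(link=str(url), reason="This url was recently purchased and may contain malicious content, be wary when continuing.")
        db.session.add(record)
        db.session.commit()
        app.logger.info(record.id)
        return render_template("check.html", second=resp + "This url was recently purchased and may contain malicious content, be wary when continuing.</h1>")

    ## URL PHISHING SIMILARITY ##
    return render_template("check.html", second=resp + "This URL has been checked and is clean so far, you should still take caution if you are unsure submit a manual request.</h1>")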
# Run Flask's built-in server only when this file is executed directly;
# importing the module has no such effect.
if __name__ == "__main__":
    app.run()