Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
final-year-project/main.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
419 lines (322 sloc)
12.6 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import discord | |
from dotenv import load_dotenv | |
from discord.ext import commands | |
import nltk | |
nltk.download('punkt') | |
from nltk.stem.lancaster import LancasterStemmer | |
stemmer = LancasterStemmer() | |
import numpy | |
import tflearn | |
import tensorflow | |
import random | |
import json | |
import pickle | |
import time | |
#should learn | |
pre_learn = False | |
learn = False | |
unknown_input = '' | |
#joke | |
pre_notFunny = False | |
notFunny = False | |
notFunnyAtAll = False | |
newJoke = "" | |
#covid knowledge | |
covid_learn = False | |
confirmation = False | |
isQuestion = False | |
isAnswer = False | |
question = '' | |
answer = '' | |
again = False | |
#connect discord | |
load_dotenv() | |
TOKEN = os.getenv('DISCORD_TOKEN') | |
GUILD = os.getenv('DISCORD_GUILD') | |
client = discord.Client() | |
bot = commands.Bot(command_prefix='!') | |
@bot.event | |
async def on_ready(): | |
print(f'{bot.user.name} has connected to Discord!') | |
guild = discord.utils.get(bot.guilds, name=GUILD) | |
print( | |
f'{bot.user} is connected to the following guild:\n' | |
f'{guild.name}(id: {guild.id})' | |
) | |
members = '\n - '.join([member.name for member in guild.members]) | |
print(f'Guild Members:\n - {members}') | |
@bot.event | |
async def on_member_join(member): | |
await member.create_dm() | |
await member.dm_channel.send( | |
f'Hi {member.name}, how can I the COVID_BOT help you today?' | |
) | |
#open the data file | |
with open("intents.json") as file: | |
data = json.load(file) | |
try: | |
with open("data_new.pickle", "rb") as f: | |
words, labels, training, output = pickle.load(f) | |
except: | |
words = [] | |
labels = [] | |
docs_x = [] | |
docs_y = [] | |
for intent in data["intents"]: | |
for pattern in intent["patterns"]: | |
#analyze the words | |
wrds = nltk.word_tokenize(pattern) | |
words.extend(wrds) | |
docs_x.append(wrds) | |
docs_y.append(intent["tag"]) | |
if intent["tag"] not in labels: | |
labels.append(intent["tag"]) | |
words = [stemmer.stem(w.lower()) for w in words if w != "?"] | |
words = sorted(list(set(words))) | |
labels = sorted(labels) | |
training = [] | |
output = [] | |
out_empty = [0 for _ in range(len(labels))] | |
for x, doc in enumerate(docs_x): | |
bag = [] | |
wrds = [stemmer.stem(w) for w in doc] | |
for w in words: | |
if w in wrds: | |
bag.append(1) | |
else: | |
bag.append(0) | |
output_row = out_empty[:] | |
output_row[labels.index(docs_y[x])] = 1 | |
training.append(bag) | |
output.append(output_row) | |
training = numpy.array(training) | |
output = numpy.array(output) | |
with open("data_new.pickle", "wb") as f: | |
pickle.dump((words, labels, training, output), f) | |
tensorflow.compat.v1.reset_default_graph() | |
net = tflearn.input_data(shape=[None, len(training[0])]) | |
net = tflearn.fully_connected(net, 8) | |
net = tflearn.fully_connected(net, 8) | |
net = tflearn.fully_connected(net, len(output[0]), activation="softmax") | |
net = tflearn.regression(net) | |
model = tflearn.DNN(net) | |
model.fit(training, output, n_epoch=1000, batch_size=9, show_metric=True) | |
model.save("model_new.tflearn") | |
def bag_of_words(s, words): | |
bag = [0 for _ in range(len(words))] | |
s_words = nltk.word_tokenize(s) | |
s_words = [stemmer.stem(word.lower()) for word in s_words] | |
for sen in s_words: | |
for i, w in enumerate(words): | |
if w == sen: | |
bag[i] = 1 | |
return numpy.array(bag) | |
def chat(inp): | |
global pre_learn | |
global learn | |
global unknown_input | |
while True: | |
if inp.lower() == "bye": | |
break | |
results = model.predict([bag_of_words(inp, words)])[0] | |
results_index = numpy.argmax(results) | |
tag = labels[results_index] | |
if results[results_index] > 0.9: | |
for tg in data["intents"]: | |
if tg["tag"] == tag: | |
responses = tg["responses"] | |
else: | |
unknown_input = inp | |
pre_learn = True | |
return ["Ohh snap, I'm not sure I understand that. :man_facepalming: \nCould you please try to explain it to me so I can learn? \nJust type 'yes' and I will start recording.", "learning"] | |
return [random.choice(responses), tag] | |
@bot.event | |
async def on_message(message): | |
if message.author == bot.user: | |
return | |
global pre_learn | |
global learn | |
global unknown_input | |
global pre_notFunny | |
global notFunny | |
global notFunnyAtAll | |
global newJoke | |
global covid_learn | |
global isQuestion | |
global isAnswer | |
global question | |
global answer | |
global confirmation | |
global again | |
if pre_learn == True and message.content == 'yes': | |
learn = True | |
time.sleep(1.5) | |
await message.channel.send("Great! Teach me master! :star_struck:") | |
time.sleep(1.5) | |
await message.channel.send("Please write a brief explanation of you question or sentence. It will be recorded by the bot and reviewed by the Creator. Thank you!") | |
time.sleep(1.5) | |
await message.channel.send("Type 'quit' to exit study mode.") | |
elif pre_notFunny == True and message.content == 'yes': | |
notFunny = True | |
response = chat("joke") | |
await message.channel.send("How about...") | |
time.sleep(1.5) | |
await message.channel.send(response[0]) | |
time.sleep(1.5) | |
await message.channel.send("\n \nFunny, yes?") | |
elif notFunny == True: | |
response = chat(message.content) | |
if message.content == 'no' or response[1] == 'not_a_cool_joke': | |
notFunnyAtAll = True | |
notFunny = False | |
pre_notFunny = False | |
time.sleep(1.5) | |
await message.channel.send("In this case, let's see what you have! Tell me a joke and I will remember it :wink:") | |
time.sleep(1.5) | |
await message.channel.send("You can also type 'Not this time' if you don't want to do that.") | |
elif notFunnyAtAll == True and message.content != "not this time": | |
pre_notFunny = False | |
notFunny = False | |
notFunnyAtAll = False | |
newJoke = message.content | |
with open('intents.json') as file: | |
data = json.load(file) | |
for intent in data["intents"]: | |
if intent["tag"] == "new_jokes": | |
intent["patterns"].append("joke") | |
intent["responses"].append(message.content) | |
with open('intents.json', 'w') as f: | |
json.dump(data, f, ensure_ascii=False) | |
time.sleep(1.5) | |
await message.channel.send("Ha-ha-ha, funny. :rofl: :rofl: :rofl:") | |
time.sleep(1.5) | |
await message.channel.send("I will use that later, thank you!") | |
elif (pre_notFunny == True and (message.content == "no" or message.content == "nah")) or (covid_learn == True and (message.content == "no" or message.content == "nah")) or (learn == True and (message.content == "no" or message.content == "nah")): | |
pre_notFunny = False | |
notFunny = False | |
notFunnyAtAll = False | |
covid_learn = False | |
confirmation = False | |
isAnswer = False | |
isQuestion = False | |
time.sleep(1.5) | |
await message.channel.send("Mmm, okay probably next time! :man_shrugging: ") | |
elif notFunnyAtAll == True and message.content == "not this time": | |
pre_notFunny = False | |
notFunny = False | |
notFunnyAtAll = False | |
time.sleep(1.5) | |
await message.channel.send("Mmm, okay probably next time! :man_shrugging: ") | |
elif learn == True and message.content != 'quit': | |
pre_learn = False | |
with open('intents.json') as file: | |
data = json.load(file) | |
for intent in data["intents"]: | |
if intent["tag"] == "learning": | |
intent["patterns"].append(unknown_input) | |
intent["responses"].append(message.content) | |
with open('intents.json', 'w') as f: | |
json.dump(data, f, ensure_ascii=False) | |
elif message.content == 'add info': | |
covid_learn = True | |
time.sleep(1.5) | |
await message.channel.send("Amazing. I love to learn. Are you ready to share the information?") | |
elif covid_learn == True and message.content == "yes": | |
isQuestion = True | |
covid_learn = False | |
time.sleep(1.5) | |
await message.channel.send("Okay cool. Let's start with the question you have answer to so I can record it. Whenever you want just type it and send it to me.") | |
time.sleep(1.5) | |
await message.channel.send("I am waiting.") | |
time.sleep(1.5) | |
await message.channel.send("*Giggling Exactly* :face_with_hand_over_mouth: ") | |
elif confirmation == False and isQuestion == True and message.content != '': | |
question = message.content | |
confirmation = True | |
time.sleep(1.5) | |
await message.channel.send("So the question is: \n" + question + "\n \n Yes or No?") | |
elif confirmation == True and isQuestion == True and message.content == "yes": | |
confirmation = False | |
isQuestion = False | |
isAnswer = True | |
time.sleep(1.5) | |
await message.channel.send("Great! Let's continue with the answer.") | |
time.sleep(1.5) | |
await message.channel.send("If you don't have an answer that's okay.") | |
time.sleep(1.5) | |
await message.channel.send("I will find it later and learn it. Just type 'I don't know the answer'") | |
elif confirmation == True and isQuestion == True and message.content == 'no': | |
again = True | |
confirmation = False | |
time.sleep(1.5) | |
await message.channel.send("Let's try again. I'm ready to record.") | |
elif confirmation == False and isAnswer == True and message.content != "": | |
confirmation = True | |
answer = message.content | |
time.sleep(1.5) | |
await message.channel.send("So the answer is: \n" + answer + "\n \n Yes or No?") | |
elif confirmation == True and isAnswer == True and message.content == "yes": | |
confirmation = False | |
isAnswer = False | |
with open('intents.json') as file: | |
data = json.load(file) | |
for intent in data["intents"]: | |
if intent["tag"] == "covid_knowledge": | |
intent["patterns"].append(question) | |
intent["responses"].append(answer) | |
with open('intents.json', 'w') as f: | |
json.dump(data, f, ensure_ascii=False) | |
time.sleep(1.5) | |
await message.channel.send("Hooray! :partying_face: :partying_face: :partying_face: New knowledge. Wicked! :metal:") | |
time.sleep(1.5) | |
await message.channel.send("Thank you for that. It will be soon part of my permenent knowledge.") | |
time.sleep(1.5) | |
await message.channel.send("Now it is under review. My creator is kind of untrusty :face_with_monocle:") | |
elif confirmation == True and isAnswer == True and message.content == 'no': | |
again = True | |
confirmation = False | |
await message.channel.send("Let's try again. I'm ready to record.") | |
elif message.content == 'quit': | |
learn = False | |
pre_learn = False | |
await message.channel.send("That was a great lesson, thank you!") | |
elif message.content != '': | |
pre_learn = False | |
learn = False | |
unknown_input = False | |
pre_notFunny = False | |
notFunny = False | |
notFunnyAtAll = False | |
newJoke = False | |
covid_learn = False | |
isQuestion = False | |
isAnswer = False | |
confirmation = False | |
again = False | |
response = chat(message.content) | |
if response[1] == 'joke': | |
await message.channel.send("Let me try. \n \n") | |
time.sleep(1.5) | |
await message.channel.send(response[0]) | |
elif response[1] == 'not_a_cool_joke': | |
pre_notFunny = True | |
await message.channel.send(response[0]) | |
time.sleep(1.5) | |
await message.channel.send("Would you like me to try again?") | |
elif response[1] == 'covid_prevention' or response[1] == 'covid_covering' or response[1] == 'covid_treatment' or response[1] == 'covid_sanitizer' or response[1] == 'covid_cigarettes': | |
await message.channel.send("Hmm. That's a good question, let me try to answer it. :robot: \n") | |
time.sleep(2.5) | |
await message.channel.send(response[0]) | |
else: | |
time.sleep(1.5) | |
await message.channel.send(response[0]) | |
elif message.content == 'raise-exception': | |
raise discord.DiscordException | |
async def on_error(event, *args, **kwargs): | |
with open('err.log', 'a') as f: | |
if event == 'on_message': | |
f.write(f'Unhandled message: {args[0]}\n') | |
else: | |
raise | |
bot.run(TOKEN) |