Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
Chatbot/ad_geocode.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
536 lines (471 sloc)
23.7 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Rapptz (2017) discord.py [online] available from | |
# <https://github.com/Rapptz/discord.py> [9 November 2018] | |
import discord | |
from discord.ext import commands | |
import asyncio | |
import json | |
# NLTK Project (2018) Natural Language Toolkit [online] available from | |
# <https://www.nltk.org>[10 November 2018] | |
from nltk.tokenize import word_tokenize | |
import requests | |
from ad_nltk import find_word_synonyms | |
# The API key and bot token live in a separate module (ad_key) so that they are not hard-coded in this file.
from ad_key import getAPI, getBotToken | |
import os | |
# Templates for the event-driven code and the commands were obtained from:
# Kumara, L (2018) Making a Python Bot with Discord.py [online] available from | |
# <https://www.youtube.com/playlist?list=PLW3GfRiBCHOiEkjvQj0uaUB1Q-RckYnj9>[30 October 2018] | |
# The bot reacts to commands that start with "." (for example ".logout").
client = commands.Bot(command_prefix=".")
# The bot token is fetched from ad_key rather than being written in this file.
TOKEN = getBotToken()
# Every place type accepted as a valid "type" value when building the search URL.
# The order matters: great_list (built below) is index-aligned with this list.
place_types = [
    # a
    "accounting", "airport", "amusement_park", "aquarium", "art_gallery", "atm",
    # b
    "bakery", "bank", "bar", "beauty_salon", "bicycle_store", "book_store",
    "bowling_alley", "bus_station",
    # c
    "cafe", "campground", "car_dealer", "car_rental", "car_repair", "car_wash",
    "casino", "cemetery", "church", "city_hall", "clothing_store",
    "convenience_store", "courthouse",
    # d - g
    "dentist", "department_store", "doctor", "electrician", "electronics_store",
    "embassy", "fire_station", "florist", "funeral_home", "furniture_store",
    "gas_station", "gym",
    # h - l
    "hair_care", "hardware_store", "hindu_temple", "home_goods_store", "hospital",
    "insurance_agency", "jewelry_store", "laundry", "lawyer", "library",
    "liquor_store", "local_government_office", "locksmith", "lodging",
    # m - p
    "meal_delivery", "meal_takeaway", "mosque", "movie_rental", "movie_theater",
    "moving_company", "museum", "night_club", "painter", "park", "parking",
    "pet_store", "pharmacy", "physiotherapist", "plumber", "police", "post_office",
    # r - s
    "real_estate_agency", "restaurant", "roofing_contractor", "rv_park", "school",
    "shoe_store", "shopping_mall", "spa", "stadium", "storage", "store",
    "subway_station", "supermarket", "synagogue",
    # t - z
    "taxi_stand", "train_station", "transit_station", "travel_agency",
    "veterinary_care", "zoo",
]
# Build great_list: for each entry of place_types (same index), the list of its
# synonyms as returned by find_word_synonyms (imported from ad_nltk.py).
great_list = []
counter = 0
for counter, typ in enumerate(place_types, start=1):
    # Underscores become spaces so the synonyms can be matched against chat text.
    syn_list = [synonym.replace("_", " ") for synonym in find_word_synonyms(typ)]
    # Going through a set drops duplicate synonyms.
    great_list.append(list(set(syn_list)))
print(great_list)
########################### | |
#Codes used to locate user# | |
########################### | |
async def start_locating(message):
    """Ask the author of ``message`` for an address and start verifying it.

    A prompt is sent to the channel of ``message``; the next message from the
    same author in that channel is taken as the address, acknowledged, and
    handed to ``checking_address``.
    """
    channel = message.channel
    await client.send_message(
        channel,
        "Can you give me the address of your current location? "
        "I would appreciate it if it's the full address.",
    )
    reply = await client.wait_for_message(author=message.author, channel=channel)
    location_str = reply.content
    await client.send_message(channel, "I'll try my best to find {} .".format(location_str))
    await checking_address(message, location_str)
def locating(location_str):
    """Look up an address with the Google Geocoding API.

    Args:
        location_str: The address exactly as the user typed it.

    Returns:
        The value stored under the "results" key of the JSON response, or the
        integer 0 when that value is empty or the key is absent. Callers in
        this file compare the return value against 0 to detect "no match".
    """
    # Passing the address through ``params`` lets requests escape it properly
    # (spaces, "&", "#", non-ASCII text) instead of hand-building the query.
    response = requests.get(
        "https://maps.googleapis.com/maps/api/geocode/json",
        params={"address": location_str, "key": getAPI()},
    )
    # Log the looked-up address only; the full URL would expose the API key.
    print("Geocoding request for: {}".format(location_str))
    results = response.json().get("results")
    return results if results else 0
def confirmation_status(response):
    """Classify a user's reply as yes, no, or neither.

    Args:
        response: Either a list of tokens (as produced by ``word_tokenize``)
            or the raw text of the reply. Both forms are passed by callers in
            this file.

    Returns:
        1 if the reply contains the word "yes", 0 if it contains the word
        "no" (and not "yes"), 2 otherwise.
    """
    if isinstance(response, str):
        # Raw text: compare whole words, not substrings, so that e.g. "know"
        # is not read as "no" and "yesterday" is not read as "yes".
        words = {word.strip(".,!?;:'\"") for word in response.lower().split()}
    else:
        words = {str(token).lower() for token in response}

    if "yes" in words:
        return 1
    if "no" in words:
        return 0
    return 2
async def confirming(message, status, location_str):
    """Act on the user's answer to "is this the address you meant?".

    Args:
        message: The message that started the conversation; its author and
            channel are used for all further prompts.
        status: Result of ``confirmation_status`` for the user's answer
            (1 = yes, 0 = no, 2 = not understood).
        location_str: The address text that was looked up.

    On 1 the geocoding results are written as JSON to
    ``location_data_<author mention>.txt`` and a summary is posted.
    On 0 the user is asked for a new address, which is passed to
    ``checking_address``. On 2 the user is asked to answer yes or no and the
    new answer is handled by this function again.
    """
    channel = message.channel

    if status == 1:
        # One lookup is reused for both the saved file and the summary.
        results = locating(location_str)
        if results == 0:
            # The lookup unexpectedly came back empty; let checking_address
            # run its "no results" dialogue instead of indexing into 0.
            await checking_address(message, location_str)
            return

        filename = "location_data_{}.txt".format(message.author.mention)
        # The context manager flushes and closes the file so that a later
        # read of the same file sees the complete JSON.
        with open(filename, "w") as data_file:
            json.dump(results, data_file)
        print("{} created".format(filename))

        address = results[0]
        coordinates = address["geometry"]["location"]
        lines = [
            "Formatted address: {}".format(address["formatted_address"]),
            "Location:",
            "Latitude: {}".format(coordinates["lat"]),
            "Longitude: {}".format(coordinates["lng"]),
        ]
        # Wrap the summary in a code block for display in the text channel.
        msg = "```\n" + "".join(line + "\n" for line in lines) + "```"
        await client.send_message(
            channel, "Location acquired, {}.\n".format(message.author.mention) + msg
        )
    elif status == 0:
        await client.send_message(channel, "I'm sorry. Can I have your current address again?")
        reply = await client.wait_for_message(author=message.author, channel=channel)
        new_location = reply.content
        await client.send_message(channel, "I'll try my best to find {}.".format(new_location))
        await checking_address(message, new_location)
    elif status == 2:
        await client.send_message(channel, "I don't understand that, please say 'yes' or 'no'.")
        reply = await client.wait_for_message(author=message.author, channel=channel)
        new_status = confirmation_status(word_tokenize(reply.content.lower()))
        await confirming(message, new_status, location_str)
async def checking_address(message, location_str):
    """Look up ``location_str`` and talk the user through the outcome.

    When ``locating`` finds something, the first result's formatted address is
    shown and the user's yes/no answer is forwarded to ``confirming``. When it
    finds nothing, the user is asked whether to try again; a "yes" leads to a
    new address being requested and checked by this function again.
    """
    channel = message.channel
    author = message.author
    results = locating(location_str)

    if results != 0:
        # A match was found: ask the user to confirm it.
        best_match = results[0]
        await client.send_message(
            channel, "Are you talking about {} ?".format(best_match["formatted_address"])
        )
        answer = await client.wait_for_message(author=author, channel=channel)
        answer_status = confirmation_status(word_tokenize(answer.content.lower()))
        await confirming(message, answer_status, location_str)
        return

    # No match for the address that was given.
    await client.send_message(channel, "I don't think you've given me a real address.")
    await client.send_message(channel, "Do you still want me to locate you?")
    answer = await client.wait_for_message(author=author, channel=channel)
    wants_retry = confirmation_status(word_tokenize(answer.content.lower()))

    if wants_retry == 1:
        await client.send_message(channel, "Give me your current address again.")
        retry = await client.wait_for_message(author=author, channel=channel)
        retry_location = retry.content
        await client.send_message(channel, "I'll try my best to find {}.".format(retry_location))
        await checking_address(message, retry_location)
        return

    if wants_retry != 0:
        # Neither yes nor no was recognised.
        await client.send_message(channel, "I don't think you're being serious.")
    await client.send_message(channel, "Type 'locate' again when you're serious about it.")
########################################## | |
#Codes used to delete existing data files# | |
########################################## | |
async def os_error_handling(message, reply):
    """Send the user to ``start_locating`` when no location file was found.

    Args:
        message: The message whose author and channel the dialogue uses.
        reply: The user's answer to "Have you given me your location?",
            either as raw text or as a list of tokens.

    A "no" starts the locating dialogue directly; a "yes" apologises first
    and then starts it. Anything else prompts for a yes/no answer and the new
    answer is handled by this function again.
    """
    # The first caller passes the raw message text while the retry below
    # passes tokens; normalise so both are classified the same way.
    if isinstance(reply, str):
        reply = word_tokenize(reply.lower())
    status = confirmation_status(reply)

    # Replies use send_message with the explicit channel, like the other
    # helper functions in this file that receive a ``message`` argument.
    if status == 0:
        await start_locating(message)
    elif status == 1:
        await client.send_message(message.channel, "I'm sorry. Something must have gone wrong.")
        await start_locating(message)
    else:
        await client.send_message(
            message.channel, "I don't understand that, please say 'yes' or 'no'."
        )
        retry = await client.wait_for_message(author=message.author, channel=message.channel)
        await os_error_handling(message, word_tokenize(retry.content.lower()))
@client.command(pass_context=True)
async def location_wipe(ctx):
    """Delete the location data file created."""
    # Removes location_data_<author mention>.txt for the author of the command.
    # If that file does not exist, the user is asked whether they have given a
    # location and the answer is handed to os_error_handling.
    message = ctx.message
    filename = "location_data_{0.author.mention}.txt".format(message)
    print("Deleting {}".format(filename))
    try:
        # Only the removal itself is guarded by the FileNotFoundError handler.
        os.remove(filename)
    except FileNotFoundError:
        msg = "I couldn't find your location data. Have you given me your location, {0.author.mention}?".format(message)
        await client.say(msg)
        reply = await client.wait_for_message(author=message.author, channel=message.channel)
        # Lower-case and tokenize the answer, as the other yes/no prompts in
        # this file do, so that e.g. "Yes" is recognised.
        await os_error_handling(message, word_tokenize(reply.content.lower()))
    else:
        await client.say("Your location data is deleted, {0.author.mention}.".format(message))
######################################## | |
#Codes used to search for nearby places# | |
######################################## | |
## NOTE: the code below is known to be slightly buggy, but it achieves most of what is expected.
async def start_searching(message, tokens):
    """Work out which place types the user asked for and continue the search.

    Args:
        message: The user's message; its text is scanned for synonyms and its
            author/channel are used for prompts.
        tokens: The tokens of that message.

    Tokens that are literal entries of ``place_types`` are taken as they are.
    If any token is not a place type, the message text is additionally
    scanned for the synonyms in ``great_list`` (index-aligned with
    ``place_types``). When nothing is detected the user is asked to name the
    place type(s) and the reply is processed by this function; otherwise the
    detected types are passed to ``check_file``.
    """
    detected = [token for token in tokens if token in place_types]

    # The synonym scan depends only on the message text, so run it once
    # instead of once per unrecognised token.
    if any(token not in place_types for token in tokens):
        text = message.content.lower()
        for index, synonyms in enumerate(great_list):
            if any(synonym in text for synonym in synonyms):
                detected.append(place_types[index])

    # Drop duplicates while keeping the order in which types were detected.
    unique_types = list(dict.fromkeys(detected))

    if not unique_types:
        # No place type found in the message.
        await client.send_message(
            message.channel, "Please type in the place type(s) you want to look for."
        )
        reply = await client.wait_for_message(author=message.author, channel=message.channel)
        # Continue with the reply itself so its text (not the old message) is
        # what gets scanned, then stop: the nested call handles the rest.
        await start_searching(reply, word_tokenize(reply.content.lower()))
        return

    print(unique_types)
    await check_file(message, unique_types)
async def confirmation_to_locate(message, status):
    """Act on the user's answer to "do you want me to locate you now?".

    ``status`` is a ``confirmation_status`` result: 1 starts the locating
    dialogue, 0 ends the conversation with "Ok then.", and 2 keeps asking for
    a yes/no answer until one of the other two is given.
    """
    channel = message.channel

    # Keep prompting for as long as the answer is not understood.
    while status == 2:
        await client.send_message(channel, "I don't understand that, please say 'yes' or 'no'.")
        answer = await client.wait_for_message(author=message.author, channel=channel)
        status = confirmation_status(word_tokenize(answer.content.lower()))

    if status == 1:
        await start_locating(message)
    elif status == 0:
        await client.send_message(channel, "Ok then.")
async def check_file(message, place_type_list):
    """Load the author's saved location and continue with the search.

    Args:
        message: The user's message; its author identifies the data file and
            its channel receives any prompts.
        place_type_list: The place types detected by ``start_searching``.

    Reads ``location_data_<author mention>.txt`` (written by ``confirming``),
    takes latitude and longitude from the first stored result, and passes the
    start of the nearby-search URL on to ``check_params``. If the file does
    not exist, the user is offered to be located now.
    """
    filename = "location_data_{0.author.mention}.txt".format(message)
    try:
        # Only the file access is guarded; the context manager closes the
        # handle once the JSON has been read.
        with open(filename, "r") as data_file:
            stored_results = json.load(data_file)
    except FileNotFoundError:
        msg = "I don't think your location data file has been created yet, {0.author.mention}.".format(message)
        await client.send_message(message.channel, msg + " Do you want me to locate you now?")
        confirmation = await client.wait_for_message(author=message.author, channel=message.channel)
        # Lower-case and tokenize, as the other yes/no prompts in this file do.
        status = confirmation_status(word_tokenize(confirmation.content.lower()))
        await confirmation_to_locate(message, status)
        return

    coordinates = stored_results[0]["geometry"]["location"]
    # Start of the nearby-search URL, carrying the user's current position.
    url_base = "https://maps.googleapis.com/maps/api/place/nearbysearch/json?location="
    url_first_components = url_base + str(coordinates["lat"]) + "," + str(coordinates["lng"])
    print(url_first_components)
    await check_params(message, url_first_components, place_type_list)
async def check_radius_str(message, token_list):
    """Extract a search radius from the tokens of a user's reply.

    Args:
        message: The user's message; only its channel is used, and only when
            the "maximum radius" notice has to be sent.
        token_list: Tokens of the reply, e.g. ``["5", "km"]``.

    Returns:
        The radius in metres (kilometres * 1000) taken from the first numeric
        token when it is between 1 and 50, otherwise the string
        ``"not_suitable"``. A value above 50 also triggers a notice in the
        channel.
    """
    for token in token_list:
        # isdecimal() only accepts characters that int() can parse, and it
        # rejects signs, so negative numbers never get here.
        if not token.isdecimal():
            continue
        kilometres = int(token)
        if kilometres > 50:
            await client.send_message(message.channel, "Maximum radius allowed is 50km.")
            return "not_suitable"
        if kilometres == 0:
            # The prompts ask for a positive integer.
            return "not_suitable"
        # The first usable number wins; later tokens such as "km" must not
        # overwrite it.
        return kilometres * 1000
    # No numeric token at all (this also covers an empty token list).
    return "not_suitable"
async def radius_none(message):
    """Keep asking until the user gives a usable radius or accepts the default.

    The user is asked for a positive integer and, alternatively, whether the
    default of 5km should be used. A suitable number is returned in metres
    (as computed by ``check_radius_str``); "yes" returns 5000; "no" repeats
    the question. Any other reply gets one reminder and a second chance to
    type a number before the question is repeated.

    Returns:
        The radius in metres.
    """
    channel = message.channel
    await client.send_message(channel, "Please type in a postive integer for the radius parameter.")
    await client.send_message(
        channel,
        "Prefer to use the default value for the radius parameter(5km)? Please say 'yes' or 'no'.",
    )
    reply = await client.wait_for_message(author=message.author, channel=channel)
    # Lower-case the reply, as the other prompts in this file do, so that
    # "Yes"/"No" are recognised by confirmation_status.
    reply_tokens = word_tokenize(reply.content.lower())

    radius = await check_radius_str(message, reply_tokens)
    if radius != "not_suitable":
        return radius

    status = confirmation_status(reply_tokens)
    if status == 1:
        return 5000
    if status == 0:
        return await radius_none(message)

    # Neither a number nor yes/no: remind once, then accept a number only.
    await client.send_message(channel, "Please follow my previous instructions.")
    second_reply = await client.wait_for_message(author=message.author, channel=channel)
    radius = await check_radius_str(message, word_tokenize(second_reply.content.lower()))
    if radius == "not_suitable":
        radius = await radius_none(message)
    return radius
async def check_radius(message, type):
    """Ask whether the search should use a specific radius and obtain it.

    Args:
        message: The user's message; its author and channel are used for the
            dialogue.
        type: Text naming the place type(s), inserted into the question.

    Returns:
        The radius in metres: the user's value when one is given, 5000 when
        the user declines, or whatever ``radius_none`` returns when the
        answer or the number is not usable.
    """
    channel = message.channel
    await client.send_message(
        channel, "Would you like to search for " + type + " in a certain radius?"
    )
    answer = await client.wait_for_message(author=message.author, channel=channel)
    # Lower-case and tokenize, as the other yes/no prompts in this file do,
    # so that "Yes" is recognised and "know" is not read as "no".
    status = confirmation_status(word_tokenize(answer.content.lower()))

    if status == 1:
        await client.send_message(
            channel, "Please type in the desired radius in terms of kilometres."
        )
        radius_reply = await client.wait_for_message(author=message.author, channel=channel)
        radius = await check_radius_str(message, word_tokenize(radius_reply.content))
        if radius == "not_suitable":
            radius = await radius_none(message)
    elif status == 0:
        await client.send_message(
            channel, "The default value for the radius parameter(5km) will be used then."
        )
        radius = 5000
    else:
        # Answer not understood.
        radius = await radius_none(message)
    return radius
async def check_keywords(message):
    """Ask whether the results should be filtered by keyword(s).

    The user's answer is classified with ``confirmation_status`` and handed
    to ``keyword_confirm``.

    Returns:
        Whatever ``keyword_confirm`` returns for that answer.
    """
    await client.send_message(
        message.channel, "Would you like to add keyword(s) to filter the results?"
    )
    answer = await client.wait_for_message(author=message.author, channel=message.channel)
    # Lower-case and tokenize, as the other yes/no prompts in this file do,
    # so that "Yes" is recognised and "know" is not read as "no".
    status = confirmation_status(word_tokenize(answer.content.lower()))
    return await keyword_confirm(message, status)
async def keyword_confirm(message, status):
    """Turn the answer to "add keyword(s)?" into a keyword value.

    Args:
        message: The user's message; its author and channel are used for the
            dialogue.
        status: ``confirmation_status`` result for that answer
            (1 = yes, 0 = no, 2 = not understood).

    Returns:
        The keyword text with spaces replaced by "+" when the user wants
        keywords, or the string ``"not_desired"`` when they do not.
    """
    channel = message.channel

    if status == 1:
        await client.send_message(channel, "Please type in the keyword(s).")
        keywords = await client.wait_for_message(author=message.author, channel=channel)
        # Spaces become "+" so the text can be placed in the URL.
        return keywords.content.replace(" ", "+")

    if status == 0:
        return "not_desired"

    # Answer not understood. The follow-up question is phrased negatively
    # ("Prefer to NOT use...?"), so its answer has to be read the other way
    # round: "yes" means no keywords, "no" means the user wants keywords.
    await client.send_message(
        channel, "Prefer to not use the keyword parameter? Please say 'yes' or 'no'."
    )
    answer = await client.wait_for_message(author=message.author, channel=channel)
    prefers_none = confirmation_status(word_tokenize(answer.content.lower()))
    if prefers_none == 1:
        return "not_desired"
    if prefers_none == 0:
        return await keyword_confirm(message, 1)
    return await keyword_confirm(message, 2)
async def url_formatting(url, type, radius, keyword):
    """Assemble the final nearby-search URL.

    Args:
        url: Start of the URL, already containing the ``location`` value.
        type: The place type to search for.
        radius: The search radius; converted with ``str``.
        keyword: Keyword text (spaces already replaced by "+"), or the string
            ``"not_desired"`` to leave the keyword parameter out.

    Returns:
        ``url`` followed by the radius, type, optional keyword and key
        parameters.
    """
    from urllib.parse import quote

    parts = [url, "&radius=", str(radius), "&type=", type]
    if keyword != "not_desired":
        # The keyword is free text typed by the user: escape characters such
        # as "&", "#" or "=" so they cannot alter the query. "+" is kept
        # because it stands for the spaces that were replaced earlier.
        parts += ["&keyword=", quote(str(keyword), safe="+")]
    parts += ["&key=", getAPI()]
    return "".join(parts)
async def check_params(message, url, place_types_detected):
    """Collect the search parameters and run one search per place type.

    Args:
        message: The user's message; its author and channel are used for the
            dialogue.
        url: Start of the nearby-search URL (with the location value).
        place_types_detected: The place types found in the user's message.

    With one to three place types, the radius and keywords are asked for once
    and ``fetch_results`` is called for each type (numbered when there is
    more than one). With more than three, the user is asked to limit the
    request and their next message is processed by ``start_searching``. With
    none, nothing is done.
    """
    if not place_types_detected:
        # Nothing to search for.
        return

    if len(place_types_detected) > 3:
        await client.send_message(
            message.channel, "Please limit the number of types of establishment to 3."
        )
        # Wait for a new request; re-reading the same message would detect
        # the same place types again.
        reply = await client.wait_for_message(author=message.author, channel=message.channel)
        await start_searching(reply, word_tokenize(reply.content.lower()))
        return

    type_str = ", ".join(place_types_detected)
    # Only the place types are logged; the final URLs contain the API key.
    print(type_str)
    radius = await check_radius(message, type_str)
    keywords = await check_keywords(message)

    search_urls = []
    for place_type in place_types_detected:
        search_urls.append(await url_formatting(url, place_type, radius, keywords))

    if len(search_urls) == 1:
        await fetch_results(message, search_urls[0])
        return

    # A running number separates the results of the different place types.
    for number, search_url in enumerate(search_urls, start=1):
        await client.send_message(message.channel, str(number))
        await fetch_results(message, search_url)
def _describe_place(place):
    """Return (name, open_now, vicinity) display strings for one result dict."""
    missing = "Info not available"
    name = place.get("name") or missing
    opening_hours = place.get("opening_hours") or {}
    open_now = opening_hours.get("open_now")
    # A place that is currently closed has open_now == False, which must be
    # shown as "False" rather than treated as missing information.
    open_text = missing if open_now is None else str(open_now)
    vicinity = place.get("vicinity") or missing
    return name, open_text, vicinity


async def fetch_results(message, url):
    """Request nearby places from ``url`` and post them to the channel.

    Args:
        message: The user's message; results are sent to its channel.
        url: The complete nearby-search URL.

    Each entry of the response's "results" list is posted as its own code
    block with name, open-now state and vicinity; fields that are absent or
    empty are shown as "Info not available". When there are no results a
    single notice is posted instead.
    """
    response = requests.get(url)
    places = response.json().get("results") or []
    if not places:
        await client.send_message(message.channel, "Your request has yielded no results.")
        return

    summaries = [_describe_place(place) for place in places]
    for summary in summaries:
        print(list(summary))

    # Different wording depending on the number of places.
    if len(summaries) == 1:
        await client.send_message(message.channel, "This is the place that I've found.")
    else:
        await client.send_message(message.channel, "These are the places that I've found.")

    for name, open_now, vicinity in summaries:
        lines = [
            "Name: " + name,
            "Open now: " + open_now,
            "Vicinity: " + vicinity,
        ]
        msg = "```\n" + "".join(line + "\n" for line in lines) + "```"
        await client.send_message(message.channel, msg)
@client.event
async def on_message(message):
    """React to trigger words in a message, then let commands run.

    "locate"/"location" start the locating dialogue and "find"/"search" start
    the nearby-place search; afterwards the message is passed to
    ``client.process_commands``. Messages written by the bot itself are
    ignored.
    """
    # Tokenizing the lower-cased text makes single words easy to detect.
    words = word_tokenize(message.content.lower())
    print(words)

    if message.author == client.user:
        return

    if any(trigger in words for trigger in ("locate", "location")):
        await start_locating(message)
    if any(trigger in words for trigger in ("find", "search")):
        await start_searching(message, words)

    await client.process_commands(message)
######################################## | |
#Other commands and event-driven codes # | |
######################################## | |
@client.command()
async def logout():
    """Logs chatbot out."""
    # Announce the logout on the console and in chat, then close the client.
    farewell = "See you later, loser!"
    print("Logging out bot")
    await client.say(farewell)
    await client.close()
@client.event
async def on_ready():
    """Print the chatbot's name and id to the console once it is ready."""
    # One print call with a newline separator writes the same four lines.
    print("Logged in as", client.user.name, client.user.id, "------", sep="\n")
# Start the bot with the token obtained from ad_key.
client.run(TOKEN)