Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
chatbot/imageSearchFunctionsFile.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
241 lines (214 sloc)
13.6 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from discord.ext import commands | |
from botClient import colorCode, weatherKey, timeKey, dictionaryId, dictionaryKey | |
import random | |
from pprint import pprint | |
import discord | |
import aiohttp | |
import json | |
import urllib | |
import urllib.parse | |
"""Module for the image board post search by tags function""" | |
class ImageSearch:
    """Cog that posts a random image, or a tag lookup, from an anime image board.

    One command serves Konachan, Danbooru and Yande.re; the alias used to
    invoke it selects the board and whether the invocation counts as NSFW.
    """

    # Display names for the single-letter rating codes found in post data.
    _RATING_TEXT = {'s': 'Safe', 'q': 'Questionable', 'e': 'Explicit'}

    # Alias groups. They mirror the alias list on the command decorator below.
    _KONA_SFW = ('konachan', 'kona', 'konasfw', 'konas')
    _KONA_NSFW = ('konachanh', 'konah', 'konansfw', 'konachannsfw', 'konar18')
    _DANB_NSFW = ('danbooruh', 'danbh', 'donmaih', 'danbnsfw')
    _YAN_SFW = ('yan', 'yandere', 'yans', 'yansfw')
    _YAN_NSFW = ('yanh', 'yandereh', 'yanderensfw', 'yannsfw')

    def __init__(self, client):
        self.client = client

    @classmethod
    def _resolve_board(cls, command_used):
        """Return a fresh settings dict for the board selected by the alias.

        Every entry of the dict parameterises the API request or the embed,
        so one code path can serve all three sites. Unknown names (including
        the primary command name) fall back to Danbooru, which keeps ``post``
        from ever being unbound.
        """
        command_used = str(command_used).lower()
        if command_used in cls._KONA_SFW + cls._KONA_NSFW:
            post = {'board': 'Konachan', 'url': 'https://konachan.net', 'rating': 'sfw',
                    'height': 'jpeg_height', 'width': 'jpeg_width', 'color': 0,
                    'tagcount': 'count',
                    'urltags': 'https://konachan.net/tag/index.json?name={}',
                    'idurl': 'https://konachan.com/post/show/'}
            nsfw = command_used in cls._KONA_NSFW
        elif command_used in cls._YAN_SFW + cls._YAN_NSFW:
            post = {'board': 'Yandere', 'url': 'https://yande.re', 'rating': 'sfw',
                    'height': 'height', 'width': 'width', 'color': 0,
                    'tagcount': 'post_count',
                    'urltags': 'https://yande.re/tag/index.json?name={}',
                    'idurl': 'https://yande.re/post/show/'}
            nsfw = command_used in cls._YAN_NSFW
        else:
            post = {'board': 'Danbooru', 'url': 'https://danbooru.donmai.us', 'rating': 'sfw',
                    'height': 'height', 'width': 'width', 'color': 0,
                    'tagcount': 'post_count',
                    'urltags': 'https://danbooru.donmai.us/tags.json?search[name_matches]=*{}*',
                    'idurl': 'https://danbooru.donmai.us/posts/'}
            nsfw = command_used in cls._DANB_NSFW
        if nsfw:
            post['rating'] = 'nsfw'
            post['color'] = 3
        return post

    @staticmethod
    def _format_top_tags(tags, count_key, limit=5):
        """Return one line per tag for the ``limit`` tags with the most posts."""
        ranked = sorted(tags, key=lambda tag: tag[count_key], reverse=True)
        return ''.join(f"``{tag['name']}`` - {tag[count_key]} post(s)\n"
                       for tag in ranked[:limit])

    @staticmethod
    async def _fetch_tags(session, url):
        """Request a tag listing; return ``(status, tags)`` with ``tags`` None on failure."""
        async with session.get(url) as tagreq:
            if tagreq.status != 200:
                print(f'Tags request failed: {tagreq.status}, {tagreq.reason}')
                return tagreq.status, None
            return tagreq.status, json.loads(await tagreq.text())

    async def _send_tag_search(self, ctx, session, post, user_name, user_tag, query_tag):
        """Handle the explicit ``tag``/``tags`` sub-command: list similar tags."""
        status, tags = await self._fetch_tags(session, post['urltags'].format(query_tag))
        if tags is None:
            embed = discord.Embed(
                title=f"{user_name}'s tag search on {post['board']} for ``{user_tag}``",
                description=f'Tags request failed ({status}) for: "{user_tag}".',
                color=colorCode[2])
        elif not tags:  # no similar tags found
            embed = discord.Embed(
                title=f"{user_name}'s tag search on {post['board']} for ``{user_tag}``",
                description=f'No similar tags found for: "{user_tag}".',
                color=colorCode[2])
        else:
            tags_list = self._format_top_tags(tags, post['tagcount'])
            embed = discord.Embed(
                title=f"{user_name}'s tag search on {post['board']} for {user_tag}",
                description=f'**Similar tags:**\n'
                            f'{tags_list}',
                color=colorCode[1])
            embed.set_footer(text='Note: some tags may be empty (have no posts) '
                                  'contrary to what post count shows.')
        await ctx.send(embed=embed)

    async def _send_similar_tags(self, ctx, session, post, user_name, user_tag, query_tag):
        """Fallback for a post search with no results: suggest similar tags."""
        title = f"{user_name}'s {post['rating']} search on {post['board']} for ``{user_tag}``"
        status, tags = await self._fetch_tags(session, post['urltags'].format(query_tag))
        if tags is None:
            embed = discord.Embed(
                title=title,
                description=f'No posts or similar tags(tag req failed: {status}) found '
                            f'for: "{user_tag}".',
                color=colorCode[2])
        elif not tags:  # no similar tags found
            embed = discord.Embed(
                title=title,
                description=f'No posts or similar tags found for: "{user_tag}".',
                color=colorCode[2])
        else:
            tags_list = self._format_top_tags(tags, post['tagcount'])
            embed = discord.Embed(
                title=title,
                description=f'**Found nothing for "{user_tag}". Similar tags:**\n'
                            f'{tags_list}',
                color=colorCode[1])
            embed.set_footer(text='Note: some tags may be empty (have no posts) '
                                  'contrary to what post count shows.')
        await ctx.send(embed=embed)

    @commands.command(name='animage', aliases=['konachan', 'kona', 'konasfw', 'konas',
                                               'konachanh', 'konah', 'konansfw', 'konachannsfw', 'konar18',  # NSFW
                                               'danbooru', 'danb', 'donmai', 'animepics',
                                               'danbooruh', 'danbh', 'donmaih', 'danbnsfw',  # NSFW
                                               'yan', 'yandere', 'yans', 'yansfw',
                                               'yanh', 'yandereh', 'yanderensfw', 'yannsfw'])  # NSFW
    @commands.cooldown(2, 4, commands.BucketType.user)
    async def animeboard_post(self, ctx, f_arg=" ", *args):
        """Searches for an image on Konachan, Danbooru or Yande.re"""
        # The docstring above is kept verbatim: discord.py uses a command's
        # docstring as its help text, so editing it changes what users see.
        #
        # Arguments:
        #   f_arg - first tag word, or "tag"/"tags" to search tag names
        #           instead of posts; the default " " means "no tags given".
        #   args  - remaining words; all words are joined with "_" into one tag.

        # Best-effort removal of the invoking message (fails without permission).
        try:
            await ctx.message.delete()
        except discord.HTTPException:
            pass

        if ctx.message.guild and ctx.author.nick:
            user_name = ctx.author.nick
        else:
            user_name = ctx.author.name

        # The alias typed by the user decides the board and the SFW/NSFW mode.
        post = self._resolve_board(ctx.invoked_with)

        # NSFW aliases may only be used in NSFW channels or in private messages.
        if post['rating'] == 'nsfw' and ctx.message.guild and not ctx.message.channel.is_nsfw():
            await ctx.send('You can only use this command in NSFW channels.')
            return

        wants_tag_search = f_arg in ('tag', 'tags')
        if wants_tag_search:
            if not args:  # "tag" was given without anything to look up
                await ctx.send('You must input the tags you are searching for.', delete_after=5)
                return
            user_tag = '_'.join(str(arg) for arg in args)
        elif f_arg != " ":
            user_tag = '_'.join([str(f_arg), *(str(arg) for arg in args)])
        else:
            user_tag = f_arg

        # Percent-encode the tag for use inside a query string; the readable
        # form (user_tag) is still what is shown in the embeds.
        query_tag = urllib.parse.quote(user_tag, safe='')

        async with aiohttp.ClientSession() as session:
            if wants_tag_search:
                await self._send_tag_search(ctx, session, post, user_name, user_tag, query_tag)
                return

            async with session.get(f'{post["url"]}/post/index.json?tags={query_tag}&limit=100') as req:
                if req.status != 200:
                    print(f'Api Req Failed: {req.reason}, {req.status}')
                    embed = discord.Embed(
                        title=f"{user_name}'s {post['rating']} search on {post['board']} "
                              f"for ``{user_tag}``",
                        description=f'Post request failed ({req.status}).',
                        color=colorCode[2])
                    await ctx.send(embed=embed)
                    return
                resp = json.loads(await req.text())

            # No posts for these tags: try to suggest similar tags instead.
            if not resp:
                await self._send_similar_tags(ctx, session, post, user_name, user_tag, query_tag)
                return

        # For the sake of this presentation a filter makes every command (even
        # the NSFW ones) return a SFW post. Posts can be wrongly rated on their
        # respective sites, which is outside this bot's control.
        # rating_filter = post['rating']  # normal functionality
        rating_filter = 'sfw'  # changed so that all commands return Safe content
        avoid = ('q', 'e') if rating_filter == 'sfw' else ('s',)

        # Filter first, then pick at random, so an acceptable post is always
        # found when one exists. Entries without a file URL cannot be shown.
        candidates = [entry for entry in resp
                      if entry.get('rating') not in avoid and entry.get('file_url')]
        if not candidates:
            embed = discord.Embed(title=f"{user_name}'s {post['rating']} search on {post['board']} "
                                        f"for ``{user_tag}``",
                                  description=f'No {post["rating"]} posts found for the tags: '
                                              f'"{user_tag}".\nTry again.',
                                  color=colorCode[2])
            await ctx.send(embed=embed)
            return
        chosen = random.choice(candidates)

        # Keep only the first few tags so the embed does not get overloaded;
        # the cut is moved back to the last complete tag.
        post_tags = chosen['tags'].replace(" ", ", ")
        if len(post_tags) > 185:
            post_tags = post_tags[:185].rsplit(', ', 1)[0]

        if user_tag == " ":
            user_tag = 'anything'
        rating_code = chosen.get('rating')
        embed = discord.Embed(title=f"{user_name}'s {post['rating']} search on {post['board']} "
                                    f"for ``{user_tag}``",
                              description=f'**Tags**: ``{post_tags}``',
                              color=colorCode[post['color']])
        embed.add_field(name='Links',
                        value=f"[{post['board']}]({post['idurl']}{chosen['id']}) "
                              f"[DirectLink]({chosen['file_url']})",
                        inline=True)
        # Unknown rating codes are shown as-is instead of raising KeyError.
        embed.add_field(name='Rating',
                        value=self._RATING_TEXT.get(rating_code, str(rating_code)),
                        inline=True)
        embed.add_field(name='Height', value=chosen[post['height']], inline=True)
        embed.add_field(name='Width', value=chosen[post['width']], inline=True)
        embed.set_image(url=chosen['file_url'])
        await ctx.send(embed=embed)
def setup(client):
    """Extension entry point: create the ImageSearch cog and add it to ``client``."""
    image_search_cog = ImageSearch(client)
    client.add_cog(image_search_cog)