Browse Source

Initial commit

master
Skyweb 10 months ago
commit
feeb16f592
15 changed files with 784 additions and 0 deletions
  1. +2
    -0
      .gitignore
  2. +106
    -0
      cogs/admin.py
  3. +18
    -0
      cogs/help.py
  4. +172
    -0
      cogs/lastfm.py
  5. +54
    -0
      cogs/misc.py
  6. +16
    -0
      cogs/template.py
  7. BIN
      fonts/DejaVuSans-Bold.ttf
  8. BIN
      fonts/DejaVuSans.ttf
  9. BIN
      images/placeholder.png
  10. +9
    -0
      run.py
  11. +12
    -0
      utils/config.example.py
  12. +113
    -0
      utils/embedinator.py
  13. +55
    -0
      utils/redis.py
  14. +103
    -0
      utils/sorachan.py
  15. +124
    -0
      utils/sorahelp.py

+ 2
- 0
.gitignore View File

@@ -0,0 +1,2 @@
utils/config\.py
*.pyc

+ 106
- 0
cogs/admin.py View File

@@ -0,0 +1,106 @@
import discord
import io
import textwrap
import traceback
from contextlib import redirect_stdout
from discord.ext import commands


class Admin(commands.Cog, command_attrs=dict(hidden=True)):
"""Administration utilities"""

def __init__(self, bot):
self.bot = bot
self._last_result = None

async def cog_check(self, context):
return await self.bot.is_owner(context.author)

def cleanup_code(self, content):
if content.startswith('```py') and content.endswith('```'):
return '\n'.join(content[5:-3].split('\n'))
elif content.startswith('```') and content.endswith('```'):
return '\n'.join(content[3:-3].split('\n'))

return content.strip('` \n')

@commands.command()
async def eval(self, context, *, body: str):
"""Evaluates and runs arbitrary python code, use with care"""

async with context.channel.typing():
await self._eval(context, body)

async def _eval(self, context, body):
env = {
'bot': self.bot,
'context': context,
'ctx': context,
'_': self._last_result
}

env.update(globals())

body = self.cleanup_code(body)
stdout = io.StringIO()

to_compile = f'async def func():\n{textwrap.indent(body, " ")}'

try:
exec(to_compile, env)
except Exception as e:
response = await context.send(
f'```py\n{e.__class__.__name__}: {e}\n```'
)

await self.bot.register_response(response, context.message)
return

func = env['func']
response = None
try:
with redirect_stdout(stdout):
result = await func()
except Exception:
value = stdout.getvalue()
response = await context.send(
f'```py\n{value}{traceback.format_exc()}\n```'
)

try:
await context.message.add_reaction('❌')
except Exception:
pass

else:
value = stdout.getvalue()
try:
await context.message.add_reaction('✅')
except Exception:
pass

if result is None:
if value:
response = await context.send(f'```py\n{value}\n```')
else:
response = await context.send(f'```No output```')
else:
self._last_result = result
response = await context.send(f'```py\n{value}{result}\n```')

if response:
await self.bot.register_response(response, context.message)

@commands.command()
async def reload(self, context):
"""Gracefully reloads"""
response = await context.send('Going offline...')
await self.bot.register_response(response, context.message)
for cog in self.bot.cogs_to_load:
self.bot.unload_extension(f'cogs.{cog}')

await self.bot.logout()


def setup(bot):
bot.add_cog(Admin(bot))

+ 18
- 0
cogs/help.py View File

@@ -0,0 +1,18 @@
from discord.ext import commands
from utils import sorahelp


class Help(commands.Cog):
def __init__(self, bot):
self.bot = bot
self._original_help_command = bot.help_command
bot.help_command = sorahelp.SoraHelpCommand()
bot.help_command.cog = self
self.bot.get_command('help').hidden = True

def cog_unload(self):
self.bot.help_command = self._original_help_command


def setup(bot):
bot.add_cog(Help(bot))

+ 172
- 0
cogs/lastfm.py View File

@@ -0,0 +1,172 @@
import aiohttp
import discord
import io
from discord.ext import commands
from PIL import Image, ImageDraw, ImageFont
from utils import config


class LastFM(commands.Cog):
"""LastFM integration"""

def __init__(self, bot):
self.bot = bot
self.base_url = 'http://ws.audioscrobbler.com/2.0/'
self.session = aiohttp.ClientSession()

def cog_unload(self):
self.bot.loop.create_task(self.session.close())

@commands.command(aliases=['np'])
async def nowplaying(self, context, member: discord.Member = None):
"""Shows currently playing track"""
async with context.channel.typing():
member = member or context.author
username = await self.bot.pg.fetchval(
"""SELECT username
FROM lastfm_users
WHERE id = $1""",
member.id
)

if username:
response = await self.send_nowplaying(
context,
member,
username
)

else:
response = await self.send_not_linked(context, member)

await self.bot.register_response(response, context.message)

async def send_nowplaying(self, context, member, username):
recent_tracks = await self.fetch_recent_tracks(username)
if len(recent_tracks['recenttracks']['track']) == 1:
return await context.send(
f'{member.display_name} is not listening to anything...'
)

else:
track = recent_tracks['recenttracks']['track'][0]
art = await self.fetch_album_art(track)
image = await self.bot.loop.run_in_executor(
None,
self.create_image,
art,
member,
track['name'],
track['artist']['#text']
)

file = discord.File(image, filename='img.png')
return await context.send(file=file)

async def send_not_linked(self, context, member):
return await context.send(
f'Could not find a linked LastFM for {member.display_name}'
)

async def fetch_recent_tracks(self, username, limit=1):
parameters = {
'method': 'user.getrecenttracks',
'user': username,
'api_key': config.lastfm_key,
'limit': limit,
'format': 'json'
}

response = await self.session.get(
self.base_url,
params=parameters
)

return await response.json()

async def fetch_album_art(self, track):
try:
response = await self.session.get(track['image'][2]['#text'])
art_bytes = await response.read()
return Image.open(io.BytesIO(art_bytes))
except Exception:
return Image.open('images/album_art_placeholder.png')

def create_image(self, art, member, track, artist):
image = Image.new('RGBA', (1400, 174), color='#00000000')
draw = ImageDraw.Draw(image)

self.draw_art(image, art)
self.draw_member(draw, member)
self.draw_track(draw, track)
self.draw_artist(draw, artist)

image = image.resize((700, 87), Image.ANTIALIAS)
bytes = io.BytesIO()
image.save(bytes, 'PNG')
bytes.seek(0)
return bytes

def draw_art(self, image, art):
image.paste(art)

def draw_member(self, draw, member):
font = ImageFont.truetype('fonts/DejaVuSans.ttf', 45)
draw.text(
(184, -8),
f'{member.display_name} is listening to',
font=font, fill=(255, 255, 255)
)

def draw_track(self, draw, track):
fontLarge = ImageFont.truetype('fonts/DejaVuSans-Bold.ttf', 80)
truncated = False
while draw.textsize(track, font=fontLarge)[0] > 1100:
truncated = True
track = track[:-1]

if truncated:
track = f'{track}...'

draw.text((178, 34), track, font=fontLarge, fill=(255, 255, 255))

def draw_artist(self, draw, artist):
font = ImageFont.truetype('fonts/DejaVuSans.ttf', 45)
draw.text((184, 122), f'By {artist}', font=font, fill=(255, 255, 255))

@commands.command()
async def link(self, context, *, username):
"""Links your LastFM account"""

exists = await self.bot.pg.fetchval(
"""SELECT *
FROM lastfm_users
WHERE id = $1""",
context.author.id
)

if exists:
await self.bot.pg.fetchval(
"""UPDATE lastfm_users
SET username = $1
WHERE id = $2""",
username,
context.author.id
)

response = await context.send('User has been updated')
else:
await self.bot.pg.fetchval(
"""INSERT INTO lastfm_users (id, username)
VALUES ($1, $2)""",
context.author.id,
username
)

response = await context.send('User has been linked')

self.bot.register_response(response, context.message)


def setup(bot):
bot.add_cog(LastFM(bot))

+ 54
- 0
cogs/misc.py View File

@@ -0,0 +1,54 @@
from discord.ext import commands


class Misc(commands.Cog):
"""Miscellaneous commands"""

def __init__(self, bot):
self.bot = bot

@commands.Cog.listener()
async def on_message(self, message):
phrase = self.get_phrase(message)
if phrase:
response = await message.channel.send(phrase)
await self.bot.register_response(response, message)

@commands.Cog.listener()
async def on_message_edit(self, before, after):
phrase = self.get_phrase(after)
if phrase:
response = await after.channel.send(phrase)
await self.bot.register_response(response, after)

def get_phrase(self, message):
lower_content = message.content.lower()
if lower_content in ['i am sad', 'i feel sad']:
return f'ganbare {message.author.mention}, you got this!'
elif lower_content == f'{self.bot.user.mention} hello':
return 'Nani?'
elif lower_content == f'ohayou {self.bot.user.mention}':
if self.bot.is_owner(message.author):
return f'Ohanyaa~ {message.author.mention}'
else:
return f'Ohayou {message.author.mention}'
elif lower_content == f'{self.bot.user.mention} fuck you':
return 'Kowai!'
elif lower_content == f'{self.bot.user.mention} fuck me':
return 'Gross'
elif lower_content == '(╯°□°)╯︵ ┻━┻':
return '┬─┬ ノ( ゜-゜ノ)'

@commands.command()
async def play(self, context, song: str):
"""I'll play you a song!"""
if song == 'despacito':
response = await context.send('Fakku no!')
else:
response = await context.send('I don\'t know that song yet...')

await self.bot.register_response(response, context.message)


def setup(bot):
bot.add_cog(Misc(bot))

+ 16
- 0
cogs/template.py View File

@@ -0,0 +1,16 @@
from discord.ext import commands


class Template(commands.Cog):
"""Template cog"""

def __init__(self, bot):
self.bot = bot

@commands.command()
async def template(self, context):
context.send('This is a template desu!')


def setup(bot):
bot.add_cog(Template(bot))

BIN
fonts/DejaVuSans-Bold.ttf View File


BIN
fonts/DejaVuSans.ttf View File


BIN
images/placeholder.png View File

Before After
Width: 174  |  Height: 174  |  Size: 3.6KB

+ 9
- 0
run.py View File

@@ -0,0 +1,9 @@
import logging
from utils import config
from utils import sorachan

logging.basicConfig(level=logging.INFO)

bot = sorachan.Sorachan()

bot.run(config.token)

+ 12
- 0
utils/config.example.py View File

@@ -0,0 +1,12 @@
token = ''
ws_host = ''
ws_port = 0
redis_host = ''
redis_port = 0
redis_password = ''
lastfm_key = ''
lastfm_secret = ''
pg_user = ''
pg_password = ''
pg_database = ''
pg_host = ''

+ 113
- 0
utils/embedinator.py View File

@@ -0,0 +1,113 @@
import asyncio
import discord
import traceback


class Embedinator():
def __init__(self, bot, destination, member=None, **kwargs):
self.bot = bot
self.destination = destination
self.member = member
self.max_fields = kwargs.get('max_fields', 25)
self.base_embed = discord.Embed(**kwargs)
self.embed_list = []
self.current = 0
self.buttons = ['◀', '▶', '⏹']

@property
def last_page(self):
if len(self.embed_list) == 0:
self.add_embed_page()
return self.embed_list[-1]

def add_embed_page(self):
self.embed_list.append(self.base_embed.copy())
if len(self.embed_list) > 1:
self.set_footer()

def add_field(self, *, name, value, inline=True):
if len(self.last_page.fields) >= self.max_fields:
self.add_embed_page()
self.last_page.add_field(name=name, value=value, inline=inline)

async def send(self):
self.message = await self.destination.send(
embed=self.embed_list[self.current])
self.active = True
return self.message

async def edit(self):
await self.message.edit(
embed=self.embed_list[self.current])

async def handle(self):
if len(self.embed_list) == 1:
return

for button in self.buttons:
await self.message.add_reaction(button)

while self.active:
done, pending = await asyncio.wait([
self.bot.wait_for(
'reaction_add',
check=self.check_reaction
),
self.bot.wait_for(
'reaction_remove',
check=self.check_reaction
)
],
timeout=60.0,
return_when=asyncio.FIRST_COMPLETED
)

try:
if any(done):
reaction, user = done.pop().result()
for future in pending:
future.cancel()
await self.handle_reaction(reaction, user)
else:
await self.cleanup()
except Exception:
traceback.print_exc()
return

def check_reaction(self, reaction, user):
return (reaction.message.id == self.message.id and
str(reaction.emoji) in self.buttons and
(self.member is None or user.id == self.member.id))

async def handle_reaction(self, reaction, user):
if str(reaction.emoji) == '⏹':
await self.cleanup()
if str(reaction.emoji) == '▶':
self.current += 1
if self.current == len(self.embed_list):
self.current = 0
await self.edit()
if str(reaction.emoji) == '◀':
self.current -= 1
if self.current == -1:
self.current = len(self.embed_list) - 1
await self.edit()

async def cleanup(self):
try:
await self.message.delete()
except discord.NotFound:
traceback.print_exc()

self.active = False

def set_footer(self):
i = 1
for embed in self.embed_list:
embed.set_footer(text=f'{i}/{len(self.embed_list)}')
i += 1

def set_author(self, **kwargs):
self.base_embed.set_author(**kwargs)
for embed in self.embed_list:
embed.set_author(**kwargs)

+ 55
- 0
utils/redis.py View File

@@ -0,0 +1,55 @@
import aioredis
from utils import config


class Redis:
def __init__(self):
self.connection = None

async def connect(self):
if self.connection is not None and not self.connection.closed:
return

self.connection = await aioredis.create_connection(
(config.redis_host, config.redis_port),
password=config.redis_password
)

async def reconnect(self):
self.connection = await aioredis.create_connection(
self.connection.address
)

def disconnect(self):
if self.connection is None:
return

self.connection.close()

async def rpush(self, key, *values):
return await self.execute('RPUSH', key, *values)

async def lrange(self, key, start, end):
return await self.execute('LRANGE', key, start, end)

async def expire(self, key, seconds):
return await self.execute('EXPIRE', key, seconds)

async def delete(self, *keys):
return await self.execute('DEL', *keys)

async def exists(self, *values):
return await self.execute('EXISTS', *values) == len(values)

async def execute(self, command, *args):
value = await self.connection.execute(command, *args)

return self.decode_value(value)

def decode_value(self, value):
if type(value) is list:
return [self.decode_value(v) for v in value]
elif type(value) is bytes:
return value.decode()

return value

+ 103
- 0
utils/sorachan.py View File

@@ -0,0 +1,103 @@
import asyncpg
import discord
from discord.ext import commands
from utils import config
from utils import redis


class Sorachan(commands.Bot):
def __init__(self):
super().__init__(commands.when_mentioned_or('~'))
self.cogs_to_load = [
'admin',
'help',
'lastfm',
'misc'
]

self.redis = None
self.pg = None

for cog in self.cogs_to_load:
self.load_extension(f'cogs.{cog}')

async def on_command_error(self, context, exception):
if isinstance(exception, commands.CommandOnCooldown):
if await self.is_owner(context.author):
await context.reinvoke()
else:
await super().on_command_error(context, exception)

async def on_ready(self):
if not self.redis:
self.redis = redis.Redis()
await self.redis.connect()

if not self.pg:
self.pg = await asyncpg.connect(
user=config.pg_user,
password=config.pg_password,
database=config.pg_database,
host=config.pg_host
)

info = await self.application_info()
self.owner = info.owner
await self.owner.send('I\'m online!')

async def on_message(self, message):
if message.author.bot:
return

await self.track_message(f'tracked_message {message.id}')
await self.process_commands(message)

async def track_message(self, message):
if await self.redis.exists(message):
return

await self.redis.rpush(message, 0)
await self.redis.expire(message, 3600)

async def on_raw_message_edit(self, payload):
if 'content' not in payload.data:
return

channel = self.get_channel(int(payload.data['channel_id']))

if channel is None:
return

message = channel._state._fetch_message(payload.message_id)
if message is None:
try:
message = await channel.fetch_message(payload.message_id)
except discord.HTTPException:
return

if await self.redis.exists(f'tracked_message {payload.message_id}'):
await self.clear_messages(f'tracked_message {payload.message_id}')
await self.process_commands(message)

async def on_raw_message_delete(self, payload):
if await self.redis.exists(payload.message_id):
await self.clear_messages(payload.message_id)
await self.redis.delete(payload.message_id)

async def clear_messages(self, tracked_message):
for message_data in await self.redis.lrange(tracked_message, 1, -1):
channel_id, message_id = message_data.split(':')
try:
await self.http.delete_message(
int(channel_id), int(message_id))
except discord.NotFound:
pass

await self.redis.execute('LTRIM', tracked_message, 0, 0)

async def register_response(self, response, request):
if await self.redis.exists(f'tracked_message {request.id}'):
await self.redis.rpush(
f'tracked_message {request.id}',
f'{response.channel.id}:{response.id}'
)

+ 124
- 0
utils/sorahelp.py View File

@@ -0,0 +1,124 @@
import discord
from utils.embedinator import Embedinator
from discord.ext import commands


class SoraHelpCommand(commands.HelpCommand):
def __init__(self):
super().__init__()
self.colour = 7593967

def command_not_found(self, string):
return f'Command or category "{string}" not found.'

def subcommand_not_found(self, command, string):
return f'Subcommand "{string}" not found.'

def get_command_name(self, command):
name = command.name
if any(command.aliases):
name = f'{name}, {", ".join(command.aliases)}'

name = f'{name} {command.signature}'
return name

def create_embed(self):
embed = discord.Embed(colour=self.colour)
embed.set_author(
name=self.context.bot.user.name,
icon_url=self.context.bot.user.avatar_url
)

return embed

def create_embedinator(self, **kwargs):
destination = self.get_destination()
embedinator = Embedinator(
self.context.bot,
destination,
self.context.author,
**kwargs,
colour=self.colour
)

embedinator.set_author(
name=self.context.bot.user.name,
icon_url=self.context.bot.user.avatar_url
)

return embedinator

async def send_command_help(self, command):
embed = self.create_embed()
embed.title = self.get_command_name(command)
embed.description = command.short_doc or 'No description'
embed.set_footer(text=f'Category: {command.cog_name}')
destination = self.get_destination()
response = await destination.send(embed=embed)
await self.context.bot.register_response(
response, self.context.message)

async def send_group_help(self, group):
embedinator = self.create_embedinator(
title=self.get_commands(group),
description=group.short_doc or 'No description',
max_fields=4
)

filtered = await self.filter_commands(group.commands)

if filtered:
for command in filtered:
self.add_command_field(embedinator, command)

await self.send_embedinator(embedinator)

async def send_cog_help(self, cog):
embedinator = self.create_embedinator(
title=f'Category: {cog.qualified_name}',
description=cog.description or 'No description',
max_fields=5
)

filtered = await self.filter_commands(cog.get_commands())

if filtered:
for command in filtered:
self.add_command_field(embedinator, command)

await self.send_embedinator(embedinator)

async def send_bot_help(self, mapping):
embedinator = self.create_embedinator(
description=self.get_opening_note())

for cog, cog_commands in mapping.items():
filtered = await self.filter_commands(cog_commands)
if filtered:
embedinator.add_embed_page()
embedinator.add_field(
name=f'Category: {cog.qualified_name}',
value=cog.description or 'No description',
inline=False
)

for command in filtered:
self.add_command_field(embedinator, command)

await self.send_embedinator(embedinator)

async def send_embedinator(self, embedinator):
response = await embedinator.send()
await self.context.bot.register_response(
response, self.context.message)

await embedinator.handle()

def add_command_field(self, embedinator, command):
name = self.get_command_name(command)
embedinator.add_field(name=name, value=command.short_doc, inline=False)

def get_opening_note(self):
command_name = self.context.invoked_with
return "Use `{0}{1} <command/category>` for more info.".format(
self.clean_prefix, command_name)

Loading…
Cancel
Save