-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
179 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
# Copyright (c) 2024 iiPython | ||
|
||
# Modules | ||
import typing | ||
from base64 import b64encode | ||
from urllib.parse import quote_plus | ||
from platform import python_version | ||
|
||
from requests import Session | ||
|
||
from nightwatch import __version__ | ||
from nightwatch.bot import Client, Context | ||
from nightwatch.bot.client import AuthorizationFailed | ||
|
||
# Handle now playing | ||
session = Session() | ||
|
||
def get_spotify_access() -> str: | ||
with session.post( | ||
"https://accounts.spotify.com/api/token", | ||
data = "grant_type=client_credentials", | ||
headers = { | ||
"Authorization": f"Basic {b64encode(b'3f974573800a4ff5b325de9795b8e603:ff188d2860ff44baa57acc79c121a3b9').decode()}", | ||
"Content-Type": "application/x-www-form-urlencoded" | ||
} | ||
) as response: | ||
return response.json()["access_token"] | ||
|
||
def get_now_playing() -> tuple[dict | None, str | None]: | ||
with session.get("https://api.listenbrainz.org/1/user/iiPython/playing-now") as response: | ||
result = (response.json())["payload"]["listens"] | ||
|
||
if not result: | ||
return None, None | ||
|
||
result = result[0] | ||
|
||
# Reorganize the result data | ||
tm = result["track_metadata"] | ||
result = {"artist": tm["artist_name"], "track": tm["track_name"], "album": tm["release_name"],} | ||
with session.get( | ||
f"https://api.spotify.com/v1/search?q={quote_plus(f'{result['artist']} {result['album']}')}&type=album&limit=1", | ||
headers = { | ||
"Authorization": f"Bearer {get_spotify_access()}", | ||
"Content-Type": "application/x-www-form-urlencoded" | ||
} | ||
) as response: | ||
images = (response.json())["albums"]["items"] | ||
return result, images[0]["images"][-1]["url"] | ||
|
||
# Helping methods | ||
def dominant_color(hex: str) -> str: | ||
r, g, b = tuple(int(hex[i:i + 2], 16) for i in (0, 2, 4)) | ||
if r > g and r > b: | ||
return "red(ish)" | ||
|
||
elif g > r and g > b: | ||
return "green(ish)" | ||
|
||
elif b > r and b > g: | ||
return "blue(ish)" | ||
|
||
elif r == g and r > b: | ||
return "yellow(ish)" | ||
|
||
elif r == b and r > g: | ||
return "magenta(ish)" | ||
|
||
elif g == b and g > r: | ||
return "cyan(ish)" | ||
|
||
return "gray(ish)" | ||
|
||
# Create client | ||
class NextgenerationBot(Client): | ||
def __init__(self) -> None: | ||
super().__init__() | ||
|
||
# Extra data | ||
self.send_on_join = None | ||
|
||
async def rejoin(self, username: typing.Optional[str] = None, hex: typing.Optional[str] = None) -> None: | ||
await self.close() | ||
await self.event_loop(username or self.user.name, hex or self.user.hex, self.address) # type: ignore | ||
|
||
async def on_connect(self, ctx: Context) -> None: | ||
print(f"Connected to '{ctx.rics.name}'!") | ||
|
||
async def on_message(self, ctx: Context) -> None: | ||
if self.send_on_join is not None: | ||
await ctx.send(self.send_on_join) | ||
self.send_on_join = None | ||
return | ||
|
||
command = ctx.message.message | ||
if command[0] != "/": | ||
return | ||
|
||
match command[1:].split(" "): | ||
case ["help"]: | ||
await ctx.reply("Commands: /help, /music, /user, /people, /rename, /set-hex, /version") | ||
|
||
case ["music"]: | ||
data, image = get_now_playing() | ||
if not (data and image): | ||
return await ctx.reply("iiPython isn't listening to anything right now.") | ||
|
||
await ctx.send(f"iiPython is listening to {data['track']} by {data['artist']} (on {data['album']}).") | ||
await ctx.send(f"![{data['track']} by {data['artist']} cover art]({image})") | ||
|
||
case ["user", *username]: | ||
client = next(filter(lambda u: u.name == " ".join(username), ctx.rics.users), None) | ||
if client is None: | ||
return await ctx.reply("Specified user doesn't *fucking* exist.") | ||
|
||
await ctx.send(f"**Name:** {'🤖 ' if client.bot else '★ ' if client.admin else ''}{client.name} | **HEX Code:** #{client.hex} [{dominant_color(client.hex)}]") | ||
|
||
case ["rename" | "set-hex" as command, *response]: | ||
try: | ||
await self.rejoin( | ||
" ".join(response) if command == "rename" else None, | ||
response[0] if command == "set-hex" else None | ||
) | ||
|
||
except AuthorizationFailed as problem: | ||
if problem.json is not None: | ||
message = (problem.json.get("message") or problem.json["detail"][0]["msg"]).rstrip(".").lower() | ||
self.send_on_join = f"Failed to switch {'username' if command == 'rename' else 'hex code'} because '{message}'." | ||
|
||
await self.event_loop(self.user.name, self.user.hex, self.address) # type: ignore | ||
|
||
case ["people"]: | ||
await ctx.send(f"There are {len(ctx.rics.users)} users: {', '.join(f'{u.name}{f' ({'admin' if u.admin else 'bot'})' if u.admin or u.bot else ''}' for u in ctx.rics.users)}") | ||
|
||
case ["version"]: | ||
await ctx.reply(f"Running on Nightwatch v{__version__} using Python {python_version()}.") | ||
|
||
case _: | ||
await ctx.reply("I have **no idea** what the *fuck* you just asked...") | ||
|
||
NextgenerationBot().run( | ||
username = "Pizza Eater", | ||
hex = "ff0000", | ||
address = "nightwatch.k4ffu.dev" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
__version__ = "0.10.6" | ||
__version__ = "0.10.7" | ||
|
||
import re | ||
HEX_COLOR_REGEX = re.compile(r"^[A-Fa-f0-9]{6}$") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters