From a4c90a21ab145d8bc549bc10281959b6c53d4fed Mon Sep 17 00:00:00 2001 From: rfresh2 <89827146+rfresh2@users.noreply.github.com> Date: Wed, 9 Aug 2023 15:02:12 -0700 Subject: [PATCH] Simplified event bus without reflection --- build.gradle | 1 - src/main/java/com/zenith/Proxy.java | 101 ++--- src/main/java/com/zenith/Shared.java | 6 +- .../com/zenith/command/CommandManager.java | 2 + .../zenith/command/impl/UpdateCommand.java | 4 +- .../com/zenith/database/ChatDatabase.java | 14 +- .../zenith/database/ConnectionsDatabase.java | 15 +- .../java/com/zenith/database/Database.java | 12 +- .../com/zenith/database/DeathsDatabase.java | 10 +- .../zenith/database/PlayerCountDatabase.java | 13 +- .../zenith/database/QueueLengthDatabase.java | 13 +- .../zenith/database/QueueWaitDatabase.java | 19 +- .../com/zenith/database/RestartsDatabase.java | 11 +- .../java/com/zenith/discord/DiscordBot.java | 374 +++++++++--------- .../java/com/zenith/event/SimpleEventBus.java | 73 ++++ .../java/com/zenith/event/Subscription.java | 13 + .../event/module/AntiAfkStuckEvent.java | 4 - .../event/module/AutoEatOutOfFoodEvent.java | 7 +- .../zenith/event/module/ClientTickEvent.java | 4 - .../module/PlayerHealthChangedEvent.java | 4 - .../event/module/WeatherChangeEvent.java | 4 - .../event/proxy/ActiveHoursConnectEvent.java | 4 - .../event/proxy/AutoReconnectEvent.java | 4 - .../com/zenith/event/proxy/ConnectEvent.java | 4 - .../com/zenith/event/proxy/DeathEvent.java | 4 - .../zenith/event/proxy/DeathMessageEvent.java | 3 - .../event/proxy/DiscordMessageSentEvent.java | 4 - .../proxy/HealthAutoDisconnectEvent.java | 4 - .../event/proxy/MsaDeviceCodeLoginEvent.java | 3 - .../proxy/NewPlayerInVisualRangeEvent.java | 3 - .../NonWhitelistedPlayerConnectedEvent.java | 3 - .../event/proxy/PrioBanStatusUpdateEvent.java | 4 - .../zenith/event/proxy/PrioStatusEvent.java | 4 - .../event/proxy/PrioStatusUpdateEvent.java | 4 - .../proxy/ProxyClientDisconnectedEvent.java | 3 - .../event/proxy/ProxyLoginFailedEvent.java | 4 - .../proxy/ProxySpectatorConnectedEvent.java | 3 - .../ProxySpectatorDisconnectedEvent.java | 3 - .../event/proxy/QueueCompleteEvent.java | 4 - .../event/proxy/QueuePositionUpdateEvent.java | 4 - .../event/proxy/SelfDeathMessageEvent.java | 4 - .../event/proxy/ServerChatReceivedEvent.java | 3 - .../proxy/ServerPlayerConnectedEvent.java | 3 - .../proxy/ServerPlayerDisconnectedEvent.java | 3 - .../event/proxy/ServerRestartingEvent.java | 6 +- .../zenith/event/proxy/StartConnectEvent.java | 4 - .../zenith/event/proxy/StartQueueEvent.java | 4 - .../zenith/event/proxy/UpdateStartEvent.java | 4 - .../feature/autoupdater/AutoUpdater.java | 16 +- src/main/java/com/zenith/module/Module.java | 3 +- .../java/com/zenith/module/ModuleManager.java | 19 +- .../java/com/zenith/module/impl/AntiAFK.java | 14 +- .../zenith/module/impl/AutoDisconnect.java | 16 +- .../java/com/zenith/module/impl/AutoEat.java | 13 +- .../com/zenith/module/impl/AutoReply.java | 3 +- .../com/zenith/module/impl/AutoRespawn.java | 5 +- .../com/zenith/module/impl/AutoTotem.java | 3 +- .../java/com/zenith/module/impl/KillAura.java | 3 +- .../java/com/zenith/module/impl/Spammer.java | 3 +- .../java/com/zenith/module/impl/Spook.java | 5 +- .../zenith/network/client/Authenticator.java | 2 +- .../zenith/network/client/ClientListener.java | 4 +- .../client/handler/incoming/ChatHandler.java | 10 +- .../handler/incoming/GameStateHandler.java | 8 +- .../handler/incoming/JoinGameHandler.java | 2 +- .../handler/incoming/PlayerHealthHandler.java | 4 +- .../handler/incoming/ServerCombatHandler.java | 2 +- .../handler/incoming/TabListDataHandler.java | 10 +- .../handler/incoming/TabListEntryHandler.java | 4 +- .../handler/incoming/TitlePacketHandler.java | 2 +- .../incoming/spawn/SpawnPlayerHandler.java | 2 +- .../network/server/ServerConnection.java | 6 +- .../handler/ProxyServerLoginHandler.java | 4 +- .../outgoing/LoginSuccessOutgoingHandler.java | 2 +- src/main/java/com/zenith/util/Pair.java | 7 + .../META-INF/native-image/reflect-config.json | 293 -------------- 76 files changed, 523 insertions(+), 746 deletions(-) create mode 100644 src/main/java/com/zenith/event/SimpleEventBus.java create mode 100644 src/main/java/com/zenith/event/Subscription.java create mode 100644 src/main/java/com/zenith/util/Pair.java diff --git a/build.gradle b/build.gradle index 121742f17..da0f54313 100644 --- a/build.gradle +++ b/build.gradle @@ -218,7 +218,6 @@ dependencies { // uncomment to use io_uring // shade 'io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final:linux-x86_64' - shade "com.github.collarmc:pounce:d921e2c156" shade('org.redisson:redisson:3.23.2') { exclude group: 'io.netty' } diff --git a/src/main/java/com/zenith/Proxy.java b/src/main/java/com/zenith/Proxy.java index c8d67e098..1ee6e5557 100644 --- a/src/main/java/com/zenith/Proxy.java +++ b/src/main/java/com/zenith/Proxy.java @@ -1,13 +1,13 @@ package com.zenith; import ch.qos.logback.classic.LoggerContext; -import com.collarmc.pounce.Subscribe; import com.github.steveice10.mc.protocol.MinecraftConstants; import com.github.steveice10.mc.protocol.MinecraftProtocol; import com.github.steveice10.mc.protocol.packet.ingame.server.ServerChatPacket; import com.github.steveice10.packetlib.BuiltinFlags; import com.github.steveice10.packetlib.tcp.TcpServer; import com.zenith.cache.data.PlayerCache; +import com.zenith.event.Subscription; import com.zenith.event.proxy.*; import com.zenith.feature.autoupdater.AutoUpdater; import com.zenith.feature.autoupdater.GitAutoUpdater; @@ -45,10 +45,12 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; import static com.zenith.Shared.*; +import static com.zenith.util.Pair.of; import static java.util.Objects.isNull; import static java.util.Objects.nonNull; @@ -77,6 +79,7 @@ public class Proxy { @Getter @Setter private AutoUpdater autoUpdater; + private Subscription eventSubscription; public static void main(String... args) { SLF4JBridgeHandler.removeHandlersForRootLogger(); @@ -98,11 +101,29 @@ public static String determineVersion() { } } + @SuppressWarnings("unchecked") + public void initEventHandlers() { + if (eventSubscription != null) throw new RuntimeException("Event handlers already initialized"); + eventSubscription = EVENT_BUS.subscribe( + of(DisconnectEvent.class, (Consumer)this::handleDisconnectEvent), + of(ConnectEvent.class, (Consumer)this::handleConnectEvent), + of(StartQueueEvent.class, (Consumer)this::handleStartQueueEvent), + of(QueuePositionUpdateEvent.class, (Consumer)this::handleQueuePositionUpdateEvent), + of(QueueCompleteEvent.class, (Consumer)this::handleQueueCompleteEvent), + of(PlayerOnlineEvent.class, (Consumer)this::handlePlayerOnlineEvent), + of(ProxyClientDisconnectedEvent.class, (Consumer)this::handleProxyClientDisconnectedEvent), + of(ServerRestartingEvent.class, (Consumer)this::handleServerRestartingEvent), + of(PrioStatusEvent.class, (Consumer)this::handlePrioStatusEvent), + of(ServerPlayerConnectedEvent.class, (Consumer)this::handleServerPlayerConnectedEvent), + of(ServerPlayerDisconnectedEvent.class, (Consumer)this::handleServerPlayerDisconnectedEvent) + ); + } + public void start() { loadConfig(); loadLaunchConfig(); DEFAULT_LOG.info("Starting ZenithProxy-{}", LAUNCH_CONFIG.version); - EVENT_BUS.subscribe(this); + initEventHandlers(); try { if (CONFIG.interactiveTerminal.enable) { TERMINAL_MANAGER.start(); @@ -265,13 +286,13 @@ public void connectAndCatchExceptions() { */ public synchronized void connect() { try { - EVENT_BUS.dispatch(new StartConnectEvent()); + EVENT_BUS.postAsync(new StartConnectEvent()); this.logIn(); } catch (final RuntimeException e) { - EVENT_BUS.dispatch(new ProxyLoginFailedEvent()); + EVENT_BUS.post(new ProxyLoginFailedEvent()); getActiveConnections().forEach(connection -> connection.disconnect("Login failed")); SCHEDULED_EXECUTOR_SERVICE.schedule(() -> { - EVENT_BUS.dispatch(new DisconnectEvent("Login Failed")); + EVENT_BUS.post(new DisconnectEvent("Login Failed")); }, 1L, TimeUnit.SECONDS); return; } @@ -402,30 +423,6 @@ public List getSpectatorConnections() { .collect(Collectors.toList()); } - @Subscribe - public void handleDisconnectEvent(DisconnectEvent event) { - CACHE.reset(true); - this.disconnectTime = Instant.now(); - this.inQueue = false; - this.queuePosition = 0; - if (!CONFIG.client.extra.utility.actions.autoDisconnect.autoClientDisconnect) { - // skip autoreconnect when we want to sync client disconnect - if (CONFIG.client.extra.autoReconnect.enabled && isReconnectableDisconnect(event.reason)) { - if (autoReconnectIsInProgress()) { - return; - } - this.autoReconnectFuture = Optional.of(SCHEDULED_EXECUTOR_SERVICE.submit(() -> { - delayBeforeReconnect(); - synchronized (this.autoReconnectFuture) { - if (this.autoReconnectFuture.isPresent()) this.connect(); - this.autoReconnectFuture = Optional.empty(); - } - })); - } - } - TPS_CALCULATOR.reset(); - } - public void delayBeforeReconnect() { try { final int countdown; @@ -443,7 +440,7 @@ public void delayBeforeReconnect() { // countdown = CONFIG.client.extra.autoReconnect.delaySeconds // + CONFIG.client.extra.autoReconnect.linearIncrease * this.reconnectCounter++; // } - EVENT_BUS.dispatch(new AutoReconnectEvent(countdown)); + EVENT_BUS.postAsync(new AutoReconnectEvent(countdown)); for (int i = countdown; SHOULD_RECONNECT && i > 0; i--) { if (i % 10 == 0) CLIENT_LOG.info("Reconnecting in {}", i); Wait.waitALittle(1); @@ -457,7 +454,7 @@ public void updatePrioBanStatus() { if (!CONFIG.client.extra.prioBan2b2tCheck || !CONFIG.client.server.address.toLowerCase(Locale.ROOT).contains("2b2t.org")) return; this.isPrioBanned = PRIORITY_BAN_CHECKER.checkPrioBan(); if (this.isPrioBanned.isPresent() && !this.isPrioBanned.get().equals(CONFIG.authentication.prioBanned)) { - EVENT_BUS.dispatch(new PrioBanStatusUpdateEvent(this.isPrioBanned.get())); + EVENT_BUS.postAsync(new PrioBanStatusUpdateEvent(this.isPrioBanned.get())); CONFIG.authentication.prioBanned = this.isPrioBanned.get(); saveConfig(); CLIENT_LOG.info("Prio Ban Change Detected: " + this.isPrioBanned.get()); @@ -489,7 +486,7 @@ private void handleActiveHoursTick() { }) .findAny() .ifPresent(t -> { - EVENT_BUS.dispatch(new ActiveHoursConnectEvent()); + EVENT_BUS.postAsync(new ActiveHoursConnectEvent()); this.lastActiveHoursConnect = Instant.now(); disconnect(SYSTEM_DISCONNECT); Wait.waitALittle(30); @@ -526,45 +523,62 @@ public void updateFavicon() { } } - @Subscribe + public void handleDisconnectEvent(DisconnectEvent event) { + CACHE.reset(true); + this.disconnectTime = Instant.now(); + this.inQueue = false; + this.queuePosition = 0; + if (!CONFIG.client.extra.utility.actions.autoDisconnect.autoClientDisconnect) { + // skip autoreconnect when we want to sync client disconnect + if (CONFIG.client.extra.autoReconnect.enabled && isReconnectableDisconnect(event.reason)) { + if (autoReconnectIsInProgress()) { + return; + } + this.autoReconnectFuture = Optional.of(SCHEDULED_EXECUTOR_SERVICE.submit(() -> { + delayBeforeReconnect(); + synchronized (this.autoReconnectFuture) { + if (this.autoReconnectFuture.isPresent()) this.connect(); + this.autoReconnectFuture = Optional.empty(); + } + })); + } + } + TPS_CALCULATOR.reset(); + } + + public void handleConnectEvent(ConnectEvent event) { this.connectTime = Instant.now(); cancelAutoReconnect(); } - @Subscribe public void handleStartQueueEvent(StartQueueEvent event) { this.inQueue = true; this.queuePosition = 0; updatePrioBanStatus(); } - @Subscribe public void handleQueuePositionUpdateEvent(QueuePositionUpdateEvent event) { this.queuePosition = event.position; } - @Subscribe public void handleQueueCompleteEvent(QueueCompleteEvent event) { this.inQueue = false; this.connectTime = Instant.now(); } - @Subscribe public void handlePlayerOnlineEvent(PlayerOnlineEvent event) { if (!this.isPrio.isPresent()) { // assume we are prio if we skipped queuing - EVENT_BUS.dispatch(new PrioStatusEvent(true)); + EVENT_BUS.postAsync(new PrioStatusEvent(true)); } PlayerCache.sync(); } - @Subscribe - public void handleClientDisconnect(final ProxyClientDisconnectedEvent e) { + public void handleProxyClientDisconnectedEvent(final ProxyClientDisconnectedEvent e) { PlayerCache.sync(); } - @Subscribe public void handleServerRestartingEvent(ServerRestartingEvent event) { if (!CONFIG.authentication.prio && isNull(getCurrentPlayer().get())) { Wait.waitRandomWithinMsBound(30000); @@ -572,7 +586,6 @@ public void handleServerRestartingEvent(ServerRestartingEvent event) { } } - @Subscribe public void handlePrioStatusEvent(PrioStatusEvent event) { if (CONFIG.client.server.address.toLowerCase().contains("2b2t.org")) { if (event.prio == CONFIG.authentication.prio) { @@ -582,7 +595,7 @@ public void handlePrioStatusEvent(PrioStatusEvent event) { } } else { CLIENT_LOG.info("Prio Change Detected: " + event.prio); - EVENT_BUS.dispatch(new PrioStatusUpdateEvent(event.prio)); + EVENT_BUS.postAsync(new PrioStatusUpdateEvent(event.prio)); this.isPrio = Optional.of(event.prio); CONFIG.authentication.prio = event.prio; saveConfig(); @@ -590,7 +603,6 @@ public void handlePrioStatusEvent(PrioStatusEvent event) { } } - @Subscribe public void handleServerPlayerConnectedEvent(ServerPlayerConnectedEvent event) { if (CONFIG.client.extra.chat.showConnectionMessages) { ServerConnection serverConnection = getCurrentPlayer().get(); @@ -600,7 +612,6 @@ public void handleServerPlayerConnectedEvent(ServerPlayerConnectedEvent event) { } } - @Subscribe public void handleServerPlayerDisconnectedEvent(ServerPlayerDisconnectedEvent event) { if (CONFIG.client.extra.chat.showConnectionMessages) { ServerConnection serverConnection = getCurrentPlayer().get(); diff --git a/src/main/java/com/zenith/Shared.java b/src/main/java/com/zenith/Shared.java index bb49de17f..f11d2bdfe 100644 --- a/src/main/java/com/zenith/Shared.java +++ b/src/main/java/com/zenith/Shared.java @@ -1,12 +1,12 @@ package com.zenith; -import com.collarmc.pounce.EventBus; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.zenith.cache.DataCache; import com.zenith.command.CommandManager; import com.zenith.database.DatabaseManager; import com.zenith.discord.DiscordBot; +import com.zenith.event.SimpleEventBus; import com.zenith.feature.pathing.Pathing; import com.zenith.feature.pathing.World; import com.zenith.feature.pathing.blockdata.BlockDataManager; @@ -83,7 +83,7 @@ public static boolean isReconnectableDisconnect(final String reason) { public static LaunchConfig LAUNCH_CONFIG; public static final DataCache CACHE; public static final DiscordBot DISCORD_BOT; - public static final EventBus EVENT_BUS; + public static final SimpleEventBus EVENT_BUS; public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE; public static final WhitelistManager WHITELIST_MANAGER; public static final PriorityBanChecker PRIORITY_BAN_CHECKER; @@ -306,7 +306,7 @@ public static synchronized void saveLaunchConfig() { }); SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(16); DISCORD_BOT = new DiscordBot(); - EVENT_BUS = new EventBus(Runnable::run); + EVENT_BUS = new SimpleEventBus(); CACHE = new DataCache(); WHITELIST_MANAGER = new WhitelistManager(); PRIORITY_BAN_CHECKER = new PriorityBanChecker(); diff --git a/src/main/java/com/zenith/command/CommandManager.java b/src/main/java/com/zenith/command/CommandManager.java index c665ce904..86f0ea38e 100644 --- a/src/main/java/com/zenith/command/CommandManager.java +++ b/src/main/java/com/zenith/command/CommandManager.java @@ -33,6 +33,8 @@ public void init() { new ActiveHoursCommand(), new AntiAFKCommand(), new AutoDisconnectCommand(), + new AutoEatCommand(), + new AutoReconnectCommand(), new AutoReplyCommand(), new AutoRespawnCommand(), new AutoTotemCommand(), diff --git a/src/main/java/com/zenith/command/impl/UpdateCommand.java b/src/main/java/com/zenith/command/impl/UpdateCommand.java index db2203003..a56f8f536 100644 --- a/src/main/java/com/zenith/command/impl/UpdateCommand.java +++ b/src/main/java/com/zenith/command/impl/UpdateCommand.java @@ -23,7 +23,7 @@ public CommandUsage commandUsage() { public LiteralArgumentBuilder register() { return command("update").requires(Command::validateAccountOwner).executes(c -> { try { - EVENT_BUS.dispatch(new UpdateStartEvent()); + EVENT_BUS.post(new UpdateStartEvent()); CONFIG.discord.isUpdating = true; if (Proxy.getInstance().isConnected()) { CONFIG.autoUpdater.shouldReconnectAfterAutoUpdate = true; @@ -42,7 +42,7 @@ public LiteralArgumentBuilder register() { }).then(literal("c").executes(c -> { CONFIG.discord.isUpdating = true; CONFIG.autoUpdater.shouldReconnectAfterAutoUpdate = true; - EVENT_BUS.dispatch(new UpdateStartEvent()); + EVENT_BUS.post(new UpdateStartEvent()); Proxy.getInstance().stop(); })); } diff --git a/src/main/java/com/zenith/database/ChatDatabase.java b/src/main/java/com/zenith/database/ChatDatabase.java index 3efce8308..63088c70c 100644 --- a/src/main/java/com/zenith/database/ChatDatabase.java +++ b/src/main/java/com/zenith/database/ChatDatabase.java @@ -1,9 +1,9 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.Proxy; import com.zenith.database.dto.tables.Chats; import com.zenith.database.dto.tables.records.ChatsRecord; +import com.zenith.event.Subscription; import com.zenith.event.proxy.ServerChatReceivedEvent; import org.jooq.DSLContext; import org.jooq.InsertSetMoreStep; @@ -16,8 +16,7 @@ import java.time.ZoneOffset; import java.util.UUID; -import static com.zenith.Shared.CONFIG; -import static com.zenith.Shared.DATABASE_LOG; +import static com.zenith.Shared.*; public class ChatDatabase extends LockingDatabase { public ChatDatabase(QueryExecutor queryExecutor, RedisClient redisClient) { @@ -44,7 +43,14 @@ public Instant getLastEntryTime() { return chatsRecord.get(c.TIME).toInstant(); } - @Subscribe + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + ServerChatReceivedEvent.class, this::handleServerChatReceivedEvent + ); + } + + public void handleServerChatReceivedEvent(ServerChatReceivedEvent event) { if (!CONFIG.client.server.address.endsWith("2b2t.org") // only write on 2b2t || Proxy.getInstance().isInQueue() // ignore queue diff --git a/src/main/java/com/zenith/database/ConnectionsDatabase.java b/src/main/java/com/zenith/database/ConnectionsDatabase.java index 82e336dad..496242d4b 100644 --- a/src/main/java/com/zenith/database/ConnectionsDatabase.java +++ b/src/main/java/com/zenith/database/ConnectionsDatabase.java @@ -1,12 +1,13 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.Proxy; import com.zenith.database.dto.enums.Connectiontype; import com.zenith.database.dto.tables.Connections; import com.zenith.database.dto.tables.records.ConnectionsRecord; +import com.zenith.event.Subscription; import com.zenith.event.proxy.ServerPlayerConnectedEvent; import com.zenith.event.proxy.ServerPlayerDisconnectedEvent; +import com.zenith.util.Pair; import org.jooq.*; import org.jooq.impl.DSL; @@ -15,14 +16,24 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.UUID; +import java.util.function.Consumer; import static com.zenith.Shared.DATABASE_LOG; +import static com.zenith.Shared.EVENT_BUS; public class ConnectionsDatabase extends LockingDatabase { public ConnectionsDatabase(final QueryExecutor queryExecutor, final RedisClient redisClient) { super(queryExecutor, redisClient); } + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + Pair.of(ServerPlayerConnectedEvent.class, (Consumer) this::handleServerPlayerConnectedEvent), + Pair.of(ServerPlayerDisconnectedEvent.class, (Consumer)this::handleServerPlayerDisconnectedEvent) + ); + } + @Override public String getLockKey() { return "Connections"; @@ -48,12 +59,10 @@ public int getMaxQueueLength() { return 300; // higher limit needed here to handle restarts where there are mass disconnects/connects } - @Subscribe public void handleServerPlayerConnectedEvent(ServerPlayerConnectedEvent event) { writeConnection(Connectiontype.JOIN, event.playerEntry.getName(), event.playerEntry.getId(), Instant.now().atOffset(ZoneOffset.UTC)); } - @Subscribe public void handleServerPlayerDisconnectedEvent(ServerPlayerDisconnectedEvent event) { writeConnection(Connectiontype.LEAVE, event.playerEntry.getName(), event.playerEntry.getId(), Instant.now().atOffset(ZoneOffset.UTC)); } diff --git a/src/main/java/com/zenith/database/Database.java b/src/main/java/com/zenith/database/Database.java index b4aff0a8e..61624366b 100644 --- a/src/main/java/com/zenith/database/Database.java +++ b/src/main/java/com/zenith/database/Database.java @@ -1,19 +1,25 @@ package com.zenith.database; -import static com.zenith.Shared.EVENT_BUS; +import com.zenith.event.Subscription; public abstract class Database { protected final QueryExecutor queryExecutor; + private Subscription eventSubscription; public Database(final QueryExecutor queryExecutor) { this.queryExecutor = queryExecutor; } public void start() { - EVENT_BUS.subscribe(this); + initEvents(); } public void stop() { - EVENT_BUS.unsubscribe(this); + if (eventSubscription != null) { + eventSubscription.unsubscribe(); + eventSubscription = null; + } } + + public abstract Subscription initEvents(); } diff --git a/src/main/java/com/zenith/database/DeathsDatabase.java b/src/main/java/com/zenith/database/DeathsDatabase.java index 8359af6d1..3d516aa12 100644 --- a/src/main/java/com/zenith/database/DeathsDatabase.java +++ b/src/main/java/com/zenith/database/DeathsDatabase.java @@ -1,9 +1,9 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.cache.data.tab.PlayerEntry; import com.zenith.database.dto.tables.Deaths; import com.zenith.database.dto.tables.records.DeathsRecord; +import com.zenith.event.Subscription; import com.zenith.event.proxy.DeathMessageEvent; import com.zenith.feature.deathmessages.DeathMessageParseResult; import com.zenith.feature.deathmessages.Killer; @@ -28,6 +28,13 @@ public DeathsDatabase(final QueryExecutor queryExecutor, final RedisClient redis super(queryExecutor, redisClient); } + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + DeathMessageEvent.class, this::handleDeathMessageEvent + ); + } + @Override public String getLockKey() { return "Deaths"; @@ -48,7 +55,6 @@ public Instant getLastEntryTime() { return deathsRecord.get(d.TIME).toInstant(); } - @Subscribe public void handleDeathMessageEvent(DeathMessageEvent event) { if (!CONFIG.client.server.address.endsWith("2b2t.org")) return; writeDeath(event.deathMessageParseResult, event.deathMessageRaw, Instant.now().atOffset(ZoneOffset.UTC)); diff --git a/src/main/java/com/zenith/database/PlayerCountDatabase.java b/src/main/java/com/zenith/database/PlayerCountDatabase.java index 13171394a..1f9ade799 100644 --- a/src/main/java/com/zenith/database/PlayerCountDatabase.java +++ b/src/main/java/com/zenith/database/PlayerCountDatabase.java @@ -1,9 +1,9 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.Proxy; import com.zenith.database.dto.tables.Playercount; import com.zenith.database.dto.tables.records.PlayercountRecord; +import com.zenith.event.Subscription; import com.zenith.event.module.ClientTickEvent; import org.jooq.*; import org.jooq.impl.DSL; @@ -13,8 +13,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; -import static com.zenith.Shared.CACHE; -import static com.zenith.Shared.DATABASE_LOG; +import static com.zenith.Shared.*; public class PlayerCountDatabase extends LockingDatabase { private static final Duration updateInterval = Duration.ofMinutes(5L); @@ -24,6 +23,13 @@ public PlayerCountDatabase(QueryExecutor queryExecutor, RedisClient redisClient) super(queryExecutor, redisClient); } + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + ClientTickEvent.class, this::handleClientTickEvent + ); + } + @Override public String getLockKey() { return "PlayerCount"; @@ -44,7 +50,6 @@ public Instant getLastEntryTime() { return timeRecordResult.get(0).value1().toInstant(); } - @Subscribe public void handleClientTickEvent(final ClientTickEvent event) { if (lastUpdate.isBefore(Instant.now().minus(updateInterval))) { if (!Proxy.getInstance().isOnlineOn2b2tForAtLeastDuration(Duration.ofSeconds(30L))) return; diff --git a/src/main/java/com/zenith/database/QueueLengthDatabase.java b/src/main/java/com/zenith/database/QueueLengthDatabase.java index a8977098c..20b915fd1 100644 --- a/src/main/java/com/zenith/database/QueueLengthDatabase.java +++ b/src/main/java/com/zenith/database/QueueLengthDatabase.java @@ -1,8 +1,8 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.database.dto.tables.Queuelength; import com.zenith.database.dto.tables.records.QueuelengthRecord; +import com.zenith.event.Subscription; import com.zenith.event.module.ClientTickEvent; import com.zenith.feature.queue.Queue; import com.zenith.feature.queue.QueueStatus; @@ -14,8 +14,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; -import static com.zenith.Shared.CONFIG; -import static com.zenith.Shared.DATABASE_LOG; +import static com.zenith.Shared.*; public class QueueLengthDatabase extends LockingDatabase { private Instant lastUpdate = Instant.EPOCH; @@ -24,6 +23,13 @@ public QueueLengthDatabase(QueryExecutor queryExecutor, RedisClient redisClient) super(queryExecutor, redisClient); } + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + ClientTickEvent.class, this::handleTickEvent + ); + } + @Override public String getLockKey() { return "QueueLength"; @@ -44,7 +50,6 @@ public Instant getLastEntryTime() { return timeRecordResult.get(0).value1().toInstant(); } - @Subscribe public void handleTickEvent(final ClientTickEvent event) { if (lastUpdate.isBefore(Instant.now().minus(Duration.ofMinutes(CONFIG.server.queueStatusRefreshMinutes + 1L)))) { lastUpdate = Instant.now(); diff --git a/src/main/java/com/zenith/database/QueueWaitDatabase.java b/src/main/java/com/zenith/database/QueueWaitDatabase.java index 1e832e9b9..63bd7c09c 100644 --- a/src/main/java/com/zenith/database/QueueWaitDatabase.java +++ b/src/main/java/com/zenith/database/QueueWaitDatabase.java @@ -1,8 +1,8 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.database.dto.tables.Queuewait; import com.zenith.database.dto.tables.records.QueuewaitRecord; +import com.zenith.event.Subscription; import com.zenith.event.proxy.QueueCompleteEvent; import com.zenith.event.proxy.QueuePositionUpdateEvent; import com.zenith.event.proxy.ServerRestartingEvent; @@ -16,8 +16,11 @@ import java.time.Instant; import java.time.ZoneOffset; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import static com.zenith.Shared.CONFIG; +import static com.zenith.Shared.EVENT_BUS; +import static com.zenith.util.Pair.of; import static java.util.Objects.nonNull; public class QueueWaitDatabase extends Database { @@ -33,19 +36,26 @@ public QueueWaitDatabase(QueryExecutor queryExecutor) { super(queryExecutor); } - @Subscribe + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + of(ServerRestartingEvent.class, (Consumer)this::handleServerRestart), + of(StartQueueEvent.class, (Consumer)this::handleStartQueue), + of(QueuePositionUpdateEvent.class, (Consumer)this::handleQueuePosition), + of(QueueCompleteEvent.class, (Consumer)this::handleQueueComplete) + ); + } + public void handleServerRestart(final ServerRestartingEvent event) { lastServerRestart = Instant.now(); } - @Subscribe public void handleStartQueue(final StartQueueEvent event) { shouldUpdateQueueLen.set(true); initialQueueLen = null; initialQueueTime = null; } - @Subscribe public void handleQueuePosition(final QueuePositionUpdateEvent event) { // record only first position update if (shouldUpdateQueueLen.compareAndSet(true, false)) { @@ -54,7 +64,6 @@ public void handleQueuePosition(final QueuePositionUpdateEvent event) { } } - @Subscribe public void handleQueueComplete(final QueueCompleteEvent event) { final Instant queueCompleteTime = Instant.now(); diff --git a/src/main/java/com/zenith/database/RestartsDatabase.java b/src/main/java/com/zenith/database/RestartsDatabase.java index 264856d96..083fe5d57 100644 --- a/src/main/java/com/zenith/database/RestartsDatabase.java +++ b/src/main/java/com/zenith/database/RestartsDatabase.java @@ -1,8 +1,8 @@ package com.zenith.database; -import com.collarmc.pounce.Subscribe; import com.zenith.database.dto.tables.Restarts; import com.zenith.database.dto.tables.records.RestartsRecord; +import com.zenith.event.Subscription; import com.zenith.event.proxy.ServerRestartingEvent; import org.jooq.*; import org.jooq.impl.DSL; @@ -13,6 +13,7 @@ import java.time.ZoneOffset; import static com.zenith.Shared.DATABASE_LOG; +import static com.zenith.Shared.EVENT_BUS; public class RestartsDatabase extends LockingDatabase { @@ -23,6 +24,13 @@ public RestartsDatabase(QueryExecutor queryExecutor, RedisClient redisClient) { super(queryExecutor, redisClient); } + @Override + public Subscription initEvents() { + return EVENT_BUS.subscribe( + ServerRestartingEvent.class, this::handleServerRestartEvent + ); + } + @Override public String getLockKey() { return "Restarts"; @@ -43,7 +51,6 @@ public Instant getLastEntryTime() { return timeRecordResult.get(0).value1().toInstant(); } - @Subscribe public void handleServerRestartEvent(final ServerRestartingEvent event) { synchronized (this) { if (lastRestartWrite.isBefore(Instant.now().minus(cooldownDuration))) { diff --git a/src/main/java/com/zenith/discord/DiscordBot.java b/src/main/java/com/zenith/discord/DiscordBot.java index 01a4c13af..a86f7b593 100644 --- a/src/main/java/com/zenith/discord/DiscordBot.java +++ b/src/main/java/com/zenith/discord/DiscordBot.java @@ -1,11 +1,11 @@ package com.zenith.discord; -import com.collarmc.pounce.Subscribe; import com.github.steveice10.mc.protocol.packet.ingame.client.ClientChatPacket; import com.google.common.base.Suppliers; import com.zenith.Proxy; import com.zenith.command.CommandContext; import com.zenith.command.DiscordCommandContext; +import com.zenith.event.Subscription; import com.zenith.event.module.AntiAfkStuckEvent; import com.zenith.event.module.AutoEatOutOfFoodEvent; import com.zenith.event.proxy.*; @@ -43,11 +43,13 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import static com.zenith.Shared.*; import static com.zenith.command.impl.StatusCommand.getCoordinates; +import static com.zenith.util.Pair.of; import static java.util.Arrays.asList; import static java.util.Objects.isNull; import static java.util.Objects.nonNull; @@ -67,6 +69,7 @@ public class DiscordBot { @Getter private boolean isRunning; + private Subscription eventSubscription; public DiscordBot() { this.mainChannelMessageQueue = new ConcurrentLinkedQueue<>(); @@ -74,6 +77,43 @@ public DiscordBot() { this.isRunning = false; } + public void initEventHandlers() { + if (eventSubscription != null) throw new RuntimeException("Event handlers already initialized"); + eventSubscription = EVENT_BUS.subscribe( + of(ConnectEvent.class, (Consumer)this::handleConnectEvent), + of(PlayerOnlineEvent.class, (Consumer)this::handlePlayerOnlineEvent), + of(DisconnectEvent.class, (Consumer)this::handleDisconnectEvent), + of(QueuePositionUpdateEvent.class, (Consumer)this::handleQueuePositionUpdateEvent), + of(AutoEatOutOfFoodEvent.class, (Consumer)this::handleAutoEatOutOfFoodEvent), + of(QueueCompleteEvent.class, (Consumer)this::handleQueueCompleteEvent), + of(StartQueueEvent.class, (Consumer)this::handleStartQueueEvent), + of(DeathEvent.class, (Consumer)this::handleDeathEvent), + of(SelfDeathMessageEvent.class, (Consumer)this::handleSelfDeathMessageEvent), + of(HealthAutoDisconnectEvent.class, (Consumer)this::handleHealthAutoDisconnectEvent), + of(ProxyClientConnectedEvent.class, (Consumer)this::handleProxyClientConnectedEvent), + of(ProxySpectatorConnectedEvent.class, (Consumer)this::handleProxySpectatorConnectedEvent), + of(ProxyClientDisconnectedEvent.class, (Consumer)this::handleProxyClientDisconnectedEvent), + of(NewPlayerInVisualRangeEvent.class, (Consumer)this::handleNewPlayerInVisualRangeEvent), + of(NonWhitelistedPlayerConnectedEvent.class, (Consumer)this::handleNonWhitelistedPlayerConnectedEvent), + of(ProxySpectatorDisconnectedEvent.class, (Consumer)this::handleProxySpectatorDisconnectedEvent), + of(ActiveHoursConnectEvent.class, (Consumer)this::handleActiveHoursConnectEvent), + of(ServerChatReceivedEvent.class, (Consumer)this::handleServerChatReceivedEvent), + of(ServerPlayerConnectedEvent.class, (Consumer)this::handleServerPlayerConnectedEvent), + of(ServerPlayerDisconnectedEvent.class, (Consumer)this::handleServerPlayerDisconnectedEvent), + of(DiscordMessageSentEvent.class, (Consumer)this::handleDiscordMessageSentEvent), + of(UpdateStartEvent.class, (Consumer)this::handleUpdateStartEvent), + of(ServerRestartingEvent.class, (Consumer)this::handleServerRestartingEvent), + of(ProxyLoginFailedEvent.class, (Consumer)this::handleProxyLoginFailedEvent), + of(StartConnectEvent.class, (Consumer)this::handleStartConnectEvent), + of(PrioStatusUpdateEvent.class, (Consumer)this::handlePrioStatusUpdateEvent), + of(PrioBanStatusUpdateEvent.class, (Consumer)this::handlePrioBanStatusUpdateEvent), + of(AntiAfkStuckEvent.class, (Consumer)this::handleAntiAfkStuckEvent), + of(AutoReconnectEvent.class, (Consumer)this::handleAutoReconnectEvent), + of(MsaDeviceCodeLoginEvent.class, (Consumer)this::handleMsaDeviceCodeLoginEvent), + of(DeathMessageEvent.class, (Consumer)this::handleDeathMessageEvent) + ); + } + public void start() { this.client = DiscordClientBuilder.create(CONFIG.discord.token) .build() @@ -81,14 +121,14 @@ public void start() { .setInitialPresence(shardInfo -> disconnectedPresence.get()) .login() .block(); - EVENT_BUS.subscribe(this); + initEventHandlers(); restClient = client.getRestClient(); client.getEventDispatcher().on(MessageCreateEvent.class).subscribe(event -> { if (CONFIG.discord.chatRelay.channelId.length() > 0 && event.getMessage().getChannelId().equals(Snowflake.of(CONFIG.discord.chatRelay.channelId))) { if (!event.getMember().get().getId().equals(this.client.getSelfId())) { - EVENT_BUS.dispatch(new DiscordMessageSentEvent(sanitizeRelayInputMessage(event.getMessage().getContent()))); + EVENT_BUS.postAsync(new DiscordMessageSentEvent(sanitizeRelayInputMessage(event.getMessage().getContent()))); return; } } @@ -188,7 +228,152 @@ private void handleProxyUpdateComplete() { .build()); } - @Subscribe + private void sendQueueWarning() { + sendEmbedMessage((CONFIG.discord.queueWarning.mentionRole ? "<@&" + CONFIG.discord.accountOwnerRoleId + ">" : ""), EmbedCreateSpec.builder() + .title("Queue Warning") + .addField("Queue Position", "[" + queuePositionStr() + "]", false) + .color(Color.MOON_YELLOW) + .build()); + } + + private String queuePositionStr() { + if (Proxy.getInstance().getIsPrio().isPresent()) { + if (Proxy.getInstance().getIsPrio().get()) { + return Proxy.getInstance().getQueuePosition() + " / " + Queue.getQueueStatus().prio + " - ETA: " + Queue.getQueueEta(Proxy.getInstance().getQueuePosition()); + } else { + return Proxy.getInstance().getQueuePosition() + " / " + Queue.getQueueStatus().regular + " - ETA: " + Queue.getQueueEta(Proxy.getInstance().getQueuePosition()); + } + } else { + return "?"; + } + } + + static boolean validateButtonInteractionEventFromAccountOwner(final ButtonInteractionEvent event) { + return event.getInteraction().getMember() + .map(m -> m.getRoleIds().stream() + .map(Snowflake::asString) + .anyMatch(roleId -> roleId.equals(CONFIG.discord.accountOwnerRoleId))) + .orElse(false); + } + + + private EmbedCreateSpec getUpdateMessage() { + return EmbedCreateSpec.builder() + .title("Updating and restarting...") + .color(Color.CYAN) + .build(); + } + + public static boolean isAllowedChatCharacter(char c0) { + return c0 != 167 && c0 >= 32 && c0 != 127; + } + + public static String sanitizeRelayInputMessage(final String input) { + StringBuilder stringbuilder = new StringBuilder(); + for (char c0 : input.toCharArray()) { + if (isAllowedChatCharacter(c0)) { + stringbuilder.append(c0); + } + } + return stringbuilder.toString(); + } + + public void updateProfileImage(final BufferedImage bufferedImage) { + try { + ByteArrayOutputStream os = new ByteArrayOutputStream(); + ImageIO.write(bufferedImage, "png", os); + this.restClient.edit(ImmutableUserModifyRequest.builder() + .avatar("data:image/png;base64," + Base64.getEncoder().encodeToString(os.toByteArray())) + .build()) + .block(); + } catch (final Exception e) { + DISCORD_LOG.error("Failed updating discord profile image", e); + } + } + + public void sendEmbedMessage(EmbedCreateSpec embedCreateSpec) { + try { + mainChannelMessageQueue.add(MessageCreateSpec.builder() + .addEmbed(embedCreateSpec) + .build().asRequest()); + TERMINAL_MANAGER.logEmbedOutput(embedCreateSpec); + } catch (final Exception e) { + DISCORD_LOG.error("Failed sending discord embed message", e); + } + } + + private void sendEmbedMessage(String message, EmbedCreateSpec embedCreateSpec) { + try { + mainChannelMessageQueue.add(MessageCreateSpec.builder() + .content(message) + .addEmbed(embedCreateSpec) + .build().asRequest()); + TERMINAL_LOG.info(message); + TERMINAL_MANAGER.logEmbedOutput(embedCreateSpec); + } catch (final Exception e) { + DISCORD_LOG.error("Failed sending discord embed message", e); + } + } + + public void sendMessage(final String message) { + try { + mainChannelMessageQueue.add(MessageCreateSpec.builder() + .content(message) + .build().asRequest()); + TERMINAL_LOG.info(message); + } catch (final Exception e) { + DISCORD_LOG.error("Failed sending discord message", e); + } + } + + private void sendEmbedMessageWithButtons(String message, EmbedCreateSpec embedCreateSpec, List