Skip to content

Commit

Permalink
Simplified event bus without reflection
Browse files Browse the repository at this point in the history
  • Loading branch information
rfresh2 committed Aug 9, 2023
1 parent 7ccc996 commit a4c90a2
Show file tree
Hide file tree
Showing 76 changed files with 523 additions and 746 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ dependencies {
// uncomment to use io_uring
// shade 'io.netty.incubator:netty-incubator-transport-native-io_uring:0.0.21.Final:linux-x86_64'

shade "com.github.collarmc:pounce:d921e2c156"
shade('org.redisson:redisson:3.23.2') {
exclude group: 'io.netty'
}
Expand Down
101 changes: 56 additions & 45 deletions src/main/java/com/zenith/Proxy.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package com.zenith;

import ch.qos.logback.classic.LoggerContext;
import com.collarmc.pounce.Subscribe;
import com.github.steveice10.mc.protocol.MinecraftConstants;
import com.github.steveice10.mc.protocol.MinecraftProtocol;
import com.github.steveice10.mc.protocol.packet.ingame.server.ServerChatPacket;
import com.github.steveice10.packetlib.BuiltinFlags;
import com.github.steveice10.packetlib.tcp.TcpServer;
import com.zenith.cache.data.PlayerCache;
import com.zenith.event.Subscription;
import com.zenith.event.proxy.*;
import com.zenith.feature.autoupdater.AutoUpdater;
import com.zenith.feature.autoupdater.GitAutoUpdater;
Expand Down Expand Up @@ -45,10 +45,12 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.zenith.Shared.*;
import static com.zenith.util.Pair.of;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;

Expand Down Expand Up @@ -77,6 +79,7 @@ public class Proxy {
@Getter
@Setter
private AutoUpdater autoUpdater;
private Subscription eventSubscription;

public static void main(String... args) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
Expand All @@ -98,11 +101,29 @@ public static String determineVersion() {
}
}

@SuppressWarnings("unchecked")
public void initEventHandlers() {
if (eventSubscription != null) throw new RuntimeException("Event handlers already initialized");
eventSubscription = EVENT_BUS.subscribe(
of(DisconnectEvent.class, (Consumer<DisconnectEvent>)this::handleDisconnectEvent),
of(ConnectEvent.class, (Consumer<ConnectEvent>)this::handleConnectEvent),
of(StartQueueEvent.class, (Consumer<StartQueueEvent>)this::handleStartQueueEvent),
of(QueuePositionUpdateEvent.class, (Consumer<QueuePositionUpdateEvent>)this::handleQueuePositionUpdateEvent),
of(QueueCompleteEvent.class, (Consumer<QueueCompleteEvent>)this::handleQueueCompleteEvent),
of(PlayerOnlineEvent.class, (Consumer<PlayerOnlineEvent>)this::handlePlayerOnlineEvent),
of(ProxyClientDisconnectedEvent.class, (Consumer<ProxyClientDisconnectedEvent>)this::handleProxyClientDisconnectedEvent),
of(ServerRestartingEvent.class, (Consumer<ServerRestartingEvent>)this::handleServerRestartingEvent),
of(PrioStatusEvent.class, (Consumer<PrioStatusEvent>)this::handlePrioStatusEvent),
of(ServerPlayerConnectedEvent.class, (Consumer<ServerPlayerConnectedEvent>)this::handleServerPlayerConnectedEvent),
of(ServerPlayerDisconnectedEvent.class, (Consumer<ServerPlayerDisconnectedEvent>)this::handleServerPlayerDisconnectedEvent)
);
}

public void start() {
loadConfig();
loadLaunchConfig();
DEFAULT_LOG.info("Starting ZenithProxy-{}", LAUNCH_CONFIG.version);
EVENT_BUS.subscribe(this);
initEventHandlers();
try {
if (CONFIG.interactiveTerminal.enable) {
TERMINAL_MANAGER.start();
Expand Down Expand Up @@ -265,13 +286,13 @@ public void connectAndCatchExceptions() {
*/
public synchronized void connect() {
try {
EVENT_BUS.dispatch(new StartConnectEvent());
EVENT_BUS.postAsync(new StartConnectEvent());
this.logIn();
} catch (final RuntimeException e) {
EVENT_BUS.dispatch(new ProxyLoginFailedEvent());
EVENT_BUS.post(new ProxyLoginFailedEvent());
getActiveConnections().forEach(connection -> connection.disconnect("Login failed"));
SCHEDULED_EXECUTOR_SERVICE.schedule(() -> {
EVENT_BUS.dispatch(new DisconnectEvent("Login Failed"));
EVENT_BUS.post(new DisconnectEvent("Login Failed"));
}, 1L, TimeUnit.SECONDS);
return;
}
Expand Down Expand Up @@ -402,30 +423,6 @@ public List<ServerConnection> getSpectatorConnections() {
.collect(Collectors.toList());
}

@Subscribe
public void handleDisconnectEvent(DisconnectEvent event) {
CACHE.reset(true);
this.disconnectTime = Instant.now();
this.inQueue = false;
this.queuePosition = 0;
if (!CONFIG.client.extra.utility.actions.autoDisconnect.autoClientDisconnect) {
// skip autoreconnect when we want to sync client disconnect
if (CONFIG.client.extra.autoReconnect.enabled && isReconnectableDisconnect(event.reason)) {
if (autoReconnectIsInProgress()) {
return;
}
this.autoReconnectFuture = Optional.of(SCHEDULED_EXECUTOR_SERVICE.submit(() -> {
delayBeforeReconnect();
synchronized (this.autoReconnectFuture) {
if (this.autoReconnectFuture.isPresent()) this.connect();
this.autoReconnectFuture = Optional.empty();
}
}));
}
}
TPS_CALCULATOR.reset();
}

public void delayBeforeReconnect() {
try {
final int countdown;
Expand All @@ -443,7 +440,7 @@ public void delayBeforeReconnect() {
// countdown = CONFIG.client.extra.autoReconnect.delaySeconds
// + CONFIG.client.extra.autoReconnect.linearIncrease * this.reconnectCounter++;
// }
EVENT_BUS.dispatch(new AutoReconnectEvent(countdown));
EVENT_BUS.postAsync(new AutoReconnectEvent(countdown));
for (int i = countdown; SHOULD_RECONNECT && i > 0; i--) {
if (i % 10 == 0) CLIENT_LOG.info("Reconnecting in {}", i);
Wait.waitALittle(1);
Expand All @@ -457,7 +454,7 @@ public void updatePrioBanStatus() {
if (!CONFIG.client.extra.prioBan2b2tCheck || !CONFIG.client.server.address.toLowerCase(Locale.ROOT).contains("2b2t.org")) return;
this.isPrioBanned = PRIORITY_BAN_CHECKER.checkPrioBan();
if (this.isPrioBanned.isPresent() && !this.isPrioBanned.get().equals(CONFIG.authentication.prioBanned)) {
EVENT_BUS.dispatch(new PrioBanStatusUpdateEvent(this.isPrioBanned.get()));
EVENT_BUS.postAsync(new PrioBanStatusUpdateEvent(this.isPrioBanned.get()));
CONFIG.authentication.prioBanned = this.isPrioBanned.get();
saveConfig();
CLIENT_LOG.info("Prio Ban Change Detected: " + this.isPrioBanned.get());
Expand Down Expand Up @@ -489,7 +486,7 @@ private void handleActiveHoursTick() {
})
.findAny()
.ifPresent(t -> {
EVENT_BUS.dispatch(new ActiveHoursConnectEvent());
EVENT_BUS.postAsync(new ActiveHoursConnectEvent());
this.lastActiveHoursConnect = Instant.now();
disconnect(SYSTEM_DISCONNECT);
Wait.waitALittle(30);
Expand Down Expand Up @@ -526,53 +523,69 @@ public void updateFavicon() {
}
}

@Subscribe
public void handleDisconnectEvent(DisconnectEvent event) {
CACHE.reset(true);
this.disconnectTime = Instant.now();
this.inQueue = false;
this.queuePosition = 0;
if (!CONFIG.client.extra.utility.actions.autoDisconnect.autoClientDisconnect) {
// skip autoreconnect when we want to sync client disconnect
if (CONFIG.client.extra.autoReconnect.enabled && isReconnectableDisconnect(event.reason)) {
if (autoReconnectIsInProgress()) {
return;
}
this.autoReconnectFuture = Optional.of(SCHEDULED_EXECUTOR_SERVICE.submit(() -> {
delayBeforeReconnect();
synchronized (this.autoReconnectFuture) {
if (this.autoReconnectFuture.isPresent()) this.connect();
this.autoReconnectFuture = Optional.empty();
}
}));
}
}
TPS_CALCULATOR.reset();
}


public void handleConnectEvent(ConnectEvent event) {
this.connectTime = Instant.now();
cancelAutoReconnect();
}

@Subscribe
public void handleStartQueueEvent(StartQueueEvent event) {
this.inQueue = true;
this.queuePosition = 0;
updatePrioBanStatus();
}

@Subscribe
public void handleQueuePositionUpdateEvent(QueuePositionUpdateEvent event) {
this.queuePosition = event.position;
}

@Subscribe
public void handleQueueCompleteEvent(QueueCompleteEvent event) {
this.inQueue = false;
this.connectTime = Instant.now();
}

@Subscribe
public void handlePlayerOnlineEvent(PlayerOnlineEvent event) {
if (!this.isPrio.isPresent()) {
// assume we are prio if we skipped queuing
EVENT_BUS.dispatch(new PrioStatusEvent(true));
EVENT_BUS.postAsync(new PrioStatusEvent(true));
}
PlayerCache.sync();
}

@Subscribe
public void handleClientDisconnect(final ProxyClientDisconnectedEvent e) {
public void handleProxyClientDisconnectedEvent(final ProxyClientDisconnectedEvent e) {
PlayerCache.sync();
}

@Subscribe
public void handleServerRestartingEvent(ServerRestartingEvent event) {
if (!CONFIG.authentication.prio && isNull(getCurrentPlayer().get())) {
Wait.waitRandomWithinMsBound(30000);
disconnect(SERVER_RESTARTING, new Exception());
}
}

@Subscribe
public void handlePrioStatusEvent(PrioStatusEvent event) {
if (CONFIG.client.server.address.toLowerCase().contains("2b2t.org")) {
if (event.prio == CONFIG.authentication.prio) {
Expand All @@ -582,15 +595,14 @@ public void handlePrioStatusEvent(PrioStatusEvent event) {
}
} else {
CLIENT_LOG.info("Prio Change Detected: " + event.prio);
EVENT_BUS.dispatch(new PrioStatusUpdateEvent(event.prio));
EVENT_BUS.postAsync(new PrioStatusUpdateEvent(event.prio));
this.isPrio = Optional.of(event.prio);
CONFIG.authentication.prio = event.prio;
saveConfig();
}
}
}

@Subscribe
public void handleServerPlayerConnectedEvent(ServerPlayerConnectedEvent event) {
if (CONFIG.client.extra.chat.showConnectionMessages) {
ServerConnection serverConnection = getCurrentPlayer().get();
Expand All @@ -600,7 +612,6 @@ public void handleServerPlayerConnectedEvent(ServerPlayerConnectedEvent event) {
}
}

@Subscribe
public void handleServerPlayerDisconnectedEvent(ServerPlayerDisconnectedEvent event) {
if (CONFIG.client.extra.chat.showConnectionMessages) {
ServerConnection serverConnection = getCurrentPlayer().get();
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/zenith/Shared.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.zenith;

import com.collarmc.pounce.EventBus;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zenith.cache.DataCache;
import com.zenith.command.CommandManager;
import com.zenith.database.DatabaseManager;
import com.zenith.discord.DiscordBot;
import com.zenith.event.SimpleEventBus;
import com.zenith.feature.pathing.Pathing;
import com.zenith.feature.pathing.World;
import com.zenith.feature.pathing.blockdata.BlockDataManager;
Expand Down Expand Up @@ -83,7 +83,7 @@ public static boolean isReconnectableDisconnect(final String reason) {
public static LaunchConfig LAUNCH_CONFIG;
public static final DataCache CACHE;
public static final DiscordBot DISCORD_BOT;
public static final EventBus EVENT_BUS;
public static final SimpleEventBus EVENT_BUS;
public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE;
public static final WhitelistManager WHITELIST_MANAGER;
public static final PriorityBanChecker PRIORITY_BAN_CHECKER;
Expand Down Expand Up @@ -306,7 +306,7 @@ public static synchronized void saveLaunchConfig() {
});
SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(16);
DISCORD_BOT = new DiscordBot();
EVENT_BUS = new EventBus(Runnable::run);
EVENT_BUS = new SimpleEventBus();
CACHE = new DataCache();
WHITELIST_MANAGER = new WhitelistManager();
PRIORITY_BAN_CHECKER = new PriorityBanChecker();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/zenith/command/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public void init() {
new ActiveHoursCommand(),
new AntiAFKCommand(),
new AutoDisconnectCommand(),
new AutoEatCommand(),
new AutoReconnectCommand(),
new AutoReplyCommand(),
new AutoRespawnCommand(),
new AutoTotemCommand(),
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/zenith/command/impl/UpdateCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public CommandUsage commandUsage() {
public LiteralArgumentBuilder<CommandContext> register() {
return command("update").requires(Command::validateAccountOwner).executes(c -> {
try {
EVENT_BUS.dispatch(new UpdateStartEvent());
EVENT_BUS.post(new UpdateStartEvent());
CONFIG.discord.isUpdating = true;
if (Proxy.getInstance().isConnected()) {
CONFIG.autoUpdater.shouldReconnectAfterAutoUpdate = true;
Expand All @@ -42,7 +42,7 @@ public LiteralArgumentBuilder<CommandContext> register() {
}).then(literal("c").executes(c -> {
CONFIG.discord.isUpdating = true;
CONFIG.autoUpdater.shouldReconnectAfterAutoUpdate = true;
EVENT_BUS.dispatch(new UpdateStartEvent());
EVENT_BUS.post(new UpdateStartEvent());
Proxy.getInstance().stop();
}));
}
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/com/zenith/database/ChatDatabase.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.zenith.database;

import com.collarmc.pounce.Subscribe;
import com.zenith.Proxy;
import com.zenith.database.dto.tables.Chats;
import com.zenith.database.dto.tables.records.ChatsRecord;
import com.zenith.event.Subscription;
import com.zenith.event.proxy.ServerChatReceivedEvent;
import org.jooq.DSLContext;
import org.jooq.InsertSetMoreStep;
Expand All @@ -16,8 +16,7 @@
import java.time.ZoneOffset;
import java.util.UUID;

import static com.zenith.Shared.CONFIG;
import static com.zenith.Shared.DATABASE_LOG;
import static com.zenith.Shared.*;

public class ChatDatabase extends LockingDatabase {
public ChatDatabase(QueryExecutor queryExecutor, RedisClient redisClient) {
Expand All @@ -44,7 +43,14 @@ public Instant getLastEntryTime() {
return chatsRecord.get(c.TIME).toInstant();
}

@Subscribe
@Override
public Subscription initEvents() {
return EVENT_BUS.subscribe(
ServerChatReceivedEvent.class, this::handleServerChatReceivedEvent
);
}


public void handleServerChatReceivedEvent(ServerChatReceivedEvent event) {
if (!CONFIG.client.server.address.endsWith("2b2t.org") // only write on 2b2t
|| Proxy.getInstance().isInQueue() // ignore queue
Expand Down
Loading

0 comments on commit a4c90a2

Please sign in to comment.