Skip to content

Commit

Permalink
added request/response + fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Thiies committed Mar 28, 2024
1 parent 5f13c47 commit 737fefa
Show file tree
Hide file tree
Showing 23 changed files with 432 additions and 39 deletions.
11 changes: 5 additions & 6 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ allprojects {
apply(plugin = "java-library")
apply(plugin = "maven-publish")


group = "dev.httpmarco"
version = "1.0.27-SNAPSHOT"
version = "1.0.35-SNAPSHOT"

repositories {
mavenCentral()
Expand Down Expand Up @@ -67,11 +66,11 @@ allprojects {
nexusPublishing {
repositories {
sonatype {
nexusUrl.set(uri("https://s01.oss.sonatype.org/service/local/"))
snapshotRepositoryUrl.set(uri("https://s01.oss.sonatype.org/content/repositories/snapshots/"))
nexusUrl.set(uri("https://nexus.bytemc.de/repository/maven-public/"))
snapshotRepositoryUrl.set(uri("https://nexus.bytemc.de/repository/maven-public/"))

username.set(System.getenv("ossrhUsername")?.toString() ?: "")
password.set(System.getenv("ossrhPassword")?.toString() ?: "")
username.set(System.getenv("BYTEMC_REPO_USER")?.toString() ?: "")
password.set(System.getenv("BYTEMC_REPO_PASSWORD")?.toString() ?: "")
}
}
useStaging.set(!project.rootProject.version.toString().endsWith("-SNAPSHOT"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ public JsonObjectSerializer(String gsonObject) {
this.jsonObject = JsonUtils.getGson().fromJson(gsonObject, JsonObject.class);
}

public boolean has(String key) {
return jsonObject.has(key);
}

public JsonObjectSerializer append(String key, String value) {
jsonObject.addProperty(key, value);
return this;
Expand Down
2 changes: 2 additions & 0 deletions osgan-netty/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
dependencies {
implementation(libs.netty5)
compileOnly(libs.gson)
api(project(":osgan-utils"))
api(project(":osgan-files"))
api(project(":osgan-reflections"))

testImplementation(platform("org.junit:junit-bom:5.10.2"))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package dev.httpmarco.osgan.networking;

import dev.httpmarco.osgan.files.json.JsonUtils;
import dev.httpmarco.osgan.networking.packet.ForwardPacket;
import io.netty5.channel.Channel;
import lombok.*;
import lombok.experimental.Accessors;
Expand All @@ -15,6 +17,14 @@ public final class ChannelTransmit {
private final Channel channel;

public <P extends Packet> void sendPacket(P object) {
this.sendPacket(this.channel, object);
}

public <P extends Packet> void sendPacket(Channel channel, P object) {
channel.writeAndFlush(object);
}

public <P extends Packet> void redirectPacket(String id, P object) {
this.sendPacket(new ForwardPacket(id, object.getClass().getName(), JsonUtils.toJson(object)));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package dev.httpmarco.osgan.networking;

import dev.httpmarco.osgan.files.json.JsonObjectSerializer;
import dev.httpmarco.osgan.networking.listening.ChannelPacketListener;
import dev.httpmarco.osgan.networking.request.PacketResponder;
import dev.httpmarco.osgan.networking.request.RequestHandler;
import dev.httpmarco.osgan.utils.executers.FutureResult;
import io.netty5.channel.Channel;
import io.netty5.channel.EventLoopGroup;
Expand All @@ -13,6 +16,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

@Getter
@Accessors(fluent = true)
Expand All @@ -24,12 +28,22 @@ public abstract class CommunicationComponent<M extends Metadata> {
private final M metadata;
private final EventLoopGroup bossGroup;
private final Map<Class<? extends Packet>, List<ChannelPacketListener<? extends Packet>>> packetListeners = new HashMap<>();
private final RequestHandler requestHandler;

public CommunicationComponent(M metadata, int workerThreads) {
this.bossGroup = NetworkUtils.createEventLoopGroup(workerThreads);
this.metadata = metadata;
this.requestHandler = new RequestHandler(this);
}

public abstract boolean isServer();

public abstract <P extends Packet> void sendPacket(P packet);

public abstract <P extends Packet> void sendPacket(Channel channel, P packet);

public abstract <P extends Packet> void redirectPacket(String id, P packet);

public FutureListener<? super Channel> handleConnectionRelease() {
return it -> {
if (it.isSuccess()) {
Expand Down Expand Up @@ -59,7 +73,19 @@ public void callPacketReceived(ChannelTransmit transmit, Packet packet) {
}
}

public <P extends Packet> void listening(Class<P> packetClass, ChannelPacketListener<P> listener) {
public <P extends Packet> void listen(Class<P> packetClass, ChannelPacketListener<P> listener) {
this.packetListeners.computeIfAbsent(packetClass, it -> new ArrayList<>()).add(listener);
}

public <T extends Packet> void request(String id, Class<T> responsePacket, Consumer<T> consumer) {
this.requestHandler.request(id, responsePacket, consumer);
}

public <T extends Packet> void request(String id, JsonObjectSerializer properties, Class<T> responsePacket, Consumer<T> consumer) {
this.requestHandler.request(id, properties, responsePacket, consumer);
}

public <T extends Packet> void registerResponder(String id, PacketResponder<T> responder) {
this.requestHandler.registerResponder(id, responder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ public final class CommunicationComponentHandler extends SimpleChannelInboundHan
@Override
protected void messageReceived(ChannelHandlerContext ctx, Packet packet) {
this.onPacketReceived.listenWithMapping(new ChannelTransmit(ctx.channel()), packet);
System.out.println("Received packet " + packet.getClass().getSimpleName() + " from " + ctx.channel().remoteAddress());
}

@Override
public void channelActive(@NotNull ChannelHandlerContext ctx) {
this.supplyChannelTransmit(ctx.channel(), this.onActive);
System.out.println("Channel active: " + ctx.channel().remoteAddress());
}

@Override
public void channelInactive(@NotNull ChannelHandlerContext ctx) {
this.supplyChannelTransmit(ctx.channel(), this.onInactive);
System.out.println("Connection closed with " + ctx.channel().remoteAddress());
}

private void supplyChannelTransmit(Channel channel, ChannelConsumer consumer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package dev.httpmarco.osgan.networking.client;

import dev.httpmarco.osgan.files.json.JsonUtils;
import dev.httpmarco.osgan.networking.*;
import dev.httpmarco.osgan.networking.packet.ChannelTransmitAuthPacket;
import dev.httpmarco.osgan.networking.packet.ForwardPacket;
import dev.httpmarco.osgan.networking.request.packets.BadResponsePacket;
import dev.httpmarco.osgan.networking.request.packets.RequestPacket;
import dev.httpmarco.osgan.networking.request.packets.ResponsePacket;
import dev.httpmarco.osgan.utils.executers.FutureResult;
import io.netty5.bootstrap.Bootstrap;
import io.netty5.channel.Channel;
import io.netty5.channel.ChannelOption;
import io.netty5.channel.epoll.Epoll;
import lombok.Getter;
Expand All @@ -29,17 +35,20 @@ public NettyClient(ClientMetadata metadata) {
.handler(new ChannelInitializer(CommunicationComponentHandler
.builder()
.onActive(it -> {
if (metadata.id() == null) {
if (metadata.id() != null) {
it.sendPacket(new ChannelTransmitAuthPacket(metadata().id()));
}
this.transmit = it;
})
.onInactive(it -> {
if ((metadata.hasReconnection())) {
this.reconnectQueue.start();
System.out.println("Starting reconnect queue...");
}

this.transmit = null;
})
.onPacketReceived(this::callPacketReceived)
.build()))
.option(ChannelOption.AUTO_READ, true)
.option(ChannelOption.TCP_NODELAY, true)
Expand All @@ -49,6 +58,35 @@ public NettyClient(ClientMetadata metadata) {
if (Epoll.isTcpFastOpenClientSideAvailable()) {
bootstrap.option(ChannelOption.TCP_FASTOPEN_CONNECT, true);
}

this.listen(ForwardPacket.class, (transmit, packet) -> {
if (packet.id().equals(metadata.id())) {
try {
this.callPacketReceived(transmit, (Packet) JsonUtils.fromJson(packet.packetJson(), Class.forName(packet.className())));
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
});

this.listen(RequestPacket.class, (transmit, packet) -> {
if (this.requestHandler().isResponderPresent(packet.id())) {
this.sendPacket(new ResponsePacket(packet.uniqueId(), JsonUtils.toJson(this.requestHandler().getResponder(packet.id()).response(transmit, packet.properties()))));
}
});
this.listen(BadResponsePacket.class, (transmit, packet) -> {
if (this.requestHandler().isRequestPresent(packet.uniqueId())) {
this.requestHandler().removeRequest(packet.uniqueId());

System.out.println("Received bad response for request '" + packet.uniqueId() + "': " + packet.message());
}
});
this.listen(ResponsePacket.class, (transmit, packet) -> {
if (this.requestHandler().isRequestPresent(packet.uniqueId())) {
this.requestHandler().acceptRequest(packet.uniqueId(), packet.packetJson());
}
});

this.connect();
}

Expand Down Expand Up @@ -77,9 +115,29 @@ public void connect() {
});
}

@Override
public <P extends Packet> void sendPacket(P packet) {
if (this.transmit != null) {
this.transmit.sendPacket(packet);
}
}

@Override
public <P extends Packet> void sendPacket(Channel channel, P packet) {
if (this.transmit != null) {
this.transmit.sendPacket(channel, packet);
}
}

@Override
public <P extends Packet> void redirectPacket(String id, P packet) {
if (this.transmit != null) {
this.transmit.redirectPacket(id, packet);
}
}

@Override
public boolean isServer() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ public boolean readBoolean() {
return this.buffer.readBoolean();
}

public CodecBuffer writeUUID(UUID uuid) {
this.buffer.writeLong(uuid.getMostSignificantBits());
this.buffer.writeLong(uuid.getLeastSignificantBits());
public CodecBuffer writeUniqueId(UUID uniqueId) {
this.buffer.writeLong(uniqueId.getMostSignificantBits());
this.buffer.writeLong(uniqueId.getLeastSignificantBits());
return this;
}

public UUID readUUID() {
public UUID readUniqueId() {
return new UUID(this.buffer.readLong(), this.buffer.readLong());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.httpmarco.osgan.networking.codec;

import dev.httpmarco.osgan.files.json.JsonObjectSerializer;
import dev.httpmarco.osgan.networking.Packet;
import dev.httpmarco.osgan.networking.annotation.PacketIgnore;
import dev.httpmarco.osgan.networking.annotation.PacketIncludeObject;
Expand Down Expand Up @@ -98,7 +99,10 @@ private boolean encodeParameter(CodecBuffer buffer, Object parameter) {
buffer.writeByte((Byte) parameter);
return true;
} else if (type.equals(UUID.class)) {
buffer.writeUUID((UUID) parameter);
buffer.writeUniqueId((UUID) parameter);
return true;
} else if (type.equals(JsonObjectSerializer.class)) {
buffer.writeString(parameter.toString());
return true;
} else if (type.isEnum()) {
buffer.writeEnum((Enum<?>) parameter);
Expand Down Expand Up @@ -209,7 +213,9 @@ private Object decodeObject(@NotNull CodecBuffer buffer) throws ClassNotFoundExc
} else if (type.equals(Byte.class) || type.equals(byte.class)) {
return buffer.readByte();
} else if (type.equals(UUID.class)) {
return buffer.readUUID();
return buffer.readUniqueId();
} else if (type.equals(JsonObjectSerializer.class)) {
return new JsonObjectSerializer(buffer.readString());
} else if (type.isEnum()) {
return buffer.readEnum((Class<? extends Enum<?>>) type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ public interface ChannelPacketListener<P> {
void listen(ChannelTransmit channel, P packet);

@SuppressWarnings("unchecked")
default void listenWithMapping(ChannelTransmit channel, Packet packet) {
listen(channel, (P) packet);
default void listenWithMapping(ChannelTransmit transmit, Packet packet) {
listen(transmit, (P) packet);
}

}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package dev.httpmarco.osgan.networking.packet;

import dev.httpmarco.osgan.networking.Packet;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.experimental.Accessors;

@Getter
@Accessors(fluent = true)
@AllArgsConstructor
public class ForwardPacket implements Packet {
private String id;
private String className;
private String packetJson;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2024 nextCluster
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package dev.httpmarco.osgan.networking.request;

import dev.httpmarco.osgan.files.json.JsonObjectSerializer;
import dev.httpmarco.osgan.networking.ChannelTransmit;
import dev.httpmarco.osgan.networking.Packet;
import io.netty5.channel.Channel;

public interface PacketResponder<T extends Packet> {

T response(ChannelTransmit transmit, JsonObjectSerializer properties);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package dev.httpmarco.osgan.networking.request;

import dev.httpmarco.osgan.networking.ChannelTransmit;
import io.netty5.channel.Channel;

import java.util.UUID;

public record PendingRequest(ChannelTransmit transmit, String id, UUID uniqueId, long timestamp) {
}
Loading

0 comments on commit 737fefa

Please sign in to comment.