diff --git a/consensusj-jsonrpc-unix/build.gradle b/consensusj-jsonrpc-unix/build.gradle new file mode 100644 index 000000000..64d54c2fe --- /dev/null +++ b/consensusj-jsonrpc-unix/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'java-library' +} + +dependencies { + api project(':consensusj-jsonrpc') + + testImplementation "org.slf4j:slf4j-jdk14:${slf4jVersion}" // Runtime implementation of slf4j +} + +java { + sourceCompatibility = JavaVersion.toVersion("17") + targetCompatibility = JavaVersion.toVersion("17") +} diff --git a/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/JsonRpcClientUnixSocket.java b/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/JsonRpcClientUnixSocket.java new file mode 100644 index 000000000..f0439550f --- /dev/null +++ b/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/JsonRpcClientUnixSocket.java @@ -0,0 +1,77 @@ +package org.consensusj.jsonrpc.unix; + +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonNode; +import org.consensusj.jsonrpc.AbstractRpcClient; +import org.consensusj.jsonrpc.JsonRpcMessage; +import org.consensusj.jsonrpc.JsonRpcRequest; +import org.consensusj.jsonrpc.JsonRpcResponse; +import org.consensusj.jsonrpc.JsonRpcStatusException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.StandardProtocolFamily; +import java.net.URI; +import java.net.UnixDomainSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.nio.file.Path; + +/** + * Proof-of-concept UNIX domain socket JsonRpc Client (works with {@link UnixSocketEchoServer} and {@code lightningd}.) + */ +public class JsonRpcClientUnixSocket extends AbstractRpcClient { + private static final Logger log = LoggerFactory.getLogger(JsonRpcClientUnixSocket.class); + private final UnixDomainSocketAddress socketAddress; + private final JsonRpcUnixSocketMapper socketMapper; + + public static void main(String[] args) throws IOException { + boolean useEchoServer = true; + Path socketPath = useEchoServer ? UnixSocketEchoServer.getTestPath() : getLightningRpcPath(); + JsonRpcClientUnixSocket socket = new JsonRpcClientUnixSocket(socketPath); + JsonNode response = socket.send("getinfo", JsonNode.class); + System.out.println(response); + } + + public JsonRpcClientUnixSocket(Path socketPath) { + super(JsonRpcMessage.Version.V2); + socketAddress = UnixDomainSocketAddress.of(socketPath); + socketMapper = new JsonRpcUnixSocketMapper(mapper); + + // TODO: Delete on close: + // Files.deleteIfExists(socketPath); + } + + @Override + public JsonRpcResponse sendRequestForResponse(JsonRpcRequest request, JavaType responseType) throws IOException, JsonRpcStatusException { + SocketChannel channel = SocketChannel.open(StandardProtocolFamily.UNIX); + // TODO: Use Selectable channel for async?? + channel.connect(socketAddress); + ByteBuffer buffer = socketMapper.serializeRequest(request); + while (buffer.hasRemaining()) { + channel.write(buffer); + } + // TODO: Read response see https://www.baeldung.com/java-unix-domain-socket + // See also: https://nipafx.dev/java-unix-domain-sockets/ + // And: https://www.linkedin.com/pulse/java-sockets-io-blocking-non-blocking-asynchronous-aliaksandr-liakh/ + JsonRpcResponse responseJson = null; + try { + responseJson = socketMapper.readSocketResponse(request, responseType, channel); + } catch (InterruptedException e) { + e.printStackTrace(); + } + channel.close(); + return responseJson; + } + + @Override + public URI getServerURI() { + return socketAddress.getPath().toUri(); + } + + public static Path getLightningRpcPath() { + return Path.of(System.getProperty("user.home")).resolve(".lightning/regtest/lightning-rpc"); + + } +} diff --git a/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/JsonRpcUnixSocketMapper.java b/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/JsonRpcUnixSocketMapper.java new file mode 100644 index 000000000..8ac4b591c --- /dev/null +++ b/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/JsonRpcUnixSocketMapper.java @@ -0,0 +1,98 @@ +package org.consensusj.jsonrpc.unix; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.consensusj.jsonrpc.JsonRpcRequest; +import org.consensusj.jsonrpc.JsonRpcResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Optional; + +/** + * Common code for JsonRpc UNIX Sockets clients and servers + */ +public class JsonRpcUnixSocketMapper { + private static final Logger log = LoggerFactory.getLogger(JsonRpcClientUnixSocket.class); + + private final ObjectMapper mapper; + + /** + * @param mapper Jackson mapper + */ + public JsonRpcUnixSocketMapper(ObjectMapper mapper) { + this.mapper = mapper; + } + + public JsonRpcResponse readSocketResponse(JsonRpcRequest request, JavaType responseType, SocketChannel channel) throws IOException, InterruptedException { + Optional resp = Optional.empty(); + while (resp.isEmpty()) { + Thread.sleep(100); + resp = readSocketMessage(channel); + resp.ifPresent(System.out::println); + } + JsonRpcResponse responseJson = deserializeResponse(responseType, resp.orElseThrow()); + return responseJson; + } + + public Optional readSocketMessage(SocketChannel channel) throws IOException { + ByteBuffer buffer = ByteBuffer.allocate(10240); + int bytesRead = channel.read(buffer); + if (bytesRead < 0) + return Optional.empty(); + + byte[] bytes = new byte[bytesRead]; + buffer.flip(); + buffer.get(bytes); + String message = new String(bytes); + return Optional.of(message); + } + + public JsonRpcResponse deserializeResponse(JavaType responseType, String s) throws JsonProcessingException { + JsonRpcResponse responseJson; + log.debug("Response String: {}", s); + try { + responseJson = mapper.readValue(s, responseType); + } catch (JsonProcessingException e) { + log.error("JsonProcessingException: ", e); + // TODO: Map to some kind of JsonRPC exception similar to JsonRPCStatusException + throw e; + } + return responseJson; + } + + public JsonRpcRequest deserializeRequest(String s) throws JsonProcessingException { + JsonRpcRequest requestJson; + log.debug("Request String: {}", s); + try { + requestJson = mapper.readValue(s, JsonRpcRequest.class); + } catch (JsonProcessingException e) { + log.error("JsonProcessingException: ", e); + // TODO: Map to some kind of JsonRPC exception similar to JsonRPCStatusException + throw e; + } + return requestJson; + } + + public ByteBuffer serializeRequest(JsonRpcRequest request) throws JsonProcessingException { + String message = mapper.writeValueAsString(request); + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.clear(); + buffer.put(message.getBytes()); + buffer.flip(); + return buffer; + } + + public ByteBuffer serializeResponse(JsonRpcResponse response) throws JsonProcessingException { + String message = mapper.writeValueAsString(response); + ByteBuffer buffer = ByteBuffer.allocate(1024); + buffer.clear(); + buffer.put(message.getBytes()); + buffer.flip(); + return buffer; + } +} diff --git a/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/UnixSocketEchoServer.java b/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/UnixSocketEchoServer.java new file mode 100644 index 000000000..6666417b2 --- /dev/null +++ b/consensusj-jsonrpc-unix/src/main/java/org/consensusj/jsonrpc/unix/UnixSocketEchoServer.java @@ -0,0 +1,88 @@ +package org.consensusj.jsonrpc.unix; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.consensusj.jsonrpc.JsonRpcError; +import org.consensusj.jsonrpc.JsonRpcRequest; +import org.consensusj.jsonrpc.JsonRpcResponse; + +import java.io.IOException; +import java.net.StandardProtocolFamily; +import java.net.UnixDomainSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * For possible implementation with CompletableFuture, + * See https://github.com/IBM/java-async-util/blob/master/asyncutil/src/test/java/com/ibm/asyncutil/examples/nio/nio.md#nio-bridge + * + */ +public class UnixSocketEchoServer { + private final UnixDomainSocketAddress socketAddress; + private final JsonRpcUnixSocketMapper socketMapper; + + public static void main(String[] args) throws IOException, InterruptedException { + Path socketPath = getTestPath(); + Files.deleteIfExists(socketPath); + UnixSocketEchoServer server = new UnixSocketEchoServer(socketPath); + server.run(); + } + + public UnixSocketEchoServer(Path socketPath) { + socketAddress = UnixDomainSocketAddress.of(socketPath); + socketMapper = new JsonRpcUnixSocketMapper(getMapper()); + + // TODO: Delete on close: + // Files.deleteIfExists(socketPath); + } + + public void run() throws IOException, InterruptedException { + ServerSocketChannel serverChannel = ServerSocketChannel.open(StandardProtocolFamily.UNIX); + serverChannel.bind(socketAddress); + SocketChannel channel = serverChannel.accept(); + while (true) { + var optMessage = socketMapper.readSocketMessage(channel); + if (optMessage.isPresent()) { + processMessage(channel, optMessage.get()); + } + channel.close(); + channel = serverChannel.accept(); + Thread.sleep(100); + } + } + + private void processMessage(SocketChannel channel, String message) throws IOException { + System.out.printf("[Client message] %s\n", message); + JsonRpcRequest request; + try { + request = socketMapper.deserializeRequest(message); + System.out.println("Got " + request.getMethod() + " request"); + JsonRpcResponse response = switch (request.getMethod()) { + case "getinfo" -> new JsonRpcResponse<>(request, "Echo GETINFO Response"); + default -> new JsonRpcResponse<>(request, JsonRpcError.of(JsonRpcError.Error.METHOD_NOT_FOUND)); + }; + ByteBuffer buffer = socketMapper.serializeResponse(response); + while (buffer.hasRemaining()) { + channel.write(buffer); + } + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + static ObjectMapper getMapper() { + var mapper = new ObjectMapper(); + // TODO: Provide external API to configure FAIL_ON_UNKNOWN_PROPERTIES + // TODO: Remove "ignore unknown" annotations on various POJOs that we've defined. + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + return mapper; + } + + static Path getTestPath() { + return Path.of(System.getProperty("user.home")).resolve("consensusj.socket"); + } +} diff --git a/settings.gradle b/settings.gradle index 4f628b56e..38ae6138a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -42,8 +42,9 @@ include 'consensusj-currency', // JavaMoney Currency Provider(s include 'cj-btc-services' // bitcoinj-based service objects if (JavaVersion.current().compareTo(JavaVersion.VERSION_17) >= 0) { - System.err.println "Including JDK 17 module because Java ${JavaVersion.current()} is JDK 17+" + System.err.println "Including JDK 17 modules because Java ${JavaVersion.current()} is JDK 17+" include 'cj-bitcoinj-dsl-js' // JavaScript DSL for bitcoinj via Nashorn + include 'consensusj-jsonrpc-unix' // UNIX Domain Sockets for JSONRPC } else { System.err.println "Skipping 'cj-bitcoinj-dsl-js', requires Java 17+, currently running Java ${JavaVersion.current()}" }