Skip to content

Commit

Permalink
Object Store use over leaf-hub domain (#1160)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 17, 2024
1 parent cb123c8 commit c17c25e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 10 deletions.
3 changes: 1 addition & 2 deletions src/main/java/io/nats/client/impl/NatsObjectStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ public ObjectInfo get(String objectName, OutputStream out) throws IOException, J
out.write(data);
}
else {

JetStreamSubscription sub = js.subscribe(pubSubChunkSubject(oi.getNuid()),
JetStreamSubscription sub = js.subscribe(rawChunkSubject(oi.getNuid()),
PushSubscribeOptions.builder().stream(streamName).ordered(true).build());

Message m = sub.nextMessage(Duration.ofSeconds(1));
Expand Down
57 changes: 56 additions & 1 deletion src/test/java/io/nats/client/impl/ObjectStoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ private ObjectInfo validateObjectInfo(ObjectInfo oi, String bucket, String objec
}

@SuppressWarnings("SameParameterValue")
private static Object[] getInput(int size) throws IOException {
private static Object[] getInput(int size) {
File found = null;
long foundLen = Long.MAX_VALUE;
final String classPath = System.getProperty("java.class.path", ".");
Expand Down Expand Up @@ -618,4 +618,59 @@ private void validateWatcher(Object[] expecteds, TestObjectStoreWatcher watcher)
}
}
}

@Test
public void testObjectStoreDomains() throws Exception {
runInJsHubLeaf((hubNc, leafNc) -> {
ObjectStoreManagement hubOsm = hubNc.objectStoreManagement();

// Create main OS on HUB
String hubBucket = bucket();
ObjectStoreStatus hubStatus = hubOsm.create(ObjectStoreConfiguration.builder()
.name(hubBucket)
.storageType(StorageType.Memory)
.replicas(1)
.build());
assertEquals(0, hubStatus.getSize());
assertEquals(1, hubStatus.getReplicas());

ObjectStore hubOs = hubNc.objectStore(hubBucket);
ObjectStore leafOs = leafNc.objectStore(hubBucket, ObjectStoreOptions.builder().jsDomain(HUB_DOMAIN).build());

String objectName = name();
ObjectMeta meta = ObjectMeta.builder(objectName)
.chunkSize(8 * 1024)
.build();

Object[] input = getInput(4 * 8 * 1024);
File file = (File)input[1];
InputStream in = Files.newInputStream(file.toPath());
hubOs.put(meta, in);

hubStatus = hubOs.getStatus();
assertTrue(hubStatus.getSize() > 0);

ObjectStoreStatus leafStatus = leafOs.getStatus();

assertEquals(hubStatus.getBucketName(), leafStatus.getBucketName());
assertEquals(hubStatus.getSize(), leafStatus.getSize());

ObjectInfo hubInfo = hubOs.getInfo(objectName);
ObjectInfo leafInfo = leafOs.getInfo(objectName);

assertEquals(hubInfo.getNuid(), leafInfo.getNuid());
assertEquals(hubInfo.getSize(), leafInfo.getSize());
assertEquals(hubInfo.getObjectMeta().getObjectName(), leafInfo.getObjectMeta().getObjectName());

ByteArrayOutputStream hubOut = new ByteArrayOutputStream();
ByteArrayOutputStream leafOut = new ByteArrayOutputStream();
hubOs.get(objectName, hubOut);
leafOs.get(objectName, leafOut);

byte[] hubBytes = hubOut.toByteArray();
byte[] leafBytes = leafOut.toByteArray();

assertArrayEquals(hubBytes, leafBytes);
});
}
}
20 changes: 13 additions & 7 deletions src/test/java/io/nats/client/utils/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,19 +290,21 @@ public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception
int leafPort = NatsTestServer.nextPort();

String[] hubInserts = new String[] {
"server_name: HUB",
"server_name: " + HUB_DOMAIN,
"jetstream {",
" domain: HUB",
" store_dir: " + tempJsStoreDir(),
" domain: " + HUB_DOMAIN,
"}",
"leafnodes {",
" listen = 127.0.0.1:" + hubLeafPort,
"}"
};

String[] leafInserts = new String[] {
"server_name: LEAF",
"server_name: " + LEAF_DOMAIN,
"jetstream {",
" domain: LEAF",
" store_dir: " + tempJsStoreDir(),
" domain: " + LEAF_DOMAIN,
"}",
"leafnodes {",
" remotes = [ { url: \"leaf://127.0.0.1:" + hubLeafPort + "\" } ]",
Expand Down Expand Up @@ -331,9 +333,9 @@ public static void runInJsCluster(ThreeServerTest threeServerTest) throws Except
int listen1 = NatsTestServer.nextPort();
int listen2 = NatsTestServer.nextPort();
int listen3 = NatsTestServer.nextPort();
String path1 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
String path2 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
String path3 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
String path1 = tempJsStoreDir();
String path2 = tempJsStoreDir();
String path3 = tempJsStoreDir();
String cluster = variant();
String serverPrefix = variant();

Expand Down Expand Up @@ -398,6 +400,10 @@ public static void runInJsCluster(ThreeServerTest threeServerTest) throws Except
}
}

private static String tempJsStoreDir() throws IOException {
return Files.createTempDirectory(variant()).toString().replace("\\", "\\\\"); // when on windows this is necessary. unix doesn't have backslash
}

private static void cleanupJs(Connection c)
{
try {
Expand Down

0 comments on commit c17c25e

Please sign in to comment.