Skip to content

Commit

Permalink
Update Redis integration to support valkey-cli and valkey-server; adj…
Browse files Browse the repository at this point in the history
…ust package dependencies and improve error handling

Signed-off-by: avifenesh <[email protected]>
  • Loading branch information
avifenesh committed Jan 26, 2025
1 parent a998656 commit 3f8d48a
Show file tree
Hide file tree
Showing 11 changed files with 220 additions and 147 deletions.
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ anyhow = "1"
sscanf = "0.4.1"
serial_test = "2"
versions = "6.3"
which = "7.0.1"

[[test]]
name = "test_async"
Expand Down
8 changes: 7 additions & 1 deletion glide-core/redis-rs/redis/tests/support/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,13 @@ impl RedisCluster {
));
}

let mut cmd = process::Command::new("redis-cli");
let cli_command = ["valkey-cli", "redis-cli"]
.iter()
.find(|cmd| which::which(cmd).is_ok())
.map(|&cmd| cmd)
.unwrap_or_else(|| panic!("Neither valkey-cli nor redis-cli exists in the system."));

let mut cmd = process::Command::new(cli_command);
cmd.stdout(process::Stdio::null())
.arg("--cluster")
.arg("create")
Expand Down
30 changes: 19 additions & 11 deletions glide-core/redis-rs/redis/tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,17 +225,25 @@ impl RedisServer {
modules: &[Module],
spawner: F,
) -> RedisServer {
let mut redis_cmd = process::Command::new("redis-server");
// Check wether the server available is redis or valkey
let server_command = ["valkey-server", "redis-server"]
.iter()
.find(|cmd| which::which(cmd).is_ok())
.map(|&cmd| cmd)
.unwrap_or_else(|| {
panic!("Neither valkey-server nor redis-server exists in the system.")
});
let mut server_cmd = process::Command::new(server_command);

if let Some(config_path) = config_file {
redis_cmd.arg(config_path);
server_cmd.arg(config_path);
}

// Load Redis Modules
for module in modules {
match module {
Module::Json => {
redis_cmd
server_cmd
.arg("--loadmodule")
.arg(env::var("REDIS_RS_REDIS_JSON_PATH").expect(
"Unable to find path to RedisJSON at REDIS_RS_REDIS_JSON_PATH, is it set?",
Expand All @@ -244,24 +252,24 @@ impl RedisServer {
};
}

redis_cmd
server_cmd
.stdout(process::Stdio::null())
.stderr(process::Stdio::null());
let tempdir = tempfile::Builder::new()
.prefix("redis")
.tempdir()
.expect("failed to create tempdir");
redis_cmd.arg("--logfile").arg(Self::log_file(&tempdir));
server_cmd.arg("--logfile").arg(Self::log_file(&tempdir));
match addr {
redis::ConnectionAddr::Tcp(ref bind, server_port) => {
redis_cmd
server_cmd
.arg("--port")
.arg(server_port.to_string())
.arg("--bind")
.arg(bind);

RedisServer {
process: spawner(&mut redis_cmd),
process: spawner(&mut server_cmd),
tempdir,
addr,
tls_paths: None,
Expand All @@ -273,7 +281,7 @@ impl RedisServer {
let auth_client = if mtls_enabled { "yes" } else { "no" };

// prepare redis with TLS
redis_cmd
server_cmd
.arg("--tls-port")
.arg(port.to_string())
.arg("--port")
Expand All @@ -300,20 +308,20 @@ impl RedisServer {
};

RedisServer {
process: spawner(&mut redis_cmd),
process: spawner(&mut server_cmd),
tempdir,
addr,
tls_paths: Some(tls_paths),
}
}
redis::ConnectionAddr::Unix(ref path) => {
redis_cmd
server_cmd
.arg("--port")
.arg("0")
.arg("--unixsocket")
.arg(path);
RedisServer {
process: spawner(&mut redis_cmd),
process: spawner(&mut server_cmd),
tempdir,
addr,
tls_paths: None,
Expand Down
111 changes: 57 additions & 54 deletions node/hybrid-node-tests/commonjs-test/commonjs-test.cjs
Original file line number Diff line number Diff line change
@@ -1,73 +1,76 @@
/* eslint no-undef: off */
/* eslint @typescript-eslint/no-require-imports: off */
const { AsyncClient } = require("glide-rs");
const RedisServer = require("redis-server");
const { GlideClient } = require("@valkey/valkey-glide");
const FreePort = require("find-free-port");
const { execFile } = require("child_process");
const { promisify } = require("node:util");
const { exec, spawn } = require("node:child_process");
const execAsync = promisify(exec);

const PORT_NUMBER = 4001;
let server;
let port;

function checkCliAvailability(cli) {
return new Promise((resolve) => {
execFile(cli, ["--version"], (error) => {
resolve(!error);
});
});
async function checkCommandAvailability(command) {
try {
const { stdout } = await execAsync(`which ${command}`);
console.log(stdout);
return Boolean(stdout.trim());
} catch (error) {
console.error(error);
return false;
}
}

async function flushallOnPort(port) {
const redisCliAvailable = await checkCliAvailability("redis-cli");
const valkeyCliAvailable = await checkCliAvailability("valkey-cli");
async function main() {
console.log("Starting main");
const port = await FreePort(PORT_NUMBER).then(([free_port]) => free_port);
const redisServerAvailable = await checkCommandAvailability("redis-server");
const valkeyServerAvailable =
await checkCommandAvailability("valkey-server");

if (!redisCliAvailable && !valkeyCliAvailable) {
throw new Error("Neither redis-cli nor valkey-cli is available");
if (!redisServerAvailable && !valkeyServerAvailable) {
throw new Error("Neither valkey nor redis are available");
}

const cli = redisCliAvailable ? "valkey-cli" : "redis-cli";
return new Promise((resolve, reject) => {
execFile(cli, ["-p", port, "FLUSHALL"], (error, _, stderr) => {
if (error) {
console.error(stderr);
reject(error);
} else {
const server = valkeyServerAvailable ? "valkey-server" : "redis-server";
console.log(server);

const serverProcess = spawn(server, ["--port", port.toString()], {
stdio: ["ignore", "pipe", "pipe"],
});

await new Promise((resolve) => {
serverProcess.stdout.on("data", (data) => {
console.log(`${data}`);

if (data.toString().includes("Ready to accept connections")) {
resolve();
}
});

serverProcess.stderr.on("data", (data) => {
console.error(`${data}`);
});
});
}

FreePort(PORT_NUMBER)
.then(([free_port]) => {
port = free_port;
server = new RedisServer(port);
server.open(async (err) => {
if (err) {
console.error("Error opening server:", err);
throw err;
}
const client = await GlideClient.createClient({
addresses: [{ host: "localhost", port }],
});
const setResult = await client.set("test", "test");
console.log(setResult);
let getResult = await client.get("test");
console.log(getResult);

const client = AsyncClient.CreateConnection(
`redis://localhost:${port}`,
);
await client.set("test", "test");
let result = await client.get("test");
if (getResult !== "test") {
throw new Error("Common Test failed");
} else {
console.log("Common Test passed");
}

if (result !== "test") {
throw new Error("Common Test failed");
} else {
console.log("Common Test passed");
}
await client.flushall();
client.close();
serverProcess.kill();
}

await flushallOnPort(port).then(() => {
console.log("db flushed");
});
await server.close().then(() => {
console.log("server closed");
});
});
})
.catch((error) => {
console.error("Error occurred while finding a free port:", error);
});
main().then(() => {
console.log("Done");
process.exit(0);
});
11 changes: 4 additions & 7 deletions node/hybrid-node-tests/commonjs-test/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "common-test",
"version": "1.0.0",
"description": "",
"main": "Common-test.cjs",
"main": "commonjs-test.cjs",
"type": "commonjs",
"scripts": {
"build": "cd ../../../node && npm run build",
Expand All @@ -14,13 +14,10 @@
"author": "Amazon Web Services",
"license": "Apache-2.0",
"dependencies": {
"child_process": "^1.0.2",
"find-free-port": "^2.0.0",
"valkey-glide": "file:../../../node/build-ts/cjs"
"@valkey/valkey-glide": "file:../..",
"find-free-port": "^2.0.0"
},
"devDependencies": {
"@types/node": "^22.8",
"prettier": "^3.3",
"typescript": "^5.6"
"prettier": "^3.3"
}
}
91 changes: 49 additions & 42 deletions node/hybrid-node-tests/ecmascript-test/ecmascript-test.mjs
Original file line number Diff line number Diff line change
@@ -1,66 +1,73 @@
/* eslint no-undef: off */
import { execFile } from "child_process";
import { spawn, execFile } from "child_process";
import findFreePorts from "find-free-ports";
import { AsyncClient } from "glide-rs";
import RedisServer from "redis-server";
import { GlideClient } from "@valkey/valkey-glide";

const PORT_NUMBER = 4001;
let server;
let port;

function checkCliAvailability(cli) {
function checkCommandAvailability(command) {
return new Promise((resolve) => {
execFile(cli, ["--version"], (error) => {
execFile(command, ["--version"], (error) => {
resolve(!error);
});
});
}

async function flushallOnPort(port) {
const redisCliAvailable = await checkCliAvailability("redis-cli");
const valkeyCliAvailable = await checkCliAvailability("valkey-cli");
async function main() {
console.log("Starting main");
const port = await findFreePorts(PORT_NUMBER).then(
([free_port]) => free_port,
);
const redisServerAvailable = await checkCommandAvailability("redis-server");
const valkeyServerAvailable =
await checkCommandAvailability("valkey-server");

if (!redisCliAvailable && !valkeyCliAvailable) {
throw new Error("Neither redis-cli nor valkey-cli is available");
if (!redisServerAvailable && !valkeyServerAvailable) {
throw new Error("Neither valkey nor redis are available");
}

const cli = redisCliAvailable ? "valkey-cli" : "redis-cli";
return new Promise((resolve, reject) => {
execFile(cli, ["-p", port, "FLUSHALL"], (error, _, stderr) => {
if (error) {
console.error(stderr);
reject(error);
} else {
const server = valkeyServerAvailable ? "valkey-server" : "redis-server";
console.log(server);

const serverProcess = spawn(server, ["--port", port.toString()], {
stdio: ["ignore", "pipe", "pipe"],
});

await new Promise((resolve) => {
serverProcess.stdout.on("data", (data) => {
console.log(`${data}`);

if (data.toString().includes("Ready to accept connections")) {
resolve();
}
});
});
}

port = await findFreePorts(PORT_NUMBER).then(([free_port]) => free_port);
server = await new Promise((resolve, reject) => {
const server = new RedisServer(port);
server.open(async (err) => {
if (err) {
reject(err);
}
serverProcess.stderr.on("data", (data) => {
console.error(`${data}`);
});
});

resolve(server);
const client = await GlideClient.createClient({
addresses: [{ host: "localhost", port }],
});
});
const client = AsyncClient.CreateConnection("redis://localhost:" + port);
await client.set("test", "test");
let result = await client.get("test");
const setResult = await client.set("test", "test");
console.log(setResult);
let getResult = await client.get("test");
console.log(getResult);

if (result !== "test") {
throw new Error("Ecma Test failed");
} else {
console.log("Ecma Test passed");
if (getResult !== "test") {
throw new Error("Ecma Test failed");
} else {
console.log("Ecma Test passed");
}

await client.flushall();
client.close();
serverProcess.kill();
console.log("Done");
}

await flushallOnPort(port).then(() => {
console.log("db flushed");
});
await server.close().then(() => {
console.log("server closed");
main().catch((err) => {
console.error(err);
process.exit(1);
});
Loading

0 comments on commit 3f8d48a

Please sign in to comment.