Skip to content

Commit

Permalink
Add reserve
Browse files Browse the repository at this point in the history
  • Loading branch information
g41797 committed Dec 29, 2024
1 parent f62eaa4 commit 87d52c6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 19 deletions.
13 changes: 8 additions & 5 deletions src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -307,21 +307,21 @@ pub const Client = struct {
return err.findError(cl.readLine[0..linelen]);
}

const ret = try parse.parseSize(cl.readLine[0..linelen]);
var ret = try parse.parseSize(cl.readLine[0..linelen]);

const jsize: usize = ret[1];

try job.alloc(jsize);

try cl.read_buffer(job.buffer[0..job.len], jsize);
try cl.read_buffer(job.buffer.?[0..job.len], jsize);

job.actual_len = jsize;

ret = try parse.parseSize(ret[0]);

const jid: usize = ret[1];

job.jid = jid;
job.jid = @intCast(jid);

return;
}
Expand Down Expand Up @@ -473,15 +473,18 @@ pub const Client = struct {
if (cl.connection == null) {
return ReturnedError.CommunicationFailure;
}
if (len > buffer.len) {
return ReturnedError.Internal;
}
if (len > 0) {
const rlen = try cl.connection.?.reader().readAtLeast(buffer, len);
const rlen = try cl.connection.?.reader().readAll(buffer[0..len]);

if (rlen < len) {
return ReturnedError.NoCRLF;
}
}
var rn: [2]u8 = undefined;
_ = try cl.readLine(&rn[0..2], 2);
_ = try cl.read_line(rn[0..2]);

return;
}
Expand Down
26 changes: 20 additions & 6 deletions src/client_tests.zig
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,41 @@ const ReturnedError = err.ReturnedError;
const client = @import("client.zig");
const Client = client.Client;
const Allocator = std.mem.Allocator;
const Job = @import("job.zig").Job;

test "connect-disconnect" {
var cl: Client = .{};
try cl.connect(std.testing.allocator, null, null);
cl.disconnect();
}

test "put-state-delete" {
test "put-state-reserve-delete" {
var cl: Client = .{};
try cl.connect(std.testing.allocator, null, null);
defer cl.disconnect();

try testing.expectError(ReturnedError.NotFound, cl.state(1));
const firstJob = 1;
if (cl.delete(firstJob)) {} else |_| {}
try testing.expectError(ReturnedError.NotFound, cl.state(firstJob));

const jid = try cl.put(1, 2, 120, "job body");
const job_body = "job body";
const jid = try cl.put(1, 0, 120, job_body);

const job_state = cl.state(jid);
try testing.expectEqual(job_state, client.JobState.delayed);
var job_state = cl.state(jid);
try testing.expectEqual(job_state, client.JobState.ready);

try cl.delete(jid);
var job: Job = .{};
try job.init(std.testing.allocator);
defer job.free();

try cl.reserve(0, &job);
const rid = job.id().?;
job_state = cl.state(rid);
try testing.expectEqual(job_state, client.JobState.reserved);
try testing.expectEqual(jid, rid);
try testing.expectEqual(std.mem.eql(u8, job_body, job.body().?), true);

try cl.delete(jid);
try testing.expectError(ReturnedError.NotFound, cl.state(jid));
}

Expand Down
23 changes: 15 additions & 8 deletions src/job.zig
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,37 @@ const std = @import("std");
const Allocator = std.mem.Allocator;

pub const Job = struct {
ready: bool = false,
jid: ?u32 = null,
buffer: ?[]u8 = null,
len: usize = undefined,
actual_len: usize = 0,
allocator: Allocator = undefined,

pub fn init(job: *Job, allocator: Allocator) !void {
if (job.ready) {
return error.AlreadyInitialized;
}
job.allocator = allocator;
job.ready = true;
}

pub fn alloc(job: *Job, len: usize) !void {
if (job.buffer == null) {
job.len = roundlen(len);
job.buffer = try job.allocator.alloc(u8, job.len);
return;
}

len = roundlen(len);
const rlen = roundlen(len);

if (job.len >= len) {
if (job.len >= rlen) {
return;
}

job.free();

job.len = len;

return job.alloc();
return job.alloc(rlen);
}

pub fn free(job: *Job) void {
Expand All @@ -43,17 +50,17 @@ pub const Job = struct {
if (job.buffer == null) {
return null;
}
if (job.id == null) {
if (job.jid == null) {
return null;
}
return job.id.?;
return job.jid.?;
}

pub fn body(job: *Job) ?[]u8 {
if (job.buffer == null) {
return null;
}
return job.buffer[0..job.actual_len];
return job.buffer.?[0..job.actual_len];
}
};

Expand Down

0 comments on commit 87d52c6

Please sign in to comment.