Creating an ipc.workqueue allows a thread to communicate with a set of worker threads created by ipc.map. The queue is bidirectional between an owner thread and the workers. All native Lua types can be quickly marshaled by value across thread boundaries (a copy is made). Torch Tensor and Storage objects are marshaled by reference (no copy is made).
local q = ipc.workqueue([name, size, size_increment])
The constructor takes the following optional arguments:
name is a string identifying the queue within the process (defaults to nil);
size is the initial size in bytes of the workqueue (defaults to 1024*16);
size_increment is the size in bytes by which the workqueue is incremented when it runs out of memory (defaults to size).
The two main methods are :write() and :read(). Their usage depends on the perspective of the caller. From the owner thread's perspective, :write() puts a question on the queue for one of the workers to process. Whenever the owner thread would like to get the answer, it can call :read(). From the perspective of a worker thread the roles are reversed: a worker calls :read() to take the next question off the queue, and calls :write() to return the answer to the owner thread.
local ipc = require 'libipc'
-- Create a named workqueue
local q = ipc.workqueue('my queue')
-- Create 2 background workers that read from the named workqueue
local workers = ipc.map(2, function(threadId)
   -- the last argument passed to the function is the ID of the thread
   assert((threadId == 1) or (threadId == 2))
   -- This function is not a closure, it is a totally clean Lua environment
   local ipc = require 'libipc'
   -- Open the queue by name (the main thread already created it)
   local q = ipc.workqueue('my queue')
   repeat
      -- Read the next file name off the workqueue
      local fileName = q:read()
      if fileName then
         -- Load the file and write its contents back into the workqueue
         q:write(torch.load(fileName))
      end
   until fileName == nil
end)
-- Write the file names into the workqueue
q:write('f1.t7')
q:write('f2.t7')
q:write('f3.t7')
-- Read back the 3 answers and print them
print(q:read())
print(q:read())
print(q:read())
The owner thread can also do non-blocking reads from the queue. This is useful to poll for answers while doing other work. Calling :read(true) checks the queue for an answer. If one is ready, it is returned; otherwise :read(true) returns nil, indicating that no answer is currently ready.
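The polling pattern described above could be sketched roughly as follows. The names collectAnswers and doOtherWork are hypothetical and not part of ipc; the only queue call used is :read(true). The sketch treats nil as "nothing ready yet", so it assumes that workers never write nil as an answer.

```lua
-- Hypothetical helper (not part of ipc): poll `q` until `expected` answers
-- have arrived, calling `doOtherWork` whenever no answer is ready.
local function collectAnswers(q, expected, doOtherWork)
   local answers = {}
   while #answers < expected do
      -- Non-blocking read: returns nil when no answer is currently ready
      local answer = q:read(true)
      if answer ~= nil then
         answers[#answers + 1] = answer
      else
         doOtherWork()
      end
   end
   return answers
end
```

On the owner thread this might be called as collectAnswers(q, 3, function() --[[ other work ]] end) after writing three questions.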
Lua supports closures. These are functions with upvalues, i.e. non-global variables outside the scope of the function:
local upvalue = 1
local closure = function()
   return upvalue
end
By default :write() doesn't support closures. Calling :write(closure) will throw an error. The reason for this is that we want to discourage users from serializing upvalues unless they absolutely need to. However, we do provide :writeup() for serializing closures/upvalues. So calling :writeup(closure) will not fail.
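To tell in advance whether a function is a closure, one option is to ask the standard debug library how many upvalues it captures. This is only a sketch: isClosure is a hypothetical helper, not part of ipc, and on Lua 5.2 or later a function that references globals also counts _ENV as an upvalue.

```lua
-- Hypothetical helper (not part of ipc): true when `f` captures at least one upvalue.
local function isClosure(f)
   return debug.getinfo(f, 'u').nups > 0
end

local upvalue = 1
local closure = function()
   return upvalue
end
local plain = function(x)
   return x + 1
end

print(isClosure(closure)) -- true
print(isClosure(plain))   -- false
```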
In Lua, almost everything is an upvalue. For example, the nn variable is an upvalue in the following heavyclosure():
local nn = require 'nn' -- upvalue
local heavyclosure = function(input)
   return nn.Linear:forward(input)
end
Calling :writeup(heavyclosure) will attempt to serialize the entire nn package.
To avoid this kind of mistake, we recommend calling require from inside the closure:
local lightfunction = function(input)
   local nn = require 'nn'
   return nn.Linear:forward(input)
end
Calling :write(lightfunction) will be much more efficient than calling :writeup(heavyclosure).
As a final note for the power users out there, know that :writeup() does not serialize the _ENV upvalue of closures. Typically _ENV = _G in the writing thread, which would be too heavy to serialize. Instead we set the _ENV upvalue of the deserialized closure to the reading thread's _G. So if you don't see any _ENV in your code, you should be fine.
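To check what a function would drag along before handing it to :writeup(), its upvalue names can be listed with the standard debug library. The sketch below is illustrative only: upvalueNames is a hypothetical helper, not part of ipc, and the standard string library stands in for a heavy package such as nn so that the example runs without Torch. The _ENV upvalue is skipped, since it is not serialized.

```lua
-- Hypothetical helper (not part of ipc): names of a function's upvalues,
-- skipping _ENV, which is not serialized.
local function upvalueNames(f)
   local names = {}
   local i = 1
   while true do
      local name = debug.getupvalue(f, i)
      if name == nil then break end
      if name ~= '_ENV' then
         names[#names + 1] = name
      end
      i = i + 1
   end
   return names
end

-- The standard string library stands in for a heavy package such as nn.
local str = require 'string' -- upvalue of heavyclosure
local heavyclosure = function(input)
   return str.upper(input)
end

local lightfunction = function(input)
   local str = require 'string' -- resolved inside the function, not an upvalue
   return str.upper(input)
end

print(table.concat(upvalueNames(heavyclosure), ', ')) -- str
print(#upvalueNames(lightfunction))                   -- 0
```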
The :write() and :writeup() methods can be used to write multiple objects into the queue at once. For example, q:write(1, 2, 3) is equivalent to q:write(1);q:write(2);q:write(3). As such, each argument passed to :write() or :writeup() requires its own q:read() to be read.
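Because every written value needs its own read, the reading side may find a small loop convenient. The following is a minimal sketch; readN is a hypothetical helper, not part of ipc, and it uses only the blocking q:read() described earlier.

```lua
-- Hypothetical helper (not part of ipc): read `n` values from `q`,
-- issuing one q:read() per value that was written.
local function readN(q, n)
   local values = {}
   for i = 1, n do
      values[i] = q:read()
   end
   return values
end
```

For instance, after q:write(1, 2, 3) on the writing side, the reading side could call readN(q, 3) to collect all three values.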
A more concrete example of combining ipc.map and ipc.workqueue can be found in ipc.BackgroundTask.