`rrqueue` is a distributed task queue for R, implemented on top of Redis. At the cost of a little more work, it allows for more flexible parallelisation than `mclapply` affords. The main goal is to support non-map-style operations: submit some tasks, collect the completed results, and queue more even while some tasks are still running.
Other features: `mclapply`-like functions are available, and the package is designed to be non-blocking so that intermediate results can be used.

The basic workflow is:

1. Create a queue
2. Submit tasks to the queue
3. Collect results

The workers can be started at any time between steps 1-3, though they do need to be started before results can be collected.
Start a queue that we will submit tasks to:

```r
con <- rrqueue::queue("jobs")
```
Expressions can be queued using the `enqueue` method:

```r
task <- con$enqueue(sin(1))
```
Task objects can be inspected to find out (for example) how long they have been waiting:

```r
task$times()
```
or what their status is:

```r
task$status()
```
To get workers to process jobs from this queue, interactively run (in a separate R instance)

```r
w <- rrqueue::worker("jobs")
```
or spawn a worker in the background with

```r
logfile <- tempfile()
rrqueue::worker_spawn("jobs", logfile)
```
The task will complete:

```r
task$status()
```
and the value can be retrieved:

```r
task$result()
```
Workers can be stopped by sending them a `STOP` message:

```r
con$send_message("STOP")
```
In contrast with many parallel approaches in R, workers can be added at any time and will automatically start working on any remaining jobs.
There's lots more in various stages of completion, including `mclapply`-like functions (`rrqlapply`) and a variety of information-gathering tools.
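As a hedged sketch of the `mclapply`-like interface (the exact argument order of `rrqlapply`, and whether the function can be passed directly rather than by name, are assumptions here; see `?rrqlapply`):

```r
## Sketch only: apply sin over 1:10 using the workers attached to the
## "jobs" queue.  The (data, function, queue) argument order is an
## assumption -- check ?rrqlapply for the real interface.
res <- rrqueue::rrqlapply(1:10, sin, con)
```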
Redis must be installed and `redis-server` must be running. If you are familiar with Docker, the redis Docker image might be a good option here. Alternatively, download Redis, unpack it, and install it by running `make install` in a terminal window within the downloaded folder.
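For example, assuming Docker is installed, something like the following starts a throwaway Redis server on the default port (the container name here is just illustrative):

```
docker run --name rrqueue-redis -d -p 6379:6379 redis
```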
Once installed, start `redis-server` by typing in a terminal window

```
redis-server
```
(On Linux the server will probably already be running for you. On Mac OS X, you might like to set it up to run as a daemon, i.e. a background process, if you end up using Redis a lot; see the Redis documentation for instructions.)
Try `redis-cli PING` to see if it is running.
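If the server is up, it should reply with `PONG`:

```
$ redis-cli PING
PONG
```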
R packages:

```r
install.packages(c("RcppRedis", "R6", "digest", "docopt"))
devtools::install_github(c("ropensci/RedisAPI", "richfitz/RedisHeartbeat", "richfitz/storr", "richfitz/ids"))
devtools::install_git("https://github.com/traitecoevo/rrqueue")
```
(Optional) To see what is going on, run `redis-cli monitor` in a terminal; this will print all the Redis chatter, though it will have some impact on Redis performance.
Workers can be started from within an R process using the `rrqueue::worker_spawn` function. This takes an optional argument `n` to start more than one worker at a time, and will block until all workers have appeared.
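For example, a sketch of starting several workers at once (whether a single log file argument is shared across all workers is an assumption; see `?worker_spawn`):

```r
## Sketch: spawn four background workers for the "jobs" queue
logfile <- tempfile()
rrqueue::worker_spawn("jobs", logfile, n = 4)
```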
From the command line, workers can be started using the `rrqueue_worker` script. The script can be installed by running (from R)

```r
rrqueue::install_scripts("~/bin")
```

replacing `"~/bin"` with a path that is on your executable search path and is writeable.
```
$ rrqueue_worker --help
Usage:
  rrqueue_worker [options] <queue_name>
  rrqueue_worker --config=FILENAME [options] [<queue_name>]
  rrqueue_worker -h | --help

Options:
  --redis-host HOSTNAME   Hostname for Redis
  --redis-port PORT       Port for Redis
  --heartbeat-period T    Heartbeat period
  --heartbeat-expire T    Heartbeat expiry time
  --key-worker-alive KEY  Key to write to when the worker becomes alive
  --config FILENAME       Optional YAML configuration filename

Arguments:
  <queue_name>   Name of queue
```
The arguments correspond to those documented in `?worker_spawn`. The queue name is determined by position.
The `config` argument is an optional path to a YAML configuration file. That configuration file contains values for any of the arguments to `worker_spawn`, for example:
```yaml
queue_name: tmpjobs
redis_host: 127.0.0.1
redis_port: 6379
heartbeat_period: 30
heartbeat_expire: 90
```
Arguments passed to `rrqueue_worker` in addition to the configuration will override values in the YAML file.
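For example, a sketch using the `config.yml` above: this would take the queue name and Redis settings from the file but override the heartbeat period on the command line:

```
$ rrqueue_worker --config=config.yml --heartbeat-period 60
```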
This file can also be passed to `queue` and `observer` as the `config` argument (e.g., `queue(config="config.yml")`) rather than having to pass in lots of parameters.
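A minimal sketch, assuming the same `config.yml` as above:

```r
## Both the controller and an observer can read connection details
## from the configuration file rather than taking them as arguments.
con <- rrqueue::queue(config = "config.yml")
obs <- rrqueue::observer(config = "config.yml")
```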
Reference documentation and vignettes are available on this website. If the vignettes are built (`make vignettes`), they will be available in the package, and this will be committed to GitHub once things settle down.
So far, I've done relatively little performance tuning. In particular, the workers make no effort to minimise the number of calls to Redis and assume that the connection is fast. On the other hand, we use `rrqueue` where the controller is many hops across the internet from the queue (controlling a queue on AWS). To reduce the time involved, `rrqueue` uses Lua scripting to reduce the number of instruction round trips.
You may see a variant of this error:

```
Calls: <Anonymous> -> .handleSimpleError -> h -> signalCondition
Error in signalCondition(e) :
  no function to return from, jumping to top level
```

This is an issue somewhere within Rcpp modules (which RcppRedis uses) and seems harmless.
The main functions and classes in the package are:

- `enqueue_bulk` (`enqueue_bulk_submit`)
- `install_scripts`
- `observer`
- `queue`
- `rrqlapply` (`rrqlapply_submit`)
- `task`
- `task_bundle`
- `worker`
- `worker_spawn`
- `worker_stop`