rrqueue
rrqueue is a distributed task queue for R, implemented on top of Redis. At the cost of a little more work, it allows for more flexible parallelisation than mclapply affords. The main goal is to support non-map-style operations: submit some tasks, collect the completed results, and queue more tasks even while others are still running.
Other features include:
While mclapply-like functions are available, the package is designed to be non-blocking so that intermediate results can be used.

The basic workflow is:
1. Create a queue.
2. Submit tasks to the queue.
3. Start workers.
4. Collect results.

The workers can be started at any time during steps 1 to 3, though they need to be started before results can be collected.
Documenting things that work asynchronously is difficult. This document gives a tutorial-style overview of working with rrqueue.
The queue and workers can be started in any order, but it’s easiest to explain starting the queue first.
Suppose we have some simulation code; it needs to be in a file that the queue can see. For now, I’ll use the file myfuns.R, which is the test code. It has a function called slowdouble that takes a number, sleeps for that many seconds, and then returns twice the number, which is useful for testing.
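The contents of myfuns.R are not shown in this document. A minimal sketch of a function matching the description of slowdouble might look like the following; the body is an assumption, not the package's actual test file:

```r
# Assumed reconstruction of the test helper described above;
# the real myfuns.R may differ.
slowdouble <- function(x) {
  Sys.sleep(x)  # pause for x seconds to simulate a slow computation
  x * 2         # then return twice the input
}
```

Workers source this file themselves (the worker log later shows ENVIR SOURCES myfuns.R), which is why it has to live somewhere they can read it.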
You’ll also need a running Redis server. I have one operating with the default parameters, so this works:
RedisAPI::hiredis()$PING()
## [1] "PONG"
Create a queue called “myqueue” and tell it to load the source file “myfuns.R”. If the tasks also needed packages, passing packages=c("package1", "package2") would indicate that workers need to load those packages too.
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## creating new queue
## 0 workers available
The message “creating new queue” indicates that rrqueue did not find an existing queue with this name. Queues are designed to be re-attachable, so we can immediately reattach to it:
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## reattaching to existing queue
## 0 workers available
The message also notes that we have no workers available, so no work is going to get done. But we can still queue some tasks.
The simplest sort of task queuing is to pass an expression into enqueue:
t <- obj$enqueue(1 + 1)
The expression is not evaluated when it is queued; it is stored and later evaluated on a worker. The value returned by enqueue is a task object, which can be inspected:
t
## <task>
## Public:
## clone: function
## con: environment
## envir: function
## expr: function
## id: 1
## initialize: function
## key_complete: myqueue:tasks:1:complete
## keys: list
## result: function
## status: function
## times: function
## wait: function
The expression stored in the task:
t$expr()
## 1 + 1
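The task holds the expression itself rather than its value. As a rough base-R illustration of that idea (capture_task is a hypothetical helper, not rrqueue's implementation), substitute() records the unevaluated call and eval() runs it later:

```r
# Hypothetical helper: record an expression and the environment it came from
# without evaluating it.
capture_task <- function(expr) {
  list(expr = substitute(expr), envir = parent.frame())
}

task <- capture_task(1 + 1)
task$expr                    # the call `1 + 1`, still unevaluated
eval(task$expr, task$envir)  # evaluated only when asked: 2

# Nothing runs at capture time, so even an erroring expression can be stored.
broken <- capture_task(stop("only fails when evaluated"))
```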
The status of the task:
t$status()
## [1] "PENDING"
The result of the task; because the task has not run yet, asking for it throws an error:
t$result()
## Error in task_result(self$con, self$keys, self$id, follow_redirect, sanitise): task 1 is unfetchable: PENDING
And how long the task has been waiting:
t$times()
## submitted started finished waiting running idle
## 1 2015-09-28 12:41:00 <NA> <NA> 0.02538204 NA NA
Tasks can use local variables, too:
x <- 10
t2 <- obj$enqueue(x * 2)
t2$expr()
## x * 2
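For x * 2 to be evaluated on a worker, the value of x has to travel with the task. The task object printed earlier lists an envir member, but how rrqueue builds it is not shown here. One way to do it in base R, sketched with a hypothetical snapshot_locals helper:

```r
# Hypothetical helper: copy the values of the variables an expression uses,
# so the expression can be evaluated later in another R process.
snapshot_locals <- function(expr, envir = parent.frame()) {
  # all.vars() lists symbols used as variables; names of called functions
  # (such as `*` or slowdouble) are not included.
  vars <- all.vars(expr)
  # Keep only variables actually defined in the calling environment.
  found <- vapply(vars, exists, logical(1), envir = envir, inherits = FALSE)
  mget(vars[found], envir = envir)
}

x <- 10
vals <- snapshot_locals(quote(x * 2))  # list(x = 10)

# On the "worker" side: rebuild an environment from the snapshot and evaluate.
worker_env <- list2env(vals, envir = new.env(parent = baseenv()))
eval(quote(x * 2), worker_env)  # 20
```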
Because working with unevaluated expressions can be problematic, rrqueue also has a standard-evaluation version, enqueue_, which takes either a string representing an expression or a quoted expression:
obj$enqueue_(quote(x / 2))
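Both forms accepted by enqueue_ describe the same R call. A hypothetical normalising helper (as_task_expr is illustrative only, not part of rrqueue) shows the equivalence:

```r
# Hypothetical helper: accept either a string or a quoted expression and
# return a single unevaluated R expression.
as_task_expr <- function(x) {
  if (is.character(x)) {
    exprs <- parse(text = x, keep.source = FALSE)
    if (length(exprs) != 1L) stop("expected exactly one expression")
    exprs[[1L]]
  } else if (is.language(x)) {
    x
  } else {
    stop("expected a string or a quoted expression")
  }
}

identical(as_task_expr("x / 2"), as_task_expr(quote(x / 2)))  # TRUE
deparse(quote(x / 2))                                         # "x/2"
```

The last line is presumably why the worker log records this task as x/2: R's deparser prints / without surrounding spaces, while it keeps them around * and +.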
Now we have three tasks:
obj$tasks_list()
## [1] "1" "2" "3"
All the tasks are waiting to be run:
obj$tasks_status()
## 1 2 3
## "PENDING" "PENDING" "PENDING"
We can get an overview of the tasks:
obj$tasks_overview()
##
## PENDING RUNNING COMPLETE ERROR
## 3 0 0 0
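The overview is a count of tasks in each state, and its printed layout resembles R's output for a one-dimensional table. Assuming the four states shown, the same summary can be produced from a named status vector like the one tasks_status() returns (summarise_status is a hypothetical helper):

```r
# Hypothetical helper: count tasks per state, including states with no tasks.
summarise_status <- function(status) {
  states <- c("PENDING", "RUNNING", "COMPLETE", "ERROR")
  # A factor with fixed levels keeps zero counts in the table;
  # any status outside these levels becomes NA and is not counted.
  table(factor(status, levels = states))
}

status <- c("1" = "PENDING", "2" = "PENDING", "3" = "PENDING")
summarise_status(status)
```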
rrqueue includes a script, rrqueue_worker, for starting workers from the command line (install it with rrqueue::install_scripts()). Workers can also be started from within R using the worker_spawn function:
logfile <- tempfile()
wid <- rrqueue::worker_spawn("myqueue", logfile)
## new worker: Richs-MacBook-Pro.local::34818
This function returns the worker identifier, which is also printed to the screen.
At this point it is informative to read the worker's log file to see what it did on startup:
writeLines(readLines(logfile))
__
______________ ___ _____ __ _____ / /
______ / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ / ______
/_____/ / / / / / /_/ / /_/ / __/ /_/ / __/_/ /_____/
______ /_/ /_/ \__, /\__,_/\___/\__,_/\___(_) ______
/_____/ /_/ /_____/
version: 0.1.4 [LOCAL]
platform: x86_64-apple-darwin13.4.0 (64-bit)
running: OS X 10.10.4 (Yosemite)
hostname: Richs-MacBook-Pro.local
pid: 34818
redis_host: 127.0.0.1
redis_port: 6379
worker: Richs-MacBook-Pro.local::34818
queue_name: myqueue
heartbeat_period: 30
heartbeat_expire: 90
message: myqueue:workers:Richs-MacBook-Pro.local::34818:message
response: myqueue:workers:Richs-MacBook-Pro.local::34818:response
log: myqueue:workers:Richs-MacBook-Pro.local::34818:log
envir: {}
[2015-09-28 12:41:01] ALIVE
[2015-09-28 12:41:01] ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
[2015-09-28 12:41:01] ENVIR PACKAGES (none)
[2015-09-28 12:41:01] ENVIR SOURCES myfuns.R
[2015-09-28 12:41:01] TASK_START 1
[2015-09-28 12:41:01] EXPR 1 + 1
[2015-09-28 12:41:01] TASK_COMPLETE 1
[2015-09-28 12:41:01] TASK_START 2
[2015-09-28 12:41:01] EXPR x * 2
[2015-09-28 12:41:01] TASK_COMPLETE 2
[2015-09-28 12:41:01] TASK_START 3
[2015-09-28 12:41:01] EXPR x/2
[2015-09-28 12:41:01] TASK_COMPLETE 3
The worker first prints a lot of diagnostic information to the screen (or log file) indicating the name of the worker, the version of rrqueue, machine information, and special keys in the database where important information is stored.
After broadcasting that it is awake (ALIVE), the worker detects that there is a controller on the queue and attempts to construct the environment that the controller wants (ENVIR 9ba846cec2d2f060b336b9b90f8fd19d). After that come a series of TASK_START, EXPR, and TASK_COMPLETE lines as each of the three tasks is processed.
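Each timestamped log line has the form [time] COMMAND message, and rrqueue's workers_log_tail (used at the end of this document) returns a similar time/command/message table. A small base-R parser for the log file itself, with parse_log_lines as a hypothetical helper, might look like:

```r
# Hypothetical helper: turn lines like "[2015-09-28 12:41:01] TASK_START 1"
# into a data frame with time, command and message columns.
parse_log_lines <- function(lines) {
  pattern <- "^\\[([0-9-]+ [0-9:]+)\\] ([A-Z_]+) ?(.*)$"
  # Drop the banner and key/value header, which are not timestamped.
  lines <- lines[grepl(pattern, lines)]
  data.frame(
    time = as.POSIXct(sub(pattern, "\\1", lines), format = "%Y-%m-%d %H:%M:%S"),
    command = sub(pattern, "\\2", lines),
    message = sub(pattern, "\\3", lines),
    stringsAsFactors = FALSE
  )
}

# With a live worker this would be parse_log_lines(readLines(logfile)).
example <- c(
  "queue_name: myqueue",
  "[2015-09-28 12:41:01] ALIVE",
  "[2015-09-28 12:41:01] TASK_START 2",
  "[2015-09-28 12:41:01] EXPR x * 2",
  "[2015-09-28 12:41:01] TASK_COMPLETE 2"
)
parse_log_lines(example)
```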
All three tasks are now complete:
obj$tasks_status()
## 1 2 3
## "COMPLETE" "COMPLETE" "COMPLETE"
The times here give an indication of rrqueue's overhead: the running times of these trivial expressions are all between about 1 and 2 ms.
obj$tasks_times()
## submitted started finished waiting
## 1 2015-09-28 12:41:00 2015-09-28 12:41:01 2015-09-28 12:41:01 0.7190011
## 2 2015-09-28 12:41:00 2015-09-28 12:41:01 2015-09-28 12:41:01 0.6885321
## 3 2015-09-28 12:41:00 2015-09-28 12:41:01 2015-09-28 12:41:01 0.6857600
## running idle
## 1 0.002099037 0.572660
## 2 0.001468897 0.567667
## 3 0.001198053 0.564261
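Comparing these numbers with the earlier output suggests how the columns are derived (an inference from the output, not documented behaviour): waiting runs from submission to start, running from start to finish, and idle from finish to the moment the times are requested; for the still-pending task earlier, waiting ran up to the time of the call. A sketch of that interpretation, with task_times as a hypothetical helper:

```r
# Hypothetical reconstruction of the interval columns. Assumption: any
# interval whose end event has not happened yet ends at `now`.
task_times <- function(submitted, started = NA, finished = NA,
                       now = Sys.time()) {
  secs <- function(to, from) as.numeric(difftime(to, from, units = "secs"))
  wait_end <- if (is.na(started)) now else started
  run_end <- if (is.na(finished)) now else finished
  waiting <- secs(wait_end, submitted)
  running <- if (is.na(started)) NA_real_ else secs(run_end, started)
  idle <- if (is.na(finished)) NA_real_ else secs(now, finished)
  c(waiting = waiting, running = running, idle = idle)
}

t0 <- as.POSIXct("2015-09-28 12:41:02", tz = "UTC")
task_times(t0, started = t0 + 1, finished = t0 + 3, now = t0 + 4)
# waiting 1, running 2, idle 1
task_times(t0, now = t0 + 0.5)  # still pending: waiting 0.5, running and idle NA
```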
The task handle created before can now give a result:
t$result()
## [1] 2
Similarly, results can be retrieved from the queue directly:
obj$task_result(1)
## [1] 2
obj$task_result(2)
## [1] 20
obj$task_result(3)
## [1] 5
The worker that we created can be seen here:
obj$workers_list()
## [1] "Richs-MacBook-Pro.local::34818"
Now queue a slower task, this time calling the slowdouble function. This will take 1 s:
t <- obj$enqueue(slowdouble(1))
t$status()
## [1] "PENDING"
Sys.sleep(.3)
t$status()
## [1] "RUNNING"
Sys.sleep(1)
t$status()
## [1] "COMPLETE"
t$result()
## [1] 2
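The Sys.sleep() calls above poll the task by hand. The task object also has a wait method, whose arguments are not shown in this document; as an alternative, a generic polling loop over t$status can be written in base R (wait_for_status is a hypothetical helper):

```r
# Hypothetical helper: call get_status() repeatedly until it returns one of
# the final states, or give up after `timeout` seconds.
wait_for_status <- function(get_status, done = c("COMPLETE", "ERROR"),
                            timeout = 10, interval = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    status <- get_status()
    if (status %in% done) return(status)
    if (Sys.time() > deadline) stop("timed out; last status was ", status)
    Sys.sleep(interval)
  }
}

# With the queue above this would be wait_for_status(t$status).
# Here a fake status function stands in for the task.
states <- c("PENDING", "RUNNING", "COMPLETE")
calls <- 0
fake_status <- function() {
  calls <<- calls + 1
  states[[min(calls, length(states))]]
}
wait_for_status(fake_status, interval = 0)  # "COMPLETE" on the third call
```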
Again, times are available:
t$times()
## submitted started finished waiting
## 4 2015-09-28 12:41:02 2015-09-28 12:41:02 2015-09-28 12:41:03 0.001399994
## running idle
## 4 1.006097 0.317421
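Finally, stop the worker and read its log file again. The log now also records the slow task and the STOP message exchange:
obj$stop_workers()
writeLines(readLines(logfile))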
__
______________ ___ _____ __ _____ / /
______ / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ / ______
/_____/ / / / / / /_/ / /_/ / __/ /_/ / __/_/ /_____/
______ /_/ /_/ \__, /\__,_/\___/\__,_/\___(_) ______
/_____/ /_/ /_____/
version: 0.1.4 [LOCAL]
platform: x86_64-apple-darwin13.4.0 (64-bit)
running: OS X 10.10.4 (Yosemite)
hostname: Richs-MacBook-Pro.local
pid: 34818
redis_host: 127.0.0.1
redis_port: 6379
worker: Richs-MacBook-Pro.local::34818
queue_name: myqueue
heartbeat_period: 30
heartbeat_expire: 90
message: myqueue:workers:Richs-MacBook-Pro.local::34818:message
response: myqueue:workers:Richs-MacBook-Pro.local::34818:response
log: myqueue:workers:Richs-MacBook-Pro.local::34818:log
envir: {}
[2015-09-28 12:41:01] ALIVE
[2015-09-28 12:41:01] ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
[2015-09-28 12:41:01] ENVIR PACKAGES (none)
[2015-09-28 12:41:01] ENVIR SOURCES myfuns.R
[2015-09-28 12:41:01] TASK_START 1
[2015-09-28 12:41:01] EXPR 1 + 1
[2015-09-28 12:41:01] TASK_COMPLETE 1
[2015-09-28 12:41:01] TASK_START 2
[2015-09-28 12:41:01] EXPR x * 2
[2015-09-28 12:41:01] TASK_COMPLETE 2
[2015-09-28 12:41:01] TASK_START 3
[2015-09-28 12:41:01] EXPR x/2
[2015-09-28 12:41:01] TASK_COMPLETE 3
[2015-09-28 12:41:02] TASK_START 4
[2015-09-28 12:41:02] EXPR slowdouble(1)
[2015-09-28 12:41:03] TASK_COMPLETE 4
[2015-09-28 12:41:03] MESSAGE STOP
[2015-09-28 12:41:03] RESPONSE STOP
[2015-09-28 12:41:03] STOP OK
The worker is now in the exited list:
obj$workers_list_exited()
## [1] "Richs-MacBook-Pro.local::34818"
The full log from our worker, dropping the first column (the worker id), which takes up valuable space here:
obj$workers_log_tail(wid, Inf)[-1]
## time command message
## 1 1443440461 ALIVE
## 2 1443440461 ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
## 3 1443440461 TASK_START 1
## 4 1443440461 TASK_COMPLETE 1
## 5 1443440461 TASK_START 2
## 6 1443440461 TASK_COMPLETE 2
## 7 1443440461 TASK_START 3
## 8 1443440461 TASK_COMPLETE 3
## 9 1443440462 TASK_START 4
## 10 1443440463 TASK_COMPLETE 4
## 11 1443440463 MESSAGE STOP
## 12 1443440463 RESPONSE STOP
## 13 1443440463 STOP OK
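The time column holds Unix timestamps (seconds since 1970-01-01 UTC, at one-second resolution). Converting them and pairing each TASK_START with its TASK_COMPLETE gives rough per-task durations. The sketch below copies four rows from the output above into a data frame and assumes the message column holds task ids as strings:

```r
# Four rows copied from the log table above (assumed column types).
log_df <- data.frame(
  time = c(1443440461, 1443440461, 1443440462, 1443440463),
  command = c("TASK_START", "TASK_COMPLETE", "TASK_START", "TASK_COMPLETE"),
  message = c("3", "3", "4", "4"),
  stringsAsFactors = FALSE
)

# Unix timestamps -> date-times in UTC.
log_df$when <- as.POSIXct(log_df$time, origin = "1970-01-01", tz = "UTC")

# Pair start and completion rows by task id and take the difference.
starts <- log_df[log_df$command == "TASK_START", c("message", "time")]
ends <- log_df[log_df$command == "TASK_COMPLETE", c("message", "time")]
durations <- merge(starts, ends, by = "message", suffixes = c("_start", "_end"))
durations$elapsed <- durations$time_end - durations$time_start
durations
```

Task 4 shows 1 second, in line with the roughly 1.006 s running time reported by t$times(), while fast tasks such as task 3 show 0 because of the one-second resolution. The first timestamp converts to 2015-09-28 11:41:01 UTC, which matches the 12:41:01 local time printed in the log file.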