Overview

rrqueue is a distributed task queue for R, implemented on top of Redis. At the cost of a little more work it allows for more flexible parallelisation than afforded by mclapply. The main goal is to support non-map style operations: submit some tasks, collect the completed results, queue more even while some tasks are still running.

Other features include:

The basic workflow is:

  1. Create a queue
  2. Submit tasks to the queue
  3. Start workers
  4. Collect results

The workers can be started at any point during steps 1-3, but they must be running before results can be collected.

Documenting things that work asynchronously is difficult. This document gives a tutorial-style overview of working with rrqueue.

Getting started

The queue and workers can be started in any order, but it’s easiest to explain starting the queue first.

Suppose we have some simulation code; it needs to be in a file that the queue can see. For now, I’ll use the file myfuns.R which is the test code. It has a function in it called slowdouble that takes a number, sleeps for that many seconds, and then returns twice the number. It’s useful for testing.
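For orientation, a function matching that description could look like the following. This is a minimal sketch written from the description above, not a copy of the actual contents of myfuns.R.

```r
# Hypothetical reconstruction of slowdouble, based only on the prose
# description: sleep for x seconds, then return twice x.
slowdouble <- function(x) {
  Sys.sleep(x)
  x * 2
}

slowdouble(0.1)
## [1] 0.2
```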

You’ll also need a running Redis server. I have one operating with the default parameters, so this works:

RedisAPI::hiredis()$PING()
## [1] "PONG"
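If you are not sure whether a server is running, the same call can be wrapped so that a failure gives a message rather than an error. This is only a sketch: redis_available is a hypothetical helper name, not part of rrqueue or RedisAPI, and it assumes the reply can be compared with "PONG" after converting it to a character string.

```r
# Returns TRUE only if a Redis server answers PING with PONG.
# Any error (package not installed, server not running) gives FALSE.
redis_available <- function() {
  tryCatch({
    reply <- RedisAPI::hiredis()$PING()
    identical(as.character(reply), "PONG")
  }, error = function(e) FALSE)
}

if (!redis_available()) {
  message("No Redis server reachable with the default parameters")
}
```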

Create a queue called “myqueue” and tell it to load the source file “myfuns.R”. If the tasks also needed packages, passing packages=c("package1", "package2") would tell the workers to load those packages too.

obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## creating new queue
## 0 workers available

The message “creating new queue” here indicates that rrqueue did not find any previous queues in place. Queues are designed to be re-attachable so we can immediately just do that:

obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## reattaching to existing queue
## 0 workers available

The message also notes that we have no workers available, so no work is going to get done. But we can still queue some tasks.

Queuing tasks

The simplest sort of task queuing is to pass an expression into enqueue:

t <- obj$enqueue(1 + 1)

The expression is not evaluated but stored and will be evaluated on the worker. Saving the result of this gives a task object which can be inspected.

t
## <task>
##   Public:
##     clone: function
##     con: environment
##     envir: function
##     expr: function
##     id: 1
##     initialize: function
##     key_complete: myqueue:tasks:1:complete
##     keys: list
##     result: function
##     status: function
##     times: function
##     wait: function

The expression stored in the task:

t$expr()
## 1 + 1
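The idea of storing an expression and evaluating it later can be seen in base R alone. The sketch below is an illustration of the general mechanism, not of how rrqueue is implemented; capture is a hypothetical helper.

```r
# Capture an expression without evaluating it, as enqueue() is described
# as doing. capture() is a hypothetical helper, not part of rrqueue.
capture <- function(expr) substitute(expr)

stored <- capture(1 + 1)
stored
## 1 + 1
class(stored)
## [1] "call"

# Evaluation happens later, possibly somewhere else entirely.
eval(stored)
## [1] 2
```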

The status of the task:

t$status()
## [1] "PENDING"

Asking for the result of the task at this point throws an error, because the task has not run yet:

t$result()
## Error in task_result(self$con, self$keys, self$id, follow_redirect, sanitise): task 1 is unfetchable: PENDING

And how long the task has been waiting:

t$times()
##             submitted started finished    waiting running idle
## 1 2015-09-28 12:41:00    <NA>     <NA> 0.02538204      NA   NA

Tasks can use local variables, too:

x <- 10
t2 <- obj$enqueue(x * 2)
t2$expr()
## x * 2
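For this to work, the value of x has to travel with the expression. One way a queue could arrange that, shown here in base R only, is to find the variables an expression refers to and copy their current values into a separate environment. Whether rrqueue does it in exactly this way is not stated in this document, so treat the names and the approach as assumptions.

```r
x <- 10
expr <- quote(x * 2)

# Names of the variables the expression refers to.
vars <- all.vars(expr)
vars
## [1] "x"

# Copy the current values into a fresh environment, standing in for
# what would be shipped to a worker alongside the expression.
task_env <- list2env(mget(vars, envir = environment()), parent = baseenv())

# Changing the local variable afterwards does not affect the copy.
x <- 99
eval(expr, task_env)
## [1] 20
```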

And because using unevaluated expressions can be problematic, rrqueue has a standard-evaluation version (enqueue_) which takes either strings representing expressions or quoted expressions:

obj$enqueue_(quote(x / 2))
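The two accepted forms describe the same object once parsed. The base R sketch below shows that a string and a quoted expression give identical calls; it does not touch the queue. It also shows that deparsing does not keep the original spacing, which is presumably why this task appears as x/2 in the worker log.

```r
from_string <- parse(text = "x / 2")[[1]]
from_quote  <- quote(x / 2)

# Both routes produce the same call object.
identical(from_string, from_quote)
## [1] TRUE

# Converting the call back to text drops the spaces around "/".
deparse(from_quote)
## [1] "x/2"
```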

Now we have three tasks:

obj$tasks_list()
## [1] "1" "2" "3"

All the tasks are waiting to be run:

obj$tasks_status()
##         1         2         3 
## "PENDING" "PENDING" "PENDING"

We can get an overview of the tasks:

obj$tasks_overview()
## 
##  PENDING  RUNNING COMPLETE    ERROR 
##        3        0        0        0
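The overview is a tabulation of the status vector. A rough base R equivalent is sketched below, using a status vector copied from the output above; the four state names are the ones shown in the overview, and rrqueue may well define others.

```r
# Status vector copied from the tasks_status() output above.
status <- c("1" = "PENDING", "2" = "PENDING", "3" = "PENDING")

# Fixing the factor levels keeps zero counts for states with no tasks.
states <- c("PENDING", "RUNNING", "COMPLETE", "ERROR")
overview <- table(factor(status, levels = states))
overview
```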

Starting workers

rrqueue includes a script, rrqueue_worker, for starting workers from the command line (install it with rrqueue::install_scripts()). Workers can also be started from within R using the worker_spawn function:

logfile <- tempfile()
wid <- rrqueue::worker_spawn("myqueue", logfile)
## new worker: Richs-MacBook-Pro.local::34818

This function returns the worker identifier, which is also printed to the screen.

It’s probably informative at this point to read the logfile of the worker to see what it did on startup:

writeLines(readLines(logfile))
                                                       __
                ______________ ___  _____  __  _____  / /
      ______   / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ /  ______
     /_____/  / /  / /  / /_/ / /_/ /  __/ /_/ /  __/_/  /_____/
 ______      /_/  /_/   \__, /\__,_/\___/\__,_/\___(_)      ______
/_____/                   /_/                              /_____/
    version:          0.1.4 [LOCAL]
    platform:         x86_64-apple-darwin13.4.0 (64-bit)
    running:          OS X 10.10.4 (Yosemite)
    hostname:         Richs-MacBook-Pro.local
    pid:              34818
    redis_host:       127.0.0.1
    redis_port:       6379
    worker:           Richs-MacBook-Pro.local::34818
    queue_name:       myqueue
    heartbeat_period: 30
    heartbeat_expire: 90
    message:          myqueue:workers:Richs-MacBook-Pro.local::34818:message
    response:         myqueue:workers:Richs-MacBook-Pro.local::34818:response
    log:              myqueue:workers:Richs-MacBook-Pro.local::34818:log
    envir:            {}
[2015-09-28 12:41:01] ALIVE
[2015-09-28 12:41:01] ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
[2015-09-28 12:41:01] ENVIR PACKAGES (none)
[2015-09-28 12:41:01] ENVIR SOURCES myfuns.R
[2015-09-28 12:41:01] TASK_START 1
[2015-09-28 12:41:01] EXPR 1 + 1
[2015-09-28 12:41:01] TASK_COMPLETE 1
[2015-09-28 12:41:01] TASK_START 2
[2015-09-28 12:41:01] EXPR x * 2
[2015-09-28 12:41:01] TASK_COMPLETE 2
[2015-09-28 12:41:01] TASK_START 3
[2015-09-28 12:41:01] EXPR x/2
[2015-09-28 12:41:01] TASK_COMPLETE 3

The worker first prints a lot of diagnostic information to the screen (or log file) indicating the name of the worker, the version of rrqueue, machine information, and special keys in the database where important information is stored.

Then, after broadcasting that it is awake (ALIVE), it detects that there is a controller on the queue and attempts to construct the environment that the controller wants (ENVIR 9ba846cec2d2f060b336b9b90f8fd19d).

After that, there is a series of TASK_START, EXPR, and TASK_COMPLETE lines as each of the three tasks is processed.

All three tasks are now complete:

obj$tasks_status()
##          1          2          3 
## "COMPLETE" "COMPLETE" "COMPLETE"

The times here give an indication of the rrqueue overhead; the running time of these simple expressions should be close to zero.

obj$tasks_times()
##             submitted             started            finished   waiting
## 1 2015-09-28 12:41:00 2015-09-28 12:41:01 2015-09-28 12:41:01 0.7190011
## 2 2015-09-28 12:41:00 2015-09-28 12:41:01 2015-09-28 12:41:01 0.6885321
## 3 2015-09-28 12:41:00 2015-09-28 12:41:01 2015-09-28 12:41:01 0.6857600
##       running     idle
## 1 0.002099037 0.572660
## 2 0.001468897 0.567667
## 3 0.001198053 0.564261
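To put numbers on that, the columns can be summarised directly. The sketch below rebuilds part of the table by hand from the output above (values in seconds) so that it runs without a queue; with a live queue the same calculations could be applied to the data frame returned by obj$tasks_times().

```r
# Values copied from the tasks_times() output above (seconds).
times <- data.frame(
  waiting = c(0.7190011, 0.6885321, 0.6857600),
  running = c(0.002099037, 0.001468897, 0.001198053),
  row.names = c("1", "2", "3")
)

# Average time spent queued versus actually executing.
colMeans(times)

# Fraction of each task's turnaround spent running rather than waiting.
round(times$running / (times$waiting + times$running), 4)
```

Note that these three tasks were queued before any worker existed, so most of the waiting time here reflects worker startup.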

The task handle created before can now give a result:

t$result()
## [1] 2

Similarly, results can be retrieved from the queue directly:

obj$task_result(1)
## [1] 2
obj$task_result(2)
## [1] 20
obj$task_result(3)
## [1] 5

The worker that we created can be seen here:

obj$workers_list()
## [1] "Richs-MacBook-Pro.local::34818"

Now queue a slower task, this time using the slowdouble function. It takes about 1 second to run:

t <- obj$enqueue(slowdouble(1))
t$status()
## [1] "PENDING"
Sys.sleep(.3)
t$status()
## [1] "RUNNING"
Sys.sleep(1)
t$status()
## [1] "COMPLETE"
t$result()
## [1] 2
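Rather than guessing how long to sleep, the status can be polled in a loop. The helper below is a sketch, not part of rrqueue: wait_for_result is a hypothetical name, and it relies only on the status() and result() methods and the status strings shown in this document. The task object also lists a wait method, but its arguments are not shown here, so it is not used. A stand-in task object is included so the helper can be tried without a queue.

```r
# Hypothetical helper: poll a task until it completes, errors, or a
# timeout expires. Relies only on task$status() and task$result().
wait_for_result <- function(task, timeout = 10, poll = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    status <- task$status()
    if (status == "COMPLETE") {
      return(task$result())
    }
    if (status == "ERROR") {
      stop("task failed")
    }
    if (Sys.time() > deadline) {
      stop("task not finished after ", timeout, " seconds")
    }
    Sys.sleep(poll)
  }
}

# Stand-in task object for trying the helper without a queue: it reports
# RUNNING on the first two status checks and COMPLETE afterwards.
fake_task <- local({
  checks <- 0
  list(
    status = function() {
      checks <<- checks + 1
      if (checks <= 2) "RUNNING" else "COMPLETE"
    },
    result = function() 2
  )
})

wait_for_result(fake_task, timeout = 5, poll = 0.01)
## [1] 2
```

With a real queue, the call would be wait_for_result(t) for a task handle t returned by obj$enqueue.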

Again, times are available:

t$times()
##             submitted             started            finished     waiting
## 4 2015-09-28 12:41:02 2015-09-28 12:41:02 2015-09-28 12:41:03 0.001399994
##    running     idle
## 4 1.006097 0.317421

Finishing up

Stop the worker. Its log file now also records the fourth task and the shutdown sequence:

obj$stop_workers()
                                                       __
                ______________ ___  _____  __  _____  / /
      ______   / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ /  ______
     /_____/  / /  / /  / /_/ / /_/ /  __/ /_/ /  __/_/  /_____/
 ______      /_/  /_/   \__, /\__,_/\___/\__,_/\___(_)      ______
/_____/                   /_/                              /_____/
    version:          0.1.4 [LOCAL]
    platform:         x86_64-apple-darwin13.4.0 (64-bit)
    running:          OS X 10.10.4 (Yosemite)
    hostname:         Richs-MacBook-Pro.local
    pid:              34818
    redis_host:       127.0.0.1
    redis_port:       6379
    worker:           Richs-MacBook-Pro.local::34818
    queue_name:       myqueue
    heartbeat_period: 30
    heartbeat_expire: 90
    message:          myqueue:workers:Richs-MacBook-Pro.local::34818:message
    response:         myqueue:workers:Richs-MacBook-Pro.local::34818:response
    log:              myqueue:workers:Richs-MacBook-Pro.local::34818:log
    envir:            {}
[2015-09-28 12:41:01] ALIVE
[2015-09-28 12:41:01] ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
[2015-09-28 12:41:01] ENVIR PACKAGES (none)
[2015-09-28 12:41:01] ENVIR SOURCES myfuns.R
[2015-09-28 12:41:01] TASK_START 1
[2015-09-28 12:41:01] EXPR 1 + 1
[2015-09-28 12:41:01] TASK_COMPLETE 1
[2015-09-28 12:41:01] TASK_START 2
[2015-09-28 12:41:01] EXPR x * 2
[2015-09-28 12:41:01] TASK_COMPLETE 2
[2015-09-28 12:41:01] TASK_START 3
[2015-09-28 12:41:01] EXPR x/2
[2015-09-28 12:41:01] TASK_COMPLETE 3
[2015-09-28 12:41:02] TASK_START 4
[2015-09-28 12:41:02] EXPR slowdouble(1)
[2015-09-28 12:41:03] TASK_COMPLETE 4
[2015-09-28 12:41:03] MESSAGE STOP
[2015-09-28 12:41:03] RESPONSE STOP
[2015-09-28 12:41:03] STOP OK

The worker is now in the exited list:

obj$workers_list_exited()
## [1] "Richs-MacBook-Pro.local::34818"

The full log from our worker (dropping the first column which is the worker id and takes up valuable space here):

obj$workers_log_tail(wid, Inf)[-1]
##          time       command                          message
## 1  1443440461         ALIVE                                 
## 2  1443440461         ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
## 3  1443440461    TASK_START                                1
## 4  1443440461 TASK_COMPLETE                                1
## 5  1443440461    TASK_START                                2
## 6  1443440461 TASK_COMPLETE                                2
## 7  1443440461    TASK_START                                3
## 8  1443440461 TASK_COMPLETE                                3
## 9  1443440462    TASK_START                                4
## 10 1443440463 TASK_COMPLETE                                4
## 11 1443440463       MESSAGE                             STOP
## 12 1443440463      RESPONSE                             STOP
## 13 1443440463          STOP                               OK
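The time column appears to hold seconds since 1970-01-01 UTC; that is an assumption, but it agrees with the timestamps in the worker's log file apart from a one-hour time zone offset. Under that assumption, the times can be made readable and task durations computed, as in this sketch. It rebuilds a few rows by hand from the output above rather than calling the queue.

```r
# A few rows copied from the workers_log_tail() output above.
wlog <- data.frame(
  time    = c(1443440461, 1443440462, 1443440463),
  command = c("ALIVE", "TASK_START", "TASK_COMPLETE"),
  message = c("", "4", "4"),
  stringsAsFactors = FALSE
)

# Interpret the time column as seconds since 1970-01-01 UTC.
wlog$when <- as.POSIXct(wlog$time, origin = "1970-01-01", tz = "UTC")
format(wlog$when, "%Y-%m-%d %H:%M:%S")
## [1] "2015-09-28 11:41:01" "2015-09-28 11:41:02" "2015-09-28 11:41:03"

# Duration of task 4, from its TASK_START and TASK_COMPLETE rows.
started  <- wlog$time[wlog$command == "TASK_START" & wlog$message == "4"]
finished <- wlog$time[wlog$command == "TASK_COMPLETE" & wlog$message == "4"]
finished - started
## [1] 1
```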