In addition to passing tasks (and results) between a controller and workers, the controller can also send “messages” to workers. This vignette shows what the possible messages do.

In order to do this, we’re going to need a queue and a worker:

obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## creating new queue
## 0 workers available
logfile <- tempfile()
worker_id <- rrqueue::worker_spawn("myqueue", logfile)
## new worker: Richs-MacBook-Pro.local::34833

On startup the worker log contains:

                                                       __
                ______________ ___  _____  __  _____  / /
      ______   / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ /  ______
     /_____/  / /  / /  / /_/ / /_/ /  __/ /_/ /  __/_/  /_____/
 ______      /_/  /_/   \__, /\__,_/\___/\__,_/\___(_)      ______
/_____/                   /_/                              /_____/
    version:          0.1.4 [LOCAL]
    platform:         x86_64-apple-darwin13.4.0 (64-bit)
    running:          OS X 10.10.4 (Yosemite)
    hostname:         Richs-MacBook-Pro.local
    pid:              34833
    redis_host:       127.0.0.1
    redis_port:       6379
    worker:           Richs-MacBook-Pro.local::34833
    queue_name:       myqueue
    heartbeat_period: 30
    heartbeat_expire: 90
    message:          myqueue:workers:Richs-MacBook-Pro.local::34833:message
    response:         myqueue:workers:Richs-MacBook-Pro.local::34833:response
    log:              myqueue:workers:Richs-MacBook-Pro.local::34833:log
    envir:            {}
[2015-09-28 12:41:05] ALIVE
[2015-09-28 12:41:05] ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
[2015-09-28 12:41:05] ENVIR PACKAGES (none)
[2015-09-28 12:41:05] ENVIR SOURCES myfuns.R

Because one of the main effects of messages is to print to the worker logfile, we’ll print the logfile fairly often.

Messages and responses

  1. The queue sends a message for one or more workers to process. The message has an identifier that is derived from the current time. Messages are written to a per-worker first-in-first-out queue and are processed independently by each worker; workers do not check whether other workers have pending messages or are processing them.

  2. As soon as a worker has finished its current job (if any), it processes the message. It must wait for that job to finish, but it will not start any further jobs before handling the message.

  3. Once the message has been processed (see below) a response will be written to a response list with the same identifier as the message.
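The flow above can be modelled with a small in-memory toy, which may help make the ordering concrete. This is a sketch under stated assumptions, not rrqueue's implementation: `toy_worker`, `toy_send` and `toy_process` are hypothetical names, plain R environments stand in for the Redis lists, and only the PING and ECHO responses shown in this vignette are modelled.

```r
# Toy model of per-worker FIFO message queues (no Redis, no rrqueue).
# All names here are hypothetical.
toy_worker <- function(id) {
  w <- new.env()
  w$id <- id
  w$messages <- list()   # pending messages, oldest first
  w$responses <- list()  # responses, keyed by message id
  w
}

# Append the same message (same id) to each target worker's own queue
toy_send <- function(workers, command, message_id) {
  for (w in workers) {
    w$messages[[length(w$messages) + 1L]] <-
      list(id = message_id, command = command)
  }
  invisible(message_id)
}

# A worker drains only its own queue, and only between tasks
toy_process <- function(w) {
  while (length(w$messages) > 0L) {
    msg <- w$messages[[1L]]            # oldest message first
    w$messages <- w$messages[-1L]
    value <- switch(msg$command,
                    PING = "PONG",
                    ECHO = "OK",
                    stop("unknown command: ", msg$command))
    w$responses[[msg$id]] <- value     # response stored under the message id
  }
  invisible(w)
}

w1 <- toy_worker("w1")
w2 <- toy_worker("w2")
toy_send(list(w1, w2), "PING", "m1")
toy_send(list(w1), "ECHO", "m2")
toy_process(w1)  # w2 has not processed its queue yet
```

Because each worker owns its queue, `w2` still holds its PING message after `w1` has responded to both of its own.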

PING

The PING message simply asks the worker to return PONG. It’s useful for diagnosing communication issues because it does so little.

message_id <- obj$send_message("PING")

The message id is going to be useful for getting responses:

message_id
## [1] "1443440465.46735"

(This is derived from the current time according to Redis, which is the central reference point of time for the whole system.)

Wait a little while:

Sys.sleep(.5)
reader()
[2015-09-28 12:41:05] MESSAGE PING
PONG
[2015-09-28 12:41:05] RESPONSE PING

The logfile prints:

  1. the request for the PING (MESSAGE PING)
  2. the value PONG, printed to the R message stream
  3. the logged response (RESPONSE PING), which means that something has been written to the response stream.

We can access the same information from the controller, via the worker log stored in the database:

obj$workers_log_tail(n=Inf)[-1]
##         time  command                          message
## 1 1443440465    ALIVE                                 
## 2 1443440465    ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
## 3 1443440465  MESSAGE                             PING
## 4 1443440465 RESPONSE                             PING

This also includes the ALIVE and ENVIR entries written as the worker came up.
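The logfile lines follow a `[timestamp] COMMAND message` layout, so they can be turned into a table much like the one above. This is a minimal sketch that assumes every log entry uses that layout. `parse_worker_log` is a hypothetical helper, and rrqueue's `workers_log_tail` reads the log from the database rather than parsing the file.

```r
# Parse logfile lines like "[2015-09-28 12:41:05] MESSAGE PING" into a data
# frame. Lines that do not match (the banner, printed output such as PONG)
# are dropped. Hypothetical helper; not part of rrqueue.
parse_worker_log <- function(lines) {
  pattern <- "^\\[([0-9-]+ [0-9:]+)\\] ([A-Z_]+) ?(.*)$"
  m <- regmatches(lines, regexec(pattern, lines))
  m <- m[lengths(m) == 4L]   # keep only matching lines (full match + 3 groups)
  data.frame(
    time    = vapply(m, `[`, character(1), 2L),
    command = vapply(m, `[`, character(1), 3L),
    message = vapply(m, `[`, character(1), 4L),
    stringsAsFactors = FALSE
  )
}

log_lines <- c("[2015-09-28 12:41:05] ALIVE",
               "[2015-09-28 12:41:05] MESSAGE PING",
               "PONG",
               "[2015-09-28 12:41:05] RESPONSE PING")
parse_worker_log(log_lines)
```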

Inspecting the logs is fine for interactive use, but it is often more useful to poll for a response.

We already know that our worker has a response, but we can ask anyway:

obj$has_responses(message_id)
## Richs-MacBook-Pro.local::34833 
##                           TRUE

Or, inversely, we can ask which messages a given worker has responses for:

obj$response_ids(worker_id)
## [1] "1443440465.46735"

To fetch the responses from all workers it was sent to (always returning a named list):

obj$get_responses(message_id)
## $`Richs-MacBook-Pro.local::34833`
## [1] "PONG"

or to fetch the response from a given worker:

obj$get_response(message_id, worker_id)
## [1] "PONG"

The response can be deleted by passing delete=TRUE to this method:

obj$get_response(message_id, worker_id, delete=TRUE)
## [1] "PONG"

after which requesting the response again will throw an error:

obj$get_response(message_id, worker_id, delete=TRUE)
## Error in get_responses(self$con, self$keys, message_id, worker_ids, delete, : Response missing for workers: Richs-MacBook-Pro.local::34833

There is also a wait argument that lets you wait until a response is ready. The slowdouble function takes a few seconds to run, so to demonstrate:

obj$enqueue(slowdouble(2))
message_id <- obj$send_message("PING")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "PONG"
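Conceptually, a `wait` argument amounts to checking for a response repeatedly until it appears or a timeout expires. The sketch below shows that pattern in plain R. `poll_until` is a hypothetical helper, and how rrqueue implements `wait` internally (polling or a blocking Redis call) is not shown in this vignette.

```r
# Generic poll-with-timeout loop. `check` returns NULL while no response is
# available, and the response value once it is. Hypothetical helper.
poll_until <- function(check, timeout = 10, interval = 0.05) {
  deadline <- Sys.time() + timeout
  repeat {
    value <- check()
    if (!is.null(value)) return(value)
    if (Sys.time() >= deadline) stop("timed out waiting for response")
    Sys.sleep(interval)
  }
}

# Usage: a stand-in "response" that only becomes available on the third check
calls <- 0L
fake_check <- function() {
  calls <<- calls + 1L
  if (calls >= 3L) "PONG" else NULL
}
res <- poll_until(fake_check, timeout = 5)
```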

Looking at the log will show what went on here:

obj$workers_log_tail(n=4)[-1]
##         time       command message
## 1 1443440465    TASK_START       1
## 2 1443440467 TASK_COMPLETE       1
## 3 1443440467       MESSAGE    PING
## 4 1443440467      RESPONSE    PING

  1. A task is received
  2. 2s later the task is completed
  3. Then the message is received
  4. Then, basically instantaneously, the message is responded to

However, because the message is only processed after the task is completed, the response takes a while to come back. Equivalently, from the worker log:

[2015-09-28 12:41:05] TASK_START 1
[2015-09-28 12:41:05] EXPR slowdouble(2)
[2015-09-28 12:41:07] TASK_COMPLETE 1
[2015-09-28 12:41:07] MESSAGE PING
PONG
[2015-09-28 12:41:07] RESPONSE PING

ECHO

This is basically like PING and not very interesting; it prints an arbitrary string to the log. It always returns "OK" as a response.

message_id <- obj$send_message("ECHO", "hello world!")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "OK"
[2015-09-28 12:41:07] MESSAGE ECHO
hello world!
[2015-09-28 12:41:07] RESPONSE ECHO

INFO

The INFO command refreshes and returns the worker information.

We already have a copy of the worker info; it was created when the worker started up:

obj$workers_info()[[worker_id]]
##     version:          0.1.4 [LOCAL]
##     platform:         x86_64-apple-darwin13.4.0 (64-bit)
##     running:          OS X 10.10.4 (Yosemite)
##     hostname:         Richs-MacBook-Pro.local
##     pid:              34833
##     redis_host:       127.0.0.1
##     redis_port:       6379
##     worker:           Richs-MacBook-Pro.local::34833
##     queue_name:       myqueue
##     heartbeat_period: 30
##     heartbeat_expire: 90
##     message:          myqueue:workers:Richs-MacBook-Pro.local::34833:message
##     response:         myqueue:workers:Richs-MacBook-Pro.local::34833:response
##     log:              myqueue:workers:Richs-MacBook-Pro.local::34833:log
##     envir:            {}

Note that the envir field is currently empty ({}) because when the worker started it did not know about any environments.

message_id <- obj$send_message("INFO")

Here’s the new worker information, complete with an updated envir field:

obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
##     version:          0.1.4 [LOCAL]
##     platform:         x86_64-apple-darwin13.4.0 (64-bit)
##     running:          OS X 10.10.4 (Yosemite)
##     hostname:         Richs-MacBook-Pro.local
##     pid:              34833
##     redis_host:       127.0.0.1
##     redis_port:       6379
##     worker:           Richs-MacBook-Pro.local::34833
##     queue_name:       myqueue
##     heartbeat_period: 30
##     heartbeat_expire: 90
##     message:          myqueue:workers:Richs-MacBook-Pro.local::34833:message
##     response:         myqueue:workers:Richs-MacBook-Pro.local::34833:response
##     log:              myqueue:workers:Richs-MacBook-Pro.local::34833:log
##     envir:            {9ba846cec2d2f060b336b9b90f8fd19d}

This has been updated on the database copy too:

obj$workers_info()[[worker_id]]$envir
## [1] "9ba846cec2d2f060b336b9b90f8fd19d"

and the same information is printed to the worker log:

[2015-09-28 12:41:07] MESSAGE INFO
                                                       __
                ______________ ___  _____  __  _____  / /
      ______   / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ /  ______
     /_____/  / /  / /  / /_/ / /_/ /  __/ /_/ /  __/_/  /_____/
 ______      /_/  /_/   \__, /\__,_/\___/\__,_/\___(_)      ______
/_____/                   /_/                              /_____/
    version:          0.1.4 [LOCAL]
    platform:         x86_64-apple-darwin13.4.0 (64-bit)
    running:          OS X 10.10.4 (Yosemite)
    hostname:         Richs-MacBook-Pro.local
    pid:              34833
    redis_host:       127.0.0.1
    redis_port:       6379
    worker:           Richs-MacBook-Pro.local::34833
    queue_name:       myqueue
    heartbeat_period: 30
    heartbeat_expire: 90
    message:          myqueue:workers:Richs-MacBook-Pro.local::34833:message
    response:         myqueue:workers:Richs-MacBook-Pro.local::34833:response
    log:              myqueue:workers:Richs-MacBook-Pro.local::34833:log
    envir:            {9ba846cec2d2f060b336b9b90f8fd19d}
[2015-09-28 12:41:07] RESPONSE INFO

DIR

This lists directory contents, similar to the dir function in R. However, because file contents are usually more interesting (e.g., when working out why something is not running on the remote machine), the response is the result of passing the output of dir to tools::md5sum, so each file name is returned along with the md5 hash of that file.

message_id <- obj$send_message("DIR")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
##                  introduction.html                     introduction.R 
## "4a24f94f79ff6295e7ebd48b4a94fa75" "2c960b07e526b5184a52e23fc9f30464" 
##                   introduction.Rmd                         messages.R 
## "f87d11eaa23c90d6950a358612c5f6c2" "8f3f82d8c2d967e92e6c40d2b64fd6ff" 
##                       messages.Rmd                           myfuns.R 
## "31c1ff5d0f86684f4e4f9b863ecbf862" "ba6a3703d48d4686341a511559152b4c" 
##                                src 
##                                 NA
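Roughly the same result can be reproduced locally with base R. This sketch assumes the response is simply dir() passed to tools::md5sum(), as described above. `dir_md5` is a hypothetical helper, and directories (such as `src` above) may come back as NA.

```r
# Map file names from dir() to their MD5 hashes. Extra arguments (such as
# pattern) are passed through to dir(). Hypothetical helper; not rrqueue.
dir_md5 <- function(path = ".", ...) {
  files <- dir(path, ...)
  hashes <- tools::md5sum(file.path(path, files))
  names(hashes) <- files   # bare file names, as in the DIR response
  hashes
}

tmp <- tempfile("dirdemo")
dir.create(tmp)
writeLines("x <- 1", file.path(tmp, "a.R"))
writeLines("some notes", file.path(tmp, "notes.txt"))
dir_md5(tmp, pattern = "\\.R$")
```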

Additional arguments to dir can be passed through:

message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
##                     introduction.R                         messages.R 
## "2c960b07e526b5184a52e23fc9f30464" "c4b62ef9689e01828dac78765ad029ce" 
##                           myfuns.R 
## "ba6a3703d48d4686341a511559152b4c"

If you pass in invalid arguments to dir, then a reasonably helpful message should be generated:

message_id <- obj$send_message("DIR", list(foo="bar"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "Error in dir(foo = \"bar\") : unused argument (foo = \"bar\")\n"
## attr(,"class")
## [1] "try-error"
## attr(,"condition")
## <simpleError in dir(foo = "bar"): unused argument (foo = "bar")>

(Note that this does not generate an error locally, but you can test whether the worker threw an error by checking the class of the returned value.)

and the same information is printed to the worker log:

[2015-09-28 12:41:07] MESSAGE DIR
[2015-09-28 12:41:07] RESPONSE DIR
[2015-09-28 12:41:07] MESSAGE DIR
[2015-09-28 12:41:07] RESPONSE DIR
[2015-09-28 12:41:07] MESSAGE DIR
Error in dir(foo = "bar") : unused argument (foo = "bar")
[2015-09-28 12:41:07] RESPONSE DIR
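The same kind of try-error object can be produced and inspected locally, which shows what to check for in a response. This uses only base R (`try`, `inherits`, `conditionMessage`). It assumes the worker wraps the call in something equivalent to `try()`; the printed response suggests this, but the vignette does not state it.

```r
# try() returns an object of class "try-error" instead of signalling an error
res <- try(dir(foo = "bar"), silent = TRUE)

msg <- NULL
if (inherits(res, "try-error")) {
  # the original condition object is kept in the "condition" attribute
  msg <- conditionMessage(attr(res, "condition"))
  message("worker reported an error: ", msg)
}
```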

PUSH

The commands PUSH and PULL move files from and to the worker. Each command is interpreted from the worker's point of view, so PUSH pushes files from the worker into the database while PULL pulls files from the database onto the worker. There are (or will be) preferable higher-level ways of dealing with this.

Things to be aware of here: Redis is an in-memory store and rrqueue is not at all aggressive about deleting objects. If you push a 1GB file into Redis, things will go badly. There are no checks for this at present!

PUSH takes a vector of filenames as an argument. The response is not the file itself (how could it do that?) but the hash of each file. By the time the response is received, the file contents are stored in the database and can be retrieved.

message_id <- obj$send_message("PUSH", "myfuns.R")
res <- obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
res
##                           myfuns.R 
## "ba6a3703d48d4686341a511559152b4c" 
## attr(,"class")
## [1] "files_pack"

We can save the file into a temporary directory on the filesystem using the files_unpack method:

path <- obj$files_unpack(res)
dir(path)
## [1] "myfuns.R"

And the files have the expected hash:

tools::md5sum(file.path(path, names(res)))
## /var/folders/3z/86tv450j7kb4w5y4wpxj6d5r0000gn/T//Rtmpy6z8pC/file87f33a28dbbc/myfuns.R 
##                                                     "ba6a3703d48d4686341a511559152b4c"
[2015-09-28 12:41:08] MESSAGE PUSH
[2015-09-28 12:41:08] PUSH
[2015-09-28 12:41:08] RESPONSE PUSH
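The PUSH mechanism described above can be sketched without Redis: file contents are stored in the database under their hash, and the response maps file names to hashes. This toy rests on stated assumptions. An R environment stands in for the database, and `pack_files`, `unpack_files` and the `toy_pack` class are hypothetical names that only loosely mirror `files_pack` and `files_unpack`.

```r
# Toy content-addressed file store; `store` stands in for Redis.
store <- new.env()

# Store each file's raw bytes under its MD5 hash; return name -> hash
pack_files <- function(files) {
  hashes <- unname(tools::md5sum(files))
  for (i in seq_along(files)) {
    bytes <- readBin(files[i], what = "raw", n = file.size(files[i]))
    assign(hashes[i], bytes, envir = store)   # key = hash, value = contents
  }
  structure(setNames(hashes, files), class = "toy_pack")
}

# Write every packed file, looked up by hash, into a directory
unpack_files <- function(pack, dest = tempfile("unpack")) {
  dir.create(dest, showWarnings = FALSE)
  hashes <- unclass(pack)
  for (f in names(hashes)) {
    writeBin(get(hashes[[f]], envir = store), file.path(dest, f))
  }
  dest
}

src_dir <- tempfile("src")
dir.create(src_dir)
old_wd <- setwd(src_dir)
writeLines("double <- function(x) 2 * x", "code.R")
p <- pack_files("code.R")   # names are relative to the working directory
setwd(old_wd)
unlink(file.path(src_dir, "code.R"))   # only the copy in `store` remains
out <- unpack_files(p)
dir(out)
```

Keying contents by hash means two files with identical contents share one stored copy, whatever their names.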

PULL

This is the inverse of PUSH: it takes files from the machine the queue is running on and copies them onto the worker. (From the worker's point of view, the files in question are already in the database and it will “pull” them down locally.)

First, we need to save files into the database. Let’s rename the temporary file above and save that:

file.rename(file.path(path, "myfuns.R"),
            "brandnewcode.R")
## [1] TRUE
res <- obj$files_pack("brandnewcode.R")
res
##                     brandnewcode.R 
## "ba6a3703d48d4686341a511559152b4c" 
## attr(,"class")
## [1] "files_pack"

Note that the hash here is the same as above: rrqueue can tell this is the same file even though it has a different filename. Note also that filenames are interpreted relative to the working directory, because the directory layout on the worker outside of this point could be arbitrarily different.
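Because the hash depends only on file contents, a renamed copy hashes identically. Here is a quick base R check using hypothetical file names in a temporary directory:

```r
# Same contents under two different names -> same MD5 hash
tmp <- tempfile("hashdemo")
dir.create(tmp)
original <- file.path(tmp, "original.R")
renamed  <- file.path(tmp, "renamed.R")
writeLines("f <- function(x) x", original)
file.copy(original, renamed)
h <- unname(tools::md5sum(c(original, renamed)))
identical(h[1], h[2])
```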

Now that the files have been packed, we can run the PULL command (note that PULL always unpacks files into the worker's working directory):

message_id <- obj$send_message("PULL")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "OK"

And the new file will be present in the directory:

message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
##                     brandnewcode.R                     introduction.R 
## "ba6a3703d48d4686341a511559152b4c" "2c960b07e526b5184a52e23fc9f30464" 
##                         messages.R                           myfuns.R 
## "90a5682ad3203d07b35c511911d2d0e2" "ba6a3703d48d4686341a511559152b4c"
[2015-09-28 12:41:08] MESSAGE PULL
[2015-09-28 12:41:08] PULL
[2015-09-28 12:41:08] RESPONSE PULL
[2015-09-28 12:41:08] MESSAGE DIR
[2015-09-28 12:41:08] RESPONSE DIR

EVAL

Evaluate an arbitrary R expression, passed as a string (not as any sort of unevaluated or quoted expression). The expression is evaluated in the global environment, which is not the environment in which queued code is evaluated.

message_id <- obj$send_message("EVAL", "1 + 1")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] 2
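Locally, you can evaluate code supplied as a string in the global environment with `parse()` and `eval()`. This is a sketch only. `eval_string` is a hypothetical name, and the vignette does not say how the worker does this internally.

```r
# Parse a string into an expression and evaluate it in the global
# environment; the value of the last expression is returned.
eval_string <- function(code) {
  eval(parse(text = code), envir = globalenv())
}

eval_string("1 + 1")
eval_string("toy_eval_value <- 5")   # assignment lands in the global env
```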

We can delete the file created above:

message_id <- obj$send_message("EVAL", "file.remove('brandnewcode.R')")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] TRUE
[2015-09-28 12:41:08] MESSAGE EVAL
[1] 2
[2015-09-28 12:41:08] RESPONSE EVAL
[2015-09-28 12:41:08] MESSAGE EVAL
[1] TRUE
[2015-09-28 12:41:08] RESPONSE EVAL

This could be used to evaluate code that has side effects, such as installing packages. However, due to limitations in how R loads packages, the only way to update and reload a package is to restart the worker.

STOP

STOP sends a shutdown message to the worker. Generally you should prefer the stop_workers method, which uses STOP behind the scenes.

message_id <- obj$send_message("STOP")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "BYE"
[2015-09-28 12:41:08] MESSAGE STOP
[2015-09-28 12:41:08] RESPONSE STOP
[2015-09-28 12:41:08] STOP OK