In addition to passing tasks (and results) between a controller and workers, the controller can also send “messages” to workers. This vignette shows what the possible messages do.
In order to do this, we’re going to need a queue and a worker:
obj <- rrqueue::queue("myqueue", sources="myfuns.R")
## creating new queue
## 0 workers available
logfile <- tempfile()
worker_id <- rrqueue::worker_spawn("myqueue", logfile)
## new worker: Richs-MacBook-Pro.local::34833
On startup the worker log contains:
__
______________ ___ _____ __ _____ / /
______ / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ / ______
/_____/ / / / / / /_/ / /_/ / __/ /_/ / __/_/ /_____/
______ /_/ /_/ \__, /\__,_/\___/\__,_/\___(_) ______
/_____/ /_/ /_____/
version: 0.1.4 [LOCAL]
platform: x86_64-apple-darwin13.4.0 (64-bit)
running: OS X 10.10.4 (Yosemite)
hostname: Richs-MacBook-Pro.local
pid: 34833
redis_host: 127.0.0.1
redis_port: 6379
worker: Richs-MacBook-Pro.local::34833
queue_name: myqueue
heartbeat_period: 30
heartbeat_expire: 90
message: myqueue:workers:Richs-MacBook-Pro.local::34833:message
response: myqueue:workers:Richs-MacBook-Pro.local::34833:response
log: myqueue:workers:Richs-MacBook-Pro.local::34833:log
envir: {}
[2015-09-28 12:41:05] ALIVE
[2015-09-28 12:41:05] ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
[2015-09-28 12:41:05] ENVIR PACKAGES (none)
[2015-09-28 12:41:05] ENVIR SOURCES myfuns.R
Because one of the main effects of messages is to print to the worker logfile, we’ll print this fairly often.
The queue sends a message for one or more workers to process. The message has an identifier that is derived from the current time. Messages are written to a first-in-first-out queue, per worker, and are processed independently by workers who do not look to see if other workers have messages or are processing them.
As soon as a worker has finished processing any current job it will process the message (it must wait to finish a current job but will not start any further jobs).
Once the message has been processed (see below) a response will be written to a response list with the same identifier as the message.
PING
The PING message simply asks the worker to return PONG. It’s useful for diagnosing communication issues because it does so little.
message_id <- obj$send_message("PING")
The message id is going to be useful for getting responses:
message_id
## [1] "1443440465.46735"
(This is derived from the current time according to Redis, which is the central reference point of time for the whole system.)
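As a quick check of where the identifier comes from, the sketch below converts the message id shown above back into a timestamp. It assumes the id is a Unix time in seconds (with a fractional part); that is consistent with the times in the worker log, but it is an assumption rather than something stated outright here.

```r
# The message id printed above, as returned by send_message()
message_id <- "1443440465.46735"

# Assumption: the id is seconds since 1970-01-01 UTC
sent_at <- as.POSIXct(as.numeric(message_id), origin = "1970-01-01", tz = "UTC")

# Format in UTC; the worker log itself appears to print local time
sent_at_utc <- format(sent_at, "%Y-%m-%d %H:%M:%S")
print(sent_at_utc)
```

In UTC this gives 11:41:05 on 2015-09-28, which lines up with the 12:41:05 log entries if the worker's clock was one hour ahead of UTC.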
Wait a little while:
Sys.sleep(.5)
reader()
[2015-09-28 12:41:05] MESSAGE PING
PONG
[2015-09-28 12:41:05] RESPONSE PING
The logfile shows three things: the request for the PING (MESSAGE PING), the value PONG printed to the R message stream, and the logging of a response (RESPONSE PING), which means that something has been written to the response list.
We can access the same bits of information in the worker log:
obj$workers_log_tail(n=Inf)[-1]
## time command message
## 1 1443440465 ALIVE
## 2 1443440465 ENVIR 9ba846cec2d2f060b336b9b90f8fd19d
## 3 1443440465 MESSAGE PING
## 4 1443440465 RESPONSE PING
This includes the ALIVE and ENVIR bits as the worker comes up.
Inspecting the logs is fine for interactive use, but it is often more useful to poll for a response.
We already know that our worker has a response, but we can ask anyway:
obj$has_responses(message_id)
## Richs-MacBook-Pro.local::34833
## TRUE
Or, inversely, we can ask which messages a given worker has responses for:
obj$response_ids(worker_id)
## [1] "1443440465.46735"
To fetch the responses from all workers it was sent to (always returning a named list):
obj$get_responses(message_id)
## $`Richs-MacBook-Pro.local::34833`
## [1] "PONG"
or to fetch the response from a given worker:
obj$get_response(message_id, worker_id)
## [1] "PONG"
The response can be deleted by passing delete=TRUE to this method:
obj$get_response(message_id, worker_id, delete=TRUE)
## [1] "PONG"
after which fetching the response again will throw an error:
obj$get_response(message_id, worker_id, delete=TRUE)
## Error in get_responses(self$con, self$keys, message_id, worker_ids, delete, : Response missing for workers: Richs-MacBook-Pro.local::34833
There is also a wait argument that lets you wait until a response is ready. The slowdouble command will take a few seconds, so to demonstrate:
obj$enqueue(slowdouble(2))
message_id <- obj$send_message("PING")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "PONG"
Looking at the log will show what went on here:
obj$workers_log_tail(n=4)[-1]
## time command message
## 1 1443440465 TASK_START 1
## 2 1443440467 TASK_COMPLETE 1
## 3 1443440467 MESSAGE PING
## 4 1443440467 RESPONSE PING
Because the message is only processed after the task has completed, the response takes a while to come back. Equivalently, from the worker log:
[2015-09-28 12:41:05] TASK_START 1
[2015-09-28 12:41:05] EXPR slowdouble(2)
[2015-09-28 12:41:07] TASK_COMPLETE 1
[2015-09-28 12:41:07] MESSAGE PING
PONG
[2015-09-28 12:41:07] RESPONSE PING
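One way to picture what waiting for a response involves is a poll-with-timeout loop. The sketch below is illustrative only: poll_until is a hypothetical helper written in base R, not part of rrqueue, and it is not a description of how the wait argument is actually implemented.

```r
# Hypothetical helper: call ready() repeatedly until it returns TRUE
# or until `timeout` seconds have passed.
poll_until <- function(ready, timeout = 10, every = 0.1) {
  deadline <- Sys.time() + timeout
  repeat {
    if (isTRUE(ready())) {
      return(TRUE)
    }
    if (Sys.time() >= deadline) {
      return(FALSE)
    }
    Sys.sleep(every)
  }
}

# Stand-in for "has the worker responded yet?": becomes TRUE after 0.3s
started <- Sys.time()
responded <- function() {
  as.numeric(difftime(Sys.time(), started, units = "secs")) > 0.3
}

got_it <- poll_until(responded, timeout = 2)

# A check that never succeeds gives up once the timeout has passed
timed_out <- !poll_until(function() FALSE, timeout = 0.2, every = 0.05)
```

With a live queue, the ready function could wrap a call such as obj$has_responses(message_id), shown earlier, reduced to a single TRUE or FALSE with all().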
ECHO
This is basically like PING and not very interesting; it prints an arbitrary string to the log. It always returns "OK" as a response.
message_id <- obj$send_message("ECHO", "hello world!")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "OK"
[2015-09-28 12:41:07] MESSAGE ECHO
hello world!
[2015-09-28 12:41:07] RESPONSE ECHO
INFO
The INFO command refreshes and returns the worker information.
We already have a copy of the worker info; it was created when the worker started up:
obj$workers_info()[[worker_id]]
## version: 0.1.4 [LOCAL]
## platform: x86_64-apple-darwin13.4.0 (64-bit)
## running: OS X 10.10.4 (Yosemite)
## hostname: Richs-MacBook-Pro.local
## pid: 34833
## redis_host: 127.0.0.1
## redis_port: 6379
## worker: Richs-MacBook-Pro.local::34833
## queue_name: myqueue
## heartbeat_period: 30
## heartbeat_expire: 90
## message: myqueue:workers:Richs-MacBook-Pro.local::34833:message
## response: myqueue:workers:Richs-MacBook-Pro.local::34833:response
## log: myqueue:workers:Richs-MacBook-Pro.local::34833:log
## envir: {}
Note that the envir field is currently empty ({}) because when the worker started it did not know about any environments.
message_id <- obj$send_message("INFO")
Here’s the new worker information, complete with an updated envir field:
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## version: 0.1.4 [LOCAL]
## platform: x86_64-apple-darwin13.4.0 (64-bit)
## running: OS X 10.10.4 (Yosemite)
## hostname: Richs-MacBook-Pro.local
## pid: 34833
## redis_host: 127.0.0.1
## redis_port: 6379
## worker: Richs-MacBook-Pro.local::34833
## queue_name: myqueue
## heartbeat_period: 30
## heartbeat_expire: 90
## message: myqueue:workers:Richs-MacBook-Pro.local::34833:message
## response: myqueue:workers:Richs-MacBook-Pro.local::34833:response
## log: myqueue:workers:Richs-MacBook-Pro.local::34833:log
## envir: {9ba846cec2d2f060b336b9b90f8fd19d}
This has been updated on the database copy too:
obj$workers_info()[[worker_id]]$envir
## [1] "9ba846cec2d2f060b336b9b90f8fd19d"
and the same information is printed to the worker log:
[2015-09-28 12:41:07] MESSAGE INFO
__
______________ ___ _____ __ _____ / /
______ / ___/ ___/ __ `/ / / / _ \/ / / / _ \/ / ______
/_____/ / / / / / /_/ / /_/ / __/ /_/ / __/_/ /_____/
______ /_/ /_/ \__, /\__,_/\___/\__,_/\___(_) ______
/_____/ /_/ /_____/
version: 0.1.4 [LOCAL]
platform: x86_64-apple-darwin13.4.0 (64-bit)
running: OS X 10.10.4 (Yosemite)
hostname: Richs-MacBook-Pro.local
pid: 34833
redis_host: 127.0.0.1
redis_port: 6379
worker: Richs-MacBook-Pro.local::34833
queue_name: myqueue
heartbeat_period: 30
heartbeat_expire: 90
message: myqueue:workers:Richs-MacBook-Pro.local::34833:message
response: myqueue:workers:Richs-MacBook-Pro.local::34833:response
log: myqueue:workers:Richs-MacBook-Pro.local::34833:log
envir: {9ba846cec2d2f060b336b9b90f8fd19d}
[2015-09-28 12:41:07] RESPONSE INFO
DIR
This is useful for listing directory contents, similar to the dir function in R. However, because file contents are usually more interesting than names alone (e.g., when working out why something is not running on the remote machine), the response is essentially the result of passing the output of dir to tools::md5sum, giving the MD5 hash of each file.
message_id <- obj$send_message("DIR")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## introduction.html introduction.R
## "4a24f94f79ff6295e7ebd48b4a94fa75" "2c960b07e526b5184a52e23fc9f30464"
## introduction.Rmd messages.R
## "f87d11eaa23c90d6950a358612c5f6c2" "8f3f82d8c2d967e92e6c40d2b64fd6ff"
## messages.Rmd myfuns.R
## "31c1ff5d0f86684f4e4f9b863ecbf862" "ba6a3703d48d4686341a511559152b4c"
## src
## NA
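The shape of this response can be reproduced locally with base R. The sketch below is a rough stand-in, not the worker's actual code: dir_md5 is a hypothetical helper, and the file names are made up for the example.

```r
# Hypothetical local stand-in for what DIR reports: file names from dir(),
# each paired with its MD5 hash from tools::md5sum().
dir_md5 <- function(path = ".", ...) {
  files <- dir(path, ...)
  hashes <- tools::md5sum(file.path(path, files))
  names(hashes) <- files  # label by file name rather than full path
  hashes
}

# Try it on a throwaway directory with two small files
tmp <- tempfile()
dir.create(tmp)
writeLines("x <- 1", file.path(tmp, "a.R"))
writeLines("some notes", file.path(tmp, "notes.txt"))

all_files <- dir_md5(tmp)
print(all_files)

# Extra arguments are handed on to dir()
r_files <- dir_md5(tmp, pattern = "\\.R$")
```

The result is a named character vector, one 32-character hash per file, in the same layout as the response above.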
Additional arguments to dir can be passed through:
message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## introduction.R messages.R
## "2c960b07e526b5184a52e23fc9f30464" "c4b62ef9689e01828dac78765ad029ce"
## myfuns.R
## "ba6a3703d48d4686341a511559152b4c"
If you pass in invalid arguments to dir, then a reasonably helpful message should be generated:
message_id <- obj$send_message("DIR", list(foo="bar"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "Error in dir(foo = \"bar\") : unused argument (foo = \"bar\")\n"
## attr(,"class")
## [1] "try-error"
## attr(,"condition")
## <simpleError in dir(foo = "bar"): unused argument (foo = "bar")>
Note that this does not generate an error locally, but you can test whether it threw an error by checking the class of the returned value.
The same information is printed to the worker log:
[2015-09-28 12:41:07] MESSAGE DIR
[2015-09-28 12:41:07] RESPONSE DIR
[2015-09-28 12:41:07] MESSAGE DIR
[2015-09-28 12:41:07] RESPONSE DIR
[2015-09-28 12:41:07] MESSAGE DIR
Error in dir(foo = "bar") : unused argument (foo = "bar")
[2015-09-28 12:41:07] RESPONSE DIR
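Checking the class of a returned value, as suggested for the failed DIR call above, can be done with inherits. The sketch below builds the same kind of "try-error" object locally with try, so it runs without a worker. With a real response, res would come from obj$get_response instead.

```r
# Reproduce the same kind of value locally: try() returns an object of
# class "try-error" instead of stopping when the call fails.
res <- try(dir(foo = "bar"), silent = TRUE)

# Check the class before using the value
failed <- inherits(res, "try-error")
if (failed) {
  # The original condition is kept as an attribute
  msg <- conditionMessage(attr(res, "condition"))
  message("DIR failed: ", msg)
}
```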
PUSH
The commands PUSH and PULL move files from and to the worker. Each command is interpreted as an instruction to the worker, so PUSH pushes files from the worker into the database while PULL pulls files from the database into the worker. There are (will be) preferable higher-level ways of dealing with this.
Things to be aware of here: Redis is an in-memory store and rrqueue is not at all aggressive about deleting objects. If you push a 1GB file into Redis things will go badly. There are no checks for this at present!
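Since there is no built-in size check, one could guard against oversized files by hand. The sketch below is only an illustration: check_push_size is a hypothetical helper, not part of rrqueue, and the 10 MB default limit is an arbitrary choice.

```r
# Hypothetical guard: refuse to push files above a size limit.
# The 10 MB default is an arbitrary choice for illustration.
check_push_size <- function(files, max_bytes = 10 * 1024^2) {
  sizes <- file.size(files)
  if (any(is.na(sizes))) {
    stop("Missing files: ", paste(files[is.na(sizes)], collapse = ", "))
  }
  too_big <- files[sizes > max_bytes]
  if (length(too_big) > 0) {
    stop("Too large to push: ", paste(too_big, collapse = ", "))
  }
  invisible(sizes)
}

# A small temporary file passes; a tiny limit makes the same file fail
f <- tempfile(fileext = ".R")
writeLines("x <- 1", f)
sizes <- check_push_size(f)
rejected <- inherits(try(check_push_size(f, max_bytes = 1), silent = TRUE),
                     "try-error")
```

Because PUSH reads files on the worker, a check like this only helps directly when the controller can see the same files; otherwise the size would have to be queried on the worker itself.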
PUSH takes a vector of filenames as an argument. The response is not the file itself (how could it do that?) but instead the hash of that file. By the time the response is received the file contents are stored in the database and can be returned.
message_id <- obj$send_message("PUSH", "myfuns.R")
res <- obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
res
## myfuns.R
## "ba6a3703d48d4686341a511559152b4c"
## attr(,"class")
## [1] "files_pack"
We can save the file into a temporary directory in the filesystem using the files_unpack method of obj:
path <- obj$files_unpack(res)
dir(path)
## [1] "myfuns.R"
And the files have the expected hash:
tools::md5sum(file.path(path, names(res)))
## /var/folders/3z/86tv450j7kb4w5y4wpxj6d5r0000gn/T//Rtmpy6z8pC/file87f33a28dbbc/myfuns.R
## "ba6a3703d48d4686341a511559152b4c"
[2015-09-28 12:41:08] MESSAGE PUSH
[2015-09-28 12:41:08] PUSH
[2015-09-28 12:41:08] RESPONSE PUSH
PULL
This is the inverse of PUSH and takes files from the machine the queue is running on and copies them into the worker (from the view of the worker, the files in question are already in the database and it will “pull” them down locally).
First, we need to save files into the database. Let’s rename the temporary file above and save that:
file.rename(file.path(path, "myfuns.R"),
"brandnewcode.R")
## [1] TRUE
res <- obj$files_pack("brandnewcode.R")
res
## brandnewcode.R
## "ba6a3703d48d4686341a511559152b4c"
## attr(,"class")
## [1] "files_pack"
Note that the hash here is the same as above: rrqueue can tell this is the same file even though it has a different filename. Note also that filenames will be interpreted relative to the working directory, because the directory layout on the worker outside of this point could be arbitrarily different.
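The hash depends only on a file's contents, which is why renaming leaves it unchanged. A minimal base R sketch, using tools::md5sum on two temporary files with placeholder contents:

```r
# Two files with identical contents but different names
tmp <- tempfile()
dir.create(tmp)
original <- file.path(tmp, "myfuns.R")
renamed <- file.path(tmp, "brandnewcode.R")
writeLines("x <- 1", original)  # placeholder contents
copied <- file.copy(original, renamed)

# md5sum depends only on the bytes in the file, not on its name
hashes <- unname(tools::md5sum(c(original, renamed)))
same_contents <- identical(hashes[1], hashes[2])
print(same_contents)
```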
Now that the files have been packed, we can run the PULL command (note that PULL always unpacks files into the worker's working directory):
message_id <- obj$send_message("PULL")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "OK"
And the new file will be present in the directory:
message_id <- obj$send_message("DIR", list(pattern="\\.R$"))
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## brandnewcode.R introduction.R
## "ba6a3703d48d4686341a511559152b4c" "2c960b07e526b5184a52e23fc9f30464"
## messages.R myfuns.R
## "90a5682ad3203d07b35c511911d2d0e2" "ba6a3703d48d4686341a511559152b4c"
[2015-09-28 12:41:08] MESSAGE PULL
[2015-09-28 12:41:08] PULL
[2015-09-28 12:41:08] RESPONSE PULL
[2015-09-28 12:41:08] MESSAGE DIR
[2015-09-28 12:41:08] RESPONSE DIR
EVAL
Evaluate an arbitrary R expression, passed as a string (not as any sort of unevaluated or quoted expression). This expression is evaluated in the global environment, which is not the environment in which queued code is evaluated.
message_id <- obj$send_message("EVAL", "1 + 1")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] 2
We can delete the file created above:
message_id <- obj$send_message("EVAL", "file.remove('brandnewcode.R')")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] TRUE
[2015-09-28 12:41:08] MESSAGE EVAL
[1] 2
[2015-09-28 12:41:08] RESPONSE EVAL
[2015-09-28 12:41:08] MESSAGE EVAL
[1] TRUE
[2015-09-28 12:41:08] RESPONSE EVAL
This could be used to evaluate code that has side effects, such as installing packages. However, due to limitations in how R loads packages, the only way to update and reload a package is to restart the worker.
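To make the "passed as a string" point concrete, the sketch below shows one way a string of code can be parsed and evaluated in a chosen environment using base R. eval_string is a hypothetical helper for illustration; it is not claimed to be how the worker handles EVAL.

```r
# Hypothetical helper: parse a string of R code and evaluate it,
# by default in the global environment.
eval_string <- function(code, envir = globalenv()) {
  eval(parse(text = code), envir = envir)
}

two <- eval_string("1 + 1")

# The same mechanism gives environment-specific answers: here the string
# is evaluated in an environment that defines its own x.
task_env <- new.env()
assign("x", 10, envir = task_env)
twenty <- eval_string("x * 2", envir = task_env)
```

The choice of environment matters: variables that exist only in the environment used for queued code would not be visible to a string evaluated in the global environment.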
STOP
STOP sends a shutdown message to the worker. Generally you should prefer the stop_workers method, which uses STOP behind the scenes.
message_id <- obj$send_message("STOP")
obj$get_response(message_id, worker_id, delete=TRUE, wait=10)
## [1] "BYE"
[2015-09-28 12:41:08] MESSAGE STOP
[2015-09-28 12:41:08] RESPONSE STOP
[2015-09-28 12:41:08] STOP OK