Create an rrqueue queue

Usage

queue(queue_name, packages = NULL, sources = NULL, redis_host = "127.0.0.1", redis_port = 6379, global = TRUE, config = NULL)

Arguments

queue_name
Queue name (scalar character)
packages
Optional character vector of packages to load
sources
Optional character vector of files to source
redis_host
Redis hostname
redis_port
Redis port number
global
Source files into the global environment? Doing so is a good idea when working with the "user friendly" functions. See issue 2 on GitHub.
config
Configuration file of key/value pairs in yaml format. See the package README for an example. If given, additional arguments to this function override values in the file, which in turn override this function's defaults.

Description

Create an rrqueue queue. A queue requires a queue name and a set of packages and sources to load. The sources and packages together define an "environment"; on the worker these packages will be loaded and source files will be source-ed.

Details

The default values for redis_host and redis_port correspond to Redis' defaults; if your Redis server is configured differently or available over an internet connection you will need to adjust these accordingly.

Queue objects can be created and destroyed at will; all the data is stored on the server. Once a queue is created it can also be connected to by an observer object for read-only access.
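A minimal sketch of creating a queue follows; the queue name, package, and source file are illustrative, and a Redis server is assumed to be running on the default host and port.

```r
library(rrqueue)

## "myqueue", the package and the source file are placeholders; on each
## worker the packages are loaded and the source files are source()-d.
obj <- queue("myqueue",
             packages = "digest",
             sources  = "mycode.R",
             redis_host = "127.0.0.1",
             redis_port = 6379)
```

Because all state lives in Redis, the `obj` handle can be discarded and recreated later with the same call without losing any queued tasks.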

Methods

enqueue
The main queuing function.

Usage: enqueue(expr = , envir = parent.frame(), key_complete = NULL, group = NULL)

Arguments:

expr
An unevaluated expression to be evaluated

envir

An environment in which local variables required to compute expr can be found. These will be evaluated and added to the Redis database.

key_complete

an optional string representing the Redis key to write to when the task is complete. You generally don't need to modify this, but it is used in some higher-level functions (such as rrqlapply) to keep track of task completions efficiently.

group

An optional human-readable "group" to add the task to. There are methods for addressing sets of tasks using this group.

Details:

This method uses non-standard evaluation, so the enqueue_ form may be preferable for programming.

Value:

invisibly, a task object, which can be used to monitor the status of the task.
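A sketch of queuing a task, assuming `obj` is a queue created as above and a worker is running. The local variable `x` is captured from the calling environment and stored in Redis alongside the expression.

```r
## Non-standard evaluation: the expression sin(x) is captured unevaluated,
## and the local x is serialised so the worker can compute the result.
x <- 10
t <- obj$enqueue(sin(x))
t$status()   # e.g. "PENDING" until a worker picks the task up
```

The returned task object is the handle used to check status and, once complete, fetch the result.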

enqueue_

The workhorse version of enqueue, which uses standard evaluation and is therefore more suitable for programming. All arguments are the same as enqueue except for expr.

Usage: enqueue_(expr = , envir = parent.frame(), key_complete = NULL, group = NULL)

Arguments:

expr
A language object (a quoted expression), e.g. as produced by quote()

envir
Environment to find locals (see `enqueue`)

key_complete
See `enqueue`

group
See `enqueue`
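For programmatic use, enqueue_ takes an already-quoted expression, which avoids the capture semantics of enqueue. A sketch, assuming `obj` is a queue as above:

```r
## Standard evaluation: build the language object explicitly with quote(),
## then submit it; locals are still found via the supplied environment.
x <- 10
e <- quote(sin(x))
t <- obj$enqueue_(e, envir = environment())
```

This form is the safer choice inside functions or loops, where the expression to run is constructed rather than written literally.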

requeue

Re-queue a task that has been orphaned by worker failure.

Usage: requeue(task_id = )

Arguments:

task_id
Task id number

Details:

If a worker fails (either an unhandled exception, R crash, network loss or machine loss) then if the worker was running a heartbeat process the task will eventually be flagged as orphaned. If this happens then the task can be requeued. Functions for fetching and querying tasks take a follow_redirect argument which can be set to TRUE so that this new, requeued, task is found instead of the old task.

Value:

invisibly, a task object.
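If a worker dies mid-task, the orphaned task can be resubmitted. A sketch, where the task id "1" is illustrative:

```r
## Requeue an orphaned task; subsequent queries with follow_redirect = TRUE
## will resolve to the new task rather than the orphaned one.
t2 <- obj$requeue("1")
```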

send_message

Send a message to one or more (or all) workers. Messages can be used to retrieve information from workers, to shut them down and are useful in debugging. See Details for possible messages and their action.

Usage: send_message(command = , args = NULL, worker_ids = NULL)

Arguments:

command

Name of the command to run; one of "PING", "ECHO", "EVAL", "STOP", "INFO", "ENVIR", "PUSH", "PULL", "DIR". See Details.

args

Arguments to pass through to commands. Some commands require arguments, others do not. See Details.

worker_ids

Optional vector of worker ids to send the message to. If this is omitted (or NULL), the message is sent to all workers that rrqueue knows about.

Details:

The possible types of message are

PING
send a "PING" to the worker. It will respond by replying PONG to its stderr, to its log (see observer for how to access) and to the response queue. Ignores any argument.

ECHO
Like "PING", but the worker responds by echoing the string given. Requires one argument.

INFO
Refresh the worker info (see workers_info in observer). The worker will print info to stderr, write it to the appropriate place in the database and return it in the response queue. Ignores any argument.

DIR
Tell the worker to return directory contents and md5 hashes of files.

PUSH
Tell the worker to push files into the database. The arguments should be a vector of filenames to copy. The response queue will contain appropriate data for retrieving the files, but the interface here will change to make this nicer to use.

PULL
Tell the worker to pull files into its working directory. Can be used to keep the worker in sync.

EVAL
Evaluate an arbitrary R expression, passed as a string (e.g., send_message("EVAL", "sin(1)")). The output is printed to stdout, the worker log and the response queue. Requires a single argument.

(The interface for ENVIR is likely to change, so its documentation is withdrawn for now: it tells the worker to try to load an environment, whose id is given as a single argument. Requires a single argument.)

STOP
Tell the worker to stop cleanly. Ignores any argument.

After sending a message, there is no guarantee about how long it will take to process. If the worker is involved in a long-running computation it will be unavailable to process the message. However, it will process the message before running any new task. The message id is worth saving: it can be passed to the method get_responses to wait for and retrieve responses from one or more workers.

Value:

The "message id" which can be used to retrieve messages with has_responses, get_responses and get_response.
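The messaging round trip can be sketched as follows, assuming `obj` is a queue with at least one worker attached:

```r
## Ping all known workers, then poll for their replies.
id <- obj$send_message("PING")

obj$has_responses(id)               # named logical vector, one entry per worker

## Block for up to 5 seconds waiting for every worker to answer.
res <- obj$get_responses(id, wait = 5)
```

If any worker is busy with a long task, get_responses will error once the wait expires; a larger wait simply sets an upper bound, since it returns as soon as all responses arrive.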

has_responses

Detect which workers have responses ready for a given message id.

Usage: has_responses(message_id = , worker_ids = NULL)

Arguments:

message_id
id of the message (as returned by send_message)

worker_ids

Optional vector of worker ids to check. If this is omitted (or NULL), all workers that rrqueue knows about are checked.

Value:

A named logical vector; names are worker ids, the value is TRUE for each worker for which a response is ready and FALSE for workers where a response is not ready.

get_responses

Retrieve responses to a given message id from one or more workers.

Usage: get_responses(message_id = , worker_ids = NULL, delete = FALSE, wait = 0)

Arguments:

message_id
id of the message (as returned by send_message)

worker_ids

Optional vector of worker ids to fetch responses from. If this is omitted (or NULL), all workers that rrqueue knows about are tried.

delete

delete the response after a successful retrieval of all responses?

wait

Number of seconds to wait for a response. We poll the database repeatedly during this interval. If 0, then a response is requested immediately. If no response is received from all workers in time, an error is raised.

Value: Always returns a list, even if only one worker id is given.

get_response

As for get_responses, but only for a single worker id, and returns the value of the response rather than a list.

Usage: get_response(message_id = , worker_id = , delete = FALSE, wait = 0)

Arguments:

message_id
message id

worker_id
single worker id

delete
delete response after successful retrieval?

wait
how long to wait for a message, in seconds

response_ids

Get list of message ids that a given worker has responses for.

Usage: response_ids(worker_id = )

Arguments:

worker_id
single worker id

tasks_drop
Drop tasks from the database

Usage: tasks_drop(task_ids = )

Arguments:

task_ids
Vector of task ids to drop

files_pack
Pack files into the Redis database

Usage: files_pack(... = , files = c(...))

Arguments:

...
filenames

files
a vector of filenames, used in place of ...

files_unpack
Unpack files from the Redis database onto the filesystem.

Usage: files_unpack(pack = , path = tempfile())

Arguments:

pack
a files_pack object, created by files_pack or returned as a response to a PUSH message.

path
path to unpack files into. Files will be overwritten without warning, so using tempfile() (the default) guarantees not to overwrite anything. This method returns path invisibly so you can move files around easily afterwards.
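A sketch of moving files through Redis with this pair of methods; the filenames are illustrative and must exist locally:

```r
## Pack two local files into the Redis database...
p <- obj$files_pack("data.csv", "config.yml")

## ...and unpack them elsewhere; the default tempfile() destination
## guarantees nothing on disk is clobbered. The path is returned invisibly.
dest <- obj$files_unpack(p)
dir(dest)
```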

tasks_set_group

Set the group name for one or more tasks. The tasks can be pending, running or completed, and the tasks can already have a group or can be groupless. Once tasks have been grouped they can be easier to work with as a set (see tasks_in_groups and task_bundle_get in observer).

Usage: tasks_set_group(task_ids = , group = , exists_action = "stop")

Arguments:

task_ids
Vector of task ids

group
Single group name

exists_action

Behaviour when a group name already exists for a given task. Options are "stop" (throw an error, the default), "warn" (warn, but don't rename), "pass" (don't warn, don't rename) and "overwrite" (replace the group name).
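A sketch of grouping tasks after the fact; the task ids and group name are illustrative:

```r
## Attach tasks 1-3 to a group, replacing any group names they already
## have ("overwrite" avoids the default error on conflict).
obj$tasks_set_group(c("1", "2", "3"), "calibration",
                    exists_action = "overwrite")
```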

stop_workers

Stop some or all rrqueue workers.

Usage: stop_workers(worker_ids = NULL, type = "message", interrupt = TRUE, wait = 0)

Arguments:

worker_ids

Optional vector of worker ids to stop. If this is omitted (or NULL), all workers that rrqueue knows about are stopped.

type

way to stop workers; options are "message" (the default) or "kill". See Details for more information.

interrupt

Should busy workers be interrupted after sending a message? See Details.

wait

How long to wait after sending a message for a response to be retrieved. If this is greater than zero, any unresponsive workers will be killed.

Details:

Stopping remote workers is fairly tricky because we can't really talk to them: they might be working on a task, or worse, working on a task that does not listen for interrupts (custom C/C++ code is a common culprit here).

The default behaviour of this function is to send a STOP message and then immediately send an interrupt signal to all workers that have status "BUSY". This should work in most cases. Wait a second or two and then check workers_list_exited() to make sure that all workers are listed.

To let workers finish whatever task they are working on, specify interrupt=FALSE. The STOP message will be the next thing the workers process, so they will shut down as soon as they finish the task.

To ensure that workers do stop in some timeframe, specify wait. Passing wait=5 will send a STOP signal (and possibly an interrupt) and then poll for responses from all workers for 5 seconds. Any worker that has not responded within this time will then be killed. If all workers respond in time, the function will exit more quickly, so you can use an overestimate.
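The graceful-with-a-deadline shutdown described above can be sketched as:

```r
## Let busy workers finish their current task (no interrupt), but kill
## anything that has not responded within 10 seconds.
obj$stop_workers(interrupt = FALSE, wait = 10)
```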

If you just want to kill the workers outright, use type="kill" which will send a SIGTERM via the database. No other checks are done as the worker will be unceremoniously halted.

If you want to kill a local worker and just want it dead, you can use type="kill_local", which uses tools::pskill to terminate the process. This is really a line of last resort.