queue(queue_name, packages = NULL, sources = NULL, redis_host = "127.0.0.1", redis_port = 6379, global = TRUE, config = NULL)
Create an rrqueue queue. A queue requires a queue name and a set
of packages and sources to load. The sources and packages
together define an "environment"; on the worker these packages
will be loaded and source files will be source-ed.
The default values for redis_host and redis_port
correspond to Redis' defaults; if your Redis server is configured
differently or available over an internet connection you will need
to adjust these accordingly.
The queue objects can be created and destroyed at will; all
the data is stored on the server. Once a queue is created it can
also be connected to by the observer object for read-only
access.
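A minimal sketch of creating and reconnecting to a queue. This assumes the rrqueue package is installed and a Redis server is running locally with default settings; the queue name "myqueue", the package "digest" and the source file "myfuns.R" are placeholders, and `observer()` is the read-only constructor mentioned above:

```r
library(rrqueue)

# Create (or reconnect to) a queue; workers will load the listed
# packages and source() the listed files to build their environment:
obj <- queue("myqueue",
             packages = "digest",    # loaded on each worker
             sources = "myfuns.R")   # source()-d on each worker

# All state lives in Redis, so dropping and recreating the object
# simply reconnects to the same queue:
rm(obj)
obj <- queue("myqueue", packages = "digest", sources = "myfuns.R")

# Read-only access for monitoring:
obs <- observer("myqueue")
```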
enqueue
Queue an expression for evaluation (see Details).
Usage:
enqueue(expr = , envir = parent.frame(), key_complete = NULL,
group = NULL)
Arguments:
expr: An expression to evaluate.
envir: An environment in which local variables required to compute expr can be found. These will be evaluated and added to the Redis database.
key_complete: An optional string representing the Redis key to write to when the task is complete. You generally don't need to modify this, but it is used in some higher-level functions (such as rrqlapply) to keep track of task completions efficiently.
group: An optional human-readable "group" to add the task to. There are methods for addressing sets of tasks using this group.
Details:
This method uses non-standard evaluation, so the enqueue_ form may be preferable for programming.
Value:
invisibly, a task object, which can be used to monitor the status of the task.
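For example, a sketch of queuing a task and collecting its result, assuming a queue `obj` as above and at least one running worker; the task methods `status()`, `wait()` and `result()` are assumed from the task object described here:

```r
x <- 10
t <- obj$enqueue(sin(x))   # x is picked up from the calling environment
t$status()                 # e.g. "PENDING" or "RUNNING"
t$wait(10)                 # block for up to 10 seconds
t$result()                 # the computed value once complete
```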
enqueue_
The workhorse version of enqueue, which uses standard evaluation and is therefore more suitable for programming. All arguments are the same as enqueue except for how expr is evaluated.
Usage:
enqueue_(expr = , envir = parent.frame(), key_complete = NULL,
group = NULL)
Arguments:
expr, envir, key_complete, group: As for enqueue.

requeue
Re-queue a task that has been orphaned by worker failure.
Usage:
requeue(task_id = )
Arguments:
task_id: The id of the task to requeue.
Details:
If a worker fails (through an unhandled exception, an R crash, network loss or machine loss) and the worker was running a heartbeat process, the task will eventually be flagged as orphaned. If this happens, the task can be requeued. Functions for fetching and querying tasks take a follow_redirect argument which can be set to TRUE so that the new, requeued task is found instead of the old task.
Value:
invisibly, a task object.
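A sketch of requeuing an orphaned task. The task id "1" is a placeholder, and `task_get` is an assumed name for a task-fetching method that takes the follow_redirect argument described above:

```r
# Suppose task "1" was running on a worker that crashed; once its
# heartbeat expires the task is flagged as orphaned and can be requeued:
t2 <- obj$requeue("1")

# Fetching the original id with follow_redirect = TRUE returns the
# new, requeued task instead of the dead one:
t <- obj$task_get("1", follow_redirect = TRUE)
```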
send_message
Send a message to one or more (or all) workers. Messages can be used to retrieve information from workers and to shut them down, and are useful in debugging. See Details for possible messages and their actions.
Usage:
send_message(command = , args = NULL, worker_ids = NULL)
Arguments:
command: Name of the command to run; one of "PING", "ECHO", "EVAL", "STOP", "INFO", "ENVIR", "PUSH", "PULL", "DIR". See Details.
args: Arguments to pass through to commands. Some commands require arguments, others do not. See Details.
worker_ids: Optional vector of worker ids to send the message to. If this is omitted (or NULL), try all workers that rrqueue knows about.
Details:
The possible types of message are:

PING: The worker responds "PONG" to its log (see observer for how to access it) and to the response queue. Ignores any argument.
ECHO
INFO: Refresh the worker information (see workers_info in observer). The worker will print info to stderr, write it to the appropriate place in the database and return it in the response queue. Ignores any argument.
DIR
PUSH
PULL
EVAL: Evaluate an arbitrary R expression given as a string (e.g., run_message("EVAL", "sin(1)")). The output is printed to stdout, the worker log and to the response queue. Requires a single argument.
ENVIR: The interface here is likely to change, so the documentation is withdrawn for now: this tells the worker to try and load an environment whose id is given as a single argument. Requires a single argument.
STOP

After sending a message, there is no guarantee about how long it will take to process. If the worker is involved in a long-running computation it will be unavailable to process the message. However, it will process the message before running any new task.
The message id is worth saving. It can be passed to the method get_responses to wait for and retrieve responses from one or more workers.
Value:
The "message id" which can be used to retrieve messages with has_responses, get_responses and get_response.
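For instance, a sketch of pinging all workers and collecting the replies, using the methods described above on a queue `obj` with running workers:

```r
id <- obj$send_message("PING")      # broadcast to all known workers
obj$has_responses(id)               # named logical vector, one per worker
res <- obj$get_responses(id, wait = 5)  # block up to 5s for all replies
```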
has_responses
Detect which workers have responses ready for a given message id.
Usage:
has_responses(message_id = , worker_ids = NULL)
Arguments:
message_id: The message id returned by send_message.
worker_ids: Optional vector of worker ids. If this is omitted (or NULL), try all workers that rrqueue knows about.
Value:
A named logical vector; names are worker ids, the value is TRUE for each worker for which a response is ready and FALSE for workers where a response is not ready.
get_responses
Retrieve responses to a given message id from one or more workers.
Usage:
get_responses(message_id = , worker_ids = NULL, delete = FALSE,
wait = 0)
Arguments:
message_id: The message id returned by send_message.
worker_ids: Optional vector of worker ids. If this is omitted (or NULL), try all workers that rrqueue knows about.
delete: Delete the response after a successful retrieval of all responses?
wait: Number of seconds to wait for a response. We poll the database repeatedly during this interval. If 0, then a response is requested immediately. If no response is received from all workers in time, an error is raised.
Value:
Always returns a list, even if only one worker id is given.
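A sketch contrasting get_responses and get_response; the worker id "w1" is a placeholder, and the "ECHO" argument follows the message descriptions above:

```r
id <- obj$send_message("ECHO", "hello")

# All workers: always returns a list, optionally cleaning up afterwards
all_res <- obj$get_responses(id, delete = TRUE, wait = 2)

# One worker: returns the bare response value, not a list
one <- obj$get_response(id, "w1", wait = 2)
```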
get_response
As for get_responses, but only for a single worker id, and returns the value of the response rather than a list.
Usage:
get_response(message_id = , worker_id = , delete = FALSE, wait = 0)
Arguments:
message_id, worker_id, delete, wait: As for get_responses.

response_ids
Get the list of message ids that a given worker has responses for.
Usage:
response_ids(worker_id = )
Arguments:
worker_id

tasks_drop
Drop one or more tasks.
Usage:
tasks_drop(task_ids = )
Arguments:
task_ids: Vector of task ids to drop.

files_pack
Pack files into the database.
Usage:
files_pack(... = , files = c(...))
Arguments:
...: Filenames to pack.
files: A character vector of filenames to pack, by default taken from ....
files_unpack
Unpack a files_pack object.
Usage:
files_unpack(pack = , path = tempfile())
Arguments:
pack: A files_pack object, created by files_pack or returned as a response to a PUSH message.
path: Directory to unpack files into; tempfile() (the default) guarantees not to overwrite anything. This method returns path invisibly so you can move files around easily afterwards.
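A sketch of packing local files and unpacking them again; the filenames are placeholders, and the methods are used as described above:

```r
# Pack two local files into the database:
p <- obj$files_pack("data.csv", "config.yml")

# Unpack into a fresh directory (tempfile() by default, so nothing
# is overwritten); the path is returned invisibly:
dest <- obj$files_unpack(p)
list.files(dest)
```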
tasks_set_group
Set the group name for one or more tasks. The tasks can be pending, running or completed, and the tasks can already have a group or can be groupless. Once tasks have been grouped they can be easier to work with as a set (see tasks_in_groups and task_bundle_get in observer).
Usage:
tasks_set_group(task_ids = , group = , exists_action = "stop")
Arguments:
task_ids: Vector of task ids.
group: Name of the group to set.
exists_action: Behaviour when a group name already exists for a given task. Options are "stop" (throw an error, the default), "warn" (warn, but don't rename), "pass" (don't warn, don't rename) and "overwrite" (replace the group name).
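A sketch of grouping tasks after they have been queued; the group names are placeholders, and the task id field `$id` is assumed from the task object returned by enqueue:

```r
t1 <- obj$enqueue(sin(1))
t2 <- obj$enqueue(sin(2))

# Group both tasks so they can be addressed as a set:
obj$tasks_set_group(c(t1$id, t2$id), "trig")

# Renaming an already-grouped task requires opting in:
obj$tasks_set_group(t1$id, "maths", exists_action = "overwrite")
```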
stop_workers
Stop some or all rrqueue workers.
Usage:
stop_workers(worker_ids = NULL, type = "message", interrupt = TRUE,
wait = 0)
Arguments:
worker_ids: Optional vector of worker ids to send the message to. If this is omitted (or NULL), try all workers that rrqueue knows about.
type: Way to stop workers; options are "message" (the default) or "kill". See Details for more information.
interrupt: Should busy workers be interrupted after sending a message? See Details.
wait: How long to wait after sending a message for a response to be retrieved. If this is greater than zero, any unresponsive workers will be killed.
Details:
Stopping remote workers is fairly tricky because we can't really talk to them: they might be working on a task, or worse, working on a task that does not listen for interrupts (custom C/C++ code is a common culprit here).
The default behaviour of this function is to send a STOP message and then immediately send an interrupt signal to all workers that have status "BUSY". This should work in most cases. Wait a second or two and then check workers_list_exited() to make sure that all workers are listed.
To let workers finish whatever task they are working on, specify interrupt=FALSE. The STOP message will be the next thing the workers process, so they will shut down as soon as they finish the task.
To ensure that workers do stop within some timeframe, specify a time to wait. Passing wait=5 will send a STOP signal (and possibly an interrupt) and then poll for responses from all workers for 5 seconds. Any worker that has not completed within this time will then be killed. If all workers respond in time, the function will exit more quickly, so you can use an overestimate.
If you just want to kill the workers outright, use type="kill" which will send a SIGTERM via the database. No other checks are done as the worker will be unceremoniously halted.
If a worker is local and you just want it dead, you can use type="kill_local", which will use tools::pskill to terminate the process. This is really a line of last resort.
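The strategies above can be sketched as follows, using the arguments described in this section:

```r
# Default: send STOP and immediately interrupt busy workers
obj$stop_workers()

# Gentle: let workers finish their current task before stopping
obj$stop_workers(interrupt = FALSE)

# Bounded: poll for responses for 5 seconds, then kill stragglers
obj$stop_workers(wait = 5)

# Last resorts: SIGTERM via the database, or pskill for local workers
obj$stop_workers(type = "kill")

# Afterwards, confirm everything exited:
obj$workers_list_exited()
```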