Imagine you have a relatively intensive R process that runs in response to user input.
To keep things as fast as possible, you may want to use several servers to process incoming requests. However, to do this you need to co-ordinate between all of your servers (or workers). How do you decide which server works on what? What if one server dies mid-way? To manage this, we need a work queue, also known as a job queue or task queue. This document will show you how to build a work queue system using only R and PostgreSQL, something that would ordinarily require an external tool like RabbitMQ.
In this example, our work will be generating square roots. We’ll keep track of the results in a table:
library(DBI)
con <- dbConnect(RPostgres::Postgres())
dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
dbExecute(con, "
CREATE TABLE sqroot_vignette_example (
in_val INTEGER PRIMARY KEY,
out_val DOUBLE PRECISION NULL
)
")
When a client wants a square root value, it can insert a new row into this table, setting in_val. We’ll then have a bunch of workers that will calculate the results for the client, and fill in out_val.
To manage these workers, we will combine 2 PostgreSQL concepts: LISTEN/NOTIFY and SKIP LOCKED.
The Postgres LISTEN and NOTIFY commands allow you to send and receive messages between clients connected to a PostgreSQL database. This is known as a publish/subscribe architecture.
We tell Postgres that we are interested in receiving messages using LISTEN. For example:
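A minimal sketch of subscribing to a channel, assuming a default PostgreSQL database is reachable. The connection name db_listen is an illustrative choice, and the guard simply skips the example when no database is available:

```r
library(DBI)

# Only run when a default PostgreSQL database is reachable
if (RPostgres::postgresHasDefault()) {
  # db_listen is an illustrative name for the subscribing connection
  db_listen <- dbConnect(RPostgres::Postgres())
  # Ask Postgres to deliver messages sent on the "grapevine" channel
  dbExecute(db_listen, "LISTEN grapevine")
}
```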
In this case, the channel name “grapevine” is arbitrary; we don’t need to create channels ahead of time. To make sure we have something to receive, we can start a separate R process using callr. Ordinarily this would be part of another R script, maybe on another computer. This process will wait a bit, use NOTIFY to send a message, then finish:
rp <- callr::r_bg(function() {
  library(DBI)
  Sys.sleep(0.3)
  db_notify <- dbConnect(RPostgres::Postgres())
  dbExecute(db_notify, "NOTIFY grapevine, 'psst'")
  dbDisconnect(db_notify)
})
Finally, we should wait for any incoming messages. To do this, use postgresWaitForNotify. The payload will contain the message from the other R process:
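One way to wait is to loop until a message arrives. A minimal sketch, assuming conn is a connection that has already issued LISTEN; the helper name wait_for_message is hypothetical:

```r
# Hypothetical helper: block until a notification arrives on a listening connection
wait_for_message <- function(conn, timeout = 60) {
  n <- NULL
  while (is.null(n)) {
    # postgresWaitForNotify() returns NULL when the timeout (in seconds) expires
    n <- RPostgres::postgresWaitForNotify(conn, timeout)
  }
  n  # a list with channel, pid and payload elements
}
```

Called as wait_for_message(db_listen)$payload on the listening connection, this should return the text sent by the other process, 'psst' in this example.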
We can use LISTEN/NOTIFY to inform all workers that there is something to be done, but how do we decide which worker actually does the work? This is done using SKIP LOCKED.
We notify all workers that the input 99 is ready for processing. After receiving this, they all do the following:
rs <- dbSendQuery(con, "
  SELECT in_val
  FROM sqroot_vignette_example
  WHERE in_val = $1
  FOR UPDATE
  SKIP LOCKED
", params = list(99))
One lucky worker will get a row back and, thanks to FOR UPDATE, the row is now locked. Any other worker will skip over the locked row (SKIP LOCKED) and find something else to do. If there are no other jobs available, then nothing will be returned.
Using SKIP LOCKED is discussed in more detail in this article.
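The locking behaviour described above can be seen with two connections competing for the same row. Row locks last until the end of the transaction, so each claim is wrapped in dbBegin(). This is a minimal sketch, assuming a default database is reachable; the scratch table skip_locked_demo and the connection names are hypothetical:

```r
library(DBI)

# Only run when a default PostgreSQL database is reachable
if (RPostgres::postgresHasDefault()) {
  con_a <- dbConnect(RPostgres::Postgres())
  con_b <- dbConnect(RPostgres::Postgres())

  # Scratch table (hypothetical name) holding a single job
  dbExecute(con_a, "DROP TABLE IF EXISTS skip_locked_demo")
  dbExecute(con_a, "CREATE TABLE skip_locked_demo (id INTEGER PRIMARY KEY)")
  dbExecute(con_a, "INSERT INTO skip_locked_demo VALUES (1)")

  claim <- "SELECT id FROM skip_locked_demo WHERE id = 1 FOR UPDATE SKIP LOCKED"

  dbBegin(con_a)
  claimed_a <- dbGetQuery(con_a, claim)  # gets the row and locks it
  dbBegin(con_b)
  claimed_b <- dbGetQuery(con_b, claim)  # row is locked, so it is skipped
  dbRollback(con_b)
  dbRollback(con_a)                      # releases the lock

  print(nrow(claimed_a))  # 1
  print(nrow(claimed_b))  # 0

  dbExecute(con_a, "DROP TABLE skip_locked_demo")
  dbDisconnect(con_a)
  dbDisconnect(con_b)
}
```

Without SKIP LOCKED, the second connection would instead block until the first transaction finished.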
Now we can put the concepts together. The following implements our worker as a function (again, this would be running as a script on several servers):
worker <- function() {
  library(DBI)
  db_worker <- dbConnect(RPostgres::Postgres())
  on.exit(dbDisconnect(db_worker))
  dbExecute(db_worker, "LISTEN sqroot")
  dbExecute(db_worker, "LISTEN sqroot_shutdown")
  while (TRUE) {
    # Wait for new work to do
    n <- RPostgres::postgresWaitForNotify(db_worker, 60)
    if (is.null(n)) {
      # If nothing to do, send notifications of any not up-to-date work
      dbExecute(db_worker, "
        SELECT pg_notify('sqroot', in_val::TEXT)
        FROM sqroot_vignette_example
        WHERE out_val IS NULL
      ")
      next
    }
    # If we've been told to shut down, stop right away
    if (n$channel == 'sqroot_shutdown') {
      writeLines("Shutting down.")
      break
    }
    in_val <- strtoi(n$payload)
    tryCatch(
      {
        dbWithTransaction(db_worker, {
          # Try and fetch the item we got notified about
          rs <- dbSendQuery(db_worker, "
            SELECT in_val
            FROM sqroot_vignette_example
            WHERE out_val IS NULL -- if another worker already finished, don't reprocess
            AND in_val = $1
            FOR UPDATE SKIP LOCKED -- Don't let another worker work on this at the same time
          ", params = list(in_val))
          # NA if the row was skipped or has already been processed
          in_val <- dbFetch(rs)[1, 1]
          dbClearResult(rs)
          if (!is.na(in_val)) {
            # Actually do the sqrt
            writeLines(paste("Sqroot-ing", in_val, "... "))
            Sys.sleep(in_val * 0.1)
            out_val <- sqrt(in_val)
            # Update the database with the result
            dbExecute(db_worker, "
              UPDATE sqroot_vignette_example
              SET out_val = $1
              WHERE in_val = $2
            ", params = list(out_val, in_val))
          } else {
            writeLines("Not sqroot-ing as another worker got there first")
          }
        })
      },
      error = function(e) {
        # Something went wrong. Report error and carry on
        writeLines(paste("Failed to sqroot:", e$message))
      })
  }
}
The worker connects to the database, starts listening and loops indefinitely.
Let’s use callr again to start 2 workers:
stdout_1 <- tempfile()
stdout_2 <- tempfile()
# Keep a separate handle for each worker so that neither is garbage-collected
rp_1 <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1)
rp_2 <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2)
Sys.sleep(1) # Give workers a chance to set themselves up
Now our client can add some values to our table and notify the workers that there’s something to do:
con <- dbConnect(RPostgres::Postgres())
add_sqroot <- function(in_val) {
  dbExecute(con, "
    INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
  ", params = list(in_val))
  dbExecute(con, "
    SELECT pg_notify('sqroot', $1)
  ", params = list(in_val))
}
add_sqroot(7)
add_sqroot(8)
add_sqroot(9)
After a short wait, the workers should have populated the answers for us:
Sys.sleep(3)
rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val")
dbFetch(rs)
dbClearResult(rs) ; rs <- NULL
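A fixed Sys.sleep() only works when we can guess how long the jobs will take. As an alternative, a client could poll until no unprocessed rows remain. This is a minimal sketch; the helper name wait_for_results and its timeout and interval arguments are hypothetical:

```r
library(DBI)

# Hypothetical helper: poll until every row has a result,
# or give up after `timeout` seconds
wait_for_results <- function(conn, timeout = 30, interval = 0.5) {
  deadline <- Sys.time() + timeout
  repeat {
    # Count the rows that no worker has filled in yet
    pending <- dbGetQuery(conn, "
      SELECT count(*)::int AS n
      FROM sqroot_vignette_example
      WHERE out_val IS NULL
    ")$n
    if (pending == 0) return(TRUE)
    if (Sys.time() >= deadline) return(FALSE)
    Sys.sleep(interval)
  }
}
```

With the client connection from above, wait_for_results(con) would return TRUE once the workers have filled in every out_val, or FALSE if the timeout passes first.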
Finally, we can use NOTIFY to stop all the workers:
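The workers listen on the sqroot_shutdown channel, so a notification on that channel is enough; no payload is needed. A minimal sketch, where the helper name stop_workers is hypothetical:

```r
library(DBI)

# Hypothetical helper: broadcast on the channel the workers treat as a stop signal
stop_workers <- function(conn) {
  dbExecute(conn, "NOTIFY sqroot_shutdown")
  invisible(NULL)
}
```

Calling stop_workers(con) with the client connection should make each worker print “Shutting down.” and leave its loop the next time it checks for notifications.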
We can then see what messages the workers printed as they ran:
# We can't control which worker will process the first entry,
# so we sort the results so the vignette output stays the same.
outputs <- sort(c(
  paste(readLines(con = stdout_1), collapse = "\n"),
  paste(readLines(con = stdout_2), collapse = "\n")))
writeLines(outputs[[1]])
writeLines(outputs[[2]])
Notice that the work has been shared between the 2 workers. If these 2 weren’t enough, we could happily add more to keep the system going.
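Adding workers is a matter of starting more background processes. A minimal sketch, assuming callr is installed; the helper name start_workers is hypothetical, and it returns every process handle so that none is garbage-collected while still running:

```r
# Hypothetical helper: start n background workers and keep every handle
start_workers <- function(worker_fn, n) {
  lapply(seq_len(n), function(i) {
    # Each worker writes its output to its own temporary log file
    log_file <- tempfile(pattern = paste0("worker_", i, "_"))
    callr::r_bg(worker_fn, stdout = log_file, stderr = log_file)
  })
}
```

For example, workers <- start_workers(worker, 4) would start four copies of the worker function defined earlier, all competing for the same queue.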