I recently discovered that relational databases can be used as a job queue. For some reason, I thought it was not possible, and that tools like RabbitMQ, Apache Kafka, Redis, or Amazon SQS were the only options for dispatching jobs from a set of producers to a set of consumers.
Don’t get me wrong, these specialized tools are probably better than a relational database in some cases, and they do solve real problems. But what if you already have a relational database in your IT system and you need to implement a job queue? Wouldn’t it be nice to reuse an existing component and save on additional operational costs? I think that if scalability is not a concern, and all you need is to process long-running jobs, a relational database is probably enough.
In this article, I will show you the PostgreSQL features that can be used to implement a job queuing system.
PostgreSQL’s features for job queues
PostgreSQL provides two features that can be used for queuing jobs.
First, a SELECT statement with the FOR UPDATE SKIP LOCKED option acquires a lock on every row it returns, and rows that are already locked by another transaction are skipped instead of being selected again. This ensures that each job is selected and processed only once.
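For example, a worker could claim one pending job with a query of the following shape (this assumes the jobs table that we create later in this article):
BEGIN;
-- Claim one pending job; rows locked by other workers are skipped
SELECT id FROM jobs WHERE status = 0 LIMIT 1 FOR UPDATE SKIP LOCKED;
-- ... process the job, then mark it as done; COMMIT releases the lock
COMMIT;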
Second, the LISTEN and NOTIFY statements can be used to push events to the workers that process jobs. This way, you do not have to implement a polling mechanism: you can directly notify your workers when a new job is added to the queue.
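As a quick illustration (the channel name new_job is just an example), the workers and the producer could use these statements as follows:
-- In each worker's session
LISTEN new_job;
-- In the producer's session, after inserting a job
NOTIFY new_job;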
An example implementation
In the rest of this article, I will show you a prototype of a job queue implemented with PostgreSQL. Producers and consumers for the queue will be implemented in Python and will be responsible for adding jobs and processing them, respectively.
Installation
In this section, I assume that you are starting from a base Ubuntu 24.04 instance.
First, we set up Python so that it can connect to a PostgreSQL database.
sudo apt install python3 python3-venv
# Create the virtual environment
python3 -m venv venv
source venv/bin/activate
# Install the PIP package for PostgreSQL
pip install "psycopg[binary,pool]"
Then, we install PostgreSQL locally.
sudo apt install postgresql
# No need to configure remote connections if we use the server locally.
# However, we do need to change the password for the default user.
sudo -u postgres psql template1
# Type this in the postgres prompt
ALTER USER postgres with encrypted password 'your_password';
# Then type \q to quit
\q
# Restart the PostgreSQL server to take the new configuration into account
sudo systemctl restart postgresql.service
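Optionally, with the virtual environment activated, we can verify that the new password works from Python. This quick check uses the same credentials as the scripts later in this article:
python3 -c "import psycopg; psycopg.connect(dbname='template1', user='postgres', password='your_password', host='localhost', port='5432').close(); print('Connection OK')"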
Database creation
We will use the PostgreSQL CLI tool to create the table that will contain the jobs that our system needs to process.
Each job will have:
- A unique ID, which is an auto-incrementing primary key
- A status, which is 0 while the job is pending and 1 once it is done
- A result, which is 0 by default and contains the job’s result after processing
To add the table, we first connect in CLI to the database.
# We connect to the default database template1
sudo -u postgres psql template1
With the SQL prompt, we create the table by typing the following.
CREATE TABLE jobs (
    id SERIAL PRIMARY KEY,
    status SMALLINT,
    result INTEGER
);
CREATE INDEX idx_status ON jobs (status);
Job producer
A Python script will be responsible for adding N jobs to the queue. It will add one job every 100 milliseconds until all jobs are added.
#! /usr/bin/env python
import argparse
import psycopg
from time import sleep


class JobProducer:
    def __init__(self):
        self.connection = None

    def __del__(self):
        if self.connection is not None:
            self.connection.close()

    def connect(self):
        """ Connect to the postgres DB """
        try:
            self.connection = psycopg.connect(
                dbname="template1",
                user="postgres",
                password="your_password",
                host="localhost",
                port="5432"
            )
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during connection: {error}")

    def disconnect(self):
        """ Disconnect from the postgres DB """
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def insert_new_job(self):
        """ Insert a job in the database """
        if self.connection is None:
            raise RuntimeError("You need to be connected before you can insert jobs.")
        try:
            cursor = self.connection.cursor()
            insert_query = "INSERT INTO jobs (status, result) VALUES (%s, %s)"
            data_to_insert = ('0', '0')
            cursor.execute(insert_query, data_to_insert)
            self.connection.commit()
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during insertion: {error}")
        finally:
            cursor.close()


def run_job_producer(number_jobs, per_job_delay):
    job_producer = JobProducer()
    job_producer.connect()
    for i in range(number_jobs):
        job_producer.insert_new_job()
        print("Added a job!")
        sleep(per_job_delay)
    job_producer.disconnect()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("N", help="Number of jobs to add to the queue", type=int)
    args = parser.parse_args()
    if args.N <= 0:
        raise ValueError("The number of jobs to add must be positive.")
    run_job_producer(args.N, per_job_delay=0.1)
Job consumers
Another Python script will be responsible for processing jobs. The task of a job consumer is to hold a job for 100 milliseconds, add a given number to its result, and mark it as processed. After each attempt, the worker waits for 10 milliseconds before polling the queue again. We will launch multiple instances of the job consumer with different input parameters so that we can easily infer which consumer processed which job.
#! /usr/bin/env python
import argparse
import psycopg
from time import sleep


class JobConsumer:
    def __init__(self):
        self.connection = None

    def __del__(self):
        if self.connection is not None:
            self.connection.close()

    def connect(self):
        """ Connect to the postgres DB """
        try:
            self.connection = psycopg.connect(
                dbname="template1",
                user="postgres",
                password="your_password",
                host="localhost",
                port="5432"
            )
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during connection: {error}")

    def disconnect(self):
        """ Disconnect from the postgres DB """
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def try_to_process_a_job(self, per_job_delay, add_to_results):
        """ Try to process a job """
        if self.connection is None:
            raise RuntimeError("You need to be connected before you can process jobs.")
        try:
            # Start a transaction
            cursor = self.connection.cursor()
            # Select one pending job and lock it; jobs already locked
            # by other workers are skipped
            select_query = """
                SELECT id, result
                FROM jobs
                WHERE status = 0
                LIMIT 1
                FOR UPDATE SKIP LOCKED
            """
            cursor.execute(select_query)
            job = cursor.fetchone()
            # If a job is available
            if job:
                job_id, job_result = job[0], job[1]
                print("Selected Job: ", job_id)
                # Compute the value of the new result
                new_result = job_result + add_to_results
                # Sleep to simulate the time it takes to process a job
                sleep(per_job_delay)
                # Set the job as processed and update its result
                update_query = """
                    UPDATE jobs
                    SET status = 1, result = %s
                    WHERE id = %s
                """
                cursor.execute(update_query, (new_result, job_id))
                # Commit the transaction, which also releases the row lock
                self.connection.commit()
                print("\tJob processed successfully")
            else:
                # No job is available
                self.connection.rollback()
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during processing: {error}")
        finally:
            cursor.close()


def run_job_consumer(
        number_job_processing_attempts,
        polling_delay,
        per_job_delay,
        add_to_results):
    job_consumer = JobConsumer()
    job_consumer.connect()
    for i in range(number_job_processing_attempts):
        job_consumer.try_to_process_a_job(per_job_delay, add_to_results)
        sleep(polling_delay)
    job_consumer.disconnect()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("N", help="Number to add to job results", type=int)
    args = parser.parse_args()
    number_job_processing_attempts = 5000
    polling_delay = 0.01
    per_job_delay = 0.1
    run_job_consumer(
        number_job_processing_attempts,
        polling_delay,
        per_job_delay,
        args.N
    )
Validation
When the whole job queue has been processed, we will try to detect jobs that were potentially processed twice by looking at their results. If any result is a sum of the input parameters rather than one of them, it means that the job was processed at least twice.
For example, if two job consumers were launched with input parameters 1 and 10, then a job whose result is 11 has been processed by both workers, which is an error.
To validate, we count the jobs whose result is not one of the input parameters given to the consumers. If everything went well, this count should be 0. For example, this is the expected result in the SQL prompt.
template1=# SELECT COUNT(*) FROM jobs WHERE result != 1 AND result != 10;
count
-------
0
(1 row)
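As an additional sanity check, we can also count the jobs that are still pending. If every job has been processed, this query should return 0 as well:
SELECT COUNT(*) FROM jobs WHERE status = 0;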
Launching the scripts
We first launch all consumers, then we launch the producer. We wait until all jobs have been processed. Finally, we check the results with the SQL query shown above. In one terminal, we would do it this way.
python consumer.py 1 &
python consumer.py 10 &
python producer.py 1000
This image shows what it looks like when I run it in VS Code:
Conclusion
In this article, I showed that PostgreSQL is a valid alternative to dedicated task queues. In particular, this solution is compelling when the system does not need to handle a large load and PostgreSQL is already available. Although the solution I described works, it is based on polling. In future work, we may investigate two PostgreSQL features that could improve it. First, PostgreSQL’s LISTEN/NOTIFY mechanism can be used to implement a publish/subscribe pattern, which may be preferable to polling. Second, PostgreSQL can also be used for distributed locking through advisory locks.
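As a rough sketch of that first idea (not part of the prototype above), a notification-based consumer loop with psycopg could look like the following. The channel name new_job is an assumption, and the producer would have to issue NOTIFY new_job after each insert:
import psycopg

# Autocommit so that LISTEN takes effect immediately
connection = psycopg.connect(
    dbname="template1",
    user="postgres",
    password="your_password",
    host="localhost",
    port="5432",
    autocommit=True,
)
connection.execute("LISTEN new_job")
# Blocks until a notification arrives, then wakes up the worker
for notification in connection.notifies():
    print(f"New job signalled on channel {notification.channel}")
    # Here we would run the same SELECT ... FOR UPDATE SKIP LOCKED logic as before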