Mathieu GAILLARD

Using an SQL database as a job queue

I recently discovered that relational databases can be used as a job queue. For some reason, I thought it was not possible, and tools like RabbitMQ, Apache Kafka, Redis, Amazon SQS, were the only options for dispatching jobs from a set of producers to a set of consumers.

Don’t get me wrong, these special tools are probably better than a relational database in some cases, and they do solve real problems. But what if you already have a relational database in your IT system, and you need to implement a job queue? Wouldn’t it be nice to reuse an existing component and save on additional operational costs? I think that if scalability is not a concern, and all you need is to process long running jobs, a relational database is probably just enough.

In this article, I will show you one feature from SQL that can be use to implement a job queuing system.

PostgreSQL’s features for job queues

PostgreSQL provides two features that can be used for queuing jobs.

First, a SELECT statement with the FOR UPDATE SKIP LOCKED option will ensure that a lock is acquired for all rows that are selected, and rows that were already locked cannot be selected again. This ensures that a job is selected and processed only once.

Second, the LISTEN and NOTIFY statements can be used to push events to the workers that will process jobs. This way, you do not have to implement a polling mechanism, you can directly notify your workers when a new job is added to the queue.

An example implementation

In the rest of this article, I will show you a prototype of a job queue implemented with PostgreSQL. Producers and consumers for the queue will be implemented in Python, and will be responsible respectively for adding jobs, or processing them.

Installation

In this section, I assume that you are starting from a base Ubuntu 24.04 instance.

First, we setup Python to be able to connect to a PostgreSQL database.

sudo apt install python3 python3-venv
# Create the virtual environment
python3 -m venv venv
source venv/bin/activate
# Install the PIP package for PostgreSQL
pip install "psycopg[binary,pool]"

Then, we install PostgreSQL locally.

sudo apt install postgresql
# No need to configure remote connections if we use the server locally.
# However, we do need to change the password for the default user.
sudo -u postgres psql template1
# Type this in the postgres prompt
ALTER USER postgres with encrypted password 'your_password';
# Then type \q to quit
\q
# Reboot the PostgreSQL server to take in account the new configuration
sudo systemctl restart postgresql.service

Database creation

We will use the PostgreSQL CLI tool to create the table that will contain the jobs that our system needs to process.

Each job will have:

  • A unique ID, that is auto incrementing primary key
  • A status, which is either 0 when the job is pending, or 1 when it’s done
  • A result, which is 0 by default, and contains the result after processing

To add the table, we first connect in CLI to the database.

# We connect to the default database template1
sudo -u postgres psql template1

With the SQL prompt, we create the table by typing the following.

CREATE TABLE jobs (
    id SERIAL PRIMARY KEY,
    status SMALLINT,
    result INTEGER
);

CREATE INDEX idx_status ON jobs (status);

Job producer

A python script will be responsible for adding N jobs to the queue. It will add one job every 50 milliseconds until all jobs are added.

#! /usr/bin/env python
import argparse
import psycopg
from time import sleep

class JobProducer:
    def __init__(self):
        self.connection = None

    def __del__(self):
        if self.connection is not None:
            self.connection.close()

    def connect(self):
        """ Connect to the postgres DB """
        try:
            self.connection = psycopg.connect(
                dbname="template1",
                user="postgres",
                password="your_password",
                host="localhost",
                port="5432"
            )
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during connection: {error}")
    
    def disconnect(self):
        """ Disconnect from the postgres DB """
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def insert_new_job(self):
        """ Insert a job in the database """
        if self.connection is None:
            raise RuntimeError("You need to be connected before you can insert jobs.")

        try:
            cursor = self.connection.cursor()
            insert_query = "INSERT INTO jobs (status, result) VALUES (%s, %s)"
            data_to_insert = ('0', '0')
            cursor.execute(insert_query, data_to_insert)
            self.connection.commit()
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during insertion: {error}")
        finally:
            cursor.close()


def run_job_producer(number_jobs, per_job_delay):
    job_producer = JobProducer()
    job_producer.connect()
    for i in range(number_jobs):
        job_producer.insert_new_job()
        print("Added a job!")
        sleep(per_job_delay)
    job_producer.disconnect()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("N", help="Number of jobs to add to the queue", type=int)
    args = parser.parse_args()

    if args.N <= 0:
        raise ValueError("The number of jobs to add must be positive.")

    run_job_producer(args.N, per_job_delay=0.1)

Job consumers

Another python script will be responsible for processing jobs. The task of a job processor is to hold the job for 100 milliseconds, add a given number to its result, and set it as processed. If no job is available in the queue, the worker waits for 10 milliseconds. We will launch multiple instances of the job processor with different input parameters so that we can easily infer which processor processed which job.

#! /usr/bin/env python
import argparse
import psycopg
from time import sleep

class JobConsumer:
    def __init__(self):
        self.connection = None

    def __del__(self):
        if self.connection is not None:
            self.connection.close()

    def connect(self):
        """ Connect to the postgres DB """
        try:
            self.connection = psycopg.connect(
                dbname="template1",
                user="postgres",
                password="your_password",
                host="localhost",
                port="5432"
            )
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during connection: {error}")
    
    def disconnect(self):
        """ Disconnect from the postgres DB """
        if self.connection is not None:
            self.connection.close()
            self.connection = None

    def try_to_process_a_job(self, per_job_delay, add_to_results):
        """ Try to process a job """
        if self.connection is None:
            raise RuntimeError("You need to be connected before you can process jobs.")

        try:
            # Start a transaction
            cursor = self.connection.cursor()
            
            # Select one pending job in the queue
            select_query = """
                SELECT id, result
                FROM jobs 
                WHERE status = 0
                FOR UPDATE SKIP LOCKED
                LIMIT 1
            """
            cursor.execute(select_query)
            job = cursor.fetchone()
            
            # If a job is available
            if job:
                job_id, job_result = job[0], job[1]
                print("Selected Job: ", job_id)

                # Compute the value of the new result
                new_result = job_result + add_to_results

                # Sleep to simulate the time it takes to process a job
                sleep(per_job_delay)
                
                # Define the update query to set the job as processed and update its result
                update_query = """
                    UPDATE jobs
                    SET status = 1, result = %s
                    WHERE id = %s
                """
                cursor.execute(update_query, (new_result, job_id))
                
                # Commit the transaction
                self.connection.commit()
                print("\tJob processed successfully")
            else:
                # No job is available
                self.connection.rollback()
        except (Exception, psycopg.DatabaseError) as error:
            print(f"Error during processing: {error}")
        finally:
            cursor.close()


def run_job_consumer(
        number_job_processing_attempts,
        polling_delay,
        per_job_delay,
        add_to_results):
    job_consumer = JobConsumer()
    job_consumer.connect()
    for i in range(number_job_processing_attempts):
        job_consumer.try_to_process_a_job(per_job_delay, add_to_results)
        sleep(polling_delay)
    job_consumer.disconnect()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("N", help="Number to add to job results", type=int)
    args = parser.parse_args()

    number_job_processing_attempts = 5000
    polling_delay = 0.01
    per_job_delay = 0.1

    run_job_consumer(
        number_job_processing_attempts,
        polling_delay,
        per_job_delay,
        args.N
    )

Validation

When the whole job queue is processed, we will try to detect jobs that were potentially processed twice by looking at their results. If any result contains a number that is a composition of any input parameters, it means that the job was processed at least twice.

For example, if two job processors were launched with input parameters 1 and 10, then if any job has an assigned result of 11, it means that this job has been processed by both workers 1 and 2, which is an error.

To validate, we count any job whose result is not one of the input parameters to the consumers. If everything went well, this value should be 0. For example, this is the expect result in the SQL prompt.

template1=# SELECT COUNT(*) FROM jobs WHERE result != 1 AND result != 10;

count 
-------
     0
(1 row)

Launching the scripts

We first launch all consumers, then we launch the producer. We wait until all jobs have been processed. Finally, we check results with the SQL command that counts results. In one terminal, we would do it this way.

python consumer.py 1 &
python consumer.py 10 &
python producer.py 1000

This image shows how it looks like when I run it in VS Code: A terminal showing the producers and consumers

Conclusion

In this article, I showed that PostgreSQL is a valid alternative to task queues. In particular, this solution is compelling when the system does not need to handle a large load, and PostgreSQL is already available. Although, the solution that I described works, it is based on polling. In a future work, we may investigate the use of two features from PostgreSQL that may be useful to implement. First, PostgreSQL’s LISTEN/NOTIFY mechanism can be used to implement a publish/subscribe pattern, which may be preferable compared to polling. Second, PostgreSQL can be used for distributed locking using advisory locks.

References