How job queue should work

A job queue is simple FIFO storage: jobs come out in the order they went in. It has 2 basic operations: push (add a job to the queue) and pop (take the next job out).

A job queue is a popular way for multiple clients (separate apps or separate app components) to pass tasks (jobs) to each other.

There can be any number of clients adding jobs and any number of clients taking them.

MySQL's InnoDB engine is a great platform for implementing simple job queues.

Job queue table structure

Let's build the simplest job queue one can imagine. We'll need a table with just two columns:

We'll need an id column to identify each job and maintain insertion order (via AUTO_INCREMENT). The data column will store the job payload. As this can be anything, we'll serialize it and store it as a string, so the TEXT type should be fine (use LONGTEXT if your job data objects are large).

Let's create the table:

CREATE TABLE queue (
  id SERIAL PRIMARY KEY, 
  data TEXT
);
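For reference, SERIAL in MySQL is just shorthand, so the table above is equivalent to spelling the column type out:

```sql
-- SERIAL expands to BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE
CREATE TABLE queue (
  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE PRIMARY KEY,
  data TEXT
);
```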

Writing to Job queue

In order to write data to our queue table, we'll just use a standard INSERT query. We'll use the json_encode() function to serialize data into a string, as JSON is the most universal format:

<?php
$pdo = new PDO('mysql:host=localhost;dbname=db', 'usr', 'pwd'); # connect to MySQL
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);  # fail loudly on SQL errors

$job = json_encode(['do' => 'something', 'with' => 'data']); # convert job data to JSON

$statement = $pdo->prepare('INSERT INTO queue SET data = :data'); # insert job into queue
$statement->execute([':data' => $job]);                           # using a prepared statement

After a couple of inserts our table will look like this:

> SELECT * FROM queue;
+----+---------------------------------------------+
| id | data                                        |
+----+---------------------------------------------+
|  1 | {"do":"something","with":"data"}            |
|  2 | {"do":"something_else","with":"other_data"} |
+----+---------------------------------------------+

As InnoDB handles large numbers of simultaneous writes well, we don't need to do anything extra to let multiple clients write to our queue.
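Writes also batch well: when a producer has several jobs at hand, a single multi-row INSERT is cheaper than many separate ones. Here's a minimal sketch; the queue_push_many() helper name is our own invention, not part of any library:

```php
<?php
# Hypothetical helper: push several jobs with one multi-row INSERT.
function queue_push_many(PDO $pdo, array $jobs): void {
    if (!$jobs) {
        return; # nothing to insert
    }

    # build "(?), (?), ..." — one positional placeholder per job
    $placeholders = implode(', ', array_fill(0, count($jobs), '(?)'));

    $statement = $pdo->prepare("INSERT INTO queue (data) VALUES $placeholders");
    $statement->execute(array_map('json_encode', $jobs)); # serialize each job to JSON
}

# usage:
# queue_push_many($pdo, [['do' => 'a'], ['do' => 'b']]);
```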

Reading from job queue

Now the fun part. Why fun? Because every read from the queue is really 2 operations:

  1. Select the next job in insertion order.
  2. Delete it from the table so it's not handed out again.

Easy if we have only one client:

<?php
$statement = $pdo->prepare('SELECT * FROM queue ORDER BY id LIMIT 1'); # select one job in the order of insertion
$statement->execute(); # execute select statement

$job = $statement->fetch(); # fetch selected data into a php variable

$delete = $pdo->prepare('DELETE FROM queue WHERE id = :id'); # remove fetched job from queue
$delete->execute([':id' => $job['id']]);                     # using a prepared statement

$job_data = json_decode($job['data'], true); # unserialize data from JSON
print_r($job_data); # do something with job data

If we have any jobs in the queue we get:

Array
(
    [do] => something
    [with] => data
)

But if we have multiple simultaneous clients, we can end up giving the same job to several of them if they request it in parallel.

In this case we'll want to use InnoDB's locking mechanism to make sure each client gets a unique job. Here's what we do:

  1. Begin transaction.
  2. Select the next job and lock it instantly with FOR UPDATE SKIP LOCKED row-level locking (SKIP LOCKED is available from MySQL 8.0).
  3. Remove returned job from queue table.
  4. Commit transaction.

In this scenario we lock the selected job, so other clients can't access it while we're working with it.

Let's code that now:

<?php
$pdo->beginTransaction(); # begin transaction

$statement = $pdo->prepare('SELECT * FROM queue ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED'); # get a job and lock it
$statement->execute(); # execute select statement

$job = $statement->fetch(); # fetch selected data into a php variable

$delete = $pdo->prepare('DELETE FROM queue WHERE id = :id'); # remove fetched job from queue
$delete->execute([':id' => $job['id']]);

$pdo->commit(); # commit transaction: the lock is released, the job is gone

$job_data = json_decode($job['data'], true); # unserialize data from JSON
print_r($job_data); # do something with job data
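Putting it all together, a long-running worker could poll the queue in a loop. This is just a sketch: the connection credentials are placeholders, the 1-second sleep when the queue is empty is an arbitrary choice, and the empty-queue check covers the case the snippets above leave out:

```php
<?php
# Sketch of a long-running queue worker (credentials are placeholders).
$pdo = new PDO('mysql:host=localhost;dbname=db', 'usr', 'pwd');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

while (true) {
    $pdo->beginTransaction();

    # take the next unlocked job, skipping jobs locked by other workers
    $statement = $pdo->query('SELECT * FROM queue ORDER BY id LIMIT 1 FOR UPDATE SKIP LOCKED');
    $job = $statement->fetch(PDO::FETCH_ASSOC);

    if ($job === false) {  # queue is empty: end the transaction and wait a bit
        $pdo->commit();
        sleep(1);
        continue;
    }

    $delete = $pdo->prepare('DELETE FROM queue WHERE id = :id');
    $delete->execute([':id' => $job['id']]);
    $pdo->commit(); # job is removed and the lock released atomically

    $job_data = json_decode($job['data'], true); # unserialize and process the job
    print_r($job_data);
}
```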

Now this implementation can be used with multiple (and pretty large numbers of) clients. This approach is used in the queue component of the Mysqly data framework.

written in January 2022