Atomic reads and updates with concurrent worker threads in MySQL

Suppose I have multiple workers that can read from and write to a MySQL table (e.g. jobs) at the same time. The task of each worker is:

  1. Find the oldest QUEUED job
  2. Set its status to RUNNING
  3. Return the corresponding ID.

Please note that when a worker runs step #1, there may not be any eligible (i.e. QUEUED) jobs.

I have the following pseudocode so far. I believe that if step #1 does not return a job, I need to cancel (ROLLBACK) the transaction. How would I do this in the code below?

START TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;


  • P粉536909186 (2023-12-22 09:14:00)

    It's not entirely clear what you want, but let's say your task is: find the next QUEUED job, set its status to RUNNING, and return the corresponding ID.

    In a single-threaded environment, you can simply use your code: read the selected ID into a variable in the application code and pass it to the WHERE clause of the UPDATE query. You don't even need a transaction, since there is only one write statement. You can emulate this in a SQL script.
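    As a rough sketch of what "a variable in the application code" could look like, assuming `conn` is a Python DB-API (PEP 249) connection from a MySQL driver that uses `%s` placeholders and returns rows as tuples (for example PyMySQL or mysql-connector-python with default settings). The function name `start_next_job` is hypothetical, and the table and column names are taken from the question:

```python
def start_next_job(conn):
    """Single-worker version: pick the oldest QUEUED job and mark it RUNNING.

    Returns the job id, or None when nothing is queued.
    """
    cur = conn.cursor()
    # Step 1: read the candidate id into an application variable.
    cur.execute(
        "SELECT id FROM jobs WHERE status = 'QUEUED' "
        "ORDER BY created_at, id LIMIT 1"
    )
    row = cur.fetchone()
    if row is None:
        return None  # no eligible job, so there is nothing to update
    job_id = row[0]
    # Step 2: pass that id to the UPDATE through a bound parameter.
    cur.execute("UPDATE jobs SET status = 'RUNNING' WHERE id = %s", (job_id,))
    conn.commit()
    return job_id
```

    This is only safe with a single worker: two workers could both read the same id between the SELECT and the UPDATE.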

    Assume this is the current state of the table:

    | id  | created_at          | status    |
    | --- | ------------------- | --------- |
    | 1   | 2020-06-15 12:00:00 | COMPLETED |
    | 2   | 2020-06-15 12:00:10 | QUEUED    |
    | 3   | 2020-06-15 12:00:20 | QUEUED    |
    | 4   | 2020-06-15 12:00:30 | QUEUED    |

    You want to start the next queued job (id=2).

    SET @id_for_update = (
      SELECT id
      FROM jobs
      WHERE status = 'QUEUED'
      ORDER BY id
      LIMIT 1
    );
    
    UPDATE jobs
    SET status = 'RUNNING'
    WHERE id = @id_for_update;
    
    SELECT @id_for_update;

    The last SELECT returns:

    | @id_for_update |
    | -------------- |
    | 2              |

    The table is now in the following state:

    | id  | created_at          | status    |
    | --- | ------------------- | --------- |
    | 1   | 2020-06-15 12:00:00 | COMPLETED |
    | 2   | 2020-06-15 12:00:10 | RUNNING   |
    | 3   | 2020-06-15 12:00:20 | QUEUED    |
    | 4   | 2020-06-15 12:00:30 | QUEUED    |

    If multiple processes start jobs concurrently, you need to lock the row with FOR UPDATE. However, you can avoid that by using LAST_INSERT_ID() inside a single UPDATE statement.

    Starting from the state above, where job 2 is already running:

    UPDATE jobs
    SET status = 'RUNNING',
        id = LAST_INSERT_ID(id)
    WHERE status = 'QUEUED'
    ORDER BY id
    LIMIT 1;
    
    SELECT LAST_INSERT_ID(), ROW_COUNT();

    You will get:

    | LAST_INSERT_ID() | ROW_COUNT() |
    | ---------------- | ----------- |
    | 3                | 1           |

    The new state is:

    | id  | created_at          | status    |
    | --- | ------------------- | --------- |
    | 1   | 2020-06-15 12:00:00 | COMPLETED |
    | 2   | 2020-06-15 12:00:10 | RUNNING   |
    | 3   | 2020-06-15 12:00:20 | RUNNING   |
    | 4   | 2020-06-15 12:00:30 | QUEUED    |

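    If the UPDATE statement did not affect any rows (no queued rows), ROW_COUNT() will be 0. In that case LAST_INSERT_ID() is not updated and still returns whatever value it previously had on this connection, so check ROW_COUNT() before trusting the ID.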
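    A worker could wrap the LAST_INSERT_ID() approach roughly as follows. This is a sketch, assuming `conn` is a Python DB-API (PEP 249) connection from a MySQL driver such as PyMySQL or mysql-connector-python, with autocommit off and the driver's default behaviour of reporting changed rows in `cursor.rowcount` (used here in place of ROW_COUNT()). The function name `claim_next_job` is hypothetical:

```python
def claim_next_job(conn):
    """Atomically mark the next QUEUED job as RUNNING and return its id.

    Returns None when no QUEUED job exists.
    """
    cur = conn.cursor()
    # One statement finds and claims the row; LAST_INSERT_ID(id) remembers
    # the claimed id for this connection only.
    cur.execute(
        "UPDATE jobs "
        "SET status = 'RUNNING', id = LAST_INSERT_ID(id) "
        "WHERE status = 'QUEUED' "
        "ORDER BY id LIMIT 1"
    )
    if cur.rowcount == 0:
        # Nothing was claimed, so LAST_INSERT_ID() would be stale here.
        conn.rollback()
        return None
    # Must run on the same connection as the UPDATE above.
    cur.execute("SELECT LAST_INSERT_ID()")
    job_id = cur.fetchone()[0]
    conn.commit()
    return job_id
```

    Because the affected-row count is checked before LAST_INSERT_ID() is read, a stale ID from an earlier call is never returned.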

    There may be risks with this approach that I'm not aware of, and it's not how I would do it anyway. I would rather store more information in the jobs table. A simple example:

    CREATE TABLE jobs (
      id INT AUTO_INCREMENT PRIMARY KEY,
      created_at TIMESTAMP NOT NULL DEFAULT NOW(),
      updated_at TIMESTAMP NOT NULL DEFAULT NOW() ON UPDATE NOW(),
      status VARCHAR(50) NOT NULL DEFAULT 'QUEUED',
      process_id VARCHAR(50) NULL DEFAULT NULL
    );

    and

    UPDATE jobs
    SET status = 'RUNNING',
        process_id = 'some_unique_pid'    
    WHERE status = 'QUEUED'
    ORDER BY id
    LIMIT 1;

    The running job now belongs to a specific process, and you can select it with:

    SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

    You might even want to record more, e.g. queued_at, started_at, finished_at.
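    The process_id variant could be driven from a worker like this. It is a sketch under the same kind of assumptions: `conn` is a Python DB-API (PEP 249) connection from a MySQL driver using `%s` placeholders, autocommit is off, and the jobs table has the process_id column shown above. The function name `claim_job_with_token` is hypothetical, and generating a fresh UUID per claim (rather than reusing one ID per worker) is my own choice so that the follow-up SELECT matches exactly one row:

```python
import uuid


def claim_job_with_token(conn):
    """Claim the next QUEUED job by stamping it with a unique token.

    Returns (job_id, token), or None when no QUEUED job exists.
    """
    token = uuid.uuid4().hex  # 32 hex characters, fits process_id VARCHAR(50)
    cur = conn.cursor()
    cur.execute(
        "UPDATE jobs "
        "SET status = 'RUNNING', process_id = %s "
        "WHERE status = 'QUEUED' "
        "ORDER BY id LIMIT 1",
        (token,),
    )
    if cur.rowcount == 0:
        conn.rollback()  # nothing was claimed
        return None
    # The token is unique to this claim, so this finds the row just updated.
    cur.execute("SELECT id FROM jobs WHERE process_id = %s", (token,))
    job_id = cur.fetchone()[0]
    conn.commit()
    return job_id, token
```

    Keeping the token around lets the worker later mark that same row as finished or failed without relying on connection state.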

  • P粉635509719 (2023-12-22 00:24:50)

    This week I'm implementing something very similar to your case. Multiple workers, each grabbing the "next row" in a set of rows to work on.

    The pseudocode looks like this:

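    BEGIN;

    -- Reset first: if no row matches, SELECT ... INTO leaves @id unchanged.
    SET @id = NULL;

    -- ORDER BY makes "next" deterministic; the row stays locked until COMMIT.
    SELECT id INTO @id FROM mytable WHERE status = 'QUEUED' ORDER BY id LIMIT 1 FOR UPDATE;

    -- If @id is still NULL, this UPDATE matches no rows.
    UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

    COMMIT;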

    Using FOR UPDATE is important to avoid race conditions (i.e. multiple workers trying to get the same row).
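    The same locking pattern, driven from application code, also shows where a ROLLBACK fits when no job is queued. This is a sketch, assuming `conn` is a Python DB-API (PEP 249) connection from a MySQL driver using `%s` placeholders, with autocommit off so that the first statement implicitly opens a transaction, and an InnoDB table. The function name `claim_with_row_lock` is hypothetical, and it uses the jobs table from the question rather than mytable:

```python
def claim_with_row_lock(conn):
    """Lock the oldest QUEUED job, mark it RUNNING, and return its id.

    Returns None (after rolling back) when no QUEUED job exists.
    """
    cur = conn.cursor()
    try:
        # FOR UPDATE locks the selected row until commit or rollback, so a
        # second worker running this query waits instead of reading it too.
        cur.execute(
            "SELECT id FROM jobs WHERE status = 'QUEUED' "
            "ORDER BY created_at, id LIMIT 1 FOR UPDATE"
        )
        row = cur.fetchone()
        if row is None:
            conn.rollback()  # no eligible job: end the transaction unchanged
            return None
        job_id = row[0]
        cur.execute(
            "UPDATE jobs SET status = 'RUNNING' WHERE id = %s", (job_id,)
        )
        conn.commit()  # releases the row lock
        return job_id
    except Exception:
        conn.rollback()  # never leave a failed transaction holding locks
        raise
```

    The decision to roll back is made in the application, based on whether the SELECT returned a row, instead of inside the SQL itself.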

    See https://dev.mysql.com/doc/refman/8.0/en/select-into.html for information about SELECT ... INTO.
