首页  >  问答  >  正文

MySQL 中并发工作线程的原子读取和更新

假设我有多个工作人员可以同时读取和写入 MySQL 表(例如 jobs)。每个工人的任务是:

  1. 查找最旧的 已排队 作业
  2. 将其状态设置为 RUNNING
  3. 返回对应的ID。

请注意,当工作人员运行步骤 #1 时,可能有任何符合条件的作业(即 QUEUED)。

到目前为止我有以下伪代码。我相信如果步骤 #1 没有返回作业,我需要取消 (ROLLBACK) 事务。我将如何在下面的代码中做到这一点?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234P粉239164234303 天前440

全部回复(2)我来回复

  • P粉536909186

    P粉5369091862023-12-22 09:14:00

    目前还不太清楚你想要什么。但假设您的任务是:查找下一个QUEUED 作业。将其状态设置为RUNNING并选择相应的ID。

    在单线程环境中,您可以只使用您的代码。将选定的 ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要事务,因为只有一个写入语句。您可以在 SQLscript 中进行模仿。

    假设这是您当前的状态:

    | id  | created_at          | status   |
    | --- | ------------------- | -------- |
    | 1   | 2020-06-15 12:00:00 | COMLETED |
    | 2   | 2020-06-15 12:00:10 | QUEUED   |
    | 3   | 2020-06-15 12:00:20 | QUEUED   |
    | 4   | 2020-06-15 12:00:30 | QUEUED   |

    您想要启动下一个排队作业(id=2)。

    SET @id_for_update = (
      SELECT id
      FROM jobs
      WHERE status = 'QUEUED'
      ORDER BY id
      LIMIT 1
    );
    
    UPDATE jobs
    SET status="RUNNING"
    WHERE id = @id_for_update;
    
    SELECT @id_for_update;

    你会得到

    @id_for_update
    2

    从上次选择开始。该表将具有以下状态:

    | id  | created_at          | status   |
    | --- | ------------------- | -------- |
    | 1   | 2020-06-15 12:00:00 | COMLETED |
    | 2   | 2020-06-15 12:00:10 | RUNNING  |
    | 3   | 2020-06-15 12:00:20 | QUEUED   |
    | 4   | 2020-06-15 12:00:30 | QUEUED   |

    在 DB Fiddle 上查看

    如果您有多个启动作业的进程,则需要使用 FOR UPDATE 锁定该行。但可以使用LAST_INSERT_ID()来避免这种情况:

    从上面的状态开始,作业 2 已经在运行:

    UPDATE jobs
    SET status = 'RUNNING',
        id = LAST_INSERT_ID(id)
    WHERE status = 'QUEUED'
    ORDER BY id
    LIMIT 1;
    
    SELECT LAST_INSERT_ID();

    您将得到:

    | LAST_INSERT_ID() | ROW_COUNT() |
    | ---------------- | ----------- |
    | 3                | 1           |

    新的状态是:

    | id  | created_at          | status   |
    | --- | ------------------- | -------- |
    | 1   | 2020-06-15 12:00:00 | COMLETED |
    | 2   | 2020-06-15 12:00:10 | RUNNING  |
    | 3   | 2020-06-15 12:00:20 | RUNNING  |
    | 4   | 2020-06-15 12:00:30 | QUEUED   |

    在 DB Fiddle 上查看

    如果 UPDATE 语句没有影响任何行(没有排队的行),ROW_COUNT() 将为 0

    可能存在一些我不知道的风险 - 但这也不是我真正的处理方式。我宁愿在 jobs 表中存储更多信息。简单的例子:

    CREATE TABLE jobs (
      id INT auto_increment primary key,
      created_at timestamp not null default now(),
      updated_at timestamp not null default now() on update now(),
      status varchar(50) not null default 'QUEUED',
      process_id varchar(50) null default null
    );

    UPDATE jobs
    SET status = 'RUNNING',
        process_id = 'some_unique_pid'    
    WHERE status = 'QUEUED'
    ORDER BY id
    LIMIT 1;

    现在正在运行的作业属于特定进程,您只需使用

    即可选择它
    SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

    您甚至可能想了解更多信息 - 例如。 queued_atstarted_atfinished_at

    回复
    0
  • P粉635509719

    P粉6355097192023-12-22 00:24:50

    本周我正在实施与您的案例非常相似的事情。多个工作人员,每个工作人员抓取一组行中的“下一行”进行工作。

    伪代码是这样的:

    BEGIN;
    
    SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;
    
    UPDATE mytable SET status = 'RUNNING' WHERE id = @id;
    
    COMMIT;

    使用 FOR UPDATE 对于避免竞争条件(即多个工作人员试图获取同一行)非常重要。

    参见https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 了解有关 SELECT ... INTO 的信息。

    回复
    0
  • 取消回复