假设我有多个工作人员可以同时读取和写入 MySQL 表(例如 jobs
)。每个工人的任务是:
已排队
作业 RUNNING
请注意,当工作人员运行步骤 #1 时,可能不有任何符合条件的作业(即 QUEUED
)。
到目前为止我有以下伪代码。我相信如果步骤 #1 没有返回作业,我需要取消 (ROLLBACK
) 事务。我将如何在下面的代码中做到这一点?
BEGIN TRANSACTION; # Update the status of jobs fetched by this query: SELECT id from jobs WHERE status = "QUEUED" ORDER BY created_at ASC LIMIT 1; # Do the actual update, otherwise abort (i.e. ROLLBACK?) UPDATE jobs SET status="RUNNING" # HERE: Not sure how to make this conditional on the previous ID # WHERE id = <ID from the previous SELECT> COMMIT;
P粉5369091862023-12-22 09:14:00
目前还不太清楚你想要什么。但假设您的任务是:查找下一个QUEUED
作业。将其状态设置为RUNNING
并选择相应的ID。
在单线程环境中,您可以只使用您的代码。将选定的 ID 提取到应用程序代码中的变量中,并将其传递给 WHERE 子句中的 UPDATE 查询。您甚至不需要事务,因为只有一个写入语句。您可以在 SQLscript 中进行模仿。
假设这是您当前的状态:
| id | created_at | status | | --- | ------------------- | -------- | | 1 | 2020-06-15 12:00:00 | COMLETED | | 2 | 2020-06-15 12:00:10 | QUEUED | | 3 | 2020-06-15 12:00:20 | QUEUED | | 4 | 2020-06-15 12:00:30 | QUEUED |
您想要启动下一个排队作业(id=2)。
SET @id_for_update = ( SELECT id FROM jobs WHERE status = 'QUEUED' ORDER BY id LIMIT 1 ); UPDATE jobs SET status="RUNNING" WHERE id = @id_for_update; SELECT @id_for_update;
你会得到
@id_for_update 2
从上次选择开始。该表将具有以下状态:
| id | created_at | status | | --- | ------------------- | -------- | | 1 | 2020-06-15 12:00:00 | COMLETED | | 2 | 2020-06-15 12:00:10 | RUNNING | | 3 | 2020-06-15 12:00:20 | QUEUED | | 4 | 2020-06-15 12:00:30 | QUEUED |
如果您有多个启动作业的进程,则需要使用 FOR UPDATE
锁定该行。但可以使用LAST_INSERT_ID()
来避免这种情况:
从上面的状态开始,作业 2 已经在运行:
UPDATE jobs SET status = 'RUNNING', id = LAST_INSERT_ID(id) WHERE status = 'QUEUED' ORDER BY id LIMIT 1; SELECT LAST_INSERT_ID();
您将得到:
| LAST_INSERT_ID() | ROW_COUNT() | | ---------------- | ----------- | | 3 | 1 |
新的状态是:
| id | created_at | status | | --- | ------------------- | -------- | | 1 | 2020-06-15 12:00:00 | COMLETED | | 2 | 2020-06-15 12:00:10 | RUNNING | | 3 | 2020-06-15 12:00:20 | RUNNING | | 4 | 2020-06-15 12:00:30 | QUEUED |
如果 UPDATE 语句没有影响任何行(没有排队的行),ROW_COUNT()
将为 0
。
可能存在一些我不知道的风险 - 但这也不是我真正的处理方式。我宁愿在 jobs
表中存储更多信息。简单的例子:
CREATE TABLE jobs ( id INT auto_increment primary key, created_at timestamp not null default now(), updated_at timestamp not null default now() on update now(), status varchar(50) not null default 'QUEUED', process_id varchar(50) null default null );
和
UPDATE jobs SET status = 'RUNNING', process_id = 'some_unique_pid' WHERE status = 'QUEUED' ORDER BY id LIMIT 1;
现在正在运行的作业属于特定进程,您只需使用
即可选择它SELECT * FROM jobs WHERE process_id = 'some_unique_pid';
您甚至可能想了解更多信息 - 例如。 queued_at
、started_at
、finished_at
。
P粉6355097192023-12-22 00:24:50
本周我正在实施与您的案例非常相似的事情。多个工作人员,每个工作人员抓取一组行中的“下一行”进行工作。
伪代码是这样的:
BEGIN; SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE; UPDATE mytable SET status = 'RUNNING' WHERE id = @id; COMMIT;
使用 FOR UPDATE
对于避免竞争条件(即多个工作人员试图获取同一行)非常重要。
参见https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 了解有关 SELECT ... INTO
的信息。