假設我有多位工作人員可以同時讀取和寫入 MySQL 表(例如 jobs
)。每個工人的任務是:
已排隊
作業 RUNNING
請注意,當工作人員執行步驟 #1 時,可能不有任何符合條件的作業(即 QUEUED
)。
到目前為止我有以下偽代碼。我相信如果步驟 #1 沒有返回作業,我需要取消 (ROLLBACK
) 事務。我將如何在下面的程式碼中做到這一點?
BEGIN TRANSACTION; # Update the status of jobs fetched by this query: SELECT id from jobs WHERE status = "QUEUED" ORDER BY created_at ASC LIMIT 1; # Do the actual update, otherwise abort (i.e. ROLLBACK?) UPDATE jobs SET status="RUNNING" # HERE: Not sure how to make this conditional on the previous ID # WHERE id = <ID from the previous SELECT> COMMIT;
P粉5369091862023-12-22 09:14:00
目前還不太清楚你想要什麼。但假設您的任務是:尋找下一個QUEUED
作業。將其狀態設為RUNNING
並選擇對應的ID。
在單執行緒環境中,您可以只使用您的程式碼。將選定的 ID 提取到應用程式程式碼中的變數中,並將其傳遞給 WHERE 子句中的 UPDATE 查詢。您甚至不需要事務,因為只有一個寫入語句。您可以在 SQLscript 中進行模仿。
假設這是您目前的狀態:
| id | created_at | status | | --- | ------------------- | -------- | | 1 | 2020-06-15 12:00:00 | COMLETED | | 2 | 2020-06-15 12:00:10 | QUEUED | | 3 | 2020-06-15 12:00:20 | QUEUED | | 4 | 2020-06-15 12:00:30 | QUEUED |
您想要啟動下一個排隊作業(id=2)。
SET @id_for_update = ( SELECT id FROM jobs WHERE status = 'QUEUED' ORDER BY id LIMIT 1 ); UPDATE jobs SET status="RUNNING" WHERE id = @id_for_update; SELECT @id_for_update;
你會得到
@id_for_update 2
從上次選擇開始。該表將具有以下狀態:
| id | created_at | status | | --- | ------------------- | -------- | | 1 | 2020-06-15 12:00:00 | COMLETED | | 2 | 2020-06-15 12:00:10 | RUNNING | | 3 | 2020-06-15 12:00:20 | QUEUED | | 4 | 2020-06-15 12:00:30 | QUEUED |
如果您有多個啟動作業的進程,則需要使用 FOR UPDATE
鎖定該行。但可以使用LAST_INSERT_ID()
來避免這種情況:
從上面的狀態開始,作業 2 已經在運作:
UPDATE jobs SET status = 'RUNNING', id = LAST_INSERT_ID(id) WHERE status = 'QUEUED' ORDER BY id LIMIT 1; SELECT LAST_INSERT_ID();
您將得到:
| LAST_INSERT_ID() | ROW_COUNT() | | ---------------- | ----------- | | 3 | 1 |
新的狀態是:
| id | created_at | status | | --- | ------------------- | -------- | | 1 | 2020-06-15 12:00:00 | COMLETED | | 2 | 2020-06-15 12:00:10 | RUNNING | | 3 | 2020-06-15 12:00:20 | RUNNING | | 4 | 2020-06-15 12:00:30 | QUEUED |
如果 UPDATE 語句沒有影響任何行(沒有排隊的行),ROW_COUNT()
將為 0
。
可能存在一些我不知道的風險 - 但這也不是我真正的處理方式。我寧願在 jobs
表中存儲更多資訊。簡單的例子:
CREATE TABLE jobs ( id INT auto_increment primary key, created_at timestamp not null default now(), updated_at timestamp not null default now() on update now(), status varchar(50) not null default 'QUEUED', process_id varchar(50) null default null );
和
UPDATE jobs SET status = 'RUNNING', process_id = 'some_unique_pid' WHERE status = 'QUEUED' ORDER BY id LIMIT 1;
現在正在執行的作業屬於特定進程,您只需使用
即可選擇它SELECT * FROM jobs WHERE process_id = 'some_unique_pid';
您甚至可能想了解更多 - 例如。 queued_at
、started_at
、finished_at
。
P粉6355097192023-12-22 00:24:50
本週我正在實施與您的案例非常相似的事情。多個工作人員,每個工作人員抓取一組行中的「下一行」進行工作。
偽程式碼是這樣的:
BEGIN; SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE; UPDATE mytable SET status = 'RUNNING' WHERE id = @id; COMMIT;
使用 FOR UPDATE
對於避免競爭條件(即多個工作人員試圖取得同一行)非常重要。
請參閱https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 以了解有關 SELECT ... INTO
的資訊。