搜尋

首頁  >  問答  >  主體

MySQL 中並發工作執行緒的原子讀取與更新

假設我有多位工作人員可以同時讀取和寫入 MySQL 表(例如 jobs)。每個工人的任務是:

  1. 尋找最舊的 已排隊 作業
  2. 將其狀態設為 RUNNING
  3. 傳回對應的ID。

請注意,當工作人員執行步驟 #1 時,可能有任何符合條件的作業(即 QUEUED)。

到目前為止我有以下偽代碼。我相信如果步驟 #1 沒有返回作業,我需要取消 (ROLLBACK) 事務。我將如何在下面的程式碼中做到這一點?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

P粉239164234P粉239164234396 天前540

全部回覆(2)我來回復

  • P粉536909186

    P粉5369091862023-12-22 09:14:00

    目前還不太清楚你想要什麼。但假設您的任務是:尋找下一個QUEUED 作業。將其狀態設為RUNNING並選擇對應的ID。

    在單執行緒環境中,您可以只使用您的程式碼。將選定的 ID 提取到應用程式程式碼中的變數中,並將其傳遞給 WHERE 子句中的 UPDATE 查詢。您甚至不需要事務,因為只有一個寫入語句。您可以在 SQLscript 中進行模仿。

    假設這是您目前的狀態:

    | id  | created_at          | status   |
    | --- | ------------------- | -------- |
    | 1   | 2020-06-15 12:00:00 | COMLETED |
    | 2   | 2020-06-15 12:00:10 | QUEUED   |
    | 3   | 2020-06-15 12:00:20 | QUEUED   |
    | 4   | 2020-06-15 12:00:30 | QUEUED   |

    您想要啟動下一個排隊作業(id=2)。

    SET @id_for_update = (
      SELECT id
      FROM jobs
      WHERE status = 'QUEUED'
      ORDER BY id
      LIMIT 1
    );
    
    UPDATE jobs
    SET status="RUNNING"
    WHERE id = @id_for_update;
    
    SELECT @id_for_update;

    你會得到

    @id_for_update
    2

    從上次選擇開始。該表將具有以下狀態:

    | id  | created_at          | status   |
    | --- | ------------------- | -------- |
    | 1   | 2020-06-15 12:00:00 | COMLETED |
    | 2   | 2020-06-15 12:00:10 | RUNNING  |
    | 3   | 2020-06-15 12:00:20 | QUEUED   |
    | 4   | 2020-06-15 12:00:30 | QUEUED   |

    在 DB Fiddle 上查看

    如果您有多個啟動作業的進程,則需要使用 FOR UPDATE 鎖定該行。但可以使用LAST_INSERT_ID()來避免這種情況:

    從上面的狀態開始,作業 2 已經在運作:

    UPDATE jobs
    SET status = 'RUNNING',
        id = LAST_INSERT_ID(id)
    WHERE status = 'QUEUED'
    ORDER BY id
    LIMIT 1;
    
    SELECT LAST_INSERT_ID();

    您將得到:

    | LAST_INSERT_ID() | ROW_COUNT() |
    | ---------------- | ----------- |
    | 3                | 1           |

    新的狀態是:

    | id  | created_at          | status   |
    | --- | ------------------- | -------- |
    | 1   | 2020-06-15 12:00:00 | COMLETED |
    | 2   | 2020-06-15 12:00:10 | RUNNING  |
    | 3   | 2020-06-15 12:00:20 | RUNNING  |
    | 4   | 2020-06-15 12:00:30 | QUEUED   |

    在 DB Fiddle 上查看

    如果 UPDATE 語句沒有影響任何行(沒有排隊的行),ROW_COUNT() 將為 0

    可能存在一些我不知道的風險 - 但這也不是我真正的處理方式。我寧願在 jobs 表中存儲更多資訊。簡單的例子:

    CREATE TABLE jobs (
      id INT auto_increment primary key,
      created_at timestamp not null default now(),
      updated_at timestamp not null default now() on update now(),
      status varchar(50) not null default 'QUEUED',
      process_id varchar(50) null default null
    );

    UPDATE jobs
    SET status = 'RUNNING',
        process_id = 'some_unique_pid'    
    WHERE status = 'QUEUED'
    ORDER BY id
    LIMIT 1;

    現在正在執行的作業屬於特定進程,您只需使用

    即可選擇它
    SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

    您甚至可能想了解更多 - 例如。 queued_atstarted_atfinished_at

    回覆
    0
  • P粉635509719

    P粉6355097192023-12-22 00:24:50

    本週我正在實施與您的案例非常相似的事情。多個工作人員,每個工作人員抓取一組行中的「下一行」進行工作。

    偽程式碼是這樣的:

    BEGIN;
    
    SELECT ID INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;
    
    UPDATE mytable SET status = 'RUNNING' WHERE id = @id;
    
    COMMIT;

    使用 FOR UPDATE 對於避免競爭條件(即多個工作人員試圖取得同一行)非常重要。

    請參閱https://dev.mysql.com/ doc/refman/8.0/en/select-into.html 以了解有關 SELECT ... INTO 的資訊。

    回覆
    0
  • 取消回覆