このタスクのロジックは、コンテンツをユーザーにプッシュすることです。ユーザーは、キューのコンテンツに基づいて取得および走査され、リクエスト バックエンド HTTP インターフェイスを通じて送信される必要があります。たとえば、ユーザーが 10,000 人いる場合、ユーザー数が多かったり、インターフェースの処理速度がそれほど速くない場合、実行時間は確実に 60 秒を超えるため、タスクは再びキューに追加されます。
状況はさらに悪く、前のタスクが 60 秒以内に実行されなかった場合、それらのタスクは再びキューに追加され、同じタスクが 1 回だけでなく複数回実行されることになります。Laravel のソースコードから犯人を見つけてみましょう。 ソースコードファイル:vendor/laravel/framework/src/Illuminate/Queue/RedisQueue.php
/** * The expiration time of a job. * * @var int|null */ protected $expire = 60;
この$expireメンバー変数は固定値であり、Laravelはキューは関係ないと考えていますどのように 60 数秒以内に完了する必要があります。キューの取得方法:
public function pop($queue = null) { $original = $queue ?: $this->default; $queue = $this->getQueue($queue); $this->migrateExpiredJobs($queue.':delayed', $queue); if (! is_null($this->expire)) { $this->migrateExpiredJobs($queue.':reserved', $queue); } list($job, $reserved) = $this->getConnection()->eval( LuaScripts::pop(), 2, $queue, $queue.':reserved', $this->getTime() + $this->expire ); if ($reserved) { return new RedisJob($this->container, $this, $job, $reserved, $original); } }
キューを取得するにはいくつかの手順があります。キューの実行が失敗するか、実行がタイムアウトになるため、キューは別のコレクションに入れられ、再試行のために保存されます。処理は次のとおりです。
1 .遅延収集から実行に失敗したキューを現在実行中のキューに再プッシュします。
2. 実行タイムアウトのため、予約されたコレクションから現在実行されているキューにキューを再プッシュします。
3. 次に、キューからタスクを取得して実行を開始し、予約された順序付きコレクションにキューを入れます。
ここでは eval コマンドを使用してこの処理を実行し、いくつかの lua スクリプトを使用します。
実行するキューからタスクを取得します:
local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) end return {job, reserved}Laravel が Redis によって実行されるキューを取得すると、順序付けされたキューにコピーも配置されることがわかります。 collection. を使用し、有効期限のタイムスタンプをスコアとして使用します。
タスクが完了した場合にのみ、タスクは順序付けセットから削除されます。この順序付けされたコレクションからキューを削除するコードは省略されていますが、実行時間が 60 秒を超えるキューを Laravel がどのように処理するかを見てみましょう。 これは、この Lua スクリプトによって実行される操作です:
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) end end return true
ここで、zrangebyscore は、スコアが無限小から現在のタイムスタンプまでの範囲にある要素、つまり 60 秒前にコレクションに追加されたタスクを検索します。これらの要素は、zremrangebyrank を介してセットから削除され、キューにプッシュされます。
これを見たらハッと気づくはずです。
キューが 60 秒以内に実行されなかった場合、プロセスはキューをフェッチするときに予約セットからタスクを再度キューにプッシュします。
関連する推奨事項: