簡介 (Introduction)
在建構 Web 應用程式時,您可能有一些任務(例如解析和儲存上傳的 CSV 檔案)在典型的 Web 請求期間執行時間過長。幸運的是,Laravel 允許您輕鬆建立可在背景處理的佇列 Job。透過將耗時的任務移至佇列,您的應用程式可以以極快的速度回應 Web 請求,並為您的客戶提供更好的使用者體驗。
Laravel 佇列為各種不同的佇列後端(例如 Amazon SQS、Redis 甚至關聯式資料庫)提供了統一的佇列 API。
Laravel 的佇列設定選項儲存在應用程式的 config/queue.php 設定檔中。在此檔案中,您將找到框架中包含的每個佇列驅動程式的連線設定,包括資料庫、Amazon SQS、Redis 和 Beanstalkd 驅動程式,以及一個同步驅動程式,該驅動程式將立即執行 Job(用於開發或測試期間)。還包含一個 null 佇列驅動程式,它會丟棄佇列中的 Job。
[!NOTE] Laravel Horizon 是一個用於 Redis 驅動佇列的精美儀表板和設定系統。查看完整的 Horizon 文件 以獲取更多資訊。
連線 vs. 佇列 (Connections vs. Queues)
在開始使用 Laravel 佇列之前,了解「連線」和「佇列」之間的區別非常重要。在您的 config/queue.php 設定檔中,有一個 connections 設定陣列。此選項定義了與後端佇列服務(例如 Amazon SQS、Beanstalk 或 Redis)的連線。但是,任何給定的佇列連線都可能有多個「佇列」,這些佇列可以被認為是不同的堆疊或成堆的佇列 Job。
請注意,queue 設定檔中的每個連線設定範例都包含一個 queue 屬性。這是 Job 發送到給定連線時將被分派到的預設佇列。換句話說,如果您分派一個 Job 而沒有明確定義它應該被分派到哪個佇列,該 Job 將被放置在連線設定的 queue 屬性中定義的佇列上:
use App\Jobs\ProcessPodcast;
// 此 Job 被發送到預設連線的預設佇列...
ProcessPodcast::dispatch();
// 此 Job 被發送到預設連線的 "emails" 佇列...
ProcessPodcast::dispatch()->onQueue('emails');
有些應用程式可能不需要將 Job 推送到多個佇列,而是更喜歡有一個簡單的佇列。但是,將 Job 推送到多個佇列對於希望優先處理或區分 Job 處理方式的應用程式特別有用,因為 Laravel 佇列 Worker 允許您指定它應該按優先順序處理哪些佇列。例如,如果您將 Job 推送到 high 佇列,您可以執行一個 Worker 來給予它們更高的處理優先權:
php artisan queue:work --queue=high,default
驅動程式說明與先決條件 (Driver Notes and Prerequisites)
資料庫 (Database)
為了使用 database 佇列驅動程式,您將需要一個資料庫資料表來保存 Job。通常,這包含在 Laravel 的預設 0001_01_01_000002_create_jobs_table.php 資料庫遷移中;但是,如果您的應用程式不包含此遷移,您可以使用 make:queue-table Artisan 指令來建立它:
php artisan make:queue-table
php artisan migrate
Redis
為了使用 redis 佇列驅動程式,您應該在 config/database.php 設定檔中設定 Redis 資料庫連線。
[!WARNING] >
redis佇列驅動程式不支援serializer和compressionRedis 選項。
Redis Cluster
如果您的 Redis 佇列連線使用 Redis Cluster,您的佇列名稱必須包含 key hash tag。這是為了確保給定佇列的所有 Redis 鍵都放置在同一個雜湊槽中:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', '{default}'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => null,
'after_commit' => false,
],
阻塞 (Blocking)
使用 Redis 佇列時,您可以使用 block_for 設定選項來指定驅動程式在迭代 Worker 迴圈並重新輪詢 Redis 資料庫之前,應等待 Job 變為可用的時間長度。
根據您的佇列負載調整此值比持續輪詢 Redis 資料庫以獲取新 Job 更有效率。例如,您可以將值設定為 5,以指示驅動程式在等待 Job 變為可用時應阻塞五秒鐘:
'redis' => [
'driver' => 'redis',
'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
'queue' => env('REDIS_QUEUE', 'default'),
'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
'block_for' => 5,
'after_commit' => false,
],
[!WARNING] 將
block_for設定為0將導致佇列 Worker 無限期阻塞,直到有 Job 可用。這也將阻止像SIGTERM這樣的訊號被處理,直到下一個 Job 被處理完畢。
其他驅動程式先決條件 (Other Driver Prerequisites)
列出的佇列驅動程式需要以下依賴項。這些依賴項可以透過 Composer 套件管理器安裝:
- Amazon SQS:
aws/aws-sdk-php ~3.0 - Beanstalkd:
pda/pheanstalk ~5.0 - Redis:
predis/predis ~2.0or phpredis PHP extension - MongoDB:
mongodb/laravel-mongodb
建立 Job (Creating Jobs)
產生 Job 類別 (Generating Job Classes)
預設情況下,應用程式的所有可佇列 Job 都儲存在 app/Jobs 目錄中。如果 app/Jobs 目錄不存在,當您執行 make:job Artisan 指令時,它將會被建立:
php artisan make:job ProcessPodcast
產生的類別將實作 Illuminate\Contracts\Queue\ShouldQueue 介面,向 Laravel 指示該 Job 應該被推送到佇列以非同步執行。
[!NOTE] Job stub 可以使用 stub publishing 進行自訂。
類別結構 (Class Structure)
Job 類別非常簡單,通常只包含一個 handle 方法,當 Job 由佇列處理時會呼叫該方法。為了開始,讓我們看一個範例 Job 類別。在這個範例中,我們假設我們管理一個 Podcast 發佈服務,並且需要在發佈之前處理上傳的 Podcast 檔案:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
}
在此範例中,請注意我們能夠將 Eloquent 模型直接傳遞給佇列 Job 的建構函式。由於 Job 使用了 Queueable trait,Eloquent 模型及其載入的關聯將在 Job 處理時優雅地序列化和反序列化。
如果您的佇列 Job 在其建構函式中接受 Eloquent 模型,則只有模型的識別碼會被序列化到佇列中。當 Job 實際被處理時,佇列系統將自動從資料庫中重新檢索完整的模型實例及其載入的關聯。這種模型序列化方法允許將更小的 Job 負載發送到您的佇列驅動程式。
handle 方法依賴注入 (handle Method Dependency Injection)
當 Job 由佇列處理時,會呼叫 handle 方法。請注意,我們可以在 Job 的 handle 方法上對依賴項進行型別提示。Laravel 服務容器會自動注入這些依賴項。
如果您想完全控制容器如何將依賴項注入到 handle 方法中,您可以使用容器的 bindMethod 方法。bindMethod 方法接受一個回呼,該回呼接收 Job 和容器。在回呼中,您可以自由地以任何您希望的方式呼叫 handle 方法。通常,您應該從 App\Providers\AppServiceProvider 服務提供者的 boot 方法中呼叫此方法:
use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;
$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
return $job->handle($app->make(AudioProcessor::class));
});
[!WARNING] 二進位資料(例如原始圖片內容)在傳遞給佇列 Job 之前應透過
base64_encode函式傳遞。否則,Job 在放入佇列時可能無法正確序列化為 JSON。
佇列關聯 (Queued Relationships)
由於所有載入的 Eloquent 模型關聯在 Job 被佇列時也會被序列化,因此序列化的 Job 字串有時會變得相當大。此外,當 Job 被反序列化並且模型關聯從資料庫重新檢索時,它們將被完整地檢索。在 Job 佇列過程中模型被序列化之前應用的任何先前關聯約束,在 Job 被反序列化時將不會被應用。因此,如果您希望使用給定關聯的子集,您應該在佇列 Job 中重新約束該關聯。
或者,為了防止關聯被序列化,您可以在設定屬性值時在模型上呼叫 withoutRelations 方法。此方法將傳回一個沒有載入關聯的模型實例:
/**
* Create a new job instance.
*/
public function __construct(
Podcast $podcast,
) {
$this->podcast = $podcast->withoutRelations();
}
如果您使用 PHP 建構函式屬性提升並且想指示 Eloquent 模型不應序列化其關聯,您可以使用 WithoutRelations 屬性:
use Illuminate\Queue\Attributes\WithoutRelations;
/**
* Create a new job instance.
*/
public function __construct(
#[WithoutRelations]
public Podcast $podcast,
) {}
為了方便起見,如果您希望序列化所有沒有關聯的模型,您可以將 WithoutRelations 屬性應用於整個類別,而不是將屬性應用於每個模型:
<?php
namespace App\Jobs;
use App\Models\DistributionPlatform;
use App\Models\Podcast;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Attributes\WithoutRelations;
#[WithoutRelations]
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
public DistributionPlatform $platform,
) {}
}
如果 Job 接收 Eloquent 模型的集合或陣列而不是單個模型,則當 Job 被反序列化和執行時,該集合中的模型將不會恢復其關聯。這是為了防止在處理大量模型的 Job 上過度使用資源。
唯一 Job (Unique Jobs)
[!WARNING] 唯一 Job 需要支援鎖定的快取驅動程式。目前,
memcached、redis、dynamodb、database、file和array快取驅動程式支援原子鎖定。
[!WARNING] 唯一 Job 約束不適用於批次中的 Job。
有時,您可能希望確保在任何時間點佇列中只有一個特定 Job 的實例。您可以透過在 Job 類別上實作 ShouldBeUnique 介面來做到這一點。此介面不需要您在類別上定義任何額外的方法:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
}
在上面的範例中,UpdateSearchIndex Job 是唯一的。因此,如果另一個 Job 實例已經在佇列中且尚未完成處理,則不會分派該 Job。
在某些情況下,您可能希望定義一個特定的「鍵」使 Job 唯一,或者您可能希望指定一個逾時時間,超過該時間 Job 將不再保持唯一。為了實現這一點,您可以在 Job 類別上定義 uniqueId 和 uniqueFor 屬性或方法:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
/**
* The product instance.
*
* @var \App\Models\Product
*/
public $product;
/**
* The number of seconds after which the job's unique lock will be released.
*
* @var int
*/
public $uniqueFor = 3600;
/**
* Get the unique ID for the job.
*/
public function uniqueId(): string
{
return $this->product->id;
}
}
在上面的範例中,UpdateSearchIndex Job 透過產品 ID 是唯一的。因此,任何具有相同產品 ID 的新 Job 分派都將被忽略,直到現有 Job 完成處理。此外,如果現有 Job 在一小時內未被處理,唯一鎖定將被釋放,另一個具有相同唯一鍵的 Job 可以被分派到佇列。
[!WARNING] 如果您的應用程式從多個 Web 伺服器或容器分派 Job,您應該確保所有伺服器都與同一個中央快取伺服器通訊,以便 Laravel 可以準確地確定 Job 是否唯一。
保持 Job 唯一直到處理開始 (Keeping Jobs Unique Until Processing Begins)
預設情況下,唯一 Job 在 Job 完成處理或所有重試嘗試失敗後「解鎖」。但是,在某些情況下,您可能希望在 Job 被處理之前立即解鎖。為了實現這一點,您的 Job 應該實作 ShouldBeUniqueUntilProcessing 契約而不是 ShouldBeUnique 契約:
<?php
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
// ...
}
唯一 Job 鎖定 (Unique Job Locks)
在幕後,當分派 ShouldBeUnique Job 時,Laravel 嘗試使用 uniqueId 鍵獲取鎖定。如果鎖定已被持有,則不會分派 Job。當 Job 完成處理或所有重試嘗試失敗時,此鎖定將被釋放。預設情況下,Laravel 將使用預設快取驅動程式來獲取此鎖定。但是,如果您希望使用另一個驅動程式來獲取鎖定,您可以定義一個 uniqueVia 方法,該方法傳回應使用的快取驅動程式:
use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;
class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
// ...
/**
* Get the cache driver for the unique job lock.
*/
public function uniqueVia(): Repository
{
return Cache::driver('redis');
}
}
[!NOTE] 如果您只需要限制 Job 的並發處理,請改用 WithoutOverlapping Job Middleware。
加密 Job (Encrypted Jobs)
Laravel 允許您透過加密確保 Job 資料的隱私和完整性。為了開始,只需將 ShouldBeEncrypted 介面新增至 Job 類別。一旦此介面被新增至類別,Laravel 將在將 Job 推送到佇列之前自動加密您的 Job:
<?php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
// ...
}
Job Middleware (Job Middleware)
Job Middleware 允許您將自訂邏輯包裝在佇列 Job 的執行周圍,從而減少 Job 本身中的樣板程式碼。例如,考慮以下 handle 方法,該方法利用 Laravel 的 Redis 速率限制功能,允許每五秒鐘只處理一個 Job:
use Illuminate\Support\Facades\Redis;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('Lock obtained...');
// Handle job...
}, function () {
// Could not obtain lock...
return $this->release(5);
});
}
雖然此程式碼有效,但 handle 方法的實作變得雜亂,因為它充斥著 Redis 速率限制邏輯。此外,對於我們想要進行速率限制的任何其他 Job,都必須重複此速率限制邏輯。我們可以定義一個處理速率限制的 Job Middleware,而不是在 handle 方法中進行速率限制:
<?php
namespace App\Jobs\Middleware;
use Closure;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* Process the queued job.
*
* @param \Closure(object): void $next
*/
public function handle(object $job, Closure $next): void
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// Lock obtained...
$next($job);
}, function () use ($job) {
// Could not obtain lock...
$job->release(5);
});
}
}
如您所見,就像路由 Middleware 一樣,Job Middleware 接收正在處理的 Job 和一個應該被呼叫以繼續處理 Job 的回呼。
您可以使用 make:job-middleware Artisan 指令產生新的 Job Middleware 類別。建立 Job Middleware 後,可以透過從 Job 的 middleware 方法傳回它們來將它們附加到 Job。make:job Artisan 指令產生的 Job 上不存在此方法,因此您需要手動將其新增至您的 Job 類別:
use App\Jobs\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited];
}
速率限制 (Rate Limiting)
雖然我們剛剛示範了如何編寫自己的速率限制 Job Middleware,但 Laravel 實際上包含一個速率限制 Middleware,您可以使用它來對 Job 進行速率限制。就像路由速率限制器一樣,Job 速率限制器是使用 RateLimiter facade 的 for 方法定義的。
例如,您可能希望允許使用者每小時備份一次資料,而對高級客戶不施加此類限制。為了實現這一點,您可以在 AppServiceProvider 的 boot 方法中定義一個 RateLimiter:
use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
RateLimiter::for('backups', function (object $job) {
return $job->user->vipCustomer()
? Limit::none()
: Limit::perHour(1)->by($job->user->id);
});
}
在上面的範例中,我們定義了一個每小時的速率限制;但是,您可以輕鬆地使用 perMinute 方法定義基於分鐘的速率限制。此外,您可以將任何您希望的值傳遞給速率限制的 by 方法;但是,此值最常用於按客戶區分速率限制:
return Limit::perMinute(50)->by($job->user->id);
定義速率限制後,您可以使用 Illuminate\Queue\Middleware\RateLimited Middleware 將速率限制器附加到您的 Job。每當 Job 超過速率限制時,此 Middleware 都會根據速率限制持續時間將 Job 以適當的延遲釋放回佇列:
use Illuminate\Queue\Middleware\RateLimited;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new RateLimited('backups')];
}
將受速率限制的 Job 釋放回佇列仍會增加 Job 的 attempts 總數。您可能希望相應地調整 Job 類別上的 tries 和 maxExceptions 屬性。或者,您可能希望使用 retryUntil 方法來定義不再嘗試 Job 之前的時間量。
使用 releaseAfter 方法,您還可以指定在釋放的 Job 再次被嘗試之前必須經過的秒數:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->releaseAfter(60)];
}
如果您不希望 Job 在受到速率限制時重試,可以使用 dontRelease 方法:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new RateLimited('backups'))->dontRelease()];
}
[!NOTE] 如果您使用 Redis,可以使用
Illuminate\Queue\Middleware\RateLimitedWithRedisMiddleware,它針對 Redis 進行了微調,比基本的速率限制 Middleware 更有效率。
防止 Job 重疊 (Preventing Job Overlaps)
Laravel 包含一個 Illuminate\Queue\Middleware\WithoutOverlapping Middleware,允許您根據任意鍵防止 Job 重疊。當佇列 Job 正在修改一次只能由一個 Job 修改的資源時,這會很有幫助。
例如,讓我們想像您有一個佇列 Job 更新使用者的信用評分,並且您希望防止同一使用者 ID 的信用評分更新 Job 重疊。為了實現這一點,您可以從 Job 的 middleware 方法傳回 WithoutOverlapping Middleware:
use Illuminate\Queue\Middleware\WithoutOverlapping;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new WithoutOverlapping($this->user->id)];
}
將重疊的 Job 釋放回佇列仍會增加 Job 的嘗試總數。您可能希望相應地調整 Job 類別上的 tries 和 maxExceptions 屬性。例如,將 tries 屬性保留為預設值 1 將防止任何重疊的 Job 在稍後重試。
任何相同類型的重疊 Job 都將被釋放回佇列。您還可以指定在釋放的 Job 再次被嘗試之前必須經過的秒數:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}
如果您希望立即刪除任何重疊的 Job 以便它們不會被重試,可以使用 dontRelease 方法:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}
WithoutOverlapping Middleware 由 Laravel 的原子鎖定功能提供支援。有時,您的 Job 可能會意外失敗或逾時,導致鎖定未被釋放。因此,您可以使用 expireAfter 方法明確定義鎖定過期時間。例如,下面的範例將指示 Laravel 在 Job 開始處理三分鐘後釋放 WithoutOverlapping 鎖定:
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}
[!WARNING] >
WithoutOverlappingMiddleware 需要支援鎖定的快取驅動程式。目前,memcached、redis、dynamodb、database、file和array快取驅動程式支援原子鎖定。
在 Job 類別之間共用鎖定鍵 (Sharing Lock Keys Across Job Classes)
預設情況下,WithoutOverlapping Middleware 只會防止相同類別的重疊 Job。因此,雖然兩個不同的 Job 類別可能使用相同的鎖定鍵,但它們不會被阻止重疊。但是,您可以使用 shared 方法指示 Laravel 在 Job 類別之間應用該鍵:
use Illuminate\Queue\Middleware\WithoutOverlapping;
class ProviderIsDown
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
class ProviderIsUp
{
// ...
public function middleware(): array
{
return [
(new WithoutOverlapping("status:{$this->provider}"))->shared(),
];
}
}
限制例外 (Throttling Exceptions)
Laravel 包含一個 Illuminate\Queue\Middleware\ThrottlesExceptions Middleware,允許您限制例外。一旦 Job 拋出指定數量的例外,所有後續執行該 Job 的嘗試都將延遲,直到指定的時間間隔結束。此 Middleware 對於與不穩定的第三方服務互動的 Job 特別有用。
例如,讓我們想像一個佇列 Job 與一個開始拋出例外的第三方 API 互動。為了限制例外,您可以從 Job 的 middleware 方法傳回 ThrottlesExceptions Middleware。通常,此 Middleware 應與實作基於時間的嘗試的 Job 配對:
use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [new ThrottlesExceptions(10, 5 * 60)];
}
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(30);
}
Middleware 接受的第一個建構函式參數是 Job 在被限制之前可以拋出的例外數量,而第二個建構函式參數是 Job 被限制後再次嘗試之前應經過的秒數。在上面的程式碼範例中,如果 Job 連續拋出 10 個例外,我們將等待 5 分鐘再嘗試該 Job,並受 30 分鐘的時間限制約束。
當 Job 拋出例外但尚未達到例外閾值時,Job 通常會立即重試。但是,您可以透過在將 Middleware 附加到 Job 時呼叫 backoff 方法來指定此類 Job 應延遲的分鐘數:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}
在內部,此 Middleware 使用 Laravel 的快取系統來實作速率限制,並且 Job 的類別名稱被用作快取「鍵」。您可以透過在將 Middleware 附加到 Job 時呼叫 by 方法來覆寫此鍵。如果您有多個 Job 與同一個第三方服務互動,並且您希望它們共用一個通用的限制「桶」,以確保它們遵守單個共用限制,這可能會很有用:
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}
預設情況下,此 Middleware 將限制每個例外。您可以透過在將 Middleware 附加到 Job 時呼叫 when 方法來修改此行為。只有當提供給 when 方法的閉包傳回 true 時,例外才會被限制:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->when(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
與將 Job 釋放回佇列或拋出例外的 when 方法不同,deleteWhen 方法允許您在發生給定例外時完全刪除 Job:
use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}
如果您希望將受限制的例外回報給應用程式的例外處理常式,可以透過在將 Middleware 附加到 Job 時呼叫 report 方法來實現。或者,您可以向 report 方法提供一個閉包,只有當給定的閉包傳回 true 時,才會回報例外:
use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;
/**
* Get the middleware the job should pass through.
*
* @return array<int, object>
*/
public function middleware(): array
{
return [(new ThrottlesExceptions(10, 10 * 60))->report(
fn (Throwable $throwable) => $throwable instanceof HttpClientException
)];
}
[!NOTE] 如果您使用 Redis,可以使用
Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedisMiddleware,它針對 Redis 進行了微調,比基本的例外限制 Middleware 更有效率。
跳過 Job (Skipping Jobs)
Skip Middleware 允許您指定應跳過/刪除 Job,而無需修改 Job 的邏輯。如果給定條件評估為 true,Skip::when 方法將刪除 Job,而如果條件評估為 false,Skip::unless 方法將刪除 Job:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when($condition),
];
}
您也可以將 Closure 傳遞給 when 和 unless 方法,以進行更複雜的條件評估:
use Illuminate\Queue\Middleware\Skip;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
Skip::when(function (): bool {
return $this->shouldSkip();
}),
];
}
分派 Job (Dispatching Jobs)
一旦您編寫了 Job 類別,您就可以使用 Job 本身的 dispatch 方法來分派它。傳遞給 dispatch 方法的參數將提供給 Job 的建構函式:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast);
return redirect('/podcasts');
}
}
如果您想有條件地分派 Job,可以使用 dispatchIf 和 dispatchUnless 方法:
ProcessPodcast::dispatchIf($accountActive, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended, $podcast);
在新的 Laravel 應用程式中,database 連線被定義為預設佇列。您可以透過變更應用程式 .env 檔案中的 QUEUE_CONNECTION 環境變數來指定不同的預設佇列連線。
延遲分派 (Delayed Dispatching)
如果您想指定 Job 不應立即由佇列 Worker 處理,您可以在分派 Job 時使用 delay 方法。例如,讓我們指定 Job 在分派後 10 分鐘內不應被處理:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// ...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
return redirect('/podcasts');
}
}
在某些情況下,Job 可能設定了預設延遲。如果您需要繞過此延遲並分派 Job 以立即處理,可以使用 withoutDelay 方法:
ProcessPodcast::dispatch($podcast)->withoutDelay();
[!WARNING] Amazon SQS 佇列服務的最大延遲時間為 15 分鐘。
同步分派 (Synchronous Dispatching)
如果您想立即(同步)分派 Job,可以使用 dispatchSync 方法。使用此方法時,Job 將不會被佇列,並將在目前程序中立即執行:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatchSync($podcast);
return redirect('/podcasts');
}
}
延遲分派 (Deferred Dispatching)
使用延遲同步分派,您可以分派一個 Job 在目前程序期間處理,但在 HTTP 回應發送給使用者之後。這允許您同步處理「佇列」Job,而不會減慢使用者的應用程式體驗。要延遲同步 Job 的執行,請將 Job 分派到 deferred 連線:
RecordDelivery::dispatch($order)->onConnection('deferred');
deferred 連線也作為預設的容錯移轉佇列。
同樣地,background 連線在 HTTP 回應發送給使用者後處理 Job;但是,Job 在單獨產生的 PHP 程序中處理,允許 PHP-FPM / 應用程式 Worker 可用於處理另一個傳入的 HTTP 請求:
RecordDelivery::dispatch($order)->onConnection('background');
Job 與資料庫交易 (Jobs & Database Transactions)
雖然在資料庫交易中分派 Job 完全沒問題,但您應該特別注意確保您的 Job 實際上能夠成功執行。在交易中分派 Job 時,Job 可能會在父交易提交之前由 Worker 處理。發生這種情況時,您在資料庫交易期間對模型或資料庫記錄所做的任何更新可能尚未反映在資料庫中。此外,在交易中建立的任何模型或資料庫記錄可能不存在於資料庫中。
幸運的是,Laravel 提供了幾種解決此問題的方法。首先,您可以在佇列連線的設定陣列中設定 after_commit 連線選項:
'redis' => [
'driver' => 'redis',
// ...
'after_commit' => true,
],
當 after_commit 選項為 true 時,您可以在資料庫交易中分派 Job;但是,Laravel 將等待所有開啟的父資料庫交易提交後才實際分派 Job。當然,如果目前沒有開啟的資料庫交易,Job 將立即分派。
如果交易由於交易期間發生的例外而回滾,則在該交易期間分派的 Job 將被丟棄。
[!NOTE] 將
after_commit設定選項設定為true也會導致任何佇列事件監聽器、可郵寄項目、通知和廣播事件在所有開啟的資料庫交易提交後分派。
內聯指定提交分派行為 (Specifying Commit Dispatch Behavior Inline)
如果您未將 after_commit 佇列連線設定選項設定為 true,您仍然可以指示特定 Job 應在所有開啟的資料庫交易提交後分派。為了實現這一點,您可以將 afterCommit 方法鏈接到您的分派操作上:
use App\Jobs\ProcessPodcast;
ProcessPodcast::dispatch($podcast)->afterCommit();
同樣地,如果 after_commit 設定選項設定為 true,您可以指示特定 Job 應立即分派,而無需等待任何開啟的資料庫交易提交:
ProcessPodcast::dispatch($podcast)->beforeCommit();
Job 鏈接 (Job Chaining)
Job 鏈接允許您指定一個佇列 Job 清單,這些 Job 應在主要 Job 成功執行後按順序執行。如果序列中的一個 Job 失敗,其餘的 Job 將不會執行。要執行佇列 Job 鏈,您可以使用 Bus facade 提供的 chain 方法。Laravel 的指令匯流排是佇列 Job 分派建立在其之上的較低層級元件:
use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->dispatch();
除了鏈接 Job 類別實例外,您還可以鏈接閉包:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
function () {
Podcast::update(/* ... */);
},
])->dispatch();
[!WARNING] 在 Job 中使用
$this->delete()方法刪除 Job 不會阻止鏈接的 Job 被處理。只有當鏈接中的 Job 失敗時,鏈接才會停止執行。
鏈接連線與佇列 (Chain Connection and Queue)
如果您想指定用於鏈接 Job 的連線和佇列,可以使用 onConnection 和 onQueue 方法。這些方法指定應使用的佇列連線和佇列名稱,除非佇列 Job 明確分配了不同的連線/佇列:
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();
新增 Job 到鏈接 (Adding Jobs to the Chain)
偶爾,您可能需要從鏈接中的另一個 Job 內將 Job 預置或附加到現有 Job 鏈接。您可以使用 prependToChain 和 appendToChain 方法來實現此目的:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
// Prepend to the current chain, run job immediately after current job...
$this->prependToChain(new TranscribePodcast);
// Append to the current chain, run job at end of chain...
$this->appendToChain(new TranscribePodcast);
}
鏈接失敗 (Chain Failures)
鏈接 Job 時,您可以使用 catch 方法指定一個閉包,如果鏈接中的 Job 失敗,該閉包將被呼叫。給定的回呼將接收導致 Job 失敗的 Throwable 實例:
use Illuminate\Support\Facades\Bus;
use Throwable;
Bus::chain([
new ProcessPodcast,
new OptimizePodcast,
new ReleasePodcast,
])->catch(function (Throwable $e) {
// A job within the chain has failed...
})->dispatch();
[!WARNING] 由於鏈接回呼由 Laravel 佇列序列化並在稍後執行,因此您不應在鏈接回呼中使用
$this變數。
自訂佇列與連線 (Customizing the Queue and Connection) (Customizing The Queue and Connection)
分派到特定佇列 (Dispatching to a Particular Queue)
透過將 Job 推送到不同的佇列,您可以對佇列 Job 進行「分類」,甚至優先考慮分配給各種佇列的 Worker 數量。請記住,這不會將 Job 推送到佇列設定檔中定義的不同佇列「連線」,而只會推送到單個連線中的特定佇列。要指定佇列,請在分派 Job 時使用 onQueue 方法:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
return redirect('/podcasts');
}
}
或者,您可以在 Job 的建構函式中呼叫 onQueue 方法來指定 Job 的佇列:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onQueue('processing');
}
}
分派到特定連線 (Dispatching to a Particular Connection)
如果您的應用程式與多個佇列連線互動,您可以使用 onConnection 方法指定將 Job 推送到哪個連線:
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* Store a new podcast.
*/
public function store(Request $request): RedirectResponse
{
$podcast = Podcast::create(/* ... */);
// Create podcast...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
return redirect('/podcasts');
}
}
您可以將 onConnection 和 onQueue 方法鏈接在一起,以指定 Job 的連線和佇列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
或者,您可以在 Job 的建構函式中呼叫 onConnection 方法來指定 Job 的連線:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct()
{
$this->onConnection('sqs');
}
}
指定最大 Job 嘗試次數 / 逾時值 (Specifying Max Job Attempts / Timeout (Specifying Max Job Attempts / Timeout Values)
Values)
最大嘗試次數 (Max Attempts)
Job 嘗試是 Laravel 佇列系統的核心概念,並支援許多進階功能。雖然一開始可能看起來很混亂,但在修改預設設定之前了解它們的工作原理非常重要。
當 Job 被分派時,它被推送到佇列中。然後,Worker 獲取它並嘗試執行它。這是一個 Job 嘗試。
但是,嘗試並不一定意味著 Job 的 handle 方法已執行。嘗試也可以透過多種方式「消耗」:
- Job 在執行期間遇到未處理的例外。
- 使用
$this->release()手動將 Job 釋放回佇列。 - 像
WithoutOverlapping或RateLimited這樣的 Middleware 無法獲取鎖定並釋放 Job。 - Job 逾時。
- Job 的
handle方法執行並完成,沒有拋出例外。
您可能不希望無限期地嘗試 Job。因此,Laravel 提供了各種方法來指定 Job 可以被嘗試的次數或時間長度。
[!NOTE] 預設情況下,Laravel 只會嘗試 Job 一次。如果您的 Job 使用像
WithoutOverlapping或RateLimited這樣的 Middleware,或者如果您手動釋放 Job,您可能需要透過tries選項增加允許的嘗試次數。
指定 Job 最大嘗試次數的一種方法是透過 Artisan 指令行上的 --tries 開關。這將適用於 Worker 處理的所有 Job,除非正在處理的 Job 指定了它可以被嘗試的次數:
php artisan queue:work --tries=3
如果 Job 超過其最大嘗試次數,它將被視為「失敗」的 Job。有關處理失敗 Job 的更多資訊,請參閱失敗 Job 文件。如果向 queue:work 指令提供 --tries=0,則 Job 將無限期重試。
您可以透過在 Job 類別本身上定義 Job 可以被嘗試的最大次數來採取更精細的方法。如果在 Job 上指定了最大嘗試次數,它將優先於指令行上提供的 --tries 值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 5;
}
如果您需要動態控制特定 Job 的最大嘗試次數,您可以在 Job 上定義 tries 方法:
/**
* Determine number of times the job may be attempted.
*/
public function tries(): int
{
return 5;
}
基於時間的嘗試 (Time Based Attempts)
作為定義 Job 在失敗之前可以被嘗試多少次的替代方法,您可以定義 Job 不應再被嘗試的時間。這允許 Job 在給定的時間範圍內被嘗試任意次數。要定義 Job 不應再被嘗試的時間,請將 retryUntil 方法新增至您的 Job 類別。此方法應傳回一個 DateTime 實例:
use DateTime;
/**
* Determine the time at which the job should timeout.
*/
public function retryUntil(): DateTime
{
return now()->addMinutes(10);
}
如果同時定義了 retryUntil 和 tries,Laravel 會優先考慮 retryUntil 方法。
最大例外 (Max Exceptions)
有時您可能希望指定 Job 可以被嘗試多次,但如果重試是由給定數量的未處理例外觸發的(而不是直接由 release 方法釋放),則應該失敗。為了實現這一點,您可以在 Job 類別上定義 maxExceptions 屬性:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Support\Facades\Redis;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* The number of times the job may be attempted.
*
* @var int
*/
public $tries = 25;
/**
* The maximum number of unhandled exceptions to allow before failing.
*
* @var int
*/
public $maxExceptions = 3;
/**
* Execute the job.
*/
public function handle(): void
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// Lock obtained, process the podcast...
}, function () {
// Unable to obtain lock...
return $this->release(10);
});
}
}
在此範例中,如果應用程式無法獲取 Redis 鎖定,Job 將被釋放十秒鐘,並將繼續重試最多 25 次。但是,如果 Job 拋出三個未處理的例外,Job 將失敗。
逾時 (Timeout)
通常,您大致知道您預計佇列 Job 需要多長時間。因此,Laravel 允許您指定一個「逾時」值。預設情況下,逾時值為 60 秒。如果 Job 的處理時間超過逾時值指定的秒數,處理該 Job 的 Worker 將因錯誤而退出。通常,Worker 將由伺服器上設定的程序管理器自動重新啟動。
可以使用 Artisan 指令行上的 --timeout 開關指定 Job 可以執行的最大秒數:
php artisan queue:work --timeout=30
如果 Job 透過持續逾時超過其最大嘗試次數,它將被標記為失敗。
您也可以在 Job 類別本身上定義 Job 應允許執行的最大秒數。如果在 Job 上指定了逾時,它將優先於指令行上指定的任何逾時:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* The number of seconds the job can run before timing out.
*
* @var int
*/
public $timeout = 120;
}
有時,IO 阻塞程序(例如 socket 或傳出 HTTP 連線)可能不遵守您指定的逾時。因此,在使用這些功能時,您也應該始終嘗試使用它們的 API 指定逾時。例如,使用 Guzzle 時,您應該始終指定連線和請求逾時值。
[!WARNING] 必須安裝 PCNTL PHP 擴充功能才能指定 Job 逾時。此外,Job 的「逾時」值應始終小於其「重試後」值。否則,Job 可能會在實際完成執行或逾時之前被重新嘗試。
逾時失敗 (Failing on Timeout)
如果您想指示 Job 在逾時時應標記為失敗,您可以在 Job 類別上定義 $failOnTimeout 屬性:
/**
* Indicate if the job should be marked as failed on timeout.
*
* @var bool
*/
public $failOnTimeout = true;
[!NOTE] 預設情況下,當 Job 逾時時,它會消耗一次嘗試並被釋放回佇列(如果允許重試)。但是,如果您將 Job 設定為在逾時時失敗,無論為嘗試設定的值為何,它都不會被重試。
SQS FIFO 與公平佇列 (SQS FIFO and Fair Queues)
Laravel 支援 Amazon SQS FIFO (先進先出) 佇列,允許您按照發送的確切順序處理 Job,同時透過訊息重複資料刪除確保只處理一次。
FIFO 佇列需要訊息群組 ID 來確定哪些 Job 可以並行處理。具有相同群組 ID 的 Job 按順序處理,而具有不同群組 ID 的訊息可以並發處理。
Laravel 提供了一個流暢的 onGroup 方法來在分派 Job 時指定訊息群組 ID:
ProcessOrder::dispatch($order)
->onGroup("customer-{$order->customer_id}");
SQS FIFO 佇列支援訊息重複資料刪除以確保只處理一次。在您的 Job 類別中實作 deduplicationId 方法以提供自訂重複資料刪除 ID:
<?php
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ProcessSubscriptionRenewal implements ShouldQueue
{
use Queueable;
// ...
/**
* Get the job's deduplication ID.
*/
public function deduplicationId(): string
{
return "renewal-{$this->subscription->id}";
}
}
FIFO 監聽器、郵件和通知 (FIFO Listeners, Mail, and Notifications)
使用 FIFO 佇列時,您還需要在監聽器、郵件和通知上定義訊息群組。或者,您可以將這些物件的佇列實例分派到非 FIFO 佇列。
要為佇列事件監聽器定義訊息群組,請在監聽器上定義 messageGroup 方法。您還可以選擇定義 deduplicationId 方法:
<?php
namespace App\Listeners;
class SendShipmentNotification
{
// ...
/**
* Get the job's message group.
*/
public function messageGroup(): string
{
return 'shipments';
}
/**
* Get the job's deduplication ID.
*/
public function deduplicationId(): string
{
return "shipment-notification-{$this->shipment->id}";
}
}
發送將在 FIFO 佇列上排隊的郵件訊息時,您應該在發送通知時呼叫 onGroup 方法,並可選擇呼叫 withDeduplicator 方法:
use App\Mail\InvoicePaid;
use Illuminate\Support\Facades\Mail;
$invoicePaid = (new InvoicePaid($invoice))
->onGroup('invoices')
->withDeduplicator(fn () => 'invoices-'.$invoice->id);
Mail::to($request->user())->send($invoicePaid);
發送將在 FIFO 佇列上排隊的通知時,您應該在發送通知時呼叫 onGroup 方法,並可選擇呼叫 withDeduplicator 方法:
use App\Notifications\InvoicePaid;
$invoicePaid = (new InvoicePaid($invoice))
->onGroup('invoices')
->withDeduplicator(fn () => 'invoices-'.$invoice->id);
$user->notify($invoicePaid);
佇列容錯移轉 (Queue Failover)
failover 佇列驅動程式在將 Job 推送到佇列時提供自動容錯移轉功能。如果主要佇列連線因任何原因失敗,Laravel 將自動嘗試將 Job 推送到清單中的下一個設定連線。這對於確保佇列可靠性至關重要的生產環境中的高可用性特別有用。
要設定容錯移轉佇列連線,請指定 failover 驅動程式並提供要按順序嘗試的連線名稱陣列。預設情況下,Laravel 在應用程式的 config/queue.php 設定檔中包含一個範例容錯移轉設定:
'failover' => [
'driver' => 'failover',
'connections' => [
'redis',
'database',
'sync',
],
],
設定使用 failover 驅動程式的連線後,您可能希望將容錯移轉連線設定為應用程式 .env 檔案中的預設佇列連線:
QUEUE_CONNECTION=failover
接下來,為容錯移轉連線清單中的每個連線啟動至少一個 Worker:
php artisan queue:work redis
php artisan queue:work database
[!NOTE] 您不需要為使用
sync、background或deferred佇列驅動程式的連線執行 Worker,因為這些驅動程式在目前 PHP 程序中處理 Job。
當佇列連線操作失敗並啟動容錯移轉時,Laravel 將分派 Illuminate\Queue\Events\QueueFailedOver 事件,允許您回報或記錄佇列連線已失敗。
[!TIP] 如果您使用 Laravel Horizon,請記住 Horizon 僅管理 Redis 佇列。如果您的容錯移轉清單包含
database,您應該在 Horizon 旁邊執行常規的php artisan queue:work database程序。
錯誤處理 (Error Handling)
如果在處理 Job 時拋出例外,Job 將自動釋放回佇列,以便可以再次嘗試。Job 將繼續被釋放,直到達到應用程式允許的最大嘗試次數。最大嘗試次數由 queue:work Artisan 指令上使用的 --tries 開關定義。或者,可以在 Job 類別本身上定義最大嘗試次數。有關執行佇列 Worker 的更多資訊,請參閱下文。
手動釋放 Job (Manually Releasing a Job)
有時您可能希望手動將 Job 釋放回佇列,以便稍後再次嘗試。您可以透過呼叫 release 方法來實現此目的:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->release();
}
預設情況下,release 方法將把 Job 釋放回佇列以立即處理。但是,您可以透過將整數或日期實例傳遞給 release 方法來指示佇列在經過指定秒數之前不要讓 Job 可用於處理:
$this->release(10);
$this->release(now()->addSeconds(10));
手動讓 Job 失敗 (Manually Failing a Job)
偶爾您可能需要手動將 Job 標記為「失敗」。為此,您可以呼叫 fail 方法:
/**
* Execute the job.
*/
public function handle(): void
{
// ...
$this->fail();
}
如果您想因為捕獲到的例外而將 Job 標記為失敗,可以將例外傳遞給 fail 方法。或者,為了方便起見,您可以傳遞一個字串錯誤訊息,它將為您轉換為例外:
$this->fail($exception);
$this->fail('Something went wrong.');
[!NOTE] 有關失敗 Job 的更多資訊,請查看處理 Job 失敗的文件。
特定例外時讓 Job 失敗 (Failing Jobs on Specific Exceptions)
FailOnException Job Middleware 允許您在拋出特定例外時短路重試。這允許在外部 API 錯誤等暫時性例外時重試,但在持續性例外(例如使用者的權限被撤銷)時永久失敗 Job:
<?php
namespace App\Jobs;
use App\Models\User;
use Illuminate\Auth\Access\AuthorizationException;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Illuminate\Queue\Middleware\FailOnException;
use Illuminate\Support\Facades\Http;
class SyncChatHistory implements ShouldQueue
{
use Queueable;
public $tries = 3;
/**
* Create a new job instance.
*/
public function __construct(
public User $user,
) {}
/**
* Execute the job.
*/
public function handle(): void
{
$this->user->authorize('sync-chat-history');
$response = Http::throw()->get(
"https://chat.laravel.test/?user={$this->user->uuid}"
);
// ...
}
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [
new FailOnException([AuthorizationException::class])
];
}
}
Job 批次處理 (Job Batching)
Laravel 的 Job 批次處理功能允許您輕鬆執行一批 Job,然後在批次 Job 完成執行後執行某些操作。在開始之前,您應該建立一個資料庫遷移來建立一個資料表,該資料表將包含有關 Job 批次的元資訊,例如它們的完成百分比。可以使用 make:queue-batches-table Artisan 指令產生此遷移:
php artisan make:queue-batches-table
php artisan migrate
定義可批次處理的 Job (Defining Batchable Jobs)
要定義可批次處理的 Job,您應該像往常一樣建立一個可佇列 Job;但是,您應該將 Illuminate\Bus\Batchable trait 新增至 Job 類別。此 trait 提供對 batch 方法的存取,該方法可用於檢索 Job 正在其中執行的目前批次:
<?php
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
class ImportCsv implements ShouldQueue
{
use Batchable, Queueable;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
// Determine if the batch has been cancelled...
return;
}
// Import a portion of the CSV file...
}
}
分派批次 (Dispatching Batches)
要分派一批 Job,您應該使用 Bus facade 的 batch 方法。當然,批次處理在與完成回呼結合使用時主要有用。因此,您可以使用 then、catch 和 finally 方法來定義批次的完成回呼。這些回呼中的每一個在被呼叫時都會接收一個 Illuminate\Bus\Batch 實例。
當執行多個佇列 Worker 時,批次中的 Job 將並行處理。因此,Job 完成的順序可能與它們新增到批次的順序不同。有關如何按順序執行一系列 Job 的資訊,請參閱我們關於鏈接與批次的文件。
在此範例中,我們將想像我們正在佇列一批 Job,每個 Job 處理 CSV 檔案中的給定行數:
use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
$batch = Bus::batch([
new ImportCsv(1, 100),
new ImportCsv(101, 200),
new ImportCsv(201, 300),
new ImportCsv(301, 400),
new ImportCsv(401, 500),
])->before(function (Batch $batch) {
// The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
// A single job has completed successfully...
})->then(function (Batch $batch) {
// All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
// First batch job failure detected...
})->finally(function (Batch $batch) {
// The batch has finished executing...
})->dispatch();
return $batch->id;
批次的 ID(可透過 $batch->id 屬性存取)可用於在分派批次後查詢 Laravel 指令匯流排以獲取有關批次的資訊。
[!WARNING] 由於批次回呼由 Laravel 佇列序列化並在稍後執行,因此您不應在回呼中使用
$this變數。此外,由於批次 Job 包裝在資料庫交易中,因此觸發隱式提交的資料庫語句不應在 Job 中執行。
命名批次 (Naming Batches)
如果批次已命名,某些工具(如 Laravel Horizon 和 Laravel Telescope)可能會為批次提供更友善的偵錯資訊。要為批次分配任意名稱,您可以在定義批次時呼叫 name 方法:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import CSV')->dispatch();
批次連線與佇列 (Batch Connection and Queue)
如果您想指定用於批次 Job 的連線和佇列,可以使用 onConnection 和 onQueue 方法。所有批次 Job 必須在同一個連線和佇列中執行:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();
鏈接與批次 (Chains and Batches)
您可以透過將鏈接 Job 放在陣列中,在批次中定義一組鏈接 Job。例如,我們可以並行執行兩個 Job 鏈,並在兩個 Job 鏈都完成處理時執行回呼:
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
Bus::batch([
[
new ReleasePodcast(1),
new SendPodcastReleaseNotification(1),
],
[
new ReleasePodcast(2),
new SendPodcastReleaseNotification(2),
],
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->dispatch();
相反,您可以透過在鏈接中定義批次,在鏈接中執行批次 Job。例如,您可以先執行一批 Job 來發布多個 Podcast,然後執行一批 Job 來發送發布通知:
use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;
Bus::chain([
new FlushPodcastCache,
Bus::batch([
new ReleasePodcast(1),
new ReleasePodcast(2),
]),
Bus::batch([
new SendPodcastReleaseNotification(1),
new SendPodcastReleaseNotification(2),
]),
])->dispatch();
新增 Job 到批次 (Adding Jobs to Batches)
有時,從批次 Job 中向批次新增額外的 Job 可能很有用。當您需要批次處理數千個 Job,而這些 Job 在 Web 請求期間分派可能花費太長時間時,此模式很有用。因此,您可能希望分派初始批次的「載入器」Job,這些 Job 會用更多 Job 來填充批次:
$batch = Bus::batch([
new LoadImportBatch,
new LoadImportBatch,
new LoadImportBatch,
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->name('Import Contacts')->dispatch();
在此範例中,我們將使用 LoadImportBatch Job 用額外的 Job 來填充批次。為了實現這一點,我們可以使用批次實例上的 add 方法,該實例可以透過 Job 的 batch 方法存取:
use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->batch()->cancelled()) {
return;
}
$this->batch()->add(Collection::times(1000, function () {
return new ImportContacts;
}));
}
[!WARNING] 您只能從屬於同一批次的 Job 中向批次新增 Job。
檢查批次 (Inspecting Batches)
提供給批次完成回呼的 Illuminate\Bus\Batch 實例具有各種屬性和方法,可協助您與給定的批次 Job 互動和檢查:
// The UUID of the batch...
$batch->id;
// The name of the batch (if applicable)...
$batch->name;
// The number of jobs assigned to the batch...
$batch->totalJobs;
// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;
// The number of jobs that have failed...
$batch->failedJobs;
// The number of jobs that have been processed thus far...
$batch->processedJobs();
// The completion percentage of the batch (0-100)...
$batch->progress();
// Indicates if the batch has finished executing...
$batch->finished();
// Cancel the execution of the batch...
$batch->cancel();
// Indicates if the batch has been cancelled...
$batch->cancelled();
從路由傳回批次 (Returning Batches From Routes)
所有 Illuminate\Bus\Batch 實例都是 JSON 可序列化的,這意味著您可以直接從應用程式的路由中傳回它們,以檢索包含有關批次資訊(包括其完成進度)的 JSON 負載。這使得在應用程式的 UI 中顯示有關批次完成進度的資訊變得很方便。
要透過 ID 檢索批次,您可以使用 Bus facade 的 findBatch 方法:
use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;
Route::get('/batch/{batchId}', function (string $batchId) {
return Bus::findBatch($batchId);
});
取消批次 (Cancelling Batches)
有時您可能需要取消給定批次的執行。這可以透過呼叫 Illuminate\Bus\Batch 實例上的 cancel 方法來完成:
/**
* Execute the job.
*/
public function handle(): void
{
if ($this->user->exceedsImportLimit()) {
$this->batch()->cancel();
return;
}
if ($this->batch()->cancelled()) {
return;
}
}
正如您在前面的範例中可能已經注意到的那樣,批次 Job 通常應在繼續執行之前確定其相應的批次是否已被取消。但是,為了方便起見,您可以改為將 SkipIfBatchCancelled Middleware 分配給 Job。正如其名稱所示,此 Middleware 將指示 Laravel 如果其相應的批次已被取消,則不處理該 Job:
use Illuminate\Queue\Middleware\SkipIfBatchCancelled;
/**
* Get the middleware the job should pass through.
*/
public function middleware(): array
{
return [new SkipIfBatchCancelled];
}
批次失敗 (Batch Failures)
當批次 Job 失敗時,將呼叫 catch 回呼(如果已分配)。此回呼僅針對批次中第一個失敗的 Job 呼叫。
允許失敗 (Allowing Failures)
當批次中的 Job 失敗時,Laravel 將自動將批次標記為「已取消」。如果您願意,您可以停用此行為,以便 Job 失敗不會自動將批次標記為已取消。這可以透過在分派批次時呼叫 allowFailures 方法來實現:
$batch = Bus::batch([
// ...
])->then(function (Batch $batch) {
// All jobs completed successfully...
})->allowFailures()->dispatch();
您可以選擇向 allowFailures 方法提供一個閉包,該閉包將在每個 Job 失敗時執行:
$batch = Bus::batch([
// ...
])->allowFailures(function (Batch $batch, $exception) {
// Handle individual job failures...
})->dispatch();
重試失敗的批次 Job (Retrying Failed Batch Jobs)
為了方便起見,Laravel 提供了一個 queue:retry-batch Artisan 指令,允許您輕鬆重試給定批次的所有失敗 Job。此指令接受應重試其失敗 Job 的批次 UUID:
php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5
清除批次 (Pruning Batches)
如果不進行清除,job_batches 資料表可能會非常快地累積記錄。為了減輕這種情況,您應該排程 queue:prune-batches Artisan 指令每天執行:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches')->daily();
預設情況下,所有超過 24 小時的已完成批次都將被清除。您可以在呼叫指令時使用 hours 選項來確定保留批次資料的時間。例如,以下指令將刪除所有在 48 小時前完成的批次:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48')->daily();
有時,您的 jobs_batches 資料表可能會累積從未成功完成的批次的批次記錄,例如 Job 失敗且該 Job 從未成功重試的批次。您可以使用 unfinished 選項指示 queue:prune-batches 指令清除這些未完成的批次記錄:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();
同樣地,您的 jobs_batches 資料表也可能累積已取消批次的批次記錄。您可以使用 cancelled 選項指示 queue:prune-batches 指令清除這些已取消的批次記錄:
use Illuminate\Support\Facades\Schedule;
Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();
在 DynamoDB 中儲存批次 (Storing Batches in DynamoDB)
Laravel 也支援將批次元資訊儲存在 DynamoDB 中,而不是關聯式資料庫中。但是,您需要手動建立一個 DynamoDB 資料表來儲存所有批次記錄。
通常,此資料表應命名為 job_batches,但您應該根據應用程式 queue 設定檔中的 queue.batching.table 設定值來命名資料表。
DynamoDB 批次資料表設定 (DynamoDB Batch Table Configuration)
job_batches 資料表應具有名為 application 的字串主分割區鍵和名為 id 的字串主排序鍵。鍵的 application 部分將包含應用程式 app 設定檔中 name 設定值定義的應用程式名稱。由於應用程式名稱是 DynamoDB 資料表鍵的一部分,因此您可以使用同一個資料表來儲存多個 Laravel 應用程式的 Job 批次。
此外,如果您想利用自動批次清除,您可以為資料表定義 ttl 屬性。
DynamoDB 設定 (DynamoDB Configuration)
接下來,安裝 AWS SDK,以便您的 Laravel 應用程式可以與 Amazon DynamoDB 通訊:
composer require aws/aws-sdk-php
然後,將 queue.batching.driver 設定選項的值設定為 dynamodb。此外,您應該在 batching 設定陣列中定義 key、secret 和 region 設定選項。這些選項將用於向 AWS 進行身分驗證。使用 dynamodb 驅動程式時,不需要 queue.batching.database 設定選項:
'batching' => [
'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
],
在 DynamoDB 中清除批次 (Pruning Batches in DynamoDB)
當利用 DynamoDB 儲存 Job 批次資訊時,用於清除儲存在關聯式資料庫中的批次的典型清除指令將不起作用。相反,您可以利用 DynamoDB 的原生 TTL 功能來自動刪除舊批次的記錄。
如果您使用 ttl 屬性定義了 DynamoDB 資料表,您可以定義設定參數來指示 Laravel 如何清除批次記錄。queue.batching.ttl_attribute 設定值定義持有 TTL 的屬性名稱,而 queue.batching.ttl 設定值定義批次記錄在上次更新後可以從 DynamoDB 資料表中刪除的秒數:
'batching' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'job_batches',
'ttl_attribute' => 'ttl',
'ttl' => 60 * 60 * 24 * 7, // 7 days...
],
佇列閉包 (Queueing Closures)
除了將 Job 類別分派到佇列之外,您還可以分派閉包。這對於需要在目前請求週期之外執行的快速、簡單的任務非常有用。將閉包分派到佇列時,閉包的程式碼內容經過加密簽署,因此無法在傳輸過程中修改:
use App\Models\Podcast;
$podcast = Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
要為佇列閉包分配名稱,以便佇列報告儀表板使用,並由 queue:work 指令顯示,您可以使用 name 方法:
dispatch(function () {
// ...
})->name('Publish Podcast');
使用 catch 方法,您可以提供一個閉包,如果佇列閉包在耗盡所有佇列的設定重試嘗試後未能成功完成,則應執行該閉包:
use Throwable;
dispatch(function () use ($podcast) {
$podcast->publish();
})->catch(function (Throwable $e) {
// This job has failed...
});
[!WARNING] 由於
catch回呼由 Laravel 佇列序列化並在稍後執行,因此您不應在catch回呼中使用$this變數。
執行佇列 Worker (Running the Queue Worker)
queue:work 指令 (The queue:work Command)
Laravel 包含一個 Artisan 指令,該指令將啟動佇列 Worker 並在將新 Job 推送到佇列時處理它們。您可以使用 queue:work Artisan 指令執行 Worker。請注意,一旦 queue:work 指令啟動,它將繼續執行,直到手動停止或關閉終端機:
php artisan queue:work
[!NOTE] 為了讓
queue:work程序在背景永久執行,您應該使用像 Supervisor 這樣的程序監視器來確保佇列 Worker 不會停止執行。
如果您希望在指令的輸出中包含已處理的 Job ID、連線名稱和佇列名稱,可以在呼叫 queue:work 指令時包含 -v 標誌:
php artisan queue:work -v
請記住,佇列 Worker 是長壽命的程序,並將啟動的應用程式狀態儲存在記憶體中。因此,它們在啟動後不會注意到程式碼庫中的變更。因此,在部署過程中,請務必重新啟動您的佇列 Worker. 此外,請記住,您的應用程式建立或修改的任何靜態狀態都不會在 Job 之間自動重設。
或者,您可以執行 queue:listen 指令。使用 queue:listen 指令時,當您想要重新載入更新的程式碼或重設應用程式狀態時,無需手動重新啟動 Worker;但是,此指令的效率明顯低於 queue:work 指令:
php artisan queue:listen
執行多個佇列 Worker (Running Multiple Queue Workers)
要將多個 Worker 分配給佇列並並發處理 Job,您只需啟動多個 queue:work 程序即可。這可以在終端機中的多個索引標籤中本機完成,也可以在生產環境中使用程序管理器的設定設定來完成。使用 Supervisor 時,您可以使用 numprocs 設定值。
指定連線與佇列 (Specifying the Connection and Queue)
您也可以指定 Worker 應使用哪個佇列連線。傳遞給 work 指令的連線名稱應對應於 config/queue.php 設定檔中定義的連線之一:
php artisan queue:work redis
預設情況下,queue:work 指令僅處理給定連線上預設佇列的 Job。但是,您可以透過僅處理給定連線的特定佇列來進一步自訂佇列 Worker。例如,如果您的所有電子郵件都在 redis 佇列連線上的 emails 佇列中處理,您可以發出以下指令來啟動僅處理該佇列的 Worker:
php artisan queue:work redis --queue=emails
處理指定數量的 Job (Processing a Specified Number of Jobs)
--once 選項可用於指示 Worker 僅處理佇列中的單個 Job:
php artisan queue:work --once
--max-jobs 選項可用於指示 Worker 處理給定數量的 Job 然後退出。當與 Supervisor 結合使用時,此選項可能很有用,以便您的 Worker 在處理給定數量的 Job 後自動重新啟動,釋放它們可能累積的任何記憶體:
php artisan queue:work --max-jobs=1000
處理所有佇列 Job 然後退出 (Processing All Queued Jobs and Then Exiting)
--stop-when-empty 選項可用於指示 Worker 處理所有 Job,然後優雅地退出。如果您希望在佇列為空後關閉容器,此選項在 Docker 容器內處理 Laravel 佇列時非常有用:
php artisan queue:work --stop-when-empty
處理指定秒數的 Job (Processing Jobs for a Given Number of Seconds)
--max-time 選項可用於指示 Worker 處理指定秒數的 Job,然後退出。當與 Supervisor 結合使用時,此選項可能很有用,以便您的 Worker 在處理給定時間量的 Job 後自動重新啟動,釋放它們可能累積的任何記憶體:
# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600
Worker 睡眠持續時間 (Worker Sleep Duration)
當佇列中有 Job 可用時,Worker 將繼續處理 Job,在 Job 之間沒有延遲。但是,sleep 選項決定了如果沒有 Job 可用,Worker 將「睡眠」多少秒。當然,在睡眠時,Worker 不會處理任何新 Job:
php artisan queue:work --sleep=3
維護模式與佇列 (Maintenance Mode and Queues)
當您的應用程式處於維護模式時,將不會處理任何佇列 Job。一旦應用程式退出維護模式,Job 將繼續正常處理。
要強制您的佇列 Worker 即使在啟用維護模式的情況下也處理 Job,您可以使用 --force 選項:
php artisan queue:work --force
資源考量 (Resource Considerations)
Daemon 佇列 Worker 在處理每個 Job 之前不會「重新啟動」框架。因此,您應該在每個 Job 完成後釋放任何繁重的資源。例如,如果您使用 GD 函式庫進行圖片處理,則應在完成圖片處理後使用 imagedestroy 釋放記憶體。
佇列優先順序 (Queue Priorities)
有時您可能希望優先處理您的佇列。例如,在您的 config/queue.php 設定檔中,您可以將 redis 連線的預設 queue 設定為 low。但是,偶爾您可能希望將 Job 推送到 high 優先順序佇列,如下所示:
dispatch((new Job)->onQueue('high'));
要啟動一個 Worker,該 Worker 在繼續處理 low 佇列上的任何 Job 之前驗證所有 high 佇列 Job 是否已處理,請將以逗號分隔的佇列名稱清單傳遞給 work 指令:
php artisan queue:work --queue=high,low
佇列 Worker 與部署 (Queue Workers and Deployment)
由於佇列 Worker 是長壽命的程序,如果不重新啟動,它們將不會注意到程式碼的變更。因此,使用佇列 Worker 部署應用程式的最簡單方法是在部署過程中重新啟動 Worker。您可以透過發出 queue:restart 指令來優雅地重新啟動所有 Worker:
php artisan queue:restart
此指令將指示所有佇列 Worker 在完成處理目前的 Job 後優雅地退出,以免遺失任何現有的 Job。由於執行 queue:restart 指令時佇列 Worker 將退出,因此您應該執行像 Supervisor 這樣的程序管理器來自動重新啟動佇列 Worker。
[!NOTE] 佇列使用 Cache 來儲存重新啟動訊號,因此在使用此功能之前,您應該確認應用程式已正確設定快取驅動程式。
Job 過期與逾時 (Job Expirations and Timeouts)
Job 過期 (Job Expiration)
在您的 config/queue.php 設定檔中,每個佇列連線都定義了一個 retry_after 選項。此選項指定佇列連線在重試正在處理的 Job 之前應等待多少秒。例如,如果 retry_after 的值設定為 90,則如果 Job 在 90 秒內未被釋放或刪除,它將被釋放回佇列。通常,您應該將 retry_after 值設定為您的 Job 合理完成處理所需的最大秒數。
[!WARNING] 唯一不包含
retry_after值的佇列連線是 Amazon SQS。SQS 將根據 AWS 主控台中管理的預設可見性逾時重試 Job。
Worker 逾時 (Worker Timeouts)
queue:work Artisan 指令公開了一個 --timeout 選項。預設情況下,--timeout 值為 60 秒。如果 Job 的處理時間超過逾時值指定的秒數,處理該 Job 的 Worker 將因錯誤而退出。通常,Worker 將由伺服器上設定的程序管理器自動重新啟動:
php artisan queue:work --timeout=60
retry_after 設定選項和 --timeout CLI 選項不同,但它們共同作用以確保 Job 不會遺失,並且 Job 僅成功處理一次。
[!WARNING] >
--timeout值應始終比retry_after設定值短至少幾秒鐘。這將確保處理凍結 Job 的 Worker 在重試 Job 之前始終被終止。如果您的--timeout選項比retry_after設定值長,您的 Job 可能會被處理兩次。
Supervisor 設定 (Supervisor Configuration)
在生產環境中,您需要一種方法來保持 queue:work 程序執行。queue:work 程序可能會因各種原因停止執行,例如超過 Worker 逾時或執行 queue:restart 指令。
因此,您需要設定一個程序監視器,它可以偵測 queue:work 程序何時退出並自動重新啟動它們。此外,程序監視器可以允許您指定要並發執行的 queue:work 程序數量。Supervisor 是 Linux 環境中常用的程序監視器,我們將在以下文件中討論如何設定它。
安裝 Supervisor (Installing Supervisor)
Supervisor 是 Linux 作業系統的程序監視器,如果它們失敗,它將自動重新啟動您的 queue:work 程序。要在 Ubuntu 上安裝 Supervisor,您可以使用以下指令:
sudo apt-get install supervisor
[!NOTE] 如果自己設定和管理 Supervisor 聽起來很難,請考慮使用 Laravel Cloud,它提供了一個用於執行 Laravel 佇列 Worker 的完全託管平台。
設定 Supervisor (Configuring Supervisor)
Supervisor 設定檔通常儲存在 /etc/supervisor/conf.d 目錄中。在此目錄中,您可以建立任意數量的設定檔,指示 Supervisor 如何監視您的程序。例如,讓我們建立一個 laravel-worker.conf 檔案,用於啟動和監視 queue:work 程序:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在此範例中,numprocs 指令將指示 Supervisor 執行八個 queue:work 程序並監視所有這些程序,如果它們失敗則自動重新啟動它們。您應該變更設定的 command 指令以反映您所需的佇列連線和 Worker 選項。
[!WARNING] 您應該確保
stopwaitsecs的值大於最長執行 Job 所消耗的秒數。否則,Supervisor 可能會在 Job 完成處理之前將其終止。
啟動 Supervisor (Starting Supervisor)
建立設定檔後,您可以使用以下指令更新 Supervisor 設定並啟動程序:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start "laravel-worker:*"
有關 Supervisor 的更多資訊,請參閱 Supervisor 文件。
處理失敗的 Job (Dealing With Failed Jobs)
有時您的佇列 Job 會失敗。別擔心,事情並不總是按計劃進行!Laravel 包含一種方便的方法來指定 Job 應嘗試的最大次數。在非同步 Job 超過此嘗試次數後,它將被插入到 failed_jobs 資料庫資料表中。同步分派的 Job 失敗時不會儲存在此資料表中,其例外將由應用程式立即處理。
建立 failed_jobs 資料表的遷移通常已存在於新的 Laravel 應用程式中。但是,如果您的應用程式不包含此資料表的遷移,您可以使用 make:queue-failed-table 指令來建立遷移:
php artisan make:queue-failed-table
php artisan migrate
執行佇列 Worker 程序時,您可以使用 queue:work 指令上的 --tries 開關指定 Job 應嘗試的最大次數。如果您未指定 --tries 選項的值,則 Job 將僅嘗試一次或按照 Job 類別的 $tries 屬性指定的次數進行嘗試:
php artisan queue:work redis --tries=3
使用 --backoff 選項,您可以指定 Laravel 在重試遇到例外的 Job 之前應等待多少秒。預設情況下,Job 會立即釋放回佇列,以便可以再次嘗試:
php artisan queue:work redis --tries=3 --backoff=3
如果您想根據每個 Job 設定 Laravel 在重試遇到例外的 Job 之前應等待多少秒,您可以透過在 Job 類別上定義 backoff 屬性來實現:
/**
* The number of seconds to wait before retrying the job.
*
* @var int
*/
public $backoff = 3;
如果您需要更複雜的邏輯來確定 Job 的退避時間,您可以在 Job 類別上定義 backoff 方法:
/**
* Calculate the number of seconds to wait before retrying the job.
*/
public function backoff(): int
{
return 3;
}
您可以透過從 backoff 方法傳回退避值陣列來輕鬆設定「指數」退避。在此範例中,第一次重試的重試延遲為 1 秒,第二次重試為 5 秒,第三次重試為 10 秒,如果還有更多嘗試次數,則每次後續重試為 10 秒:
/**
* Calculate the number of seconds to wait before retrying the job.
*
* @return array<int, int>
*/
public function backoff(): array
{
return [1, 5, 10];
}
清除失敗 Job 後 (Cleaning Up After Failed Jobs)
當特定 Job 失敗時,您可能希望向使用者發送警報或還原 Job 部分完成的任何操作。為了實現這一點,您可以在 Job 類別上定義 failed 方法。導致 Job 失敗的 Throwable 實例將傳遞給 failed 方法:
<?php
namespace App\Jobs;
use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;
class ProcessPodcast implements ShouldQueue
{
use Queueable;
/**
* Create a new job instance.
*/
public function __construct(
public Podcast $podcast,
) {}
/**
* Execute the job.
*/
public function handle(AudioProcessor $processor): void
{
// Process uploaded podcast...
}
/**
* Handle a job failure.
*/
public function failed(?Throwable $exception): void
{
// Send user notification of failure, etc...
}
}
[!WARNING] 在呼叫
failed方法之前,會實例化 Job 的新實例;因此,在handle方法中可能發生的任何類別屬性修改都將遺失。
失敗的 Job 不一定是遇到未處理例外的 Job。當 Job 耗盡所有允許的嘗試次數時,也可能被視為失敗。這些嘗試可以透過多種方式消耗:
- Job 逾時。
- Job 在執行期間遇到未處理的例外。
- Job 被手動或由 Middleware 釋放回佇列。
如果最後一次嘗試由於 Job 執行期間拋出的例外而失敗,則該例外將傳遞給 Job 的 failed 方法。但是,如果 Job 因達到允許的最大嘗試次數而失敗,則 $exception 將是 Illuminate\Queue\MaxAttemptsExceededException 的實例。同樣地,如果 Job 因超過設定的逾時而失敗,則 $exception 將是 Illuminate\Queue\TimeoutExceededException 的實例。
重試失敗的 Job (Retrying Failed Jobs)
要檢視所有已插入 failed_jobs 資料庫資料表的失敗 Job,您可以使用 queue:failed Artisan 指令:
php artisan queue:failed
queue:failed 指令將列出 Job ID、連線、佇列、失敗時間以及有關 Job 的其他資訊。Job ID 可用於重試失敗的 Job。例如,要重試 ID 為 ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 的失敗 Job,請發出以下指令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece
如有必要,您可以將多個 ID 傳遞給指令:
php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d
您也可以重試特定佇列的所有失敗 Job:
php artisan queue:retry --queue=name
要重試所有失敗的 Job,請執行 queue:retry 指令並傳遞 all 作為 ID:
php artisan queue:retry all
如果您想刪除失敗的 Job,可以使用 queue:forget 指令:
php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d
[!NOTE] 使用 Horizon 時,您應該使用
horizon:forget指令來刪除失敗的 Job,而不是queue:forget指令。
要從 failed_jobs 資料表中刪除所有失敗的 Job,您可以使用 queue:flush 指令:
php artisan queue:flush
queue:flush 指令會從您的佇列中刪除所有失敗的 Job 記錄,無論失敗的 Job 有多舊。您可以使用 --hours 選項僅刪除在一定小時數之前或更早失敗的 Job:
php artisan queue:flush --hours=48
忽略遺失的模型 (Ignoring Missing Models)
將 Eloquent 模型注入 Job 時,模型在放入佇列之前會自動序列化,並在處理 Job 時從資料庫重新檢索。但是,如果在 Job 等待 Worker 處理時模型已被刪除,您的 Job 可能會因 ModelNotFoundException 而失敗。
為了方便起見,您可以選擇透過將 Job 的 deleteWhenMissingModels 屬性設定為 true 來自動刪除缺少模型的 Job。當此屬性設定為 true 時,Laravel 將靜默丟棄 Job 而不引發例外:
/**
* Delete the job if its models no longer exist.
*
* @var bool
*/
public $deleteWhenMissingModels = true;
清除失敗的 Job (Pruning Failed Jobs)
您可以透過呼叫 queue:prune-failed Artisan 指令來清除應用程式 failed_jobs 資料表中的記錄:
php artisan queue:prune-failed
預設情況下,所有超過 24 小時的失敗 Job 記錄都將被清除。如果您向指令提供 --hours 選項,則僅保留在過去 N 小時內插入的失敗 Job 記錄。例如,以下指令將刪除所有在 48 小時前插入的失敗 Job 記錄:
php artisan queue:prune-failed --hours=48
在 DynamoDB 中儲存失敗的 Job (Storing Failed Jobs in DynamoDB)
Laravel 也支援將失敗的 Job 記錄儲存在 DynamoDB 中,而不是關聯式資料庫資料表中。但是,您必須手動建立一個 DynamoDB 資料表來儲存所有失敗的 Job 記錄。通常,此資料表應命名為 failed_jobs,但您應該根據應用程式 queue 設定檔中的 queue.failed.table 設定值來命名資料表。
failed_jobs 資料表應具有名為 application 的字串主分割區鍵和名為 uuid 的字串主排序鍵。鍵的 application 部分將包含應用程式 app 設定檔中 name 設定值定義的應用程式名稱。由於應用程式名稱是 DynamoDB 資料表鍵的一部分,因此您可以使用同一個資料表來儲存多個 Laravel 應用程式的失敗 Job。
此外,請確保安裝 AWS SDK,以便您的 Laravel 應用程式可以與 Amazon DynamoDB 通訊:
composer require aws/aws-sdk-php
接下來,將 queue.failed.driver 設定選項的值設定為 dynamodb。此外,您應該在失敗 Job 設定陣列中定義 key、secret 和 region 設定選項。這些選項將用於向 AWS 進行身分驗證。使用 dynamodb 驅動程式時,不需要 queue.failed.database 設定選項:
'failed' => [
'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
'table' => 'failed_jobs',
],
停用失敗的 Job 儲存 (Disabling Failed Job Storage)
您可以透過將 queue.failed.driver 設定選項的值設定為 null 來指示 Laravel 丟棄失敗的 Job 而不儲存它們。通常,這可以透過 QUEUE_FAILED_DRIVER 環境變數來完成:
QUEUE_FAILED_DRIVER=null
失敗的 Job 事件 (Failed Job Events)
如果您想註冊一個在 Job 失敗時呼叫的事件監聽器,可以使用 Queue facade 的 failing 方法。例如,我們可以從 Laravel 包含的 AppServiceProvider 的 boot 方法中將閉包附加到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
清除佇列中的 Job (Clearing Jobs From Queues)
[!NOTE] 使用 Horizon 時,您應該使用
horizon:clear指令從佇列中清除 Job,而不是queue:clear指令。
如果您想從預設連線的預設佇列中刪除所有 Job,可以使用 queue:clear Artisan 指令:
php artisan queue:clear
您也可以提供 connection 參數和 queue 選項來刪除特定連線和佇列中的 Job:
php artisan queue:clear redis --queue=emails
[!WARNING] 從佇列中清除 Job 僅適用於 SQS、Redis 和資料庫佇列驅動程式。此外,SQS 訊息刪除過程最多需要 60 秒,因此在清除佇列後 60 秒內發送到 SQS 佇列的 Job 也可能會被刪除。
監控您的佇列 (Monitoring Your Queues)
如果您的佇列突然收到大量 Job,它可能會不堪重負,導致 Job 完成的等待時間很長。如果您願意,Laravel 可以在您的佇列 Job 數量超過指定閾值時向您發出警報。
首先,您應該排程 queue:monitor 指令每分鐘執行一次。該指令接受您希望監控的佇列名稱以及您所需的 Job 數量閾值:
php artisan queue:monitor redis:default,redis:deployments --max=100
單獨排程此指令不足以觸發通知,提醒您佇列已不堪重負。當指令遇到 Job 數量超過閾值的佇列時,將分派 Illuminate\Queue\Events\QueueBusy 事件。您可以在應用程式的 AppServiceProvider 中監聽此事件,以便向您或您的開發團隊發送通知:
use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Event::listen(function (QueueBusy $event) {
Notification::route('mail', 'dev@example.com')
->notify(new QueueHasLongWaitTime(
$event->connection,
$event->queue,
$event->size
));
});
}
測試 (Testing)
在測試分派 Job 的程式碼時,您可能希望指示 Laravel 不要實際執行 Job 本身,因為 Job 的程式碼可以直接測試,並且與分派它的程式碼分開測試。當然,要測試 Job 本身,您可以實例化 Job 實例並在測試中直接呼叫 handle 方法。
您可以使用 Queue facade 的 fake 方法來防止佇列 Job 實際被推送到佇列。呼叫 Queue facade 的 fake 方法後,您可以斷言應用程式嘗試將 Job 推送到佇列:
<?php
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
test('orders can be shipped', function () {
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert that a closure was not pushed...
Queue::assertClosureNotPushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
});
<?php
namespace Tests\Feature;
use App\Jobs\AnotherJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;
class ExampleTest extends TestCase
{
public function test_orders_can_be_shipped(): void
{
Queue::fake();
// Perform order shipping...
// Assert that no jobs were pushed...
Queue::assertNothingPushed();
// Assert a job was pushed to a given queue...
Queue::assertPushedOn('queue-name', ShipOrder::class);
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
// Assert a job was not pushed...
Queue::assertNotPushed(AnotherJob::class);
// Assert that a closure was pushed to the queue...
Queue::assertClosurePushed();
// Assert that a closure was not pushed...
Queue::assertClosureNotPushed();
// Assert the total number of jobs that were pushed...
Queue::assertCount(3);
}
}
您可以將閉包傳遞給 assertPushed、assertNotPushed、assertClosurePushed 或 assertClosureNotPushed 方法,以斷言已推送通過給定「真值測試」的 Job。如果至少推送了一個通過給定真值測試的 Job,則斷言將成功:
use Illuminate\Queue\CallQueuedClosure;
Queue::assertPushed(function (ShipOrder $job) use ($order) {
return $job->order->id === $order->id;
});
Queue::assertClosurePushed(function (CallQueuedClosure $job) {
return $job->name === 'validate-order';
});
偽造 Job 子集 (Faking a Subset of Jobs)
如果您只需要偽造特定 Job,同時允許其他 Job 正常執行,您可以將應偽造的 Job 類別名稱傳遞給 fake 方法:
test('orders can be shipped', function () {
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
});
public function test_orders_can_be_shipped(): void
{
Queue::fake([
ShipOrder::class,
]);
// Perform order shipping...
// Assert a job was pushed twice...
Queue::assertPushed(ShipOrder::class, 2);
}
您可以使用 except 方法偽造除一組指定 Job 之外的所有 Job:
Queue::fake()->except([
ShipOrder::class,
]);
測試 Job 鏈接 (Testing Job Chains)
要測試 Job 鏈接,您需要利用 Bus facade 的偽造功能。Bus facade 的 assertChained 方法可用於斷言已分派Job 鏈。assertChained 方法接受鏈接 Job 陣列作為其第一個參數:
use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertChained([
ShipOrder::class,
RecordShipment::class,
UpdateInventory::class
]);
正如您在上面的範例中看到的,鏈接 Job 陣列可以是 Job 類別名稱的陣列。但是,您也可以提供實際 Job 實例的陣列。這樣做時,Laravel 將確保 Job 實例與應用程式分派的鏈接 Job 具有相同的類別和相同的屬性值:
Bus::assertChained([
new ShipOrder,
new RecordShipment,
new UpdateInventory,
]);
您可以使用 assertDispatchedWithoutChain 方法來斷言 Job 已推送且沒有 Job 鏈:
Bus::assertDispatchedWithoutChain(ShipOrder::class);
測試鏈接修改 (Testing Chain Modifications)
如果鏈接 Job 將 Job 前置或附加到現有鏈接,您可以使用 Job 的 assertHasChain 方法來斷言 Job 具有預期的剩餘 Job 鏈:
$job = new ProcessPodcast;
$job->handle();
$job->assertHasChain([
new TranscribePodcast,
new OptimizePodcast,
new ReleasePodcast,
]);
assertDoesntHaveChain 方法可用於斷言 Job 的剩餘鏈接為空:
$job->assertDoesntHaveChain();
測試鏈接批次 (Testing Chained Batches)
如果您的 Job 鏈包含一批 Job,您可以透過在鏈接斷言中插入 Bus::chainedBatch 定義來斷言鏈接批次符合您的預期:
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::assertChained([
new ShipOrder,
Bus::chainedBatch(function (PendingBatch $batch) {
return $batch->jobs->count() === 3;
}),
new UpdateInventory,
]);
測試 Job 批次 (Testing Job Batches)
Bus facade 的 assertBatched 方法可用於斷言已分派一批 Job。提供給 assertBatched 方法的閉包接收 Illuminate\Bus\PendingBatch 的實例,該實例可用於檢查批次中的 Job:
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;
Bus::fake();
// ...
Bus::assertBatched(function (PendingBatch $batch) {
return $batch->name == 'Import CSV' &&
$batch->jobs->count() === 10;
});
您可以使用 assertBatchCount 方法來斷言已分派給定數量的批次:
Bus::assertBatchCount(3);
您可以使用 assertNothingBatched 來斷言沒有分派任何批次:
Bus::assertNothingBatched();
測試 Job / 批次互動 (Testing Job / Batch Interaction)
此外,您偶爾可能需要測試單個 Job 與其底層批次的互動。例如,您可能需要測試 Job 是否取消了其批次的進一步處理。為了實現這一點,您需要透過 withFakeBatch 方法將偽造批次分配給 Job。withFakeBatch 方法傳回包含 Job 實例和偽造批次的元組:
[$job, $batch] = (new ShipOrder)->withFakeBatch();
$job->handle();
$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);
測試 Job / 佇列互動 (Testing Job / Queue Interactions)
有時,您可能需要測試佇列 Job 是否將自身釋放回佇列。或者,您可能需要測試 Job 是否刪除了自身。您可以透過實例化 Job 並呼叫 withFakeQueueInteractions 方法來測試這些佇列互動。
一旦 Job 的佇列互動被偽造,您就可以在 Job 上呼叫 handle 方法。呼叫 Job 後,可以使用各種斷言方法來驗證 Job 的佇列互動:
use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;
$job = (new ProcessPodcast)->withFakeQueueInteractions();
$job->handle();
$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();
Job 事件 (Job Events)
使用 Queue facade 上的 before 和 after 方法,您可以指定在處理佇列 Job 之前或之後執行的回呼。這些回呼是執行額外日誌記錄或增加儀表板統計資訊的絕佳機會。通常,您應該從服務提供者的 boot 方法中呼叫這些方法。例如,我們可以使用 Laravel 包含的 AppServiceProvider:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* Register any application services.
*/
public function register(): void
{
// ...
}
/**
* Bootstrap any application services.
*/
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
使用 Queue facade 上的 looping 方法,您可以指定在 Worker 嘗試從佇列中獲取 Job 之前執行的回呼。例如,您可以註冊一個閉包來復原先前失敗的 Job 留下的任何開啟的交易:
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});