diff --git a/docs/running-queues.md b/docs/running-queues.md index c32d74f..c32184b 100644 --- a/docs/running-queues.md +++ b/docs/running-queues.md @@ -12,7 +12,7 @@ This will cause command to check for the new jobs every 10 seconds if the queue ### With CRON -Using queues with CRON is more challenging, but definitely doable. You can use command like this: +Using queues with CRON is more challenging but definitely doable. You can use command like this: php spark queue:work emails -max-jobs 20 --stop-when-empty @@ -63,6 +63,22 @@ But we can also run the worker like this: This way, worker will consume jobs with the `low` priority and then with `high`. The order set in the config file is override. +### Delaying jobs + +Normally, when we add jobs to a queue, they are run in the order in which we added them to the queue (FIFO - first in, first out). +Of course, there are also priorities, which we described in the previous section. But what about the scenario where we want to run a job, but not earlier than in 5 minutes? + +This is where job delay comes into play. We measure the delay in seconds. + +```php +// This job will be run not sooner than in 5 minutes +service('queue')->setDelay(5 * MINUTE)->push('emails', 'email', ['message' => 'Email sent no sooner than 5 minutes from now']); +``` + +Note that there is no guarantee that the job will run exactly in 5 minutes. If many new jobs are added to the queue (without a delay), it may take a long time before the delayed job is actually executed. + +We can also combine delayed jobs with priorities. + ### Running many instances of the same queue As mentioned above, sometimes we may want to have multiple instances of the same command running at the same time. The queue is safe to use in that scenario with all databases as long as you keep the `skipLocked` to `true` in the config file. Only for SQLite3 driver, this setting is not relevant as it provides atomicity without the need for explicit concurrency control. diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 927d60a..48fe085 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -12,7 +12,7 @@ stopOnFailure="false" stopOnIncomplete="false" stopOnSkipped="false" - cacheDirectory=".phpunit.cache" + cacheDirectory="build/.phpunit.cache" beStrictAboutCoverageMetadata="true"> diff --git a/src/Exceptions/QueueException.php b/src/Exceptions/QueueException.php index 7e3db3d..542de45 100644 --- a/src/Exceptions/QueueException.php +++ b/src/Exceptions/QueueException.php @@ -51,4 +51,9 @@ public static function forIncorrectQueuePriority(string $priority, string $queue { return new self(lang('Queue.incorrectQueuePriority', [$priority, $queue])); } + + public static function forIncorrectDelayValue(): static + { + return new self(lang('Queue.incorrectDelayValue')); + } } diff --git a/src/Handlers/BaseHandler.php b/src/Handlers/BaseHandler.php index 07aeb3a..63e0f87 100644 --- a/src/Handlers/BaseHandler.php +++ b/src/Handlers/BaseHandler.php @@ -29,6 +29,7 @@ abstract class BaseHandler { protected QueueConfig $config; protected ?string $priority = null; + protected ?int $delay = null; abstract public function name(): string; @@ -62,6 +63,20 @@ public function setPriority(string $priority): static return $this; } + /** + * Set delay for job queue (in seconds). + */ + public function setDelay(int $delay): static + { + if ($delay < 0) { + throw QueueException::forIncorrectDelayValue(); + } + + $this->delay = $delay; + + return $this; + } + /** * Retry failed job. * diff --git a/src/Handlers/DatabaseHandler.php b/src/Handlers/DatabaseHandler.php index 5c11279..71bd497 100644 --- a/src/Handlers/DatabaseHandler.php +++ b/src/Handlers/DatabaseHandler.php @@ -56,10 +56,10 @@ public function push(string $queue, string $job, array $data): bool 'priority' => $this->priority, 'status' => Status::PENDING->value, 'attempts' => 0, - 'available_at' => Time::now(), + 'available_at' => Time::now()->addSeconds($this->delay ?? 0), ]); - $this->priority = null; + $this->priority = $this->delay = null; return $this->jobModel->insert($queueJob, false); } diff --git a/src/Handlers/PredisHandler.php b/src/Handlers/PredisHandler.php index 4c2114d..e781955 100644 --- a/src/Handlers/PredisHandler.php +++ b/src/Handlers/PredisHandler.php @@ -64,6 +64,8 @@ public function push(string $queue, string $job, array $data): bool helper('text'); + $availableAt = Time::now()->addSeconds($this->delay ?? 0); + $queueJob = new QueueJob([ 'id' => random_string('numeric', 16), 'queue' => $queue, @@ -71,12 +73,12 @@ public function push(string $queue, string $job, array $data): bool 'priority' => $this->priority, 'status' => Status::PENDING->value, 'attempts' => 0, - 'available_at' => Time::now(), + 'available_at' => $availableAt, ]); - $result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => Time::now()->timestamp]); + $result = $this->predis->zadd("queues:{$queue}:{$this->priority}", [json_encode($queueJob) => $availableAt->timestamp]); - $this->priority = null; + $this->priority = $this->delay = null; return $result > 0; } diff --git a/src/Handlers/RedisHandler.php b/src/Handlers/RedisHandler.php index 8fcf27f..9fc49f2 100644 --- a/src/Handlers/RedisHandler.php +++ b/src/Handlers/RedisHandler.php @@ -81,6 +81,8 @@ public function push(string $queue, string $job, array $data): bool helper('text'); + $availableAt = Time::now()->addSeconds($this->delay ?? 0); + $queueJob = new QueueJob([ 'id' => random_string('numeric', 16), 'queue' => $queue, @@ -88,12 +90,12 @@ public function push(string $queue, string $job, array $data): bool 'priority' => $this->priority, 'status' => Status::PENDING->value, 'attempts' => 0, - 'available_at' => Time::now(), + 'available_at' => $availableAt, ]); - $result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", Time::now()->timestamp, json_encode($queueJob)); + $result = (int) $this->redis->zAdd("queues:{$queue}:{$this->priority}", $availableAt->timestamp, json_encode($queueJob)); - $this->priority = null; + $this->priority = $this->delay = null; return $result > 0; } diff --git a/src/Language/en/Queue.php b/src/Language/en/Queue.php index 9989f76..2336197 100644 --- a/src/Language/en/Queue.php +++ b/src/Language/en/Queue.php @@ -24,4 +24,5 @@ 'incorrectPriorityFormat' => 'The priority name should consists only lowercase letters.', 'tooLongPriorityName' => 'The priority name is too long. It should be no longer than 64 letters.', 'incorrectQueuePriority' => 'This queue has incorrectly defined priority: "{0}" for the queue: "{1}".', + 'incorrectDelayValue' => 'The number of seconds of delay must be a positive integer.', ]; diff --git a/tests/DatabaseHandlerTest.php b/tests/DatabaseHandlerTest.php index d62d015..4c5b159 100644 --- a/tests/DatabaseHandlerTest.php +++ b/tests/DatabaseHandlerTest.php @@ -112,6 +112,9 @@ public function testPushWithPriority(): void ]); } + /** + * @throws ReflectionException + */ public function testPushAndPopWithPriority(): void { Time::setTestNow('2023-12-29 14:15:16'); @@ -148,6 +151,38 @@ public function testPushAndPopWithPriority(): void $this->assertSame($payload, $result->payload); } + /** + * @throws Exception + */ + public function testPushWithDelay(): void + { + Time::setTestNow('2023-12-29 14:15:16'); + + $handler = new DatabaseHandler($this->config); + $result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $availableAt = 1703859376; + + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue-delay', + 'payload' => json_encode(['job' => 'success', 'data' => ['key' => 'value']]), + 'available_at' => $availableAt, + ]); + + $this->assertEqualsWithDelta(MINUTE, $availableAt - Time::now()->getTimestamp(), 1); + } + + public function testPushWithDelayException(): void + { + $this->expectException(QueueException::class); + $this->expectExceptionMessage('The number of seconds of delay must be a positive integer.'); + + $handler = new DatabaseHandler($this->config); + $handler->setDelay(-60); + } + /** * @throws ReflectionException */ diff --git a/tests/PredisHandlerTest.php b/tests/PredisHandlerTest.php index 51f1492..148adea 100644 --- a/tests/PredisHandlerTest.php +++ b/tests/PredisHandlerTest.php @@ -102,6 +102,27 @@ public function testPushWithPriority(): void $this->assertSame(['key' => 'value'], $queueJob->payload['data']); } + /** + * @throws ReflectionException + */ + public function testPushWithDelay(): void + { + Time::setTestNow('2023-12-29 14:15:16'); + + $handler = new PredisHandler($this->config); + $result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $predis = self::getPrivateProperty($handler, 'predis'); + $this->assertSame(1, $predis->zcard('queues:queue-delay:default')); + + $task = $predis->zrangebyscore('queues:queue-delay:default', '-inf', Time::now()->addSeconds(MINUTE)->timestamp, ['limit' => [0, 1]]); + $queueJob = new QueueJob(json_decode((string) $task[0], true)); + $this->assertSame('success', $queueJob->payload['job']); + $this->assertSame(['key' => 'value'], $queueJob->payload['data']); + } + public function testPushException(): void { $this->expectException(QueueException::class); diff --git a/tests/PushAndPopWithDelayTest.php b/tests/PushAndPopWithDelayTest.php new file mode 100644 index 0000000..cb2d0c1 --- /dev/null +++ b/tests/PushAndPopWithDelayTest.php @@ -0,0 +1,103 @@ + + * + * For the full copyright and license information, please view + * the LICENSE file that was distributed with this source code. + */ + +namespace Tests; + +use CodeIgniter\I18n\Time; +use CodeIgniter\Queue\Entities\QueueJob; +use CodeIgniter\Test\ReflectionHelper; +use PHPUnit\Framework\Attributes\DataProvider; +use Tests\Support\Config\Queue as QueueConfig; +use Tests\Support\Database\Seeds\TestDatabaseQueueSeeder; +use Tests\Support\TestCase; + +/** + * @internal + */ +final class PushAndPopWithDelayTest extends TestCase +{ + use ReflectionHelper; + + protected $seed = TestDatabaseQueueSeeder::class; + private QueueConfig $config; + + protected function setUp(): void + { + parent::setUp(); + + $this->config = config(QueueConfig::class); + } + + public static function handlerProvider(): iterable + { + return [ + [ + 'database', // name + 'CodeIgniter\Queue\Handlers\DatabaseHandler', // class + ], + [ + 'redis', + 'CodeIgniter\Queue\Handlers\RedisHandler', + ], + [ + 'predis', + 'CodeIgniter\Queue\Handlers\PredisHandler', + ], + ]; + } + + #[DataProvider('handlerProvider')] + public function testPushAndPopWithDelay(string $name, string $class): void + { + Time::setTestNow('2023-12-29 14:15:16'); + + $handler = new $class($this->config); + $result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']); + + $this->assertTrue($result); + + $result = $handler->push('queue-delay', 'success', ['key2' => 'value2']); + + $this->assertTrue($result); + + if ($name === 'database') { + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue-delay', + 'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1']]), + 'available_at' => 1703859376, + ]); + + $this->seeInDatabase('queue_jobs', [ + 'queue' => 'queue-delay', + 'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2']]), + 'available_at' => 1703859316, + ]); + } + + $result = $handler->pop('queue-delay', ['default']); + $this->assertInstanceOf(QueueJob::class, $result); + $payload = ['job' => 'success', 'data' => ['key2' => 'value2']]; + $this->assertSame($payload, $result->payload); + + $result = $handler->pop('queue-delay', ['default']); + $this->assertNull($result); + + // add 1 minute + Time::setTestNow('2023-12-29 14:16:16'); + + $result = $handler->pop('queue-delay', ['default']); + $this->assertInstanceOf(QueueJob::class, $result); + $payload = ['job' => 'success', 'data' => ['key1' => 'value1']]; + $this->assertSame($payload, $result->payload); + } +} diff --git a/tests/RedisHandlerTest.php b/tests/RedisHandlerTest.php index 96ae0d6..57178eb 100644 --- a/tests/RedisHandlerTest.php +++ b/tests/RedisHandlerTest.php @@ -19,6 +19,7 @@ use CodeIgniter\Queue\Handlers\RedisHandler; use CodeIgniter\Test\ReflectionHelper; use Exception; +use ReflectionException; use Tests\Support\Config\Queue as QueueConfig; use Tests\Support\Database\Seeds\TestRedisQueueSeeder; use Tests\Support\TestCase; @@ -95,6 +96,27 @@ public function testPushWithPriority(): void $this->assertSame(['key' => 'value'], $queueJob->payload['data']); } + /** + * @throws ReflectionException + */ + public function testPushWithDelay(): void + { + Time::setTestNow('2023-12-29 14:15:16'); + + $handler = new RedisHandler($this->config); + $result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key' => 'value']); + + $this->assertTrue($result); + + $redis = self::getPrivateProperty($handler, 'redis'); + $this->assertSame(1, $redis->zCard('queues:queue-delay:default')); + + $task = $redis->zRangeByScore('queues:queue-delay:default', '-inf', Time::now()->addSeconds(MINUTE)->timestamp, ['limit' => [0, 1]]); + $queueJob = new QueueJob(json_decode((string) $task[0], true)); + $this->assertSame('success', $queueJob->payload['job']); + $this->assertSame(['key' => 'value'], $queueJob->payload['data']); + } + public function testPushException(): void { $this->expectException(QueueException::class);