Skip to content
This repository was archived by the owner on Jun 2, 2023. It is now read-only.

Commit 0c22cfc

Browse files
authored
Merge pull request #7 from lamoda/feature/SCENTRE-5744
SCENTRE-5744 Handle queue in "in progress" status, some optimization
2 parents 202a596 + 817d4cd commit 0c22cfc

File tree

5 files changed

+55
-83
lines changed

5 files changed

+55
-83
lines changed

src/ConstantMessage.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ class ConstantMessage
2020

2121
public const PUBLISHER_NOT_FOUND = 'Publisher `%s` not found';
2222

23-
public const QUEUE_ENTITY_NOT_FOUND = 'The queue with id "%d" was not found';
24-
public const QUEUE_ENTITY_NOT_FOUND_IN_STATUS_NEW = 'The queue "%s" with job "%s" was not found in status "new". Actual status is "%s"';
25-
public const QUEUE_ATTEMPTS_REACHED = 'The queue "%s" has reached it\'s attempts count maximum';
26-
public const QUEUE_CAN_NOT_REQUEUE = 'Can not requeue messages';
27-
public const QUEUE_CAN_NOT_REPUBLISH = 'Can not republish messages';
28-
public const QUEUE_SUCCESS_REPUBLISH = 'Queue republish successfully completed';
23+
public const QUEUE_ENTITY_NOT_FOUND = 'The queue with id "%d" was not found';
24+
public const QUEUE_ENTITY_NOT_FOUND_IN_SUITABLE_STATUS = 'The queue "%s" with job "%s" was not found in suitable status. Actual status is "%s"';
25+
public const QUEUE_ATTEMPTS_REACHED = 'The queue "%s" has reached it\'s attempts count maximum';
26+
public const QUEUE_CAN_NOT_REQUEUE = 'Can not requeue messages';
27+
public const QUEUE_CAN_NOT_REPUBLISH = 'Can not republish messages';
28+
public const QUEUE_SUCCESS_REPUBLISH = 'Queue republish successfully completed';
2929
}

src/Consumer.php

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,7 @@ public function execute(AMQPMessage $message): int
8080
return $this->doExecute(
8181
$this->queueService->getToProcess($data['id'])
8282
);
83-
} catch (UnexpectedValueException $exception) {
84-
$this->logger->alert($exception->getMessage(), $this->getMessageLogParams($message));
85-
86-
return self::MSG_REJECT;
87-
} catch (AttemptsReachedException $exception) {
83+
} catch (UnexpectedValueException | AttemptsReachedException $exception) {
8884
$this->logger->alert($exception->getMessage(), $this->getMessageLogParams($message));
8985

9086
return self::MSG_REJECT;

src/Publisher.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ private function releaseQueues(array $queues): void
117117
'trace' => $e->getTraceAsString(),
118118
]
119119
);
120-
121-
continue;
122120
}
123121
}
124122
}

src/Service/QueueService.php

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ class QueueService
3030
/** @var int */
3131
protected $maxAttempts;
3232

33+
/** @var array */
34+
protected $queueSuitableStatuses = [
35+
QueueEntityMappedSuperclass::STATUS_NEW,
36+
QueueEntityMappedSuperclass::STATUS_IN_PROGRESS
37+
];
38+
3339
public function __construct(
3440
QueueRepository $repository,
3541
EntityFactoryInterface $entityFactory,
@@ -64,45 +70,33 @@ public function getToRestore(int $limit, ?int $offset = null): array
6470
*/
6571
public function getToProcess(int $id): QueueEntityInterface
6672
{
67-
$this->repository->beginTransaction();
73+
$queueEntity = $this->repository->find($id);
6874

69-
try {
70-
$queueEntity = $this->repository->findOneBy(
71-
[
72-
'id' => $id,
73-
]
74-
);
75-
76-
if (!($queueEntity instanceof QueueEntityInterface)) {
77-
throw new UnexpectedValueException(sprintf(ConstantMessage::QUEUE_ENTITY_NOT_FOUND, $id));
78-
}
79-
80-
if (QueueEntityMappedSuperclass::STATUS_NEW !== $queueEntity->getStatus()) {
81-
throw new UnexpectedValueException(sprintf(
82-
ConstantMessage::QUEUE_ENTITY_NOT_FOUND_IN_STATUS_NEW,
83-
$queueEntity->getName(),
84-
$queueEntity->getJobName(),
85-
$queueEntity->getStatusAsString()
86-
));
87-
}
88-
89-
$attemptsReached = $queueEntity->isMaxAttemptsReached($this->maxAttempts);
90-
if ($attemptsReached) {
91-
$queueEntity->setAttemptsReached();
92-
} else {
93-
$queueEntity->setInProgress();
94-
}
95-
96-
$this->repository->save($queueEntity);
97-
$this->repository->commit();
98-
} catch (Exception $exception) {
99-
$this->repository->rollback();
100-
101-
throw $exception;
75+
if (!($queueEntity instanceof QueueEntityInterface)) {
76+
throw new UnexpectedValueException(sprintf(ConstantMessage::QUEUE_ENTITY_NOT_FOUND, $id));
10277
}
10378

79+
if (!in_array($queueEntity->getStatus(), $this->queueSuitableStatuses, true)) {
80+
throw new UnexpectedValueException(sprintf(
81+
ConstantMessage::QUEUE_ENTITY_NOT_FOUND_IN_SUITABLE_STATUS,
82+
$queueEntity->getName(),
83+
$queueEntity->getJobName(),
84+
$queueEntity->getStatusAsString()
85+
));
86+
}
87+
88+
$attemptsReached = $queueEntity->isMaxAttemptsReached($this->maxAttempts);
89+
if ($attemptsReached) {
90+
$queueEntity->setAttemptsReached();
91+
} else {
92+
$queueEntity->setInProgress();
93+
}
94+
95+
$this->repository->save($queueEntity);
96+
10497
if ($attemptsReached) {
10598
$this->eventDispatcher->dispatch(QueueAttemptsReachedEvent::NAME, new QueueAttemptsReachedEvent($queueEntity));
99+
106100
throw new AttemptsReachedException(sprintf(ConstantMessage::QUEUE_ATTEMPTS_REACHED, $queueEntity->getName()));
107101
}
108102

@@ -131,7 +125,7 @@ public function getToRepublish(int $limit, ?int $offset = null): array
131125
*/
132126
public function createQueue(QueueInterface $queueable): QueueEntityInterface
133127
{
134-
return $this->save($this->entityFactory->createQueue($queueable));
128+
return $this->entityFactory->createQueue($queueable);
135129
}
136130

137131
public function flush(QueueEntityInterface $entity = null): void

tests/unit/Service/QueueServiceTest.php

Lines changed: 19 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -42,33 +42,36 @@ public function testGetToRestore(): void
4242
}
4343

4444
/**
45+
* @param QueueEntity $queue
46+
*
4547
* @throws \Exception
48+
*
49+
* @dataProvider dataGetToProcess()
4650
*/
47-
public function testGetToProcess(): void
51+
public function testGetToProcess(QueueEntity $queue): void
4852
{
49-
$queue = new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]);
50-
$queue->setNew();
51-
5253
$queueRepository = $this->getQueueRepository();
5354
$queueRepository
5455
->expects($this->once())
55-
->method('beginTransaction');
56-
$queueRepository
57-
->expects($this->once())
58-
->method('findOneBy')
56+
->method('find')
5957
->willReturn($queue);
6058
$queueRepository
6159
->expects($this->once())
6260
->method('save')
6361
->with($queue);
64-
$queueRepository
65-
->expects($this->once())
66-
->method('commit');
6762

6863
$this->assertEquals($queue, $this->createService($queueRepository)->getToProcess(1));
6964
$this->assertEquals(QueueEntity::STATUS_IN_PROGRESS_TITLE, $queue->getStatusAsString());
7065
}
7166

67+
public function dataGetToProcess(): array
68+
{
69+
return [
70+
'new status' => [(new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]))->setNew()],
71+
'in progress status' => [(new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]))->setInProgress()],
72+
];
73+
}
74+
7275
/**
7376
* @param null | QueueEntity $queueEntity
7477
* @param string $expectedExceptionMessage
@@ -83,16 +86,11 @@ public function testGetToProcessQueueNotFound(?QueueEntity $queueEntity, string
8386
$this->expectExceptionMessage($expectedExceptionMessage);
8487

8588
$queueRepository = $this->getQueueRepository();
89+
8690
$queueRepository
8791
->expects($this->once())
88-
->method('beginTransaction');
89-
$queueRepository
90-
->expects($this->once())
91-
->method('findOneBy')
92+
->method('find')
9293
->willReturn($queueEntity);
93-
$queueRepository
94-
->expects($this->once())
95-
->method('rollback');
9694

9795
$this->createService($queueRepository)->getToProcess(1);
9896
}
@@ -106,7 +104,7 @@ public function dataGetToProcessQueueNotFound(): array
106104
],
107105
'Status not NEW' => [
108106
new QueueEntity('queue', 'exchange', 'ClassJob', ['id' => 1]),
109-
'The queue "queue" with job "ClassJob" was not found in status "new". Actual status is "initial"',
107+
'The queue "queue" with job "ClassJob" was not found in suitable status. Actual status is "initial"',
110108
],
111109
];
112110
}
@@ -126,18 +124,12 @@ public function testGetToProcessAttemptsReached(): void
126124
$queueRepository = $this->getQueueRepository();
127125
$queueRepository
128126
->expects($this->once())
129-
->method('beginTransaction');
130-
$queueRepository
131-
->expects($this->once())
132-
->method('findOneBy')
127+
->method('find')
133128
->willReturn($queue);
134129
$queueRepository
135130
->expects($this->once())
136131
->method('save')
137132
->with($queue);
138-
$queueRepository
139-
->expects($this->once())
140-
->method('commit');
141133

142134
$attemptsReachedEvent = new QueueAttemptsReachedEvent($queue);
143135

@@ -184,11 +176,6 @@ public function testCreateQueue(): void
184176
->willReturn($queue);
185177

186178
$queueRepository = $this->getQueueRepository();
187-
$queueRepository
188-
->expects($this->once())
189-
->method('save')
190-
->with($queue)
191-
->willReturnArgument(0);
192179

193180
$this->assertEquals(
194181
$queue,
@@ -232,10 +219,7 @@ private function getQueueRepository()
232219
{
233220
return $this->getMockQueueRepository(
234221
[
235-
'beginTransaction',
236-
'rollback',
237-
'commit',
238-
'findOneBy',
222+
'find',
239223
'getToRestore',
240224
'save',
241225
'isTransactionActive',

0 commit comments

Comments
 (0)