Skip to content

Commit

Permalink
comments fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
ani2amigos committed Apr 12, 2024
1 parent a96d586 commit 3e7e4b0
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ MAILER_DSN=native://default

MAIL_CHARSET=utf-8

MAX_ATTEMPTS_DEFAULT=10
MAX_ATTEMPTS_DEFAULT=3
3 changes: 2 additions & 1 deletion config/mailer.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@
'dsn' => $_ENV['MAIL_DSN']
]
],
'mail-charset' => $_ENV['MAIL_CHARSET'] ?: 'utf-8'
'mail-charset' => $_ENV['MAIL_CHARSET'] ?: 'utf-8',
'max-attempts' => $_ENV['MAX_ATTEMPTS_DEFAULT']
];
21 changes: 13 additions & 8 deletions src/Queue/Backend/Beanstalkd/BeanstalkdQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Beanstalkd;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use Pheanstalk\Job as PheanstalkJob;
Expand Down Expand Up @@ -107,23 +108,22 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('BeanstalkdMailJob cannot be a new object to be acknowledged');
}

$pheanstalk = $this->getConnection()->getInstance()->useTube($this->queueName);

$pheanstalk->delete($mailJob->getPheanstalkJob());
$mailJob->incrementAttempt();
if (!$mailJob->isCompleted() && $mailJob->getAttempt() <= $_ENV['MAX_ATTEMPTS_DEFAULT']) {
$timestamp = $mailJob->getTimeToSend();
$delay = max(0, $timestamp - time());
// add back to the queue as it wasn't completed maybe due to some transitory error
// could also be failed.

$pheanstalk->put($this->createPayload($mailJob), Pheanstalk::DEFAULT_PRIORITY, $delay);
if($mailJob->isCompleted() || $mailJob->getAttempt() > $config['max-attempts']) {

$pheanstalk->delete($mailJob->getPheanstalkJob());
return null;
}

$pheanstalk->release($mailJob->getPheanstalkJob());

return null;
}

Expand Down Expand Up @@ -153,4 +153,9 @@ protected function createPayload(MailJobInterface $mailJob)
]);
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
35 changes: 21 additions & 14 deletions src/Queue/Backend/Pdo/PdoQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Pdo;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use PDO;
Expand Down Expand Up @@ -82,7 +83,6 @@ public function dequeue()
$query->execute();
$queryResult = $query->fetch(PDO::FETCH_ASSOC);
if ($queryResult) {
//
$sqlText = 'UPDATE `%s` SET `state`=:state WHERE `id`=:id';
$sql = sprintf($sqlText, $this->tableName);
$query = $this->getConnection()->getInstance()->prepare($sql);
Expand All @@ -108,30 +108,33 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('PdoMailJob cannot be a new object to be acknowledged');
}

$mailJob->incrementAttempt();
if($mailJob->getAttempt() > $_ENV['MAX_ATTEMPTS_DEFAULT']){
if($mailJob->getAttempt() > $config['max-attempts']){
$sqlText = 'DELETE FROM mail_queue WHERE id = :max_attempt;';
$sql = sprintf($sqlText, $this->tableName);
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId());
} else {
$sqlText = 'UPDATE `%s`
SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
WHERE `id`=:id';
$sql = sprintf($sqlText, $this->tableName);
$sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null;
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId(), PDO::PARAM_INT);
$query->bindValue(':attempt', $mailJob->getAttempt(), PDO::PARAM_INT);
$query->bindValue(':state', $mailJob->getState());
$query->bindValue(':timeToSend', $mailJob->getTimeToSend());
$query->bindValue(':sentTime', $sentTime);

return $query->execute();
}

$sqlText = 'UPDATE `%s`
SET `attempt`=:attempt, `state`=:state, `timeToSend`=:timeToSend, `sentTime`=:sentTime
WHERE `id`=:id';
$sql = sprintf($sqlText, $this->tableName);
$sentTime = $mailJob->isCompleted() ? date('Y-m-d H:i:s', time()) : null;
$query = $this->getConnection()->getInstance()->prepare($sql);
$query->bindValue(':id', $mailJob->getId(), PDO::PARAM_INT);
$query->bindValue(':attempt', $mailJob->getAttempt(), PDO::PARAM_INT);
$query->bindValue(':state', $mailJob->getState());
$query->bindValue(':timeToSend', $mailJob->getTimeToSend());
$query->bindValue(':sentTime', $sentTime);

return $query->execute();
}

Expand All @@ -148,4 +151,8 @@ public function isEmpty()
return intval($query->fetchColumn(0)) === 0;
}

protected static function getConfig()
{
return ConfigReader::get();
}
}
10 changes: 8 additions & 2 deletions src/Queue/Backend/RabbitMq/RabbitMqQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Da\Mailer\Queue\Backend\RabbitMq;

use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use PhpAmqpLib\Channel\AMQPChannel;
Expand Down Expand Up @@ -111,10 +112,10 @@ public function ack(MailJobInterface $mailJob)
{
/** @var AMQPChannel $chanel */
$chanel = $this->getConnection()->getInstance();

$mailJob->incrementAttempt();
$config = self::getConfig();

if ($mailJob->isCompleted() || $mailJob->getAttempt() > $_ENV['MAX_ATTEMPTS_DEFAULT']) {
if ($mailJob->isCompleted() || $mailJob->getAttempt() > $config['max-attempts']) {
$chanel->basic_ack($mailJob->getDeliveryTag(), false);
return;
}
Expand Down Expand Up @@ -148,4 +149,9 @@ protected function createPayload(MailJobInterface $mailJob)
]);
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
9 changes: 8 additions & 1 deletion src/Queue/Backend/Redis/RedisQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Redis;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;
use phpseclib3\Crypt\Random;
Expand Down Expand Up @@ -95,6 +96,7 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('RedisMailJob cannot be a new object to be acknowledged');
}
Expand All @@ -107,7 +109,7 @@ public function ack(MailJobInterface $mailJob)
}

$mailJob->incrementAttempt();
if ($mailJob->getAttempt() <= $_ENV['MAX_ATTEMPTS_DEFAULT']){
if ($mailJob->getAttempt() <= $config['max-attempts']){
$this->enqueue($mailJob);
}
}
Expand Down Expand Up @@ -221,4 +223,9 @@ protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
}

protected static function getConfig()
{
return ConfigReader::get();
}

}
10 changes: 8 additions & 2 deletions src/Queue/Backend/Sqs/SqsQueueStoreAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Da\Mailer\Queue\Backend\Sqs;

use Da\Mailer\Exception\InvalidCallException;
use Da\Mailer\Helper\ConfigReader;
use Da\Mailer\Queue\Backend\MailJobInterface;
use Da\Mailer\Queue\Backend\QueueStoreAdapterInterface;

Expand Down Expand Up @@ -92,7 +93,6 @@ public function dequeue()
'id' => $result['MessageId'],
'receiptHandle' => $result['ReceiptHandle'],
'message' => $result['Body'],
// 'attempt' => $result['Attempt'],
]);
}

Expand All @@ -103,6 +103,7 @@ public function dequeue()
*/
public function ack(MailJobInterface $mailJob)
{
$config = self::getConfig();
if ($mailJob->isNewRecord()) {
throw new InvalidCallException('SqsMailJob cannot be a new object to be acknowledged');
}
Expand All @@ -114,7 +115,7 @@ public function ack(MailJobInterface $mailJob)
'ReceiptHandle' => $mailJob->getReceiptHandle(),
]);

if (!$mailJob->getDeleted() && $mailJob->getAttempt() <= $_ENV['MAX_ATTEMPTS_DEFAULT']) {
if (!$mailJob->getDeleted() && $mailJob->getAttempt() <= $config['max-attempts']) {
$this->getConnection()->getInstance()->sendMessage([
'QueueUrl' => $this->queueUrl,
'MessageBody' => json_encode(['message' => $mailJob->getMessage(), 'attempt' => $mailJob->getAttempt()]),
Expand Down Expand Up @@ -147,4 +148,9 @@ public function isEmpty(): bool
return $response['Attributes']['ApproximateNumberOfMessages'] === 0;
}

protected static function getConfig()
{
return ConfigReader::get();
}

}

0 comments on commit 3e7e4b0

Please sign in to comment.