A lightweight message queue for PHP that requires no dedicated queue server, only a Redis server. See smrchy/rsmq for more information.
This is a fork of eislambey/php-rsmq with the following changes:
- Uses predis instead of the Redis extension
- Has some OO wrappers for QueueAttributes and Message
- Provides a simple QueueWorker
composer require andrewbreksa/rsmq
Creates a new instance of RSMQ.
Parameters:
- $predis (\Predis\ClientInterface): required. The Predis instance.
- $ns (string): optional (Default: "rsmq"). The namespace prefix used for all keys created by RSMQ.
- $realtime (bool): optional (Default: false). Enable realtime PUBLISH of new messages.
Example:
<?php
use Predis\Client;
use AndrewBreksa\RSMQ\RSMQClient;
$predis = new Client(
    [
        'host' => '127.0.0.1',
        'port' => 6379
    ]
);
// Plain variable instead of $this->rsmq, since this snippet runs outside a class.
$rsmq = new RSMQClient($predis);
Create a new queue.
Parameters:
- $name (string): The Queue name. Maximum 160 characters; alphanumeric characters, hyphens (-), and underscores (_) are allowed.
- $vt (int): optional (Default: 30). The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days).
- $delay (int): optional (Default: 0). The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days).
- $maxsize (int): optional (Default: 65536). The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size).
Returns:
true (bool)
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueAlreadyExistsException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$rsmq->createQueue('myqueue');
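The limits listed above can be checked before calling createQueue. The following is a minimal sketch, not the library's own validation: the helper name validateQueueParams is hypothetical, and the one-character minimum for the name is an assumption (the documentation only states the 160-character maximum).

```php
<?php
// Hypothetical helper (not part of the library): mirrors the documented limits
// and returns the names of the parameters that are out of range.
function validateQueueParams(string $name, int $vt = 30, int $delay = 0, int $maxsize = 65536): array
{
    $errors = [];

    // Assumed 1-160 characters: alphanumerics, hyphens, and underscores only.
    if (preg_match('/\A[A-Za-z0-9_-]{1,160}\z/', $name) !== 1) {
        $errors[] = 'name';
    }
    // 9999999 seconds is roughly 115.7 days (9999999 / 86400).
    if ($vt < 0 || $vt > 9999999) {
        $errors[] = 'vt';
    }
    if ($delay < 0 || $delay > 9999999) {
        $errors[] = 'delay';
    }
    // -1 means unlimited; otherwise the size must be within 1024-65536 bytes.
    if ($maxsize !== -1 && ($maxsize < 1024 || $maxsize > 65536)) {
        $errors[] = 'maxsize';
    }

    return $errors;
}
```

An empty array means every value is within the documented range, so a caller could skip createQueue and report the offending parameters otherwise.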
List all queues
Returns an array:
["qname1", "qname2"]
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$queues = $rsmq->listQueues();
Deletes a queue and all messages.
Parameters:
- $name (string): The Queue name.
Returns:
true (bool)
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$rsmq->deleteQueue('myqueue');
Gets queue attributes, counters, and stats.
Parameters:
- $queue (string): The Queue name.
Returns a \AndrewBreksa\RSMQ\QueueAttributes object with the following properties:
- vt (int): The visibility timeout for the queue in seconds
- delay (int): The delay for new messages in seconds
- maxSize (int): The maximum size of a message in bytes
- totalReceived (int): Total number of messages received from the queue
- totalSent (int): Total number of messages sent to the queue
- created (float): Timestamp (epoch in seconds) when the queue was created
- modified (float): Timestamp (epoch in seconds) when the queue was last modified with setQueueAttributes
- messageCount (int): Current number of messages in the queue
- hiddenMessageCount (int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to a vt parameter or when sent with a delay
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$attributes = $rsmq->getQueueAttributes('myqueue');
echo "visibility timeout: ", $attributes->getVt(), "\n";
echo "delay for new messages: ", $attributes->getDelay(), "\n";
echo "max size in bytes: ", $attributes->getMaxSize(), "\n";
echo "total received messages: ", $attributes->getTotalReceived(), "\n";
echo "total sent messages: ", $attributes->getTotalSent(), "\n";
echo "created: ", $attributes->getCreated(), "\n";
echo "last modified: ", $attributes->getModified(), "\n";
echo "current n of messages: ", $attributes->getMessageCount(), "\n";
echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";
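Because created and modified are epoch timestamps in seconds, they usually need formatting before display. A small sketch follows; the helper name formatEpoch is hypothetical, and dropping the fractional part is a simplification chosen here.

```php
<?php
// Hypothetical helper: formats an epoch timestamp (seconds, possibly fractional)
// as a UTC ISO 8601 string.
function formatEpoch(float $timestamp): string
{
    // gmdate() works on whole seconds, so the fractional part is dropped.
    return gmdate('Y-m-d\TH:i:s\Z', (int) $timestamp);
}

echo formatEpoch(1700000000.25), "\n"; // prints 2023-11-14T22:13:20Z
```

With the attributes object from the example above, this could be used as formatEpoch($attributes->getCreated()).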
Sets queue parameters.
Parameters:
- $queue (string): The Queue name.
- $vt (int): optional. The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days).
- $delay (int): optional. The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days).
- $maxsize (int): optional. The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size).
Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.
Returns a \AndrewBreksa\RSMQ\QueueAttributes object with the following properties:
- vt (int): The visibility timeout for the queue in seconds
- delay (int): The delay for new messages in seconds
- maxSize (int): The maximum size of a message in bytes
- totalReceived (int): Total number of messages received from the queue
- totalSent (int): Total number of messages sent to the queue
- created (float): Timestamp (epoch in seconds) when the queue was created
- modified (float): Timestamp (epoch in seconds) when the queue was last modified with setQueueAttributes
- messageCount (int): Current number of messages in the queue
- hiddenMessageCount (int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to a vt parameter or when sent with a delay
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$queue = 'myqueue';
$vt = 50;
$delay = 10;
$maxsize = 2048;
$rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);
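The note above (at least one attribute must be supplied, and only supplied attributes change) can be illustrated with a plain-array sketch. This is not the library's implementation: the helper mergeQueueAttributes and the use of null to mean "not supplied" are assumptions made for illustration.

```php
<?php
// Hypothetical helper illustrating the documented rule; not the library's code.
// null stands for "attribute not supplied" (an assumption of this sketch).
function mergeQueueAttributes(array $current, ?int $vt = null, ?int $delay = null, ?int $maxsize = null): array
{
    // Keep only the attributes the caller actually supplied.
    $supplied = array_filter(
        ['vt' => $vt, 'delay' => $delay, 'maxsize' => $maxsize],
        static fn ($value) => $value !== null
    );

    if ($supplied === []) {
        throw new InvalidArgumentException('Supply at least one of vt, delay, maxsize.');
    }

    // For string keys, later values win, so supplied values overwrite current ones.
    return array_merge($current, $supplied);
}
```

For example, passing only a new vt leaves delay and maxsize at their current values.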
Sends a new message.
Parameters:
- $queue (string): The Queue name.
- $message (string): The message's contents.
- $delay (int): optional (Default: queue settings). The time in seconds that the delivery of the message will be delayed. Allowed values: 0-9999999 (around 115 days).
Returns:
$id (string): The internal message id.
Throws:
\AndrewBreksa\RSMQ\Exceptions\MessageToLongException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$id = $rsmq->sendMessage('myqueue', 'a message');
echo "Message Sent. ID: ", $id;
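Since a queue's maximum message size is expressed in bytes, multibyte text can exceed it sooner than its character count suggests. The sketch below shows a pre-send check; fitsMaxSize is a hypothetical helper, and measuring with strlen is an assumption about how size is counted.

```php
<?php
// Hypothetical helper: checks a message against a queue's maxsize setting.
function fitsMaxSize(string $message, int $maxsize): bool
{
    // -1 means the queue accepts messages of any size.
    if ($maxsize === -1) {
        return true;
    }
    // strlen() counts bytes, not characters.
    return strlen($message) <= $maxsize;
}

// 600 two-byte UTF-8 characters occupy 1200 bytes, which exceeds 1024.
var_dump(fitsMaxSize(str_repeat("\u{00E9}", 600), 1024)); // bool(false)
```

A caller could run this against the value of getMaxSize() before calling sendMessage, instead of relying only on the exception.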
Receive the next message from the queue.
Parameters:
- $queue (string): The Queue name.
- $vt (int): optional (Default: queue settings). The length of time, in seconds, that the received message will be invisible to others. Allowed values: 0-9999999 (around 115 days).
Returns a \AndrewBreksa\RSMQ\Message object with the following properties:
- message (string): The message's contents.
- id (string): The internal message id.
- sent (int): Timestamp of when this message was sent / created.
- firstReceived (int): Timestamp of when this message was first received.
- receiveCount (int): Number of times this message was received.
Note: Returns an empty array if no message is available.
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$message = $rsmq->receiveMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();
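To make the visibility timeout concrete, here is a simplified in-memory simulation. It is only an illustration: InMemoryQueue is a hypothetical stand-in that does not use Redis or the library, it takes the current time as an explicit argument, and it picks messages in insertion order, which may differ from RSMQ's actual ordering.

```php
<?php
// Hypothetical in-memory stand-in, used only to illustrate visibility timeouts.
final class InMemoryQueue
{
    /** @var array<int|string, array{body: string, visibleAt: int}> */
    private array $messages = [];
    private int $nextId = 1;

    public function send(string $body, int $now): string
    {
        $id = (string) $this->nextId++;
        $this->messages[$id] = ['body' => $body, 'visibleAt' => $now];
        return $id;
    }

    /** Returns [id, body] for the first visible message and hides it for $vt seconds, or null. */
    public function receive(int $now, int $vt = 30): ?array
    {
        foreach ($this->messages as $id => $message) {
            if ($message['visibleAt'] <= $now) {
                $this->messages[$id]['visibleAt'] = $now + $vt;
                return [(string) $id, $message['body']];
            }
        }
        return null;
    }

    public function delete(string $id): bool
    {
        if (!isset($this->messages[$id])) {
            return false;
        }
        unset($this->messages[$id]);
        return true;
    }
}

$queue = new InMemoryQueue();
$id = $queue->send('a message', 0);
$queue->receive(0, 30);  // delivered, now hidden until t = 30
$queue->receive(10);     // null: the message is still hidden
$queue->receive(30);     // delivered again because it was never deleted
$queue->delete($id);     // true: the message is gone for good
```

The takeaway is that a received message reappears once its timeout elapses unless the consumer deletes it, which is why processing code should delete the message after it has been handled successfully.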
Deletes a message.
Parameters:
- $queue (string): The Queue name.
- $id (string): The id of the message to delete.
Returns:
true if successful, false if the message was not found (bool).
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$id = $rsmq->sendMessage('queue', 'a message');
$rsmq->deleteMessage('queue', $id);
Receive the next message from the queue and delete it.
Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.
Parameters:
- $queue (string): The Queue name.
Returns a \AndrewBreksa\RSMQ\Message object with the following properties:
- message (string): The message's contents.
- id (string): The internal message id.
- sent (int): Timestamp of when this message was sent / created.
- firstReceived (int): Timestamp of when this message was first received.
- receiveCount (int): Number of times this message was received.
Note: Returns an empty object if no message is available.
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$message = $rsmq->popMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();
Change the visibility timer of a single message. The time when the message will be visible again is calculated from the current time (now) + vt.
Parameters:
- $queue (string): The Queue name.
- $id (string): The message id.
- $vt (int): The length of time, in seconds, that this message will not be visible. Allowed values: 0-9999999 (around 115 days).
Returns:
true if successful, false if the message was not found (bool).
Throws:
\AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
\AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
Example:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
$queue = 'myqueue';
$id = $rsmq->sendMessage($queue, 'a message');
if ($rsmq->changeMessageVisibility($queue, $id, 60)) {
    echo "Message hidden for 60 secs";
}
The QueueWorker class provides an easy way to consume RSMQ messages. To use it:
<?php
/**
* @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq
*/
use AndrewBreksa\RSMQ\ExecutorInterface;
use AndrewBreksa\RSMQ\Message;
use AndrewBreksa\RSMQ\QueueWorker;
use AndrewBreksa\RSMQ\WorkerSleepProvider;
$executor = new class() implements ExecutorInterface {
    public function __invoke(Message $message) : bool {
        //@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish"
        return true;
    }
};
$sleepProvider = new class() implements WorkerSleepProvider {
    public function getSleep() : ?int {
        /**
         * This allows you to return null to stop the worker, which can be used with something like redis to mark.
         *
         * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject
         * before we process a message.
         */
        return 1;
    }
};
$worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue');
$worker->work(); // here we can optionally pass true to only process one message
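Because returning null from getSleep stops the worker, a sleep provider can double as a shutdown condition. The sketch below stops after a fixed number of polls. The interface declared here is only a local stand-in with the same method signature as in the example above, so the snippet runs without the library; real code would implement AndrewBreksa\RSMQ\WorkerSleepProvider instead. MaxPollsSleepProvider is a hypothetical class, and treating the returned value as seconds is an assumption.

```php
<?php
// Local stand-in for AndrewBreksa\RSMQ\WorkerSleepProvider so this sketch
// runs without the library; it mirrors the getSleep() signature shown above.
interface WorkerSleepProvider
{
    public function getSleep(): ?int;
}

// Hypothetical provider: allows a fixed number of polls, then returns null
// so that the worker stops.
final class MaxPollsSleepProvider implements WorkerSleepProvider
{
    private int $remaining;
    private int $sleep;

    public function __construct(int $maxPolls, int $sleep = 1)
    {
        $this->remaining = $maxPolls;
        $this->sleep = $sleep; // assumed to be seconds
    }

    public function getSleep(): ?int
    {
        // Called before each poll: null here means no further polls happen.
        if ($this->remaining <= 0) {
            return null;
        }
        $this->remaining--;
        return $this->sleep;
    }
}
```

An instance such as new MaxPollsSleepProvider(100) could be passed as the third constructor argument of QueueWorker in place of the anonymous class, which is handy for workers that should exit and be restarted periodically.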
Released under the MIT License. See LICENSE.