Skip to content

Commit

Permalink
* consumer logic implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
ssserj committed Sep 1, 2011
1 parent 32aea78 commit fd4091e
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 80 deletions.
19 changes: 15 additions & 4 deletions doc/ChangeLog
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
2011-09-01 Sergey S. Sergeev

* main/Utils/AMQP/AMQPQueueConsumer.class.php,
main/Utils/AMQP/AMQPConsumer.class.php,
main/Utils/AMQP/AMQPDefaultConsumer.class.php,
main/Utils/AMQP/AMQPChannelInterface.class.php,
main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php:
consumer logic implemented, phpDoc updated.

* main/Utils/AMQP/AMQPIncomingMessage.class.php: typo fixed.

2011-08-26 Sergey S. Sergeev

* /main/Utils/AMQP/AMQPChannelInterface.class.php,
/main/Utils/AMQP/AMQPBaseChannel.class.php, /main/Utils/AMQP/AMQP.class.php,
/main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php,
/main/Utils/AMQP/Pecl/AMQPPecl.class.php, /main/Utils/AMQP/Exceptions:
* main/Utils/AMQP/AMQPChannelInterface.class.php,
main/Utils/AMQP/AMQPBaseChannel.class.php, main/Utils/AMQP/AMQP.class.php,
main/Utils/AMQP/Pecl/AMQPPeclChannel.class.php,
main/Utils/AMQP/Pecl/AMQPPecl.class.php, main/Utils/AMQP/Exceptions:
extract AMQPChannelInterface, all channel classes implements
AMQPChannelInterface; methods returns an instance of AMQPChannelInterface;
throws AMQPServerException, AMQPServerConnectionException if RabbitMQ
Expand Down
7 changes: 6 additions & 1 deletion main/Utils/AMQP/AMQPChannelInterface.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,16 @@ public function basicAck($deliveryTag, $multiple = false);
/**
* @return AMQPChannelInterface
**/
public function basicConsume($queue, /*Consumer*/ $callback);
public function basicConsume($queue, $autoAck, AMQPConsumer $callback);

/**
* @return AMQPChannelInterface
**/
public function basicCancel($consumerTag);

/**
* @return AMQPIncomingMessage
**/
public function getNextDelivery();
}
?>
79 changes: 79 additions & 0 deletions main/Utils/AMQP/AMQPConsumer.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php
/***************************************************************************
* Copyright (C) 2011 by Sergey S. Sergeev *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/

interface AMQPConsumer
{
/**
* @return AMQPChannelInterface
**/
public function getChannel();

/**
* Called when a delivery appears for this consumer.
* @param AMQPIncomingMessage $delivery
* @return void
**/
public function handleDelivery(AMQPIncomingMessage $delivery);

/**
* Called when the consumer is first registered by a call
* to {@link Channel#basicConsume}.
*
* @param consumerTag the defined consumerTag
* @return void
**/
public function handleConsumeOk($consumerTag);

/**
* Called when the consumer is deregistered by a call
* to {@link Channel#basicCancel}.
*
* @param consumerTag the defined consumerTag
* @return void
**/
public function handleCancelOk($consumerTag);

/**
* @return AMQPConsumer
**/
public function setQueueName($name);

/**
* @return string
**/
public function getQueueName();

/**
* @return AMQPConsumer
**/
public function setAutoAcknowledge($boolean);

/**
* @return boolean
**/
public function isAutoAcknowledge();

/**
* @return AMQPConsumer
**/
public function setConsumerTag($consumerTag);

/**
* @return string
**/
public function getConsumerTag();

/**
* @return AMQPIncomingMessage
**/
public function getNextDelivery();
}
?>
116 changes: 116 additions & 0 deletions main/Utils/AMQP/AMQPDefaultConsumer.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php
/***************************************************************************
* Copyright (C) 2011 by Sergey S. Sergeev *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/

abstract class AMQPDefaultConsumer implements AMQPConsumer
{
/**
* @var AMQPChannelInterface
**/
protected $channel = null;
protected $consumerTag = null;
protected $autoAcknowledge = false;
protected $queueName = null;

public function __construct(AMQPChannelInterface $channel)
{
$this->channel = $channel;
}

/**
* @return AMQPChannelInterface
**/
public function getChannel()
{
return $this->channel;
}

/**
* @param $consumerTag
* @return AMQPConsumer
**/
public function setConsumerTag($consumerTag)
{
$this->consumerTag = $consumerTag;

return $this;
}

public function getConsumerTag()
{
return $this->consumerTag;
}

/**
* @return void
**/
public function handleConsumeOk($consumerTag)
{
// no work to do
}

/**
* @return void
**/
public function handleCancelOk($consumerTag)
{
// no work to do
}

/**
* @return void
**/
public function handleDelivery(AMQPIncomingMessage $delivery)
{
// no work to do
}

/**
* @return AMQPDefaultConsumer
**/
public function setQueueName($name)
{
$this->queueName = $name;

return $this;
}

/**
* @return string
**/
public function getQueueName()
{
return $this->queueName;
}

/**
* @return AMQPDefaultConsumer
**/
public function setAutoAcknowledge($boolean)
{
$this->autoAcknowledge = ($boolean === true);

return $this;
}

public function isAutoAcknowledge()
{
return $this->autoAcknowledge;
}

/**
* @return AMQPIncomingMessage
**/
public function getNextDelivery()
{
return $this->channel->getNextDelivery();
}
}
?>
17 changes: 11 additions & 6 deletions main/Utils/AMQP/AMQPIncomingMessage.class.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ final class AMQPIncomingMessage extends AMQPBaseMessage
protected $consumerTag = null;

protected static $mandatoryFields = array(
self::COUNT, self::ROUTING_KEY,
self::DELIVERY_TAG, self::EXCHANGE
self::ROUTING_KEY, self::DELIVERY_TAG, self::EXCHANGE
);

/**
Expand Down Expand Up @@ -150,7 +149,11 @@ protected function fill(array $assoc)
{
$this->checkMandatory($assoc);

$this->setCount($assoc[self::COUNT]);
if (isset($assoc[self::COUNT])) {
$this->setCount($assoc[self::COUNT]);
unset($assoc[self::COUNT]);
}

$this->setRoutingKey($assoc[self::ROUTING_KEY]);
$this->setDeliveryTag($assoc[self::DELIVERY_TAG]);
$this->setExchange($assoc[self::EXCHANGE]);
Expand All @@ -171,13 +174,15 @@ protected function fill(array $assoc)
}

if (isset($assoc[self::REDELIVERED])) {
$this->setConsumerTag($assoc[self::REDELIVERED]);
$this->setRedelivered($assoc[self::REDELIVERED]);
unset($assoc[self::REDELIVERED]);
}

//unset mandatory
unset(
$assoc[self::COUNT], $assoc[self::ROUTING_KEY],
$assoc[self::DELIVERY_TAG], $assoc[self::EXCHANGE]
$assoc[self::ROUTING_KEY],
$assoc[self::DELIVERY_TAG],
$assoc[self::EXCHANGE]
);

$this->setProperties($assoc);
Expand Down
13 changes: 13 additions & 0 deletions main/Utils/AMQP/AMQPQueueConsumer.class.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php
/***************************************************************************
* Copyright (C) 2011 by Sergey S. Sergeev *
* *
* This program is free software; you can redistribute it and/or modify *
* it under the terms of the GNU General Public License as published by *
* the Free Software Foundation; either version 2 of the License, or *
* (at your option) any later version. *
* *
***************************************************************************/

class AMQPQueueConsumer extends AMQPDefaultConsumer {/**/}
?>
Loading

0 comments on commit fd4091e

Please sign in to comment.