Skip to content

Commit

Permalink
Merge pull request #1 from mikk150/master
Browse files Browse the repository at this point in the history
Controller
  • Loading branch information
vksee committed Jan 19, 2016
2 parents 921769c + 0f5e836 commit 8241025
Show file tree
Hide file tree
Showing 9 changed files with 536 additions and 1 deletion.
61 changes: 60 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,61 @@
# yii2-queue
Yii 2.0 Queue Extension
=========================

This extension provides queue handler for the [Yii framework 2.0](http://www.yiiframework.com).

Installation
------------

The preferred way to install this extension is through [composer](http://getcomposer.org/download/).

Either run

```
php composer.phar require yiisoft/yii2-queue
```

or add

```json
"yiisoft/yii2-queue": "dev-master"
```

to the require section of your composer.json.


Usage
-----

To use this extension, simply add the following code in your application configuration:

####SqlQueue

```php
'components' => [
'queue' => [
'class' => 'yii\queue\SqlQueue',
],
],
```

####RedisQueue

```php
'components' => [
'queue' => [
'class' => 'yii\queue\RedisQueue',
'redis' => '(your redis client)'
],
],
```

####SqsQueue

```php
'components' => [
'queue' => [
'class' => 'yii\queue\SqsQueue',
'sqs' => '(your sqs client)'
],
],
```
144 changes: 144 additions & 0 deletions SqlQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
<?php

namespace yii\queue;

use yii\base\InvalidConfigException;
use yii\db\Connection;
use yii\db\Query;
use yii\base\Component;
use Yii;
use yii\helpers\Json;

class SqlQueue extends Component implements QueueInterface
{
/**
* @var string Default database connection component name
*/
public $connection = 'db';

/**
* @var string Default queue table namespace
*/
public $default = 'default';

public function init()
{
parent::init();
if (is_string($this->connection)) {
$this->connection = Yii::$app->get($this->connection);
} elseif (is_array($this->connection)) {
if (!isset($this->connection['class'])) {
$this->connection['class'] = Connection::className();
}
$this->connection = Yii::createObject($this->connection);
}

if (!$this->connection instanceof Connection) {
throw new InvalidConfigException('Queue::connection must be application component ID of a SQL connection.');
}

if (!$this->hasTable()) {
$this->createTable();
}
}

private function hasTable()
{
$schema = $this->connection->schema->getTableSchema($this->getTableName(), true);
if ($schema == null) {
return false;
}

return true;
}

private function createTable()
{
$this->connection->createCommand()->createTable($this->getTableName(), [
'id' => 'pk',
'queue' => 'string(255)',
'run_at' => 'INTEGER NOT NULL',
'payload' => 'text',
])->execute();
$this->connection->schema->refresh();
}

public function dropTable()
{
$this->connection->createCommand()->dropTable($this->getTableName())->execute();
}

private function getTableName()
{
return $this->default.'_queue';
}

/**
* {@inheritdoc}
*/
public function push($payload, $queue = null, $delay = 0)
{
$this->connection->schema->insert($this->getTableName(), [
'queue' => $queue,
'payload' => Json::encode($payload),
'run_at' => time() + $delay,
]);

return $this->connection->lastInsertID;
}

private function getQuery($queue)
{
$query = new Query();
$query->from($this->getTableName())
->andFilterWhere(['queue' => $queue])
->andWhere('run_at <= :timestamp', ['timestamp' => time()])
->limit(1);

return $query;
}

/**
* {@inheritdoc}
*/
public function delete(array $message)
{
$this->connection->createCommand()->delete($this->getTableName(), 'id=:id', [':id' => $message['id']])->execute();
}

/**
* {@inheritdoc}
*/
public function pop($queue = null)
{
$row = $this->getQuery($queue)->one($this->connection);
if ($row) {
$row['body'] = Json::decode($row['payload']);

return $row;
}

return false;
}

/**
* {@inheritdoc}
*/
public function purge($queue)
{
$this->connection->createCommand()->delete($this->getTableName(), 'queue=:queue', [':queue' => $queue])->execute();
}

/**
* {@inheritdoc}
*/
public function release(array $message, $delay = 0)
{
$this->connection->createCommand()->update(
$this->getTableName(),
['run_at' => time() + $delay],
'id = :id',
['id' => $message['id']]
)->execute();
}
}
21 changes: 21 additions & 0 deletions WorkerThread.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace yii\queue;

/**
*
*/
class WorkerThread extends Thread
{
private $_job;

public function __construct(ActiveJob $job)
{
$this->_job = $job;
}

public function run()
{
$job->run();
}
}
99 changes: 99 additions & 0 deletions controllers/QueueController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?php

namespace yii\queue\controllers;

use Yii;
use yii\console\Controller;

/**
* Queue Process Command
*
* Class QueueController
* @package yii\queue\controllers
*/
class QueueController extends Controller
{

private $_timeout;
private $_sleep=5;

/**
* Process a job
*
* @param string $queueName
* @param string $queueObjectName
* @throws \Exception
*/
public function actionWork($queueName = null, $queueObjectName = 'queue')
{
$this->process($queueName, $queueObjectName);
}

/**
* Continuously process jobs
*
* @param string $queueName
* @param string $queueObjectName
*
* @return bool
* @throws \Exception
*/
public function actionListen($queueName = null, $queueObjectName = 'queue')
{
while (true) {
if ($this->_timeout !==null) {
if ($this->_timeout<time()) {
return true;
}
}
if (!$this->process($queueName, $queueObjectName)) {
sleep($this->_sleep);
}

}
}

/**
* Process one unit of job in queue
*
* @param $queueName
* @param $queueObjectName
*
* @return bool
*/
protected function process($queueName, $queueObjectName)
{
$queue = Yii::$app->{$queueObjectName};
$job = $queue->pop($queueName);

if ($job) {
try {
$jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']);
$queue->delete($job);
$jobObject->run();
return true;
} catch (\Exception $e) {
Yii::error($e->getMessage(), __METHOD__);
}
}
return false;
}

/**
* @inheritdoc
*/
public function beforeAction($action)
{
if (!parent::beforeAction($action)) {
return false;
}

if (getenv('QUEUE_TIMEOUT')) {
$this->_timeout=(int)getenv('QUEUE_TIMEOUT')+time();
}
if (getenv('QUEUE_SLEEP')) {
$this->_sleep=(int)getenv('QUEUE_SLEEP');
}
return true;
}
}
35 changes: 35 additions & 0 deletions controllers/ThreadedController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace yii\queue\controllers;

use yii\queue\WorkerThread;

/**
*
*/
class ThreadedController extends QueueController
{
protected function process($queueName, $queueObjectName)
{
$queue = Yii::$app->{$queueObjectName};
$job = $queue->pop($queueName);

if ($job) {
try {
$jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']);
$worker = new WorkerThread($jobObject);

$worker->run();
$queue->delete($job);

//$jobObject->run();

return true;
} catch (\Exception $e) {
Yii::error($e->getMessage(), __METHOD__);
}
}

return false;
}
}
Loading

0 comments on commit 8241025

Please sign in to comment.