From e2871a611506bd8b11efe3355537af6330d5c0fe Mon Sep 17 00:00:00 2001 From: Kaupo Juhkam Date: Wed, 30 Dec 2015 14:19:06 +0200 Subject: [PATCH 1/6] Added SQL queue and controller --- SqlQueue.php | 152 ++++++++++++++++++++++++++++++++ controllers/QueueController.php | 90 +++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 SqlQueue.php create mode 100644 controllers/QueueController.php diff --git a/SqlQueue.php b/SqlQueue.php new file mode 100644 index 0000000..c5e5999 --- /dev/null +++ b/SqlQueue.php @@ -0,0 +1,152 @@ +connection)) { + $this->connection = Yii::$app->get($this->connection); + } elseif (is_array($this->connection)) { + if (!isset($this->connection['class'])) { + $this->connection['class'] = Connection::className(); + } + $this->connection = Yii::createObject($this->connection); + } + + if (!$this->connection instanceof Connection) { + throw new InvalidConfigException("Queue::connection must be application component ID of a SQL connection."); + } + + if (!$this->hasTable()) { + $this->createTable(); + } + } + + private function hasTable() + { + $schema=$this->connection->schema->getTableSchema($this->getTableName(), true); + if ($schema==null) { + return false; + } + if ($schema->columns['id']->comment!=='1.0.0') { + $this->dropTable(); + return false; + } + return true; + } + + private function createTable() + { + $this->connection->createCommand()->createTable($this->getTableName(), [ + 'id' => 'pk COMMENT "1.0.0"', + 'queue' => 'string(255)', + 'run_at' => 'timestamp default CURRENT_TIMESTAMP NOT NULL', + 'payload' => 'text', + ])->execute(); + $this->connection->schema->refresh(); + } + + public function dropTable() + { + $this->connection->createCommand()->dropTable($this->getTableName())->execute(); + } + + private function getTableName() + { + return $this->default.'_queue'; + } + + /** + * @inheritdoc + */ + public function push($payload, $queue = null, $delay = 0) + { + $this->connection->schema->insert($this->getTableName(), [ + 'queue' => $this->getQueue($queue), + 'payload' => $payload, + 'run_at' => new Expression('FROM_UNIXTIME(:unixtime)', [ + ':unixtime' => time() + $delay, + ]) + ]); + return $this->connection->lastInsertID; + } + + private function getQuery($queue) + { + if ($this->_query) { + return $this->_query; + } + + $this->_query=new Query; + $this->_query->select('id, payload') + ->from($this->getTableName()) + ->where(['queue'=>$queue]) + ->andWhere('run_at <= NOW()') + ->limit(1); + + return $this->_query; + } + + /** + * @inheritdoc + */ + public function delete(array $message) + { + $this->connection->createCommand()->delete($this->getTableName(), 'id=:id', [':id'=>$message['id']])->execute(); + } + + /** + * @inheritdoc + */ + public function pop($queue = null) + { + $row=$this->getQuery($this->getQueue($queue))->one($this->connection); + if ($row) { + return $row['payload']; + } + return false; + } + + /** + * @inheritdoc + */ + public function purge($queue) + { + $this->connection->createCommand()->delete($this->getTableName(), 'queue=:queue', [':queue'=>$queue])->execute(); + } + + /** + * @inheritdoc + */ + public function release(array $message, $delay = 0) + { + $this->connection->createCommand()->update( + $this->getTableName(), + ['run_at' => new Expression('DATE_ADD(NOW(), INTERVAL :delay SECOND)', ['delay' => $delay])], + 'id = :id', + ['id' => $message['id']] + )->execute(); + } +} diff --git a/controllers/QueueController.php b/controllers/QueueController.php new file mode 100644 index 0000000..b639a58 --- /dev/null +++ b/controllers/QueueController.php @@ -0,0 +1,90 @@ +process($queueName, $queueObjectName); + } + + /** + * Continuously process jobs + * + * @param string $queueName + * @param string $queueObjectName + * @throws \Exception + */ + public function actionListen($queueName = null, $queueObjectName = 'queue') + { + while (true) { + if ($this->timeout !==null) { + if ($this->timeoutprocess($queueName, $queueObjectName)) { + sleep($this->sleep); + } + + } + } + + protected function process($queueName, $queueObjectName) + { + $queue = Yii::$app->{$queueObjectName}; + $job = $queue->pop($queueName); + + if ($job) { + try { + $jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']); + $queue->delete($job); + $jobObject->run(); + return true; + } catch (\Exception $e) { + if ($queue->debug) { + var_dump($e); + } + + Yii::error($e->getMessage(), __METHOD__); + } + } + return false; + } + + public function beforeAction($action) + { + if (!parent::beforeAction($action)) { + return false; + } + + if (getenv('QUEUE_TIMEOUT')) { + $this->timeout=(int)getenv('QUEUE_TIMEOUT')+time(); + } + if (getenv('QUEUE_SLEEP')) { + $this->sleep=(int)getenv('QUEUE_SLEEP'); + } + return true; + } +} \ No newline at end of file From cb378cdeb1072b124dbba6768c5f541b85132afc Mon Sep 17 00:00:00 2001 From: Kaupo Juhkam Date: Wed, 30 Dec 2015 16:54:40 +0200 Subject: [PATCH 2/6] added tests and fixed some major bugs --- SqlQueue.php | 37 ++++++--------- tests/SqlQueueTest.php | 101 +++++++++++++++++++++++++++++++++++++++++ tests/TestCase.php | 46 +++++++++++++++++++ tests/TestJob.php | 20 ++++++++ tests/bootstrap.php | 11 +++++ 5 files changed, 193 insertions(+), 22 deletions(-) create mode 100644 tests/SqlQueueTest.php create mode 100644 tests/TestCase.php create mode 100644 tests/TestJob.php create mode 100644 tests/bootstrap.php diff --git a/SqlQueue.php b/SqlQueue.php index c5e5999..bdf9e3e 100644 --- a/SqlQueue.php +++ b/SqlQueue.php @@ -8,6 +8,8 @@ use yii\db\Expression; use yii\base\Component; use Yii; +use yii\helpers\ArrayHelper; +use yii\helpers\Json; class SqlQueue extends Component implements QueueInterface { @@ -21,8 +23,6 @@ class SqlQueue extends Component implements QueueInterface */ public $default = 'default'; - private $_query; - public function init() { parent::init(); @@ -60,9 +60,9 @@ private function hasTable() private function createTable() { $this->connection->createCommand()->createTable($this->getTableName(), [ - 'id' => 'pk COMMENT "1.0.0"', + 'id' => 'pk', 'queue' => 'string(255)', - 'run_at' => 'timestamp default CURRENT_TIMESTAMP NOT NULL', + 'run_at' => 'INTEGER NOT NULL', 'payload' => 'text', ])->execute(); $this->connection->schema->refresh(); @@ -84,29 +84,22 @@ private function getTableName() public function push($payload, $queue = null, $delay = 0) { $this->connection->schema->insert($this->getTableName(), [ - 'queue' => $this->getQueue($queue), - 'payload' => $payload, - 'run_at' => new Expression('FROM_UNIXTIME(:unixtime)', [ - ':unixtime' => time() + $delay, - ]) + 'queue' => $queue, + 'payload' => Json::encode($payload), + 'run_at' => time() + $delay, ]); return $this->connection->lastInsertID; } private function getQuery($queue) { - if ($this->_query) { - return $this->_query; - } - - $this->_query=new Query; - $this->_query->select('id, payload') - ->from($this->getTableName()) - ->where(['queue'=>$queue]) - ->andWhere('run_at <= NOW()') + $query=new Query; + $query->from($this->getTableName()) + ->andFilterWhere(['queue'=>$queue]) + ->andWhere('run_at <= :timestamp', ['timestamp' => time()]) ->limit(1); - return $this->_query; + return $query; } /** @@ -122,9 +115,9 @@ public function delete(array $message) */ public function pop($queue = null) { - $row=$this->getQuery($this->getQueue($queue))->one($this->connection); + $row=$this->getQuery($queue)->one($this->connection); if ($row) { - return $row['payload']; + return ArrayHelper::merge($row, json_decode($row['payload'])); } return false; } @@ -144,7 +137,7 @@ public function release(array $message, $delay = 0) { $this->connection->createCommand()->update( $this->getTableName(), - ['run_at' => new Expression('DATE_ADD(NOW(), INTERVAL :delay SECOND)', ['delay' => $delay])], + ['run_at' => time() + $delay], 'id = :id', ['id' => $message['id']] )->execute(); diff --git a/tests/SqlQueueTest.php b/tests/SqlQueueTest.php new file mode 100644 index 0000000..e71ed5a --- /dev/null +++ b/tests/SqlQueueTest.php @@ -0,0 +1,101 @@ +mockApplication([ + 'components' => [ + 'queue' => [ + 'class'=> 'yii\queue\SqlQueue' + ], + 'db' => [ + 'class' => 'yii\db\Connection', + 'dsn' => 'sqlite::memory:', + 'charset' => 'utf8', + ] + ] + ]); + } + + protected function pushJobToQueue($data = 'test', $delay = 0) + { + $job=new TestJob([ + 'data'=>$data + ]); + return $job->push($delay); + } + + public function testJobCreation() + { + $job=new TestJob([ + 'data'=>'test' + ]); + $this->assertEquals('test', $job->data); + } + + public function testJobInsertion() + { + $result=$this->pushJobToQueue(); + $resolvedJob=\Yii::$app->queue->pop(); + $this->assertEquals($result, $resolvedJob['id']); + } + + + public function testJobRun() + { + $this->pushJobToQueue(__FUNCTION__); + $job=\Yii::$app->queue->pop(); + $jobObject = call_user_func($job['serializer'][1], $job['object']); + $this->assertEquals(__FUNCTION__, $jobObject->data); + $this->assertEquals(__FUNCTION__, $jobObject->run()); + } + + public function testJobDelete() + { + $this->pushJobToQueue(__FUNCTION__); + $job=\Yii::$app->queue->pop(); + \Yii::$app->queue->delete($job); + + $this->assertFalse(\Yii::$app->queue->pop()); + } + + public function testDifferentQueueWontPopJob() + { + $this->pushJobToQueue(); + $this->assertFalse(\Yii::$app->queue->pop('hurrdurrImasheep')); + } + + public function testJobPurge() + { + for ($i = 0; $i <= 10; $i++) { + $this->pushJobToQueue(time()); + } + \Yii::$app->queue->purge('test'); + + $this->assertFalse(\Yii::$app->queue->pop()); + } + + public function testJobRelease() + { + $this->pushJobToQueue(__FUNCTION__); + $job = \Yii::$app->queue->pop(); + + $this->assertEquals(time(), $job['run_at']); + + \Yii::$app->queue->release($job, 200); + $this->assertFalse(\Yii::$app->queue->pop()); + } + + public function testJobInFutureDoesNotPopNow() + { + $this->pushJobToQueue(__FUNCTION__, 200); + $this->assertFalse(\Yii::$app->queue->pop()); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php new file mode 100644 index 0000000..adf982d --- /dev/null +++ b/tests/TestCase.php @@ -0,0 +1,46 @@ +destroyApplication(); + } + /** + * Populates Yii::$app with a new application + * The application will be destroyed on tearDown() automatically. + * @param array $config The application configuration, if needed + * @param string $appClass name of the application class to create + */ + protected function mockApplication($config = [], $appClass = '\yii\console\Application') + { + new $appClass(ArrayHelper::merge([ + 'id' => 'testapp', + 'basePath' => __DIR__, + 'vendorPath' => $this->getVendorPath(), + ], $config)); + } + protected function getVendorPath() + { + $vendor = dirname(dirname(__DIR__)) . '/vendor'; + if (!is_dir($vendor)) { + $vendor = dirname(dirname(dirname(dirname(__DIR__)))); + } + return $vendor; + } + /** + * Destroys application in Yii::$app by setting it to null. + */ + protected function destroyApplication() + { + \Yii::$app = null; + } +} \ No newline at end of file diff --git a/tests/TestJob.php b/tests/TestJob.php new file mode 100644 index 0000000..7f88538 --- /dev/null +++ b/tests/TestJob.php @@ -0,0 +1,20 @@ +data; + } +} \ No newline at end of file diff --git a/tests/bootstrap.php b/tests/bootstrap.php new file mode 100644 index 0000000..e11ee51 --- /dev/null +++ b/tests/bootstrap.php @@ -0,0 +1,11 @@ + Date: Wed, 30 Dec 2015 17:14:15 +0200 Subject: [PATCH 3/6] Update README.md --- README.md | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4f2ade0..e605c1d 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,61 @@ -# yii2-queue Yii 2.0 Queue Extension +========================= + +This extension provides queue handler for the [Yii framework 2.0](http://www.yiiframework.com). + +Installation +------------ + +The preferred way to install this extension is through [composer](http://getcomposer.org/download/). + +Either run + +``` +php composer.phar require yiisoft/yii2-queue +``` + +or add + +```json +"yiisoft/yii2-queue": "dev-master" +``` + +to the require section of your composer.json. + + +Usage +----- + +To use this extension, simply add the following code in your application configuration: + +####SqlQueue + +```php +'components' => [ + 'queue' => [ + 'class' => 'yii\queue\SqlQueue', + ], +], +``` + +####RedisQueue + +```php +'components' => [ + 'queue' => [ + 'class' => 'yii\queue\RedisQueue', + 'redis' => '(your redis client)' + ], +], +``` + +####SqsQueue + +```php +'components' => [ + 'queue' => [ + 'class' => 'yii\queue\SqsQueue', + 'sqs' => '(your sqs client)' + ], +], +``` From 7db10925d243c09ff19ddfdcd060f1ffac7db9c6 Mon Sep 17 00:00:00 2001 From: Kaupo Juhkam Date: Wed, 30 Dec 2015 18:29:13 +0200 Subject: [PATCH 4/6] Fix SqlQueue::pop() not returning body added docblocks --- SqlQueue.php | 5 ++--- controllers/QueueController.php | 31 ++++++++++++++++++++----------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/SqlQueue.php b/SqlQueue.php index bdf9e3e..74604f4 100644 --- a/SqlQueue.php +++ b/SqlQueue.php @@ -5,10 +5,8 @@ use yii\base\InvalidConfigException; use yii\db\Connection; use yii\db\Query; -use yii\db\Expression; use yii\base\Component; use Yii; -use yii\helpers\ArrayHelper; use yii\helpers\Json; class SqlQueue extends Component implements QueueInterface @@ -117,7 +115,8 @@ public function pop($queue = null) { $row=$this->getQuery($queue)->one($this->connection); if ($row) { - return ArrayHelper::merge($row, json_decode($row['payload'])); + $row['body'] = Json::decode($row['payload']); + return $row; } return false; } diff --git a/controllers/QueueController.php b/controllers/QueueController.php index b639a58..a49aaca 100644 --- a/controllers/QueueController.php +++ b/controllers/QueueController.php @@ -14,8 +14,8 @@ class QueueController extends Controller { - private $timeout; - private $sleep=5; + private $_timeout; + private $_sleep=5; /** * Process a job @@ -34,23 +34,33 @@ public function actionWork($queueName = null, $queueObjectName = 'queue') * * @param string $queueName * @param string $queueObjectName + * + * @return bool * @throws \Exception */ public function actionListen($queueName = null, $queueObjectName = 'queue') { while (true) { - if ($this->timeout !==null) { - if ($this->timeout_timeout !==null) { + if ($this->_timeoutprocess($queueName, $queueObjectName)) { - sleep($this->sleep); + sleep($this->_sleep); } } } + /** + * Process one unit of job in queue + * + * @param $queueName + * @param $queueObjectName + * + * @return bool + */ protected function process($queueName, $queueObjectName) { $queue = Yii::$app->{$queueObjectName}; @@ -63,16 +73,15 @@ protected function process($queueName, $queueObjectName) $jobObject->run(); return true; } catch (\Exception $e) { - if ($queue->debug) { - var_dump($e); - } - Yii::error($e->getMessage(), __METHOD__); } } return false; } + /** + * @inheritdoc + */ public function beforeAction($action) { if (!parent::beforeAction($action)) { @@ -80,10 +89,10 @@ public function beforeAction($action) } if (getenv('QUEUE_TIMEOUT')) { - $this->timeout=(int)getenv('QUEUE_TIMEOUT')+time(); + $this->_timeout=(int)getenv('QUEUE_TIMEOUT')+time(); } if (getenv('QUEUE_SLEEP')) { - $this->sleep=(int)getenv('QUEUE_SLEEP'); + $this->_sleep=(int)getenv('QUEUE_SLEEP'); } return true; } From cb22940ccdd84f1bc92562663501109f88ca3870 Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 6 Jan 2016 19:15:50 +0200 Subject: [PATCH 5/6] fix starting queue module removes table --- tests/SqlQueueTest.php | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/tests/SqlQueueTest.php b/tests/SqlQueueTest.php index e71ed5a..aa18733 100644 --- a/tests/SqlQueueTest.php +++ b/tests/SqlQueueTest.php @@ -2,57 +2,53 @@ namespace yii\queue\tests; -use yii\db\Connection; -use yii\db\Query; - class SqlQueueTest extends TestCase { - public function setUp() { $this->mockApplication([ 'components' => [ 'queue' => [ - 'class'=> 'yii\queue\SqlQueue' + 'class' => 'yii\queue\SqlQueue', ], 'db' => [ 'class' => 'yii\db\Connection', 'dsn' => 'sqlite::memory:', 'charset' => 'utf8', - ] - ] + ], + ], ]); } protected function pushJobToQueue($data = 'test', $delay = 0) { - $job=new TestJob([ - 'data'=>$data + $job = new TestJob([ + 'data' => $data, ]); + return $job->push($delay); } public function testJobCreation() { - $job=new TestJob([ - 'data'=>'test' + $job = new TestJob([ + 'data' => 'test', ]); $this->assertEquals('test', $job->data); } public function testJobInsertion() { - $result=$this->pushJobToQueue(); - $resolvedJob=\Yii::$app->queue->pop(); + $result = $this->pushJobToQueue(); + $resolvedJob = \Yii::$app->queue->pop(); $this->assertEquals($result, $resolvedJob['id']); } - public function testJobRun() { $this->pushJobToQueue(__FUNCTION__); - $job=\Yii::$app->queue->pop(); - $jobObject = call_user_func($job['serializer'][1], $job['object']); + $job = \Yii::$app->queue->pop(); + $jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']); $this->assertEquals(__FUNCTION__, $jobObject->data); $this->assertEquals(__FUNCTION__, $jobObject->run()); } @@ -60,7 +56,7 @@ public function testJobRun() public function testJobDelete() { $this->pushJobToQueue(__FUNCTION__); - $job=\Yii::$app->queue->pop(); + $job = \Yii::$app->queue->pop(); \Yii::$app->queue->delete($job); $this->assertFalse(\Yii::$app->queue->pop()); @@ -74,7 +70,7 @@ public function testDifferentQueueWontPopJob() public function testJobPurge() { - for ($i = 0; $i <= 10; $i++) { + for ($i = 0; $i <= 10; ++$i) { $this->pushJobToQueue(time()); } \Yii::$app->queue->purge('test'); From 0f5e83615675885410b3a2a3c4fea1b6f3f603f1 Mon Sep 17 00:00:00 2001 From: Mikk Tendermann Date: Wed, 6 Jan 2016 19:16:03 +0200 Subject: [PATCH 6/6] try to implement treads --- SqlQueue.php | 34 ++++++++++++++--------------- WorkerThread.php | 21 ++++++++++++++++++ controllers/ThreadedController.php | 35 ++++++++++++++++++++++++++++++ tests/TestCase.php | 9 +++++--- tests/bootstrap.php | 6 ++--- 5 files changed, 82 insertions(+), 23 deletions(-) create mode 100644 WorkerThread.php create mode 100644 controllers/ThreadedController.php diff --git a/SqlQueue.php b/SqlQueue.php index 74604f4..93a3adc 100644 --- a/SqlQueue.php +++ b/SqlQueue.php @@ -34,7 +34,7 @@ public function init() } if (!$this->connection instanceof Connection) { - throw new InvalidConfigException("Queue::connection must be application component ID of a SQL connection."); + throw new InvalidConfigException('Queue::connection must be application component ID of a SQL connection.'); } if (!$this->hasTable()) { @@ -44,14 +44,11 @@ public function init() private function hasTable() { - $schema=$this->connection->schema->getTableSchema($this->getTableName(), true); - if ($schema==null) { - return false; - } - if ($schema->columns['id']->comment!=='1.0.0') { - $this->dropTable(); + $schema = $this->connection->schema->getTableSchema($this->getTableName(), true); + if ($schema == null) { return false; } + return true; } @@ -77,7 +74,7 @@ private function getTableName() } /** - * @inheritdoc + * {@inheritdoc} */ public function push($payload, $queue = null, $delay = 0) { @@ -86,14 +83,15 @@ public function push($payload, $queue = null, $delay = 0) 'payload' => Json::encode($payload), 'run_at' => time() + $delay, ]); + return $this->connection->lastInsertID; } private function getQuery($queue) { - $query=new Query; + $query = new Query(); $query->from($this->getTableName()) - ->andFilterWhere(['queue'=>$queue]) + ->andFilterWhere(['queue' => $queue]) ->andWhere('run_at <= :timestamp', ['timestamp' => time()]) ->limit(1); @@ -101,36 +99,38 @@ private function getQuery($queue) } /** - * @inheritdoc + * {@inheritdoc} */ public function delete(array $message) { - $this->connection->createCommand()->delete($this->getTableName(), 'id=:id', [':id'=>$message['id']])->execute(); + $this->connection->createCommand()->delete($this->getTableName(), 'id=:id', [':id' => $message['id']])->execute(); } /** - * @inheritdoc + * {@inheritdoc} */ public function pop($queue = null) { - $row=$this->getQuery($queue)->one($this->connection); + $row = $this->getQuery($queue)->one($this->connection); if ($row) { $row['body'] = Json::decode($row['payload']); + return $row; } + return false; } /** - * @inheritdoc + * {@inheritdoc} */ public function purge($queue) { - $this->connection->createCommand()->delete($this->getTableName(), 'queue=:queue', [':queue'=>$queue])->execute(); + $this->connection->createCommand()->delete($this->getTableName(), 'queue=:queue', [':queue' => $queue])->execute(); } /** - * @inheritdoc + * {@inheritdoc} */ public function release(array $message, $delay = 0) { diff --git a/WorkerThread.php b/WorkerThread.php new file mode 100644 index 0000000..65bc1eb --- /dev/null +++ b/WorkerThread.php @@ -0,0 +1,21 @@ +_job = $job; + } + + public function run() + { + $job->run(); + } +} diff --git a/controllers/ThreadedController.php b/controllers/ThreadedController.php new file mode 100644 index 0000000..8ec825a --- /dev/null +++ b/controllers/ThreadedController.php @@ -0,0 +1,35 @@ +{$queueObjectName}; + $job = $queue->pop($queueName); + + if ($job) { + try { + $jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']); + $worker = new WorkerThread($jobObject); + + $worker->run(); + $queue->delete($job); + + //$jobObject->run(); + + return true; + } catch (\Exception $e) { + Yii::error($e->getMessage(), __METHOD__); + } + } + + return false; + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php index adf982d..1277841 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -1,4 +1,5 @@