diff --git a/README.md b/README.md index 4f2ade0..e605c1d 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,61 @@ -# yii2-queue Yii 2.0 Queue Extension +========================= + +This extension provides queue handler for the [Yii framework 2.0](http://www.yiiframework.com). + +Installation +------------ + +The preferred way to install this extension is through [composer](http://getcomposer.org/download/). + +Either run + +``` +php composer.phar require yiisoft/yii2-queue +``` + +or add + +```json +"yiisoft/yii2-queue": "dev-master" +``` + +to the require section of your composer.json. + + +Usage +----- + +To use this extension, simply add the following code in your application configuration: + +####SqlQueue + +```php +'components' => [ + 'queue' => [ + 'class' => 'yii\queue\SqlQueue', + ], +], +``` + +####RedisQueue + +```php +'components' => [ + 'queue' => [ + 'class' => 'yii\queue\RedisQueue', + 'redis' => '(your redis client)' + ], +], +``` + +####SqsQueue + +```php +'components' => [ + 'queue' => [ + 'class' => 'yii\queue\SqsQueue', + 'sqs' => '(your sqs client)' + ], +], +``` diff --git a/SqlQueue.php b/SqlQueue.php new file mode 100644 index 0000000..93a3adc --- /dev/null +++ b/SqlQueue.php @@ -0,0 +1,144 @@ +connection)) { + $this->connection = Yii::$app->get($this->connection); + } elseif (is_array($this->connection)) { + if (!isset($this->connection['class'])) { + $this->connection['class'] = Connection::className(); + } + $this->connection = Yii::createObject($this->connection); + } + + if (!$this->connection instanceof Connection) { + throw new InvalidConfigException('Queue::connection must be application component ID of a SQL connection.'); + } + + if (!$this->hasTable()) { + $this->createTable(); + } + } + + private function hasTable() + { + $schema = $this->connection->schema->getTableSchema($this->getTableName(), true); + if ($schema == null) { + return false; + } + + return true; + } + + private function createTable() + { + $this->connection->createCommand()->createTable($this->getTableName(), [ + 'id' => 'pk', + 'queue' => 'string(255)', + 'run_at' => 'INTEGER NOT NULL', + 'payload' => 'text', + ])->execute(); + $this->connection->schema->refresh(); + } + + public function dropTable() + { + $this->connection->createCommand()->dropTable($this->getTableName())->execute(); + } + + private function getTableName() + { + return $this->default.'_queue'; + } + + /** + * {@inheritdoc} + */ + public function push($payload, $queue = null, $delay = 0) + { + $this->connection->schema->insert($this->getTableName(), [ + 'queue' => $queue, + 'payload' => Json::encode($payload), + 'run_at' => time() + $delay, + ]); + + return $this->connection->lastInsertID; + } + + private function getQuery($queue) + { + $query = new Query(); + $query->from($this->getTableName()) + ->andFilterWhere(['queue' => $queue]) + ->andWhere('run_at <= :timestamp', ['timestamp' => time()]) + ->limit(1); + + return $query; + } + + /** + * {@inheritdoc} + */ + public function delete(array $message) + { + $this->connection->createCommand()->delete($this->getTableName(), 'id=:id', [':id' => $message['id']])->execute(); + } + + /** + * {@inheritdoc} + */ + public function pop($queue = null) + { + $row = $this->getQuery($queue)->one($this->connection); + if ($row) { + $row['body'] = Json::decode($row['payload']); + + return $row; + } + + return false; + } + + /** + * {@inheritdoc} + */ + public function purge($queue) + { + $this->connection->createCommand()->delete($this->getTableName(), 'queue=:queue', [':queue' => $queue])->execute(); + } + + /** + * {@inheritdoc} + */ + public function release(array $message, $delay = 0) + { + $this->connection->createCommand()->update( + $this->getTableName(), + ['run_at' => time() + $delay], + 'id = :id', + ['id' => $message['id']] + )->execute(); + } +} diff --git a/WorkerThread.php b/WorkerThread.php new file mode 100644 index 0000000..65bc1eb --- /dev/null +++ b/WorkerThread.php @@ -0,0 +1,21 @@ +_job = $job; + } + + public function run() + { + $job->run(); + } +} diff --git a/controllers/QueueController.php b/controllers/QueueController.php new file mode 100644 index 0000000..a49aaca --- /dev/null +++ b/controllers/QueueController.php @@ -0,0 +1,99 @@ +process($queueName, $queueObjectName); + } + + /** + * Continuously process jobs + * + * @param string $queueName + * @param string $queueObjectName + * + * @return bool + * @throws \Exception + */ + public function actionListen($queueName = null, $queueObjectName = 'queue') + { + while (true) { + if ($this->_timeout !==null) { + if ($this->_timeoutprocess($queueName, $queueObjectName)) { + sleep($this->_sleep); + } + + } + } + + /** + * Process one unit of job in queue + * + * @param $queueName + * @param $queueObjectName + * + * @return bool + */ + protected function process($queueName, $queueObjectName) + { + $queue = Yii::$app->{$queueObjectName}; + $job = $queue->pop($queueName); + + if ($job) { + try { + $jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']); + $queue->delete($job); + $jobObject->run(); + return true; + } catch (\Exception $e) { + Yii::error($e->getMessage(), __METHOD__); + } + } + return false; + } + + /** + * @inheritdoc + */ + public function beforeAction($action) + { + if (!parent::beforeAction($action)) { + return false; + } + + if (getenv('QUEUE_TIMEOUT')) { + $this->_timeout=(int)getenv('QUEUE_TIMEOUT')+time(); + } + if (getenv('QUEUE_SLEEP')) { + $this->_sleep=(int)getenv('QUEUE_SLEEP'); + } + return true; + } +} \ No newline at end of file diff --git a/controllers/ThreadedController.php b/controllers/ThreadedController.php new file mode 100644 index 0000000..8ec825a --- /dev/null +++ b/controllers/ThreadedController.php @@ -0,0 +1,35 @@ +{$queueObjectName}; + $job = $queue->pop($queueName); + + if ($job) { + try { + $jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']); + $worker = new WorkerThread($jobObject); + + $worker->run(); + $queue->delete($job); + + //$jobObject->run(); + + return true; + } catch (\Exception $e) { + Yii::error($e->getMessage(), __METHOD__); + } + } + + return false; + } +} diff --git a/tests/SqlQueueTest.php b/tests/SqlQueueTest.php new file mode 100644 index 0000000..aa18733 --- /dev/null +++ b/tests/SqlQueueTest.php @@ -0,0 +1,97 @@ +mockApplication([ + 'components' => [ + 'queue' => [ + 'class' => 'yii\queue\SqlQueue', + ], + 'db' => [ + 'class' => 'yii\db\Connection', + 'dsn' => 'sqlite::memory:', + 'charset' => 'utf8', + ], + ], + ]); + } + + protected function pushJobToQueue($data = 'test', $delay = 0) + { + $job = new TestJob([ + 'data' => $data, + ]); + + return $job->push($delay); + } + + public function testJobCreation() + { + $job = new TestJob([ + 'data' => 'test', + ]); + $this->assertEquals('test', $job->data); + } + + public function testJobInsertion() + { + $result = $this->pushJobToQueue(); + $resolvedJob = \Yii::$app->queue->pop(); + $this->assertEquals($result, $resolvedJob['id']); + } + + public function testJobRun() + { + $this->pushJobToQueue(__FUNCTION__); + $job = \Yii::$app->queue->pop(); + $jobObject = call_user_func($job['body']['serializer'][1], $job['body']['object']); + $this->assertEquals(__FUNCTION__, $jobObject->data); + $this->assertEquals(__FUNCTION__, $jobObject->run()); + } + + public function testJobDelete() + { + $this->pushJobToQueue(__FUNCTION__); + $job = \Yii::$app->queue->pop(); + \Yii::$app->queue->delete($job); + + $this->assertFalse(\Yii::$app->queue->pop()); + } + + public function testDifferentQueueWontPopJob() + { + $this->pushJobToQueue(); + $this->assertFalse(\Yii::$app->queue->pop('hurrdurrImasheep')); + } + + public function testJobPurge() + { + for ($i = 0; $i <= 10; ++$i) { + $this->pushJobToQueue(time()); + } + \Yii::$app->queue->purge('test'); + + $this->assertFalse(\Yii::$app->queue->pop()); + } + + public function testJobRelease() + { + $this->pushJobToQueue(__FUNCTION__); + $job = \Yii::$app->queue->pop(); + + $this->assertEquals(time(), $job['run_at']); + + \Yii::$app->queue->release($job, 200); + $this->assertFalse(\Yii::$app->queue->pop()); + } + + public function testJobInFutureDoesNotPopNow() + { + $this->pushJobToQueue(__FUNCTION__, 200); + $this->assertFalse(\Yii::$app->queue->pop()); + } +} diff --git a/tests/TestCase.php b/tests/TestCase.php new file mode 100644 index 0000000..1277841 --- /dev/null +++ b/tests/TestCase.php @@ -0,0 +1,49 @@ +destroyApplication(); + } + /** + * Populates Yii::$app with a new application + * The application will be destroyed on tearDown() automatically. + * + * @param array $config The application configuration, if needed + * @param string $appClass name of the application class to create + */ + protected function mockApplication($config = [], $appClass = '\yii\console\Application') + { + new $appClass(ArrayHelper::merge([ + 'id' => 'testapp', + 'basePath' => __DIR__, + 'vendorPath' => $this->getVendorPath(), + ], $config)); + } + protected function getVendorPath() + { + $vendor = dirname(dirname(__DIR__)).'/vendor'; + if (!is_dir($vendor)) { + $vendor = dirname(dirname(dirname(dirname(__DIR__)))); + } + + return $vendor; + } + /** + * Destroys application in Yii::$app by setting it to null. + */ + protected function destroyApplication() + { + \Yii::$app = null; + } +} diff --git a/tests/TestJob.php b/tests/TestJob.php new file mode 100644 index 0000000..7f88538 --- /dev/null +++ b/tests/TestJob.php @@ -0,0 +1,20 @@ +data; + } +} \ No newline at end of file diff --git a/tests/bootstrap.php b/tests/bootstrap.php new file mode 100644 index 0000000..8886d05 --- /dev/null +++ b/tests/bootstrap.php @@ -0,0 +1,11 @@ +