Skip to content

Commit

Permalink
Merge pull request #57 from darkwood-com/1.x-dev
Browse files Browse the repository at this point in the history
v1.2.3
  • Loading branch information
matyo91 authored Dec 24, 2024
2 parents a0cf747 + 3c8db3b commit c8b0670
Show file tree
Hide file tree
Showing 53 changed files with 1,384 additions and 826 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

# PHP
/composer.lock
/vendor/
/vendor/
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## v1.2.3

- Add event Event::POOL occurs when Flow needs to count IPs to process.
- Add `Flow\IpPool` for managing pools of Ips.
- Update `Flow\Event\PullEvent` to pull multiple Ips instead one.
- Move `Flow::do` to `FlowFactory::create`
- Add `Flow\Driver\ParallelDriver`

## v1.2.2

- Flow can now use `Flow\JobInterface` as job input
Expand Down
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ composer require darkwood/flow
<?php

use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Ip;

class D1 {
Expand All @@ -45,7 +46,7 @@ class D4 {
public function __construct(public int $n4) {}
}

$flow = Flow::do(static function() {
$flow = (new FlowFactory())->create(static function() {
yield fn (D1 $data1) => new D2($data1->n1 += 1);
yield fn (D2 $data2) => new D3($data2->n2 * 2);
yield function(D3 $data3) {
Expand Down
2 changes: 1 addition & 1 deletion docs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"keywords": [
"flow"
],
"version": "1.2.2",
"version": "1.2.3",
"browserslist": [
"defaults"
],
Expand Down
10 changes: 10 additions & 0 deletions docs/src/content/en/docs/getting-started/driver.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ pecl install openswoole-22.0.0

More documentation can be found [https://openswoole.com](https://openswoole.com)

## Parallel Driver

To use Parallel Driver, you have to require the library with PECL

```bash
pecl install parallel
```

More documentation can be found [https://www.php.net/manual/en/book.parallel.php](https://www.php.net/manual/en/book.parallel.php)

## Make your custom driver

You can make your custom driver by implementing `Flow\DriverInterface`
2 changes: 1 addition & 1 deletion docs/src/content/en/docs/getting-started/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ YFlow use YCombinator to provide recursion.

## Make your own Flow

You can make your custom Flow by implementing `Flow\FlowInterface`.
You can make your custom Flow by implementing `Flow\FlowInterface`.
2 changes: 1 addition & 1 deletion docs/src/content/en/docs/getting-started/ip-strategy.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ You can embed it by a custom strategy with is `LinearIpStrategy` by default.

## Make your Ip Strategy

You can make your custom Ip strategy by implementing `Flow\IpStrategyInterface`
You can make your custom Ip strategy by implementing `Flow\IpStrategyInterface`
2 changes: 1 addition & 1 deletion docs/src/content/en/docs/getting-started/license.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
2 changes: 1 addition & 1 deletion docs/src/layouts/_default/section.sitemap.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@
{{ end -}}
{{ end -}}
{{ end -}}
</urlset>
</urlset>
44 changes: 44 additions & 0 deletions examples/Transport/Client.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?php

declare(strict_types=1);

namespace Flow\Examples\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Worker;

class Client
{
public function __construct(
private SenderInterface $sender,
private ReceiverInterface $receiver
) {}

/**
* @param ?int $delay The delay in milliseconds
*/
public function call(object $data, ?int $delay = null): void
{
$ip = Envelope::wrap($data, $delay ? [new DelayStamp($delay)] : []);
$this->sender->send($ip);
}

/**
* @param callable[][]|HandlerDescriptor[][] $handlers
*/
public function wait(array $handlers): void
{
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator($handlers)),
]);
$worker = new Worker(['transport' => $this->receiver], $bus);
$worker->run();
}
}
41 changes: 2 additions & 39 deletions examples/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,53 +5,16 @@
require __DIR__ . '/../vendor/autoload.php';

use Doctrine\DBAL\DriverManager;
use Flow\Examples\Transport\Client;
use Flow\Examples\Transport\DoctrineIpTransport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Worker;

class client
{
public function __construct(
private SenderInterface $sender,
private ReceiverInterface $receiver
) {}

/**
* @param ?int $delay The delay in milliseconds
*/
public function call(object $data, ?int $delay = null): void
{
$ip = Envelope::wrap($data, $delay ? [new DelayStamp($delay)] : []);
$this->sender->send($ip);
}

/**
* @param callable[][]|HandlerDescriptor[][] $handlers
*/
public function wait(array $handlers): void
{
$bus = new MessageBus([
new HandleMessageMiddleware(new HandlersLocator($handlers)),
]);
$worker = new Worker(['transport' => $this->receiver], $bus);
$worker->run();
}
}

$connection = DriverManager::getConnection([
'driver' => 'pdo_sqlite',
'path' => __DIR__ . '/flow.sqlite',
]);
$transport = new DoctrineIpTransport($connection, uniqid('transport_', true));

$client = new client($transport, $transport);
$client = new Client($transport, $transport);

$ip = long2ip(random_int(ip2long('10.0.0.0'), ip2long('10.255.255.255')));
for ($i = 0; $i < 3; $i++) {
Expand Down
8 changes: 5 additions & 3 deletions examples/flow.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Flow\Driver\AmpDriver;
use Flow\Driver\FiberDriver;
use Flow\Driver\ParallelDriver;
use Flow\Driver\ReactDriver;
use Flow\Driver\SpatieDriver;
use Flow\Driver\SwooleDriver;
Expand All @@ -14,7 +15,7 @@
use Flow\Examples\Model\DataC;
use Flow\Examples\Model\DataD;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\FlowFactory;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Flow\Job\ClosureJob;
Expand All @@ -25,6 +26,7 @@
3 => new ReactDriver(),
4 => new SwooleDriver(),
// 5 => new SpatieDriver(),
// 6 => new ParallelDriver(),
};
printf("Use %s\n", $driver::class);
printf("Calculating:\n");
Expand All @@ -35,7 +37,7 @@
$job1 = static function (DataA $dataA) use ($driver): DataB {
printf("*. #%d - Job 1 Calculating %d + %d\n", $dataA->id, $dataA->a, $dataA->b);

// simulating calculating some "light" operation from 0.1 to 1 seconds
// simulating calculating some "light" operation from 1 to 3 seconds
$delay = random_int(1, 3);
$driver->delay($delay);
$d = $dataA->a + $dataA->b;
Expand Down Expand Up @@ -92,7 +94,7 @@
$asyncTask = static function ($job1, $job2, $job3, $errorJob1, $errorJob2, $driver) {
echo "begin - flow asynchronous\n";

$flow = Flow::do(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
$flow = (new FlowFactory())->create(static function () use ($job1, $job2, $job3, $errorJob1, $errorJob2) {
yield [$job1, $errorJob1, new MaxIpStrategy(2)];
yield [$job2, $errorJob2, new MaxIpStrategy(2)];
yield $job3;
Expand Down
4 changes: 2 additions & 2 deletions examples/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
use Flow\Driver\SwooleDriver;
use Flow\Examples\Transport\DoctrineIpTransport;
use Flow\ExceptionInterface;
use Flow\Flow\Flow;
use Flow\Flow\TransportFlow;
use Flow\FlowFactory;
use Flow\Ip;
use Flow\IpStrategy\MaxIpStrategy;
use Symfony\Component\Messenger\Envelope;
Expand Down Expand Up @@ -74,7 +74,7 @@
printf("%s\n", $exception->getMessage());
};

$flow = Flow::do(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
$flow = (new FlowFactory())->create(static function () use ($addOneJob, $multbyTwoJob, $minusThreeJob, $errorJob) {
yield [$addOneJob, $errorJob, new MaxIpStrategy(1)];
yield [$multbyTwoJob, $errorJob, new MaxIpStrategy(3)];
yield [$minusThreeJob, $errorJob, new MaxIpStrategy(2)];
Expand Down
Loading

0 comments on commit c8b0670

Please sign in to comment.