diff --git a/src/Stream/Configuration.php b/src/Stream/Configuration.php index 554f98d..75102d4 100644 --- a/src/Stream/Configuration.php +++ b/src/Stream/Configuration.php @@ -20,6 +20,7 @@ class Configuration private string $retentionPolicy = RetentionPolicy::LIMITS; private string $storageBackend = StorageBackend::FILE; + private ?float $duplicateWindow = null; private ?int $maxBytes = null; private ?int $maxMessageSize = null; private ?int $maxMessagesPerSubject = null; @@ -61,6 +62,11 @@ public function getDiscardPolicy(): string return $this->discardPolicy; } + public function getDuplicateWindow(): ?float + { + return $this->duplicateWindow; + } + public function getMaxAge(): ?int { return $this->maxAge; @@ -129,6 +135,12 @@ public function setDiscardPolicy(string $policy): self return $this; } + public function setDuplicateWindow(?float $seconds): self + { + $this->duplicateWindow = $seconds; + return $this; + } + public function setMaxAge(?int $maxAge): self { $this->maxAge = $maxAge; @@ -190,6 +202,7 @@ public function toArray(): array 'deny_delete' => $this->getDenyDelete(), 'description' => $this->getDescription(), 'discard' => $this->getDiscardPolicy(), + 'duplicate_window' => $this->getDuplicateWindow() * 1_000_000_000, 'max_age' => $this->getMaxAge(), 'max_bytes' => $this->getMaxBytes(), 'max_consumers' => $this->getMaxConsumers(), diff --git a/tests/StreamTest.php b/tests/StreamTest.php index 5a0724b..1a74c02 100644 --- a/tests/StreamTest.php +++ b/tests/StreamTest.php @@ -15,9 +15,19 @@ class StreamTest extends Test public function testDeduplication() { $stream = $this->getClient()->getApi()->getStream('tester'); - $stream->getConfiguration()->setSubjects(['tester']); + $stream->getConfiguration() + ->setSubjects(['tester']) + ->setDuplicateWindow(0.5); // 500ms windows duplicate + $stream->create(); + // windows value using nanoseconds + $this->assertEquals(0.5 * 1_000_000_000, $stream->info()->getValue('config.duplicate_window')); + + $stream->put('tester', new Payload("hello", [ + 'Nats-Msg-Id' => 'the-message' + ])); + $stream->put('tester', new Payload("hello", [ 'Nats-Msg-Id' => 'the-message' ])); @@ -39,6 +49,21 @@ public function testDeduplication() 'Nats-Msg-Id' => 'the-message' ])); $this->assertSame(0, $consumer->info()->num_pending); + + // 500ms sleep + usleep(500 * 1_000); + + $stream->put('tester', new Payload("hello", [ + 'Nats-Msg-Id' => 'the-message' + ])); + $this->assertSame(1, $consumer->info()->num_pending); + + usleep(500 * 1_000); + + $stream->put('tester', new Payload("hello", [ + 'Nats-Msg-Id' => 'the-message' + ])); + $this->assertSame(2, $consumer->info()->num_pending); } public function testInterrupt()