Skip to content

Commit

Permalink
stream duplicate window configuration using seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Feb 8, 2022
1 parent ac95b2c commit 1d65ab6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
13 changes: 13 additions & 0 deletions src/Stream/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class Configuration
private string $retentionPolicy = RetentionPolicy::LIMITS;
private string $storageBackend = StorageBackend::FILE;

private ?float $duplicateWindow = null;
private ?int $maxBytes = null;
private ?int $maxMessageSize = null;
private ?int $maxMessagesPerSubject = null;
Expand Down Expand Up @@ -61,6 +62,11 @@ public function getDiscardPolicy(): string
return $this->discardPolicy;
}

public function getDuplicateWindow(): ?float
{
return $this->duplicateWindow;
}

public function getMaxAge(): ?int
{
return $this->maxAge;
Expand Down Expand Up @@ -129,6 +135,12 @@ public function setDiscardPolicy(string $policy): self
return $this;
}

public function setDuplicateWindow(?float $seconds): self
{
$this->duplicateWindow = $seconds;
return $this;
}

public function setMaxAge(?int $maxAge): self
{
$this->maxAge = $maxAge;
Expand Down Expand Up @@ -190,6 +202,7 @@ public function toArray(): array
'deny_delete' => $this->getDenyDelete(),
'description' => $this->getDescription(),
'discard' => $this->getDiscardPolicy(),
'duplicate_window' => $this->getDuplicateWindow() * 1_000_000_000,
'max_age' => $this->getMaxAge(),
'max_bytes' => $this->getMaxBytes(),
'max_consumers' => $this->getMaxConsumers(),
Expand Down
27 changes: 26 additions & 1 deletion tests/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,19 @@ class StreamTest extends Test
public function testDeduplication()
{
$stream = $this->getClient()->getApi()->getStream('tester');
$stream->getConfiguration()->setSubjects(['tester']);
$stream->getConfiguration()
->setSubjects(['tester'])
->setDuplicateWindow(0.5); // 500ms windows duplicate

$stream->create();

// windows value using nanoseconds
$this->assertEquals(0.5 * 1_000_000_000, $stream->info()->getValue('config.duplicate_window'));

$stream->put('tester', new Payload("hello", [
'Nats-Msg-Id' => 'the-message'
]));

$stream->put('tester', new Payload("hello", [
'Nats-Msg-Id' => 'the-message'
]));
Expand All @@ -39,6 +49,21 @@ public function testDeduplication()
'Nats-Msg-Id' => 'the-message'
]));
$this->assertSame(0, $consumer->info()->num_pending);

// 500ms sleep
usleep(500 * 1_000);

$stream->put('tester', new Payload("hello", [
'Nats-Msg-Id' => 'the-message'
]));
$this->assertSame(1, $consumer->info()->num_pending);

usleep(500 * 1_000);

$stream->put('tester', new Payload("hello", [
'Nats-Msg-Id' => 'the-message'
]));
$this->assertSame(2, $consumer->info()->num_pending);
}

public function testInterrupt()
Expand Down

0 comments on commit 1d65ab6

Please sign in to comment.