Skip to content

Commit

Permalink
bug symfony#59352 [Messenger] Fix TransportMessageIdStamp not alway…
Browse files Browse the repository at this point in the history
…s added (HypeMC)

This PR was merged into the 6.4 branch.

Discussion
----------

[Messenger] Fix `TransportMessageIdStamp` not always added

| Q             | A
| ------------- | ---
| Branch?       | 6.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | no
| Issues        | -
| License       | MIT

The `TransportMessageIdStamp` doesn't seem to always be added. I’ve noticed that the keepalive log message sometimes has `null` as the message ID. The same probably applies to the `AmazonSqs` and `Amqp` transports, but I'm not too familiar with those transports. https://github.com/symfony/symfony/blob/e36382c2fa8100056e3d47a01e2db6152f5a5bee/src/Symfony/Component/Messenger/Worker.php#L294-L308

Commits
-------

ba02db9 [Messenger] Fix `TransportMessageIdStamp` not always added
  • Loading branch information
fabpot committed Jan 6, 2025
2 parents 02cafa9 + ba02db9 commit 9590658
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer as SerializerComponent;
Expand All @@ -39,14 +41,21 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$receiver = new BeanstalkdReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get();
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
/** @var Envelope $actualEnvelope */
$actualEnvelope = $actualEnvelopes[0];
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());

/** @var BeanstalkdReceivedStamp $receivedStamp */
$receivedStamp = $actualEnvelopes[0]->last(BeanstalkdReceivedStamp::class);
$receivedStamp = $actualEnvelope->last(BeanstalkdReceivedStamp::class);

$this->assertInstanceOf(BeanstalkdReceivedStamp::class, $receivedStamp);
$this->assertSame('1', $receivedStamp->getId());
$this->assertSame($tube, $receivedStamp->getTube());

/** @var TransportMessageIdStamp $transportMessageIdStamp */
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
$this->assertNotNull($transportMessageIdStamp);
$this->assertSame('1', $transportMessageIdStamp->getId());
}

public function testItReturnsEmptyArrayIfThereAreNoMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

final class BeanstalkdSenderTest extends TestCase
Expand All @@ -27,13 +28,21 @@ public function testSend()
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0);
$connection->expects($this->once())->method('send')
->with($encoded['body'], $encoded['headers'], 0)
->willReturn('1')
;

$serializer = $this->createMock(SerializerInterface::class);
$serializer->method('encode')->with($envelope)->willReturn($encoded);

$sender = new BeanstalkdSender($connection, $serializer);
$sender->send($envelope);
$actualEnvelope = $sender->send($envelope);

/** @var TransportMessageIdStamp $transportMessageIdStamp */
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
$this->assertNotNull($transportMessageIdStamp);
$this->assertSame('1', $transportMessageIdStamp->getId());
}

public function testSendWithDelay()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand Down Expand Up @@ -52,7 +53,12 @@ public function get(): iterable
throw $exception;
}

return [$envelope->with(new BeanstalkdReceivedStamp($beanstalkdEnvelope['id'], $this->connection->getTube()))];
return [$envelope
->withoutAll(TransportMessageIdStamp::class)
->with(
new BeanstalkdReceivedStamp($beanstalkdEnvelope['id'], $this->connection->getTube()),
new TransportMessageIdStamp($beanstalkdEnvelope['id']),
)];
}

public function ack(Envelope $envelope): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -39,8 +40,8 @@ public function send(Envelope $envelope): Envelope
$delayStamp = $envelope->last(DelayStamp::class);
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;

$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
$id = $this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);

return $envelope;
return $envelope->with(new TransportMessageIdStamp($id));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$this->assertCount(1, $actualEnvelopes);
/** @var Envelope $actualEnvelope */
$actualEnvelope = $actualEnvelopes[0];
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());

/** @var DoctrineReceivedStamp $doctrineReceivedStamp */
$doctrineReceivedStamp = $actualEnvelope->last(DoctrineReceivedStamp::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
use Symfony\Component\Messenger\Bridge\Redis\Tests\Fixtures\ExternalMessageSerializer;
use Symfony\Component\Messenger\Bridge\Redis\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisReceiver;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -38,7 +40,14 @@ public function testItReturnsTheDecodedMessageToTheHandler(array $redisEnvelope,
$receiver = new RedisReceiver($connection, $serializer);
$actualEnvelopes = $receiver->get();
$this->assertCount(1, $actualEnvelopes);
$this->assertEquals($expectedMessage, $actualEnvelopes[0]->getMessage());
/** @var Envelope $actualEnvelope */
$actualEnvelope = $actualEnvelopes[0];
$this->assertEquals($expectedMessage, $actualEnvelope->getMessage());

/** @var TransportMessageIdStamp $transportMessageIdStamp */
$transportMessageIdStamp = $actualEnvelope->last(TransportMessageIdStamp::class);
$this->assertNotNull($transportMessageIdStamp);
$this->assertSame($redisEnvelope['id'], $transportMessageIdStamp->getId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
Expand Down Expand Up @@ -76,7 +77,12 @@ public function get(): iterable
throw $exception;
}

return [$envelope->with(new RedisReceivedStamp($message['id']))];
return [$envelope
->withoutAll(TransportMessageIdStamp::class)
->with(
new RedisReceivedStamp($message['id']),
new TransportMessageIdStamp($message['id'])
)];
}

public function ack(Envelope $envelope): void
Expand Down

0 comments on commit 9590658

Please sign in to comment.