Skip to content

Commit

Permalink
Refactored Opening session logic. Added dedicated methods with Sessio…
Browse files Browse the repository at this point in the history
…nOptions for each session type. Refactored tests to use explicit methods
  • Loading branch information
oskardudycz committed Feb 12, 2023
1 parent 0247267 commit 5652283
Show file tree
Hide file tree
Showing 20 changed files with 358 additions and 194 deletions.
62 changes: 30 additions & 32 deletions docs/scenarios/aggregates-events-repositories.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public abstract class AggregateBase
public long Version { get; set; }

// JsonIgnore - for making sure that it won't be stored in inline projection
[JsonIgnore]
private readonly List<object> _uncommittedEvents = new List<object>();
[JsonIgnore] private readonly List<object> _uncommittedEvents = new List<object>();

// Get the deltas, i.e. events that make up the state, not yet persisted
public IEnumerable<object> GetUncommittedEvents()
Expand All @@ -42,7 +41,7 @@ public abstract class AggregateBase
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L222-L256' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-base' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L232-L266' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-base' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With the first piece of infrastructure implemented, two events to capture state changes of an invoice are introduced. Namely, creation of an invoice, accompanied by an invoice number, and addition of lines to an invoice:
Expand Down Expand Up @@ -74,7 +73,7 @@ public sealed class LineItemAdded
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L195-L220' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-events' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L204-L230' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-events' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With the events in place to present the deltas of an invoice, an aggregate is implemented, using the infrastructure presented above, to create and replay state from the described events.
Expand Down Expand Up @@ -153,7 +152,7 @@ public sealed class Invoice: AggregateBase
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L121-L193' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-invoice' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L129-L202' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-invoice' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The implemented invoice protects its state by not exposing mutable data, while enforcing its contracts through argument validation. Once an applicable state modification is introduced, either through the constructor (which numbers our invoice and captures that in an event) or the `Invoice.AddLine` method, a respective event capturing that data is recorded.
Expand All @@ -172,30 +171,30 @@ public sealed class AggregateRepository
this.store = store;
}

public void Store(AggregateBase aggregate)
public async Task StoreAsync(AggregateBase aggregate, CancellationToken ct = default)
{
using (var session = store.OpenSession())
{
// Take non-persisted events, push them to the event stream, indexed by the aggregate ID
var events = aggregate.GetUncommittedEvents().ToArray();
session.Events.Append(aggregate.Id, aggregate.Version, events);
session.SaveChanges();
}
await using var session = await store.LightweightSessionAsync(token: ct);
// Take non-persisted events, push them to the event stream, indexed by the aggregate ID
var events = aggregate.GetUncommittedEvents().ToArray();
session.Events.Append(aggregate.Id, aggregate.Version, events);
await session.SaveChangesAsync(ct);
// Once successfully persisted, clear events from list of uncommitted events
aggregate.ClearUncommittedEvents();
}

public T Load<T>(string id, int? version = null) where T : AggregateBase
public async Task<T> LoadAsync<T>(
string id,
int? version = null,
CancellationToken ct = default
) where T : AggregateBase
{
using (var session = store.LightweightSession())
{
var aggregate = session.Events.AggregateStream<T>(id, version ?? 0);
return aggregate ?? throw new InvalidOperationException($"No aggregate by id {id}.");
}
await using var session = await store.LightweightSessionAsync(token: ct);
var aggregate = await session.Events.AggregateStreamAsync<T>(id, version ?? 0, token: ct);
return aggregate ?? throw new InvalidOperationException($"No aggregate by id {id}.");
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L258-L291' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-repository' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L268-L302' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-repository' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With the last infrastructure component in place, versioned invoices can now be created, persisted and hydrated through Marten. For this purpose, first an invoice is created:
Expand All @@ -208,7 +207,7 @@ var invoice = new Invoice(42);
invoice.AddLine(100, 24, "Joo Janta 200 Super-Chromatic Peril Sensitive Sunglasses");
invoice.AddLine(200, 16, "Happy Vertical People Transporter");
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L110-L115' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-createinvoice' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L116-L123' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-createinvoice' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Then, with an instantiated & configured Document Store (in this case with string as event stream identity) a repository is bootstrapped. The newly created invoice is then passed to the repository, which pushes the deltas to the database and clears them from the to-be-committed list of changes. Once persisted, the invoice data is replayed from the database and verified to match the data of the original item.
Expand All @@ -218,26 +217,26 @@ Then, with an instantiated & configured Document Store (in this case with string
```cs
var repository = new AggregateRepository(theStore);

repository.Store(invoice);
await repository.StoreAsync(invoice);

var invoiceFromRepository = repository.Load<Invoice>(invoice.Id);
var invoiceFromRepository = await repository.LoadAsync<Invoice>(invoice.Id);

Assert.Equal(invoice.ToString(), invoiceFromRepository.ToString());
Assert.Equal(invoice.Total, invoiceFromRepository.Total);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L40-L49' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-storeandreadinvoice' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L41-L52' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-storeandreadinvoice' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

With this infrastructure in place and the ability to model change as events, it is also possible to replay back any previous state of the object. For example, it is possible to see what the invoice looked with only the first line added:

<!-- snippet: sample_scenario-aggregate-versionedload -->
<a id='snippet-sample_scenario-aggregate-versionedload'></a>
```cs
var invoiceFromRepository = repository.Load<Invoice>(invoice.Id, 2);
var invoiceFromRepository = await repository.LoadAsync<Invoice>(invoice.Id, 2);

Assert.Equal(124, invoiceFromRepository.Total);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L61-L65' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-versionedload' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L64-L70' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-versionedload' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Lastly, to prevent our invoice from getting into a conflicted state, the version attribute of the item is used to assert that the state of the object has not changed between replaying its state and introducing new deltas:
Expand All @@ -248,12 +247,11 @@ Lastly, to prevent our invoice from getting into a conflicted state, the version
var invoice = CreateInvoice();
var invoiceWithSameIdentity = CreateInvoice();

repository.Store(invoice);
await repository.StoreAsync(invoice);

Assert.Throws<EventStreamUnexpectedMaxEventIdException>(() =>
{
repository.Store(invoiceWithSameIdentity);
});
await Assert.ThrowsAsync<EventStreamUnexpectedMaxEventIdException>(() =>
repository.StoreAsync(invoiceWithSameIdentity)
);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L73-L83' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-conflict' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/ScenarioAggregateAndRepository.cs#L78-L89' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_scenario-aggregate-conflict' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
12 changes: 5 additions & 7 deletions src/CommandLineRunner/TestCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ public override async Task<bool> Execute(NetCoreInput input)
await store.BulkInsertDocumentsAsync(targets);

Console.WriteLine("QueryOnly");
await using (var session1 = store.QuerySession())
await using (var session1 = await store.QuerySessionAsync())
{
(await session1.Query<Target>().Take(1).ToListAsync()).Single().ShouldBeOfType<Target>();
}

Console.WriteLine("Lightweight");
await using (var session2 = store.LightweightSession())
await using (var session2 = await store.LightweightSessionAsync())
{
(await session2.Query<Target>().Take(1).ToListAsync()).Single().ShouldBeOfType<Target>();

Expand All @@ -76,8 +76,6 @@ public override async Task<bool> Execute(NetCoreInput input)

await session2.SaveChangesAsync();



// Just a smoke test
await session2.QueryAsync(new FindUserByAllTheThings
{
Expand All @@ -86,7 +84,7 @@ await session2.QueryAsync(new FindUserByAllTheThings
}

Console.WriteLine("IdentityMap");
await using (var session3 = store.OpenSession())
await using (var session3 = await store.IdentitySessionAsync())
{
(await session3.Query<Target>().Take(1).ToListAsync()).Single().ShouldBeOfType<Target>();

Expand All @@ -102,7 +100,7 @@ await session3.QueryAsync(new FindUserByAllTheThings
}

Console.WriteLine("DirtyChecking");
await using (var session4 = store.OpenSession())
await using (var session4 = await store.DirtyTrackedSessionAsync())
{
(await session4.Query<Target>().Take(1).ToListAsync()).Single().ShouldBeOfType<Target>();

Expand All @@ -119,7 +117,7 @@ await session4.QueryAsync(new FindUserByAllTheThings
}

Console.WriteLine("Capturing Events");
await using (var session = store.LightweightSession())
await using (var session = await store.LightweightSessionAsync())
{
var streamId = Guid.NewGuid();
session.Events.Append(streamId, new AEvent(), new BEvent(), new CEvent(), new DEvent());
Expand Down
2 changes: 1 addition & 1 deletion src/CoreTests/MartenServiceCollectionExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public IQuerySession QuerySession()
public IDocumentSession OpenSession()
{
BuiltSession = true;
return _store.OpenSession();
return _store.IdentitySession();
}

public bool BuiltSession { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion src/CoreTests/working_with_initial_data.cs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public InitialDataWithQuery(params Aggregate1495[] initialData)

public async Task Populate(IDocumentStore store, CancellationToken cancellation)
{
await using var session = store.OpenSession();
await using var session = await store.LightweightSessionAsync(token: cancellation);
if (!session.Query<Aggregate1495>().Any())
{
session.Store(_initialData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,17 @@ private static string Format(Guid id)
return id.ToString();
}

private UserWithGuid[] GetUsers(IDocumentStore documentStore)
private static UserWithGuid[] GetUsers(IDocumentStore documentStore)
{
using (var session = documentStore.QuerySession())
{
return session.Query<UserWithGuid>().ToArray();
}
using var session = documentStore.QuerySession();
return session.Query<UserWithGuid>().ToArray();
}

private static void StoreUser(IDocumentStore documentStore, string lastName)
{
using (var session = documentStore.OpenSession())
{
session.Store(new UserWithGuid { LastName = lastName });
session.SaveChanges();
}
using var session = documentStore.IdentitySession();
session.Store(new UserWithGuid { LastName = lastName });
session.SaveChanges();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,17 @@ private static string GetId(UserWithString[] users, string user1)
return users.Single(user => user.LastName == user1).Id;
}

private UserWithString[] GetUsers(IDocumentStore documentStore)
private static UserWithString[] GetUsers(IDocumentStore documentStore)
{
using (var session = documentStore.QuerySession())
{
return session.Query<UserWithString>().ToArray();
}
using var session = documentStore.QuerySession();
return session.Query<UserWithString>().ToArray();
}

private static void StoreUser(IDocumentStore documentStore, string lastName)
{
using (var session = documentStore.OpenSession())
{
session.Store(new UserWithString { LastName = lastName});
session.SaveChanges();
}
using var session = documentStore.IdentitySession();
session.Store(new UserWithString { LastName = lastName});
session.SaveChanges();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ private async Task RunTest(IDocumentStore documentStore)
const string importStreamKey = "original";
const string dataItemStreamKey = "other";

await using (var session = documentStore.OpenSession())
await using (var session = await documentStore.LightweightSessionAsync())
{
session.Events.StartStream<DataImportAggregate>(importStreamKey, new DataImportStartedEvent {ByUser = createdByUser});
session.Events.StartStream<DataItemAggregate>(dataItemStreamKey, new CreateDataItemEvent {ImportId = importStreamKey, Name = "Data item"});
await session.SaveChangesAsync();
}

await using (var session = documentStore.QuerySession())
await using (var session = await documentStore.QuerySessionAsync())
{
var importAggregate = await session.LoadAsync<DataImportAggregate>(importStreamKey);
importAggregate.ShouldNotBeNull();
Expand All @@ -69,14 +69,14 @@ private async Task RunTest(IDocumentStore documentStore)
dataItemAggregate.ImportId.ShouldBe(importStreamKey);
}

await using (var session = documentStore.OpenSession())
await using (var session = await documentStore.LightweightSessionAsync())
{
session.Events.Append(dataItemStreamKey, new DeleteDataItemEvent());
session.Events.Append(importStreamKey, new DeleteImportEvent());
await session.SaveChangesAsync();
}

await using (var session = documentStore.QuerySession())
await using (var session = await documentStore.QuerySessionAsync())
{
var importAggregate = await session.LoadAsync<DataImportAggregate>(importStreamKey);
importAggregate.ShouldBeNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public async Task should_be_able_to_save_changes_when_no_stream_action_has_any_e

private static async Task<Guid> CreateEntityForTest(IDocumentStore documentStore, string name, int status)
{
await using var session = documentStore.OpenSession();
await using var session = await documentStore.LightweightSessionAsync();
var stream = session.Events.StartStream<TestEntity>(new CreateEntityEvent
{
Name = name,
Expand Down
Loading

0 comments on commit 5652283

Please sign in to comment.