The term DDD comes from the book by Eric Evans: “Domain-Driven Design: Tackling
+Complexity in the Heart of Software”.
+In his book he describes a set of practices that aim to help us build
maintainable, rich software systems that solve customers’ problems. The book is
+560 pages of dense insight, so you’ll pardon me if my summary elides some
+details, but in brief he suggests:
+
+
Listen very carefully to your domain experts - the people whose job you’re
+ automating or assisting in software.
+
Learn the jargon that they use, and help them to come up with new jargon, so
+ that every concept in their mental model is named by a single precise term.
+
Use those terms to model your software; the nouns and verbs of the domain
+ expert are the classes and methods you should use in modelling.
+
Whenever there is a discrepancy between your model and the domain experts’
 understanding of the domain, go and talk to the domain experts again, and
 then refactor aggressively.
+
+
This sounds great in theory, but in practice we often find that our business
+logic escapes from our model objects; we end up with logic bleeding into
+controllers, or into fat “manager” classes. We find that refactoring becomes
+difficult: we can’t split a large and important class, because that would
+seriously impact the database schema; or we can’t rewrite the internals of an
+algorithm because it has become tightly coupled to code that exists for a
+different use-case. The good news is that these problems can be avoided, since
+they are caused by a lack of organisation in the codebase. In fact, the tools to
solve these problems take up half of the DDD book, but it can be difficult to
+understand how to use them together in the context of a complete system.
+
I want to use this series to introduce an architectural style called
+Ports and Adapters,
+and a design pattern named
+Command Handler.
+I’ll be explaining the patterns in Python because that’s the language that I use
+day-to-day, but the concepts are applicable to any OO language, and can be
+massaged to work perfectly in a functional context. There might be a lot more
+layering and abstraction than you’re used to, especially if you’re coming from a
+Django background or similar, but please bear with me. In exchange for a more
+complex system at the outset, we can avoid much of our accidental complexity later.
+
The system we’re going to build is an issue management system, for use by a
+helpdesk. We’re going to be replacing an existing system, which consists of an
+HTML form that sends an email. The emails go into a mailbox, and helpdesk staff
+go through the mails triaging problems and picking up problems that they can
+solve. Sometimes issues get overlooked for a long time, and the helpdesk team
+have invented a complex system of post-it notes and whiteboard layouts to track
+work in progress. For a while this system has worked pretty well but, as the
+system gets busier, the cracks are beginning to show.
+
Our first conversation with the domain expert
“What’s the first step in the process?” you ask, “How do tickets end up in the
mailbox?”.
+
“Well, the first thing that happens is the user goes to the web page, and they
+fill out some details, and report an issue. That sends an email into the issue
+log and then we pick issues from the log each morning”.
+
“So when a user reports an issue, what’s the minimal set of data that you need
+from them?”
+
“We need to know who they are, so their name, and email I guess. Uh… and the
+problem description. They’re supposed to add a category, but they never do, and
+we used to have a priority, but everyone set their issue to EXTREMELY URGENT, so
it was useless.”
+
“But a category and priority would help you to triage things?”
+
“Yes, that would be really helpful if we could get users to set them properly.”
+
This gives us our first use case: As a user, I want to be able to report a new
+issue.
+
Okay, before we get to the code, let’s talk about architecture. The architecture
+of a software system is the overall structure - the choice of language,
+technology, and design patterns that organise the code and satisfy our
+constraints [https://en.wikipedia.org/wiki/Non-functional_requirement]. For our
+architecture, we’re going to try and stick with three principles:
+
+
We will always define where our use-cases begin and end. We won’t have
+ business processes that are strewn all over the codebase.
+
We will depend on abstractions
+ [https://en.wikipedia.org/wiki/Dependency_inversion_principle], and not on
+ concrete implementations.
+
We will treat glue code as distinct from business logic, and put it in an
+ appropriate place.
+
+
Firstly we start with the domain model. The domain model encapsulates our shared
+understanding of the problem, and uses the terms we agreed with the domain
+experts. In keeping with principle #2 we will define abstractions for any
+infrastructural or technical concerns and use those in our model. For example,
+if we need to send an email, or save an entity to a database, we will do so
+through an abstraction that captures our intent. In this series we’ll create a
+separate python package for our domain model so that we can be sure it has no
+dependencies on the other layers of the system. Maintaining this rule strictly
+will make it easier to test and refactor our system, since our domain models
+aren’t tangled up with messy details of databases and http calls.
+
Around the outside of our domain model we place services. These are stateless
+objects that do stuff to the domain. In particular, for this system, our command
+handlers are part of the service layer.
+
Finally, we have our adapter layer. This layer contains code that drives the
+service layer, or provides services to the domain model. For example, our domain
+model may have an abstraction for talking to the database, but the adapter layer
+provides a concrete implementation. Other adapters might include a Flask API, or
+our set of unit tests, or a celery event queue. All of these adapters connect
+our application to the outside world.
+
In keeping with our first principle, we’re going to define a boundary for this
+use case and create our first Command Handler. A command handler is an object
+that orchestrates a business process. It does the boring work of fetching the
+right objects, and invoking the right methods on them. It’s similar to the
+concept of a Controller in an MVC architecture.
A command object is a small object that represents a state-changing action that
+can happen in the system. Commands have no behaviour, they’re pure data
+structures. There’s no reason why you have to represent them with classes, since
+all they need is a name and a bag of data, but a NamedTuple is a nice compromise
+between simplicity and convenience. Commands are instructions from an external
+agent (a user, a cron job, another service etc.) and have names in the
imperative mood, for example:
+
+
ReportIssue
+
PrepareUploadUri
+
CancelOutstandingOrders
+
RemoveItemFromCart
+
OpenLoginSession
+
PlaceCustomerOrder
+
BeginPaymentProcess
+
+
We should try to avoid the verbs Create, Update, or Delete (and their synonyms)
+because those are technical implementations. When we listen to our domain
+experts, we often find that there is a better word for the operation we’re
+trying to model. If all of your commands are named “CreateIssue”, “UpdateCart”,
+“DeleteOrders”, then you’re probably not paying enough attention to the language
+that your stakeholders are using.
+
The command objects belong to the domain, and they express the API of your
+domain. If every state-changing action is performed via a command handler, then
+the list of Commands is the complete list of supported operations in your domain
+model. This has two major benefits:
+
+
If the only way to change state in the system is through a command, then the
+ list of commands tells me all the things I need to test. There are no other
+ code paths that can modify data.
+
Because our commands are lightweight, logic-free objects, we can create them
+ from an HTTP post, or a celery task, or a command line csv reader, or a unit
+ test. They form a simple and stable API for our system that does not depend
+ on any implementation details and can be invoked in multiple ways.
+
+
In order to process our new command, we’ll need to create a command handler.
+
class ReportIssueCommandHandler:
    def __init__(self, issue_log):
        self.issue_log = issue_log
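    # the rest of the handler is elided above; a sketch of its
    # __call__ method, using the model classes from the full listing
    def __call__(self, cmd):
        reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
        issue = Issue(reporter, cmd.problem_description)
        self.issue_log.add(issue)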
Command handlers are stateless objects that orchestrate the behaviour of a
+system. They are a kind of glue code, and manage the boring work of fetching and
+saving objects, and then notifying other parts of the system. In keeping with
+principle #3, we keep this in a separate layer. To satisfy principle #1, each
+use case is a separate command handler and has a clearly defined beginning and
+end. Every command is handled by exactly one command handler.
+
In general all command handlers will have the same structure:
+
+
Fetch the current state from our persistent storage.
+
Update the current state.
+
Persist the new state.
+
Notify any external systems that our state has changed.
+
+
We will usually avoid if statements, loops, and other such wizardry in our
+handlers, and stick to a single possible line of execution. Command handlers are
+ boring glue code.
+Since our command handlers are just glue code, we won’t put any business logic
+into them - they shouldn’t be making any business decisions. For example, let’s
+skip ahead a little to a new command handler:
+
class MarkIssueAsResolvedHandler:
    def __init__(self, issue_log):
        self.issue_log = issue_log

    def __call__(self, cmd):
        issue = self.issue_log.get(cmd.issue_id)
        # the following line encodes a business rule
        if issue.state != IssueStatus.Resolved:
            issue.mark_as_resolved(cmd.resolution)
+
+
This handler violates our glue-code principle because it encodes a business
+rule: “If an issue is already resolved, then it can’t be resolved a second
+time”. This rule belongs in our domain model, probably in the mark_as_resolved
+method of our Issue object.
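Moved into the model, that rule might look something like this sketch:

class Issue:
    def mark_as_resolved(self, resolution):
        # the business rule now lives on the domain object
        if self.state != IssueStatus.Resolved:
            self.state = IssueStatus.Resolved
            self.resolution = resolution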
I tend to use classes for my command handlers, and to invoke them with the
__call__ magic method, but a function is perfectly valid as a handler, too. The major
+reason to prefer a class is that it can make dependency management a little
+easier, but the two approaches are completely equivalent. For example, we could
+rewrite our ReportIssueHandler like this:
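A minimal sketch of that function-based version, using a closure to carry the
dependency:

def report_issue(issue_log):
    def handle(cmd):
        reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
        issue = Issue(reporter, cmd.problem_description)
        issue_log.add(issue)
    return handle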
If magic methods make you feel queasy, you can define a handler to be a class
+that exposes a handle method like this:
+
class ReportIssueHandler:
    def handle(self, cmd):
        …
+
However you structure them, the important ideas of commands and handlers are:
+
+
Commands are logic-free data structures with a name and a bunch of values.
+
They form a stable, simple API that describes what our system can do, and
+ doesn’t depend on any implementation details.
+
Each command can be handled by exactly one handler.
+
Each command instructs the system to run through one use case.
+
A handler will usually do the following steps: get state, change state,
+ persist state, notify other parties that state was changed.
+
+
Let’s take a look at the complete system. I’m concatenating all the files into a
single code listing for ease of grokking, but in the git repository
+[https://github.com/bobthemighty/blog-code-samples/tree/master/ports-and-adapters/01]
+ I’m splitting the layers of the system into separate packages. In the real
+world, I would probably use a single python package for the whole app, but in
+other languages - Java, C#, C++ - I would usually have a single binary for each
+layer. Splitting the packages up this way makes it easier to understand how the
+dependencies work.
+
from typing import NamedTuple
+from expects import expect, have_len, equal
+
Domain model
+
class IssueReporter:
    def __init__(self, name, email):
        self.name = name
        self.email = email
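
The rest of the listing is elided here; a minimal sketch of the remaining
pieces, consistent with the handler above, might be:

class Issue:
    def __init__(self, reporter, description):
        self.reporter = reporter
        self.description = description


class IssueLog:
    """Abstract port: somewhere we can record new issues."""
    def add(self, issue):
        raise NotImplementedError


class FakeIssueLog(IssueLog):
    """In-memory adapter, handy for unit tests."""
    def __init__(self):
        self.issues = []

    def add(self, issue):
        self.issues.append(issue)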
There’s not a lot of functionality here, and our issue log has a couple of
problems: firstly, there’s no way to see the issues in the log yet, and secondly
+we’ll lose all of our data every time we restart the process. We’ll fix the
+second of those in the next part
+[https://io.made.com/blog/repository-and-unit-of-work-pattern-in-python/].
+
+
+
Repository and Unit of Work Pattern
+
by Bob,
+
+
+
+
In the previous part
+(Introducing Command Handler)
+of this series we built a toy system that could add a new Issue to an IssueLog, but
+had no real behaviour of its own, and would lose its data every time the
+application restarted. We’re going to extend it a little by introducing some
+patterns for persistent data access, and talk a little more about the ideas
+underlying ports and adapters architectures. To recap, we’re abiding by three
+principles:
+
+
Clearly define the boundaries of our use cases.
+
Depend on abstractions, not on concrete implementations.
+
Identify glue code as distinct from domain logic and put it into its own
+ layer.
+
+
In our command handler, we wrote the following code:
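That handler looked roughly like this (recapped here as a sketch):

class ReportIssueHandler:
    def __init__(self, issue_log):
        self.issue_log = issue_log

    def __call__(self, cmd):
        reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
        issue = Issue(reporter, cmd.problem_description)
        self.issue_log.add(issue)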
The IssueLog is a term from our conversation with the domain expert. It’s the
+place that they record the list of all issues. This is part of the jargon used
+by our customers, and so it clearly belongs in the domain, but it’s also the
+ideal abstraction for a data store. How can we modify the code so that our newly
+created Issue will be persisted? We don’t want our IssueLog to depend on the
+database, because that’s a violation of principle #2. This is the question that
+leads us to the ports & adapters architecture.
+
In a ports and adapters architecture, we build a pure domain that exposes ports.
+A port is a way for data to get into, or out of, the domain model. In this
+system, the IssueLog is a port. Ports are connected to the external world by
+Adapters. In the previous code sample, the FakeIssueLog is an adapter: it
+provides a service to the system by implementing an interface.
+
Let’s use a real-world analogy. Imagine we have a circuit that detects current
+over some threshold. If the threshold is reached, the circuit outputs a signal.
+Into our circuit we attach two ports, one for current in, and one for current
+out. The input and output channels are part of our circuit: without them, the
+circuit is useless.
Because we had the great foresight to use standardised ports, we can plug any
+number of different devices into our circuit. For example, we could attach a
+light-detector to the input and a buzzer to the output, or we could attach a
+dial to the input, and a light to the output, and so on.
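
The adapters below implement two port abstractions; a minimal sketch of those
ports, written as abstract base classes, might be:

from abc import ABC, abstractmethod

class ReadablePort(ABC):
    @abstractmethod
    def read(self):
        """Return the current input value."""

class WriteablePort(ABC):
    @abstractmethod
    def write(self, value):
        """Receive an output value from the circuit."""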
+
class LightDetector(ReadablePort):
    def read(self):
        return self.get_light_amplitude()

class Buzzer(WriteablePort):
    def write(self, value):
        if value > 0:
            self.make_infuriating_noise()

class Dial(ReadablePort):
    def read(self):
        return self.current_value

class Light(WriteablePort):
    def write(self, value):
        if value > 0:
            self.on = True
        else:
            self.on = False
+
Considered in isolation, this is just an example of good OO practice: we are
+extending our system through composition. What makes this a ports-and-adapters
+architecture is the idea that there is an internal world consisting of the
+domain model (our ThresholdDetectionCircuit), and an external world that drives
+the domain model through well-defined ports. How does all of this relate to
+databases?
# an adapter: an issue log backed by a file on disk
# (the class name is assumed; the original listing is truncated)
class FileSystemIssueLog(IssueLog):
    def __init__(self, path):
        self.path = path

    def add(self, issue):
        with open(self.path, 'w') as f:
            json.dump(issue.__dict__, f)
+
+
By analogy to our circuit example, the IssueLog is a WriteablePort - it’s a way
+for us to get data out of the system. SqlAlchemy and the file system are two
+types of adapter that we can plug in, just like the Buzzer or Light classes. In
+fact, the IssueLog is an instance of a common design pattern: it’s a Repository
+[https://martinfowler.com/eaaCatalog/repository.html]. A repository is an object
+that hides the details of persistent storage by presenting us with an interface
+that looks like a collection. We should be able to add new things to the
+repository, and get things out of the repository, and that’s essentially it.
+
Let’s look at a simple repository pattern.
+
class FooRepository:
    def __init__(self, db_session):
        self.session = db_session
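    # the method bodies are elided in the original; against a
    # SqlAlchemy session they might look something like this sketch
    def add_new_item(self, item):
        self.session.add(item)

    def get_item(self, id):
        return self.session.query(Foo).get(id)

    def find_foos_by_latitude(self, latitude):
        return self.session.query(Foo).filter(
            Foo.latitude == latitude)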
We expose a few methods, one to add new items, one to get items by their id, and
+a third to find items by some criterion. This FooRepository is using a
+SqlAlchemy session
+[http://docs.sqlalchemy.org/en/latest/orm/session_basics.html] object, so it’s
+part of our Adapter layer. We could define a different adapter for use in unit
+tests.
+
class FooRepository:
    def __init__(self, db_session):
        self.items = []

    def add_new_item(self, item):
        self.items.append(item)

    def get_item(self, id):
        return next((item for item in self.items
                     if item.id == id))

    def find_foos_by_latitude(self, latitude):
        return (item for item in self.items
                if item.latitude == latitude)
+
+
This adapter works just the same as the one backed by a real database, but does
+so without any external state. This allows us to test our code without resorting
+to Setup/Teardown scripts on our database, or monkey patching our ORM to return
+hard-coded values. We just plug a different adapter into the existing port. As
+with the ReadablePort and WriteablePort, the simplicity of this interface makes
+it simple for us to plug in different implementations.
+
The repository gives us read/write access to objects in our data store, and is
+commonly used with another pattern, the Unit of Work
+[https://martinfowler.com/eaaCatalog/unitOfWork.html]. A unit of work represents
+a bunch of things that all have to happen together. It usually allows us to
+cache objects in memory for the lifetime of a request so that we don’t need to
+make repeated calls to the database. A unit of work is responsible for doing
+dirty checks on our objects, and flushing any changes to state at the end of a
+request.
+
What does a unit of work look like?
+
class SqlAlchemyUnitOfWorkManager(UnitOfWorkManager):
    """The Unit of work manager returns a new unit of work.
    Our UOW is backed by a sql alchemy session whose
    lifetime can be scoped to a web request, or a
    long-lived background job."""
    def __init__(self, session_maker):
        self.session_maker = session_maker

    def start(self):
        return SqlAlchemyUnitOfWork(self.session_maker)
class SqlAlchemyUnitOfWork(UnitOfWork):
    """The unit of work captures the idea of a set of things that
    need to happen together.

    Usually, in a relational database,
    one unit of work == one database transaction."""

    def __init__(self, sessionfactory):
        self.sessionfactory = sessionfactory

    def __enter__(self):
        self.session = self.sessionfactory()
        return self

    def __exit__(self, type, value, traceback):
        self.session.close()

    def commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()

    # I tend to put my repositories onto my UOW
    # for convenient access.
    @property
    def issues(self):
        return IssueRepository(self.session)
+
+
This code is taken from a current production system - the code to implement
+these patterns really isn’t complex. The only thing missing here is some logging
+and error handling in the commit method. Our unit-of-work manager creates a new
+unit-of-work, or gives us an existing one depending on how we’ve configured
+SqlAlchemy. The unit of work itself is just a thin layer over the top of
+SqlAlchemy that gives us explicit rollback and commit points. Let’s revisit our
+first command handler and see how we might use these patterns together.
+
class ReportIssueHandler:
    def __init__(self, uowm: UnitOfWorkManager):
        self.uowm = uowm
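    # the body is elided above; a sketch of the handle method,
    # consistent with the description that follows
    def handle(self, cmd):
        reporter = IssueReporter(cmd.reporter_name, cmd.reporter_email)
        issue = Issue(reporter, cmd.problem_description)
        with self.uowm.start() as tx:
            tx.issues.add(issue)
            tx.commit()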
Our command handler looks more or less the same, except that it’s now
+responsible for starting a unit-of-work, and committing the unit-of-work when it
+has finished. This is in keeping with our rule #1 - we will clearly define the
+beginning and end of use cases. We know for a fact that only one object is being
+loaded and modified here, and our database transaction is kept short. Our
+handler depends on an abstraction - the UnitOfWorkManager, and doesn’t care if
+that’s a test-double or a SqlAlchemy session, so that’s rule #2 covered. Lastly,
+this code is painfully boring because it’s just glue. We’re moving all the dull
+glue out to the edges of our system so that we can write our domain model in any
+way that we like: rule #3 observed.
+
The code sample for this part
+[https://github.com/bobthemighty/blog-code-samples/tree/master/ports-and-adapters/02]
+ adds a couple of new packages - one for slow tests
+[http://pycon-2012-notes.readthedocs.io/en/latest/fast_tests_slow_tests.html]
+(tests that go over a network, or to a real file system), and one for our
+adapters. We haven’t added any new features yet, but we’ve added a test that
+shows we can insert an Issue into a sqlite database through our command handler
+and unit of work. Notice that all of the ORM code is in one module
+(issues.adapters.orm) and that it depends on our domain model, not the other way
+around. Our domain objects don’t inherit from SqlAlchemy’s declarative base.
+We’re beginning to get some sense of what it means to have the domain on the
+“inside” of a system, and the infrastructural code on the outside.
+
Our unit test has been updated to use a unit of work, and we can now test that
+we insert an issue into our issue log, and commit the unit of work, without
+having a dependency on any actual implementation details. We could completely
+delete SqlAlchemy from our code base, and our unit tests would continue to work,
+because we have a pure domain model and we expose abstract ports from our
+service layer.
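
For illustration, such a test might look like this (FakeUnitOfWorkManager is an
assumed hand-rolled test double that records the issues added and whether
commit was called):

def test_reporting_an_issue():
    uowm = FakeUnitOfWorkManager()
    handler = ReportIssueHandler(uowm)

    handler.handle(ReportIssueCommand(
        'fred', 'fred@example.org', 'my mouse is broken'))

    expect(uowm.unit_of_work.issues).to(have_len(1))
    expect(uowm.unit_of_work.was_committed).to(equal(True))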
Next time [https://io.made.com/blog/commands-and-queries-handlers-and-views]
+we’ll look at how to get data back out of the system.
+
+
+
Commands, Handlers, Queries and Views
+
by Bob,
+
+
+
+
In the first and second parts of this series I introduced the
+Command-Handler
+and
+Unit of Work and Repository patterns.
+I was intending to write about Message Buses, and some more stuff
+about domain modelling, but I need to quickly skim over this first.
+
If you’ve just started reading the Message Buses piece, and you’re here to learn
about Application-Controlled Identifiers, you’ll find those at the end of the post,
+after a bunch of stuff about ORMs, CQRS, and some casual trolling of junior
+programmers.
+
What is CQS?
+
The Command Query Separation
+principle was first described by Bertrand Meyer in the late Eighties. Per
+wikipedia,
+the principle states:
+
every method should either be a command that performs an action, or a query that
+returns data to the caller, but not both. In other words, “Asking a question
+should not change the answer”. More formally, methods should return a value only
+if they are referentially transparent and hence possess no side effects.
+
Referential transparency is an important concept from functional programming.
Briefly, a function is referentially transparent if you could replace every
call to it with its return value without changing the behaviour of the program.
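
For example, consider a toy light-switch class (a sketch to make the next
paragraph concrete):

class LightSwitch:
    def __init__(self):
        self.light_is_on = False

    def toggle_light(self):
        # command: changes state, returns nothing
        self.light_is_on = not self.light_is_on

    def is_on(self):
        # query: returns a value, changes nothing
        return self.light_is_on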
In this class, the is_on method is referentially transparent - I can replace it
+with the value True or False without any loss of functionality, but the method
+toggle_light is side-effectual: replacing its calls with a static value would
+break the contracts of the system. To comply with the Command-Query separation
+principle, we should not return a value from our toggle_light method.
+
In some languages we would say that the is_on method is “pure”. The advantage of
+splitting our functions into those that have side effects and those that are
+pure is that the code becomes easier to reason about. Haskell loves pure
+functions, and uses this reasonability to do strange things, like re-ordering
+your code for you at compilation time to make it more efficient. For those of us
+who work in more prosaic languages, if commands and queries are clearly
+distinguished, then I can read through a code base and understand all the ways
+in which state can change. This is a huge win for debugging because there is
+nothing worse than troubleshooting a system when you can’t work out which
+code-paths are changing your data.
+
How do we get data out of a Command-Handler architecture?
+When we’re working in a Command-Handler system we obviously use Commands and
+Handlers to perform state changes, but what should we do when we want to get
+data back out of our model? What is the equivalent port for queries?
+
The answer is “it depends”. The lowest-cost option is just to re-use your
+repositories in your UI entrypoints.
+
@app.route("/issues")
+def list_issues():
+ with unit_of_work_manager.start() as unit_of_work:
+ open_issues = unit_of_work.issues.find_by_status('open')
+ return json.dumps(open_issues)
+
+
+
This is totally fine unless you have complex formatting, or multiple entrypoints
+to your system. The problem with using your repositories directly in this way is
+that it’s a slippery slope. Sooner or later you’re going to have a tight
+deadline, and a simple requirement, and the temptation is to skip all the
+command/handler nonsense and do it directly in the web api.
+
@app.route('/issues/<issue_id>', methods=['DELETE'])
def delete_issue(issue_id):
    with unit_of_work_manager.start() as uow:
        issue = uow.issues[issue_id]
        issue.delete()
        uow.commit()
+
+
+
Super convenient, but then you need to add some error handling and some logging
+and an email notification.
+
@app.route('/issues/<issue_id>', methods=['DELETE'])
def delete_issue(issue_id):
    logging.info("Handling DELETE of issue " + str(issue_id))

    with unit_of_work_manager.start() as uow:
        issue = uow.issues[issue_id]

        if issue is None:
            logging.warn("Issue not found")
            flask.abort(404)
        if issue.status != 'deleted':
            issue.delete()
            uow.commit()
            try:
                smtp.send_notification(Issue.Deleted, issue_id)
            except:
                logging.error(
                    "Failed to send email notification for deleted issue "
                    + str(issue_id), exc_info=True)
        else:
            logging.info("Issue already deleted. NOOP")
        return "Deleted!", 202
+
+
+
Aaaaand, we’re back to where we started: business logic mixed with glue code,
+and the whole mess slowly congealing in our web controllers. Of course, the
+slippery slope argument isn’t a good reason not to do something, so if your
+queries are very simple, and you can avoid the temptation to do updates from
+your controllers, then you might as well go ahead and read from repositories,
+it’s all good, you have my blessing. If you want to avoid this, because your
+reads are complex, or because you’re trying to stay pure, then instead we could
+define our views explicitly.
+
class OpenIssuesList:

    def __init__(self, sessionmaker):
        self.sessionmaker = sessionmaker

    def fetch(self):
        with self.sessionmaker() as session:
            result = session.execute(
                'SELECT reporter_name, timestamp, title'
                ' FROM issues WHERE state="open"')
            return [dict(r) for r in result.fetchall()]


@api.route('/issues/')
def list_issues():
    view_builder = OpenIssuesList(session_maker)
    return jsonify(view_builder.fetch())
+
+
+
This is my favourite part of teaching ports and adapters to junior programmers,
+because the conversation inevitably goes like this:
+
+
smooth-faced youngling: Wow, um… are you - are we just going to hardcode that
+sql in there? Just … run it on the database?
+
grizzled old architect: Yeah, I think so. Do The Simplest Thing That Could
+Possibly Work, right? YOLO, and so forth.
+
sfy: Oh, okay. Um… but what about the unit of work and the domain model and
+the service layer and the hexagonal stuff? Didn’t you say that “Data access
+ought to be performed against the aggregate root for the use case, so that we
+maintain tight control of transactional boundaries”?
+
goa: Ehhhh… I don’t feel like doing that right now, I think I’m getting
+hungry.
+
sfy: Right, right … but what if your database schema changes?
+
goa: I guess I’ll just come back and change that one line of SQL. My acceptance
+tests will fail if I forget, so I can’t get the code through CI.
+
sfy: But why don’t we use the Issue model we wrote? It seems weird to just
+ignore it and return this dict… and you said “Avoid taking a dependency
+directly on frameworks. Work against an abstraction so that if your dependency
+changes, that doesn’t force change to ripple through your domain”. You know we
+can’t unit test this, right?
+
goa: Ha! What are you, some kind of architecture astronaut? Domain models! Who
+needs ‘em.
+
+
Why have a separate read-model?
+
In my experience, there are two ways that teams go wrong when using ORMs. The
+most common mistake is not paying enough attention to the boundaries of their
+use cases. This leads to the application making far too many calls to the
+database because people write code like this:
+
# Find all users who are assigned this task,
# and notify them and their line manager,
# then move the task to their in-queue
notification = task.as_notification()
for assignee in task.assignees:
    assignee.manager.notifications.add(notification)
    assignee.notifications.add(notification)
    assignee.queues.inbox.add(task)
+
+
+
ORMs make it very easy to “dot” through the object model this way, and pretend
+that we have our data in memory, but this quickly leads to performance issues
+when the ORM generates hundreds of select statements in response. Then they get
+all angry about performance and write long blog posts about how ORM sucks and is
+an anti-pattern and only n00bs like it. This is akin to blaming OO for your
+domain logic ending up in the controller.
+
The second mistake that teams make is using an ORM when they don’t need to. Why
+do we use an ORM in the first place? I think that a good ORM gives us two
+things:
+
+
A unit of work pattern which can be used to control our consistency
+ boundaries.
+
A data mapper pattern that lets us map a complex object graph to relational
+ tables, without writing tons of boring glue code.
+
+
Taken together, these patterns help us to write rich domain models by removing
+all the database cruft so we can focus on our use-cases. This allows us to model
+complex business processes in an internally consistent way. When I’m writing a
+GET method, though, I don’t care about any of that. My view doesn’t need any
+business logic, because it doesn’t change any state. For 99.5% of use cases, it
+doesn’t even matter if my data are fetched inside a transaction. If I perform a
+dirty read when listing the issues, one of three things might happen:
+
+
I might see changes that aren’t yet committed - maybe an Issue that has just
+ been deleted will still show up in the list.
+
I might not see changes that have been committed - an Issue could be missing
+ from the list, or a title might be 10ms out of date.
+
I might see duplicates of my data - an Issue could appear twice in the list.
+
+
In many systems all these occurrences are unlikely, and will be resolved by a
+page refresh or following a link to view more data. To be clear, I’m not
+recommending that you turn off transactions for your SELECT statements, just
+noting that transactional consistency is usually only a real requirement when we
+are changing state. When viewing state, we can almost always accept a weaker
+consistency model.
+
CQRS is CQS at a system-level
+
CQRS stands for Command-Query Responsibility Segregation, and it’s an
+architectural pattern that was popularised by Greg Young. A lot of people
+misunderstand CQRS, and think you need to use separate databases and crazy
+asynchronous processors to make it work. You can do these things, and I want to
+write more about that later, but CQRS just means that we separate the Write
+Model - what we normally think of as the domain model - and the Read Model - a
+lightweight, simple model for showing on the UI, or answering questions about
+the domain state.
+
When I’m serving a write request (a command), my job is to protect the invariants
+of the system, and model the business process as it appears in the minds of our
+domain experts. I take the collective understanding of our business analysts,
+and turn it into a state machine that makes useful work happen. When I’m serving
+a read request (a query), my job is to get the data out of the database as fast
+as possible and onto a screen so the user can view it. Anything that gets in the
+way of my doing that is bloat.
+
This isn’t a new idea, or particularly controversial. We’ve all tried writing
+reports against an ORM, or complex hierarchical listing pages, and hit
+performance barriers. When we get to that point, the only thing we can do -
+short of rewriting the whole model, or abandoning our use of an ORM - is to
+rewrite our queries in raw SQL. Once upon a time I’d feel bad for doing this, as
+though I were cheating, but nowadays I just recognise that the requirements for
+my queries are fundamentally different than the requirements for my commands.
+
For the write-side of the system, use an ORM, for the read side, use whatever is
+a) fast, and b) convenient.
+
Application Controlled Identifiers
+
At this point, a non-junior programmer will say
+
+
Okay, Mr Smarty-pants Architect, if our commands can’t return any values, and
+our domain models don’t know anything about the database, then how do I get an
+ID back from my save method?
+Let’s say I create an API for creating new issues, and when I have POSTed the
+new issue, I want to redirect the user to an endpoint where they can GET their
+new Issue. How can I get the id back?
+
+
The way I would recommend you handle this is simple - instead of letting your
+database choose ids for you, just choose them yourself.
+
@api.route('/issues', methods=['POST'])
def report_issue():
    # uuids make great domain-controlled identifiers, because
    # they can be shared amongst several systems and are easy
    # to generate.
    issue_id = uuid.uuid4()

    cmd = ReportIssueCommand(issue_id, **request.get_json())
    handler.handle(cmd)
    return "", 201, {'Location': '/issues/' + str(issue_id)}
+
+
+
There are a few ways to do this; the most common is just to use a UUID, but you
can also implement something like
hi-lo.
In the new
code sample,
I’ve implemented three flask endpoints: one to create a new issue, one to list
all issues, and one to view a single issue. I’m using UUIDs as my identifiers,
but I’m still using an integer primary key on the issues table, because using a
GUID in a clustered index leads to table fragmentation and
sadness.
+
Okay, quick spot-check - how are we shaping up against our original Ports and
+Adapters diagram? How do the concepts map?
+
Pretty well! Our domain is pure and doesn’t know anything about infrastructure
+or IO. We have a command and a handler that orchestrate a use-case, and we can
+drive our application from tests or Flask. Most importantly, the layers on the
+outside depend on the layers toward the centre.
+
Next time I’ll get back to talking about message buses.
+
+
+
Why use domain events?
+
by Bob,
+
+
+
+
Nota bene: this instalment in the Ports and Adapters with Command Handlers
+series is code-heavy, and isn’t going to make much sense unless you’ve read the
+previous parts:
Okay, so we have a basic skeleton for an application and we can add new issues
+into the database, then fetch them from a Flask API. So far, though, we don’t
+have any domain logic at all. All we have is a whole bunch of complicated crap
+where we could just have a tiny Django app. Let’s work through some more
+use-cases and start to flesh things out.
+
Back to our domain expert:
+
So when we’ve added a reported issue to the issue log, what happens next?
+
Well we need to triage the problem and decide how urgent it is. Then we might
+assign it to a particular engineer, or we might leave it on the queue to be
+picked up by anyone.
+
Wait, the queue? I thought you had an issue log, are they the same thing, or is
+there a difference?
+
Oh, yes. The issue log is just a record of all the issues we have received, but
+we work from the queue.
+
I see, and how do things get into the queue?
+
We triage the new items in the issue log to decide how urgent they are, and what
+categories they should be in. When we know how to categorise them, and how
+urgent they are, we treat the issues as a queue, and work through them in
+priority order.
+
This is because users always set things to “Extremely urgent”?
+
Yeah, it’s just easier for us to triage the issues ourselves.
+
And what does that actually mean, like, do you just read the ticket and say “oh,
+this is 5 important, and it’s in the broken mouse category”?
+
Mmmm… more or less, sometimes we need to ask more questions from the user so
+we’ll email them, or call them. Most things are first-come, first-served, but
+occasionally someone needs a fix before they can go to a meeting or something.
+
So you email the user to get more information, or you call them up, and then you
+use that information to assess the priority of the issue - sorry triage the
+issue, and work out what category it should go in… what do the categories
+achieve? Why categorise?
+
Partly for reporting, so we can see what stuff is taking up the most time, or if
+there are clusters of similar problems on a particular batch of laptops for
+example. Mostly because different engineers have different skills, like if you
+have a problem with the Active Directory domain, then you should send that to
+Barry, or if it’s an Exchange problem, then George can sort it out, and Mike has
+the equipment log so he can give you a temporary laptop and so on, and so on.
+
Okay, and where do I find this “queue”?
+
Your customer grins and gestures at the wall where a large whiteboard is covered
+in post-its and stickers of different colours.
+
Mapping our requirements to our domain
+How can we map these requirements back to our system? Looking back over our
+notes with the domain expert, there’s a few obvious verbs that we should use to
+model our use cases. We can triage an issue, which means we prioritise and
+categorise it; we can assign a triaged issue to an engineer, or an engineer can
+ pick up an unassigned issue. There’s also a whole piece about asking
+questions, which we might do synchronously by making a phone call and filling
+out some more details, or asynchronously by sending an email. The Queue, with
+all of its stickers and sigils and swimlanes looks too complicated to handle
+today, so we’ll dig deeper into that separately.
+
Let’s quickly flesh out the triage use cases. We’ll start by updating the
+existing unit test for reporting an issue:
We’re introducing a new concept - Issues now have a state, and a newly reported
+issue begins in the AwaitingTriage state. We can quickly add a command and
+handler that allows us to triage an issue.
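
Those listings are elided here, but a sketch of the triage command and handler
might look like this (the field names and the Priority enum are illustrative):

class TriageIssueCommand(NamedTuple):
    issue_id: UUID
    category: str
    priority: Priority

class TriageIssueHandler:
    def __init__(self, uowm: UnitOfWorkManager):
        self.uowm = uowm

    def handle(self, cmd):
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            issue.triage(cmd.priority, cmd.category)
            uow.commit()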
Triaging an issue, for now, is a matter of selecting a category and priority.
+We’ll use a free string for category, and an enumeration for Priority. Once an
+issue is triaged, it enters the AwaitingAssignment state. At some point we’ll
+need to add some view builders to list issues that are waiting for triage or
+assignment, but for now let’s quickly add a handler so that an engineer can Pick
+ an issue from the queue.
At this point, the handlers are becoming a little boring. As I said way back in
the first part [https://io.made.com/blog/introducing-command-handler/], command
+handlers are supposed to be boring glue-code, and every command handler has the
+same basic structure:
+
+
Fetch current state.
+
Mutate the state by calling a method on our domain model.
+
Persist the new state.
+
Notify other parts of the system that our state has changed.
+
+
So far, though, we’ve only seen steps 1, 2, and 3. Let’s introduce a new
+requirement.
+
When an issue is assigned to an engineer, can we send them an email to let them
+know?
+
A brief discourse on SRP
+Let’s try and implement this new requirement. Here’s a first attempt:
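
That first attempt is elided here; imagine something like the following sketch,
where the handler assigns the issue and then sends the email itself
(EmailSender is an assumed abstraction):

class AssignIssueHandler:
    def __init__(self, uowm, email_sender):
        self.uowm = uowm
        self.email_sender = email_sender

    def handle(self, cmd):
        # responsibility 1: assign the issue
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            issue.assign_to(cmd.assigned_to, cmd.assigned_by)
            uow.commit()

        # responsibility 2: notify the assignee
        self.email_sender.send(
            cmd.assigned_to,
            'You have been assigned issue %s' % cmd.issue_id)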
Something here feels wrong, right? Our command-handler now has two very distinct
+responsibilities. Back at the beginning of this series we said we would stick
+with three principles:
+
+
We will always define where our use-cases begin and end.
+
We will depend on abstractions, and not on concrete implementations.
+
We will treat glue code as distinct from business logic, and put it in an
+ appropriate place.
+
+
The latter two are being maintained here, but the first principle feels a little
+more strained. At the very least we’re violating the Single Responsibility
+Principle [https://en.wikipedia.org/wiki/Single_responsibility_principle]; my
+rule of thumb for the SRP is “describe the behaviour of your class. If you use
+the word ‘and’ or ‘then’ you may be breaking the SRP”. What does this class do?
+It assigns an issue to an engineer, AND THEN sends them an email. That’s enough
+to get my refactoring senses tingling, but there’s another, less theoretical,
+reason to split this method up, and it’s to do with error handling.
+
If I click a button marked “Assign to engineer”, and I can’t assign the issue to
+that engineer, then I expect an error. The system can’t execute the command I’ve
+given to it, so I should retry, or choose a different engineer.
+
If I click a button marked “Assign to engineer”, and the system succeeds, but
+then can’t send a notification email, do I care? What action should I take in
+response? Should I assign the issue again? Should I assign it to someone else?
+What state will the system be in if I do?
+
Looking at the problem in this way, it’s clear that “assigning the issue” is the
+real boundary of our use case, and we should either do that successfully, or
+fail completely. “Send the email” is a secondary side effect. If that part fails
+I don’t want to see an error - let the sysadmins clear it up later.
+
What if we split out the notification to another class?
We don’t really need a unit of work here, because we’re not making any
+persistent changes to the Issue state, so what if we use a view builder instead?
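
A sketch of that split-out handler, using an assumed view builder to fetch the
data for the email:

class IssueAssignedHandler:
    def __init__(self, view_builder, email_sender):
        self.view_builder = view_builder
        self.email_sender = email_sender

    def handle(self, event):
        # no unit of work needed: we only read state and send mail
        issue = self.view_builder.fetch(event.issue_id)
        self.email_sender.send(
            event.assigned_to,
            'You have been assigned issue: %s' % issue['title'])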
That seems better, but how should we invoke our new handler? Building a new
+command and handler from inside our AssignIssueHandler also sounds like a
+violation of SRP. Worse still, if we start calling handlers from handlers, we’ll
+end up with our use cases coupled together again - and that’s definitely a
+violation of Principle #1.
+
What we need is a way to signal between handlers - a way of saying “I did my
+job, can you go do yours?”
+
All Aboard the Message Bus
+In this kind of system, we use Domain Events
+[http://verraes.net/2014/11/domain-events/] to fill that need. Events are
+closely related to Commands, in that both commands and events are types of
+message
+[http://www.enterpriseintegrationpatterns.com/patterns/messaging/Message.html]
+- named chunks of data sent between entities. Commands and events differ only in
+their intent:
+
+
Commands are named in the imperative mood (Do this thing), events are
+ named in the past tense (Thing was done).
+
Commands must be handled by exactly one handler, events can be handled by 0
+ to N handlers.
+
If an error occurs when processing a command, the entire request should
+ fail. If an error occurs while processing an event, we should fail
+ gracefully.
+
+
We will often use domain events to signal that a command has been processed and
+to do any additional book-keeping. When should we use a domain event? Going back
+to our principle #1, we should use events to trigger workflows that fall outside
+of our immediate use-case boundary. In this instance, our use-case boundary is
+“assign the issue”, and there is a second requirement “notify the assignee” that
+should happen as a secondary result. Notifications, to humans or other systems,
+are one of the most common reasons to trigger events in this way, but they might
+also be used to clear a cache, or regenerate a view model, or execute some logic
+to make the system eventually consistent.
+
Armed with this knowledge, we know what to do - we need to raise a domain event
+when we assign an issue to an engineer. We don’t want to know about the
+subscribers to our event, though, or we’ll remain coupled; what we need is a
+mediator, a piece of infrastructure that can route messages to the correct
+places. What we need is a message bus. A message bus is a simple piece of
+middleware that’s responsible for getting messages to the right listeners. In
+our application we have two kinds of message, commands and events. These two
+types of message are in some sense symmetrical, so we’ll use a single message
+bus for both.
+
How do we start off writing a message bus? Well, it needs to look up subscribers
+based on the name of an event. That sounds like a dict to me:
+
from collections import defaultdict

class MessageBus:

    def __init__(self):
        """Our message bus is just a mapping from message type
        to a list of handlers"""
        self.subscribers = defaultdict(list)

    def handle(self, msg):
        """The handle method invokes each handler in turn
        with our event"""
        msg_name = type(msg).__name__
        subscribers = self.subscribers[msg_name]
        for subscriber in subscribers:
            subscriber.handle(msg)

    def subscribe_to(self, msg, handler):
        """Subscribe sets up a new mapping, we make sure not
        to allow more than one handler for a command"""
        subscribers = self.subscribers[msg.__name__]
        if msg.is_cmd and len(subscribers) > 0:
            raise CommandAlreadySubscribedException(msg.__name__)
        subscribers.append(handler)
+
+
Example usage
+
bus = MessageBus()
+bus.subscribe_to(ReportIssueCommand, ReportIssueHandler(db.unit_of_work_manager))
bus.handle(cmd)  # cmd: a previously constructed ReportIssueCommand
+
Here we have a bare-bones implementation of a message bus. It doesn’t do
+anything fancy, but it will do the job for now. In a production system, the
+message bus is an excellent place to put cross-cutting concerns; for example, we
+might want to validate our commands before passing them to handlers, or we may
+want to perform some basic logging, or performance monitoring. I want to talk
+more about that in the next part, when we’ll tackle the controversial subject of
+dependency injection and Inversion of Control containers.
+
For now, let’s look at how to hook this up. Firstly, we want to use it from our
+API handlers.
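
A sketch of one such endpoint, assuming the bus is available to the Flask
adapter:

@api.route('/issues', methods=['POST'])
def report_issue():
    issue_id = uuid.uuid4()
    cmd = ReportIssueCommand(issue_id, **request.get_json())
    bus.handle(cmd)
    return "", 201, {'Location': '/issues/' + str(issue_id)}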
Not much has changed here - we’re still building our command in the Flask
+adapter, but now we’re passing it into a bus instead of directly constructing a
+handler for ourselves. What about when we need to raise an event? We’ve got
+several options for doing this. Usually I raise events from my command handlers,
+like this:
+
class AssignIssueHandler:

    def handle(self, cmd):
        with self.uowm.start() as uow:
            issue = uow.issues.get(cmd.issue_id)
            issue.assign_to(cmd.assigned_to, cmd.assigned_by)
            uow.commit()

        # This is step 4: notify other parts of the system
        # (`raise` is a reserved word in Python, so we re-use the
        # bus's handle method for events)
        self.bus.handle(IssueAssignedToEngineer(
            cmd.issue_id,
            cmd.assigned_to,
            cmd.assigned_by))
+
+
I usually think of this event-raising as a kind of glue - it’s orchestration
+code. Raising events from your handlers this way makes the flow of messages
+explicit - you don’t have to look anywhere else in the system to understand
+which events will flow from a command. It’s also very simple in terms of
+plumbing. The counter argument is that this feels like we’re violating SRP in
+exactly the same way as before - we’re sending a notification about our
+workflow. Is this really any different to sending the email directly from the
+handler? Another option is to send events directly from our model objects, and
treat them as part of our domain model proper.
+
class Issue:

    def assign_to(self, assigned_to, assigned_by):
        self.assigned_to = assigned_to
        self.assigned_by = assigned_by

        # Add our new event to a list
        self.events.append(IssueAssignedToEngineer(
            self.id, self.assigned_to, self.assigned_by))
+
+
There’s a couple of benefits of doing this: firstly, it keeps our command
+handler simpler, but secondly it pushes the logic for deciding when to send an
+event into the model. For example, maybe we don’t always need to raise the
+event.
+
class Issue:

    def assign_to(self, assigned_to, assigned_by):
        self.assigned_to = assigned_to
        self.assigned_by = assigned_by

        # don't raise the event if I picked the issue myself
        if self.assigned_to != self.assigned_by:
            self.events.append(IssueAssignedToEngineer(
                self.id, self.assigned_to, self.assigned_by))
+
+
Now we’ll only raise our event if the issue was assigned by another engineer.
+Cases like this are more like business logic than glue code, so today I’m
+choosing to put them in my domain model. Updating our unit tests is trivial,
+because we’re just exposing the events as a list on our model objects:
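
For example (a sketch; have_raised is the custom matcher described just below):

def test_assigning_an_issue_raises_event():
    issue = Issue(IssueReporter('fred', 'fred@example.org'),
                  'my mouse is broken')
    issue.assign_to('barry', assigned_by='george')

    expect(issue).to(have_raised(
        IssueAssignedToEngineer(issue.id, 'barry', 'george')))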
The have_raised function is a custom matcher I wrote that checks the events
+attribute of our object to see if we raised the correct event. It’s easy to test
+for the presence of events, because they’re namedtuples, and have value
+equality.
+
All that remains is to get the events off our model objects and into our message
+bus. What we need is a way to detect that we’ve finished one use-case and are
+ready to flush our changes. Fortunately, we have a name for this already - it’s
+a unit of work. In this system I’m using SQLAlchemy’s event hooks
+[http://docs.sqlalchemy.org/en/latest/orm/session_events.html] to work out
+which objects have changed, and queue up their events. When the unit of work
+exits, we raise the events.
+
from sqlalchemy import event

class SqlAlchemyUnitOfWork(UnitOfWork):

    def __init__(self, sessionfactory, bus):
        self.sessionfactory = sessionfactory
        self.bus = bus
        # We want to listen to flush events so that we can get events
        # from our model objects
        event.listen(self.sessionfactory, "after_flush", self.gather_events)

    def __enter__(self):
        self.session = self.sessionfactory()
        # When we first start a unit of work, create a list of events
        self.flushed_events = []
        return self

    def commit(self):
        self.session.flush()
        self.session.commit()

    def rollback(self):
        self.session.rollback()
        # If we roll back our changes we should drop all the events
        self.flushed_events = []

    def gather_events(self, session, ctx):
        # When we flush changes, add all the events from our new and
        # updated entities into the events list
        flushed_objects = ([e for e in session.new]
                           + [e for e in session.dirty])
        for e in flushed_objects:
            self.flushed_events += e.events

    def publish_events(self):
        # When the unit of work completes
        # raise any events that are in the list
        for e in self.flushed_events:
            self.bus.handle(e)

    def __exit__(self, type, value, traceback):
        self.session.close()
        self.publish_events()
+
+
Okay, we’ve covered a lot of ground here. We’ve discussed why you might want to
+use domain events, how a message bus actually works in practice, and how we can
+get events out of our domain and into our subscribers. The newest code sample
+[https://github.com/bobthemighty/blog-code-samples/tree/master/ports-and-adapters/04]
+ demonstrates these ideas, please do check it out, run it, open pull requests,
+open Github issues etc.
+
Some people get nervous about the design of the message bus, or the unit of
+work, but this is just infrastructure - it can be ugly, so long as it works.
+We’re unlikely to ever change this code after the first few user-stories. It’s
+okay to have some crufty code here, so long as it’s in our glue layers, safely
+away from our domain model. Remember, we’re doing all of this so that our domain
+model can stay pure and be flexible when we need to refactor. Not all layers of
+the system are equal, glue code is just glue.
+
Next time I want to talk about Dependency Injection, why it’s great, and why
+it’s nothing to be afraid of.
+
+
+
Writing tests for external API calls
+
by Harry,
+
+
+
+
Here’s a common question from people doing testing in Python:
+
+
How do I write tests for code that calls out to a third-party API?
+
+
(with thanks to Brian Okken for suggesting the question).
+
In this article I’d like to outline several options, starting from the
+most familiar (mocks) going out to the most architecture-astronautey,
+and try and discuss the pros and cons of each one. With luck I’ll convince you
+to at least try out some of the ideas near the end.
+
I’m going to use an example from the domain of logistics where we need to sync
+shipments to a cargo provider’s API, but you can really imagine any old API–a
+payment gateway, an SMS notifications engine, a cloud storage provider. Or you
+can imagine an external dependency that’s nothing to do with the web at all, just
+any kind of external I/O dependency that’s hard to unit test.
+
But to make things concrete, in our logistics example, we’ll have a model of a
+shipment which contains a number of order lines. We also care about its
+estimated time of arrival (eta) and a bit of jargon called the incoterm
+(you don’t need to understand what that is, I’m just trying to illustrate a bit
+of real-life complexity, in this small example).
+
from dataclasses import dataclass
from datetime import date
from typing import List, Optional


@dataclass
class OrderLine:
    sku: str  # sku = "stock keeping unit"; it's basically a product id
    qty: int


@dataclass
class Shipment:
    reference: str
    lines: List[OrderLine]
    eta: Optional[date]
    incoterm: str

    def save(self):
        ...  # for the sake of the example, let's imagine the model
             # knows how to save itself to the DB. like Django.
+
+
We want to sync our shipments model with a third party, the cargo freight
+company, via their API. We have a couple of use cases: creating new shipments,
+and checking for updated etas.
+
Let’s say we have some sort of controller function that’s in charge of doing this. It
+takes a dict mapping skus to quantities, creates our model objects, saves them, and
+then calls a helper function to sync to the API. Hopefully this sort of thing
+looks familiar:
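
The listing is elided here; a minimal sketch of such a controller, plus a first
mock-based test, might read (the reference-generation scheme and the module
paths are assumptions):

import uuid
from typing import Dict
from unittest import mock

def create_shipment(quantities: Dict[str, int], incoterm: str) -> Shipment:
    reference = uuid.uuid4().hex[:10]
    lines = [OrderLine(sku=sku, qty=qty) for sku, qty in quantities.items()]
    shipment = Shipment(reference=reference, lines=lines, eta=None,
                        incoterm=incoterm)
    shipment.save()
    sync_to_api(shipment)
    return shipment


@mock.patch('controllers.requests')
def test_create_shipment_does_a_POST_to_the_api(mock_requests):
    create_shipment({'sku1': 10}, incoterm='EXW')
    assert mock_requests.post.call_count == 1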
And you can imagine adding a few more tests, perhaps one that checks that we do
+the date-to-isoformat conversion correctly, maybe one that checks we can handle
+multiple lines. Three tests, one mock each, we’re ok.
+
The trouble is that it never stays quite that simple does it? For example,
+the cargo company may already have a shipment on record, because reasons.
+And if you do a POST when something already exists, then bad things happen.
+So we first need to check whether they have a shipment on file, using
+a GET request, and then we either do a POST if it’s new, or a PUT for
+an existing one:
+
def sync_to_api(shipment):
    external_shipment_id = get_shipment_id(shipment.reference)
    if external_shipment_id is None:
        requests.post(f'{API_URL}/shipments/', json={
            'client_reference': shipment.reference,
            'arrival_date': shipment.eta,
            'products': [
                {'sku': ol.sku, 'quantity': ol.qty}
                for ol in shipment.lines
            ]
        })

    else:
        requests.put(f'{API_URL}/shipments/{external_shipment_id}', json={
            'client_reference': shipment.reference,
            'arrival_date': shipment.eta,
            'products': [
                {'sku': ol.sku, 'quantity': ol.qty}
                for ol in shipment.lines
            ]
        })


def get_shipment_id(our_reference) -> Optional[str]:
    their_shipments = requests.get(f"{API_URL}/shipments/").json()['items']
    return next(
        (s['id'] for s in their_shipments if s['client_reference'] == our_reference),
        None
    )
+
+
+
And as usual, complexity creeps in:
+
+
+
Because things are never easy, the third party has different reference
+ numbers to us, so we need the get_shipment_id() function that finds the
+ right one for us
+
+
+
And we need to use POST if it’s a new shipment, or PUT if it’s an existing one.
+
+
+
Already you can imagine we’re going to need to write quite a few tests to cover
+all these options. Here’s just one, as an example:
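
A sketch of one such test, using mock.patch (the module paths and API_URL
import are assumptions):

from unittest import mock

@mock.patch('controllers.requests')
@mock.patch('controllers.get_shipment_id')
def test_does_PUT_if_shipment_already_exists(mock_get_shipment_id, mock_requests):
    mock_get_shipment_id.return_value = '1234'
    shipment = Shipment(reference='ref', lines=[OrderLine('sku1', 10)],
                        eta=None, incoterm='EXW')

    sync_to_api(shipment)

    mock_requests.put.assert_called_once_with(
        f'{API_URL}/shipments/1234',  # API_URL imported from controllers
        json={
            'client_reference': 'ref',
            'arrival_date': None,
            'products': [{'sku': 'sku1', 'quantity': 10}],
        })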
…and our tests are getting less and less pleasant. Again, the details don’t
+matter too much; the hope is that this sort of test ugliness is familiar.
+
And this is only the beginning: we’ve shown an API integration that only cares
+about writes, but what about reads? Say we want to poll our third-party API
+now and again to get updated etas for our shipments. Depending on the eta, we
+have some business logic about notifying people of delays…
+
# another example controller,
+# showing business logic getting intermingled with API calls
+
+def get_updated_eta(shipment):
+    external_shipment_id = get_shipment_id(shipment.reference)
+    if external_shipment_id is None:
+        logging.warning('tried to get updated eta for shipment %s not yet sent to partners', shipment.reference)
+        return
+
+    [journey] = requests.get(f"{API_URL}/shipments/{external_shipment_id}/journeys").json()['items']
+    latest_eta = date.fromisoformat(journey['eta'])  # assuming the api gives us ISO date strings
+    if latest_eta == shipment.eta:
+        return
+    logging.info('setting new shipment eta for %s: %s (was %s)', shipment.reference, latest_eta, shipment.eta)
+    if shipment.eta is not None and latest_eta > shipment.eta:
+        notify_delay(shipment_ref=shipment.reference, delay=latest_eta - shipment.eta)
+    if shipment.eta is None and shipment.incoterm == 'FOB' and len(shipment.lines) > 10:
+        notify_new_large_shipment(shipment_ref=shipment.reference, eta=latest_eta)
+
+    shipment.eta = latest_eta
+    shipment.save()
+
+
+
I haven’t coded up what all the tests would look like, but you could imagine them:
+
+
a test that if the shipment does not exist, we log a warning. Needs to mock requests.get or get_shipment_id()
+
a test that if the eta has not changed, we do nothing. Needs two different mocks on requests.get
+
a test for the error case where the shipments api has no journeys
+
a test for the edge case where the shipment has multiple journeys
+
a test to check that if the eta is later than the current one, we send a
+ notification.
+
and a test of the converse: no notification if the eta is sooner
+
a test for the large shipments notification
+
and a test that we only do that one if necessary
+
and a general test that we update the local eta and save it.
+
…I’m sure we can imagine some more.
+
+
And each one of these tests needs to set up three or four mocks. We’re getting
+into what Ed Jung calls Mock Hell.
+
On top of our tests being hard to read and write, they’re also brittle. If we
+change the way we import, from import requests to from requests import get
+(not that you’d ever do that, but you get the point), then all our mocks break.
+If you want a more plausible example, perhaps we decide to stop using
+requests.get() because we want to use requests.Session() for whatever
+reason.
+
+
The point is that mock.patch ties you to specific implementation details
+
+
And we haven’t even spoken about other kinds of tests. To reassure yourself
+that things really work, you’re probably going to want an integration test or
+two, and maybe an E2E test.
+
Here’s a little recap of the pros and cons of the mocking approach. We’ll
+have one of these each time we introduce a new option.
+
Mocking and patching: tradeoffs
+
Pros:
+
+
no change to client code
+
low effort
+
it’s familiar to (most? many?) devs
+
+
Cons:
+
+
tightly coupled
+
brittle. requests.get -> requests.Session().get will break it.
+
need to remember to @mock.patch every single test that might
+ end up invoking that api
+
easy to mix together business logic and I/O concerns
+
probably need integration & E2E tests as well.
+
+
SUGGESTION: Build an Adapter (a wrapper for the external API)
+
We really want to disentangle our business logic from our API integration,
+by building an abstraction, a wrapper around the API that just exposes nice,
+readable methods for us to call in our code.
+
+
We call it an “adapter” in the ports & adapters sense,
+but you don’t have to go full-on hexagonal architecture to use
+this pattern.
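Here’s a sketch of what that wrapper might look like. sync() is the only
+public method; _get_shipment_id() and _build_payload() are private plumbing
+(and _build_payload() is just my way of not writing the payload dict twice):
+
import requests
+from typing import Optional
+
+
+class RealCargoAPI:
+
+    def __init__(self, api_url: str):
+        self.api_url = api_url
+
+    def sync(self, shipment: Shipment) -> None:
+        external_shipment_id = self._get_shipment_id(shipment.reference)
+        if external_shipment_id is None:
+            requests.post(f'{self.api_url}/shipments/', json=self._build_payload(shipment))
+        else:
+            requests.put(f'{self.api_url}/shipments/{external_shipment_id}', json=self._build_payload(shipment))
+
+    def _get_shipment_id(self, our_reference: str) -> Optional[str]:
+        their_shipments = requests.get(f'{self.api_url}/shipments/').json()['items']
+        return next(
+            (s['id'] for s in their_shipments if s['client_reference'] == our_reference),
+            None,
+        )
+
+    def _build_payload(self, shipment: Shipment) -> dict:
+        return {
+            'client_reference': shipment.reference,
+            'arrival_date': shipment.eta.isoformat() if shipment.eta else None,
+            'products': [{'sku': ol.sku, 'quantity': ol.qty} for ol in shipment.lines],
+        }
+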
SUGGESTION: Use (only?) integration tests to test your Adapter
+
Now that we can test our adapter separately from our main application code, we
+can have a think about the best way to test it. Since it’s just
+a thin wrapper around an external system, the best kinds of tests are integration
+tests:
+
def test_can_create_new_shipment():
+ api = RealCargoAPI('https://sandbox.example.com/')
+ line = OrderLine('sku1', 10)
+ ref = random_reference()
+ shipment = Shipment(reference=ref, lines=[line], eta=None, incoterm='foo')
+
+ api.sync(shipment)
+
+ shipments = requests.get(api.api_url + '/shipments/').json()['items']
+ new_shipment = next(s for s in shipments if s['client_reference'] == ref)
+ assert new_shipment['arrival_date'] is None
+ assert new_shipment['products'] == [{'sku': 'sku1', 'quantity': 10}]
+
+
+def test_can_update_a_shipment():
+ api = RealCargoAPI('https://sandbox.example.com/')
+ line = OrderLine('sku1', 10)
+ ref = random_reference()
+ shipment = Shipment(reference=ref, lines=[line], eta=None, incoterm='foo')
+
+ api.sync(shipment)
+
+ shipment.lines[0].qty = 20
+
+ api.sync(shipment)
+
+ shipments = requests.get(api.api_url + '/shipments/').json()['items']
+ new_shipment = next(s for s in shipments if s['client_reference'] == ref)
+ assert new_shipment['products'] == [{'sku': 'sku1', 'quantity': 20}]
+
+
+
That relies on your third-party api having a decent sandbox that you can test against.
+You’ll need to think about:
+
+
+
how do you clean up? Running dozens of tests dozens of times a day in dev
+ and CI will start filling the sandbox with test data.
+
+
+
is the sandbox slow and annoying to test against? are devs going to be
+ annoyed at waiting for integration tests to finish on their machines, or
+ in CI?
+
+
+
is the sandbox flakey at all? have you now introduced randomly-failing
+ tests in your build?
+
+
+
Adapter around api, with integration tests, tradeoffs:
+
Pros:
+
+
obey the “don’t mock what you don’t own” rule.
+
we present a simple api, which is easier to mock
+
we stop messing about with mocks like requests.get.return_value.json.return_value
+
if we ever change our third party, there’s a good chance that the API of our
+ adapter will not change. so our core app code (and its tests) don’t need
+ to change.
+
+
Cons:
+
+
we’ve added an extra layer in our application code, which for simple cases
+ might be unnecessary complexity
+
integration tests are strongly dependent on your third party providing a good
+ test sandbox
+
integration tests may be slow and flakey
+
+
OPTION: vcr.py
+
I want to give a quick nod to vcr.py
+at this point.
+
VCR is a very neat solution. It lets you run your tests against a real
+endpoint, and then it captures the outgoing and incoming requests, and
+serializes them to disk. Next time you run the tests, it intercepts your HTTP
+requests, compares them against the saved ones, and replays past responses.
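Usage is typically just a decorator on your test. A sketch (the cassette path
+and record mode here are arbitrary choices, and note the fixed reference, for
+reasons we’ll get to in a moment):
+
import vcr
+
+
+@vcr.use_cassette('tests/cassettes/test_sync.yaml', record_mode='once')
+def test_create_new_shipment_against_real_api():
+    api = RealCargoAPI('https://sandbox.example.com/')
+    line = OrderLine(sku='sku1', qty=10)
+    shipment = Shipment(reference='vcr-test-ref', lines=[line], eta=None, incoterm='EXW')
+
+    api.sync(shipment)
+
+    # the first run records real HTTP traffic to the yaml file;
+    # subsequent runs intercept the calls and replay the recording.
+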
+
The end result is that you have a way of running integration tests with
+realistic simulated responses, but without actually needing to talk to
+an external third party.
+
At any time you like, you can also trigger a test run against the real API,
+and it will update your saved response files. This gives you a way of
+checking whether things have changed on a periodic basis, and updating
+your recorded responses when they do.
+
As I say, it’s a very neat solution, and I’ve used it successfully, but it does
+have some drawbacks:
+
+
+
Firstly, the workflow can be quite confusing. While you’re still evolving
+ your integration, your code is going to change, and the canned responses too,
+ and it can be hard to keep track of what’s on disk, what’s fake and what’s not.
+ One person can usually wrap their head around it, but it’s a steep learning
+ curve for other members of the team. That can be particularly painful if
+ it’s code that only gets changed infrequently, because that’s long enough
+ for everyone to forget how it all works.
+
+
+
Secondly, vcr.py is tricky to configure when you have randomised data in
+ your requests (eg unique ids). By default it looks for requests that are
+ exactly the same as the ones it’s recorded. You can configure “matchers” to
+ selectively ignore certain fields when recognising requests, but that
+ only deals with half the problem.
+
+
+
If you send out a POST and follow up with a GET for the same ID, you might be
+ able to configure a matcher to ignore the ID in the requests, but the
+ responses will still contain the old IDs. That will break anything on your
+ own side that does logic based on those IDs.
+
+
+
vcr.py tradeoffs
+
Pros:
+
+
gives you a way of isolating tests from external dependencies by replaying canned responses
+
can re-run against real API at any time
+
no changes to application code required
+
+
Cons:
+
+
can be tricky for team-members to understand
+
dealing with randomly-generated data is hard
+
challenging to simulate state-based workflows
+
+
OPTION: Build your own fake for integration tests
+
We’re into dangerous territory now, the solution we’re about to present is not
+necessarily a good idea in all cases. Like any solution you find on random blogs
+on the internet I suppose, but still.
+
So when might you think about doing this?
+
+
if the integration is not core to your application, i.e. it’s an incidental feature
+
if the bulk of the code you write, and the feedback you want, is not about
+ integration issues, but about other things in your app
+
if you really can’t figure out how to fix the problems with your integration
+ tests another way (retries? perhaps they’d be a good idea anyway?)
+
+
Then you might consider building your own fake version of the external API. You
+can then spin it up in a docker container, run it alongside your test code, and
+talk to that instead of the real API.
+
Faking a third party is often quite simple. A REST API around a CRUD data model
+might just pop json objects in and out of an in-memory dict, for example:
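(A sketch using Flask, since it makes for a short listing; the endpoints
+mirror the ones our adapter talks to.)
+
from flask import Flask, jsonify, request
+
+app = Flask('fake-cargo-api')
+
+SHIPMENTS = {}  # maps external id -> shipment json. in memory, nothing persisted.
+
+
+@app.route('/shipments/', methods=['GET'])
+def list_shipments():
+    return jsonify({'items': list(SHIPMENTS.values())})
+
+
+@app.route('/shipments/', methods=['POST'])
+def create_shipment():
+    new_id = str(len(SHIPMENTS) + 1)
+    SHIPMENTS[new_id] = {'id': new_id, **request.get_json()}
+    return '', 201
+
+
+@app.route('/shipments/<shipment_id>', methods=['PUT'])
+def update_shipment(shipment_id):
+    SHIPMENTS[shipment_id] = {'id': shipment_id, **request.get_json()}
+    return '', 200
+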
This doesn’t mean you never test against the third-party API, but
+you’ve now given yourself the option not to.
+
+
+
perhaps you test against the real API in CI, but not in dev
+
+
+
perhaps you have a way of marking certain PRs as needing
+ “real” api integration tests
+
+
+
perhaps you have some logic in CI that looks at what code has
+ changed in a given PR, tries to spot anything to do with the
+ third party api, and only then runs against the real API
+
+
+
OPTION: Contract tests
+
I’m not sure if “contract tests” is a real bit of terminology, but the idea is
+to test that the behaviour of the third party API conforms to a contract. That
+it does what you need it to do.
+
They’re different from integration tests because you may not be testing
+your adapter itself, and they tend to be against a single endpoint at a time.
+Things like:
+
+
+
checking the format and datatypes of data for given endpoints. Are all
+ the fields you need there?
+
+
+
if the third party api has bugs you need to work around, you might repro
+ that bug in a test, so that you know if they ever fix it
+
+
+
These tests tend to be more lightweight than integration tests, in that
+they are often read-only, so they suffer less from problems related to
+clean-up. You might decide they’re useful in addition to integration tests,
+or they might be a useful backup option if proper integration tests aren’t
+possible. As with integration tests, you probably want a way of selectively
+running your contract tests against the real third party.
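A contract test might be as simple as checking that the fields we rely on are
+present, with the types we expect. A sketch (url hardcoded just for
+illustration):
+
import requests
+
+
+def test_shipments_endpoint_returns_the_fields_we_need():
+    api_url = 'https://sandbox.example.com/'  # or point this at the fake api
+    first_page = requests.get(api_url + '/shipments/').json()
+
+    assert 'items' in first_page
+    for item in first_page['items']:
+        assert isinstance(item['id'], str)
+        assert isinstance(item['client_reference'], str)
+        assert isinstance(item['products'], list)
+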
+
+
you can also run your contract tests against your fake api.
+
+
When you run your contract tests against your own fake api as well as
+against the real thing, you’re confirming the quality of your fake.
+Some people call this verified fakes
+(see also “stop mocking and start testing”.)
+
+
OPTION: DI
+
We still have the problem that using mock.patch ties us to specific
+ways of importing our adapter. We also need to remember to set up
+that mock on any test that might use the third party adapter.
+
+
Making the dependency explicit and using DI solves these problems
+
+
Again, we’re in dangerous territory here. Python people are skeptical
+of DI, and neither of these problems is that big of a deal. But
+DI does buy us some nice things, so read on with an open mind.
+
First, you might like to define an interface for your dependency explicitly.
+You could use an abc.ABC, or if you’re anti-inheritance, a newfangled
+typing.Protocol:
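(A sketch; the two method names match what we use later in the post, sync()
+for the write side and get_latest_eta() for the read side.)
+
from typing import Protocol
+
+
+class CargoAPI(Protocol):
+
+    def get_latest_eta(self, reference: str) -> date:
+        ...
+
+    def sync(self, shipment: Shipment) -> None:
+        ...
+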
Now we can add our explicit dependency where it’s needed, replacing
+a hardcoded import with a new, explicit argument to a function somewhere.
+Possibly even with a type hint:
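(Reusing the create_shipment controller from earlier; the signature is my
+guess, lined up with the test further down.)
+
def create_shipment(quantities: Dict[str, int], incoterm: str, cargo_api: CargoAPI) -> Shipment:
+    reference = uuid.uuid4().hex[:10]
+    order_lines = [OrderLine(sku=sku, qty=qty) for sku, qty in quantities.items()]
+    shipment = Shipment(reference=reference, lines=order_lines, eta=None, incoterm=incoterm)
+    shipment.save()
+    cargo_api.sync(shipment)
+    return shipment
+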
DI: tradeoffs
+
Pros:
+
+
no need to remember to do mock.patch(), the function arguments
+ always require the dependency
+
+
Cons:
+
+
we’ve added an “unnecessary” extra argument to our function
+
+
+
This change from an import to an explicit dependency is memorably advocated
+in Yeray Díaz’s talk import as an antipattern
+
+
So far you may think the pros aren’t enough of a wow to justify the con?
+Well, if we take it one step further and really commit to DI, you may yet get
+on board.
+
OPTION: build your own fake for unit tests
+
Just like we can build our own fake for integration testing,
+we can build our own fake for unit tests too. Yes, it’s more
+lines of code than mock_api = mock.Mock(), but it’s not a
+lot:
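A sketch, mirroring the CargoAPI protocol from above:
+
class FakeCargoAPI:
+
+    def __init__(self):
+        self._shipments = {}  # the fake's in-memory "database"
+
+    def get_latest_eta(self, reference: str) -> date:
+        return self._shipments[reference].eta
+
+    def sync(self, shipment: Shipment) -> None:
+        self._shipments[shipment.reference] = shipment
+
+    def __contains__(self, shipment):
+        return shipment in self._shipments.values()
+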
The fake is in-memory and in-process this time, but again, it’s just a
+thin wrapper around some sort of container, a dict in this case.
+
get_latest_eta() and sync() are the two methods we need to define
+to make it emulate the real api (and comply with the Protocol).
+
+
mypy will tell you when you get this right, or if you ever need to change it
+
+
The __contains__ is just a bit of syntactic sugar that lets us write
+assert shipment in api in our tests, which looks nice. It’s a Bob thing.
+
def test_create_shipment_syncs_to_api():
+ api = FakeCargoAPI()
+ shipment = create_shipment({'sku1': 10}, incoterm='EXW', cargo_api=api)
+ assert shipment in api
+
+
+
Why bother with this?
+
Handrolled fakes for unit tests, the tradeoffs
+
Pros:
+
+
tests can be more readable, no more mock.call_args == call(foo,bar) stuff
+
👉Our fake exerts design pressure on our Adapter’s API👈
+
+
Cons:
+
+
more code in tests
+
need to keep the fake in sync with the real thing
+
+
The design pressure is the killer argument in our opinion. Because hand-rolling
+a fake is more effort, it forces us to think about the API of our adapter,
+and it gives us an incentive to keep it simple.
+
If you think back to our initial decision to build a wrapper, in our toy example
+it was quite easy to decide what the adapter should look like: we just needed
+one public method called sync().
+out what belongs in an adapter, and what stays in business logic. By forcing
+ourselves to build a fake, we get to really see the shape of the thing that
+we’re abstracting out.
For bonus points, you can even share code between the fake class you use
+for your unit tests, and the fake you use for your integration tests.
+
+
Recap
+
+
+
As soon as your integration with an external API gets beyond the trivial,
+ mocking and patching starts to be quite painful
+
+
+
Consider abstracting out a wrapper around your API
+
+
+
Use integration tests to test your adapter, and unit tests for your
+ business logic (and to check that you call your adapter correctly)
+
+
+
Consider writing your own fakes for your unit tests. They will
+ help you find a good abstraction.
+
+
+
If you want a way for devs or CI to run tests without depending
+ on the external API, consider also writing a fully-functional fake of the
+ third-party API (an actual web server).
+
+
+
For bonus points, the two fakes can share code.
+
+
+
Selectively running integration tests against both the fake and the real API
+ can validate that both continue to work over time.
+
+
+
You could also consider adding more targeted “contract tests” for this purpose.
+
+
+
+
If you’d like to play around with the code from this blog post, you can
+check it out here
+
+
+
+
+
From 94298f168776714e0a8e028aeef480d329fc155d Mon Sep 17 00:00:00 2001
From: Harry
Date: Sun, 15 Mar 2020 19:00:05 +0000
Subject: [PATCH 03/27] up and running with milligram and pygments. done
command handler blog
---
.../2017-09-07-introducing-command-handler.md | 18 +-
generate-html.py | 2 +-
index.html | 4 +-
...017-09-07-introducing-command-handler.html | 279 ++++++----
...ry-and-unit-of-work-pattern-in-python.html | 221 +++++---
...mmands-and-queries-handlers-and-views.html | 196 ++++---
posts/2017-09-19-why-use-domain-events.html | 443 ++++++++-------
...2020-01-25-testing_external_api_calls.html | 520 ++++++++++--------
rss.xml | 2 +-
templates/blog_post_template.html | 39 +-
10 files changed, 980 insertions(+), 744 deletions(-)
diff --git a/blog/2017-09-07-introducing-command-handler.md b/blog/2017-09-07-introducing-command-handler.md
index 8dd6fc2..92c1919 100644
--- a/blog/2017-09-07-introducing-command-handler.md
+++ b/blog/2017-09-07-introducing-command-handler.md
@@ -123,11 +123,12 @@ concept of a Controller in an MVC architecture.
First, we create a Command object.
+```python
class ReportIssueCommand(NamedTuple):
reporter_name: str
reporter_email: str
problem_description: str
-
+```
A command object is a small object that represents a state-changing action that
can happen in the system. Commands have no behaviour, they're pure data
@@ -167,6 +168,7 @@ model. This has two major benefits:
In order to process our new command, we'll need to create a command handler.
+```python
class ReportIssueCommandHandler:
def __init__(self, issue_log):
self.issue_log = issue_log
@@ -177,7 +179,7 @@ class ReportIssueCommandHandler:
cmd.reporter_email)
issue = Issue(reported_by, cmd.problem_description)
self.issue_log.add(issue)
-
+```
Command handlers are stateless objects that orchestrate the behaviour of a
@@ -201,6 +203,7 @@ Since our command handlers are just glue code, we won't put any business logic
into them - they shouldn't be making any business decisions. For example, let's
skip ahead a little to a new command handler:
+```python
class MarkIssueAsResolvedHandler:
def __init__(self, issue_log):
self.issue_log = issue_log
@@ -210,7 +213,7 @@ class MarkIssueAsResolvedHandler:
# the following line encodes a business rule
if (issue.state != IssueStatus.Resolved):
issue.mark_as_resolved(cmd.resolution)
-
+```
This handler violates our glue-code principle because it encodes a business
rule: "If an issue is already resolved, then it can't be resolved a second
@@ -222,21 +225,23 @@ reason to prefer a class is that it can make dependency management a little
easier, but the two approaches are completely equivalent. For example, we could
rewrite our ReportIssueHandler like this:
+```python
def ReportIssue(issue_log, cmd):
reported_by = IssueReporter(
cmd.reporter_name,
cmd.reporter_email)
issue = Issue(reported_by, cmd.problem_description)
issue_log.add(issue)
-
+```
If magic methods make you feel queasy, you can define a handler to be a class
that exposes a handle method like this:
+```python
class ReportIssueHandler:
def handle(self, cmd):
...
-
+```
However you structure them, the important ideas of commands and handlers are:
@@ -257,6 +262,7 @@ other languages - Java, C#, C++ - I would usually have a single binary for each
layer. Splitting the packages up this way makes it easier to understand how the
dependencies work.
+```python
from typing import NamedTuple
from expects import expect, have_len, equal
@@ -345,7 +351,7 @@ class When_reporting_an_issue:
def it_should_have_recorded_the_description(self):
expect(self.issues[0].description).to(equal(desc))
-
+```
There's not a lot of functionality here, and our issue log has a couple of
problems, firstly there's no way to see the issues in the log yet, and secondly
diff --git a/generate-html.py b/generate-html.py
index 7342531..bce02fb 100755
--- a/generate-html.py
+++ b/generate-html.py
@@ -12,7 +12,7 @@
def main():
posts = glob.glob("blog/*.md")
- extensions = ['extra', 'smarty', 'meta']
+ extensions = ['extra', 'smarty', 'meta', 'codehilite']
_md = markdown.Markdown(extensions=extensions, output_format='html5')
loader = jinja2.FileSystemLoader(searchpath="./")