This is a semi-experimental approach to solving the problem of having multiple `orca` installations (each with its own database) communicate changes with each other, for instance in a multi-region Spinnaker installation or during a database migration.
Definitions:

- **peer**

  An `orca` cluster whose database (which can be a read replica) we copy data from. Each `orca` cluster has an ID, for example `us-east-1` or `us-west-2`.
  A peer is defined by specifying its database connection AND its ID (in the YAML config).
  For example, the `orca` cluster with ID `us-west-2` could peer the `orca` cluster with ID `us-east-1`, and vice versa.

- **partition**

  The executions stored in a DB are tagged with a partition; this is synonymous with the peer ID described above.
  When an execution is "peered" (copied) from a peer with ID `us-east-1`, that execution will be persisted in our local database with the `partition` set to `us-east-1`.
  Note: for historical reasons, the partition has been omitted in the executions. Therefore, an `orca` cluster will consider executions with `partition = NULL` OR `partition = MY_PARTITION_ID` to be owned by this cluster (see the sketch after these definitions).

- **foreign executions**

  Foreign executions are executions that show up in the local database but are marked with the `partition` of our peer.
  These executions are essentially read-only, and the current `orca` cluster can't perform any actions on them.
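For illustration, here is a minimal sketch of the ownership rule described above. The names (`isOwnedByThisCluster`, `myPartitionId`) are hypothetical and not part of orca's actual API:

```kotlin
// Sketch only: illustrates the ownership rule described above, not orca's actual code.
// An execution is owned by this cluster when its partition is NULL (legacy rows written
// before partitioning existed) or matches this cluster's own partition/peer ID.
fun isOwnedByThisCluster(executionPartition: String?, myPartitionId: String): Boolean =
    executionPartition == null || executionPartition == myPartitionId

fun main() {
    val myPartitionId = "us-west-2"
    println(isOwnedByThisCluster(null, myPartitionId))        // true:  legacy execution, no partition
    println(isOwnedByThisCluster("us-west-2", myPartitionId)) // true:  our own partition
    println(isOwnedByThisCluster("us-east-1", myPartitionId)) // false: foreign execution from a peer
}
```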
The peering mechanism accomplishes a few things:

- Peer (copy) executions (both pipelines and orchestrations) from the database of a peer to the local cluster database
- Allow an `orca` cluster to perform actions on a foreign execution (e.g. executions running on a peer, see the `orca-interlink` module)
- (still to come) Take ownership of and resume an execution previously operated on by a peer
Execution peering is essentially the copying of executions from one database to another.
In a typical topology, a single `orca` cluster uses a single SQL database.
The database stores all execution history as well as the execution queue.
The history needs to be peered, but the queue must not be peered/replicated, as that would cause issues such as duplicate executions.
(Additionally, the queue has an extremely high bandwidth/change rate, so replicating it would be difficult and require a lot of overhead on the DB.)
Logic for peering lives in `PeeringAgent.kt`; see the comments there for details on the algorithm.
At a high level the idea is (a simplified sketch follows this list):

- Given a peer ID and its database connection (which can point to a read-only replica)
- Mirror all foreign executions with the specified peer ID to the local database
- During the copy, all executions are annotated as coming from the specified peer (via the `partition` column)
- Any attempt to operate on a foreign execution (one with `partition != our ID`) will fail
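To make the flow above concrete, here is a heavily simplified sketch of one delta-copy pass. It is not the real `PeeringAgent` implementation; the `PeerDb`/`LocalDb` interfaces, the `peerOnce` function, and the exact delta criterion (walking `updated_at` back by `clockDriftMs`) are illustrative assumptions based on the parameters described later in this document.

```kotlin
// Heavily simplified sketch of a single peering pass; not the actual PeeringAgent algorithm.
// PeerDb/LocalDb are hypothetical abstractions over the foreign and local databases.
data class ExecutionRow(val id: String, val updatedAt: Long, val body: String)

interface PeerDb {
    // Executions on the peer that changed at or after the given timestamp
    fun executionsUpdatedSince(timestampMs: Long): List<ExecutionRow>
}

interface LocalDb {
    // Newest updated_at we have already copied for this partition
    fun latestPeeredUpdatedAt(partition: String): Long

    // Insert-or-update copied executions, tagging each row's partition column with the peer ID
    fun upsert(rows: List<ExecutionRow>, partition: String)
}

fun peerOnce(peerId: String, peer: PeerDb, local: LocalDb, clockDriftMs: Long, chunkSize: Int) {
    // Start slightly before the newest row already copied, to tolerate clock drift between
    // the instances that wrote updated_at (see clockDriftMs below).
    val since = local.latestPeeredUpdatedAt(peerId) - clockDriftMs

    peer.executionsUpdatedSince(since)
        .chunked(chunkSize)  // copy in bounded batches of chunkSize rows
        .forEach { chunk -> local.upsert(chunk, partition = peerId) }
}
```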
The user can perform the following actions on an execution via the UI/API (`orca` mutates the execution based on these actions):

- cancel an execution
- pause an execution
- resume an execution
- pass judgement on an execution
- delete an execution

These operations must take place on the cluster/instance that owns the execution.

TBD:

- Only MySQL is supported at this time, but this could easily be extended with a new `SqlRawAccess` implementation for the given DB engine
- It is recommended that only one instance run the `peering` agent/profile. This will likely be improved in the future, but today cross-instance locking is not available
Consider this reference `peering` profile in `orca.yml`:

```yaml
spring:
  profiles: peering

pollers:
  peering:
    enabled: true
    poolName: foreign
    id: us-west-2
    intervalMs: 5000   # This is the default value
    threadCount: 30    # This is the default value
    chunkSize: 100     # This is the default value
    clockDriftMs: 5000 # This is the default value

queue:
  redis:
    enabled: false

keiko:
  queue:
    enabled: false

sql:
  enabled: true
  foreignBaseUrl: URL_OF_MYSQL_DB_TO_PEER_FROM:3306
  partitionName: LOCAL_PARTITION_NAME

  connectionPools:
    foreign:
      jdbcUrl: jdbc:mysql://${sql.foreignBaseUrl}/orca?ADD_YOUR_PREFERRED_CONNECTION_STRING_PARAMS_HERE
      user: orca_service
      password: ${sql.passwords.orca_service}
      connectionTimeoutMs: 5000
      validationTimeoutMs: 5000
      maxPoolSize: ${pollers.peering.threadCount}
```
Parameter | Default | Notes
----------|---------|------
`pollers.peering.enabled` | `false` | used to enable or disable peering
`pollers.peering.poolName` | [REQUIRED] | name of the connection pool to use for the foreign database, see `sql.connectionPools.foreign` above
`pollers.peering.id` | [REQUIRED] | ID of the peer; this must be unique for each database
`pollers.peering.intervalMs` | `5000` | interval at which to run migrations (each run performs a delta copy). Shorter = less lag but more CPU and DB load
`pollers.peering.threadCount` | `30` | number of threads used to perform the bulk migration. A large number here only helps with the initial bulk import; after that, the delta is usually small enough that anything above 2 is unlikely to make a difference
`pollers.peering.chunkSize` | `100` | chunk size used when copying data (this is the max number of rows that will be modified at a time)
`pollers.peering.clockDriftMs` | `5000` | allows for this much clock drift across `orca` instances operating on a single DB
`pollers.peering.maxAllowedDeleteCount` | `100` | maximum number of executions to delete at a time. If the delete delta exceeds this number, no deletes occur and the error metric is incremented. This prevents accidental deletion of all executions; the number can be set via `DynamicConfigService`
Notes about some of the parameters above:

- `pollers.peering.intervalMs`

  This is the interval at which the peering agent runs (i.e. the time between the previous run of the agent completing and a new run starting).
  Together with the time it takes to perform a single run of the agent, this determines the latency of copying an execution from one DB to another.
  As a reference, on a MySQL `orca` installation with about 6 million past executions and about 200 active executions at any time, a single agent run takes about 8 seconds.
  The smaller the number, the shorter the peering lag, but the heavier the load on the database.

- `pollers.peering.clockDriftMs`

  This setting defines the "fudge" factor used when comparing the `updated_at` timestamp on an execution.
  Because the timestamp is written by the instances (and not the DB itself), there is no guarantee that the timestamps on the instances are synchronized.

- `pollers.peering.maxAllowedDeleteCount`

  This setting provides a safety mechanism against catastrophic failure of the peering agent in the event that it (incorrectly) decides to delete all executions from the local DB (illustrated in the sketch below). This number should be bigger than the maximum number of executions deleted during regular operation but much smaller than the total number of executions present.
  A ballpark of 0.25% of all executions is probably reasonable.
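As an illustration of the `maxAllowedDeleteCount` guard, here is a minimal sketch under assumed names (`deleteWithSafetyValve`, `candidateDeleteIds`, and the callback parameters are hypothetical, not orca's actual code):

```kotlin
// Sketch of the maxAllowedDeleteCount safety valve; not the actual PeeringAgent code.
// If the set of executions that looks eligible for deletion is suspiciously large,
// skip the deletes entirely and bump an error metric instead.
fun deleteWithSafetyValve(
    candidateDeleteIds: List<String>,
    maxAllowedDeleteCount: Int,
    deleteExecutions: (List<String>) -> Unit,  // e.g. issues the deletes against the local DB
    incrementErrorMetric: () -> Unit           // e.g. bumps pollers.peering.numErrors
) {
    if (candidateDeleteIds.size > maxAllowedDeleteCount) {
        // Refuse to delete anything: an unexpectedly large delta most likely indicates a bug
        // or bad peer data rather than a legitimate mass deletion.
        incrementErrorMetric()
    } else if (candidateDeleteIds.isNotEmpty()) {
        deleteExecutions(candidateDeleteIds)
    }
}
```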
The following metrics are emitted by the peering agent and can/should be used for monitoring the health of the peering system.

Metric | Notes
-------|------
`pollers.peering.lag` | Timer (seconds) of how long it takes to perform a single migration loop; this + the agent `intervalMs` is the effective lag. This should be a fairly steady number
`pollers.peering.numPeered` | Counter of the number of copied executions (should look fairly steady, i.e. mirror the number of active executions)
`pollers.peering.numDeleted` | Counter of the number of deleted executions
`pollers.peering.numStagesDeleted` | Counter of the number of stages deleted during copy; purely informational
`pollers.peering.numErrors` | Counter of errors encountered during execution copying (this should be alerted on)
If using the peering feature, it is recommended that you configure alerts for the following conditions:

- `pollers.peering.numErrors > 0`
- `pollers.peering.numPeered == 0` for some period of time (depends on your steady state of active executions)
- `pollers.peering.lag > 60` for some period of time (~3 minutes)
The following dynamic properties are exposed and can be controlled at runtime via `DynamicConfigService`.

Property | Default | Notes
---------|---------|------
`pollers.peering.enabled` | `true` | if set to `false`, turns off all peering
`pollers.peering.<PEERID>.enabled` | `true` | if set to `false`, turns off peering for the peer with the given ID
`pollers.peering.max-allowed-delete-count` | `100` | maximum number of executions that are allowed to be deleted in a single run of the agent
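As a sketch of how the two enable flags combine at runtime (using a hypothetical `lookupBoolean` callback rather than the real `DynamicConfigService` API):

```kotlin
// Sketch only: shows how the dynamic enable properties above might combine.
// lookupBoolean stands in for whatever runtime property lookup is in use; it is not
// the real DynamicConfigService API.
fun isPeeringEnabledFor(
    peerId: String,
    lookupBoolean: (property: String, defaultValue: Boolean) -> Boolean
): Boolean {
    val globallyEnabled = lookupBoolean("pollers.peering.enabled", true)      // global kill switch
    val peerEnabled = lookupBoolean("pollers.peering.$peerId.enabled", true)  // per-peer kill switch
    return globallyEnabled && peerEnabled
}
```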