-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathtransaction.go
1727 lines (1530 loc) · 67.3 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2024 Blnk Finance Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package blnk
import (
"context"
"database/sql"
"fmt"
"log"
"math/big"
"strings"
"sync"
"time"
"github.com/jerry-enebeli/blnk/config"
redlock "github.com/jerry-enebeli/blnk/internal/lock"
"github.com/jerry-enebeli/blnk/internal/notification"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"github.com/jerry-enebeli/blnk/model"
)
// tracer is the OpenTelemetry tracer, registered under the name "blnk.transactions",
// that the functions in this file use to start their spans.
var (
tracer = otel.Tracer("blnk.transactions")
)
// Transaction status values used by the transaction-processing code in this file.
//
// updateTransactionDetails maps StatusQueued, StatusScheduled and StatusCommit to
// StatusApplied, keeps StatusVoid as it is, and assigns StatusInflight when the
// transaction's Inflight flag is set. applyTransactionToBalances treats StatusCommit
// and StatusVoid specially (commit / roll back inflight amounts), and CommitWorker
// and VoidWorker only accept transactions whose status is StatusInflight.
const (
StatusQueued = "QUEUED"
StatusApplied = "APPLIED"
StatusScheduled = "SCHEDULED"
StatusInflight = "INFLIGHT"
StatusVoid = "VOID"
StatusCommit = "COMMIT"
StatusRejected = "REJECTED"
)
// getTxns is the signature of a paginated lookup that returns one batch of transactions
// belonging to a parent transaction.
//
// fetchTransactions calls it repeatedly, advancing offset by the number of transactions
// returned each time, and stops as soon as a call returns an empty slice or an error.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - parentTransactionID string: The ID of the parent transaction.
// - batchSize int: The number of transactions to retrieve in a batch.
// - offset int64: The offset for pagination.
//
// Returns:
// - []*model.Transaction: A slice of pointers to the retrieved Transaction models.
// - error: An error if the transactions could not be retrieved.
type getTxns func(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error)
// transactionWorker is the signature of a worker that consumes transactions from a jobs
// channel and publishes one BatchJobResult per transaction on a results channel.
//
// ProcessTransactionInBatches calls wg.Add(1) before starting each worker and closes the
// results channel once wg reaches zero, so an implementation must call wg.Done() exactly
// once when it finishes (the workers in this file do so with defer).
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - jobs <-chan *model.Transaction: A channel from which transactions are received for processing.
// - results chan<- BatchJobResult: A channel to which the results of the processing are sent.
// - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker.
// - amount float64: The amount to be processed in the transaction.
type transactionWorker func(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount float64)
// BatchJobResult is the outcome a worker reports for a single transaction in a batch job.
//
// processResults inspects Error first: a non-nil Error is collected as a failure, otherwise
// a non-nil Txn is collected as a success. A result with both fields nil is only logged.
//
// Fields:
// - Txn *model.Transaction: A pointer to the processed Transaction model.
// - Error error: An error if the transaction could not be processed.
type BatchJobResult struct {
Txn *model.Transaction
Error error
}
// getSourceAndDestination resolves the source and destination balances referenced by a transaction.
//
// A reference that begins with "@" is treated as an indicator: the balance is obtained through
// getOrCreateBalanceByIndicator using the transaction currency, and the corresponding field on the
// transaction is overwritten with the resolved balance ID. Any other reference is looked up
// directly with GetBalanceByIDLite. The source is resolved before the destination, and the first
// failure aborts the lookup.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction whose balances are resolved. Its Source and
// Destination fields may be rewritten in place.
//
// Returns:
// - source *model.Balance: The resolved source balance.
// - destination *model.Balance: The resolved destination balance.
// - err error: The lookup error, in which case both balances are nil.
func (l *Blnk) getSourceAndDestination(ctx context.Context, transaction *model.Transaction) (source *model.Balance, destination *model.Balance, err error) {
	ctx, span := tracer.Start(ctx, "GetSourceAndDestination")
	defer span.End()

	// resolve looks up one side of the transaction. ref points at the transaction field so that
	// an indicator can be replaced by the concrete balance ID; side labels the log line and the
	// span attribute.
	resolve := func(ref *string, side string) (*model.Balance, error) {
		viaIndicator := strings.HasPrefix(*ref, "@")

		var (
			balance   *model.Balance
			lookupErr error
		)
		if viaIndicator {
			balance, lookupErr = l.getOrCreateBalanceByIndicator(ctx, *ref, transaction.Currency)
		} else {
			balance, lookupErr = l.datasource.GetBalanceByIDLite(*ref)
		}
		if lookupErr != nil {
			span.RecordError(lookupErr)
			logrus.Errorf("%s error %v", side, lookupErr)
			return nil, lookupErr
		}

		if viaIndicator {
			// Replace the indicator on the transaction with the balance ID it resolved to.
			*ref = balance.BalanceID
			span.SetAttributes(attribute.String(side+".balance_id", balance.BalanceID))
		}
		return balance, nil
	}

	if source, err = resolve(&transaction.Source, "source"); err != nil {
		return nil, nil, err
	}
	if destination, err = resolve(&transaction.Destination, "destination"); err != nil {
		return nil, nil, err
	}

	span.AddEvent("Retrieved source and destination balances")
	return source, destination, nil
}
// acquireLock acquires a distributed lock keyed by the transaction's source so that only one
// operation works on that source at a time.
//
// The lock is created over l.redis with a freshly generated owner value and is requested for the
// duration configured in Transaction.LockDuration. Both failure modes (configuration could not be
// loaded, lock could not be acquired) are recorded on the tracing span before being returned.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction for which to acquire the lock.
//
// Returns:
// - *redlock.Locker: A pointer to the acquired Locker if successful.
// - error: An error if the configuration could not be fetched or the lock could not be acquired.
func (l *Blnk) acquireLock(ctx context.Context, transaction *model.Transaction) (*redlock.Locker, error) {
	ctx, span := tracer.Start(ctx, "Acquiring Lock")
	defer span.End()

	// Named cfg so the imported config package is not shadowed inside this function.
	cfg, err := config.Fetch()
	if err != nil {
		span.RecordError(err)
		return nil, err
	}

	locker := redlock.NewLocker(l.redis, transaction.Source, model.GenerateUUIDWithSuffix("loc"))
	if err := locker.Lock(ctx, cfg.Transaction.LockDuration); err != nil {
		span.RecordError(err)
		return nil, err
	}

	span.AddEvent("Lock acquired")
	return locker, nil
}
// updateTransactionDetails builds an updated copy of a transaction; the transaction passed in is
// left untouched.
//
// The copy is shallow and carries the IDs of the supplied balances as Source and Destination.
// Its status is derived from the original status: QUEUED, SCHEDULED and COMMIT become APPLIED,
// VOID stays VOID, and any other status becomes the empty string. When the transaction's Inflight
// flag is set the status is INFLIGHT regardless of the above.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The original transaction to be updated.
// - sourceBalance *model.Balance: The source balance for the transaction.
// - destinationBalance *model.Balance: The destination balance for the transaction.
//
// Returns:
// - *model.Transaction: A pointer to the new transaction object with updated details.
func (l *Blnk) updateTransactionDetails(ctx context.Context, transaction *model.Transaction, sourceBalance, destinationBalance *model.Balance) *model.Transaction {
	_, span := tracer.Start(ctx, "Updating Transaction Details")
	defer span.End()

	// Statuses without an explicit transition fall through and leave nextStatus empty.
	nextStatus := ""
	switch transaction.Status {
	case StatusQueued, StatusScheduled, StatusCommit:
		nextStatus = StatusApplied
	case StatusVoid:
		nextStatus = StatusVoid
	}
	// The inflight flag takes precedence over the status-based transition.
	if transaction.Inflight {
		nextStatus = StatusInflight
	}

	updated := *transaction
	updated.Source = sourceBalance.BalanceID
	updated.Destination = destinationBalance.BalanceID
	updated.Status = nextStatus

	span.AddEvent("Transaction details updated")
	return &updated
}
// persistTransaction stores a transaction through the datasource and returns the stored record.
//
// A failure is recorded on the tracing span, logged, and returned to the caller. On success the
// span is tagged with the ID of the stored transaction.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be persisted.
//
// Returns:
// - *model.Transaction: A pointer to the persisted Transaction model.
// - error: An error if the transaction could not be persisted.
func (l *Blnk) persistTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "Persisting Transaction")
	defer span.End()

	saved, saveErr := l.datasource.RecordTransaction(ctx, transaction)
	if saveErr != nil {
		span.RecordError(saveErr)
		logrus.Errorf("ERROR saving transaction to db. %s", saveErr)
		return nil, saveErr
	}

	span.SetAttributes(attribute.String("transaction.id", saved.TransactionID))
	span.AddEvent("Transaction persisted")
	return saved, nil
}
// postTransactionActions runs the follow-up work for a processed transaction in the background:
// it queues the transaction for indexing and then sends a webhook for the transaction's status.
//
// The function returns immediately; the work happens in a goroutine. Failures of either step are
// recorded on the tracing span and reported through notification.NotifyError, and a failed
// indexing step does not prevent the webhook from being sent. The span is ended by the goroutine
// once the work has finished, so that the errors and the completion event are recorded while the
// span is still open rather than after it has already ended.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction for which to perform post-processing actions.
func (l *Blnk) postTransactionActions(ctx context.Context, transaction *model.Transaction) {
	_, span := tracer.Start(ctx, "Post Transaction Actions")

	// Named cfg so the imported config package is not shadowed inside this function.
	cfg, err := config.Fetch()
	if err != nil {
		span.RecordError(err)
		span.End()
		return
	}

	go func() {
		// The span must outlive this goroutine: updates made after End are not recorded.
		defer span.End()

		if err := l.queue.queueIndexData(transaction.TransactionID, cfg.Transaction.IndexQueuePrefix, transaction); err != nil {
			span.RecordError(err)
			notification.NotifyError(err)
		}

		err := SendWebhook(NewWebhook{
			Event:   getEventFromStatus(transaction.Status),
			Payload: transaction,
		})
		if err != nil {
			span.RecordError(err)
			notification.NotifyError(err)
		}

		span.AddEvent("Post-transaction actions completed")
	}()
}
// updateBalances writes the source and destination balances to the datasource and then runs the
// per-balance follow-up work.
//
// After a successful write, each balance is handled concurrently: its balance monitors are
// checked and the balance is queued for indexing under the "balances" collection. The function
// waits for both follow-ups before returning. A queueing failure is recorded on the span and
// reported through notification.NotifyError but does not fail the update.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - sourceBalance *model.Balance: The source balance to be updated.
// - destinationBalance *model.Balance: The destination balance to be updated.
//
// Returns:
// - error: An error if the balances could not be updated.
func (l *Blnk) updateBalances(ctx context.Context, sourceBalance, destinationBalance *model.Balance) error {
	ctx, span := tracer.Start(ctx, "Updating Balances")
	defer span.End()

	updateErr := l.datasource.UpdateBalances(ctx, sourceBalance, destinationBalance)
	if updateErr != nil {
		span.RecordError(updateErr)
		return updateErr
	}

	// followUp checks the monitors of one balance and queues that balance for indexing.
	followUp := func(pending *sync.WaitGroup, balance *model.Balance) {
		defer pending.Done()
		l.checkBalanceMonitors(ctx, balance)
		if queueErr := l.queue.queueIndexData(balance.BalanceID, "balances", balance); queueErr != nil {
			span.RecordError(queueErr)
			notification.NotifyError(queueErr)
		}
	}

	var pending sync.WaitGroup
	pending.Add(2)
	go followUp(&pending, sourceBalance)
	go followUp(&pending, destinationBalance)
	pending.Wait()

	span.AddEvent("Balances updated")
	return nil
}
// validateTxn rejects a transaction whose reference has already been used.
//
// It asks the datasource whether a transaction with the same reference exists. A lookup failure
// is returned as-is; a duplicate reference produces an error that is also recorded on the span.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be validated.
//
// Returns:
// - error: An error if the transaction reference has already been used or if there was an issue checking the reference.
func (l *Blnk) validateTxn(ctx context.Context, transaction *model.Transaction) error {
	ctx, span := tracer.Start(ctx, "Validating Transaction Reference")
	defer span.End()

	alreadyUsed, lookupErr := l.datasource.TransactionExistsByRef(ctx, transaction.Reference)
	switch {
	case lookupErr != nil:
		return lookupErr
	case alreadyUsed:
		dupErr := fmt.Errorf("reference %s has already been used", transaction.Reference)
		span.RecordError(dupErr)
		return dupErr
	}

	span.AddEvent("Transaction validated")
	return nil
}
// applyTransactionToBalances applies a transaction to a source/destination pair of balances.
//
// The action depends on the transaction status:
// - COMMIT: the inflight debit on the source and the inflight credit on the destination are committed.
// - VOID: the inflight debit and credit are rolled back by the transaction's precise amount.
// - anything else: the balances are updated through model.UpdateBalances.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - balances []*model.Balance: The balances to update. Index 0 is the source and index 1 is the
// destination; both must be present and non-nil.
// - transaction *model.Transaction: The transaction to be applied to the balances.
//
// Returns:
// - error: An error if fewer than two non-nil balances were supplied or if the balances could not
// be updated.
func (l *Blnk) applyTransactionToBalances(ctx context.Context, balances []*model.Balance, transaction *model.Transaction) error {
	_, span := tracer.Start(ctx, "Applying Transaction to Balances")
	defer span.End()

	// Guard the positional access below: a short slice or a nil entry would otherwise panic.
	if len(balances) < 2 || balances[0] == nil || balances[1] == nil {
		err := fmt.Errorf("expected non-nil source and destination balances, got %d balance(s)", len(balances))
		span.RecordError(err)
		return err
	}
	source, destination := balances[0], balances[1]

	span.AddEvent("Calculating new balances")

	switch transaction.Status {
	case StatusCommit:
		// Handle committed inflight transactions.
		source.CommitInflightDebit(transaction)
		destination.CommitInflightCredit(transaction)
		span.AddEvent("Committed inflight balances")
	case StatusVoid:
		// Handle voided transactions.
		amount := new(big.Int).SetInt64(transaction.PreciseAmount)
		source.RollbackInflightDebit(amount)
		destination.RollbackInflightCredit(amount)
		span.AddEvent("Rolled back inflight balances")
	default:
		// Update balances for every other transaction status.
		if err := model.UpdateBalances(transaction, source, destination); err != nil {
			span.RecordError(err)
			return err
		}
		span.AddEvent("Balances updated")
	}
	return nil
}
// GetInflightTransactionsByParentID returns one page of the inflight transactions that belong to
// a parent transaction.
//
// The lookup is delegated to the datasource. A failure is recorded on the tracing span and
// returned; on success the span is tagged with the parent transaction ID.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - parentTransactionID string: The ID of the parent transaction.
// - batchSize int: The number of transactions to retrieve in a batch.
// - offset int64: The offset for pagination.
//
// Returns:
// - []*model.Transaction: A slice of pointers to the retrieved Transaction models.
// - error: An error if the transactions could not be retrieved.
func (l *Blnk) GetInflightTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "GetInflightTransactionsByParentID")
	defer span.End()

	inflight, lookupErr := l.datasource.GetInflightTransactionsByParentID(ctx, parentTransactionID, batchSize, offset)
	if lookupErr != nil {
		span.RecordError(lookupErr)
		return nil, lookupErr
	}

	parentAttr := attribute.String("parent_transaction_id", parentTransactionID)
	span.SetAttributes(parentAttr)
	span.AddEvent("Inflight transactions retrieved")
	return inflight, nil
}
// GetRefundableTransactionsByParentID returns one page of the refundable transactions that belong
// to a parent transaction.
//
// The lookup is delegated to the datasource. A failure is recorded on the tracing span and
// returned; on success the span is tagged with the parent transaction ID.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - parentTransactionID string: The ID of the parent transaction.
// - batchSize int: The number of transactions to retrieve in a batch.
// - offset int64: The offset for pagination.
//
// Returns:
// - []*model.Transaction: A slice of pointers to the retrieved Transaction models.
// - error: An error if the transactions could not be retrieved.
func (l *Blnk) GetRefundableTransactionsByParentID(ctx context.Context, parentTransactionID string, batchSize int, offset int64) ([]*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "GetRefundableTransactionsByParentID")
	defer span.End()

	refundable, lookupErr := l.datasource.GetRefundableTransactionsByParentID(ctx, parentTransactionID, batchSize, offset)
	if lookupErr != nil {
		span.RecordError(lookupErr)
		return nil, lookupErr
	}

	parentAttr := attribute.String("parent_transaction_id", parentTransactionID)
	span.SetAttributes(parentAttr)
	span.AddEvent("Refundable transactions retrieved")
	return refundable, nil
}
// ProcessTransactionInBatches fetches the child transactions of a parent transaction page by page
// and hands them to a pool of workers.
//
// Batch size and channel capacity come from the Transaction section of the configuration. The
// workers read from a shared jobs channel and write to a shared results channel, which is closed
// once every worker has finished.
//
// In batch mode (streamMode == false) the call blocks until fetching has stopped and every
// worker result has been collected, and only then reads the collected results. A fetch error is
// returned together with whatever transactions were processed; otherwise, if any worker reported
// an error, a combined error is returned.
//
// In stream mode (streamMode == true) the call blocks only until fetching has stopped; workers
// may still be processing when it returns. Worker results are drained in the background so that
// workers never block on a full results channel, and worker errors are logged. A fetch error is
// returned to the caller.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - parentTransactionID string: The ID of the parent transaction.
// - amount float64: The amount passed through to every worker.
// - maxWorkers int: The maximum number of workers to process transactions concurrently. Values
// below 1 are treated as 1, because without a worker the jobs channel is never drained.
// - streamMode bool: A flag indicating whether to process transactions in streaming mode.
// - gt getTxns: A function to retrieve transactions in batches.
// - tw transactionWorker: A function to process transactions.
//
// Returns:
// - []*model.Transaction: The processed transactions (always nil in stream mode).
// - error: An error if the configuration could not be loaded, fetching failed, or any worker
// reported an error.
func (l *Blnk) ProcessTransactionInBatches(ctx context.Context, parentTransactionID string, amount float64, maxWorkers int, streamMode bool, gt getTxns, tw transactionWorker) ([]*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "ProcessTransactionInBatches")
	defer span.End()

	// Named cfg so the imported config package is not shadowed inside this function.
	cfg, err := config.Fetch()
	if err != nil {
		span.RecordError(err)
		return nil, err
	}
	batchSize := cfg.Transaction.BatchSize
	maxQueueSize := cfg.Transaction.MaxQueueSize

	if maxWorkers < 1 {
		maxWorkers = 1
	}

	jobs := make(chan *model.Transaction, maxQueueSize)
	results := make(chan BatchJobResult, maxQueueSize)

	// Start the worker pool and close the results channel once every worker has returned.
	var workers sync.WaitGroup
	for w := 0; w < maxWorkers; w++ {
		workers.Add(1)
		go tw(ctx, jobs, results, &workers, amount)
	}
	go func() {
		workers.Wait()
		close(results)
	}()

	// fetchTransactions sends at most one error before it returns, so one slot is enough to keep
	// that send from blocking.
	errChan := make(chan error, 1)

	if streamMode {
		// Nobody collects results in stream mode; drain them so workers cannot stall once the
		// results buffer fills up.
		go func() {
			for result := range results {
				if result.Error != nil {
					log.Printf("Error processing transaction: %v", result.Error)
				}
			}
		}()

		fetchTransactions(ctx, parentTransactionID, batchSize, gt, jobs, errChan)

		select {
		case fetchErr := <-errChan:
			span.RecordError(fetchErr)
			return nil, fetchErr
		default:
		}
		span.AddEvent("Processed all transactions in streaming mode")
		return nil, nil
	}

	var (
		allTxns   []*model.Transaction
		allErrors []error
		mu        sync.Mutex // required by processResults to guard the two slices above
	)
	done := make(chan struct{})
	go processResults(results, &mu, &allTxns, &allErrors, done)
	go fetchTransactions(ctx, parentTransactionID, batchSize, gt, jobs, errChan)

	// Always wait for the collector. It finishes only after the fetcher has closed the jobs
	// channel and all workers have drained it, so from here on the slices are no longer written
	// and any fetch error is already sitting in errChan.
	<-done

	for _, procErr := range allErrors {
		log.Printf("Error during processing: %v", procErr)
		span.RecordError(procErr)
	}

	select {
	case fetchErr := <-errChan:
		span.RecordError(fetchErr)
		return allTxns, fetchErr
	default:
	}

	if len(allErrors) > 0 {
		return allTxns, fmt.Errorf("multiple errors occurred during processing: %v", allErrors)
	}

	span.AddEvent("Processed all transactions in batches")
	return allTxns, nil
}
// processResults drains the results channel and sorts every result into the shared slices.
//
// A result carrying an error is logged and appended to allErrors; otherwise a result carrying a
// transaction is appended to allTxns; a result carrying neither is only logged. Each result is
// handled while holding mu. When the results channel is closed and fully drained, done is closed.
//
// Parameters:
// - results chan BatchJobResult: The channel from which to receive results.
// - mu *sync.Mutex: A mutex to synchronize access to shared data.
// - allTxns *[]*model.Transaction: A slice to collect all processed transactions.
// - allErrors *[]error: A slice to collect all errors encountered during processing.
// - done chan struct{}: A channel to signal when processing is complete.
func processResults(results chan BatchJobResult, mu *sync.Mutex, allTxns *[]*model.Transaction, allErrors *[]error, done chan struct{}) {
	// collect files a single result under the mutex.
	collect := func(outcome BatchJobResult) {
		mu.Lock()
		defer mu.Unlock()

		switch {
		case outcome.Error != nil:
			log.Printf("Error processing transaction: %v", outcome.Error)
			*allErrors = append(*allErrors, outcome.Error)
		case outcome.Txn != nil:
			*allTxns = append(*allTxns, outcome.Txn)
		default:
			// Neither a transaction nor an error was reported.
			log.Printf("Received a result with no transaction and no error")
		}
	}

	for outcome := range results {
		collect(outcome)
	}
	close(done) // Tell the caller that every result has been handled.
}
// fetchTransactions pages through the transactions of a parent transaction and pushes each one
// onto the jobs channel.
//
// It keeps calling gt with a growing offset until gt returns an empty batch or an error, or until
// ctx is cancelled. An error (from gt or from cancellation) is sent on errChan and recorded on the
// span, after which the function returns. The jobs channel is closed on every exit path so that
// consumers ranging over it terminate.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - parentTransactionID string: The ID of the parent transaction.
// - batchSize int: The number of transactions to retrieve in a batch.
// - gt getTxns: A function to retrieve transactions in batches.
// - jobs chan *model.Transaction: The channel to send fetched transactions to.
// - errChan chan error: The channel to send errors to.
func fetchTransactions(ctx context.Context, parentTransactionID string, batchSize int, gt getTxns, jobs chan *model.Transaction, errChan chan error) {
	newCtx, span := tracer.Start(ctx, "FetchTransactions")
	defer span.End()
	defer close(jobs) // Closed in all cases so that consumers of jobs do not wait forever.

	// report hands the error to the caller first and then records it on the span.
	report := func(err error) {
		errChan <- err
		span.RecordError(err)
	}
	// reportCancellation reports the context error, if the context carries one.
	reportCancellation := func() {
		if cancelErr := ctx.Err(); cancelErr != nil {
			report(cancelErr)
		}
	}

	offset := int64(0)
	for {
		// Stop before fetching another page once the context has been cancelled.
		select {
		case <-ctx.Done():
			reportCancellation()
			return
		default:
		}

		batch, fetchErr := gt(newCtx, parentTransactionID, batchSize, offset)
		if fetchErr != nil {
			log.Printf("Error fetching transactions: %v", fetchErr)
			report(fetchErr)
			return
		}
		if len(batch) == 0 {
			// An empty page marks the end of the data.
			span.AddEvent("No more transactions to fetch")
			return
		}

		// Hand every transaction of this page to the workers, giving up on cancellation.
		for i := range batch {
			select {
			case jobs <- batch[i]:
			case <-ctx.Done():
				reportCancellation()
				return
			}
		}

		// Move past the page that was just dispatched.
		offset += int64(len(batch))
	}
}
// RefundWorker refunds every transaction it receives on the jobs channel and reports one result
// per transaction on the results channel.
//
// A failed refund is reported as a BatchJobResult carrying the error and is recorded on the span;
// a successful refund is reported with the transaction returned by RefundTransaction. The worker
// runs until the jobs channel is closed and then signals wg.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - jobs <-chan *model.Transaction: A channel from which transactions are received for processing.
// - results chan<- BatchJobResult: A channel to which the results of the processing are sent.
// - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker.
// - amount float64: Present to satisfy the worker signature; not used by this worker.
func (l *Blnk) RefundWorker(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount float64) {
	ctx, span := tracer.Start(ctx, "RefundWorker")
	defer span.End()
	defer wg.Done()

	// fail publishes the error as this job's result and then records it on the span.
	fail := func(err error) {
		results <- BatchJobResult{Error: err}
		span.RecordError(err)
	}

	for job := range jobs {
		refund, refundErr := l.RefundTransaction(ctx, job.TransactionID)
		if refundErr != nil {
			fail(refundErr)
			continue
		}

		results <- BatchJobResult{Txn: refund}
		processed := trace.WithAttributes(attribute.String("transaction.id", refund.TransactionID))
		span.AddEvent("Refund processed", processed)
	}
}
// CommitWorker commits every inflight transaction it receives on the jobs channel and reports one
// result per transaction on the results channel.
//
// A transaction whose status is not INFLIGHT is rejected with an error without being committed.
// Otherwise CommitInflightTransaction is called with the transaction ID and the given amount.
// Failures are reported as a BatchJobResult carrying the error and recorded on the span. The
// worker runs until the jobs channel is closed and then signals wg.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - jobs <-chan *model.Transaction: A channel from which transactions are received for processing.
// - results chan<- BatchJobResult: A channel to which the results of the processing are sent.
// - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker.
// - amount float64: The amount passed to CommitInflightTransaction for every transaction.
func (l *Blnk) CommitWorker(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount float64) {
	ctx, span := tracer.Start(ctx, "CommitWorker")
	defer span.End()
	defer wg.Done()

	// fail publishes the error as this job's result and then records it on the span.
	fail := func(err error) {
		results <- BatchJobResult{Error: err}
		span.RecordError(err)
	}

	for job := range jobs {
		// Only inflight transactions can be committed.
		if job.Status != StatusInflight {
			fail(fmt.Errorf("transaction is not in inflight status"))
			continue
		}

		committed, commitErr := l.CommitInflightTransaction(ctx, job.TransactionID, amount)
		if commitErr != nil {
			fail(commitErr)
			continue
		}

		results <- BatchJobResult{Txn: committed}
		processed := trace.WithAttributes(attribute.String("transaction.id", committed.TransactionID))
		span.AddEvent("Commit processed", processed)
	}
}
// VoidWorker voids every inflight transaction it receives on the jobs channel and reports one
// result per transaction on the results channel.
//
// A transaction whose status is not INFLIGHT is rejected with an error without being voided.
// Otherwise VoidInflightTransaction is called with the transaction ID. Failures are reported as a
// BatchJobResult carrying the error and recorded on the span. The worker runs until the jobs
// channel is closed and then signals wg.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - jobs <-chan *model.Transaction: A channel from which transactions are received for processing.
// - results chan<- BatchJobResult: A channel to which the results of the processing are sent.
// - wg *sync.WaitGroup: A wait group to synchronize the completion of the worker.
// - amount float64: The amount to be processed in the transaction (not used in this function).
func (l *Blnk) VoidWorker(ctx context.Context, jobs <-chan *model.Transaction, results chan<- BatchJobResult, wg *sync.WaitGroup, amount float64) {
	ctx, span := tracer.Start(ctx, "VoidWorker")
	defer span.End()
	defer wg.Done()

	// fail publishes the error as this job's result and then records it on the span.
	fail := func(err error) {
		results <- BatchJobResult{Error: err}
		span.RecordError(err)
	}

	for job := range jobs {
		// Only inflight transactions can be voided.
		if job.Status != StatusInflight {
			fail(fmt.Errorf("transaction is not in inflight status"))
			continue
		}

		voided, voidErr := l.VoidInflightTransaction(ctx, job.TransactionID)
		if voidErr != nil {
			fail(voidErr)
			continue
		}

		results <- BatchJobResult{Txn: voided}
		processed := trace.WithAttributes(attribute.String("transaction.id", voided.TransactionID))
		span.AddEvent("Void processed", processed)
	}
}
// RecordTransaction records a transaction while holding the lock provided by executeWithLock.
//
// Inside the lock the steps run in this order: pre-transaction hooks, validation and preparation
// (which also yields the source and destination balances), applying the transaction to the
// balances, finalizing the transaction, post-transaction hooks, and finally the post-transaction
// actions. A failure in any step up to and including finalization is recorded on the span and
// aborts the operation. A post-hook failure is recorded and logged but does not fail the
// transaction.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be recorded.
//
// Returns:
// - *model.Transaction: A pointer to the recorded Transaction model.
// - error: An error if the transaction could not be recorded.
func (l *Blnk) RecordTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "RecordTransaction")
	defer span.End()

	// abort records err on the span and produces the (nil, err) result shared by all failing steps.
	abort := func(err error) (*model.Transaction, error) {
		span.RecordError(err)
		return nil, err
	}

	// record is the work performed while the lock is held.
	record := func(ctx context.Context) (*model.Transaction, error) {
		// Pre-hooks see the transaction exactly as the caller supplied it.
		if hookErr := l.Hooks.ExecutePreHooks(ctx, transaction.TransactionID, transaction); hookErr != nil {
			return abort(hookErr)
		}

		txn, source, destination, err := l.validateAndPrepareTransaction(ctx, transaction)
		if err != nil {
			return abort(err)
		}

		if applyErr := l.processBalances(ctx, txn, source, destination); applyErr != nil {
			return abort(applyErr)
		}

		txn, err = l.finalizeTransaction(ctx, txn, source, destination)
		if err != nil {
			return abort(err)
		}

		// The transaction is already finalized here, so a post-hook failure is reported but not returned.
		if hookErr := l.Hooks.ExecutePostHooks(ctx, txn.TransactionID, txn); hookErr != nil {
			span.RecordError(hookErr)
			logrus.Errorf("post-transaction hooks failed: %v", hookErr)
		}

		l.postTransactionActions(ctx, txn)

		span.AddEvent("Transaction processed", trace.WithAttributes(attribute.String("transaction.id", txn.TransactionID)))
		return txn, nil
	}

	return l.executeWithLock(ctx, transaction, record)
}
// executeWithLock runs fn while holding the lock obtained for the given transaction.
// It starts a tracing span, acquires the lock through acquireLock, invokes fn with the span's
// context, and releases the lock when it returns.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction for which to acquire the lock.
// - fn func(context.Context) (*model.Transaction, error): The function to execute with the lock.
//
// Returns:
// - *model.Transaction: The Transaction model returned by fn, or nil if the lock was not acquired.
// - error: A wrapped error if the lock could not be acquired, otherwise whatever fn returns.
func (l *Blnk) executeWithLock(ctx context.Context, transaction *model.Transaction, fn func(context.Context) (*model.Transaction, error)) (*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "ExecuteWithLock")
	defer span.End()

	txnLock, lockErr := l.acquireLock(ctx, transaction)
	if lockErr != nil {
		// Without the lock fn is never invoked.
		span.RecordError(lockErr)
		return nil, fmt.Errorf("failed to acquire lock: %w", lockErr)
	}
	// Deferred so the lock is released on every exit path once fn has finished.
	defer l.releaseLock(ctx, txnLock)

	return fn(ctx)
}
// validateAndPrepareTransaction validates the transaction and prepares it by retrieving the source and destination balances.
// It starts a tracing span, validates the transaction, retrieves the balances, and returns a copy of the
// transaction whose Source and Destination are set to the retrieved balance IDs. The input transaction is not modified.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be validated and prepared.
//
// Returns:
// - *model.Transaction: A pointer to the new transaction object with updated details.
// - *model.Balance: A pointer to the source Balance model.
// - *model.Balance: A pointer to the destination Balance model.
// - error: A wrapped error if the transaction validation or balance retrieval fails.
func (l *Blnk) validateAndPrepareTransaction(ctx context.Context, transaction *model.Transaction) (*model.Transaction, *model.Balance, *model.Balance, error) {
	ctx, span := tracer.Start(ctx, "ValidateAndPrepareTransaction")
	defer span.End()

	// logAndRecordError records the error on the span itself, so it is not recorded
	// separately here; doing both would emit two error events for a single failure.
	if err := l.validateTxn(ctx, transaction); err != nil {
		return nil, nil, nil, l.logAndRecordError(span, "transaction validation failed", err)
	}

	// Retrieve the source and destination balances
	sourceBalance, destinationBalance, err := l.getSourceAndDestination(ctx, transaction)
	if err != nil {
		return nil, nil, nil, l.logAndRecordError(span, "failed to get source and destination balances", err)
	}

	// Work on a shallow copy so the caller's transaction value stays untouched
	newTransaction := *transaction
	newTransaction.Source = sourceBalance.BalanceID
	newTransaction.Destination = destinationBalance.BalanceID

	span.AddEvent("Transaction validated and prepared", trace.WithAttributes(
		attribute.String("source.balance_id", sourceBalance.BalanceID),
		attribute.String("destination.balance_id", destinationBalance.BalanceID)))

	// Return the new transaction, source, and destination balances
	return &newTransaction, sourceBalance, destinationBalance, nil
}
// processBalances processes the source and destination balances by applying the transaction and updating the balances.
// It starts a tracing span, applies the transaction to the balances, updates the balances, and records relevant events and errors.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be applied to the balances.
// - sourceBalance *model.Balance: The source balance to be updated.
// - destinationBalance *model.Balance: The destination balance to be updated.
//
// Returns:
// - error: A wrapped error if the transaction could not be applied to the balances or if the balances could not be updated.
func (l *Blnk) processBalances(ctx context.Context, transaction *model.Transaction, sourceBalance, destinationBalance *model.Balance) error {
	ctx, span := tracer.Start(ctx, "ProcessBalances")
	defer span.End()

	// logAndRecordError records the error on the span itself, so it is not recorded
	// separately here; doing both would emit two error events for a single failure.
	if err := l.applyTransactionToBalances(ctx, []*model.Balance{sourceBalance, destinationBalance}, transaction); err != nil {
		return l.logAndRecordError(span, "failed to apply transaction to balances", err)
	}

	// Update the source and destination balances in the datasource
	if err := l.updateBalances(ctx, sourceBalance, destinationBalance); err != nil {
		return l.logAndRecordError(span, "failed to update balances", err)
	}

	span.AddEvent("Balances processed")
	return nil
}
// finalizeTransaction finalizes the transaction by updating its details and persisting it.
// It starts a tracing span, updates the transaction details from the two balances, persists the
// transaction, and records relevant events and errors.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be finalized.
// - sourceBalance *model.Balance: The source balance associated with the transaction.
// - destinationBalance *model.Balance: The destination balance associated with the transaction.
//
// Returns:
// - *model.Transaction: A pointer to the finalized Transaction model, or nil on failure.
// - error: A wrapped error if the transaction could not be persisted.
func (l *Blnk) finalizeTransaction(ctx context.Context, transaction *model.Transaction, sourceBalance, destinationBalance *model.Balance) (*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "FinalizeTransaction")
	defer span.End()

	// Update the transaction details with the source and destination balances
	transaction = l.updateTransactionDetails(ctx, transaction, sourceBalance, destinationBalance)

	// logAndRecordError records the error on the span itself, so it is not recorded
	// separately here; doing both would emit two error events for a single failure.
	transaction, err := l.persistTransaction(ctx, transaction)
	if err != nil {
		return nil, l.logAndRecordError(span, "failed to persist transaction", err)
	}

	span.AddEvent("Transaction processed", trace.WithAttributes(attribute.String("transaction.id", transaction.TransactionID)))
	return transaction, nil
}
// releaseLock releases the lock acquired for a transaction.
// It starts a tracing span and attempts to unlock the locker. A failure is recorded on the span
// and logged; the "Lock released" event is added only when the unlock succeeds.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - locker *redlock.Locker: The locker object representing the acquired lock.
func (l *Blnk) releaseLock(ctx context.Context, locker *redlock.Locker) {
	ctx, span := tracer.Start(ctx, "ReleaseLock")
	defer span.End()

	if err := locker.Unlock(ctx); err != nil {
		span.RecordError(err)
		// Errorf with an explicit separator: logrus.Error formats like fmt.Sprint, which puts
		// no space between a string operand and the error, gluing the two texts together.
		logrus.Errorf("failed to release lock: %v", err)
		// The lock was not released, so the success event below must not be emitted.
		return
	}

	span.AddEvent("Lock released")
}
// logAndRecordError logs an error message and records the error in the tracing span.
// It returns a new error that wraps the original one and is prefixed with the provided message.
//
// Parameters:
// - span trace.Span: The tracing span to record the error.
// - msg string: The error message to log and include in the formatted error.
// - err error: The original error to be logged and recorded.
//
// Returns:
// - error: An error of the form "msg: err" that wraps err via %w.
func (l *Blnk) logAndRecordError(span trace.Span, msg string, err error) error {
	span.RecordError(err)
	// Errorf with an explicit separator: logrus.Error formats like fmt.Sprint, which puts
	// no space between a string operand and the error, gluing the two texts together.
	logrus.Errorf("%s: %v", msg, err)
	return fmt.Errorf("%s: %w", msg, err)
}
// RejectTransaction marks a transaction as rejected and stores it together with the rejection reason.
// It starts a tracing span, writes the reason into the transaction's metadata under
// "blnk_rejection_reason", sets the status to StatusRejected, and records the transaction in the
// datasource. The transaction passed in is modified in place.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transaction *model.Transaction: The transaction to be rejected.
// - reason string: The reason for rejecting the transaction.
//
// Returns:
// - *model.Transaction: The Transaction model returned by the datasource, or nil on failure.
// - error: The datasource error if the transaction could not be recorded.
func (l *Blnk) RejectTransaction(ctx context.Context, transaction *model.Transaction, reason string) (*model.Transaction, error) {
	ctx, span := tracer.Start(ctx, "RejectTransaction")
	defer span.End()

	// Writing to a nil map would panic, so create the metadata map on demand.
	if transaction.MetaData == nil {
		transaction.MetaData = make(map[string]interface{})
	}
	transaction.MetaData["blnk_rejection_reason"] = reason
	transaction.Status = StatusRejected

	// Store the transaction with its rejected status and reason.
	rejected, saveErr := l.datasource.RecordTransaction(ctx, transaction)
	if saveErr != nil {
		span.RecordError(saveErr)
		logrus.Errorf("ERROR saving transaction to db. %s", saveErr)
		return nil, saveErr
	}

	span.AddEvent("Transaction rejected", trace.WithAttributes(attribute.String("transaction.id", rejected.TransactionID)))
	return rejected, nil
}
// CommitInflightTransaction commits an inflight transaction by validating and updating its amount, and finalizing the commitment.
// It starts a tracing span, fetches and validates the inflight transaction, updates the amount, and finalizes the commitment.
//
// Parameters:
// - ctx context.Context: The context for the operation.
// - transactionID string: The ID of the inflight transaction to be committed.
// - amount float64: The amount to be validated and updated in the transaction.
//
// Returns:
// - *model.Transaction: A pointer to the committed Transaction model.
// - error: An error if the transaction could not be committed.
func (l *Blnk) CommitInflightTransaction(ctx context.Context, transactionID string, amount float64) (*model.Transaction, error) {