forked from bloomberg/comdb2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bdb_int.h
1870 lines (1447 loc) · 64.5 KB
/
bdb_int.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2015 Bloomberg Finance L.P.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef __bdb_int_h__
#define __bdb_int_h__
/* Defines `restrict` as an empty macro, so the keyword expands to nothing in
 * every translation unit that includes this header. */
#define restrict
/*#define RW_RRN_LOCK*/
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <build/db.h>
#include <bb_stdint.h>
#include <compile_time_assert.h>
#include <object_pool.h>
#include <list.h>
#include <plhash.h>
#include <thread_util.h>
#include "bdb_cursor.h"
#include "cursor_ll.h"
#include "bdb_access.h"
#include <compile_time_assert.h>
#include <epochlib.h>
#include <cheapstack.h>
#include <cdb2_constants.h>
#include "averager.h"
#include "intern_strings.h"
#include "bdb_schemachange.h"
/* Additional error codes, chosen so that they conflict with neither system
 * error codes nor berkdb error codes. Use bdb_strerror() to decode them. */
#define DB_ODH_CORRUPT (-40000) /* On disk header corrupt */
#define DB_UNCOMPRESS_ERR (-40001) /* Cannot inflate compressed rec */
#include "ix_return_codes.h"
#include "mem_bdb.h"
#include "mem_override.h"
#include "tunables.h"
/* Public on-disk header (ODH) constants. */
enum {
    /* Number of usable bits in the updateid and length header fields. */
    ODH_UPDATEID_BITS = 12,
    ODH_LENGTH_BITS = 28,

    /* Header size in bytes.  We may extend for larger headers in the
     * future, but the minimum size shall always be 7 bytes. */
    ODH_SIZE = 7,

    /* Callers that provide a buffer into which a record will be packed
     * should allow this many bytes on top of the record size for the ODH.
     * Right now this equals ODH_SIZE; one day it may become the maximum
     * possible ODH size if fields are added. */
    ODH_SIZE_RESERVE = 7,

    /* NOTE(review): by its name, masks the compression bits of the ODH
     * flags byte - confirm against the users of this constant. */
    ODH_FLAG_COMPR_MASK = 0x7
};
/* snapisol log ops */
typedef enum log_ops {
    LOG_APPLY = 0,
    LOG_PRESCAN = 1,
    LOG_BACKFILL = 2
} log_ops_t;
/*
 * Fields of the ondisk header (ODH).  This is not the ondisk representation
 * itself, just a convenient form for passing the header around in our code.
 */
struct odh {
    /* Only 28 bits of this can be used, giving a max value of
     * (1<<ODH_LENGTH_BITS)-1. */
    uint32_t length;

    /* Only 12 bits of this can be used, giving a max value of
     * (1<<ODH_UPDATEID_BITS)-1. */
    uint16_t updateid;

    uint8_t csc2vers;
    uint8_t flags;

    /* Some functions set this to point to the decompressed record data. */
    void *recptr;
};
/* Smaller of two values.  Defined only if no MIN macro exists already.
 * Each argument appears twice in the expansion, so avoid arguments with
 * side effects. */
#ifndef MIN
#define MIN(a, b) ((a) < (b) ? (a) : (b))
#endif
/* Larger of two values.  Defined only if no MAX macro exists already.
 * Each argument appears twice in the expansion, so avoid arguments with
 * side effects. */
#ifndef MAX
#define MAX(a, b) ((a) > (b) ? (a) : (b))
#endif
/* By trial and error it seems that, for queue databases, the number of bytes
 * available for record data is pagesize - 32. There does not seem to be an
 * appropriate constant for this in berkdb... */
#define QUEUE_PAGE_HEADER_SZ 32
/* Prototype only; the definition is not in this header.
 * NOTE(review): by its name and signature this presumably fills *logseqnum
 * from filenum and offsetnum - confirm against the definition. */
void make_lsn(DB_LSN *logseqnum, unsigned int filenum, unsigned int offsetnum);
/* Forward declaration of an incomplete type plus its typedef; struct
 * tran_tag holds a pointer to it in its `tables` member. */
struct tran_table_shadows;
typedef struct tran_table_shadows tran_table_shadows_t;
/* Transaction classes; stored in the `tranclass` member of struct tran_tag.
 * The values are explicit and value 6 is intentionally left unused. */
typedef enum {
    TRANCLASS_BERK = 1,
    TRANCLASS_LOGICAL = 2,
    TRANCLASS_PHYSICAL = 3,
    TRANCLASS_READCOMMITTED = 4,
    TRANCLASS_SERIALIZABLE = 5,
    /* TRANCLASS_QUERYISOLATION = 6, */

    /* used in fetch.c for table locks */
    TRANCLASS_LOGICAL_NOROWLOCKS = 7,

    TRANCLASS_SOSQL = 8,
    TRANCLASS_SNAPISOL = 9
} tranclass_type;
/* Leading members shared by the page-keyed structs in this header: a file id
 * of DB_FILE_ID_LEN bytes followed by a page number. The expansion already
 * ends in a semicolon, so the macro is used without one. */
#define PAGE_KEY \
unsigned char fileid[DB_FILE_ID_LEN]; \
db_pgno_t pgno;
/* Sum of the sizes of the two PAGE_KEY members (no padding is accounted
 * for). */
#define PAGE_KEY_SIZE \
(DB_FILE_ID_LEN * sizeof(unsigned char) + sizeof(db_pgno_t))
/* List node carrying a single LSN; struct pglogs_key keeps a list of these
 * in its `lsns` member. `lnk` is the list linkage and `pool` exists only
 * when NEWSI_DEBUG_POOL is defined. */
struct lsn_list {
DB_LSN lsn;
LINKC_T(struct lsn_list) lnk;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* List node pairing a commit LSN with a logical transaction id. `lnk` is the
 * list linkage and `pool` exists only when NEWSI_DEBUG_POOL is defined. */
struct commit_list {
DB_LSN commit_lsn;
unsigned long long logical_tranid;
LINKC_T(struct commit_list) lnk;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* List node pairing an LSN with a commit LSN; struct pglogs_logical_key
 * keeps a list of these in its `lsns` member. `lnk` is the list linkage and
 * `pool` exists only when NEWSI_DEBUG_POOL is defined. */
struct lsn_commit_list {
DB_LSN lsn;
DB_LSN commit_lsn;
LINKC_T(struct lsn_commit_list) lnk;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* List node pairing a page number (`inh`) with an LSN; struct
 * pglogs_relink_key keeps a list of these in its `relinks` member.
 * NOTE(review): the exact meaning of `inh` is not documented here - confirm.
 * `pool` exists only when NEWSI_DEBUG_POOL is defined. */
struct relink_list {
db_pgno_t inh;
DB_LSN lsn;
LINKC_T(struct relink_list) lnk;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* NOTE(review): presumably the values held in the `type` member of struct
 * pglogs_queue_key - confirm against the queue code. */
enum {
    PGLOGS_QUEUE_PAGE = 1,
    PGLOGS_QUEUE_RELINK = 2
};
/* Entry of a per-fileid pglogs queue (struct fileid_pglogs_queue keeps a
 * list of these in `queue_keys`). `pool` exists only when NEWSI_DEBUG_POOL
 * is defined. */
struct pglogs_queue_key {
LINKC_T(struct pglogs_queue_key) lnk;
unsigned long long logical_tranid;
/* NOTE(review): presumably PGLOGS_QUEUE_PAGE or PGLOGS_QUEUE_RELINK -
 * confirm. */
int type;
db_pgno_t pgno;
db_pgno_t prev_pgno;
db_pgno_t next_pgno;
DB_LSN lsn;
DB_LSN commit_lsn;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* Pairs a file id with a pointer to a pglogs queue entry (`cur`). */
struct asof_cursor {
unsigned char fileid[DB_FILE_ID_LEN];
struct pglogs_queue_key *cur;
};
/* Queue of pglogs_queue_key entries for one file id, with its own rwlock.
 * NOTE(review): `deleteme` looks like a pending-deletion flag and `queue_lk`
 * presumably guards `queue_keys` - confirm against the users. */
struct fileid_pglogs_queue {
unsigned char fileid[DB_FILE_ID_LEN];
int deleteme;
pthread_rwlock_t queue_lk;
LISTC_T(struct pglogs_queue_key) queue_keys;
};
// This is stored in a hash indexed by fileid. All cursors pointed
// at a fileid maintain a pointer to the same memory.
struct pglogs_queue_cursor {
unsigned char fileid[DB_FILE_ID_LEN];
struct fileid_pglogs_queue *queue; // the queue for this fileid
struct pglogs_queue_key *last; // NOTE(review): presumably the last entry seen by this cursor - confirm
};
/* An int index together with an array of pointers to file ids.
 * NOTE(review): `index` presumably counts or indexes the entries in
 * `fileids` - confirm against the code that fills this struct. */
struct pglogs_queue_heads {
int index;
unsigned char **fileids;
};
/* Page key (fileid + pgno, via PAGE_KEY) followed by an LSN and a commit
 * LSN. */
struct page_logical_lsn_key {
PAGE_KEY
DB_LSN lsn;
DB_LSN commit_lsn;
};
/* Page key (fileid + pgno, via PAGE_KEY) with a list of LSNs (struct
 * lsn_list) for that page. `pool` exists only when NEWSI_DEBUG_POOL is
 * defined. */
struct pglogs_key {
PAGE_KEY
LISTC_T(struct lsn_list) lsns;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* Byte offset of `fileid`, the first PAGE_KEY member, within struct
 * pglogs_key. */
#define PGLOGS_KEY_OFFSET (offsetof(struct pglogs_key, fileid))
/* Page key (fileid + pgno, via PAGE_KEY) with a list of (lsn, commit_lsn)
 * pairs (struct lsn_commit_list) for that page. `pool` exists only when
 * NEWSI_DEBUG_POOL is defined. */
struct pglogs_logical_key {
PAGE_KEY
LISTC_T(struct lsn_commit_list) lsns;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* Byte offset of `fileid`, the first PAGE_KEY member, within struct
 * pglogs_logical_key. */
#define PGLOGS_LOGICAL_KEY_OFFSET (offsetof(struct pglogs_logical_key, fileid))
/* Page key (fileid + pgno, via PAGE_KEY) with a list of relink records
 * (struct relink_list) for that page. `pool` exists only when
 * NEWSI_DEBUG_POOL is defined. */
struct pglogs_relink_key {
PAGE_KEY
LISTC_T(struct relink_list) relinks;
#ifdef NEWSI_DEBUG_POOL
void *pool;
#endif
};
/* Byte offset of `fileid`, the first PAGE_KEY member, within struct
 * pglogs_relink_key. */
#define PGLOGS_RELINK_KEY_OFFSET (offsetof(struct pglogs_relink_key, fileid))
/* Per-logical-transaction pglogs state: the logical transaction id, a mutex,
 * the logical commit LSN and a pglogs hash table.
 * NOTE(review): `pglogs_mutex` presumably protects `pglogs_hashtbl` -
 * confirm. */
struct ltran_pglogs_key {
unsigned long long logical_tranid;
pthread_mutex_t pglogs_mutex;
DB_LSN logical_commit_lsn; /* lsn of the physical commit of the logical
transaction */
hash_t *pglogs_hashtbl;
};
/* Associates a 32-bit timestamp with an LSN and a context value.
 * NOTE(review): units/epoch of `timestamp` and the meaning of `context` are
 * not stated here - confirm against the users. */
struct timestamp_lsn_key {
int32_t timestamp;
DB_LSN lsn;
unsigned long long context;
};
/* Key made of a file id, a page number, a commit LSN and an LSN, in that
 * order. NOTE(review): by its name, used as the key of the pglogs temp table
 * (see struct logfile_pglogs_entry) - confirm. */
typedef struct pglogs_tmptbl_key {
unsigned char fileid[DB_FILE_ID_LEN];
db_pgno_t pgno;
DB_LSN commit_lsn;
DB_LSN lsn;
} pglogs_tmptbl_key;
/* Key made of a file id, a page number, an LSN and a second page number
 * (`inh`), in that order. NOTE(review): by its name, used as the key of the
 * relinks temp table (see struct logfile_pglogs_entry) - confirm. */
typedef struct relinks_tmptbl_key {
unsigned char fileid[DB_FILE_ID_LEN];
db_pgno_t pgno;
DB_LSN lsn;
db_pgno_t inh;
} relinks_tmptbl_key;
/* Per-log-file entry holding two temp tables (pglogs and relinks), each with
 * its own cursor and mutex.
 * NOTE(review): each mutex presumably guards the table/cursor pair declared
 * right after it - confirm. */
struct logfile_pglogs_entry {
u_int32_t filenum;
pthread_mutex_t pglogs_lk;
struct temp_table *pglogs_tbl;
struct temp_cursor *pglogs_cur;
pthread_mutex_t relinks_lk;
struct temp_table *relinks_tbl;
struct temp_cursor *relinks_cur;
};
/* List node recording two LSNs and a timestamp for a checkpoint.
 * NOTE(review): presumably `lsn` is the LSN of the checkpoint record and
 * `ckp_lsn` the checkpoint LSN stored inside it - confirm. */
struct checkpoint_list {
DB_LSN lsn;
DB_LSN ckp_lsn;
int32_t timestamp;
LINKC_T(struct checkpoint_list) lnk;
};
/* Transaction object (also known as tran_type via the typedef further down
 * in this header). `tranclass` holds one of the TRANCLASS_* values; many of
 * the remaining members only apply to particular transaction classes, as
 * noted on the individual members. */
struct tran_tag {
tranclass_type tranclass;
DB_TXN *tid;
u_int32_t logical_lid;
u_int32_t original_lid;
int is_curtran;
void *usrptr;
DB_LSN savelsn;
struct tran_tag *parent;
DB_LSN begin_lsn; /* lsn of logical begin */
DB_LSN startlsn; /* where log was when we
started */
/*
snapshot bdb_state->numchildren
we don't care that much if DB-s are flipping,
but we don't want to see transient tailing DB-s
created by schema change or fastinit
*/
int numchildren;
/*
this is indexed by dbnum;
right now 0, 1 are meta, among them is also fstblk
these will never have shadows (shrugs)
*/
tran_table_shadows_t *tables; /* shadow for tables */
/* this is a replacement for the genid_bitmap, keep both for now */
unsigned long long gblcontext;
unsigned long long logical_tranid;
/* LSN of the last logical record for this transaction */
DB_LSN last_logical_lsn;
DB_LSN last_physical_commit_lsn;
/* which lsn generated the startgenid */
DB_LSN snapy_commit_lsn;
uint32_t snapy_commit_generation;
DB_LSN last_regop_lsn;
/* LSN of the physical commit/abort txn */
DB_LSN commit_lsn;
/* lsn when the tran obj was created */
DB_LSN birth_lsn;
/* Birth lsn of oldest outstanding logical txn at start time */
DB_LSN oldest_txn_at_start;
/* List of outstanding logical txns at start */
uint64_t *bkfill_txn_list;
/* Number of outstanding logical txns at start */
int bkfill_txn_count;
/* tran obj was created as if we were at this lsn */
DB_LSN asof_lsn;
/* oldest logical ref point of a begin-as-of tran */
DB_LSN asof_ref_lsn;
/* hash table for pglogs */
hash_t *pglogs_hashtbl;
/* hash table for relinks */
hash_t *relinks_hashtbl;
pthread_mutex_t pglogs_mutex;
/* hash table to keep track of
whether we have copied pglogs from the gbl structure for a given page */
hash_t *asof_hashtbl;
/* temporary: used in logical abort case */
hash_t *compensated_records;
/* anchor in bdb_state->transactions */
LINKC_T(struct tran_tag) tranlist_lnk;
/* For non-clustered sql offloading we pass the tran object allocated
* in the block processor to the sql engine pool. Then when the sql engine
* creates shadow indexes it uses its thread id as part of the file name.
* However the shadow files don't get deleted until commit or abort time
* on the original block processor thread, by which time the sql engine
* thread may have been freed and reused for another transaction.
* Get round this by recording the threadid of the thread that creates
* the transaction and using this in shadow file names. */
pthread_t threadid;
/* for recom and snapisol/serial, record in startgenid the context when this
transaction was started;
- for recom it is used to differentiate between synthetic genids and
real(existing) genids
- for si, this is also used to mask out new updates
*/
unsigned long long startgenid;
unsigned int trigger_epoch;
/* For logical transactions: a logical transaction may have a (one and
only one) physical transaction in flight. Latch it here for debugging
and sanity checking */
struct tran_tag *physical_tran;
/* For a physical transaction, chain up to the logical transaction */
struct tran_tag *logical_tran;
/* snapshot/serializable support */
struct bdb_osql_trn *osql;
/* this is tested in rep.c to see if net needs to flush/wait */
signed char is_about_to_commit;
signed char aborted;
signed char rep_handle_dead; /* must reopen all db cursors after abort */
/* set if we are a top level transaction (ie, not a child) */
signed char master;
/* Set if we were created from the replication stream */
signed char reptxn;
signed char wrote_begin_record;
signed char committed_begin_record;
signed char get_schema_lock;
signed char single_physical_transaction;
/* log support */
signed char trak; /* set this to enable tracking */
signed char is_rowlocks_trans;
/* if the txn intends to write, this tells us to get write
locks when we read */
signed char write_intent;
/* Open cursors under this transaction. */
LISTC_T(bdb_cursor_ifn_t) open_cursors;
/* Committed the child transaction. */
signed char committed_child;
/* total shadow rows */
int shadow_rows;
/* Set to 1 if we got the bdb lock */
int got_bdb_lock;
/* Set to 1 if this is a schema change txn */
int schema_change_txn;
struct tran_tag *sc_parent_tran;
/* Set to 1 if this txn touches a logical live sc table */
int force_logical_commit;
/* Tables that this tran touches (for logical redo sc) */
hash_t *dirty_table_hash;
/* cache the versions of dta files to catch schema changes and fastinits */
int table_version_cache_sz;
unsigned long long *table_version_cache;
bdb_state_type *parent_state;
/* Send the master periodic 'acks' after this many physical commits */
int request_ack;
int check_shadows;
int micro_commit;
unsigned verify_updateid : 1;
/* Rowlocks commit support */
pool_t *rc_pool;
DBT **rc_list;
DB_LOCK *rc_locks;
u_int32_t rc_max;
u_int32_t rc_count;
u_int64_t logbytes;
/* Newsi pglogs queue hash */
hash_t *pglogs_queue_hash;
u_int32_t flags;
int is_prepared;
};
/* Sequence number record: an LSN plus master-lease and generation fields.
 * Its size is pinned by the compile-time assertion below. */
struct seqnum_t {
DB_LSN lsn;
// For master lease
uint32_t issue_time[2];
uint32_t lease_ms;
uint32_t commit_generation;
uint32_t generation;
};
/* Expected sizeof(struct seqnum_t); the terms sum to 28 bytes. */
enum { BDB_SEQNUM_TYPE_LEN = 8 + 2 + 2 + 4 + 12 };
/* Fails the build if struct seqnum_t does not have exactly that size. */
BB_COMPILE_TIME_ASSERT(bdb_seqnum_type,
sizeof(struct seqnum_t) == BDB_SEQNUM_TYPE_LEN);
/* Identifies one page to prefault: a file id and a page number. Its size is
 * pinned by the compile-time assertion below. */
struct filepage_t {
unsigned int fileid; /* fileid to prefault */
unsigned int pgno; /* page number to prefault */
};
/* Expected sizeof(struct filepage_t): two 4-byte fields. */
enum { BDB_FILEPAGE_TYPE_LEN = 4 + 4 };
/* Fails the build if struct filepage_t does not have exactly that size. */
BB_COMPILE_TIME_ASSERT(bdb_filepage_type,
sizeof(struct filepage_t) == BDB_FILEPAGE_TYPE_LEN);
/* One (context, index) entry of a cmpcontext list.  A list of these is
 * terminated by an entry whose index == -1. */
typedef struct {
    unsigned long long context;
    short index;
} cmpcontextlist_type;
/* Forward declarations and typedefs for types used by pointer in this
 * header. */
struct thread_lock_info_tag;
typedef struct thread_lock_info_tag thread_lock_info_type;
/* The following typedefs are emitted only when __bdb_api_h__ is not defined.
 * NOTE(review): presumably bdb_api.h defines that guard and provides the
 * same typedefs itself - confirm. */
#ifndef __bdb_api_h__
struct bdb_state_tag;
typedef struct bdb_state_tag bdb_state_type;
struct bdb_callback_tag;
typedef struct bdb_callback_tag bdb_callback_type;
struct tran_tag;
typedef struct tran_tag tran_type;
struct bdb_attr_tag;
typedef struct bdb_attr_tag bdb_attr_type;
struct bdb_temp_hash;
typedef struct bdb_temp_hash bdb_temp_hash;
struct bulk_dump;
typedef struct bulk_dump bulk_dump;
struct dtadump;
typedef struct dtadump dtadump;
#endif
struct bdb_queue_priv;
typedef struct bdb_queue_priv bdb_queue_priv;
struct bdb_cursor_thd_tag;
typedef struct bdb_cursor_thd_tag bdb_cursor_thd_t;
/* Cursor kinds; stored in the `type` member of struct bdb_cursor_impl_tag.
 * NOTE(review): judging by that struct's comments BDBC_IX is an index cursor
 * and BDBC_DT a data cursor; the meaning of the remaining values should be
 * confirmed against cursortype(). */
enum bdbcursor_types {
    BDBC_UN = 0,
    BDBC_IX = 1,
    BDBC_DT = 2,
    BDBC_SK = 3,
    BDBC_BL = 4
};
/* Prototype only; the definition is not in this header.
 * NOTE(review): presumably returns a printable name for an
 * enum bdbcursor_types value - confirm against the definition. */
char const *cursortype(int type);
/* track the cursor threading; currently holds a single int member
 * (typedef'd as bdb_cursor_thd_t) */
struct bdb_cursor_thd_tag {
int x;
};
/* Implementation state of a bdb cursor: which btree it is on, the
 * transaction(s) it runs under, its current position, and the bookkeeping
 * needed for the read committed / snapshot / serializable modes. */
struct bdb_cursor_impl_tag {
/* cursor btree info */
enum bdbcursor_types type; /* BDBC_IX, BDBC_DT */
int dbnum; /* dbnum for this bdbcursor */
int idx; /* BDBC_IX:ixnum, BDBC_DT:split_dta_num */
/* transaction */
bdb_state_type *state; /* bdb state (original comment was truncated: "state for") */
cursor_tran_t *curtran; /* all cursors (but comdb2 mode) have this */
tran_type *shadow_tran; /* read committed and snapshot/serializable modes */
/* cursor position */
int rrn; /* == 2 (don't need this) */
unsigned long long genid; /* genid of current entry */
void *data; /* points inside one of bdb_berkdb_t if valid */
int datalen; /* size of payload */
void *datacopy;
void *unpacked_datacopy;
/* new btree access interface */
bdb_berkdb_t *rl; /* persistent berkdb */
bdb_berkdb_t *sd; /* shadow berkdb */
/* comdb2 mode support */
DBCPS dbcps; /* serialized cursor */
/* perfmetrics */
int nsteps; /* count ops */
/* read committed/snapshot/serializable mode support */
tmpcursor_t *skip; /* skip list; don't touch this, use bdb_osql please */
char
*lastkey; /* set after a row is consumed from real data (see merging) */
int lastkeylen;
int laststripe;
int lastpage;
int lastindex;
/* read committed/snapshot/serializable (maybe we should merge this here, in
* bdb, not in db) */
tmpcursor_t *addcur; /* cursors for add and upd data shadows; */
void *addcur_odh; /* working area for addcur odh. */
int addcur_use_odh;
/* page-order read committed/snapshot/serializable/snapisol */
tmpcursor_t *pgordervs;
/* support for deadlock */
int invalidated; /* mark this if the cursor was unlocked */
/* page-order flags */
int pageorder; /* mark if the cursor is in page-order */
int discardpages; /* mark if the pages should be discarded immediately */
tmptable_t *vs_stab; /* Table of records to skip in the virtual stripe. */
tmpcursor_t *vs_skip; /* Cursor for vs_stab. */
#if 0
tmptable_t *cstripe; /* Cursor stripe */
tmpcursor_t *cscur; /* Cursor for cstripe */
#endif
int new_skip; /* Set to 1 when the vs_skip has a new record. */
int last_skip; /* Set to 1 if we've passed the last record. */
unsigned long long agenid; /* The last addcur genid. */
int repo_addcur; /* Set to 1 if we've added to addcur. */
int threaded; /* mark if this is threaded */
int upd_shadows_count;
/* XXX todo */
bdb_cursor_thd_t *thdinfo; /* track cursor threadinfo */
/* if pointer */
struct bdb_cursor_ifn *ifn;
/* col attributes */
char *collattr; /* pointer to tailing data, if any */
int collattr_len;
/* snapisol may need prescanning the updates to filter out
older genids added by younger commits */
int need_prescan;
int *pagelockflag;
int max_page_locks;
int rowlocks;
struct pglogs_queue_cursor *queue_cursor;
uint8_t ver;
uint8_t trak; /* debug this cursor: set to 1 for verbose */
uint8_t used_rl; /* set to 1 if rl position was consumed */
uint8_t used_sd; /* set to 1 if sd position was consumed */
};
/* A validity flag followed by a DBCS value.
 * NOTE(review): by its name this is the internal form of a serialized
 * cursor, with `is_valid` telling whether `dbcs` is usable - confirm. */
struct bdb_cursor_ser_int {
uint8_t is_valid;
DBCS dbcs;
};
typedef struct bdb_cursor_ser_int bdb_cursor_ser_int_t;
#include "bdb_api.h"
#include "list.h"
/* List node holding one attribute name with a string value and an integer
 * value; struct bdb_attr_tag keeps a list of these in
 * `deferred_berkdb_options`.
 * NOTE(review): ownership of the `attr` / `value` strings is not stated
 * here - confirm who allocates and frees them. */
struct deferred_berkdb_option {
char *attr;
char *value;
int ivalue;
LINKC_T(struct deferred_berkdb_option) lnk;
};
/* Attribute block. The member list is generated from attr.h: DEF_ATTR and
 * DEF_ATTR_2 are defined here so that each entry expands to a single
 * `int name;` member, attr.h is included inside the struct body, and both
 * macros are undefined again afterwards. */
struct bdb_attr_tag {
#define DEF_ATTR(NAME, name, type, dflt, desc) int name;
#define DEF_ATTR_2(NAME, name, type, dflt, desc, flags, verify_fn, update_fn) \
int name;
#include "attr.h"
#undef DEF_ATTR
#undef DEF_ATTR_2
/* List of struct deferred_berkdb_option entries. */
LISTC_T(struct deferred_berkdb_option) deferred_berkdb_options;
};
/* Pointer to a function returning int, declared with an empty (unspecified)
 * parameter list. */
typedef int (*BDBFP)(); /*was called FP, but that clashed with dbutil.h - sj */
/* Table of callbacks, one member per callback type. The member types are
 * declared outside this part of the header.
 * NOTE(review): presumably these are function-pointer typedefs from
 * bdb_api.h, registered by the application - confirm. */
struct bdb_callback_tag {
WHOISMASTERFP whoismaster_rtn;
NODEUPFP nodeup_rtn;
GETROOMFP getroom_rtn;
REPFAILFP repfail_rtn;
BDBAPPSOCKFP appsock_rtn;
BDBAPPSOCKFP admin_appsock_rtn;
PRINTFP print_rtn;
BDBELECTSETTINGSFP electsettings_rtn;
BDBCATCHUPFP catchup_rtn;
BDBTHREADDUMPFP threaddump_rtn;
BDBGETFILELWMFP get_file_lwm_rtn;
BDBSETFILELWMFP set_file_lwm_rtn;
SCDONEFP scdone_rtn;
SCABORTFP scabort_rtn;
NODEDOWNFP nodedown_rtn;
SERIALCHECK serialcheck_rtn;
SYNCMODE syncmode_rtn;
};
/* List node recording an LSN being waited for and a start value.
 * NOTE(review): `start` is presumably the time the wait began; its units are
 * not stated here - confirm. */
struct waiting_for_lsn {
DB_LSN lsn;
int start;
LINKC_T(struct waiting_for_lsn) lnk;
};
/* List type whose elements are struct waiting_for_lsn. */
typedef LISTC_T(struct waiting_for_lsn) wait_for_lsn_list;
/* Sequence-number tracking state: the seqnum array, its lock and condition
 * variable, wait lists, UDP counters and timing averagers.
 * NOTE(review): the pointer members other than `seqnums` look like arrays
 * that are also sized per node - confirm against the allocation code. */
typedef struct {
seqnum_type *seqnums; /* 1 per node num */
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_key_t key;
wait_for_lsn_list **waitlist;
short *expected_udp_count;
short *incomming_udp_count;
short *udp_average_counter;
int *filenum;
pool_t *trackpool;
/* need to do a bit better here... */
struct averager **time_10seconds;
struct averager **time_minute;
} seqnum_info_type;
/* Replication statistics: a set of plain int members, embedded by value in
 * repinfo_type as `repstats`.
 * NOTE(review): by their names these are event counters - confirm against
 * the code that updates them. */
typedef struct {
    int rep_process_message;
    int rep_zerorc;
    int rep_newsite;
    int rep_holdelection;
    int rep_newmaster;
    int rep_dupmaster;
    int rep_isperm;
    int rep_notperm;
    int rep_outdated;
    int rep_other;
    int dummy_adds;
    int commits;
} repstats_type;
/* Forward declaration so that repinfo_type can hold a pointer to it. */
struct sockaddr_in;
/* Replication state: network handle, host names for the master and this
 * node, the mutexes used by replication, statistics, election flags and the
 * UDP endpoint. */
typedef struct {
netinfo_type *netinfo;
char *master_host;
char *myhost;
pthread_mutex_t elect_mutex;
int *appseqnum; /* application level (bdb lib) sequencing */
pthread_mutex_t appseqnum_lock;
pthread_mutex_t upgrade_lock; /* ensure only 1 upgrade at a time */
pthread_mutex_t send_lock;
repstats_type repstats;
pthread_mutex_t receive_lock;
signed char in_rep_process_message;
signed char disable_watcher;
signed char in_election; /* true if we are in the middle of an election */
signed char upgrade_allowed;
int skipsinceepoch; /* since when have we been incoherent */
int rep_process_message_start_time;
int dont_elect_untill_time;
struct sockaddr_in *udp_addr;
pthread_t udp_thread;
int udp_fd;
int should_reject_timestamp;
int should_reject;
} repinfo_type;
/* Node coherency states. */
enum {
    STATE_COHERENT = 0,
    STATE_INCOHERENT = 1,
    STATE_INCOHERENT_SLOW = 2,
    STATE_INCOHERENT_WAIT = 3
};
struct bdb_state_tag;
/* Every time we add a blkseq, if the log file rolled, we add a new entry
 * with the earliest blkseq in the new log. We maintain this list in
 * bdb_blkseq_insert and in bdb_blkseq_recover (which should really call
 * bdb_blkseq_insert in recovery instead). In the log deletion code, we walk
 * the list and disallow deletion for log files where the blkseq is too
 * new. */
struct seen_blkseq {
u_int32_t logfile;
int timestamp;
LINKC_T(struct seen_blkseq) lnk;
};
/* Forward declaration; only pointers to struct temp_table are used in this
 * header. */
struct temp_table;
/* List node pairing an LSN with a transaction id.
 * NOTE(review): by its name, used to track LSNs for schema-change logical
 * redo - confirm against the users. */
struct sc_redo_lsn {
DB_LSN lsn;
u_int32_t txnid;
LINKC_T(struct sc_redo_lsn) lnk;
};
struct bdb_state_tag {
pthread_attr_t pthread_attr_detach;
seqnum_info_type *seqnum_info;
bdb_attr_type *attr; /* attributes that have defaults */
bdb_callback_type *callback; /* callback functions */
DB_ENV *dbenv; /* transactional environment */
int read_write; /* if we opened the db with R/W access */
repinfo_type *repinfo; /* replication info */
signed char numdtafiles;
/* the berkeley db btrees underlying this "table" */
DB *dbp_data[MAXDTAFILES][MAXDTASTRIPE]; /* the data files. dbp_data[0] is
the primary data file which would contain
the record. higher files are extra data
aka the blobs. in blobstripe mode the
blob files are striped too, otherwise
they are not. */
DB *dbp_ix[MAXINDEX]; /* handle for the ixN files */
pthread_key_t tid_key;
int numthreads;
pthread_mutex_t numthreads_lock;
char *name; /* name of the comdb */
char *txndir; /* name of the transaction directory for log files */
char *tmpdir; /* name of directory for tempoarary dbs */
char *dir; /* directory the files go in (/bb/data /bb/data2) */
int lrl; /* Logical Record Length (0 = variable) */
short numix; /* number of indexes */
short ixlen[MAXINDEX]; /* size of each index */
signed char ixdta[MAXINDEX]; /* does this index contain the dta? */
int ixdtalen[MAXINDEX]; /* dta len in bytes (0 if index does not contain the dta or is full datacopy) */
signed char ixcollattr[MAXINDEX]; /* does this index contain the column
attributes? */
signed char ixnulls[MAXINDEX]; /*does this index contain any columns that
allow nulls?*/
signed char ixdups[MAXINDEX]; /* 1 if ix allows dupes, else 0 */
signed char
ixrecnum[MAXINDEX]; /* 1 if we turned on recnum mode for btrees */
short keymaxsz; /* size of the keymax buffer */
/* the helper threads (only valid for a "parent" bdb_state) */
pthread_t checkpoint_thread;
pthread_t watcher_thread;
pthread_t memp_trickle_thread;
pthread_t logdelete_thread;
pthread_t lock_detect_thread;
pthread_t coherency_lease_thread;
pthread_t master_lease_thread;
struct bdb_state_tag *parent; /* pointer to our parent */
short numchildren;
struct bdb_state_tag *children[MAX_CHILDREN];
pthread_rwlock_t *bdb_lock; /* we need this to do safe upgrades. fetch
operations get a read lock, upgrade requires
a write lock - this way we can close and
re-open databases knowing that there
are no cursors opened on them */
signed char bdb_lock_desired; /* signal that long running operations like
fast dump should GET OUT! so that we can
upgrade/downgrade */
void *usr_ptr;
pthread_t bdb_lock_write_holder;
thread_lock_info_type *bdb_lock_write_holder_ptr;
char bdb_lock_write_idstr[80];
int seed;
unsigned int last_genid_epoch;
pthread_mutex_t seed_lock;
/* One of the BDBTYPE_ constants */
int bdbtype;
/* How many total queue/queuedb add/consume operations total (ever)? */
int qdb_adds;
int qdb_cons;
/* Lite databases have no rrn cache, freerecs files, ix# files */
int pagesize_override; /* 0, or a power of 2 */
size_t queue_item_sz; /* size of a queue record in bytes (including
* struct bdb_queue_header) */
/* bit mask of which consumers want to consume new queue items */
uint32_t active_consumers;
unsigned long long master_cmpcontext;
/* stuff for the genid->thread affinity logic */
int maxthreadid;
unsigned char stripe_pool[17];
unsigned char stripe_pool_start;
pthread_mutex_t last_dta_lk;
int last_dta;
/* when did we convert to blobstripe? */
unsigned long long blobstripe_convert_genid;
pthread_mutex_t pending_broadcast_lock;
union {
unsigned long long orig; /* original time-based format */
struct {
uint16_t lo16; /* stripe + update-id in network byte order */
uint64_t hi48; /* incrementing rowid in host byte order */
} genid48; /* GENID48 format */
} gblcontext;
void (*signal_rtoff)(void);
int checkpoint_start_time;
hash_t *logical_transactions_hash;
DB_LSN lwm; /* low watermark for logical transactions */
/* chain all transactions */
pthread_mutex_t translist_lk;
LISTC_T(struct tran_tag) logical_transactions_list;
/* for queues this points to extra stuff defined in queue.c */
bdb_queue_priv *qpriv;
void *temp_list;
pthread_mutex_t temp_list_lock;
comdb2_objpool_t temp_table_pool; /* pooled temptables */
pthread_t priosqlthr;
int haspriosqlthr;
int temp_table_id;
int num_temp_tables;
DB_MPOOL_STAT *temp_stats;
pthread_mutex_t id_lock;
unsigned int id;
pthread_mutex_t gblcontext_lock;
pthread_mutex_t children_lock;
signed char have_children_lock;
FILE *bdblock_debug_fp;
pthread_mutex_t bdblock_debug_lock;
uint8_t version;
/* access control */
bdb_access_t *access;
char *origname; /* name before new.name shenanigans */
pthread_mutex_t exit_lock;
signed char have_recnums; /* 1 if ANY index has recnums enabled */
signed char exiting;
signed char caught_up; /* if we passed the recovery phase */
signed char isopen;
signed char envonly;
signed char need_to_downgrade_and_lose;
signed char rep_trace;
signed char berkdb_rep_startupdone;
signed char rep_started;
signed char master_handle;
signed char sanc_ok;
signed char ondisk_header; /* boolean: give each record an ondisk header? */
signed char compress; /* boolean: compress data? */
signed char compress_blobs; /*boolean: compress blobs? */
signed char persistent_seq; /* boolean: persistent seq for queue? */
signed char got_gblcontext;
signed char need_to_upgrade;
signed char in_recovery;
signed char in_bdb_recovery;
signed char low_headroom_count;
signed char pending_seqnum_broadcast;
int *coherent_state;
uint64_t *master_lease;
pthread_mutex_t master_lease_lk;
signed char after_llmeta_init_done;
pthread_mutex_t coherent_state_lock;
signed char