forked from huangzworks/redis-3.0-annotated
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsentinel.c
5057 lines (4326 loc) · 193 KB
/
sentinel.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Redis Sentinel implementation
*
* Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "redis.h"
#include "hiredis.h"
#include "async.h"
#include <ctype.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <fcntl.h>
extern char **environ;
// sentinel 的默认端口号
#define REDIS_SENTINEL_PORT 26379
/* ======================== Sentinel global state =========================== */
/* Address object, used to describe an ip:port pair. */
/* 地址对象,用于保存 IP 地址和端口 */
typedef struct sentinelAddr {
char *ip;
int port;
} sentinelAddr;
/* A Sentinel Redis Instance object is monitoring. */
/* 每个被监视的 Redis 实例都会创建一个 sentinelRedisInstance 结构
* 而每个结构的 flags 值会是以下常量的一个或多个的并 */
// 实例是一个主服务器
#define SRI_MASTER (1<<0)
// 实例是一个从服务器
#define SRI_SLAVE (1<<1)
// 实例是一个 Sentinel
#define SRI_SENTINEL (1<<2)
// 实例已断线
#define SRI_DISCONNECTED (1<<3)
// 实例已处于 SDOWN 状态
#define SRI_S_DOWN (1<<4) /* Subjectively down (no quorum). */
// 实例已处于 ODOWN 状态
#define SRI_O_DOWN (1<<5) /* Objectively down (confirmed by others). */
// Sentinel 认为主服务器已下线
#define SRI_MASTER_DOWN (1<<6) /* A Sentinel with this flag set thinks that
its master is down. */
// 正在对主服务器进行故障迁移
#define SRI_FAILOVER_IN_PROGRESS (1<<7) /* Failover is in progress for
this master. */
// 实例是被选中的新主服务器(目前仍是从服务器)
#define SRI_PROMOTED (1<<8) /* Slave selected for promotion. */
// 向从服务器发送 SLAVEOF 命令,让它们转向复制新主服务器
#define SRI_RECONF_SENT (1<<9) /* SLAVEOF <newmaster> sent. */
// 从服务器正在与新主服务器进行同步
#define SRI_RECONF_INPROG (1<<10) /* Slave synchronization in progress. */
// 从服务器与新主服务器同步完毕,开始复制新主服务器
#define SRI_RECONF_DONE (1<<11) /* Slave synchronized with new master. */
// 对主服务器强制执行故障迁移操作
#define SRI_FORCE_FAILOVER (1<<12) /* Force failover with master up. */
// 已经对返回 -BUSY 的服务器发送 SCRIPT KILL 命令
#define SRI_SCRIPT_KILL_SENT (1<<13) /* SCRIPT KILL already sent on -BUSY */
/* Note: times are in milliseconds. */
/* 各种时间常量,以毫秒为单位 */
// 发送 INFO 命令的间隔
#define SENTINEL_INFO_PERIOD 10000
// 发送 PING 命令的间隔
#define SENTINEL_PING_PERIOD 1000
// 发送 ASK 命令的间隔
#define SENTINEL_ASK_PERIOD 1000
// 发送 PUBLISH 命令的间隔
#define SENTINEL_PUBLISH_PERIOD 2000
// 默认的判断服务器已下线的时长
#define SENTINEL_DEFAULT_DOWN_AFTER 30000
// 默认的信息频道
#define SENTINEL_HELLO_CHANNEL "__sentinel__:hello"
// 默认的 TILT 触发时长
#define SENTINEL_TILT_TRIGGER 2000
// 默认的 TILT 环境时长(要多久才能退出 TITL 模式)
#define SENTINEL_TILT_PERIOD (SENTINEL_PING_PERIOD*30)
// 默认从服务器优先级
#define SENTINEL_DEFAULT_SLAVE_PRIORITY 100
#define SENTINEL_SLAVE_RECONF_TIMEOUT 10000
// 默认的同时对新主服务器进行复制的从服务器个数
#define SENTINEL_DEFAULT_PARALLEL_SYNCS 1
// 默认的最少重连接间隔
#define SENTINEL_MIN_LINK_RECONNECT_PERIOD 15000
// 默认的故障迁移执行时长
#define SENTINEL_DEFAULT_FAILOVER_TIMEOUT (60*3*1000)
// 默认的最大积压命令数量
#define SENTINEL_MAX_PENDING_COMMANDS 100
// 默认的选举超时时长
#define SENTINEL_ELECTION_TIMEOUT 10000
#define SENTINEL_MAX_DESYNC 1000
/* Failover machine different states. */
/* 故障转移时的状态 */
// 没在执行故障迁移
#define SENTINEL_FAILOVER_STATE_NONE 0 /* No failover in progress. */
// 正在等待开始故障迁移
#define SENTINEL_FAILOVER_STATE_WAIT_START 1 /* Wait for failover_start_time*/
// 正在挑选作为新主服务器的从服务器
#define SENTINEL_FAILOVER_STATE_SELECT_SLAVE 2 /* Select slave to promote */
// 向被选中的从服务器发送 SLAVEOF no one
#define SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 3 /* Slave -> Master */
// 等待从服务器转变成主服务器
#define SENTINEL_FAILOVER_STATE_WAIT_PROMOTION 4 /* Wait slave to change role */
// 向已下线主服务器的其他从服务器发送 SLAVEOF 命令
// 让它们复制新的主服务器
#define SENTINEL_FAILOVER_STATE_RECONF_SLAVES 5 /* SLAVEOF newmaster */
// 监视被升级的从服务器
#define SENTINEL_FAILOVER_STATE_UPDATE_CONFIG 6 /* Monitor promoted slave. */
/* 主从服务器之间的连接状态 */
// 连接正常
#define SENTINEL_MASTER_LINK_STATUS_UP 0
// 连接断开
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
/* Generic flags that can be used with different functions.
* They use higher bits to avoid colliding with the function specific
* flags. */
/* 可以用于多个函数的通用标识。
* 使用高位来避免与一般标识冲突。 */
// 没有标识
#define SENTINEL_NO_FLAGS 0
// 生成事件
#define SENTINEL_GENERATE_EVENT (1<<16)
// 领头
#define SENTINEL_LEADER (1<<17)
// 观察者
#define SENTINEL_OBSERVER (1<<18)
/* Script execution flags and limits. */
/* 脚本执行状态和限制 */
// 脚本目前没有被执行
#define SENTINEL_SCRIPT_NONE 0
// 脚本正在执行
#define SENTINEL_SCRIPT_RUNNING 1
// 脚本队列保存脚本数量的最大值
#define SENTINEL_SCRIPT_MAX_QUEUE 256
// 同一时间可执行脚本的最大数量
#define SENTINEL_SCRIPT_MAX_RUNNING 16
// 脚本的最大执行时长
#define SENTINEL_SCRIPT_MAX_RUNTIME 60000 /* 60 seconds max exec time. */
// 脚本最大重试数量
#define SENTINEL_SCRIPT_MAX_RETRY 10
// 脚本重试之前的延迟时间
#define SENTINEL_SCRIPT_RETRY_DELAY 30000 /* 30 seconds between retries. */
// Sentinel 会为每个被监视的 Redis 实例创建相应的 sentinelRedisInstance 实例
// (被监视的实例可以是主服务器、从服务器、或者其他 Sentinel )
typedef struct sentinelRedisInstance {
// 标识值,记录了实例的类型,以及该实例的当前状态
int flags; /* See SRI_... defines */
// 实例的名字
// 主服务器的名字由用户在配置文件中设置
// 从服务器以及 Sentinel 的名字由 Sentinel 自动设置
// 格式为 ip:port ,例如 "127.0.0.1:26379"
char *name; /* Master name from the point of view of this sentinel. */
// 实例的运行 ID
char *runid; /* run ID of this instance. */
// 配置纪元,用于实现故障转移
uint64_t config_epoch; /* Configuration epoch. */
// 实例的地址
sentinelAddr *addr; /* Master host. */
// 用于发送命令的异步连接
redisAsyncContext *cc; /* Hiredis context for commands. */
// 用于执行 SUBSCRIBE 命令、接收频道信息的异步连接
// 仅在实例为主服务器时使用
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
// 已发送但尚未回复的命令数量
int pending_commands; /* Number of commands sent waiting for a reply. */
// cc 连接的创建时间
mstime_t cc_conn_time; /* cc connection time. */
// pc 连接的创建时间
mstime_t pc_conn_time; /* pc connection time. */
// 最后一次从这个实例接收信息的时间
mstime_t pc_last_activity; /* Last time we received any message. */
// 实例最后一次返回正确的 PING 命令回复的时间
mstime_t last_avail_time; /* Last time the instance replied to ping with
a reply we consider valid. */
// 实例最后一次发送 PING 命令的时间
mstime_t last_ping_time; /* Last time a pending ping was sent in the
context of the current command connection
with the instance. 0 if still not sent or
if pong already received. */
// 实例最后一次返回 PING 命令的时间,无论内容正确与否
mstime_t last_pong_time; /* Last time the instance replied to ping,
whatever the reply was. That's used to check
if the link is idle and must be reconnected. */
// 最后一次向频道发送问候信息的时间
// 只在当前实例为 sentinel 时使用
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
// 最后一次接收到这个 sentinel 发来的问候信息的时间
// 只在当前实例为 sentinel 时使用
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
via Pub/Sub. */
// 最后一次回复 SENTINEL is-master-down-by-addr 命令的时间
// 只在当前实例为 sentinel 时使用
mstime_t last_master_down_reply_time; /* Time of last reply to
SENTINEL is-master-down command. */
// 实例被判断为 SDOWN 状态的时间
mstime_t s_down_since_time; /* Subjectively down since time. */
// 实例被判断为 ODOWN 状态的时间
mstime_t o_down_since_time; /* Objectively down since time. */
// SENTINEL down-after-milliseconds 选项所设定的值
// 实例无响应多少毫秒之后才会被判断为主观下线(subjectively down)
mstime_t down_after_period; /* Consider it down after that period. */
// 从实例获取 INFO 命令的回复的时间
mstime_t info_refresh; /* Time at which we received INFO output from it. */
/* Role and the first time we observed it.
* This is useful in order to delay replacing what the instance reports
* with our own configuration. We need to always wait some time in order
* to give a chance to the leader to report the new configuration before
* we do silly things. */
// 实例的角色
int role_reported;
// 角色的更新时间
mstime_t role_reported_time;
// 最后一次从服务器的主服务器地址变更的时间
mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
/* Master specific. */
/* 主服务器实例特有的属性 -------------------------------------------------------------*/
// 其他同样监控这个主服务器的所有 sentinel
dict *sentinels; /* Other sentinels monitoring the same master. */
// 如果这个实例代表的是一个主服务器
// 那么这个字典保存着主服务器属下的从服务器
// 字典的键是从服务器的名字,字典的值是从服务器对应的 sentinelRedisInstance 结构
dict *slaves; /* Slaves for this master instance. */
// SENTINEL monitor <master-name> <IP> <port> <quorum> 选项中的 quorum 参数
// 判断这个实例为客观下线(objectively down)所需的支持投票数量
int quorum; /* Number of sentinels that need to agree on failure. */
// SENTINEL parallel-syncs <master-name> <number> 选项的值
// 在执行故障转移操作时,可以同时对新的主服务器进行同步的从服务器数量
int parallel_syncs; /* How many slaves to reconfigure at same time. */
// 连接主服务器和从服务器所需的密码
char *auth_pass; /* Password to use for AUTH against master & slaves. */
/* Slave specific. */
/* 从服务器实例特有的属性 -------------------------------------------------------------*/
// 主从服务器连接断开的时间
mstime_t master_link_down_time; /* Slave replication link down time. */
// 从服务器优先级
int slave_priority; /* Slave priority according to its INFO output. */
// 执行故障转移操作时,从服务器发送 SLAVEOF <new-master> 命令的时间
mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
// 主服务器的实例(在本实例为从服务器时使用)
struct sentinelRedisInstance *master; /* Master instance if it's slave. */
// INFO 命令的回复中记录的主服务器 IP
char *slave_master_host; /* Master host as reported by INFO */
// INFO 命令的回复中记录的主服务器端口号
int slave_master_port; /* Master port as reported by INFO */
// INFO 命令的回复中记录的主从服务器连接状态
int slave_master_link_status; /* Master link status as reported by INFO */
// 从服务器的复制偏移量
unsigned long long slave_repl_offset; /* Slave replication offset. */
/* Failover */
/* 故障转移相关属性 -------------------------------------------------------------------*/
// 如果这是一个主服务器实例,那么 leader 将是负责进行故障转移的 Sentinel 的运行 ID 。
// 如果这是一个 Sentinel 实例,那么 leader 就是被选举出来的领头 Sentinel 。
// 这个域只在 Sentinel 实例的 flags 属性的 SRI_MASTER_DOWN 标志处于打开状态时才有效。
char *leader; /* If this is a master instance, this is the runid of
the Sentinel that should perform the failover. If
this is a Sentinel, this is the runid of the Sentinel
that this Sentinel voted as leader. */
// 领头的纪元
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
// 当前执行中的故障转移的纪元
uint64_t failover_epoch; /* Epoch of the currently started failover. */
// 故障转移操作的当前状态
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
// 状态改变的时间
mstime_t failover_state_change_time;
// 最后一次进行故障迁移的时间
mstime_t failover_start_time; /* Last failover attempt start time. */
// SENTINEL failover-timeout <master-name> <ms> 选项的值
// 刷新故障迁移状态的最大时限
mstime_t failover_timeout; /* Max time to refresh failover state. */
mstime_t failover_delay_logged; /* For what failover_start_time value we
logged the failover delay. */
// 指向被提升为新主服务器的从服务器的指针
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
// 一个文件路径,保存着 WARNING 级别的事件发生时执行的,
// 用于通知管理员的脚本的地址
char *notification_script;
// 一个文件路径,保存着故障转移执行之前、之后、或者被中止时,
// 需要执行的脚本的地址
char *client_reconfig_script;
} sentinelRedisInstance;
/* Main state. */
/* Sentinel 的状态结构 */
struct sentinelState {
// 当前纪元
uint64_t current_epoch; /* Current epoch. */
// 保存了所有被这个 sentinel 监视的主服务器
// 字典的键是主服务器的名字
// 字典的值则是一个指向 sentinelRedisInstance 结构的指针
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
// 是否进入了 TILT 模式?
int tilt; /* Are we in TILT mode? */
// 目前正在执行的脚本的数量
int running_scripts; /* Number of scripts in execution right now. */
// 进入 TILT 模式的时间
mstime_t tilt_start_time; /* When TITL started. */
// 最后一次执行时间处理器的时间
mstime_t previous_time; /* Last time we ran the time handler. */
// 一个 FIFO 队列,包含了所有需要执行的用户脚本
list *scripts_queue; /* Queue of user scripts to execute. */
} sentinel;
/* A script execution job. */
// 脚本运行状态
typedef struct sentinelScriptJob {
// 标志,记录了脚本是否运行
int flags; /* Script job flags: SENTINEL_SCRIPT_* */
// 该脚本的已尝试执行次数
int retry_num; /* Number of times we tried to execute it. */
// 要传给脚本的参数
char **argv; /* Arguments to call the script. */
// 开始运行脚本的时间
mstime_t start_time; /* Script execution time if the script is running,
otherwise 0 if we are allowed to retry the
execution at any time. If the script is not
running and it's not 0, it means: do not run
before the specified time. */
// 脚本由子进程执行,该属性记录子进程的 pid
pid_t pid; /* Script execution pid. */
} sentinelScriptJob;
/* ======================= hiredis ae.c adapters =============================
* Note: this implementation is taken from hiredis/adapters/ae.h, however
* we have our modified copy for Sentinel in order to use our allocator
* and to have full control over how the adapter works. */
// 客户端适配器(adapter)结构
typedef struct redisAeEvents {
// 客户端连接上下文
redisAsyncContext *context;
// 服务器的事件循环
aeEventLoop *loop;
// 套接字
int fd;
// 记录读事件以及写事件是否就绪
int reading, writing;
} redisAeEvents;
// 读事件处理器
static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask);
redisAeEvents *e = (redisAeEvents*)privdata;
// 从连接中进行读取
redisAsyncHandleRead(e->context);
}
// 写事件处理器
static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) {
((void)el); ((void)fd); ((void)mask);
redisAeEvents *e = (redisAeEvents*)privdata;
// 从连接中进行写入
redisAsyncHandleWrite(e->context);
}
// 将读事件处理器安装到事件循环中
static void redisAeAddRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
// 如果读事件处理器未安装,那么进行安装
if (!e->reading) {
e->reading = 1;
aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e);
}
}
// 从事件循环中删除读事件处理器
static void redisAeDelRead(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
// 仅在读事件处理器已安装的情况下进行删除
if (e->reading) {
e->reading = 0;
aeDeleteFileEvent(loop,e->fd,AE_READABLE);
}
}
// 将写事件处理器安装到事件循环中
static void redisAeAddWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (!e->writing) {
e->writing = 1;
aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e);
}
}
// 从事件循环中删除写事件处理器
static void redisAeDelWrite(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
aeEventLoop *loop = e->loop;
if (e->writing) {
e->writing = 0;
aeDeleteFileEvent(loop,e->fd,AE_WRITABLE);
}
}
// 清理事件
static void redisAeCleanup(void *privdata) {
redisAeEvents *e = (redisAeEvents*)privdata;
redisAeDelRead(privdata);
redisAeDelWrite(privdata);
zfree(e);
}
// 为上下文 ae 和事件循环 loop 创建 hiredis 适配器
// 并设置相关的异步处理函数
static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisAeEvents *e;
/* Nothing should be attached when something is already attached */
if (ac->ev.data != NULL)
return REDIS_ERR;
/* Create container for context and r/w events */
// 创建适配器
e = (redisAeEvents*)zmalloc(sizeof(*e));
e->context = ac;
e->loop = loop;
e->fd = c->fd;
e->reading = e->writing = 0;
/* Register functions to start/stop listening for events */
// 设置异步调用函数
ac->ev.addRead = redisAeAddRead;
ac->ev.delRead = redisAeDelRead;
ac->ev.addWrite = redisAeAddWrite;
ac->ev.delWrite = redisAeDelWrite;
ac->ev.cleanup = redisAeCleanup;
ac->ev.data = e;
return REDIS_OK;
}
/* ============================= Prototypes ================================= */
void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status);
void sentinelDisconnectCallback(const redisAsyncContext *c, int status);
void sentinelReceiveHelloMessages(redisAsyncContext *c, void *reply, void *privdata);
sentinelRedisInstance *sentinelGetMasterByName(char *name);
char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
int yesnotoi(char *s);
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
void sentinelAbortFailover(sentinelRedisInstance *ri);
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri, const char *fmt, ...);
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master);
void sentinelScheduleScriptExecution(char *path, ...);
void sentinelStartFailover(sentinelRedisInstance *master);
void sentinelDiscardReplyCallback(redisAsyncContext *c, void *reply, void *privdata);
int sentinelSendSlaveOf(sentinelRedisInstance *ri, char *host, int port);
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch);
void sentinelFlushConfig(void);
void sentinelGenerateInitialMonitorEvents(void);
int sentinelSendPing(sentinelRedisInstance *ri);
/* ========================= Dictionary types =============================== */
unsigned int dictSdsHash(const void *key);
int dictSdsKeyCompare(void *privdata, const void *key1, const void *key2);
void releaseSentinelRedisInstance(sentinelRedisInstance *ri);
void dictInstancesValDestructor (void *privdata, void *obj) {
releaseSentinelRedisInstance(obj);
}
/* Instance name (sds) -> instance (sentinelRedisInstance pointer)
*
* also used for: sentinelRedisInstance->sentinels dictionary that maps
* sentinels ip:port to last seen time in Pub/Sub hello message. */
// 这个字典类型有两个作用:
// 1) 将实例名字映射到一个 sentinelRedisInstance 指针
// 2) 将 sentinelRedisInstance 指针映射到一个字典,
// 字典的键是 Sentinel 的 ip:port 地址,
// 字典的值是该 Sentinel 最后一次向频道发送信息的时间
dictType instancesDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
dictInstancesValDestructor /* val destructor */
};
/* Instance runid (sds) -> votes (long casted to void*)
*
* This is useful into sentinelGetObjectiveLeader() function in order to
* count the votes and understand who is the leader. */
// 将一个运行 ID 映射到一个 cast 成 void* 类型的 long 值的投票数量上
// 用于统计客观 leader sentinel
dictType leaderVotesDictType = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL /* val destructor */
};
/* =========================== Initialization =============================== */
void sentinelCommand(redisClient *c);
void sentinelInfoCommand(redisClient *c);
void sentinelSetCommand(redisClient *c);
void sentinelPublishCommand(redisClient *c);
// 服务器在 sentinel 模式下可执行的命令
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
/* This function overwrites a few normal Redis config default with Sentinel
* specific defaults. */
// 这个函数会用 Sentinel 所属的属性覆盖服务器默认的属性
void initSentinelConfig(void) {
server.port = REDIS_SENTINEL_PORT;
}
/* Perform the Sentinel mode initialization. */
// 以 Sentinel 模式初始化服务器
void initSentinel(void) {
int j;
/* Remove usual Redis commands from the command table, then just add
* the SENTINEL command. */
// 清空 Redis 服务器的命令表(该表用于普通模式)
dictEmpty(server.commands,NULL);
// 将 SENTINEL 模式所用的命令添加进命令表
for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
int retval;
struct redisCommand *cmd = sentinelcmds+j;
retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
redisAssert(retval == DICT_OK);
}
/* Initialize various data structures. */
/* 初始化 Sentinel 的状态 */
// 初始化纪元
sentinel.current_epoch = 0;
// 初始化保存主服务器信息的字典
sentinel.masters = dictCreate(&instancesDictType,NULL);
// 初始化 TILT 模式的相关选项
sentinel.tilt = 0;
sentinel.tilt_start_time = 0;
sentinel.previous_time = mstime();
// 初始化脚本相关选项
sentinel.running_scripts = 0;
sentinel.scripts_queue = listCreate();
}
/* This function gets called when the server is in Sentinel mode, started,
* loaded the configuration, and is ready for normal operations. */
// 这个函数在 Sentinel 准备就绪,可以执行操作时执行
void sentinelIsRunning(void) {
redisLog(REDIS_WARNING,"Sentinel runid is %s", server.runid);
// Sentinel 不能在没有配置文件的情况下执行
if (server.configfile == NULL) {
redisLog(REDIS_WARNING,
"Sentinel started without a config file. Exiting...");
exit(1);
} else if (access(server.configfile,W_OK) == -1) {
redisLog(REDIS_WARNING,
"Sentinel config file %s is not writable: %s. Exiting...",
server.configfile,strerror(errno));
exit(1);
}
/* We want to generate a +monitor event for every configured master
* at startup. */
sentinelGenerateInitialMonitorEvents();
}
/* ============================== sentinelAddr ============================== */
/* Create a sentinelAddr object and return it on success.
*
* 创建一个 sentinel 地址对象,并在创建成功时返回该对象。
*
* On error NULL is returned and errno is set to:
*
* 函数在出错时返回 NULL ,并将 errnor 设为以下值:
*
* ENOENT: Can't resolve the hostname.
* 不能解释 hostname
*
* EINVAL: Invalid port number.
* 端口号不正确
*/
sentinelAddr *createSentinelAddr(char *hostname, int port) {
char buf[32];
sentinelAddr *sa;
// 检查端口号
if (port <= 0 || port > 65535) {
errno = EINVAL;
return NULL;
}
// 检查并创建地址
if (anetResolve(NULL,hostname,buf,sizeof(buf)) == ANET_ERR) {
errno = ENOENT;
return NULL;
}
// 创建并返回地址结构
sa = zmalloc(sizeof(*sa));
sa->ip = sdsnew(buf);
sa->port = port;
return sa;
}
/* Return a duplicate of the source address. */
// 复制并返回给定地址的一个副本
sentinelAddr *dupSentinelAddr(sentinelAddr *src) {
sentinelAddr *sa;
sa = zmalloc(sizeof(*sa));
sa->ip = sdsnew(src->ip);
sa->port = src->port;
return sa;
}
/* Free a Sentinel address. Can't fail. */
// 释放 Sentinel 地址
void releaseSentinelAddr(sentinelAddr *sa) {
sdsfree(sa->ip);
zfree(sa);
}
/* Return non-zero if two addresses are equal. */
// 如果两个地址相同,那么返回 0
int sentinelAddrIsEqual(sentinelAddr *a, sentinelAddr *b) {
return a->port == b->port && !strcasecmp(a->ip,b->ip);
}
/* =========================== Events notification ========================== */
/* Send an event to log, pub/sub, user notification script.
*
* 将事件发送到日志、频道,以及用户提醒脚本。
*
* 'level' is the log level for logging. Only REDIS_WARNING events will trigger
* the execution of the user notification script.
*
* level 是日志的级别。只有 REDIS_WARNING 级别的日志会触发用户提醒脚本。
*
* 'type' is the message type, also used as a pub/sub channel name.
*
* type 是信息的类型,也用作频道的名字。
*
* 'ri', is the redis instance target of this event if applicable, and is
* used to obtain the path of the notification script to execute.
*
* ri 是引发事件的 Redis 实例,它可以用来获取可执行的用户脚本。
*
* The remaining arguments are printf-alike.
*
* 剩下的都是类似于传给 printf 函数的参数。
*
* If the format specifier starts with the two characters "%@" then ri is
* not NULL, and the message is prefixed with an instance identifier in the
* following format:
*
* 如果格式指定以 "%@" 两个字符开头,并且 ri 不为空,
* 那么信息将使用以下实例标识符为开头:
*
* <instance type> <instance name> <ip> <port>
*
* If the instance type is not master, than the additional string is
* added to specify the originating master:
*
* 如果实例的类型不是主服务器,那么以下内容会被追加到信息的后面,
* 用于指定目标主服务器:
*
* @ <master name> <master ip> <master port>
*
* Any other specifier after "%@" is processed by printf itself.
*
* "%@" 之后的其他指派器(specifier)都和 printf 函数所使用的指派器一样。
*/
void sentinelEvent(int level, char *type, sentinelRedisInstance *ri,
const char *fmt, ...) {
va_list ap;
// 日志字符串
char msg[REDIS_MAX_LOGMSG_LEN];
robj *channel, *payload;
/* Handle %@ */
// 处理 %@
if (fmt[0] == '%' && fmt[1] == '@') {
// 如果 ri 实例是主服务器,那么 master 就是 NULL
// 否则 ri 就是一个从服务器或者 sentinel ,而 master 就是该实例的主服务器
//
// sentinelRedisInstance *master = NULL;
// if (~(ri->flags & SRI_MASTER))
// master = ri->master;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
NULL : ri->master;
if (master) {
// ri 不是主服务器
snprintf(msg, sizeof(msg), "%s %s %s %d @ %s %s %d",
// 打印 ri 的类型
sentinelRedisInstanceTypeStr(ri),
// 打印 ri 的名字、IP 和端口号
ri->name, ri->addr->ip, ri->addr->port,
// 打印 ri 的主服务器的名字、 IP 和端口号
master->name, master->addr->ip, master->addr->port);
} else {
// ri 是主服务器
snprintf(msg, sizeof(msg), "%s %s %s %d",
// 打印 ri 的类型
sentinelRedisInstanceTypeStr(ri),
// 打印 ri 的名字、IP 和端口号
ri->name, ri->addr->ip, ri->addr->port);
}
// 跳过已处理的 "%@" 字符
fmt += 2;
} else {
msg[0] = '\0';
}
/* Use vsprintf for the rest of the formatting if any. */
// 打印之后的内容,格式和平常的 printf 一样
if (fmt[0] != '\0') {
va_start(ap, fmt);
vsnprintf(msg+strlen(msg), sizeof(msg)-strlen(msg), fmt, ap);
va_end(ap);
}
/* Log the message if the log level allows it to be logged. */
// 如果日志的级别足够高的话,那么记录到日志中
if (level >= server.verbosity)
redisLog(level,"%s %s",type,msg);
/* Publish the message via Pub/Sub if it's not a debugging one. */
// 如果日志不是 DEBUG 日志,那么将它发送到频道中
if (level != REDIS_DEBUG) {
// 频道
channel = createStringObject(type,strlen(type));
// 内容
payload = createStringObject(msg,strlen(msg));
// 发送信息
pubsubPublishMessage(channel,payload);
decrRefCount(channel);
decrRefCount(payload);
}
/* Call the notification script if applicable. */
// 如果有需要的话,调用提醒脚本
if (level == REDIS_WARNING && ri != NULL) {
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ?
ri : ri->master;
if (master->notification_script) {
sentinelScheduleScriptExecution(master->notification_script,
type,msg,NULL);
}
}
}
/* This function is called only at startup and is used to generate a
* +monitor event for every configured master. The same events are also
* generated when a master to monitor is added at runtime via the
* SENTINEL MONITOR command. */
// 在 Sentinel 启动时执行,用于创建并生成 +monitor 事件
void sentinelGenerateInitialMonitorEvents(void) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
sentinelEvent(REDIS_WARNING,"+monitor",ri,"%@ quorum %d",ri->quorum);
}
dictReleaseIterator(di);
}
/* ============================ script execution ============================ */
/* Release a script job structure and all the associated data. */
// 释放一个脚本任务结构,以及该任务的相关数据。
void sentinelReleaseScriptJob(sentinelScriptJob *sj) {
int j = 0;
while(sj->argv[j]) sdsfree(sj->argv[j++]);
zfree(sj->argv);
zfree(sj);
}
// 将给定参数和脚本放入队列
#define SENTINEL_SCRIPT_MAX_ARGS 16
void sentinelScheduleScriptExecution(char *path, ...) {
va_list ap;
char *argv[SENTINEL_SCRIPT_MAX_ARGS+1];
int argc = 1;
sentinelScriptJob *sj;
// 生成参数
va_start(ap, path);
while(argc < SENTINEL_SCRIPT_MAX_ARGS) {
argv[argc] = va_arg(ap,char*);
if (!argv[argc]) break;
argv[argc] = sdsnew(argv[argc]); /* Copy the string. */
argc++;
}
va_end(ap);
argv[0] = sdsnew(path);
// 初始化脚本结构
sj = zmalloc(sizeof(*sj));
sj->flags = SENTINEL_SCRIPT_NONE;
sj->retry_num = 0;
sj->argv = zmalloc(sizeof(char*)*(argc+1));
sj->start_time = 0;
sj->pid = 0;
memcpy(sj->argv,argv,sizeof(char*)*(argc+1));
// 添加到等待执行脚本队列的末尾, FIFO
listAddNodeTail(sentinel.scripts_queue,sj);
/* Remove the oldest non running script if we already hit the limit. */
// 如果入队的脚本数量太多,那么移除最旧的未执行脚本
if (listLength(sentinel.scripts_queue) > SENTINEL_SCRIPT_MAX_QUEUE) {
listNode *ln;
listIter li;
listRewind(sentinel.scripts_queue,&li);
while ((ln = listNext(&li)) != NULL) {
sj = ln->value;
// 不删除正在运行的脚本
if (sj->flags & SENTINEL_SCRIPT_RUNNING) continue;
/* The first node is the oldest as we add on tail. */
listDelNode(sentinel.scripts_queue,ln);
sentinelReleaseScriptJob(sj);
break;
}
redisAssert(listLength(sentinel.scripts_queue) <=
SENTINEL_SCRIPT_MAX_QUEUE);
}
}
/* Lookup a script in the scripts queue via pid, and returns the list node
* (so that we can easily remove it from the queue if needed). */
// 根据 pid ,查找正在运行中的脚本
listNode *sentinelGetScriptListNodeByPid(pid_t pid) {
listNode *ln;
listIter li;
listRewind(sentinel.scripts_queue,&li);
while ((ln = listNext(&li)) != NULL) {
sentinelScriptJob *sj = ln->value;
if ((sj->flags & SENTINEL_SCRIPT_RUNNING) && sj->pid == pid)
return ln;
}
return NULL;
}
/* Run pending scripts if we are not already at max number of running
* scripts. */
// 运行等待执行的脚本
void sentinelRunPendingScripts(void) {
listNode *ln;
listIter li;
mstime_t now = mstime();
/* Find jobs that are not running and run them, from the top to the
* tail of the queue, so we run older jobs first. */
// 如果运行的脚本数量未超过最大值,