Skip to content

Commit bb53449

Browse files
urklenihohit
authored andcommitted
Add lastid option to xclaim
1 parent ad70f98 commit bb53449

File tree

3 files changed

+87
-2
lines changed

3 files changed

+87
-2
lines changed

redis/src/commands/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1563,7 +1563,7 @@ implement_commands! {
15631563
/// ```text
15641564
/// XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2>
15651565
/// [IDLE <milliseconds>] [TIME <mstime>] [RETRYCOUNT <count>]
1566-
/// [FORCE] [JUSTID]
1566+
/// [FORCE] [JUSTID] [LASTID <lastid>]
15671567
/// ```
15681568
#[cfg(feature = "streams")]
15691569
#[cfg_attr(docsrs, doc(cfg(feature = "streams")))]

redis/src/streams.rs

+12
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ pub struct StreamClaimOptions {
101101
/// Set `JUSTID` cmd arg. Be advised: the response
102102
/// type changes with this option.
103103
justid: bool,
104+
/// Set `LASTID <lastid>` cmd arg.
105+
lastid: Option<String>,
104106
}
105107

106108
impl StreamClaimOptions {
@@ -134,6 +136,12 @@ impl StreamClaimOptions {
134136
self.justid = true;
135137
self
136138
}
139+
140+
/// Set `LASTID <lastid>` cmd arg.
141+
pub fn with_lastid(mut self, lastid: impl Into<String>) -> Self {
142+
self.lastid = Some(lastid.into());
143+
self
144+
}
137145
}
138146

139147
impl ToRedisArgs for StreamClaimOptions {
@@ -159,6 +167,10 @@ impl ToRedisArgs for StreamClaimOptions {
159167
if self.justid {
160168
out.write_arg(b"JUSTID");
161169
}
170+
if let Some(ref lastid) = self.lastid {
171+
out.write_arg(b"LASTID");
172+
lastid.write_redis_args(out);
173+
}
162174
}
163175
}
164176

redis/tests/test_streams.rs

+74-1
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,6 @@ fn test_xclaim() {
575575

576576
// save this StreamId for later
577577
let claim = &reply.keys[0].ids[0];
578-
let _claim_1 = &reply.keys[0].ids[1];
579578
let claim_justids = &reply.keys[0]
580579
.ids
581580
.iter()
@@ -644,6 +643,80 @@ fn test_xclaim() {
644643
assert_eq!(claimed.len(), 10);
645644
}
646645

646+
#[test]
647+
fn test_xclaim_last_id() {
648+
let ctx = TestContext::new();
649+
let mut con = ctx.connection();
650+
651+
let result: RedisResult<String> = con.xgroup_create_mkstream("k1", "g1", "$");
652+
assert!(result.is_ok());
653+
654+
// add some keys
655+
xadd_keyrange(&mut con, "k1", 0, 10);
656+
657+
let reply: StreamReadReply = con
658+
.xread_options(&["k1"], &["0"], &StreamReadOptions::default())
659+
.unwrap();
660+
// verify we have 10 ids
661+
assert_eq!(reply.keys[0].ids.len(), 10);
662+
663+
let claim_early_id = &reply.keys[0].ids[3];
664+
let claim_middle_id = &reply.keys[0].ids[5];
665+
let claim_late_id = &reply.keys[0].ids[8];
666+
667+
// get read up to the middle record
668+
let _: StreamReadReply = con
669+
.xread_options(
670+
&["k1"],
671+
&[">"],
672+
&StreamReadOptions::default().count(6).group("g1", "c1"),
673+
)
674+
.unwrap();
675+
676+
let info: StreamInfoGroupsReply = con.xinfo_groups("k1").unwrap();
677+
assert_eq!(info.groups[0].last_delivered_id, claim_middle_id.id.clone());
678+
679+
// sleep for 5ms
680+
sleep(Duration::from_millis(5));
681+
682+
let _: Vec<String> = con
683+
.xclaim_options(
684+
"k1",
685+
"g1",
686+
"c2",
687+
4,
688+
&[claim_middle_id.id.clone()],
689+
StreamClaimOptions::default()
690+
.with_justid()
691+
.with_lastid(claim_early_id.id.as_str()),
692+
)
693+
.unwrap();
694+
695+
// lastid is kept at the 6th entry as the 4th entry is OLDER than the last_delivered_id
696+
let info: StreamInfoGroupsReply = con.xinfo_groups("k1").unwrap();
697+
assert_eq!(info.groups[0].last_delivered_id, claim_middle_id.id.clone());
698+
699+
// sleep for 5ms
700+
sleep(Duration::from_millis(5));
701+
702+
let _: Vec<String> = con
703+
.xclaim_options(
704+
"k1",
705+
"g1",
706+
"c1",
707+
4,
708+
&[claim_middle_id.id.clone()],
709+
StreamClaimOptions::default()
710+
.with_justid()
711+
.with_lastid(claim_late_id.id.as_str()),
712+
)
713+
.unwrap();
714+
715+
// lastid is moved to the 8th entry as it is NEWER than the last_delivered_id
716+
let info: StreamInfoGroupsReply = con.xinfo_groups("k1").unwrap();
717+
assert_eq!(info.groups[0].last_delivered_id, claim_late_id.id.clone());
718+
}
719+
647720
#[test]
648721
fn test_xdel() {
649722
// Tests the following commands....

0 commit comments

Comments
 (0)