Skip to content

Commit

Permalink
Merge pull request #17 from JMTamayo/main
Browse files Browse the repository at this point in the history
v0.4.1
  • Loading branch information
JMTamayo authored Jun 13, 2024
2 parents e6fddaf + 8dbc141 commit 3d6ae08
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
6 changes: 6 additions & 0 deletions redsumer-rs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## ✨ v0.4.1 [2024-06-13]

### Fixed:

- 🛠 Fixing BUG reported in [issue #15](https://github.com/enerBit/redsumer-rs/issues/15) with arguments in function xclaim.

## ✨ v0.4.0 [2024-04-23]

### Added:
Expand Down
8 changes: 4 additions & 4 deletions redsumer-rs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "redsumer"
description = "Lightweight implementation of Redis Streams for Rust"
version = "0.4.0"
version = "0.4.1"
edition = "2021"
license-file = "../LICENSE"
readme = "../README.md"
Expand All @@ -22,10 +22,10 @@ authors = [

[dependencies]
redis = { version = "0.25.3", features = ["tokio-comp", "streams"] }
tokio = { version = "1.37.0", features = ["full"] }
tokio = { version = "1.38.0", features = ["full"] }
uuid = { version = "1.8.0" }
time = { version = "0.3.36", features = ["parsing"] }
bytes = { version = "1.6.0" }
serde = { version = "1.0.198", features = ["derive"] }
serde_json = { version = "1.0.116" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = { version = "1.0.117" }
log = { version = "0.4.21" }
34 changes: 20 additions & 14 deletions redsumer-rs/src/redsumer/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,25 @@ impl<'c> RedsumerConsumer<'c> {

/// Claim pending messages from *stream* from *since_id* to the newest one until to a maximum of *claimed_messages_count* using [`Commands::xpending_count`] ([`XPENDING`](https://redis.io/commands/xpending/)) and [`Commands::xclaim_options`] ([`XCLAIM`](https://redis.io/commands/xclaim/)).
fn claim_pending_messages(&self) -> RedsumerResult<Vec<StreamId>> {
let ids_to_claim: Vec<Id> = self
.get_client()
.get_connection()?
.xpending_count::<_, _, _, _, _, StreamPendingCountReply>(
self.get_stream_name(),
self.get_group_name(),
self.get_since_id(),
"+",
self.get_claimed_messages_count(),
)?
.ids
.iter()
.map(|stream_pending_id| stream_pending_id.id.to_owned())
.collect::<Vec<Id>>();

if ids_to_claim.is_empty() {
return Ok(Vec::new());
}

Ok(self
.get_client()
.get_connection()?
Expand All @@ -234,20 +253,7 @@ impl<'c> RedsumerConsumer<'c> {
self.get_group_name(),
self.get_consumer_name(),
self.get_min_idle_time_milliseconds(),
&(self
.get_client()
.get_connection()?
.xpending_count::<_, _, _, _, _, StreamPendingCountReply>(
self.get_stream_name(),
self.get_group_name(),
self.get_since_id(),
"+",
self.get_claimed_messages_count(),
)?
.ids
.iter()
.map(|stream_pending_id| stream_pending_id.id.to_owned())
.collect::<Vec<Id>>()),
&ids_to_claim,
StreamClaimOptions::default(),
)?
.ids)
Expand Down

0 comments on commit 3d6ae08

Please sign in to comment.