Skip to content

Commit

Permalink
Fixed deletion of expired retain msg after restore from persistent pl…
Browse files Browse the repository at this point in the history
…ugin
  • Loading branch information
NorbertHeusser committed Mar 12, 2024
1 parent 68a95d8 commit 4b91612
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 17 deletions.
6 changes: 6 additions & 0 deletions src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
}


void db__retain_expiry_check()
{
retain__expiry_check(&db.retains);
}


void db__expire_all_messages(struct mosquitto *context)
{
struct mosquitto__client_msg *client_msg, *tmp;
Expand Down
1 change: 1 addition & 0 deletions src/mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ int main(int argc, char *argv[])

plugin_persist__handle_restore();
session_expiry__check();
db__retain_expiry_check();
db__msg_store_compact();

/* After loading persisted clients and ACLs, try to associate them,
Expand Down
3 changes: 2 additions & 1 deletion src/mosquitto_broker_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,7 @@ int db__message_write_queued_in(struct mosquitto *context);
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg);
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *msg);
uint64_t db__new_msg_id(void);
void db__expiry_check(void);
void db__expire_all_messages(struct mosquitto *context);
void db__check_acl_of_all_messages(struct mosquitto *context);

Expand Down Expand Up @@ -889,7 +890,7 @@ int retain__init(void);
void retain__clean(struct mosquitto__retainhier **retainhier);
int retain__queue(struct mosquitto *context, const struct mosquitto_subscription *sub);
int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char **split_topics, bool persist);

void retain__expiry_check(struct mosquitto__retainhier **retainhier);
/* ============================================================
* Security related functions
* ============================================================ */
Expand Down
33 changes: 26 additions & 7 deletions src/retain.c
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,19 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char
return MOSQ_ERR_SUCCESS;
}

static bool retain__delete_expired_msg(struct mosquitto__retainhier *branch)
{
if(branch->retained && branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
return true;
}
return false;
}

static int retain__process(struct mosquitto__retainhier *branch, struct mosquitto *context, const struct mosquitto_subscription *sub)
{
Expand All @@ -200,13 +213,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
uint16_t mid;
struct mosquitto__base_msg *retained;

if(branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
if(retain__delete_expired_msg(branch)){
return MOSQ_ERR_SUCCESS;
}

Expand Down Expand Up @@ -344,6 +351,17 @@ int retain__queue(struct mosquitto *context, const struct mosquitto_subscription
return MOSQ_ERR_SUCCESS;
}

void retain__expiry_check(struct mosquitto__retainhier **retainhier)
{
struct mosquitto__retainhier *peer, *retainhier_tmp;

HASH_ITER(hh, *retainhier, peer, retainhier_tmp){
retain__expiry_check(&peer->children);
if (retain__delete_expired_msg(peer)){
retain__clean_empty_hierarchy(peer);
}
}
}

void retain__clean(struct mosquitto__retainhier **retainhier)
{
Expand All @@ -360,3 +378,4 @@ void retain__clean(struct mosquitto__retainhier **retainhier)
}
}


13 changes: 12 additions & 1 deletion test/broker/15-persist-client-drop-expired-messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ def do_test(
additional_config_entries: dict,
resubscribe: bool,
num_messages_two_subscribers: int = 0,
num_retain_messages : int = 0,
):
print(
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}"
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}, num_retain_messages = {num_retain_messages} "
)

conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
Expand Down Expand Up @@ -71,6 +72,7 @@ def do_test(
0,
num_messages - num_messages_two_subscribers,
message_expiry=60,
retain_end=num_retain_messages,
)

if num_messages_two_subscribers > 0:
Expand All @@ -90,6 +92,7 @@ def do_test(
num_messages - num_messages_two_subscribers,
num_messages,
message_expiry=60,
retain_end=num_retain_messages,
)
publisher_sock.close()

Expand All @@ -105,6 +108,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end = num_retain_messages,
message_expiry=60,
)

Expand Down Expand Up @@ -143,6 +147,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end = 0,
)

rc = broker_terminate_rc
Expand Down Expand Up @@ -182,3 +187,9 @@ def do_test(
resubscribe=False,
num_messages_two_subscribers=30,
)
do_test(
"memory queue",
additional_config_entries=memory_queue_config,
resubscribe=False,
num_retain_messages=40,
)
32 changes: 31 additions & 1 deletion test/broker/15-persist-client-expired-session.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ def do_test(
additional_config_entries: dict,
resubscribe: bool,
num_messages_two_subscribers: int = 0,
num_retain_messages : int = 0,
):
print(
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}"
f"{test_case_name}, resubscribe = {resubscribe}, two_subscribers = {'True' if num_messages_two_subscribers > 0 else 'False'}, num_retain_messages = {num_retain_messages} "
)

conf_file = os.path.basename(__file__).replace(".py", f"_{port}.conf")
Expand Down Expand Up @@ -69,6 +70,7 @@ def do_test(
topic,
0,
num_messages - num_messages_two_subscribers,
retain_end=num_retain_messages,
)

if num_messages_two_subscribers > 0:
Expand All @@ -87,6 +89,7 @@ def do_test(
topic,
num_messages - num_messages_two_subscribers,
num_messages,
retain_end=num_retain_messages,
)
publisher_sock.close()

Expand All @@ -102,6 +105,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end = num_retain_messages,
)

# Put session expiry_time into the past
Expand Down Expand Up @@ -136,6 +140,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end=num_retain_messages,
)

if num_messages_two_subscribers > 0:
Expand Down Expand Up @@ -175,6 +180,7 @@ def do_test(
client_msg_counts=msg_counts,
publisher_id=publisher_id,
num_published_msgs=num_messages,
retain_end=num_retain_messages,
)

rc = broker_terminate_rc
Expand Down Expand Up @@ -221,3 +227,27 @@ def do_test(
resubscribe=True,
num_messages_two_subscribers=20,
)
do_test(
"memory queue",
additional_config_entries=memory_queue_config,
resubscribe=False,
num_retain_messages=30,
)
# The following test case is open for discussion as adapting
# the check routines will be hard and some observations
# are unclear right now
# do_test(
# "memory queue",
# additional_config_entries=memory_queue_config,
# resubscribe=False,
# num_messages_two_subscribers=40,
# num_retain_messages=30,
# )
# do_test(
# "memory queue",
# additional_config_entries=memory_queue_config,
# resubscribe=False,
# num_messages_two_subscribers=50,
# num_retain_messages=60,
# )

10 changes: 7 additions & 3 deletions test/broker/persist_module_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def publish_messages(
topic: str,
start: int,
end: int,
retain_end=0,
message_expiry: int = 0,
qos: int = 1,
):
Expand All @@ -65,6 +66,7 @@ def publish_messages(
mid=mid,
qos=qos,
payload=payload.encode("UTF-8"),
retain = True if i < retain_end else False,
proto_ver=proto_ver,
properties=props,
)
Expand All @@ -80,18 +82,20 @@ def check_db(
client_msg_counts: dict[str, int],
publisher_id: str,
num_published_msgs: int,
retain_end: int,
message_expiry: int = 0,
qos: int = 1,
):
count_list = [v for v in client_msg_counts.values() if v] + [0]
count_list = [v for v in client_msg_counts.values() if v is not None] + [0]
num_base_msgs = max(count_list)
num_subscriptions = sum(1 for c in client_msg_counts.values() if c is not None)
num_client_msgs_out = sum(count_list)
persist_help.check_counts(
port,
clients=len(client_msg_counts),
client_msgs_out=num_client_msgs_out,
base_msgs=num_base_msgs,
base_msgs=num_base_msgs if num_base_msgs > 0 or retain_end == 0 else 1,
retain_msgs=1 if retain_end > 0 else 0,
subscriptions=num_subscriptions,
)

Expand Down Expand Up @@ -131,7 +135,7 @@ def check_db(
mid,
port,
qos,
retain=0,
retain=1 if i < retain_end else 0,
idx=i,
)
# Check client msg
Expand Down
4 changes: 0 additions & 4 deletions test/broker/persist_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,6 @@ def check_client_msg(
raise ValueError(
"Invalid state %d / %d for message %s" % (row[8], state, msg_id)
)
except ValueError as err:
raise ValueError(
str(err) + f" in client message client_id = {client_id} cmsg_id = {idx}"
) from err
finally:
con.close()

Expand Down
5 changes: 5 additions & 0 deletions test/unit/broker/subs_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ int retain__init(void)
return MOSQ_ERR_SUCCESS;
}

void retain__expiry_check(struct mosquitto__retainhier **retainhier)

Check warning on line 118 in test/unit/broker/subs_stubs.c

View check run for this annotation

Codecov / codecov/patch

test/unit/broker/subs_stubs.c#L118

Added line #L118 was not covered by tests
{
UNUSED(retainhier);
}

Check warning on line 121 in test/unit/broker/subs_stubs.c

View check run for this annotation

Codecov / codecov/patch

test/unit/broker/subs_stubs.c#L120-L121

Added lines #L120 - L121 were not covered by tests

void retain__clean(struct mosquitto__retainhier **retainhier)
{
UNUSED(retainhier);
Expand Down

0 comments on commit 4b91612

Please sign in to comment.