diff --git a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart index 2a604b7..5b7e51e 100644 --- a/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart +++ b/lib/src/connectionhandling/mqtt_client_mqtt_connection_keep_alive.dart @@ -177,8 +177,10 @@ class MqttConnectionKeepAlive { // Calculate latencies lastCycleLatency = DateTime.now().millisecondsSinceEpoch - _lastPingTime; _cycleCount++; - averageCycleLatency = - (averageCycleLatency + lastCycleLatency) ~/ _cycleCount; + // Average latency calculation is + // new_avg = prev_avg + ((new_value − prev_avg) ~/ n + 1) + averageCycleLatency += + (lastCycleLatency - averageCycleLatency) ~/ _cycleCount; // Call the pong callback if not null if (pongCallback != null) { diff --git a/test/mqtt_client_keep_alive_test.dart b/test/mqtt_client_keep_alive_test.dart index 8463a91..e23ce32 100644 --- a/test/mqtt_client_keep_alive_test.dart +++ b/test/mqtt_client_keep_alive_test.dart @@ -237,7 +237,7 @@ void main() { expect(ka.disconnectTimer, isNull); }); test('Latency counts', () async { - final latencies = [0, 0]; + final latencies = [0, 0, 0]; final clientEventBus = events.EventBus(); var disconnect = false; void disconnectOnNoPingResponse(DisconnectOnNoPingResponse event) { @@ -260,22 +260,27 @@ void main() { verify(() => ch.registerForAllSentMessages(ka.messageSent)).called(1); expect(ka.pingTimer?.isActive, isTrue); expect(ka.disconnectTimer, isNull); - await MqttUtilities.asyncSleep(3); + await MqttUtilities.asyncSleep(4); verify(() => ch.sendMessage(any())).called(1); - await MqttUtilities.asyncSleep(1); final pingMessageRx = MqttPingResponseMessage(); ka.pingResponseReceived(pingMessageRx); latencies[0] = ka.lastCycleLatency; expect(ka.lastCycleLatency > 1000, isTrue); expect(ka.averageCycleLatency > 1000, isTrue); - await MqttUtilities.asyncSleep(2); + await MqttUtilities.asyncSleep(3); verify(() => ch.sendMessage(any())).called(1); - await MqttUtilities.asyncSleep(1); ka.pingResponseReceived(pingMessageRx); latencies[1] = ka.lastCycleLatency; expect(ka.lastCycleLatency > 1000, isTrue); expect(ka.averageCycleLatency > 1000, isTrue); - expect(ka.averageCycleLatency, (latencies[0] + latencies[1]) ~/ 2); + await MqttUtilities.asyncSleep(3); + verify(() => ch.sendMessage(any())).called(1); + ka.pingResponseReceived(pingMessageRx); + latencies[2] = ka.lastCycleLatency; + expect(ka.lastCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency > 1000, isTrue); + expect(ka.averageCycleLatency, + (latencies[0] + latencies[1] + latencies[2]) ~/ 3); expect(disconnect, isFalse); ka.stop(); expect(ka.averageCycleLatency, 0);