Skip to content

Commit cbae5d1

Browse files
authored
[ISSUE#31] Stop handler thread on destroy (#82)
1 parent cee9bf7 commit cbae5d1

File tree

6 files changed

+133
-57
lines changed

6 files changed

+133
-57
lines changed

mqtt-client/api/mqtt-client.api

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor {
2929

3030
public final class com/gojek/mqtt/client/config/ExperimentConfigs {
3131
public fun <init> ()V
32-
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V
33-
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V
32+
public fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)V
33+
public synthetic fun <init> (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILkotlin/jvm/internal/DefaultConstructorMarker;)V
3434
public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
3535
public final fun component10 ()I
36+
public final fun component11 ()Z
3637
public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
3738
public final fun component3 ()I
3839
public final fun component4 ()I
@@ -41,8 +42,8 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs {
4142
public final fun component7 ()J
4243
public final fun component8 ()J
4344
public final fun component9 ()Z
44-
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
45-
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
45+
public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZ)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
46+
public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIZILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs;
4647
public fun equals (Ljava/lang/Object;)Z
4748
public final fun getActivityCheckIntervalSeconds ()I
4849
public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;
@@ -53,6 +54,7 @@ public final class com/gojek/mqtt/client/config/ExperimentConfigs {
5354
public final fun getMaxInflightMessagesLimit ()I
5455
public final fun getPolicyResetTimeSeconds ()I
5556
public final fun getShouldUseNewSSLFlow ()Z
57+
public final fun getStopMqttThreadOnDestroy ()Z
5658
public final fun getSubscriptionStore ()Lcom/gojek/mqtt/client/config/SubscriptionStore;
5759
public fun hashCode ()I
5860
public fun toString ()Ljava/lang/String;
@@ -816,6 +818,23 @@ public final class com/gojek/mqtt/event/MqttEvent$OfflineMessageDiscardedEvent :
816818
public fun toString ()Ljava/lang/String;
817819
}
818820

821+
public final class com/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent : com/gojek/mqtt/event/MqttEvent {
822+
public fun <init> (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
823+
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
824+
public final fun component1 ()Ljava/lang/String;
825+
public final fun component2 ()Ljava/lang/String;
826+
public final fun component3 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
827+
public final fun copy (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;
828+
public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;Ljava/lang/String;Ljava/lang/String;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$OperationDiscardedEvent;
829+
public fun equals (Ljava/lang/Object;)Z
830+
public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;
831+
public final fun getName ()Ljava/lang/String;
832+
public final fun getReason ()Ljava/lang/String;
833+
public fun hashCode ()I
834+
public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
835+
public fun toString ()Ljava/lang/String;
836+
}
837+
819838
public final class com/gojek/mqtt/event/MqttEvent$OptimalKeepAliveFoundEvent : com/gojek/mqtt/event/MqttEvent {
820839
public fun <init> (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V
821840
public synthetic fun <init> (IIILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V

mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ data class ExperimentConfigs(
1717
val incomingMessagesTTLSecs: Long = 360,
1818
val incomingMessagesCleanupIntervalSecs: Long = 60,
1919
val shouldUseNewSSLFlow: Boolean = false,
20-
val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED
20+
val maxInflightMessagesLimit: Int = MAX_INFLIGHT_MESSAGES_ALLOWED,
21+
val stopMqttThreadOnDestroy: Boolean = false
2122
)
2223

2324
enum class SubscriptionStore {

mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt

Lines changed: 38 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,6 @@
11
package com.gojek.mqtt.client.v3.impl
22

33
import android.content.Context
4-
import android.os.Bundle
5-
import android.os.Handler
6-
import android.os.HandlerThread
7-
import android.os.Looper
8-
import android.os.Message
9-
import android.os.Messenger
10-
import android.os.RemoteException
114
import androidx.annotation.RequiresApi
125
import com.gojek.courier.QoS
136
import com.gojek.courier.callback.SendMessageCallback
@@ -38,11 +31,11 @@ import com.gojek.mqtt.client.model.ConnectionState.DISCONNECTING
3831
import com.gojek.mqtt.client.model.ConnectionState.INITIALISED
3932
import com.gojek.mqtt.client.model.MqttSendPacket
4033
import com.gojek.mqtt.client.v3.IAndroidMqttClient
34+
import com.gojek.mqtt.client.v3.impl.State.DESTROYED
35+
import com.gojek.mqtt.client.v3.impl.State.UNINITIALISED
4136
import com.gojek.mqtt.connection.IMqttConnection
4237
import com.gojek.mqtt.connection.MqttConnection
4338
import com.gojek.mqtt.connection.config.v3.ConnectionConfig
44-
import com.gojek.mqtt.constants.MESSAGE
45-
import com.gojek.mqtt.constants.MSG_APP_PUBLISH
4639
import com.gojek.mqtt.event.EventHandler
4740
import com.gojek.mqtt.event.MqttEvent.AuthenticatorErrorEvent
4841
import com.gojek.mqtt.event.MqttEvent.MqttConnectDiscardedEvent
@@ -54,8 +47,8 @@ import com.gojek.mqtt.event.MqttEvent.MqttMessageSendEvent
5447
import com.gojek.mqtt.event.MqttEvent.MqttMessageSendFailureEvent
5548
import com.gojek.mqtt.event.MqttEvent.MqttMessageSendSuccessEvent
5649
import com.gojek.mqtt.event.MqttEvent.MqttReconnectEvent
50+
import com.gojek.mqtt.event.MqttEvent.OperationDiscardedEvent
5751
import com.gojek.mqtt.exception.toCourierException
58-
import com.gojek.mqtt.handler.IncomingHandler
5952
import com.gojek.mqtt.model.MqttConnectOptions
6053
import com.gojek.mqtt.model.MqttPacket
6154
import com.gojek.mqtt.network.NetworkHandler
@@ -78,6 +71,7 @@ import com.gojek.mqtt.wakelock.WakeLockProvider
7871
import com.gojek.networktracker.NetworkStateTracker
7972
import java.nio.charset.StandardCharsets
8073
import java.util.concurrent.TimeUnit
74+
import java.util.concurrent.atomic.AtomicReference
8175
import org.eclipse.paho.client.mqttv3.MqttException
8276
import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_UNEXPECTED_ERROR
8377
import org.eclipse.paho.client.mqttv3.MqttPersistenceException
@@ -95,9 +89,6 @@ internal class AndroidMqttClient(
9589

9690
private val runnableScheduler: IRunnableScheduler
9791
private val mqttConnection: IMqttConnection
98-
private val mqttThreadLooper: Looper
99-
private val mqttThreadHandler: Handler
100-
private var mMessenger: Messenger
10192
private val networkUtils: NetworkUtils
10293
private val mqttUtils: MqttUtils
10394
private val mqttPersistence: PahoPersistence
@@ -114,8 +105,7 @@ internal class AndroidMqttClient(
114105
@Volatile
115106
private var globalListener: MessageListener? = null
116107

117-
@Volatile
118-
private var isInitialised = false
108+
private var state = AtomicReference(UNINITIALISED)
119109

120110
// Accessed only from mqtt thread
121111
private var forceRefresh = false
@@ -133,21 +123,12 @@ internal class AndroidMqttClient(
133123

134124
init {
135125
logger = mqttConfiguration.logger
136-
val mqttHandlerThread = HandlerThread("MQTT_Thread")
137-
mqttHandlerThread.start()
138-
mqttThreadLooper = mqttHandlerThread.looper
139-
mqttThreadHandler = Handler(mqttThreadLooper)
140-
mMessenger = Messenger(
141-
IncomingHandler(mqttThreadLooper, this, logger)
142-
)
143126
@RequiresApi
144127
runnableScheduler = MqttRunnableScheduler(
145-
mqttHandlerThread,
146-
mqttThreadHandler,
147-
this,
148-
logger,
149-
eventHandler,
150-
experimentConfigs.activityCheckIntervalSeconds
128+
clientSchedulerBridge = this,
129+
logger = logger,
130+
eventHandler = eventHandler,
131+
activityCheckIntervalSeconds = experimentConfigs.activityCheckIntervalSeconds
151132
)
152133
mqttUtils = MqttUtils()
153134
networkUtils = NetworkUtils()
@@ -214,19 +195,24 @@ internal class AndroidMqttClient(
214195
connectOptions: MqttConnectOptions
215196
) {
216197
this.connectOptions = connectOptions
217-
isInitialised = true
198+
runnableScheduler.start()
199+
state.set(State.INITIALISED)
218200
runnableScheduler.connectMqtt()
219201
}
220202

221203
// This can be invoked on any thread
222204
override fun reconnect() {
205+
if (state.get() != State.INITIALISED) {
206+
eventHandler.onEvent(OperationDiscardedEvent("Reconnect", "Client is not initialised"))
207+
return
208+
}
223209
eventHandler.onEvent(MqttReconnectEvent())
224210
runnableScheduler.disconnectMqtt(true)
225211
}
226212

227213
// This can be invoked on any thread
228214
override fun disconnect(clearState: Boolean) {
229-
isInitialised = false
215+
state.set(State.DISCONNECTED)
230216
runnableScheduler.disconnectMqtt(false, clearState)
231217
}
232218

@@ -299,6 +285,10 @@ internal class AndroidMqttClient(
299285

300286
// This can be invoked on any thread
301287
override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean {
288+
if (state.get() == DESTROYED) {
289+
eventHandler.onEvent(OperationDiscardedEvent("SendMessage", "Client is not initialised"))
290+
return false
291+
}
302292
val mqttSendPacket = MqttSendPacket(
303293
message = mqttPacket.message,
304294
messageId = 0,
@@ -309,25 +299,7 @@ internal class AndroidMqttClient(
309299
triggerTime = System.nanoTime(),
310300
sendMessageCallback = sendMessageCallback
311301
)
312-
313-
val msg = Message.obtain()
314-
msg.what = MSG_APP_PUBLISH
315-
316-
val bundle = Bundle()
317-
bundle.putParcelable(MESSAGE, mqttSendPacket)
318-
319-
msg.data = bundle
320-
msg.replyTo = mMessenger
321-
322-
try {
323-
mMessenger.send(msg)
324-
} catch (e: RemoteException) {
325-
/* Service is dead. What to do? */
326-
logger.e(TAG, "Remote Service dead", e)
327-
return false
328-
}
329-
330-
return true
302+
return runnableScheduler.sendMessage(mqttSendPacket)
331303
}
332304

333305
override fun addMessageListener(topic: String, listener: MessageListener) {
@@ -347,7 +319,7 @@ internal class AndroidMqttClient(
347319
val startTime = clock.nanoTime()
348320
try {
349321
logger.d(TAG, "Sending onConnectAttempt event")
350-
if (!isInitialised) {
322+
if (state.get() != State.INITIALISED) {
351323
logger.d(TAG, "Mqtt Client not initialised")
352324
eventHandler.onEvent(
353325
MqttConnectDiscardedEvent(
@@ -414,6 +386,10 @@ internal class AndroidMqttClient(
414386
eventHandler.onEvent(MqttDisconnectEvent())
415387
mqttConnection.disconnect()
416388
if (clearState) {
389+
if (experimentConfigs.stopMqttThreadOnDestroy) {
390+
state.set(DESTROYED)
391+
runnableScheduler.stop()
392+
}
417393
mqttConnection.shutDown()
418394
subscriptionStore.clear()
419395
mqttPersistence.clearAll()
@@ -432,11 +408,19 @@ internal class AndroidMqttClient(
432408

433409
override fun subscribe(topicMap: Map<String, QoS>) {
434410
val addedTopics = subscriptionStore.subscribeTopics(topicMap)
411+
if (state.get() != State.INITIALISED) {
412+
eventHandler.onEvent(OperationDiscardedEvent("Subscribe", "Client is not initialised"))
413+
return
414+
}
435415
runnableScheduler.scheduleSubscribe(0, addedTopics)
436416
}
437417

438418
override fun unsubscribe(topics: List<String>) {
439419
val removedTopics = subscriptionStore.unsubscribeTopics(topics)
420+
if (state.get() != State.INITIALISED) {
421+
eventHandler.onEvent(OperationDiscardedEvent("Unsubscribe", "Client is not initialised"))
422+
return
423+
}
440424
runnableScheduler.scheduleUnsubscribe(0, removedTopics)
441425
}
442426

@@ -626,3 +610,7 @@ internal class AndroidMqttClient(
626610
const val TAG = "AndroidMqttClient"
627611
}
628612
}
613+
614+
private enum class State {
615+
UNINITIALISED, INITIALISED, DISCONNECTED, DESTROYED
616+
}

mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) {
2121
override var connectionInfo: ConnectionInfo? = null
2222
) : MqttEvent(connectionInfo)
2323

24+
data class OperationDiscardedEvent(
25+
val name: String,
26+
val reason: String,
27+
override var connectionInfo: ConnectionInfo? = null
28+
) : MqttEvent(connectionInfo)
29+
2430
data class MqttConnectSuccessEvent(
2531
val activeNetInfo: ActiveNetInfo,
2632
val serverUri: ServerUri?,

mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.gojek.mqtt.scheduler
22

33
import com.gojek.courier.QoS
4+
import com.gojek.mqtt.client.model.MqttSendPacket
45

56
internal interface IRunnableScheduler {
67
fun connectMqtt()
@@ -24,4 +25,9 @@ internal interface IRunnableScheduler {
2425
fun scheduleResetParams(delayMillis: Long)
2526

2627
fun scheduleAuthFailureRunnable(delayMillis: Long)
28+
29+
fun sendMessage(mqttSendPacket: MqttSendPacket): Boolean
30+
31+
fun start()
32+
fun stop()
2733
}

0 commit comments

Comments
 (0)