@@ -3,6 +3,7 @@ package com.vincentmasselis.rxbluetoothkotlin
3
3
import android.annotation.SuppressLint
4
4
import android.bluetooth.*
5
5
import android.bluetooth.BluetoothAdapter.*
6
+ import android.bluetooth.BluetoothGatt.GATT_SUCCESS
6
7
import android.content.Context
7
8
import android.content.IntentFilter
8
9
import android.os.Build
@@ -17,6 +18,7 @@ import io.reactivex.functions.Consumer
17
18
import io.reactivex.functions.Function
18
19
import io.reactivex.subjects.PublishSubject
19
20
import io.reactivex.subjects.SingleSubject
21
+ import io.reactivex.subjects.UnicastSubject
20
22
import java.util.*
21
23
import java.util.concurrent.CancellationException
22
24
import java.util.concurrent.TimeUnit
@@ -107,16 +109,16 @@ class RxBluetoothGattImpl(
107
109
callback.onConnectionState
108
110
.subscribe { (newState, status) ->
109
111
when {
110
- newState == BluetoothProfile .STATE_CONNECTED && status == BluetoothGatt . GATT_SUCCESS ->
112
+ newState == BluetoothProfile .STATE_CONNECTED && status == GATT_SUCCESS ->
111
113
downStream.onNext(Unit )
112
114
113
- newState == BluetoothProfile .STATE_DISCONNECTED && status == BluetoothGatt . GATT_SUCCESS ->
115
+ newState == BluetoothProfile .STATE_DISCONNECTED && status == GATT_SUCCESS ->
114
116
downStream.onComplete()
115
117
116
- newState == BluetoothProfile .STATE_DISCONNECTED && status != BluetoothGatt . GATT_SUCCESS ->
118
+ newState == BluetoothProfile .STATE_DISCONNECTED && status != GATT_SUCCESS ->
117
119
downStream.tryOnError(exceptionConverter(source.device, status))
118
120
119
- status != BluetoothGatt . GATT_SUCCESS ->
121
+ status != GATT_SUCCESS ->
120
122
downStream.tryOnError(exceptionConverter(source.device, status))
121
123
}
122
124
}
@@ -143,12 +145,12 @@ class RxBluetoothGattImpl(
143
145
144
146
// -------------------- I/O Tools
145
147
146
- private val operationQueue = PublishSubject .create<Maybe <Any >>()
148
+ private val operationQueue = UnicastSubject .create<Maybe <Any >>()
147
149
148
150
init {
149
151
operationQueue
150
152
.takeUntil(closeSubject.toObservable()) // Disposes when the connection is closed. Keep this takeUntil BEFORE the concatMapMaybe. If put after, every pending I/O into concatMapMaybe is disposed and the pending I/O messages will never trigger which causes dead chains.
151
- .concatMapMaybe { it.onErrorReturnItem(Unit ) /* To avoid disposing which make the queue unavailable */ }
153
+ .concatMapMaybe( { it.onErrorReturnItem(Unit ) /* To avoid disposing which make the queue unavailable */ }, 1 )
152
154
.subscribe()
153
155
}
154
156
@@ -161,11 +163,19 @@ class RxBluetoothGattImpl(
161
163
* @param [this] a single which contains a [BluetoothGatt] I/O operation to do.
162
164
*/
163
165
private fun <T > Single<T>.enqueue (exceptionWrapper : (device: BluetoothDevice , status: Int ) -> DeviceDisconnected ) = Maybe .create<T > { downstream ->
166
+
167
+ // logger?.v(TAG, "Enqueue Maybe create")
168
+
164
169
val livingConnection = Observable .defer { livingConnection(exceptionWrapper) }.replay(1 ).refCount()
165
170
166
171
livingConnection
172
+ /* .doOnSubscribe { logger?.v(TAG, "Enqueue livingConnection subscription") }
173
+ .doOnNext { logger?.v(TAG, "Enqueue livingConnection next") }
174
+ .doOnError { logger?.v(TAG, "Enqueue livingConnection error $it") }
175
+ .doOnComplete { logger?.v(TAG, "Enqueue livingConnection completion") }*/
167
176
.flatMapSingle {
168
177
this // this is the single to enqueue
178
+ // .doOnSubscribe { logger?.v(TAG, "Enqueue source maybe subscription") }
169
179
.subscribeOn(AndroidSchedulers .mainThread())
170
180
// Value is set to 1 minute because some devices take a long time to detect when the connection is lost. For example, we saw up to 16 seconds on a Nexus 4
171
181
// between the last call to write and the moment when the system fallback the disconnection.
@@ -180,6 +190,9 @@ class RxBluetoothGattImpl(
180
190
else Observable .error(it)
181
191
})
182
192
.firstElement()
193
+ /* .doOnSuccess { logger?.v(TAG, "Enqueue Maybe success with value $it") }
194
+ .doOnError { logger?.v(TAG, "Enqueue Maybe error $it") }
195
+ .doOnComplete { logger?.v(TAG, "Enqueue Maybe completed") }*/
183
196
// You don't have to subscribe to this chain, operationQueue will do it for you
184
197
// It's impossible to cancel a bluetooth I/O operation, so, even if the downstream is not listening for values, you must continue to listen until the end of the
185
198
// operation, if not, you could start a new operation before the current has finished which fires exceptions.
@@ -191,7 +204,9 @@ class RxBluetoothGattImpl(
191
204
operationQueue.onNext(it as Maybe <Any >)
192
205
}
193
206
194
- }
207
+ // downstream.setCancellable { logger?.v(TAG, "Enqueue Maybe cancelled") }
208
+
209
+ }// .also { logger?.v(TAG, "Enqueue Maybe method called") }
195
210
196
211
// -------------------- I/O Operations
197
212
@@ -218,7 +233,7 @@ class RxBluetoothGattImpl(
218
233
}
219
234
.enqueue(::RssiDeviceDisconnected )
220
235
.flatMap {
221
- if (it.status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(IOFailed .RssiReadingFailed (it.status, source.device))
236
+ if (it.status != GATT_SUCCESS ) Maybe .error(IOFailed .RssiReadingFailed (it.status, source.device))
222
237
else Maybe .just(it.rssi)
223
238
}
224
239
@@ -245,7 +260,7 @@ class RxBluetoothGattImpl(
245
260
}
246
261
.enqueue(::DiscoverServicesDeviceDisconnected )
247
262
.flatMap { status ->
248
- if (status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(IOFailed .ServiceDiscoveringFailed (status, source.device))
263
+ if (status != GATT_SUCCESS ) Maybe .error(IOFailed .ServiceDiscoveringFailed (status, source.device))
249
264
else Maybe .just(source.services)
250
265
}
251
266
@@ -273,7 +288,7 @@ class RxBluetoothGattImpl(
273
288
}
274
289
.enqueue(::MtuDeviceDisconnected )
275
290
.flatMap {
276
- if (it.status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(IOFailed .MtuRequestingFailed (it.status, source.device))
291
+ if (it.status != GATT_SUCCESS ) Maybe .error(IOFailed .MtuRequestingFailed (it.status, source.device))
277
292
else Maybe .just(it.mtu)
278
293
}
279
294
@@ -303,7 +318,7 @@ class RxBluetoothGattImpl(
303
318
}
304
319
.enqueue(::ReadPhyDeviceDisconnected )
305
320
.flatMap {
306
- if (it.status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(IOFailed .PhyReadFailed (it.connectionPHY, it.status, source.device))
321
+ if (it.status != GATT_SUCCESS ) Maybe .error(IOFailed .PhyReadFailed (it.connectionPHY, it.status, source.device))
307
322
else Maybe .just(it.connectionPHY)
308
323
}
309
324
@@ -333,7 +348,7 @@ class RxBluetoothGattImpl(
333
348
}
334
349
.enqueue { device, status -> SetPreferredPhyDeviceDisconnected (connectionPhy, phyOptions, device, status) }
335
350
.flatMap { (connectionPhy, status) ->
336
- if (status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(IOFailed .SetPreferredPhyFailed (connectionPhy, phyOptions, status, source.device))
351
+ if (status != GATT_SUCCESS ) Maybe .error(IOFailed .SetPreferredPhyFailed (connectionPhy, phyOptions, status, source.device))
337
352
else Maybe .just(connectionPhy)
338
353
}
339
354
@@ -372,7 +387,7 @@ class RxBluetoothGattImpl(
372
387
}
373
388
.enqueue { device, status -> CharacteristicReadDeviceDisconnected (device, status, characteristic.service, characteristic) }
374
389
.flatMap { (readCharacteristic, status) ->
375
- if (status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(IOFailed .CharacteristicReadingFailed (status, source.device, readCharacteristic.service, readCharacteristic))
390
+ if (status != GATT_SUCCESS ) Maybe .error(IOFailed .CharacteristicReadingFailed (status, source.device, readCharacteristic.service, readCharacteristic))
376
391
else Maybe .just(readCharacteristic.value)
377
392
}
378
393
@@ -413,7 +428,7 @@ class RxBluetoothGattImpl(
413
428
}
414
429
.enqueue { device, status -> CharacteristicWriteDeviceDisconnected (device, status, characteristic.service, characteristic, value) }
415
430
.flatMap { (wroteCharacteristic, status) ->
416
- if (status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(
431
+ if (status != GATT_SUCCESS ) Maybe .error(
417
432
IOFailed .CharacteristicWriteFailed (
418
433
status,
419
434
source.device,
@@ -561,7 +576,7 @@ class RxBluetoothGattImpl(
561
576
}
562
577
.enqueue { device, status -> DescriptorReadDeviceDisconnected (device, status, descriptor.characteristic.service, descriptor.characteristic, descriptor) }
563
578
.flatMap { (readDescriptor, status) ->
564
- if (status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(
579
+ if (status != GATT_SUCCESS ) Maybe .error(
565
580
IOFailed .DescriptorReadingFailed (
566
581
status,
567
582
source.device,
@@ -591,7 +606,7 @@ class RxBluetoothGattImpl(
591
606
override fun write (descriptor : BluetoothGattDescriptor , value : ByteArray , checkIfAlreadyWritten : Boolean ): Maybe <BluetoothGattDescriptor > = Single
592
607
.create<Pair <BluetoothGattDescriptor , Int >> { downStream ->
593
608
if (checkIfAlreadyWritten && Arrays .equals(descriptor.value, value)) {
594
- downStream.onSuccess(descriptor to BluetoothGatt . GATT_SUCCESS )
609
+ downStream.onSuccess(descriptor to GATT_SUCCESS )
595
610
return @create
596
611
}
597
612
@@ -615,7 +630,7 @@ class RxBluetoothGattImpl(
615
630
}
616
631
.enqueue { device, status -> DescriptorWriteDeviceDisconnected (device, status, descriptor.characteristic.service, descriptor.characteristic, descriptor, value) }
617
632
.flatMap { (wroteDescriptor, status) ->
618
- if (status != BluetoothGatt . GATT_SUCCESS ) Maybe .error(
633
+ if (status != GATT_SUCCESS ) Maybe .error(
619
634
IOFailed .DescriptorWriteFailed (
620
635
status,
621
636
source.device,
0 commit comments