1
+ // Copyright 2015-2024 Swim Inc.
2
+ //
3
+ // Licensed under the Apache License, Version 2.0 (the "License");
4
+ // you may not use this file except in compliance with the License.
5
+ // You may obtain a copy of the License at
6
+ //
7
+ // http://www.apache.org/licenses/LICENSE-2.0
8
+ //
9
+ // Unless required by applicable law or agreed to in writing, software
10
+ // distributed under the License is distributed on an "AS IS" BASIS,
11
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ // See the License for the specific language governing permissions and
13
+ // limitations under the License.
14
+
15
+ use std:: pin:: pin;
16
+
17
+ use bytes:: Bytes ;
18
+ use futures:: future:: join;
19
+ use rand:: { thread_rng, Rng } ;
20
+ use rumqttc:: { ClientError , MqttOptions , QoS } ;
21
+ use swimos_utilities:: trigger;
22
+ use tracing:: debug;
23
+
24
+ async fn generate_data ( mqtt_uri : String , mut stop : trigger:: Receiver ) -> Result < ( ) , Box < dyn std:: error:: Error > > {
25
+
26
+ let opts = MqttOptions :: parse_url ( mqtt_uri) ?;
27
+
28
+ let ( client, mut event_loop) = rumqttc:: AsyncClient :: new ( opts, 0 ) ;
29
+ let stop_cpy = stop. clone ( ) ;
30
+ let events_task = async move {
31
+ loop {
32
+ tokio:: select! {
33
+ biased;
34
+ _ = & mut stop => break Ok ( ( ) ) ,
35
+ event = event_loop. poll( ) => {
36
+ match event {
37
+ Ok ( ev) => debug!( event = ?ev, "Processed MQTT event." ) ,
38
+ Err ( err) => break Err ( err) ,
39
+ }
40
+ }
41
+ }
42
+ }
43
+ } ;
44
+
45
+ let gen_task = async move {
46
+ let mut rand = thread_rng ( ) ;
47
+ loop {
48
+ if stop_cpy. check_state ( ) . is_some ( ) {
49
+ break ;
50
+ }
51
+ let mut data = String :: new ( ) ;
52
+ for _ in 0 ..10 {
53
+ data. push ( rand. gen_range ( 'A' ..='Z' ) ) ;
54
+ }
55
+ client. publish_bytes ( "test/topic" , QoS :: AtMostOnce , true , Bytes :: from ( data) ) . await ?;
56
+ }
57
+ Ok ( ( ) ) as Result < ( ) , ClientError >
58
+ } ;
59
+
60
+ let ( ev_res, gen_res) = join ( events_task, gen_task) . await ;
61
+ ev_res?;
62
+ gen_res?;
63
+ Ok ( ( ) )
64
+ }
0 commit comments