Skip to content

Commit 98a0908

Browse files
committed
add more examples
1 parent 8563718 commit 98a0908

File tree

6 files changed

+71
-3
lines changed

6 files changed

+71
-3
lines changed

asyncwait/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ fn main() {
1111

1212
join();
1313
select();
14+
futures_select();
1415

1516
stream();
1617

asyncwait/src/runtimes.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
21
use futures::channel::mpsc;
32
use futures::executor::{self, ThreadPool};
43
use futures::try_join;
@@ -9,7 +8,6 @@ use futures::{
98
select,
109
};
1110

12-
1311
pub fn tokio_async() {
1412
let rt = tokio::runtime::Runtime::new().unwrap();
1513
rt.block_on(async {
@@ -94,3 +92,23 @@ pub fn select() {
9492
}
9593
});
9694
}
95+
96+
pub fn futures_select() {
97+
futures_lite::future::block_on(async {
98+
use futures::future;
99+
100+
let mut a_fut = future::ready(4);
101+
let mut b_fut = future::ready(6);
102+
let mut total = 0;
103+
104+
loop {
105+
select! {
106+
a = a_fut => total += a,
107+
b = b_fut => total += b,
108+
complete => {println!("complete"); break},
109+
default => unreachable!(), // never runs (futures are ready, then complete)
110+
};
111+
}
112+
assert_eq!(total, 10);
113+
});
114+
}

channel/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,7 @@ atomic_mpmc = "0.2.0"
1212
broadcaster = "1.0.0"
1313
crossfire = "0.1.7"
1414
flume = "0.10.14"
15+
futures = "0.3.24"
16+
futures-channel = "0.3.24"
1517
futures-util = "0.3.24"
1618
tokio = { version = "1.21.2", features = ["full"] }

channel/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,4 @@ pub fn mpsc_example4() {
5959
}
6060

6161
println!("mpsc_example4 completed");
62-
}
62+
}

channel/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,6 @@ fn main() {
1212
flume_example();
1313
async_channel_example();
1414
async_priority_channel_example();
15+
futures_channel_mpsc_example();
16+
futures_channel_oneshot_example();
1517
}

channel/src/others.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,51 @@ pub fn async_channel_example() {
163163
});
164164
}
165165

166+
pub fn futures_channel_mpsc_example() {
167+
let rt = tokio::runtime::Runtime::new().unwrap();
168+
169+
let (tx, mut rx) = futures_channel::mpsc::channel(3);
170+
171+
rt.block_on(async move {
172+
tokio::spawn(async move {
173+
for _ in 0..3 {
174+
let mut tx = tx.clone();
175+
thread::spawn(move || tx.start_send("ok"));
176+
}
177+
178+
drop(tx);
179+
});
180+
181+
// Unbounded receiver waiting for all senders to complete.
182+
while let Ok(msg) = rx.try_next() {
183+
println!("{:?}", msg);
184+
}
185+
186+
println!("futures_channel_mpsc_example completed");
187+
});
188+
}
189+
190+
pub fn futures_channel_oneshot_example() {
191+
use futures::channel::oneshot;
192+
use std::time::Duration;
193+
194+
let (sender, receiver) = oneshot::channel::<i32>();
195+
196+
thread::spawn(|| {
197+
println!("THREAD: sleeping zzz...");
198+
thread::sleep(Duration::from_millis(1000));
199+
println!("THREAD: i'm awake! sending.");
200+
sender.send(3).unwrap();
201+
});
202+
203+
println!("MAIN: doing some useful stuff");
204+
205+
futures::executor::block_on(async {
206+
println!("MAIN: waiting for msg...");
207+
println!("MAIN: got: {:?}", receiver.await)
208+
});
209+
}
210+
166211
pub fn async_priority_channel_example() {
167212
let rt = tokio::runtime::Runtime::new().unwrap();
168213

0 commit comments

Comments
 (0)