您当前的位置: 首页 >  rust

mutourend

暂无认证

  • 1浏览

    0关注

    661博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Rust中的channel

mutourend 发布时间:2021-08-05 19:54:57 ,浏览量:1

1. std::sync::mpsc::channel

支持多Sender,仅支持1个Receiver,可保证接收消息的顺序与发送的顺序一致。

pub fn channel() -> (Sender, Receiver)

会创建新的async channel,返回的是sender/receiver对。 所有经由Sender发送的数据顺序,与 在Receiver端收到的数据顺序是一致的。 没有任何send操作可阻塞线程,该channel可认为是具有“无限buffer”的,而recv操作将阻塞直到有消息过来。(而对于sync_channel,当其达到buffer limit时会阻塞)。

Sender可复制多次send到同一channel,但仅支持一个`Receiver``。

Receiver断开时,Sender的send操作会受到SendError。同理,当Sender断开时,Receiver的recv操作将受到RecvError。

示例:

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

// Spawn off an expensive computation
thread::spawn(move|| {
    sender.send(expensive_computation()).unwrap();
});

// Do some useful work for awhile

// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
2. crossbeam_channel

为多生产者,多接收者的消息传输通道。 channel通道的创建方式有2种:

  • bounded:通道具有容量上限,即通道内同时容纳的消息数有限制。
use crossbeam_channel::bounded;

// Create a channel that can hold at most 5 messages at a time.
let (s, r) = bounded(5);

// Can send only 5 messages without blocking.
for i in 0..5 {
    s.send(i).unwrap();
}

// Another call to `send` would block because the channel is full.
// s.send(5).unwrap();

  • unbounded:通道的容量无上限,即通道内可同时容纳任意多的消息。
use crossbeam_channel::unbounded;

// Create an unbounded channel.
let (s, r) = unbounded();

// Can send any number of messages into the channel without blocking.
for i in 0..1000 {
    s.send(i).unwrap();
}

特例情况为0容量通道,即通道内无法容纳消息。对应的,发送和接收操作必须成对出现:

use std::thread;
use crossbeam_channel::bounded;

// Create a zero-capacity channel.
let (s, r) = bounded(0);

// Sending blocks until a receive operation appears on the other side.
thread::spawn(move || s.send("Hi!").unwrap());

// Receiving blocks until a send operation appears on the other side.
assert_eq!(r.recv(), Ok("Hi!"));

支持复制Sender和Receiver:

use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());

s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();

assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));

当所有的Senders或Receivers 与通道连接断开,则通道处于disconnected状态,消息不再可发送成功,但是通道内剩余的消息仍然可被接收。对处于disconnected状态的通道进行发送或接收操作都不会阻塞:

use crossbeam_channel::{unbounded, RecvError};

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();

// The only sender is dropped, disconnecting the channel.
drop(s);

// The remaining messages can be received.
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

// There are no more messages in the channel.
assert!(r.is_empty());

// Note that calling `r.recv()` does not block.
// Instead, `Err(RecvError)` is returned immediately.
assert_eq!(r.recv(), Err(RecvError));

发送和接收操作支持3种模式:

  • 非阻塞(立即返回成功或失败)
  • 阻塞(等待操作成功或通道disconnected)
  • 超时阻塞(仅阻塞一段时间)
use crossbeam_channel::{bounded, RecvError, TryRecvError};

let (s, r) = bounded(1);

// Send a message into the channel.
s.send("foo").unwrap();

// This call would block because the channel is full.
// s.send("bar").unwrap();

// Receive the message.
assert_eq!(r.recv(), Ok("foo"));

// This call would block because the channel is empty.
// r.recv();

// Try receiving a message without blocking.
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

// Disconnect the channel.
drop(s);

// This call doesn't block because the channel is now disconnected.
assert_eq!(r.recv(), Err(RecvError));

借助try_iter可非阻塞的获取通道内的所有消息:

use crossbeam_channel::unbounded;

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// No need to drop the sender.

// Receive all messages currently in the channel.
let v: Vec = r.try_iter().collect();

assert_eq!(v, [1, 2, 3]);
参考资料

[1] https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html [2] https://docs.rs/crossbeam-channel/0.5.1/crossbeam_channel/

关注
打赏
1664532908
查看更多评论
立即登录/注册

微信扫码登录

0.0372s