博客
关于我
【Rust投稿】从零实现消息中间件(6)-CLIENT
阅读量:681 次
发布时间:2019-03-17

本文共 6670 字,大约阅读时间需要 22 分钟。

功能设计

作为一个客户端实现,我们需要支持两个主要功能:发布消息(pub)和订阅消息(sub)。具体来说,客户端需要具备以下三项能力:

  • 发布消息接口:允许客户端向服务器发送消息。
  • 订阅消息接口:客户端可以指定接收特定主题的消息。
  • 消息处理接口:接收到消息后,需要根据订阅关系将消息传递给相关的回调处理函数。
  • 数据结构定义

    为了实现上述功能,客户端需要维护一些核心数据结构:

    • writer:用于向服务器发送消息的写入器。
    • handler:处理接收到的消息的call-back函数。
    • sid:每个客户端可以有多个订阅请求,每个订阅请求需要有一个唯一的ID,这里使用sid来表示。
    • stop:用于客户端正常关闭的信号发送。

    此外,客户端的实际实现采用了Mutex和WriteHalf等同步机制,以确保在多线程环境下的 thread-safe操作。

    接口详情

    connect接口

    public async fn connect(addr: &str) -> std::io::Result
    { let conn = TcpStream::connect(addr).await?; let (reader, writer) = tokio::io::split(conn); let (tx, rx) = tokio::sync::oneshot::channel(); let client = Client { addr: addr.into(), writer: Arc::new(Mutex::new(writer)), stop: Some(tx), sid: 0, handler: Arc::new(Default::default()), }; // backdrop task for receiving messages tokio::spawn(async move { Self::receive_task(reader, rx, client.handler.clone(), client.writer.clone()).await; }); Ok(client)}

    pub_message接口

    public async fn pub_message(&mut self, subject: &str, msg: & [u8]) -> std::io::Result<()> {    let mut writer = self.writer.lock().await;    let m = format!("PUB {} {}\r\n", subject, msg.len());    writer.write_all(m.as_bytes()).await?;    writer.write_all(msg).await?;    writer.write_all("\r\n".as_bytes()).await?;    Ok(())}

    sub_message接口

    public async fn sub_message(&mut self, subject: String, queue: Option
    , handler: MessageHandler) -> std::io::Result<()> { self.sid += 1; let mut writer = self.writer.lock().await; let m = if let Some(queue) = queue { format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid) } else { format!("SUB {} {}\r\n", subject.as_str(), self.sid) }; self.handler.lock().await.insert(subject, handler); writer.write_all(m.as_bytes()).await?; Ok(())}

    receive_task

    async fn receive_task(    mut reader: ReadHalf
    , stop: oneshot::Receiver<()>, handler: Arc
    >>>>, writer: Arc< Mutex
    < TcpStream>>>,) { let mut buf = [0u8; 512]; let mut parser = Parser::new(); loop { select! { _ = stopDetalle received -> { println!("client closed"); return; } r = reader.read(&mut buf[..]).fuse() -> { let n = match r { Err(e) => { println!("read err {}", e); writer.lock().await.shutdown().await; return; } Ok(n) => n, }; if n == 0 { return; } let mut buf2 = &buf[..n]; loop { let r = parser.parse(buf2); let (r, edited3098) = match r { Err(e) => { println!("parse err {}", e); writer.lock().await.shutdown().await; return; } Ok(r) => r, }; if r == ParseResult::NoMsg { break; } match r { ParseResult::MsgArg(msg) => { Self::process_message(msg, handler).await; parser.clear_msg_buf(); } _ => println!("unexpected message type"), } if n == buf.len() { break; } buf2 = &buf2[n..]; } } } }}

    代码实现

    Client 实现

    impl Client {    /// 1. 建立到服务器的连接    /// 2. 启动后台任务    pub async fn connect(addr: &str) -> std::io::Result
    { let conn = TcpStream::connect(addr).await?; let (reader, writer) = tokio::io::split(conn); let (tx, rx) = tokio::sync::oneshot::channel(); let client = Client { addr: addr.into(), writer: Arc::new(Mutex::new(writer)), stop: Some(tx), sid: 0, handler: Arc::new(Default::default()), }; // tokio::spawn 可以认为和go语言中的 go func() {} tokio::spawn(async move { Self::receive_task(reader, rx, client.handler.clone(), client.writer.clone()).await; }); Ok(client) } async fn receive_task( mut reader: ReadHalf
    , stop: oneshot::Receiver<()>, handler: Arc
    >>>>, writer: Arc< Mutex< WriteHalf
    >>, ) { let mut buf = [0u8; 512]; let mut parser = Parser::new(); let mut stop = stop.fuse(); loop { select! { _ = stop -> { println!("client closed"); return; } r = reader.read(&mut buf[..]).fuse() -> { let n = { match r { Err(e) => { println!("read err {}", e); let _ = writer.lock().await.shutdown().await; return; } Ok(n) => n, } }; if n == 0 { // TCP connection closed, no need to continue return; } let mut buf2 = &buf[..n]; loop { let r = parser.parse(buf2); let (r, n) = match r { Err(e) => { println!("parse err {}", e); let _ = writer.lock().await.shutdown().await; return; } Ok(r) => r, }; if r == ParseResult::NoMsg { break; } match r { ParseResult::MsgArg(msg) => { Self::process_message(msg, handler).await; parser.clear_msg_buf(); } _ => println!("unexpected message type"), } if n == buf.len() { break; } buf2 = &buf2[n..]; } } } } } async fn process_message( msg: MsgArg<'_>, handler: Arc
    >>>>, ) { let mut handler = handler.lock().await; if let Some(h) = handler.get_mut(msg.subject) { let _ = h(msg.msg); } } pub async fn pub_message(&self, subject: & str, msg: & [u8]) -> std::io::Result<()> { let mut writer = self.writer.lock().await; let m = format!("PUB {} {}\r\n", subject, msg.len()); writer.write_all(m.as_bytes()).await?; writer.write_all(msg).await?; writer.write_all("\r\n".as_bytes()).await?; Ok(()) } pub async fn sub_message( &mut self, subject: String, queue: Option
    , handler: MessageHandler, ) -> std::io::Result<()> { self.sid += 1; let mut writer = self.writer.lock().await; let m = if let Some(queue) = queue { format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid) } else { format!("SUB {} {}\r\n", subject.as_str(), self.sid) }; self.handler.lock().await.insert(subject, handler); writer.write_all(m.as_bytes()).await?; Ok(()) } pub fn close(&mut self) { if let Some(stop) = self.stop.take() { if let Err(e) = stop.send(()) { println!("stop error {:?}", e); } } }}

    其他

    关于这个客户端的实现,如果你对细节感兴趣,可以去我的GitHub仓库哪里学障用时赏析(请允许我抛掷一个链接)。你可以用 tonpeek 查看工具查看代码中的rational .

    https://github.com/nkbai/learnrustbynats

    这是一个学习Rust语言结合NATS协议的项目,欢迎关注。

    转载地址:http://gwohz.baihongyu.com/

    你可能感兴趣的文章
    网络协议和支持(一)、uuid模块
    查看>>
    numpy.vstack
    查看>>
    numpy.frombuffer()
    查看>>
    文件结束符EOF
    查看>>
    Latex 错误集合
    查看>>
    Python的一个报错——OSError: [Errno 22] Invalid argument
    查看>>
    Python的内置函数(四十一)、 index()
    查看>>
    卷积神经网络的工程技巧总结
    查看>>
    OSError: [Errno 22] Invalid argument: ‘D:\test\x07‘
    查看>>
    Python字符串操作之字符串分割与组合
    查看>>
    tf.tuple
    查看>>
    开放式系统互联模型(网络的七层架构)
    查看>>
    windows系统配置自动tomcat
    查看>>
    49数据通路的功能和基本结构
    查看>>
    Java面试宝典(2020版)
    查看>>
    4大继承模式
    查看>>
    06二维数组
    查看>>
    Springboot 初學習
    查看>>
    如何用华为位置服务实现搜索位置返回父子节点信息
    查看>>
    2020年云南省专升本 - 「计算机」专业各院校招生计划
    查看>>