From 0c6c7cc41d5a7172e73ff5e96436e217bb38f2c6 Mon Sep 17 00:00:00 2001 From: azw Date: Wed, 8 May 2024 22:11:20 +0800 Subject: [PATCH] 1 --- server/stat/src/app/user_app.rs | 20 +++++++++++++++++++- server/stat/src/main.rs | 10 +++++++--- third_party/librust | 2 +- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 69e966e..92236a3 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -3,16 +3,24 @@ use std::cell::RefCell; use r9_macro::SharedFromSelf; use r9_macro_derive::SharedFromSelf; use r9_macro_derive::Singleton; +use r9::Queue; use f9::app::App; use f9::app::HttpContext; use crate::listener::WsListener; use crate::upstream::UpStreamMgr; +use bytes::BytesMut; //use crate::ss; +pub struct DownStreamPack { + data: bytes::BytesMut, + entry: Rc::>>, +} + #[derive(SharedFromSelf)] #[derive(Singleton)] pub struct UserApp { _self_wp: Weak::>, + net_msg_queue: Rc::>>, } impl f9::app::UserApp for UserApp { @@ -44,7 +52,10 @@ impl f9::app::UserApp for UserApp { } fn update(&mut self) { + self.net_msg_queue.borrow_mut().fetch(); + if self.net_msg_queue.borrow_mut().empty() { + } } fn uninit(&mut self) { @@ -59,12 +70,19 @@ impl f9::app::UserApp for UserApp { impl UserApp { - pub fn test(&mut self) { + pub fn add_down_stream_pack(&mut self, data: bytes::BytesMut) { + let node = Rc::new(RefCell::new(DownStreamPack{ + data: data, + entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), + })); + node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node)); + self.net_msg_queue.borrow_mut().push(&node.borrow_mut().entry); } pub fn new() -> Self { let p = UserApp{ _self_wp: Default::default(), + net_msg_queue: r9::Queue::::new(), }; return p; } diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index cac68b3..c4bb3a8 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -90,9 +90,8 @@ impl MyWs { println!("recv buf_len:{0}", tmp_bytes.len()); } { - UserApp::instance().borrow_mut().test(); - Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); - //Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(tmp_bytes); + UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes); + //Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); } offset += (pack_len as usize) + 12; } @@ -237,6 +236,10 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data