From 700c6b448bd91d09a37808f3675341a6db0dcfa0 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 16:43:38 +0800 Subject: [PATCH] 1 --- server/stat/src/app/user_app.rs | 90 ++++++++++++++------------ server/stat/src/listener/wslistener.rs | 52 +++++++-------- third_party/librust | 2 +- 3 files changed, 76 insertions(+), 68 deletions(-) diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index dca529f..da1ae8e 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -1,36 +1,24 @@ -use std::rc::{Rc, Weak}; -use std::cell::RefCell; -use r9_macro::SharedFromSelf; -use r9_macro_derive::SharedFromSelf; -use r9_macro_derive::Singleton; -use r9::Queue; -use f9::app::App; +use std::sync::Arc; +use std::sync::Mutex; use crate::listener::WsListener; use crate::upstream::UpStreamMgr; -use bytes::BytesMut; -use r9::xtimer::TimerList; use std::sync::OnceLock; pub struct DownStreamPack { - pub holder: Option>>, + pub holder: Option>>, pub socket_handle: u16, pub data: bytes::BytesMut, - pub entry: Rc::>>, + pub entry: Arc::>>, } -unsafe impl Send for DownStreamPack {} -unsafe impl Sync for DownStreamPack {} - pub struct UserApp { - net_msg_queue: Rc::>>, + net_msg_queue: Arc::>>, } -unsafe impl Send for UserApp {} -unsafe impl Sync for UserApp {} impl f9::app::UserApp for UserApp { fn get_pkg_name(&self) -> String { - return "statserver".to_string(); + return "wsproxy".to_string(); } fn init(&self) { @@ -39,12 +27,28 @@ impl f9::app::UserApp for UserApp { } fn update(&self) { - self.net_msg_queue.borrow_mut().fetch(); - let work_list = self.net_msg_queue.borrow().work_list.lock().unwrap().clone(); - while !work_list.borrow().empty() { - let node = &work_list.borrow().first_entry(); - node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); - WsListener::instance().borrow().add_down_stream_pack(node.clone()); + let mut work_is_empty = false; + while !work_is_empty { + let mut cur_node: Option>> = None; + match self.fetch_work_list().lock() { + Ok(work_list) => { + if !work_list.empty() { + let node = &work_list.first_entry(); + cur_node = Some(node.clone()); + } else { + work_is_empty = true; + break; + } + } + Err(poisoned) => {} + } + match cur_node { + Some(v) => { + v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); + WsListener::instance().borrow().add_down_stream_pack(v.clone()); + } + None => {} + } } WsListener::instance().borrow().update(); } @@ -63,28 +67,32 @@ static UserAppInstance: OnceLock = OnceLock::new(); impl UserApp { - pub fn instance() -> &'static UserApp { - return UserAppInstance.get_or_init(|| UserApp::new()); + pub fn instance() -> &'static Self { + return UserAppInstance.get_or_init(|| Self::new()); } - - pub fn add_down_stream_pack(&self, socket_handle: u16, data: bytes::BytesMut) { - println!("add_down_stream_pack1 len:{0}", data.len()); - let node = Rc::new(RefCell::new(DownStreamPack{ - holder: Default::default(), - socket_handle: socket_handle, - data: data, - entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), - })); - node.borrow_mut().holder = Some(node.clone()); - node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node)); - self.net_msg_queue.borrow_mut().push(&node.borrow_mut().entry); - } - pub fn new() -> Self { let p = UserApp{ - net_msg_queue: r9::Queue::::new(), + net_msg_queue: r9::QueueLock::::new(), }; return p; } + fn fetch_work_list(&self) -> Arc::>> { + self.net_msg_queue.lock().unwrap().fetch(); + return self.net_msg_queue.lock().unwrap().work_list.lock().unwrap().clone(); + } + + pub fn add_down_stream_pack(&self, socket_handle: u16, data: bytes::BytesMut) { + println!("add_down_stream_pack1 len:{0}", data.len()); + let node = Arc::new(Mutex::new(DownStreamPack{ + holder: Default::default(), + socket_handle: socket_handle, + data: data, + entry: r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), + })); + node.lock().unwrap().holder = Some(node.clone()); + node.lock().unwrap().entry = r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Arc::downgrade(&node)); + self.net_msg_queue.lock().unwrap().push(&node.lock().unwrap().entry); + } + } diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index e6cfd87..66fb4dd 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -44,8 +44,8 @@ pub struct WsListener { cur_socket_handle: u16, pub upstream: std::sync::Weak>, tokio_rt: Runtime, - down_pack_queue: Arc>>, - up_pack_queue: Arc>>, + down_pack_queue: Arc>>, + up_pack_queue: Arc>>, } #[derive(Message)] @@ -185,8 +185,8 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>, down_pack_queue: Arc>>, - up_pack_queue: Arc>>) { +async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>, + up_pack_queue: Arc>>) { tokio::spawn(async move { println!("upstream_enter2"); let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); @@ -209,7 +209,7 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< let mut data = vec![0; 1024 * 64]; match stream.try_read(&mut data) { Ok(len) => { - println!("read {0} bytes",len); + println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); for i in 0..len { recv_buf[recv_buf_len + i] = data[i]; //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); @@ -239,20 +239,20 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< } let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); { - let node = Rc::new(RefCell::new(DownStreamPack{ + let node = Arc::new(std::sync::Mutex::new(DownStreamPack{ holder: Default::default(), socket_handle: socket_handle, data: tmp_bytes, - entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), + entry: r9::ListHeadLock::::new_node(Default::default()), })); - node.borrow_mut().holder = Some(node.clone()); - node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node)); - up_pack_queue.lock().unwrap().push(&node.borrow_mut().entry); + node.lock().unwrap().holder = Some(node.clone()); + node.lock().unwrap().entry = r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Arc::downgrade(&node)); + up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); } //a.do_send(msg); { - println!("sendmsg socket_handle:{5} pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}", + println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", socket_handle, pack_len, msg_id, @@ -287,10 +287,10 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< //down_pack_queue.get_mut().fetch(); let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); - while !work_list.borrow().empty() { - let node = &work_list.borrow().first_entry(); - node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); - data = node.upgrade().unwrap().borrow().data.clone(); + while !work_list.lock().unwrap().empty() { + let node = &work_list.lock().unwrap().first_entry(); + node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); + data = node.upgrade().unwrap().lock().unwrap().data.clone(); break; //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; //stream.write_all(data.as_ref()).await; @@ -324,8 +324,8 @@ impl WsListener { .enable_all() .build() .unwrap(), - down_pack_queue: r9::Queue::::new_ex(), - up_pack_queue: r9::Queue::::new_ex(), + down_pack_queue: r9::QueueLock::::new_ex(), + up_pack_queue: r9::QueueLock::::new_ex(), }; return p; } @@ -386,11 +386,11 @@ impl WsListener { //down_pack_queue.get_mut().fetch(); let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); - while !work_list.borrow().empty() { - let node = &work_list.borrow().first_entry(); - node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); - data = node.upgrade().unwrap().borrow().data.clone(); - socket_handle = node.upgrade().unwrap().borrow().socket_handle; + while !work_list.lock().unwrap().empty() { + let node = &work_list.lock().unwrap().first_entry(); + node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); + data = node.upgrade().unwrap().lock().unwrap().data.clone(); + socket_handle = node.upgrade().unwrap().lock().unwrap().socket_handle; break; //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; //stream.write_all(data.as_ref()).await; @@ -416,10 +416,10 @@ impl WsListener { } - pub fn add_down_stream_pack(&self, data: Weak::>) { - println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().borrow().data.len()); - let socket_handle = data.upgrade().unwrap().borrow().socket_handle; - self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().borrow().entry); + pub fn add_down_stream_pack(&self, data: std::sync::Weak::>) { + println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len()); + let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle; + self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry); } fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient) { self.cur_socket_handle += 1; diff --git a/third_party/librust b/third_party/librust index c3280f8..0611d80 160000 --- a/third_party/librust +++ b/third_party/librust @@ -1 +1 @@ -Subproject commit c3280f8747cba62bb6809d30ab3014dea52981d2 +Subproject commit 0611d8063c8ff2a1f833468f3ca7d1e8e0770e4c