From da9b5aff3f66802fbbb812907c7781d40ab53bd0 Mon Sep 17 00:00:00 2001 From: azw Date: Sat, 11 May 2024 09:24:41 +0800 Subject: [PATCH] 1 --- server/stat/src/app/user_app.rs | 4 ++-- server/stat/src/listener/wslistener.rs | 33 ++++++++++++++++---------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 518f245..2bdb36d 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -15,8 +15,8 @@ use r9::xtimer::TimerList; pub struct DownStreamPack { holder: Option>>, socket_handle: u16, - data: bytes::BytesMut, - entry: Rc::>>, + pub data: bytes::BytesMut, + pub entry: Rc::>>, } unsafe impl Send for DownStreamPack {} diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 6907f9d..059ed6e 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -27,6 +27,7 @@ use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN}; use tokio::net::TcpStream; use tokio::runtime::Runtime; use crate::app::user_app::DownStreamPack; +use tokio::io::{AsyncWriteExt, Ready}; #[derive(SharedFromSelf)] #[derive(Singleton)] @@ -176,8 +177,10 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>>,) { +async fn upstream_enter(mut stream: TcpStream, down_pack_queue: Arc>>,) { + println!("upstream_enter1"); tokio::spawn(async move { + println!("upstream_enter2"); loop { tokio::time::sleep(Duration::from_millis(10)).await; let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await; @@ -211,18 +214,22 @@ async fn upstream_enter(stream: TcpStream, down_pack_queue: Arc 0 { - //Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0] - let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); - let len = bytes.clone().unwrap().clone().len(); - { - //println!("sendmsg len:{0}", len); + let mut data = bytes::BytesMut::new(); + { + down_pack_queue.lock().unwrap().fetch(); + let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); + while !work_list.borrow().empty() { + let node = &work_list.borrow().first_entry(); + node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); + data = node.upgrade().unwrap().borrow().data.clone(); + //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; + //stream.write_all(data.as_ref()).await; } - { - down_stream.write_all(bytes.unwrap().as_ref()).await; - } - }*/ + } + if data.len() > 0 { + println!("write_all len:{0}", data.len()); + stream.write_all(data.as_ref()).await; + } } } } @@ -286,7 +293,7 @@ impl WsListener { Ok(v) => { //upstream.lock().unwrap().replace(v); println!("connect ok"); - upstream_enter(v, down_pack_queue); + upstream_enter(v, down_pack_queue).await; }, Err(e) => { println!("connect err")