From a37723dd6c3ba70ddcc3db483e14c02f34515ef3 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 19:40:11 +0800 Subject: [PATCH] 1 --- server/stat/src/common/types.rs | 23 +++++++++++++++++++++++ server/stat/src/listener/wslistener.rs | 10 +++++----- server/stat/src/upstream/upstream.rs | 5 ++++- 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/server/stat/src/common/types.rs b/server/stat/src/common/types.rs index d146ac7..6ed4054 100644 --- a/server/stat/src/common/types.rs +++ b/server/stat/src/common/types.rs @@ -10,6 +10,13 @@ pub struct DownStreamPack { pub entry: Arc::>>, } +pub struct UpStreamPack { + holder: Option>>, + pub socket_handle: u16, + pub data: bytes::BytesMut, + pub entry: Arc::>>, +} + #[derive(Message)] #[rtype(result = "()")] pub struct DownStreamMessage { @@ -30,4 +37,20 @@ impl DownStreamPack { return p; } +} + +impl UpStreamPack { + + pub fn new(socket_handle: u16, data: bytes::BytesMut) -> Arc> { + let p = Arc::new(std::sync::Mutex::new(UpStreamPack{ + holder: Default::default(), + socket_handle: socket_handle, + data: data, + entry: r9::ListHeadLock::::new_node(Default::default()), + })); + p.lock().unwrap().holder = Some(p.clone()); + p.lock().unwrap().entry = r9::ListHeadLock::::new_node(Arc::downgrade(&p)); + return p; + } + } \ No newline at end of file diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 0670465..5f072c4 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -21,7 +21,7 @@ use crate::app::UserApp; use crate::constant; use tokio::net::TcpStream; use tokio::runtime::Runtime; -use crate::common::types::{DownStreamPack, DownStreamMessage}; +use crate::common::types::{DownStreamPack, DownStreamMessage, UpStreamPack}; use tokio::io::{AsyncWriteExt}; use std::sync::OnceLock; use std::sync::atomic::{AtomicU16, Ordering}; @@ -37,7 +37,7 @@ pub struct WsListener { pub upstream: std::sync::Mutex>>>, tokio_rt: Runtime, down_pack_queue: Arc>>, - up_pack_queue: Arc>>, + up_pack_queue: Arc>>, } static WsListenerInstance: OnceLock = OnceLock::new(); @@ -173,7 +173,7 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>, down_pack_queue: Arc>>, - up_pack_queue: Arc>>) { + up_pack_queue: Arc>>) { tokio::spawn(async move { println!("upstream_enter2"); let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); @@ -226,7 +226,7 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< } let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); { - let node = DownStreamPack::new(socket_handle, tmp_bytes); + let node = UpStreamPack::new(socket_handle, tmp_bytes); up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); } //a.do_send(msg); @@ -308,7 +308,7 @@ impl WsListener { .build() .unwrap(), down_pack_queue: r9::QueueLock::::new_ex(), - up_pack_queue: r9::QueueLock::::new_ex(), + up_pack_queue: r9::QueueLock::::new_ex(), }; return p; } diff --git a/server/stat/src/upstream/upstream.rs b/server/stat/src/upstream/upstream.rs index 1d7fb30..3565b5b 100644 --- a/server/stat/src/upstream/upstream.rs +++ b/server/stat/src/upstream/upstream.rs @@ -1,3 +1,6 @@ +use std::sync::{Arc, Mutex}; +use r9; +use crate::common::DownStreamPack; pub struct UpStream { - + down_pack_queue: Arc>>, }