diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 17e82be..4168a84 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -43,7 +43,7 @@ impl f9::app::UserApp for UserApp { match cur_node { Some(v) => { v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); - WsListener::instance().add_down_stream_pack(v.clone()); + UpStreamMgr::instance().add_down_stream_pack(v.clone()); } None => {} } diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 5f072c4..fb3e782 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -398,7 +398,7 @@ impl WsListener { } - pub fn add_down_stream_pack(&self, data: std::sync::Weak::>) { + fn add_down_stream_pack(&self, data: std::sync::Weak::>) { println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len()); let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle; self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry); diff --git a/server/stat/src/upstream/upstream.rs b/server/stat/src/upstream/upstream.rs index c3fd150..dffa803 100644 --- a/server/stat/src/upstream/upstream.rs +++ b/server/stat/src/upstream/upstream.rs @@ -13,8 +13,8 @@ pub struct UpStream { remote_port: i32, last_pong_tick: i64, - down_pack_queue: Arc>>, - up_pack_queue: Arc>>, + pub down_pack_queue: Arc>>, + pub up_pack_queue: Arc>>, } async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>, diff --git a/server/stat/src/upstream/upstreammgr.rs b/server/stat/src/upstream/upstreammgr.rs index bcc7a90..850d11d 100644 --- a/server/stat/src/upstream/upstreammgr.rs +++ b/server/stat/src/upstream/upstreammgr.rs @@ -6,6 +6,7 @@ use std::cell::RefCell; use std::sync::{Arc, Mutex}; use crate::upstream::UpStream; use crate::app::UserApp; +use crate::common::{DownStreamPack, UpStreamPack}; pub struct UpStreamMgr { curr_id: i16, @@ -46,4 +47,12 @@ impl UpStreamMgr { } + pub fn add_down_stream_pack(&self, data: std::sync::Weak::>) { + println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len()); + let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle; + for (key, value) in self.id_hash.borrow().iter() { + value.down_pack_queue.lock().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry); + } + } + }