This commit is contained in:
azw 2024-05-12 20:36:39 +08:00
parent 71f7a51ef8
commit 14beddb88a
4 changed files with 13 additions and 4 deletions

View File

@ -43,7 +43,7 @@ impl f9::app::UserApp for UserApp {
match cur_node {
Some(v) => {
v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
WsListener::instance().add_down_stream_pack(v.clone());
UpStreamMgr::instance().add_down_stream_pack(v.clone());
}
None => {}
}

View File

@ -398,7 +398,7 @@ impl WsListener {
}
pub fn add_down_stream_pack(&self, data: std::sync::Weak::<std::sync::Mutex::<DownStreamPack>>) {
fn add_down_stream_pack(&self, data: std::sync::Weak::<std::sync::Mutex::<DownStreamPack>>) {
println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len());
let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle;
self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry);

View File

@ -13,8 +13,8 @@ pub struct UpStream {
remote_port: i32,
last_pong_tick: i64,
down_pack_queue: Arc<Mutex::<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<Mutex::<r9::QueueLock<UpStreamPack>>>,
pub down_pack_queue: Arc<Mutex::<r9::QueueLock<DownStreamPack>>>,
pub up_pack_queue: Arc<Mutex::<r9::QueueLock<UpStreamPack>>>,
}
async fn upstream_enter(stream_arc: Arc<tokio::sync::Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,

View File

@ -6,6 +6,7 @@ use std::cell::RefCell;
use std::sync::{Arc, Mutex};
use crate::upstream::UpStream;
use crate::app::UserApp;
use crate::common::{DownStreamPack, UpStreamPack};
pub struct UpStreamMgr {
curr_id: i16,
@ -46,4 +47,12 @@ impl UpStreamMgr {
}
pub fn add_down_stream_pack(&self, data: std::sync::Weak::<std::sync::Mutex::<DownStreamPack>>) {
println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len());
let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle;
for (key, value) in self.id_hash.borrow().iter() {
value.down_pack_queue.lock().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry);
}
}
}