From a183d86152b24ea8dde68f6b4fca35039b776f18 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 20:49:20 +0800 Subject: [PATCH] 1 --- server/stat/src/app/user_app.rs | 1 + server/stat/src/listener/wslistener.rs | 18 +++++++++++++++- server/stat/src/upstream/upstream.rs | 28 ++++++++++++++++++++++++- server/stat/src/upstream/upstreammgr.rs | 6 ++++++ 4 files changed, 51 insertions(+), 2 deletions(-) diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 4168a84..f68b276 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -49,6 +49,7 @@ impl f9::app::UserApp for UserApp { } } WsListener::instance().update(); + UpStreamMgr::instance().update(); } fn uninit(&self) { diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index fb3e782..076232b 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -338,6 +338,7 @@ impl WsListener { .await }); })); + /* { let down_pack_queue = self.down_pack_queue.clone(); let up_pack_queue = self.up_pack_queue.clone(); @@ -355,12 +356,13 @@ impl WsListener { } } }); - } + }*/ println!("hello3"); } } pub fn update(&self) { + return; let mut data = bytes::BytesMut::new(); let mut socket_handle = 0; { @@ -415,4 +417,18 @@ impl WsListener { println!("on_disconnect socket_handle:{0}", socket_handle); } + pub fn send_to_downsteam(&self, socket_handle: u16, data: bytes::BytesMut) { + match self.socket_hash.lock().unwrap().get(&socket_handle){ + Some(v) => { + let msg = DownStreamMessage{ + data: actix_web::web::Bytes::copy_from_slice(data.as_ref()) + }; + v.do_send(msg); + }, + None => { + + } + } + } + } diff --git a/server/stat/src/upstream/upstream.rs b/server/stat/src/upstream/upstream.rs index dffa803..1996253 100644 --- a/server/stat/src/upstream/upstream.rs +++ b/server/stat/src/upstream/upstream.rs @@ -6,7 +6,9 @@ use tokio::io::{AsyncWriteExt}; use bytes::{BufMut, BytesMut}; use std::time::Duration; use r9; -use crate::common::{DownStreamPack, UpStreamPack}; +use crate::common::{DownStreamMessage, DownStreamPack, UpStreamPack}; +use crate::listener::WsListener; + pub struct UpStream { pub instance_id: i32, remote_ip: String, @@ -172,4 +174,28 @@ impl UpStream { } } + pub fn update(&self) { + let mut data = bytes::BytesMut::new(); + let mut socket_handle = 0; + { + self.up_pack_queue.lock().unwrap().fetch(); + //down_pack_queue.get_mut().fetch(); + let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); + //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); + while !work_list.lock().unwrap().empty() { + let node = &work_list.lock().unwrap().first_entry(); + node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); + data = node.upgrade().unwrap().lock().unwrap().data.clone(); + socket_handle = node.upgrade().unwrap().lock().unwrap().socket_handle; + break; + //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; + //stream.write_all(data.as_ref()).await; + } + } + if data.len() > 0 { + println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len()); + WsListener::instance().send_to_downsteam(socket_handle, data); + } + } + } \ No newline at end of file diff --git a/server/stat/src/upstream/upstreammgr.rs b/server/stat/src/upstream/upstreammgr.rs index 850d11d..f9223c4 100644 --- a/server/stat/src/upstream/upstreammgr.rs +++ b/server/stat/src/upstream/upstreammgr.rs @@ -55,4 +55,10 @@ impl UpStreamMgr { } } + pub fn update(&self) { + for (key, value) in self.id_hash.borrow().iter() { + value.update(); + } + } + }