From fa8e824d8de7003689342a1f65f43c5f7540c1d5 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 17:06:34 +0800 Subject: [PATCH] 1 --- server/stat/src/app/user_app.rs | 8 ++--- server/stat/src/listener/wslistener.rs | 46 ++++++++++++++------------ 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index da1ae8e..5b7a56b 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -22,7 +22,7 @@ impl f9::app::UserApp for UserApp { } fn init(&self) { - WsListener::instance().borrow_mut().init(); + WsListener::instance().init(); UpStreamMgr::instance().borrow_mut().init(); } @@ -45,16 +45,16 @@ impl f9::app::UserApp for UserApp { match cur_node { Some(v) => { v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); - WsListener::instance().borrow().add_down_stream_pack(v.clone()); + WsListener::instance().add_down_stream_pack(v.clone()); } None => {} } } - WsListener::instance().borrow().update(); + WsListener::instance().update(); } fn uninit(&self) { - WsListener::instance().borrow_mut().uninit(); + WsListener::instance().uninit(); } fn get_http_listen_port(&self) -> i32 { diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 66fb4dd..e6fded6 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -2,8 +2,6 @@ use std::rc::{Rc, Weak}; use std::cell::RefCell; use std::collections::HashMap; use r9_macro::SharedFromSelf; -use r9_macro_derive::SharedFromSelf; -use r9_macro_derive::Singleton; use actix::{Actor, StreamHandler}; use actix_web_actors::ws::{self, Message}; use actix_web::{ @@ -30,24 +28,25 @@ use tokio::net::TcpStream; use tokio::runtime::Runtime; use crate::app::user_app::DownStreamPack; use tokio::io::{AsyncWriteExt, Ready}; +use std::sync::OnceLock; +use std::sync::atomic::{AtomicU16, Ordering}; struct AppStateWithCounter { counter: std::sync::Mutex, // <- Mutex is necessary to mutate safely across threadsds } -#[derive(SharedFromSelf)] -#[derive(Singleton)] pub struct WsListener { - _self_wp: Weak::>, work_thread: Option>, - socket_hash: HashMap>, - cur_socket_handle: u16, + socket_hash: std::sync::Mutex>>, + cur_socket_handle: AtomicU16, pub upstream: std::sync::Weak>, tokio_rt: Runtime, down_pack_queue: Arc>>, up_pack_queue: Arc>>, } +static WsListenerInstance: OnceLock = OnceLock::new(); + #[derive(Message)] #[rtype(result = "()")] pub struct DownStreamMessage { @@ -143,7 +142,7 @@ impl StreamHandler> for WsConn { fn started(&mut self, ctx: &mut Self::Context) { let a: Recipient = ctx.address().recipient(); - WsListener::instance().borrow_mut().on_connect(self, a); + WsListener::instance().on_connect(self, a); } fn handle(&mut self, item: Result, ctx: &mut Self::Context) { match item { @@ -158,7 +157,7 @@ impl StreamHandler> for WsConn { } Ok(Message::Close(reason)) => { ctx.close(reason); - WsListener::instance().borrow_mut().on_disconnect(self.socket_handle); + WsListener::instance().on_disconnect(self.socket_handle); }, _ => {}, } @@ -313,10 +312,13 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< impl WsListener { + pub fn instance() -> &'static Self { + return WsListenerInstance.get_or_init(|| Self::new()); + } + pub fn new() -> Self { let p = Self{ - cur_socket_handle: 0, - _self_wp: Default::default(), + cur_socket_handle: AtomicU16::new(0), work_thread: Default::default(), socket_hash: Default::default(), upstream: Default::default(), @@ -330,10 +332,10 @@ impl WsListener { return p; } - pub fn init(&mut self) { + pub fn init(&self) { //self.work_thread = Arc::new(Some(1)); { - self.work_thread = Some(thread::spawn(|| { + Some(thread::spawn(|| { println!("hello2"); let mut rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -364,7 +366,7 @@ impl WsListener { match result { Ok(v) => { let a = Arc::new(Mutex::new(v)); - WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a); + //WsListener::instance().upstream = Arc::downgrade(&a); println!("connect ok"); upstream_enter(a, down_pack_queue, up_pack_queue).await; }, @@ -398,7 +400,7 @@ impl WsListener { } if data.len() > 0 { println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len()); - match self.socket_hash.get(&socket_handle){ + match self.socket_hash.lock().unwrap().get(&socket_handle){ Some(v) => { let msg = DownStreamMessage{ data: actix_web::web::Bytes::copy_from_slice(data.as_ref()) @@ -412,7 +414,7 @@ impl WsListener { } } - pub fn uninit(&mut self) { + pub fn uninit(&self) { } @@ -421,15 +423,15 @@ impl WsListener { let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle; self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry); } - fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient) { - self.cur_socket_handle += 1; - conn.socket_handle = self.cur_socket_handle; - self.socket_hash.insert(conn.socket_handle, socket); + fn on_connect(&self, conn: &mut WsConn, socket: Recipient) { + self.cur_socket_handle.fetch_add(1, Ordering::Relaxed); + conn.socket_handle = self.cur_socket_handle.load(Ordering::Relaxed); + self.socket_hash.lock().unwrap().insert(conn.socket_handle, socket); println!("on_connect socket_handle:{0}", conn.socket_handle); } - fn on_disconnect(&mut self, socket_handle: u16) { - self.socket_hash.remove(&socket_handle); + fn on_disconnect(&self, socket_handle: u16) { + self.socket_hash.lock().unwrap().remove(&socket_handle); println!("on_disconnect socket_handle:{0}", socket_handle); }