This commit is contained in:
azw 2024-05-12 17:06:34 +08:00
parent 700c6b448b
commit fa8e824d8d
2 changed files with 28 additions and 26 deletions

View File

@ -22,7 +22,7 @@ impl f9::app::UserApp for UserApp {
} }
fn init(&self) { fn init(&self) {
WsListener::instance().borrow_mut().init(); WsListener::instance().init();
UpStreamMgr::instance().borrow_mut().init(); UpStreamMgr::instance().borrow_mut().init();
} }
@ -45,16 +45,16 @@ impl f9::app::UserApp for UserApp {
match cur_node { match cur_node {
Some(v) => { Some(v) => {
v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
WsListener::instance().borrow().add_down_stream_pack(v.clone()); WsListener::instance().add_down_stream_pack(v.clone());
} }
None => {} None => {}
} }
} }
WsListener::instance().borrow().update(); WsListener::instance().update();
} }
fn uninit(&self) { fn uninit(&self) {
WsListener::instance().borrow_mut().uninit(); WsListener::instance().uninit();
} }
fn get_http_listen_port(&self) -> i32 { fn get_http_listen_port(&self) -> i32 {

View File

@ -2,8 +2,6 @@ use std::rc::{Rc, Weak};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use r9_macro::SharedFromSelf; use r9_macro::SharedFromSelf;
use r9_macro_derive::SharedFromSelf;
use r9_macro_derive::Singleton;
use actix::{Actor, StreamHandler}; use actix::{Actor, StreamHandler};
use actix_web_actors::ws::{self, Message}; use actix_web_actors::ws::{self, Message};
use actix_web::{ use actix_web::{
@ -30,24 +28,25 @@ use tokio::net::TcpStream;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use crate::app::user_app::DownStreamPack; use crate::app::user_app::DownStreamPack;
use tokio::io::{AsyncWriteExt, Ready}; use tokio::io::{AsyncWriteExt, Ready};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU16, Ordering};
struct AppStateWithCounter { struct AppStateWithCounter {
counter: std::sync::Mutex<i32>, // <- Mutex is necessary to mutate safely across threadsds counter: std::sync::Mutex<i32>, // <- Mutex is necessary to mutate safely across threadsds
} }
#[derive(SharedFromSelf)]
#[derive(Singleton)]
pub struct WsListener { pub struct WsListener {
_self_wp: Weak::<RefCell::<Self>>,
work_thread: Option<JoinHandle<()>>, work_thread: Option<JoinHandle<()>>,
socket_hash: HashMap<u16, Recipient<DownStreamMessage>>, socket_hash: std::sync::Mutex<HashMap<u16, Recipient<DownStreamMessage>>>,
cur_socket_handle: u16, cur_socket_handle: AtomicU16,
pub upstream: std::sync::Weak<Mutex<TcpStream>>, pub upstream: std::sync::Weak<Mutex<TcpStream>>,
tokio_rt: Runtime, tokio_rt: Runtime,
down_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>, down_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>, up_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>,
} }
static WsListenerInstance: OnceLock<WsListener> = OnceLock::new();
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct DownStreamMessage { pub struct DownStreamMessage {
@ -143,7 +142,7 @@ impl StreamHandler<Result<Message, ws::ProtocolError>> for WsConn {
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
let a: Recipient<DownStreamMessage> = ctx.address().recipient(); let a: Recipient<DownStreamMessage> = ctx.address().recipient();
WsListener::instance().borrow_mut().on_connect(self, a); WsListener::instance().on_connect(self, a);
} }
fn handle(&mut self, item: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) { fn handle(&mut self, item: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match item { match item {
@ -158,7 +157,7 @@ impl StreamHandler<Result<Message, ws::ProtocolError>> for WsConn {
} }
Ok(Message::Close(reason)) => { Ok(Message::Close(reason)) => {
ctx.close(reason); ctx.close(reason);
WsListener::instance().borrow_mut().on_disconnect(self.socket_handle); WsListener::instance().on_disconnect(self.socket_handle);
}, },
_ => {}, _ => {},
} }
@ -313,10 +312,13 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
impl WsListener { impl WsListener {
pub fn instance() -> &'static Self {
return WsListenerInstance.get_or_init(|| Self::new());
}
pub fn new() -> Self { pub fn new() -> Self {
let p = Self{ let p = Self{
cur_socket_handle: 0, cur_socket_handle: AtomicU16::new(0),
_self_wp: Default::default(),
work_thread: Default::default(), work_thread: Default::default(),
socket_hash: Default::default(), socket_hash: Default::default(),
upstream: Default::default(), upstream: Default::default(),
@ -330,10 +332,10 @@ impl WsListener {
return p; return p;
} }
pub fn init(&mut self) { pub fn init(&self) {
//self.work_thread = Arc::new(Some(1)); //self.work_thread = Arc::new(Some(1));
{ {
self.work_thread = Some(thread::spawn(|| { Some(thread::spawn(|| {
println!("hello2"); println!("hello2");
let mut rt = tokio::runtime::Builder::new_multi_thread() let mut rt = tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
@ -364,7 +366,7 @@ impl WsListener {
match result { match result {
Ok(v) => { Ok(v) => {
let a = Arc::new(Mutex::new(v)); let a = Arc::new(Mutex::new(v));
WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a); //WsListener::instance().upstream = Arc::downgrade(&a);
println!("connect ok"); println!("connect ok");
upstream_enter(a, down_pack_queue, up_pack_queue).await; upstream_enter(a, down_pack_queue, up_pack_queue).await;
}, },
@ -398,7 +400,7 @@ impl WsListener {
} }
if data.len() > 0 { if data.len() > 0 {
println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len()); println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len());
match self.socket_hash.get(&socket_handle){ match self.socket_hash.lock().unwrap().get(&socket_handle){
Some(v) => { Some(v) => {
let msg = DownStreamMessage{ let msg = DownStreamMessage{
data: actix_web::web::Bytes::copy_from_slice(data.as_ref()) data: actix_web::web::Bytes::copy_from_slice(data.as_ref())
@ -412,7 +414,7 @@ impl WsListener {
} }
} }
pub fn uninit(&mut self) { pub fn uninit(&self) {
} }
@ -421,15 +423,15 @@ impl WsListener {
let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle; let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle;
self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry); self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry);
} }
fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) { fn on_connect(&self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) {
self.cur_socket_handle += 1; self.cur_socket_handle.fetch_add(1, Ordering::Relaxed);
conn.socket_handle = self.cur_socket_handle; conn.socket_handle = self.cur_socket_handle.load(Ordering::Relaxed);
self.socket_hash.insert(conn.socket_handle, socket); self.socket_hash.lock().unwrap().insert(conn.socket_handle, socket);
println!("on_connect socket_handle:{0}", conn.socket_handle); println!("on_connect socket_handle:{0}", conn.socket_handle);
} }
fn on_disconnect(&mut self, socket_handle: u16) { fn on_disconnect(&self, socket_handle: u16) {
self.socket_hash.remove(&socket_handle); self.socket_hash.lock().unwrap().remove(&socket_handle);
println!("on_disconnect socket_handle:{0}", socket_handle); println!("on_disconnect socket_handle:{0}", socket_handle);
} }