diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 92236a3..bcae70b 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -12,6 +12,7 @@ use bytes::BytesMut; //use crate::ss; pub struct DownStreamPack { + socket_handle: u16, data: bytes::BytesMut, entry: Rc::>>, } @@ -70,8 +71,9 @@ impl f9::app::UserApp for UserApp { impl UserApp { - pub fn add_down_stream_pack(&mut self, data: bytes::BytesMut) { + pub fn add_down_stream_pack(&mut self, socket_handle: u16, data: bytes::BytesMut) { let node = Rc::new(RefCell::new(DownStreamPack{ + socket_handle: socket_handle, data: data, entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), })); diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 2394814..62f2408 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -1,5 +1,6 @@ use std::rc::{Rc, Weak}; use std::cell::RefCell; +use std::collections::HashMap; use r9_macro::SharedFromSelf; use r9_macro_derive::SharedFromSelf; use r9_macro_derive::Singleton; @@ -20,13 +21,15 @@ use actix::prelude::*; use bytes::{BufMut, BytesMut}; use actix_web::web::Bytes; use crate::app::UserApp; -use crate::MAX_PACKET_LEN; +use crate::{GSResponse, MAX_PACKET_LEN}; #[derive(SharedFromSelf)] #[derive(Singleton)] pub struct WsListener { _self_wp: Weak::>, work_thread: Option>, + socket_hash: HashMap>, + cur_socket_handle: u16, } #[derive(Message)] @@ -37,6 +40,7 @@ pub struct DownStreamMessage { /// Define HTTP actor pub struct WsConn { + socket_handle: u16, recv_buf_len: usize, recv_buf: BytesMut, } @@ -93,7 +97,7 @@ impl WsConn { println!("recv buf_len:{0}", tmp_bytes.len()); } { - UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes); + UserApp::instance().borrow_mut().add_down_stream_pack(self.socket_handle, tmp_bytes); } offset += (pack_len as usize) + 12; } @@ -119,6 +123,11 @@ impl WsConn { } impl StreamHandler> for WsConn { + + fn started(&mut self, ctx: &mut Self::Context) { + let a: Recipient = ctx.address().recipient(); + WsListener::instance().borrow_mut().on_connect(self, a); + } fn handle(&mut self, item: Result, ctx: &mut Self::Context) { match item { Ok(Message::Text(text)) => { @@ -130,8 +139,11 @@ impl StreamHandler> for WsConn { Ok(Message::Binary(bin)) => { self.parse_pkt(&bin, ctx); } - Ok(Message::Close(reason)) => ctx.close(reason), - _ => (), + Ok(Message::Close(reason)) => { + ctx.close(reason); + WsListener::instance().borrow_mut().on_disconnect(self.socket_handle); + }, + _ => {}, } } } @@ -146,6 +158,7 @@ impl Handler for WsConn { async fn index(req: HttpRequest, stream: web::Payload) -> Result { let mut conn = WsConn{ + socket_handle: 0, recv_buf_len: 0, recv_buf: BytesMut::with_capacity(1024 * 64 * 2) }; @@ -159,8 +172,10 @@ impl WsListener { pub fn new() -> Self { let p = Self{ + cur_socket_handle: 0, _self_wp: Default::default(), work_thread: Default::default(), + socket_hash: Default::default(), }; return p; } @@ -193,4 +208,16 @@ impl WsListener { } + fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient) { + self.cur_socket_handle += 1; + conn.socket_handle = self.cur_socket_handle; + self.socket_hash.insert(conn.socket_handle, socket); + println!("on_connect socket_handle:{0}", conn.socket_handle); + } + + fn on_disconnect(&mut self, socket_handle: u16) { + self.socket_hash.remove(&socket_handle); + println!("on_disconnect socket_handle:{0}", socket_handle); + } + } diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index 1c2ac03..f156dec 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -235,10 +235,6 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data