From 436c47ea3f46f75e3f96cfad850aca3b64407be5 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 09:48:58 +0800 Subject: [PATCH] 1 --- server/stat/src/listener/wslistener.rs | 52 ++++++++++++++++++-------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 23812c7..c0caa7f 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -48,6 +48,7 @@ pub struct WsListener { #[rtype(result = "()")] pub struct DownStreamMessage { data: actix_web::web::Bytes, + //data: BytesMut, } /// Define HTTP actor @@ -99,7 +100,7 @@ impl WsConn { tmp_bytes.put_u16_le(msg_id); tmp_bytes.put_i32_le(seq_id); tmp_bytes.put_u16_le(magic_code); - tmp_bytes.put_u16_le(ext_len); + tmp_bytes.put_u16_le(self.socket_handle); //tmp_bytes.put_u16_le(ext_len); tmp_bytes.put_i32_le(0); tmp_bytes.put_i64_le(0); for i in 24..tmp_bytes.capacity() { @@ -201,10 +202,10 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< if r.is_readable() { // Try to read data, this may still fail with `WouldBlock` // if the readiness event is a false positive. - let mut data = BytesMut::with_capacity(1024 * 64 * 3); + let mut data = vec![0; 1024 * 64]; match stream.try_read(&mut data) { Ok(len) => { - println!("read {} bytes",len); + println!("read {0} bytes",len); for i in 0..len { recv_buf[recv_buf_len + i] = data[i]; //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); @@ -233,19 +234,28 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); } let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); - let msg = GSResponse { - data: a_bytes - }; - //a.do_send(msg); - /* { - println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}", - pack_len, - msg_id, - seq_id, + let node = Rc::new(RefCell::new(DownStreamPack{ + holder: Default::default(), + socket_handle: socket_handle, + data: tmp_bytes, + entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), + })); + node.borrow_mut().holder = Some(node.clone()); + node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node)); + up_pack_queue.lock().unwrap().push(&node.borrow_mut().entry); + } + //a.do_send(msg); + + { + println!("sendmsg socket_handle:{5} pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}", + socket_handle, + pack_len, + msg_id, + seq_id, magic_code, ext_len); - }*/ + } } for i in (pack_len + 16) as usize..recv_buf_len { recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; @@ -366,6 +376,7 @@ impl WsListener { pub fn update(&self) { let mut data = bytes::BytesMut::new(); + let mut socket_handle = 0; { self.up_pack_queue.lock().unwrap().fetch(); //down_pack_queue.get_mut().fetch(); @@ -375,14 +386,25 @@ impl WsListener { let node = &work_list.borrow().first_entry(); node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); data = node.upgrade().unwrap().borrow().data.clone(); + socket_handle = node.upgrade().unwrap().borrow().socket_handle; break; //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; //stream.write_all(data.as_ref()).await; } } if data.len() > 0 { - println!("write_all3 len:{0}", data.len()); - //stream.write_all(data.as_ref()).await; + println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len()); + match self.socket_hash.get(&socket_handle){ + Some(v) => { + let msg = DownStreamMessage{ + data: actix_web::web::Bytes::copy_from_slice(data.as_ref()) + }; + v.do_send(msg); + }, + None => { + + } + } } }