From f9053e8740bdd9a3988d4023ea110bbcbe568772 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 5 May 2024 18:57:22 +0800 Subject: [PATCH] 1 --- server/stat/src/main.rs | 117 +++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 43 deletions(-) diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index d64ae46..be0f693 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -22,6 +22,9 @@ use tokio::io::{AsyncWriteExt, Ready}; use tokio::net::TcpStream; use std::sync::Arc; use tokio::io::Interest; +use std::vec::Vec; +use std::cell::RefCell; +use std::io::Read; const MAX_PACKET_LEN: usize = 1024 * 64; @@ -29,12 +32,60 @@ const MAX_PACKET_LEN: usize = 1024 * 64; struct MyWs { recv_buf_len: usize, recv_buf: BytesMut, + send_queue: Arc>>>, } impl Actor for MyWs { type Context = ws::WebsocketContext; } +impl MyWs { + fn parse_pkt(&mut self, bin: &actix_web::web::Bytes) { + let buf_len = bin.len(); + if buf_len > 0 { + let mut already_read_bytes: usize = 0; + { + for i in 0..1024 * 64 * 2 { + self.recv_buf.put_i8(0); + } + } + while true { + let mut read_bytes = buf_len - already_read_bytes; + if read_bytes > MAX_PACKET_LEN - self.recv_buf_len { + read_bytes = MAX_PACKET_LEN - self.recv_buf_len; + } + if read_bytes > 0 { + self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin); + self.recv_buf_len += read_bytes; + already_read_bytes += read_bytes; + } + + let mut offset = 0; + let mut pre_offect = 0; + while true { + pre_offect = offset; + { + let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8); + let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8); + let seq_id = (self.recv_buf[4] as i32)+ ((self.recv_buf[5] as i32) << 8) + + ((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24); + let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8); + let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8); + let a = 30; + } + if pre_offect >= offset || offset >= self.recv_buf_len { + break; + } + }//end while + if offset > 0 && offset < self.recv_buf_len { + //self.recv_buf[..0].copy_from_slice(&bin); + } + } + } + } + +} + /// Handler for ws::Message message impl StreamHandler> for MyWs { @@ -43,51 +94,18 @@ impl StreamHandler> for MyWs { //let b = 100; } + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Binary(bin)) => { - println!("recv_buf:{0}", bin.len()); + ///Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(bin.clone()); + //println!("recv_buf:{0}", bin.len()); + //ctx.binary(bin); //self.down_stream.write_all(&bin); - /* - let buf_len = bin.len(); - if buf_len > 0 { - let mut already_read_bytes: usize = 0; - { - for i in 0..1024*64*2 { - self.recv_buf.put_i8(0); - } - } - while true { - let mut read_bytes = buf_len - already_read_bytes; - if read_bytes > MAX_PACKET_LEN - self.recv_buf_len { - read_bytes = MAX_PACKET_LEN - self.recv_buf_len; - } - if read_bytes > 0 { - self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin); - self.recv_buf_len += read_bytes; - already_read_bytes += read_bytes; - } - - let mut offset = 0; - let mut pre_offect = 0; - while true { - pre_offect = offset; - { - let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8); - let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8); - let seq_id = self.recv_buf.get_u16(); - } - if pre_offect >= offset || offset >= self.recv_buf_len { - break; - } - }//end while - if offset > 0 && offset < self.recv_buf_len { - //self.recv_buf[..0].copy_from_slice(&bin); - } - } - }*/ + self.parse_pkt(&bin); + ctx.binary(bin); }, _ => (), } @@ -122,23 +140,32 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data { { - if r.is_readable() {} + if r.is_readable() { + let a = 100; + } } { if r.is_writable() { - down_stream.write_all(b"hello world!").await; + while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 { + let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); + down_stream.write_all(bytes.unwrap().as_ref()).await; + } } } } @@ -156,7 +183,11 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data