diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index b20b58b..6b02809 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -122,37 +122,72 @@ impl StreamHandler> for MyWs { fn started(&mut self, ctx: &mut Self::Context) { let a: Recipient = ctx.address().recipient(); + let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); + let mut recv_buf_len = 0; + { + for i in 0..recv_buf.capacity() { + recv_buf.put_i8(0); + } + } let mut recv_queue = self.recv_queue.clone(); tokio::spawn(async move { loop { while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 { - let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop().clone(); + let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); let len = bytes.clone().unwrap().clone().len(); { - println!("sendmsg len:{0}", len); + //println!("sendmsg 1111111111111111111111111111111 len:{0}", len); } { let data = bytes.unwrap(); - let pack_len = (data.as_ref()[0] as u16)+ ((data.as_ref()[1] as u16) << 8); - let msg_id = (data.as_ref()[2] as u16)+ ((data.as_ref()[3] as u16) << 8); - let seq_id = (data.as_ref()[4] as i32)+ ((data.as_ref()[5] as i32) << 8) + - ((data.as_ref()[7] as i32) << 16)+ ((data.as_ref()[7] as i32) << 24); - let magic_code = (data.as_ref()[8] as u16)+ ((data.as_ref()[9] as u16) << 8); - let rpc_error_code = (data.as_ref()[10] as u16)+ ((data.as_ref()[11] as u16) << 8); - let socket_handle = (data.as_ref()[12] as u16)+ ((data.as_ref()[13] as u16) << 8); - let ext_len = (data.as_ref()[14] as u16)+ ((data.as_ref()[15] as u16) << 8); - if (pack_len + 16) as usize != data.len() { - panic!("errro len"); + for i in 0..len { + recv_buf[recv_buf_len + i] = data[i]; + //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); + } + recv_buf_len += len; + while recv_buf_len >= 16 { + let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); + let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); + let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + + ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); + let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); + let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); + let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); + let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); + if ((pack_len + 16) as usize) > recv_buf_len { + break; + } + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); + { + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + for i in 0..pack_len { + tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); + } + let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); + let msg = GSResponse { + data: a_bytes + }; + a.do_send(msg); + /* + { + println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}", + pack_len, + msg_id, + seq_id, + magic_code, + ext_len); + }*/ + } + for i in (pack_len + 16) as usize..recv_buf_len { + recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; + } + recv_buf_len -= (pack_len + 16) as usize; } } - /* - { - let mut a_bytes = actix_web::web::Bytes::copy_from_slice(bytes.unwrap().as_ref()); - let msg = GSResponse { - data: a_bytes - }; - a.send(msg); - }*/ } tokio::time::sleep(Duration::from_millis(10)).await; } @@ -165,37 +200,18 @@ impl StreamHandler> for MyWs { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => ctx.text(text), Ok(ws::Message::Binary(bin)) => { - ///Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(bin.clone()); - println!("recv_buf:{0}", bin.len()); - //ctx.binary(bin); - //self.down_stream.write_all(&bin); self.parse_pkt(&bin); - //ctx.binary(bin); }, _ => (), } } } -#[derive(Message)] -#[rtype(result = "()")] -pub struct Message(pub String); - #[derive(Message)] #[rtype(result = "()")] pub struct GSResponse { data: actix_web::web::Bytes, } - -impl Handler for MyWs { - type Result = (); - - - fn handle(&mut self, msg: Message, _: &mut ws::WebsocketContext) { - //self.send_message(&msg.room, msg.msg.as_str(), msg.id); - } -} - impl Handler for MyWs { type Result = (); @@ -239,7 +255,7 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data { - println!("read {} bytes", n); + //println!("read {} bytes", n); let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize); for i in 0..n { tmp_bytes.put_u8(data[i]); @@ -265,7 +281,7 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data