From 6131ac93a928ae3d14cc0f814b74b8c9b149b3bc Mon Sep 17 00:00:00 2001 From: azw Date: Mon, 6 May 2024 09:15:09 +0800 Subject: [PATCH] 1 --- server/stat/src/main.rs | 44 +++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index 78057ae..739e8e2 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -72,21 +72,33 @@ impl MyWs { ((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24); let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8); let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8); - let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 24) as usize); { - tmp_bytes.put_u16(pack_len); - tmp_bytes.put_u16(msg_id); - tmp_bytes.put_i32(seq_id); - tmp_bytes.put_u16(magic_code); - tmp_bytes.put_u16(ext_len); + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + tmp_bytes.put_i32_le(0); + tmp_bytes.put_u16_le(0); + tmp_bytes.put_u16_le(0); + tmp_bytes.put_i64_le(0); + /*tmp_bytes.put_u16(0); + tmp_bytes.put_u16(0); tmp_bytes.put_i32(0); - for i in 16..tmp_bytes.capacity() { + tmp_bytes.put_u16(0); + tmp_bytes.put_u16(0); + tmp_bytes.put_i32(0); + tmp_bytes.put_i64(0);*/ + for i in 24..tmp_bytes.capacity() { tmp_bytes.put_i8(0); } - tmp_bytes[16 .. (16 + pack_len as usize)].copy_from_slice(&bin[12..(12 + pack_len as usize)]); + tmp_bytes[24 .. (24+ pack_len as usize)].copy_from_slice(&bin[12..(12 + pack_len as usize)]); + println!("recv buf_len:{0}", tmp_bytes.len()); } { - Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(tmp_bytes); + Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); + //Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(tmp_bytes); } offset += (pack_len as usize) + 12; } @@ -97,6 +109,11 @@ impl MyWs { if offset > 0 && offset < self.recv_buf_len { self.recv_buf[..0].copy_from_slice(&bin); } + self.recv_buf_len -= offset; + if already_read_bytes >= bin.len() { + already_read_bytes = 0; + break; + } } } } @@ -180,8 +197,15 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data 0 { + //Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0] let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); - down_stream.write_all(bytes.unwrap().as_ref()).await; + let len = bytes.clone().unwrap().clone().len(); + { + println!("sendmsg len:{0}", len); + } + { + down_stream.write_all(bytes.unwrap().as_ref()).await; + } } } }