diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 68f405a..1b4b7ea 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -15,7 +15,7 @@ use bytes::{BufMut, BytesMut}; use crate::app::UserApp; use crate::constant; use tokio::runtime::Runtime; -use crate::common::types::{DownStreamPack, DownStreamMessage, UpStreamPack}; +use crate::common::types::{DownStreamMessage}; use std::sync::OnceLock; use std::sync::atomic::{AtomicU16, Ordering}; diff --git a/server/stat/src/upstream/upstream.rs b/server/stat/src/upstream/upstream.rs index 1996253..2ad398c 100644 --- a/server/stat/src/upstream/upstream.rs +++ b/server/stat/src/upstream/upstream.rs @@ -21,6 +21,62 @@ pub struct UpStream { async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>, up_pack_queue: Arc>>) { + let on_read = |len: usize, + recv_buf_len: &mut usize, + recv_buf: &mut BytesMut, + data: &Vec, + up_pack_queue: &Arc>>| { + println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); + for i in 0..len { + recv_buf[*recv_buf_len + i] = data[i]; + //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); + } + *recv_buf_len += len; + while *recv_buf_len >= 16 { + let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); + let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); + let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + + ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); + let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); + let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); + let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); + let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); + if ((pack_len + 16) as usize) > *recv_buf_len { + break; + } + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); + { + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + for i in 0..pack_len { + tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); + } + let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); + { + let node = UpStreamPack::new(socket_handle, tmp_bytes); + up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); + } + //a.do_send(msg); + + { + println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", + socket_handle, + pack_len, + msg_id, + seq_id, + magic_code, + ext_len); + } + } + for i in (pack_len + 16) as usize..*recv_buf_len { + recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; + } + *recv_buf_len -= (pack_len + 16) as usize; + } + }; tokio::spawn(async move { println!("upstream_enter2"); let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); @@ -43,56 +99,7 @@ async fn upstream_enter(stream_arc: Arc>, down_pac let mut data = vec![0; 1024 * 64]; match stream.try_read(&mut data) { Ok(len) => { - println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); - for i in 0..len { - recv_buf[recv_buf_len + i] = data[i]; - //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); - } - recv_buf_len += len; - while recv_buf_len >= 16 { - let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); - let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); - let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + - ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); - let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); - let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); - let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); - let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); - if ((pack_len + 16) as usize) > recv_buf_len { - break; - } - let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); - { - tmp_bytes.put_u16_le(pack_len); - tmp_bytes.put_u16_le(msg_id); - tmp_bytes.put_i32_le(seq_id); - tmp_bytes.put_u16_le(magic_code); - tmp_bytes.put_u16_le(ext_len); - for i in 0..pack_len { - tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); - } - let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); - { - let node = UpStreamPack::new(socket_handle, tmp_bytes); - up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); - } - //a.do_send(msg); - - { - println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", - socket_handle, - pack_len, - msg_id, - seq_id, - magic_code, - ext_len); - } - } - for i in (pack_len + 16) as usize..recv_buf_len { - recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; - } - recv_buf_len -= (pack_len + 16) as usize; - } + on_read(len, &mut recv_buf_len, &mut recv_buf, &data, &up_pack_queue); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { println!("read error 1");