diff --git a/server/stat/src/upstream/upstream.rs b/server/stat/src/upstream/upstream.rs index 2ad398c..76b7ca2 100644 --- a/server/stat/src/upstream/upstream.rs +++ b/server/stat/src/upstream/upstream.rs @@ -19,64 +19,86 @@ pub struct UpStream { pub up_pack_queue: Arc>>, } +async fn on_write(stream: &mut TcpStream, down_pack_queue: &Arc>>) { + let mut data = bytes::BytesMut::new(); + { + down_pack_queue.lock().unwrap().fetch(); + //down_pack_queue.get_mut().fetch(); + let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); + //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); + while !work_list.lock().unwrap().empty() { + let node = &work_list.lock().unwrap().first_entry(); + node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); + data = node.upgrade().unwrap().lock().unwrap().data.clone(); + break; + //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; + //stream.write_all(data.as_ref()).await; + } + } + if data.len() > 0 { + println!("write_all2 len:{0}", data.len()); + stream.write_all(data.as_ref()).await; + } +} + +fn on_read(len: usize, + recv_buf_len: &mut usize, + recv_buf: &mut BytesMut, + data: &Vec, + up_pack_queue: &Arc>>) { + println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); + for i in 0..len { + recv_buf[*recv_buf_len + i] = data[i]; + //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); + } + *recv_buf_len += len; + while *recv_buf_len >= 16 { + let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); + let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); + let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + + ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); + let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); + let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); + let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); + let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); + if ((pack_len + 16) as usize) > *recv_buf_len { + break; + } + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); + { + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + for i in 0..pack_len { + tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); + } + let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); + { + let node = UpStreamPack::new(socket_handle, tmp_bytes); + up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); + } + //a.do_send(msg); + + { + println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", + socket_handle, + pack_len, + msg_id, + seq_id, + magic_code, + ext_len); + } + } + for i in (pack_len + 16) as usize..*recv_buf_len { + recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; + } + *recv_buf_len -= (pack_len + 16) as usize; + } +} async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>, up_pack_queue: Arc>>) { - let on_read = |len: usize, - recv_buf_len: &mut usize, - recv_buf: &mut BytesMut, - data: &Vec, - up_pack_queue: &Arc>>| { - println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); - for i in 0..len { - recv_buf[*recv_buf_len + i] = data[i]; - //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); - } - *recv_buf_len += len; - while *recv_buf_len >= 16 { - let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); - let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); - let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + - ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); - let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); - let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); - let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); - let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); - if ((pack_len + 16) as usize) > *recv_buf_len { - break; - } - let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); - { - tmp_bytes.put_u16_le(pack_len); - tmp_bytes.put_u16_le(msg_id); - tmp_bytes.put_i32_le(seq_id); - tmp_bytes.put_u16_le(magic_code); - tmp_bytes.put_u16_le(ext_len); - for i in 0..pack_len { - tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); - } - let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); - { - let node = UpStreamPack::new(socket_handle, tmp_bytes); - up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); - } - //a.do_send(msg); - - { - println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", - socket_handle, - pack_len, - msg_id, - seq_id, - magic_code, - ext_len); - } - } - for i in (pack_len + 16) as usize..*recv_buf_len { - recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; - } - *recv_buf_len -= (pack_len + 16) as usize; - } - }; tokio::spawn(async move { println!("upstream_enter2"); let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); @@ -115,25 +137,7 @@ async fn upstream_enter(stream_arc: Arc>, down_pac { if r.is_writable() { - let mut data = bytes::BytesMut::new(); - { - down_pack_queue.lock().unwrap().fetch(); - //down_pack_queue.get_mut().fetch(); - let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); - //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); - while !work_list.lock().unwrap().empty() { - let node = &work_list.lock().unwrap().first_entry(); - node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); - data = node.upgrade().unwrap().lock().unwrap().data.clone(); - break; - //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; - //stream.write_all(data.as_ref()).await; - } - } - if data.len() > 0 { - println!("write_all2 len:{0}", data.len()); - stream.write_all(data.as_ref()).await; - } + on_write(&mut stream, &down_pack_queue).await; } } }