From 71f7a51ef8c5b4ecf17bf4815a568f2c870c1bc3 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 20:22:43 +0800 Subject: [PATCH] 1 --- server/stat/src/upstream/upstream.rs | 131 ++++++++++++++++++++++++++- 1 file changed, 127 insertions(+), 4 deletions(-) diff --git a/server/stat/src/upstream/upstream.rs b/server/stat/src/upstream/upstream.rs index 0cbb0c7..c3fd150 100644 --- a/server/stat/src/upstream/upstream.rs +++ b/server/stat/src/upstream/upstream.rs @@ -1,6 +1,10 @@ use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; use tokio::net::TcpStream; +use tokio::io::{Interest}; +use tokio::io::{AsyncWriteExt}; +use bytes::{BufMut, BytesMut}; +use std::time::Duration; use r9; use crate::common::{DownStreamPack, UpStreamPack}; pub struct UpStream { @@ -13,6 +17,125 @@ pub struct UpStream { up_pack_queue: Arc>>, } +async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>, + up_pack_queue: Arc>>) { + tokio::spawn(async move { + println!("upstream_enter2"); + let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); + let mut recv_buf_len = 0; + { + for i in 0..recv_buf.capacity() { + recv_buf.put_i8(0); + } + } + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + let mut stream = stream_arc.lock().await; + let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await; + match ready { + Ok(r) => { + { + if r.is_readable() { + // Try to read data, this may still fail with `WouldBlock` + // if the readiness event is a false positive. + let mut data = vec![0; 1024 * 64]; + match stream.try_read(&mut data) { + Ok(len) => { + println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); + for i in 0..len { + recv_buf[recv_buf_len + i] = data[i]; + //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); + } + recv_buf_len += len; + while recv_buf_len >= 16 { + let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); + let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); + let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + + ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); + let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); + let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); + let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); + let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); + if ((pack_len + 16) as usize) > recv_buf_len { + break; + } + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); + { + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + for i in 0..pack_len { + tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); + } + let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); + { + let node = UpStreamPack::new(socket_handle, tmp_bytes); + up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); + } + //a.do_send(msg); + + { + println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", + socket_handle, + pack_len, + msg_id, + seq_id, + magic_code, + ext_len); + } + } + for i in (pack_len + 16) as usize..recv_buf_len { + recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; + } + recv_buf_len -= (pack_len + 16) as usize; + } + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + println!("read error 1"); + continue; + } + Err(e) => { + println!("read error 2"); + //return Err(e.into()); + } + } + } + } + + { + if r.is_writable() { + let mut data = bytes::BytesMut::new(); + { + down_pack_queue.lock().unwrap().fetch(); + //down_pack_queue.get_mut().fetch(); + let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); + //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); + while !work_list.lock().unwrap().empty() { + let node = &work_list.lock().unwrap().first_entry(); + node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); + data = node.upgrade().unwrap().lock().unwrap().data.clone(); + break; + //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; + //stream.write_all(data.as_ref()).await; + } + } + if data.len() > 0 { + println!("write_all2 len:{0}", data.len()); + stream.write_all(data.as_ref()).await; + } + } + } + } + Err(e) => { + + } + } + } + }); +} + impl UpStream { pub fn new(instance_id: i32, remote_ip: String, remote_port: i32) -> Self { @@ -37,12 +160,12 @@ impl UpStream { let mut result = TcpStream::connect(address).await; match result { Ok(v) => { - let a = Arc::new(Mutex::new(v)); - println!("connect ok"); - //upstream_enter(a, down_pack_queue, up_pack_queue).await; + let a = Arc::new(tokio::sync::Mutex::new(v)); + println!("connect2 ok"); + upstream_enter(a, down_pack_queue, up_pack_queue).await; }, Err(e) => { - println!("connect err") + println!("connect2 err") } } });