diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 9f84615..7b32594 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -13,7 +13,7 @@ use r9::xtimer::TimerList; //use crate::ss; pub struct DownStreamPack { - holder: Option>>, + pub holder: Option>>, pub socket_handle: u16, pub data: bytes::BytesMut, pub entry: Rc::>>, @@ -65,6 +65,7 @@ impl f9::app::UserApp for UserApp { node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); WsListener::instance().borrow().add_down_stream_pack(node.clone()); } + WsListener::instance().borrow().update(); } fn uninit(&mut self) { diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 78480e6..23812c7 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -41,6 +41,7 @@ pub struct WsListener { pub upstream: std::sync::Weak>, tokio_rt: Runtime, down_pack_queue: Arc>>, + up_pack_queue: Arc>>, } #[derive(Message)] @@ -179,10 +180,17 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>, down_pack_queue: Arc>>,) { - println!("upstream_enter1"); +async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>, + up_pack_queue: Arc>>) { tokio::spawn(async move { println!("upstream_enter2"); + let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); + let mut recv_buf_len = 0; + { + for i in 0..recv_buf.capacity() { + recv_buf.put_i8(0); + } + } loop { tokio::time::sleep(Duration::from_millis(10)).await; let mut stream = stream_arc.lock().await; @@ -191,17 +199,59 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< Ok(r) => { { if r.is_readable() { - let mut data = vec![0; 1024 * 64]; // Try to read data, this may still fail with `WouldBlock` // if the readiness event is a false positive. + let mut data = BytesMut::with_capacity(1024 * 64 * 3); match stream.try_read(&mut data) { - Ok(n) => { - //println!("read {} bytes", n); - let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize); - for i in 0..n { - tmp_bytes.put_u8(data[i]); + Ok(len) => { + println!("read {} bytes",len); + for i in 0..len { + recv_buf[recv_buf_len + i] = data[i]; + //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); + } + recv_buf_len += len; + while recv_buf_len >= 16 { + let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); + let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); + let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + + ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); + let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); + let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); + let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); + let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); + if ((pack_len + 16) as usize) > recv_buf_len { + break; + } + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); + { + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + for i in 0..pack_len { + tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); + } + let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); + let msg = GSResponse { + data: a_bytes + }; + //a.do_send(msg); + /* + { + println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}", + pack_len, + msg_id, + seq_id, + magic_code, + ext_len); + }*/ + } + for i in (pack_len + 16) as usize..recv_buf_len { + recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; + } + recv_buf_len -= (pack_len + 16) as usize; } - //Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); } Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { println!("read error 1"); @@ -261,6 +311,7 @@ impl WsListener { .build() .unwrap(), down_pack_queue: r9::Queue::::new_ex(), + up_pack_queue: r9::Queue::::new_ex(), }; return p; } @@ -293,16 +344,15 @@ impl WsListener { { let upstream = self.upstream.clone(); let down_pack_queue = self.down_pack_queue.clone(); + let up_pack_queue = self.up_pack_queue.clone(); self.tokio_rt.spawn(async move { let mut result = TcpStream::connect("192.168.100.39:7616").await; match result { Ok(v) => { - //upstream.lock().unwrap().replace(v); let a = Arc::new(Mutex::new(v)); - //upstream = Arc::downgrade(&a); WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a); println!("connect ok"); - upstream_enter(a, down_pack_queue).await; + upstream_enter(a, down_pack_queue, up_pack_queue).await; }, Err(e) => { println!("connect err") @@ -314,6 +364,28 @@ impl WsListener { } } + pub fn update(&self) { + let mut data = bytes::BytesMut::new(); + { + self.up_pack_queue.lock().unwrap().fetch(); + //down_pack_queue.get_mut().fetch(); + let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); + //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); + while !work_list.borrow().empty() { + let node = &work_list.borrow().first_entry(); + node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); + data = node.upgrade().unwrap().borrow().data.clone(); + break; + //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; + //stream.write_all(data.as_ref()).await; + } + } + if data.len() > 0 { + println!("write_all3 len:{0}", data.len()); + //stream.write_all(data.as_ref()).await; + } + } + pub fn uninit(&mut self) { }