From 38a8b35364e29650dba1e2dc406020f586bbd2b3 Mon Sep 17 00:00:00 2001 From: azw Date: Tue, 7 May 2024 16:48:47 +0800 Subject: [PATCH] 1 --- server/stat/src/main.rs | 66 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index b43ddc9..6cd9336 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -25,6 +25,8 @@ use tokio::io::Interest; use std::vec::Vec; use std::cell::RefCell; use std::io::Read; +use std::io; +use std::cell::Cell; const MAX_PACKET_LEN: usize = 1024 * 64; @@ -32,8 +34,8 @@ const MAX_PACKET_LEN: usize = 1024 * 64; struct MyWs { recv_buf_len: usize, recv_buf: BytesMut, - //send_queue: Arc>>>, send_queue: Arc>>>, + recv_queue: Arc>>>, } impl Actor for MyWs { @@ -127,8 +129,29 @@ impl MyWs { impl StreamHandler> for MyWs { fn started(&mut self, ctx: &mut Self::Context) { - let a = ctx.address().recipient(); - //let b = 100; + let a: Recipient = ctx.address().recipient(); + + let mut recv_queue = self.recv_queue.clone(); + tokio::spawn(async move { + loop { + while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 { + let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); + let len = bytes.clone().unwrap().clone().len(); + { + println!("sendmsg len:{0}", len); + } + { + let mut a_bytes = actix_web::web::Bytes::copy_from_slice(bytes.unwrap().as_ref()); + let msg = GSResponse { + data: a_bytes + }; + a.send(msg); + //ctx.binary(a_bytes); + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }); } @@ -142,7 +165,7 @@ impl StreamHandler> for MyWs { //ctx.binary(bin); //self.down_stream.write_all(&bin); self.parse_pkt(&bin); - ctx.binary(bin); + //ctx.binary(bin); }, _ => (), } @@ -153,6 +176,11 @@ impl StreamHandler> for MyWs { #[rtype(result = "()")] pub struct Message(pub String); +#[derive(Message)] +#[rtype(result = "()")] +pub struct GSResponse { + data: actix_web::web::Bytes, +} impl Handler for MyWs { type Result = (); @@ -163,7 +191,13 @@ impl Handler for MyWs { } } +impl Handler for MyWs { + type Result = (); + fn handle(&mut self, msg: GSResponse, ctx: &mut Self::Context) -> Self::Result { + ctx.binary(msg.data); + } +} struct AppStateWithCounter { counter: Mutex, // <- Mutex is necessary to mutate safely across threads @@ -178,22 +212,40 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data { { if r.is_readable() { - let a = 100; + let mut data = vec![0; 1024 * 64]; + // Try to read data, this may still fail with `WouldBlock` + // if the readiness event is a false positive. + match down_stream.try_read(&mut data) { + Ok(n) => { + println!("read {} bytes", n); + let mut tmp_bytes = BytesMut::with_capacity((n + 24) as usize); + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + println!("read error 1"); + continue; + } + Err(e) => { + println!("read error 2"); + //return Err(e.into()); + } + } } }