This commit is contained in:
azw 2024-05-07 16:48:47 +08:00
parent 0226ca4366
commit 38a8b35364

View File

@ -25,6 +25,8 @@ use tokio::io::Interest;
use std::vec::Vec; use std::vec::Vec;
use std::cell::RefCell; use std::cell::RefCell;
use std::io::Read; use std::io::Read;
use std::io;
use std::cell::Cell;
const MAX_PACKET_LEN: usize = 1024 * 64; const MAX_PACKET_LEN: usize = 1024 * 64;
@ -32,8 +34,8 @@ const MAX_PACKET_LEN: usize = 1024 * 64;
struct MyWs { struct MyWs {
recv_buf_len: usize, recv_buf_len: usize,
recv_buf: BytesMut, recv_buf: BytesMut,
//send_queue: Arc<Mutex<RefCell<Vec::<actix_web::web::Bytes>>>>,
send_queue: Arc<Mutex<RefCell<Vec::<BytesMut>>>>, send_queue: Arc<Mutex<RefCell<Vec::<BytesMut>>>>,
recv_queue: Arc<Mutex<RefCell<Vec::<BytesMut>>>>,
} }
impl Actor for MyWs { impl Actor for MyWs {
@ -127,8 +129,29 @@ impl MyWs {
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs { impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
let a = ctx.address().recipient(); let a: Recipient<GSResponse> = ctx.address().recipient();
//let b = 100;
let mut recv_queue = self.recv_queue.clone();
tokio::spawn(async move {
loop {
while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
let len = bytes.clone().unwrap().clone().len();
{
println!("sendmsg len:{0}", len);
}
{
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(bytes.unwrap().as_ref());
let msg = GSResponse {
data: a_bytes
};
a.send(msg);
//ctx.binary(a_bytes);
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
} }
@ -142,7 +165,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
//ctx.binary(bin); //ctx.binary(bin);
//self.down_stream.write_all(&bin); //self.down_stream.write_all(&bin);
self.parse_pkt(&bin); self.parse_pkt(&bin);
ctx.binary(bin); //ctx.binary(bin);
}, },
_ => (), _ => (),
} }
@ -153,6 +176,11 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct Message(pub String); pub struct Message(pub String);
#[derive(Message)]
#[rtype(result = "()")]
pub struct GSResponse {
data: actix_web::web::Bytes,
}
impl Handler<Message> for MyWs { impl Handler<Message> for MyWs {
type Result = (); type Result = ();
@ -163,7 +191,13 @@ impl Handler<Message> for MyWs {
} }
} }
impl Handler<GSResponse> for MyWs {
type Result = ();
fn handle(&mut self, msg: GSResponse, ctx: &mut Self::Context) -> Self::Result {
ctx.binary(msg.data);
}
}
struct AppStateWithCounter { struct AppStateWithCounter {
counter: Mutex<i32>, // <- Mutex is necessary to mutate safely across threads counter: Mutex<i32>, // <- Mutex is necessary to mutate safely across threads
@ -178,22 +212,40 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateW
recv_buf_len: 0, recv_buf_len: 0,
recv_buf: BytesMut::with_capacity(1024 * 64 * 2), recv_buf: BytesMut::with_capacity(1024 * 64 * 2),
send_queue: Default::default(), send_queue: Default::default(),
recv_queue: Default::default(),
//addr: Default::default(),
}; };
{ {
let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?; let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?;
down_stream.set_nodelay(true); down_stream.set_nodelay(true);
//down_stream.set_nonblocking(true)?; //down_stream.set_nonblocking(true)?;
let mut send_queue = ws_client.send_queue.clone(); let mut send_queue = ws_client.send_queue.clone();
let mut recv_queue = ws_client.recv_queue.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
tokio::time::sleep(Duration::from_secs(1)).await; tokio::time::sleep(Duration::from_millis(10)).await;
let ready = down_stream.ready(Interest::READABLE | Interest::WRITABLE).await; let ready = down_stream.ready(Interest::READABLE | Interest::WRITABLE).await;
match ready { match ready {
Ok(r) => { Ok(r) => {
{ {
if r.is_readable() { if r.is_readable() {
let a = 100; let mut data = vec![0; 1024 * 64];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match down_stream.try_read(&mut data) {
Ok(n) => {
println!("read {} bytes", n);
let mut tmp_bytes = BytesMut::with_capacity((n + 24) as usize);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
println!("read error 1");
continue;
}
Err(e) => {
println!("read error 2");
//return Err(e.into());
}
}
} }
} }