This commit is contained in:
azw 2024-05-11 09:24:41 +08:00
parent 8ed90029b3
commit da9b5aff3f
2 changed files with 22 additions and 15 deletions

View File

@ -15,8 +15,8 @@ use r9::xtimer::TimerList;
pub struct DownStreamPack { pub struct DownStreamPack {
holder: Option<Rc::<RefCell::<DownStreamPack>>>, holder: Option<Rc::<RefCell::<DownStreamPack>>>,
socket_handle: u16, socket_handle: u16,
data: bytes::BytesMut, pub data: bytes::BytesMut,
entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>, pub entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
} }
unsafe impl Send for DownStreamPack {} unsafe impl Send for DownStreamPack {}

View File

@ -27,6 +27,7 @@ use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use crate::app::user_app::DownStreamPack; use crate::app::user_app::DownStreamPack;
use tokio::io::{AsyncWriteExt, Ready};
#[derive(SharedFromSelf)] #[derive(SharedFromSelf)]
#[derive(Singleton)] #[derive(Singleton)]
@ -176,8 +177,10 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
resp resp
} }
async fn upstream_enter(stream: TcpStream, down_pack_queue: Arc<Mutex::<r9::Queue<DownStreamPack>>>,) { async fn upstream_enter(mut stream: TcpStream, down_pack_queue: Arc<Mutex::<r9::Queue<DownStreamPack>>>,) {
println!("upstream_enter1");
tokio::spawn(async move { tokio::spawn(async move {
println!("upstream_enter2");
loop { loop {
tokio::time::sleep(Duration::from_millis(10)).await; tokio::time::sleep(Duration::from_millis(10)).await;
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await; let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await;
@ -211,18 +214,22 @@ async fn upstream_enter(stream: TcpStream, down_pack_queue: Arc<Mutex::<r9::Queu
{ {
if r.is_writable() { if r.is_writable() {
/* let mut data = bytes::BytesMut::new();
while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
//Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0]
let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
let len = bytes.clone().unwrap().clone().len();
{ {
//println!("sendmsg len:{0}", len); down_pack_queue.lock().unwrap().fetch();
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
while !work_list.borrow().empty() {
let node = &work_list.borrow().first_entry();
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
data = node.upgrade().unwrap().borrow().data.clone();
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
//stream.write_all(data.as_ref()).await;
} }
{
down_stream.write_all(bytes.unwrap().as_ref()).await;
} }
}*/ if data.len() > 0 {
println!("write_all len:{0}", data.len());
stream.write_all(data.as_ref()).await;
}
} }
} }
} }
@ -286,7 +293,7 @@ impl WsListener {
Ok(v) => { Ok(v) => {
//upstream.lock().unwrap().replace(v); //upstream.lock().unwrap().replace(v);
println!("connect ok"); println!("connect ok");
upstream_enter(v, down_pack_queue); upstream_enter(v, down_pack_queue).await;
}, },
Err(e) => { Err(e) => {
println!("connect err") println!("connect err")