This commit is contained in:
azw 2024-05-12 19:40:11 +08:00
parent d5bfb38690
commit a37723dd6c
3 changed files with 32 additions and 6 deletions

View File

@ -10,6 +10,13 @@ pub struct DownStreamPack {
pub entry: Arc::<std::sync::Mutex::<r9::ListHeadLock<Self>>>,
}
pub struct UpStreamPack {
holder: Option<Arc::<std::sync::Mutex::<Self>>>,
pub socket_handle: u16,
pub data: bytes::BytesMut,
pub entry: Arc::<std::sync::Mutex::<r9::ListHeadLock<Self>>>,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct DownStreamMessage {
@ -30,4 +37,20 @@ impl DownStreamPack {
return p;
}
}
impl UpStreamPack {
pub fn new(socket_handle: u16, data: bytes::BytesMut) -> Arc<Mutex<Self>> {
let p = Arc::new(std::sync::Mutex::new(UpStreamPack{
holder: Default::default(),
socket_handle: socket_handle,
data: data,
entry: r9::ListHeadLock::<Self>::new_node(Default::default()),
}));
p.lock().unwrap().holder = Some(p.clone());
p.lock().unwrap().entry = r9::ListHeadLock::<Self>::new_node(Arc::downgrade(&p));
return p;
}
}

View File

@ -21,7 +21,7 @@ use crate::app::UserApp;
use crate::constant;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use crate::common::types::{DownStreamPack, DownStreamMessage};
use crate::common::types::{DownStreamPack, DownStreamMessage, UpStreamPack};
use tokio::io::{AsyncWriteExt};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU16, Ordering};
@ -37,7 +37,7 @@ pub struct WsListener {
pub upstream: std::sync::Mutex<Option<std::sync::Arc<Mutex<TcpStream>>>>,
tokio_rt: Runtime,
down_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<UpStreamPack>>>,
}
static WsListenerInstance: OnceLock<WsListener> = OnceLock::new();
@ -173,7 +173,7 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
}
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>) {
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>) {
tokio::spawn(async move {
println!("upstream_enter2");
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
@ -226,7 +226,7 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
}
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
{
let node = DownStreamPack::new(socket_handle, tmp_bytes);
let node = UpStreamPack::new(socket_handle, tmp_bytes);
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
}
//a.do_send(msg);
@ -308,7 +308,7 @@ impl WsListener {
.build()
.unwrap(),
down_pack_queue: r9::QueueLock::<DownStreamPack>::new_ex(),
up_pack_queue: r9::QueueLock::<DownStreamPack>::new_ex(),
up_pack_queue: r9::QueueLock::<UpStreamPack>::new_ex(),
};
return p;
}

View File

@ -1,3 +1,6 @@
use std::sync::{Arc, Mutex};
use r9;
use crate::common::DownStreamPack;
pub struct UpStream {
down_pack_queue: Arc<Mutex::<r9::QueueLock<DownStreamPack>>>,
}