This commit is contained in:
azw 2024-05-12 16:43:38 +08:00
parent 3045de1e92
commit 700c6b448b
3 changed files with 76 additions and 68 deletions

View File

@ -1,36 +1,24 @@
use std::rc::{Rc, Weak};
use std::cell::RefCell;
use r9_macro::SharedFromSelf;
use r9_macro_derive::SharedFromSelf;
use r9_macro_derive::Singleton;
use r9::Queue;
use f9::app::App;
use std::sync::Arc;
use std::sync::Mutex;
use crate::listener::WsListener;
use crate::upstream::UpStreamMgr;
use bytes::BytesMut;
use r9::xtimer::TimerList;
use std::sync::OnceLock;
pub struct DownStreamPack {
pub holder: Option<Rc::<RefCell::<DownStreamPack>>>,
pub holder: Option<Arc::<std::sync::Mutex::<Self>>>,
pub socket_handle: u16,
pub data: bytes::BytesMut,
pub entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
pub entry: Arc::<std::sync::Mutex::<r9::ListHeadLock<Self>>>,
}
unsafe impl Send for DownStreamPack {}
unsafe impl Sync for DownStreamPack {}
pub struct UserApp {
net_msg_queue: Rc::<RefCell::<r9::Queue<DownStreamPack>>>,
net_msg_queue: Arc::<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
}
unsafe impl Send for UserApp {}
unsafe impl Sync for UserApp {}
impl f9::app::UserApp for UserApp {
fn get_pkg_name(&self) -> String {
return "statserver".to_string();
return "wsproxy".to_string();
}
fn init(&self) {
@ -39,12 +27,28 @@ impl f9::app::UserApp for UserApp {
}
fn update(&self) {
self.net_msg_queue.borrow_mut().fetch();
let work_list = self.net_msg_queue.borrow().work_list.lock().unwrap().clone();
while !work_list.borrow().empty() {
let node = &work_list.borrow().first_entry();
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
WsListener::instance().borrow().add_down_stream_pack(node.clone());
let mut work_is_empty = false;
while !work_is_empty {
let mut cur_node: Option<std::sync::Weak::<std::sync::Mutex::<DownStreamPack>>> = None;
match self.fetch_work_list().lock() {
Ok(work_list) => {
if !work_list.empty() {
let node = &work_list.first_entry();
cur_node = Some(node.clone());
} else {
work_is_empty = true;
break;
}
}
Err(poisoned) => {}
}
match cur_node {
Some(v) => {
v.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
WsListener::instance().borrow().add_down_stream_pack(v.clone());
}
None => {}
}
}
WsListener::instance().borrow().update();
}
@ -63,28 +67,32 @@ static UserAppInstance: OnceLock<UserApp> = OnceLock::new();
impl UserApp {
pub fn instance() -> &'static UserApp {
return UserAppInstance.get_or_init(|| UserApp::new());
pub fn instance() -> &'static Self {
return UserAppInstance.get_or_init(|| Self::new());
}
pub fn add_down_stream_pack(&self, socket_handle: u16, data: bytes::BytesMut) {
println!("add_down_stream_pack1 len:{0}", data.len());
let node = Rc::new(RefCell::new(DownStreamPack{
holder: Default::default(),
socket_handle: socket_handle,
data: data,
entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()),
}));
node.borrow_mut().holder = Some(node.clone());
node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node));
self.net_msg_queue.borrow_mut().push(&node.borrow_mut().entry);
}
pub fn new() -> Self {
let p = UserApp{
net_msg_queue: r9::Queue::<DownStreamPack>::new(),
net_msg_queue: r9::QueueLock::<DownStreamPack>::new(),
};
return p;
}
fn fetch_work_list(&self) -> Arc::<Mutex::<r9::ListHeadLock<DownStreamPack>>> {
self.net_msg_queue.lock().unwrap().fetch();
return self.net_msg_queue.lock().unwrap().work_list.lock().unwrap().clone();
}
pub fn add_down_stream_pack(&self, socket_handle: u16, data: bytes::BytesMut) {
println!("add_down_stream_pack1 len:{0}", data.len());
let node = Arc::new(Mutex::new(DownStreamPack{
holder: Default::default(),
socket_handle: socket_handle,
data: data,
entry: r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Default::default()),
}));
node.lock().unwrap().holder = Some(node.clone());
node.lock().unwrap().entry = r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Arc::downgrade(&node));
self.net_msg_queue.lock().unwrap().push(&node.lock().unwrap().entry);
}
}

View File

@ -44,8 +44,8 @@ pub struct WsListener {
cur_socket_handle: u16,
pub upstream: std::sync::Weak<Mutex<TcpStream>>,
tokio_rt: Runtime,
down_pack_queue: Arc<std::sync::Mutex<r9::Queue<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex<r9::Queue<DownStreamPack>>>,
down_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex<r9::QueueLock<DownStreamPack>>>,
}
#[derive(Message)]
@ -185,8 +185,8 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
resp
}
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::Queue<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex::<r9::Queue<DownStreamPack>>>) {
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>) {
tokio::spawn(async move {
println!("upstream_enter2");
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
@ -209,7 +209,7 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
let mut data = vec![0; 1024 * 64];
match stream.try_read(&mut data) {
Ok(len) => {
println!("read {0} bytes",len);
println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len);
for i in 0..len {
recv_buf[recv_buf_len + i] = data[i];
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
@ -239,20 +239,20 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
}
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
{
let node = Rc::new(RefCell::new(DownStreamPack{
let node = Arc::new(std::sync::Mutex::new(DownStreamPack{
holder: Default::default(),
socket_handle: socket_handle,
data: tmp_bytes,
entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()),
entry: r9::ListHeadLock::<DownStreamPack>::new_node(Default::default()),
}));
node.borrow_mut().holder = Some(node.clone());
node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node));
up_pack_queue.lock().unwrap().push(&node.borrow_mut().entry);
node.lock().unwrap().holder = Some(node.clone());
node.lock().unwrap().entry = r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Arc::downgrade(&node));
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
}
//a.do_send(msg);
{
println!("sendmsg socket_handle:{5} pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}",
println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}",
socket_handle,
pack_len,
msg_id,
@ -287,10 +287,10 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
//down_pack_queue.get_mut().fetch();
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
while !work_list.borrow().empty() {
let node = &work_list.borrow().first_entry();
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
data = node.upgrade().unwrap().borrow().data.clone();
while !work_list.lock().unwrap().empty() {
let node = &work_list.lock().unwrap().first_entry();
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
data = node.upgrade().unwrap().lock().unwrap().data.clone();
break;
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
//stream.write_all(data.as_ref()).await;
@ -324,8 +324,8 @@ impl WsListener {
.enable_all()
.build()
.unwrap(),
down_pack_queue: r9::Queue::<DownStreamPack>::new_ex(),
up_pack_queue: r9::Queue::<DownStreamPack>::new_ex(),
down_pack_queue: r9::QueueLock::<DownStreamPack>::new_ex(),
up_pack_queue: r9::QueueLock::<DownStreamPack>::new_ex(),
};
return p;
}
@ -386,11 +386,11 @@ impl WsListener {
//down_pack_queue.get_mut().fetch();
let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
while !work_list.borrow().empty() {
let node = &work_list.borrow().first_entry();
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
data = node.upgrade().unwrap().borrow().data.clone();
socket_handle = node.upgrade().unwrap().borrow().socket_handle;
while !work_list.lock().unwrap().empty() {
let node = &work_list.lock().unwrap().first_entry();
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
data = node.upgrade().unwrap().lock().unwrap().data.clone();
socket_handle = node.upgrade().unwrap().lock().unwrap().socket_handle;
break;
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
//stream.write_all(data.as_ref()).await;
@ -416,10 +416,10 @@ impl WsListener {
}
pub fn add_down_stream_pack(&self, data: Weak::<RefCell::<DownStreamPack>>) {
println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().borrow().data.len());
let socket_handle = data.upgrade().unwrap().borrow().socket_handle;
self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().borrow().entry);
pub fn add_down_stream_pack(&self, data: std::sync::Weak::<std::sync::Mutex::<DownStreamPack>>) {
println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len());
let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle;
self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry);
}
fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) {
self.cur_socket_handle += 1;

2
third_party/librust vendored

@ -1 +1 @@
Subproject commit c3280f8747cba62bb6809d30ab3014dea52981d2
Subproject commit 0611d8063c8ff2a1f833468f3ca7d1e8e0770e4c