diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 162fd14..96dfb37 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -78,14 +78,7 @@ impl UserApp { pub fn add_down_stream_pack(&self, socket_handle: u16, data: bytes::BytesMut) { println!("add_down_stream_pack1 len:{0}", data.len()); - let node = Arc::new(Mutex::new(DownStreamPack{ - holder: Default::default(), - socket_handle: socket_handle, - data: data, - entry: r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), - })); - node.lock().unwrap().holder = Some(node.clone()); - node.lock().unwrap().entry = r9::ListHeadLock::< crate::app::user_app::DownStreamPack>::new_node(Arc::downgrade(&node)); + let node = DownStreamPack::new(socket_handle, data); self.net_msg_queue.lock().unwrap().push(&node.lock().unwrap().entry); } diff --git a/server/stat/src/common/types.rs b/server/stat/src/common/types.rs index f7164d7..82669a9 100644 --- a/server/stat/src/common/types.rs +++ b/server/stat/src/common/types.rs @@ -1,8 +1,26 @@ use std::sync::Arc; +use std::sync::Mutex; +use actix::dev::MessageResponse; pub struct DownStreamPack { - pub holder: Option>>, + holder: Option>>, pub socket_handle: u16, pub data: bytes::BytesMut, pub entry: Arc::>>, +} + +impl DownStreamPack { + + pub fn new(socket_handle: u16, data: bytes::BytesMut) -> Arc> { + let p = Arc::new(std::sync::Mutex::new(DownStreamPack{ + holder: Default::default(), + socket_handle: socket_handle, + data: data, + entry: r9::ListHeadLock::::new_node(Default::default()), + })); + p.lock().unwrap().holder = Some(p.clone()); + p.lock().unwrap().entry = r9::ListHeadLock::::new_node(Arc::downgrade(&p)); + return p; + } + } \ No newline at end of file diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 0ef8b59..dec907c 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -232,14 +232,7 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< } let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); { - let node = Arc::new(std::sync::Mutex::new(DownStreamPack{ - holder: Default::default(), - socket_handle: socket_handle, - data: tmp_bytes, - entry: r9::ListHeadLock::::new_node(Default::default()), - })); - node.lock().unwrap().holder = Some(node.clone()); - node.lock().unwrap().entry = r9::ListHeadLock::::new_node(Arc::downgrade(&node)); + let node = DownStreamPack::new(socket_handle, tmp_bytes); up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); } //a.do_send(msg);