diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index b16384b..9f84615 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -80,6 +80,7 @@ impl f9::app::UserApp for UserApp { impl UserApp { pub fn add_down_stream_pack(&self, socket_handle: u16, data: bytes::BytesMut) { + println!("add_down_stream_pack1 len:{0}", data.len()); let node = Rc::new(RefCell::new(DownStreamPack{ holder: Default::default(), socket_handle: socket_handle, diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index d5e605e..78480e6 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -227,12 +227,13 @@ async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc< let node = &work_list.borrow().first_entry(); node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); data = node.upgrade().unwrap().borrow().data.clone(); + break; //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; //stream.write_all(data.as_ref()).await; } } if data.len() > 0 { - println!("write_all len:{0}", data.len()); + println!("write_all2 len:{0}", data.len()); stream.write_all(data.as_ref()).await; } } @@ -318,15 +319,9 @@ impl WsListener { } pub fn add_down_stream_pack(&self, data: Weak::>) { + println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().borrow().data.len()); let socket_handle = data.upgrade().unwrap().borrow().socket_handle; - match self.socket_hash.get(&socket_handle) { - Some(socket) => { - let a = 0; - } - None => { - let a = 0; - } - } + self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().borrow().entry); } fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient) { self.cur_socket_handle += 1;