diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 69d7fc3..518f245 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -19,6 +19,9 @@ pub struct DownStreamPack { entry: Rc::>>, } +unsafe impl Send for DownStreamPack {} +unsafe impl Sync for DownStreamPack {} + #[derive(SharedFromSelf)] #[derive(Singleton)] pub struct UserApp { @@ -56,9 +59,8 @@ impl f9::app::UserApp for UserApp { fn update(&mut self) { self.net_msg_queue.borrow_mut().fetch(); - let work_list = self.net_msg_queue.borrow_mut().work_list.clone(); + let work_list = self.net_msg_queue.borrow().work_list.lock().unwrap().clone(); while !work_list.borrow().empty() { - println!("work_list exec"); let node = &work_list.borrow().first_entry(); node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); } diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index dac245e..6907f9d 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -17,11 +17,16 @@ use actix_web::{ use std::sync::{Arc, Mutex}; use std::thread; use std::thread::JoinHandle; +use std::time::Duration; use actix::prelude::*; use bytes::{BufMut, BytesMut}; use actix_web::web::Bytes; +use tokio::io::Interest; use crate::app::UserApp; use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN}; +use tokio::net::TcpStream; +use tokio::runtime::Runtime; +use crate::app::user_app::DownStreamPack; #[derive(SharedFromSelf)] #[derive(Singleton)] @@ -30,6 +35,9 @@ pub struct WsListener { work_thread: Option>, socket_hash: HashMap>, cur_socket_handle: u16, + upstream: Arc>>, + tokio_rt: Runtime, + down_pack_queue: Arc>>, } #[derive(Message)] @@ -168,6 +176,64 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>>,) { + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_millis(10)).await; + let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await; + match ready { + Ok(r) => { + { + if r.is_readable() { + let mut data = vec![0; 1024 * 64]; + // Try to read data, this may still fail with `WouldBlock` + // if the readiness event is a false positive. + match stream.try_read(&mut data) { + Ok(n) => { + //println!("read {} bytes", n); + let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize); + for i in 0..n { + tmp_bytes.put_u8(data[i]); + } + //Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); + } + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { + println!("read error 1"); + continue; + } + Err(e) => { + println!("read error 2"); + //return Err(e.into()); + } + } + } + } + + { + if r.is_writable() { + /* + while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 { + //Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0] + let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); + let len = bytes.clone().unwrap().clone().len(); + { + //println!("sendmsg len:{0}", len); + } + { + down_stream.write_all(bytes.unwrap().as_ref()).await; + } + }*/ + } + } + } + Err(e) => { + + } + } + } + }); +} + impl WsListener { pub fn new() -> Self { @@ -176,6 +242,12 @@ impl WsListener { _self_wp: Default::default(), work_thread: Default::default(), socket_hash: Default::default(), + upstream: Default::default(), + tokio_rt: tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), + down_pack_queue: r9::Queue::::new_ex(), }; return p; } @@ -193,6 +265,7 @@ impl WsListener { let counter = web::Data::new(AppStateWithCounter { counter: Mutex::new(0), }); + println!("hello2333"); HttpServer::new(move || { // move counter into the closure ActixApp::new() // <- register the created data @@ -204,6 +277,23 @@ impl WsListener { .await }); })); + { + let upstream = self.upstream.clone(); + let down_pack_queue = self.down_pack_queue.clone(); + self.tokio_rt.spawn(async move { + let mut result = TcpStream::connect("192.168.100.39:7616").await; + match result { + Ok(v) => { + //upstream.lock().unwrap().replace(v); + println!("connect ok"); + upstream_enter(v, down_pack_queue); + }, + Err(e) => { + println!("connect err") + } + } + }); + } println!("hello3"); } } diff --git a/third_party/librust b/third_party/librust index cbca2a6..c950f6d 160000 --- a/third_party/librust +++ b/third_party/librust @@ -1 +1 @@ -Subproject commit cbca2a666f64fe55357179c1ddd13e1ed5a43c9c +Subproject commit c950f6d2883a3213a17d514eded9fdcc86f400b5