diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index f7d3d3c..c7f489f 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -10,7 +10,7 @@ use actix_web::{ dev::ServerHandle, rt, web, - App as WebApp, + App as ActixApp, web::Data, web::Query, HttpRequest, @@ -19,35 +19,24 @@ use actix_web::{ Error, Responder}; use std::sync::{Arc, Mutex}; +use std::thread; +use std::thread::JoinHandle; use actix::prelude::*; - -#[derive(Debug)] -pub struct WsSession { - /// unique session id - pub id: usize, - - /// Client must send ping at least once per 10 seconds (CLIENT_TIMEOUT), - /// otherwise we drop connection. - pub hb: Instant, - - /// joined room - pub room: String, - - /// peer name - pub name: Option, - - // server - pub addr: Addr, -} +use bytes::BytesMut; +use crate::AppStateWithCounter; #[derive(SharedFromSelf)] #[derive(Singleton)] pub struct WsListener { _self_wp: Weak::>, + work_thread: Option>, } /// Define HTTP actor -pub struct WsConn; +pub struct WsConn { + recv_buf_len: usize, + recv_buf: BytesMut, +} impl Actor for WsConn { type Context = ws::WebsocketContext; @@ -66,37 +55,52 @@ impl StreamHandler> for WsConn { } async fn index(req: HttpRequest, stream: web::Payload) -> Result { - let resp = ws::start(WsConn{}, &req, stream); + let mut conn = WsConn{ + recv_buf_len: 0, + recv_buf: BytesMut::with_capacity(1024 * 64 * 2) + }; + conn.recv_buf.resize(conn.recv_buf.capacity(), 0); + let resp = ws::start(conn, &req, stream); println!("{:?}", resp); resp } -async fn run_app(port: u16, - tx: std::sync::mpsc::Sender) -> std::io::Result<()> { - let server = HttpServer::new(move || { - WebApp::new() - .route("/",web::get().to(index)) - }) - .bind(("0.0.0.0", port))? - .workers(1) - .run(); - - let _ = tx.send(server.handle()); - - server.await -} - impl WsListener { pub fn new() -> Self { let p = Self{ _self_wp: Default::default(), + work_thread: Default::default(), }; return p; } pub fn init(&mut self) { - + //self.work_thread = Arc::new(Some(1)); + { + self.work_thread = Some(thread::spawn(|| { + let mut rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let counter = web::Data::new(AppStateWithCounter { + counter: Mutex::new(0), + }); + HttpServer::new(move || { + // move counter into the closure + ActixApp::new() + .app_data(counter.clone()) // <- register the created data + .route("/", web::get().to(crate::index)) + }) + .bind(("0.0.0.0", 8080))? + .run() + .await + }); + println!("hello2"); + })); + println!("hello3"); + } } pub fn uninit(&mut self) { diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index c4bb3a8..1c2ac03 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -90,8 +90,7 @@ impl MyWs { println!("recv buf_len:{0}", tmp_bytes.len()); } { - UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes); - //Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); + Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); } offset += (pack_len as usize) + 12; } @@ -338,31 +337,6 @@ fn main1() { fn main() { App::init(UserApp::instance()); - { - let t1 = thread::spawn(||{ - //let mut rt = tokio::runtime::Runtime::new().unwrap(); - let mut rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async { - let counter = web::Data::new(AppStateWithCounter { - counter: Mutex::new(0), - }); - HttpServer::new(move || { - // move counter into the closure - ActixApp::new() - .app_data(counter.clone()) // <- register the created data - .route("/", web::get().to(index)) - }) - .bind(("0.0.0.0", 8080))? - .run() - .await - }); - println!("hello2"); - }); - println!("hello3"); - } App::run(); App::uninit(); }