diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 728840b..17e82be 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -8,6 +8,7 @@ use std::sync::OnceLock; pub struct UserApp { pub _masterMgr: MasterMgr, + pub _upstream_mgr: UpStreamMgr, down_stream_msg_queue: Arc::>>, } @@ -20,7 +21,7 @@ impl f9::app::UserApp for UserApp { fn init(&self) { MasterMgr::instance().init(); WsListener::instance().init(); - UpStreamMgr::instance().borrow_mut().init(); + UpStreamMgr::instance().init(); } fn update(&self) { @@ -52,6 +53,7 @@ impl f9::app::UserApp for UserApp { fn uninit(&self) { WsListener::instance().uninit(); + UpStreamMgr::instance().uninit(); MasterMgr::instance().uninit(); } @@ -71,6 +73,7 @@ impl UserApp { pub fn new() -> Self { let p = UserApp{ _masterMgr: MasterMgr::new(), + _upstream_mgr: UpStreamMgr::new(), down_stream_msg_queue: r9::QueueLock::::new(), }; return p; diff --git a/server/stat/src/upstream/upstreammgr.rs b/server/stat/src/upstream/upstreammgr.rs index f6c7a01..38444d1 100644 --- a/server/stat/src/upstream/upstreammgr.rs +++ b/server/stat/src/upstream/upstreammgr.rs @@ -1,93 +1,42 @@ -use std::rc::{Rc, Weak}; -use std::cell::RefCell; -use std::sync::{Arc, Mutex}; -use tokio::net::{TcpStream, ToSocketAddrs}; -use tokio::io::AsyncWriteExt; +use std::collections::HashMap; +use std::rc::Rc; use tokio::runtime::Runtime; -use tokio::io::Interest; -use std::error::Error; -use std::io; +use crate::upstream::UpStream; +use crate::app::UserApp; -use r9_macro::SharedFromSelf; -use r9_macro_derive::SharedFromSelf; -use r9_macro_derive::Singleton; - - -#[derive(SharedFromSelf)] -#[derive(Singleton)] pub struct UpStreamMgr { - _self_wp: Weak::>, + curr_id: i16, + key_hash: HashMap>, + id_hash: HashMap>, + tokio_rt: Runtime, } - -async fn run_app(send_queue: Arc::>>) -> Result<(), Box> { - let mut stream = TcpStream::connect("192.168.100.39:7617").await?; - - loop { - let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; - - if ready.is_readable() { - let mut data = vec![0; 1024]; - // Try to read data, this may still fail with `WouldBlock` - // if the readiness event is a false positive. - match stream.try_read(&mut data) { - Ok(n) => { - println!("read {} bytes", n); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - //return Err(e.into()); - } - } - - } - - if ready.is_writable() { - // Try to write data, this may still fail with `WouldBlock` - // if the readiness event is a false positive. - match stream.try_write(b"hello world") { - Ok(n) => { - println!("write {} bytes", n); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue - } - Err(e) => { - //return Err(e.into()); - } - } - } - } - - Ok(()) -} - +unsafe impl Send for UpStreamMgr{} +unsafe impl Sync for UpStreamMgr{} impl UpStreamMgr { + pub fn instance() -> &'static Self { + return &UserApp::instance()._upstream_mgr; + } + pub fn new() -> Self { - let p = UpStreamMgr{ - _self_wp: Default::default(), + let p = Self{ + curr_id: 0, + key_hash: Default::default(), + id_hash: Default::default(), + tokio_rt: tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(), }; return p; } - pub fn init(&mut self) { - let send_queue: Arc::>> = - Default::default(); - //let msg : Arc::>> = Arc::new(); - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(4) - .enable_all() - .thread_name("my-custom-name") - .thread_stack_size(3 * 1024 * 1024) - .build() - .unwrap(); - rt.block_on(async { - run_app(send_queue.clone()).await; - }); - }); + pub fn init(&self) { + + + } + + pub fn uninit(&self) { }