This commit is contained in:
azw 2024-05-12 19:23:33 +08:00
parent b0ee7e7f58
commit d5bfb38690
2 changed files with 32 additions and 80 deletions

View File

@ -8,6 +8,7 @@ use std::sync::OnceLock;
pub struct UserApp { pub struct UserApp {
pub _masterMgr: MasterMgr, pub _masterMgr: MasterMgr,
pub _upstream_mgr: UpStreamMgr,
down_stream_msg_queue: Arc::<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>, down_stream_msg_queue: Arc::<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
} }
@ -20,7 +21,7 @@ impl f9::app::UserApp for UserApp {
fn init(&self) { fn init(&self) {
MasterMgr::instance().init(); MasterMgr::instance().init();
WsListener::instance().init(); WsListener::instance().init();
UpStreamMgr::instance().borrow_mut().init(); UpStreamMgr::instance().init();
} }
fn update(&self) { fn update(&self) {
@ -52,6 +53,7 @@ impl f9::app::UserApp for UserApp {
fn uninit(&self) { fn uninit(&self) {
WsListener::instance().uninit(); WsListener::instance().uninit();
UpStreamMgr::instance().uninit();
MasterMgr::instance().uninit(); MasterMgr::instance().uninit();
} }
@ -71,6 +73,7 @@ impl UserApp {
pub fn new() -> Self { pub fn new() -> Self {
let p = UserApp{ let p = UserApp{
_masterMgr: MasterMgr::new(), _masterMgr: MasterMgr::new(),
_upstream_mgr: UpStreamMgr::new(),
down_stream_msg_queue: r9::QueueLock::<DownStreamPack>::new(), down_stream_msg_queue: r9::QueueLock::<DownStreamPack>::new(),
}; };
return p; return p;

View File

@ -1,93 +1,42 @@
use std::rc::{Rc, Weak}; use std::collections::HashMap;
use std::cell::RefCell; use std::rc::Rc;
use std::sync::{Arc, Mutex};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::io::AsyncWriteExt;
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
use tokio::io::Interest; use crate::upstream::UpStream;
use std::error::Error; use crate::app::UserApp;
use std::io;
use r9_macro::SharedFromSelf;
use r9_macro_derive::SharedFromSelf;
use r9_macro_derive::Singleton;
#[derive(SharedFromSelf)]
#[derive(Singleton)]
pub struct UpStreamMgr { pub struct UpStreamMgr {
_self_wp: Weak::<RefCell::<Self>>, curr_id: i16,
key_hash: HashMap<String, Rc::<UpStream>>,
id_hash: HashMap<i32, Rc::<UpStream>>,
tokio_rt: Runtime,
} }
unsafe impl Send for UpStreamMgr{}
async fn run_app(send_queue: Arc::<Vec<Arc::<dyn ::protobuf::Message>>>) -> Result<(), Box<dyn Error>> { unsafe impl Sync for UpStreamMgr{}
let mut stream = TcpStream::connect("192.168.100.39:7617").await?;
loop {
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
if ready.is_readable() {
let mut data = vec![0; 1024];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_read(&mut data) {
Ok(n) => {
println!("read {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
//return Err(e.into());
}
}
}
if ready.is_writable() {
// Try to write data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_write(b"hello world") {
Ok(n) => {
println!("write {} bytes", n);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue
}
Err(e) => {
//return Err(e.into());
}
}
}
}
Ok(())
}
impl UpStreamMgr { impl UpStreamMgr {
pub fn instance() -> &'static Self {
return &UserApp::instance()._upstream_mgr;
}
pub fn new() -> Self { pub fn new() -> Self {
let p = UpStreamMgr{ let p = Self{
_self_wp: Default::default(), curr_id: 0,
key_hash: Default::default(),
id_hash: Default::default(),
tokio_rt: tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
}; };
return p; return p;
} }
pub fn init(&mut self) { pub fn init(&self) {
let send_queue: Arc::<Vec<Arc::<dyn ::protobuf::Message>>> =
Default::default();
//let msg : Arc::<dyn ::protobuf::Message>>> = Arc::new(); }
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread() pub fn uninit(&self) {
.worker_threads(4)
.enable_all()
.thread_name("my-custom-name")
.thread_stack_size(3 * 1024 * 1024)
.build()
.unwrap();
rt.block_on(async {
run_app(send_queue.clone()).await;
});
});
} }