librust/f9/src/app.rs
2023-11-11 06:34:48 +00:00

266 lines
7.4 KiB
Rust

use std::rc::{Rc, Weak};
use std::cell::RefCell;
use std::time::Duration;
use tokio::runtime::Runtime;
use actix_web::{dev::ServerHandle,
rt,
web,
get,
web::Data,
HttpRequest,
HttpResponse,
HttpServer,
Responder};
use actix_web::App as WebApp;
use std::{sync::mpsc, thread};
use std::sync::{Arc, Mutex, Condvar};
use std::collections::HashMap;
use std::any::Any;
use r9_macro::SharedFromSelf;
use r9_macro_derive::SharedFromSelf;
/// Reference-counted callback type that receives an actix `HttpRequest` by value.
///
/// NOTE(review): this alias is not referenced anywhere in the visible part of
/// this file. Also note that an `FnMut` behind a plain `Rc` cannot be invoked
/// without obtaining unique access to it first.
pub type HttpRequestCb = Rc<dyn FnMut(HttpRequest)>;
/// Hooks that the embedding application implements and hands to `App::init`.
///
/// `App` stores the implementor behind `Rc<RefCell<dyn UserApp>>` and calls
/// into it from the thread that drives `App`.
pub trait UserApp {
    /// Returns the package name; `App::get_pkg_name` forwards to this.
    fn get_pkg_name(&self) -> String;

    /// Called from `App::init` after the HTTP server thread has been spawned
    /// and the timer has been initialised.
    fn init(&mut self);

    /// Per-tick update hook.
    ///
    /// NOTE(review): nothing in the visible part of this file calls it.
    fn update(&mut self);

    /// Called from `App::uninit` before the timer is torn down.
    fn uninit(&mut self);

    /// TCP port the embedded HTTP server should bind to. `App` converts the
    /// value to a `u16` port number, so it should lie in `0..=65535`.
    fn get_http_listen_port(&self) -> i32;
}
/// Application singleton: owns the tokio runtime, the user-supplied
/// [`UserApp`], the in-process message lists and the state shared with the
/// embedded HTTP server.
///
/// Instances are created only through `App::instance`, which hands out
/// `Rc<RefCell<App>>`.
#[derive(SharedFromSelf)]
pub struct App {
    /// Weak pointer back to the `Rc` that owns this value; filled in by
    /// `App::instance`.
    _self_wp: Weak<RefCell<Self>>,
    /// Returned by `get_zone_id`; initialised to 0 and never written elsewhere
    /// in the visible code.
    zone_id: i32,
    /// Returned by `get_node_id`; initialised to 0.
    node_id: i32,
    /// Returned by `get_instance_id`; initialised to 0.
    instance_id: i32,
    /// Tokio runtime created together with the singleton.
    tokio_rt: Runtime,
    /// User hooks; `None` until `App::init` has been called.
    user_app: Option<Rc<RefCell<dyn UserApp>>>,
    /// List head that `add_immsg` appends to.
    im_msgs: Rc<RefCell<r9::ListHead<IMMsgNode>>>,
    /// List head that `dispatch_immsg` drains after calling
    /// `ListHead::replace_init(&im_msgs, &im_work_msgs)`.
    im_work_msgs: Rc<RefCell<r9::ListHead<IMMsgNode>>>,
    /// Mutex that `add_immsg` and `dispatch_immsg` lock before touching
    /// `im_msgs`. The `i32` payload (initialised to 1) is never read.
    im_mutex: Mutex<i32>,
    /// Queue of pending HTTP requests, shared with the HTTP server thread.
    webapp_state: Arc<AppState>,
}
/// State shared between the HTTP server thread and the thread driving `App`.
struct AppState {
    /// Pending requests: the `index` handler pushes one `HttpContext` per
    /// incoming request and `App::dispatch_httprequest` pops them.
    request: Arc<Mutex<Vec<Arc<Mutex<HttpContext>>>>>,
}
/// One queued in-process message, linked into an `r9::ListHead` list.
#[derive(Default)]
struct IMMsgNode {
    /// Caller-chosen message identifier; stored by `App::add_immsg` and not
    /// read anywhere in the visible code.
    msg_id: u16,
    /// Optional type-erased arguments handed to `cb` when the message is
    /// dispatched.
    args: Option<Vec<Rc<dyn Any>>>,
    /// Callback invoked with a reference to `args` by `App::dispatch_immsg`.
    cb: Option<Box<dyn FnMut(&Option<Vec<Rc<dyn Any>>>)>>,
    /// List linkage used to chain this node into `App::im_msgs`.
    entry: Rc<RefCell<r9::ListHead<IMMsgNode>>>,
}
/// Per-request hand-off record between the `index` handler and
/// `App::dispatch_httprequest`. The originating `HttpRequest` is not stored.
struct HttpContext {
    /// Request identifier; the visible code always sets it to 0.
    id: u64,
    /// Tick at which the request was queued; the visible code always sets it
    /// to 0.
    add_tick: i64,
    /// Set to `true` once `rsp` has been filled in; the handler polls this.
    handled: bool,
    /// Response body that the handler sends back once `handled` is `true`.
    rsp: String,
}
/// Actix handler registered for `GET /webapp/index.php`.
///
/// Queues a fresh `HttpContext` on the shared `AppState::request` list and
/// then polls it every 10 ms until `handled` has been set (which
/// `App::dispatch_httprequest` does), finally replying `200 OK` with the
/// context's `rsp` as the body.
///
/// The incoming `req` is currently unused. There is no timeout: if nothing ever
/// marks the context as handled, the request never completes.
async fn index(data: Data<Arc<AppState>>, req: HttpRequest) -> impl Responder {
    println!("http.thread.id {:?}", thread::current().id());

    let pending = Arc::new(Mutex::new(HttpContext {
        id: 0,
        add_tick: 0,
        handled: false,
        rsp: String::new(),
    }));

    // The queue guard is a temporary, so it is released as soon as the push is done.
    data.request.lock().unwrap().push(Arc::clone(&pending));

    loop {
        // The context guard lives only for the condition; it is never held
        // across the `.await` below.
        if pending.lock().unwrap().handled {
            break;
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }

    let body = pending.lock().unwrap().rsp.clone();
    HttpResponse::Ok().body(body)
}
async fn run_app(port: u16, app_state: Arc::<AppState>,
tx: mpsc::Sender<ServerHandle>) -> std::io::Result<()> {
let server = HttpServer::new(move || {
WebApp::new()
.data(app_state.clone())
.route("/webapp/index.php",web::get().to(index))
})
.bind(("0.0.0.0", port))?
.workers(1)
.run();
let _ = tx.send(server.handle());
server.await
}
impl App {
    /// Returns the process-wide `App` singleton, creating it on first use.
    ///
    /// The instance is built with all ids set to 0, a fresh tokio runtime, empty
    /// message lists and an empty HTTP request queue. Its `_self_wp` field points
    /// back at the returned `Rc`.
    ///
    /// # Panics
    ///
    /// Panics if the tokio runtime cannot be created.
    pub fn instance() -> Rc<RefCell<Self>> {
        static mut INSTANCE: Option<Rc<RefCell<App>>> = None;

        // SAFETY: the slot is reached through a raw pointer and the reference
        // derived from it does not outlive this call. This is sound only while
        // every caller is on the same thread (`Rc`/`RefCell` are not thread-safe)
        // and construction below does not re-enter `instance()`. Nothing here
        // enforces the single-thread requirement; callers must uphold it.
        let slot = unsafe { &mut *std::ptr::addr_of_mut!(INSTANCE) };

        slot.get_or_insert_with(|| {
            // `new_cyclic` supplies the weak self-pointer during construction,
            // so no post-construction `borrow_mut` patch-up is needed.
            Rc::new_cyclic(|self_wp| {
                RefCell::new(App {
                    zone_id: 0,
                    node_id: 0,
                    instance_id: 0,
                    tokio_rt: Runtime::new().unwrap(),
                    user_app: None,
                    im_msgs: r9::ListHead::<IMMsgNode>::new_head(),
                    im_work_msgs: r9::ListHead::<IMMsgNode>::new_head(),
                    im_mutex: Mutex::new(1),
                    webapp_state: Arc::new(AppState {
                        request: Default::default(),
                    }),
                    _self_wp: self_wp.clone(),
                })
            })
        })
        .clone()
    }

    /// Installs `user_app` and brings up the subsystems in order: HTTP server
    /// thread, timer, then the user application's own `init`.
    ///
    /// The tokio runtime context is entered for the duration of this call. The
    /// guard returned by `Runtime::enter` must be kept alive for that to have
    /// any effect, so it is bound to a local here.
    pub fn init(&mut self, user_app: Rc<RefCell<dyn UserApp>>) {
        println!("main.thread.id {:?}", thread::current().id());
        self.user_app = Some(user_app);
        let _rt_guard = self.tokio_rt.enter();
        self.init_http_server();
        crate::Timer::instance().borrow_mut().init();
        self.user_app.as_ref().unwrap().borrow_mut().init();
    }

    /// Tears down the user application first, then the timer.
    ///
    /// # Panics
    ///
    /// Panics if `init` was never called.
    pub fn uninit(&mut self) {
        self.user_app.as_ref().unwrap().borrow_mut().uninit();
        crate::Timer::instance().borrow_mut().uninit();
    }

    /// Main loop; never returns.
    ///
    /// Each iteration updates the timer, sleeps 1 ms, dispatches queued
    /// in-process messages and then services at most one pending HTTP request.
    pub fn run(&mut self) {
        loop {
            crate::Timer::instance().borrow_mut().update();
            std::thread::sleep(Duration::from_millis(1));
            self.dispatch_immsg();
            self.dispatch_httprequest();
        }
    }

    /// Returns the user application's package name.
    ///
    /// # Panics
    ///
    /// Panics if `init` was never called.
    pub fn get_pkg_name(&self) -> String {
        self.user_app.as_ref().unwrap().borrow().get_pkg_name()
    }

    /// Placeholder: always returns 0.
    pub fn new_uuid(&self) -> i64 {
        0
    }

    /// Returns the instance id (0 unless changed outside the visible code).
    pub fn get_instance_id(&self) -> i32 {
        self.instance_id
    }

    /// Returns the zone id (0 unless changed outside the visible code).
    pub fn get_zone_id(&self) -> i32 {
        self.zone_id
    }

    /// Returns the node id (0 unless changed outside the visible code).
    pub fn get_node_id(&self) -> i32 {
        self.node_id
    }

    /// Placeholder: always returns `false`.
    pub fn has_flag(&self) -> bool {
        false
    }

    /// Placeholder: always returns 0.
    pub fn get_now_second(&self) -> i64 {
        0
    }

    /// Placeholder: always returns 0.
    pub fn get_now_millis(&self) -> i64 {
        0
    }

    /// Placeholder: always returns 0.
    pub fn get_time_offset(&self) -> i64 {
        0
    }

    /// Queues an in-process message whose callback `cb` is meant to be invoked
    /// with `args` by a later `dispatch_immsg`.
    ///
    /// `im_mutex` is held while the node is linked in. Previously the guard was
    /// discarded immediately, so the lock protected nothing.
    ///
    /// NOTE(review): `node` (and with it `args` and `cb`) is dropped when this
    /// function returns; only `node.entry` is handed to the list. Whether the
    /// message stays reachable depends on `r9::ListHead`, which is not visible
    /// here — verify.
    ///
    /// # Panics
    ///
    /// Panics if `im_mutex` is poisoned.
    pub fn add_immsg(
        &mut self,
        msg_id: u16,
        args: Option<Vec<Rc<dyn Any>>>,
        cb: Box<dyn FnMut(&Option<Vec<Rc<dyn Any>>>)>,
    ) {
        let node = IMMsgNode {
            msg_id,
            args,
            cb: Some(cb),
            entry: Default::default(),
        };
        let _guard = self.im_mutex.lock().unwrap();
        r9::ListHead::add_tail(&self.im_msgs, &node.entry);
    }

    /// Hands the pending list over to the work list under `im_mutex`, then runs
    /// and unlinks every node on the work list.
    ///
    /// Callbacks run after the mutex has been released. Each node is borrowed
    /// mutably exactly once while its callback runs; the previous code borrowed
    /// the same `RefCell` mutably twice, which panics at runtime.
    fn dispatch_immsg(&mut self) {
        {
            let _guard = self.im_mutex.lock().unwrap();
            if !self.im_msgs.borrow().empty() {
                r9::ListHead::replace_init(&self.im_msgs, &self.im_work_msgs);
            }
        }

        while !self.im_work_msgs.borrow().empty() {
            let first = self.im_work_msgs.borrow().first_entry();
            let node = first.upgrade().unwrap();
            {
                let mut node_guard = node.borrow_mut();
                // Reborrow as a plain `&mut` so `cb` and `args` can be borrowed
                // as disjoint fields.
                let msg = &mut *node_guard;
                if let Some(callback) = msg.cb.as_mut() {
                    callback(&msg.args);
                }
            }
            node.borrow_mut().entry.borrow_mut().del_init();
        }
    }

    /// Services at most one queued HTTP request per call by filling in its
    /// response and marking it handled, which releases the waiting handler.
    ///
    /// NOTE(review): requests are taken with `Vec::pop`, i.e. newest first, and
    /// the response body is a hard-coded placeholder.
    fn dispatch_httprequest(&mut self) {
        let mut pending = self.webapp_state.request.lock().unwrap();
        if let Some(request) = pending.pop() {
            let mut ctx = request.lock().unwrap();
            ctx.rsp = "dsafsf".to_string();
            ctx.handled = true;
        }
    }

    /// Spawns a detached thread that runs the HTTP server on the port reported
    /// by the user application.
    ///
    /// NOTE(review): the receiving end of the channel is dropped right away, so
    /// the `ServerHandle` sent by `run_app` is never received and the server
    /// cannot be stopped from here.
    ///
    /// # Panics
    ///
    /// Panics if `init` has not stored a user application yet, or if the
    /// reported port does not fit in a `u16`.
    fn init_http_server(&self) {
        let (tx, _) = mpsc::channel();
        let raw_port = self
            .user_app
            .as_ref()
            .unwrap()
            .borrow()
            .get_http_listen_port();
        // A checked conversion instead of `as u16`, which would silently wrap
        // negative or oversized values onto an unrelated port. The trait is
        // spelled out so this does not depend on the crate's edition prelude.
        let port: u16 = std::convert::TryFrom::try_from(raw_port)
            .expect("http listen port must be in 0..=65535");
        let app_state = self.webapp_state.clone();
        thread::spawn(move || {
            // Report bind/serve failures; the thread is never joined, so a
            // returned error would otherwise be lost.
            if let Err(err) = rt::System::new().block_on(run_app(port, app_state, tx)) {
                eprintln!("http server terminated with error: {}", err);
            }
        });
    }
}