From e0d85a137a1036929076eb44ff0442106dfad0b3 Mon Sep 17 00:00:00 2001 From: azw Date: Sat, 11 May 2024 23:11:26 +0800 Subject: [PATCH] 1 --- server/stat/Cargo.toml | 1 + server/stat/src/app/user_app.rs | 3 ++- server/stat/src/listener/wslistener.rs | 33 ++++++++++++++++++++------ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/server/stat/Cargo.toml b/server/stat/Cargo.toml index ec542b4..5737d93 100644 --- a/server/stat/Cargo.toml +++ b/server/stat/Cargo.toml @@ -21,6 +21,7 @@ r9_macro = { path = "../../third_party/librust/r9_macro" } r9_macro_derive = { path = "../../third_party/librust/r9_macro_derive" } f9 = { path = "../../third_party/librust/f9" } bytes = "0.5.6" +futures = "0.3.30" [build-dependencies] protobuf-codegen-pure = "2.9.0" diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index 2bdb36d..b16384b 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -14,7 +14,7 @@ use r9::xtimer::TimerList; pub struct DownStreamPack { holder: Option>>, - socket_handle: u16, + pub socket_handle: u16, pub data: bytes::BytesMut, pub entry: Rc::>>, } @@ -63,6 +63,7 @@ impl f9::app::UserApp for UserApp { while !work_list.borrow().empty() { let node = &work_list.borrow().first_entry(); node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init(); + WsListener::instance().borrow().add_down_stream_pack(node.clone()); } } diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 059ed6e..d5e605e 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -14,14 +14,16 @@ use actix_web::{ HttpServer, Error }; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc}; +use tokio::sync::Mutex; use std::thread; use std::thread::JoinHandle; use std::time::Duration; use actix::prelude::*; use bytes::{BufMut, BytesMut}; use actix_web::web::Bytes; -use tokio::io::Interest; +use futures::FutureExt; +use tokio::io::{AsyncReadExt, Interest}; use crate::app::UserApp; use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN}; use tokio::net::TcpStream; @@ -36,9 +38,9 @@ pub struct WsListener { work_thread: Option>, socket_hash: HashMap>, cur_socket_handle: u16, - upstream: Arc>>, + pub upstream: std::sync::Weak>, tokio_rt: Runtime, - down_pack_queue: Arc>>, + down_pack_queue: Arc>>, } #[derive(Message)] @@ -177,12 +179,13 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>>,) { +async fn upstream_enter(stream_arc: Arc>, down_pack_queue: Arc>>,) { println!("upstream_enter1"); tokio::spawn(async move { println!("upstream_enter2"); loop { tokio::time::sleep(Duration::from_millis(10)).await; + let mut stream = stream_arc.lock().await; let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await; match ready { Ok(r) => { @@ -217,7 +220,9 @@ async fn upstream_enter(mut stream: TcpStream, down_pack_queue: Arc { //upstream.lock().unwrap().replace(v); + let a = Arc::new(Mutex::new(v)); + //upstream = Arc::downgrade(&a); + WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a); println!("connect ok"); - upstream_enter(v, down_pack_queue).await; + upstream_enter(a, down_pack_queue).await; }, Err(e) => { println!("connect err") @@ -309,6 +317,17 @@ impl WsListener { } + pub fn add_down_stream_pack(&self, data: Weak::>) { + let socket_handle = data.upgrade().unwrap().borrow().socket_handle; + match self.socket_hash.get(&socket_handle) { + Some(socket) => { + let a = 0; + } + None => { + let a = 0; + } + } + } fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient) { self.cur_socket_handle += 1; conn.socket_handle = self.cur_socket_handle;