diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index c0caa7f..cad9d65 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -25,12 +25,16 @@ use actix_web::web::Bytes; use futures::FutureExt; use tokio::io::{AsyncReadExt, Interest}; use crate::app::UserApp; -use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN}; +use crate::{MAX_PACKET_LEN}; use tokio::net::TcpStream; use tokio::runtime::Runtime; use crate::app::user_app::DownStreamPack; use tokio::io::{AsyncWriteExt, Ready}; +struct AppStateWithCounter { + counter: std::sync::Mutex, // <- Mutex is necessary to mutate safely across threadsds +} + #[derive(SharedFromSelf)] #[derive(Singleton)] pub struct WsListener { diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index f156dec..962e640 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -1,4 +1,3 @@ -use std::future::IntoFuture; use f9::app::App; mod app; @@ -7,330 +6,9 @@ mod upstream; mod downstream; mod ss; use crate::app::UserApp; -use actix_web::{web, App as ActixApp, get, post, Responder, Error, HttpServer, HttpRequest, HttpResponse}; -use std::sync::mpsc; -use std::thread; -use actix_rt::System; -use std::time::Duration; -use actix_web_actors::ws; -use actix::{Actor, AsyncContext, StreamHandler}; -use std::sync::Mutex; -use actix::prelude::*; -use bytes::{Buf, BufMut, Bytes}; -use bytes::BytesMut; -use tokio::io::{AsyncWriteExt, Ready}; -use tokio::net::TcpStream; -use std::sync::Arc; -use tokio::io::Interest; -use std::vec::Vec; -use std::cell::RefCell; -use std::io::Read; -use std::io; -use std::cell::Cell; const MAX_PACKET_LEN: usize = 1024 * 64; -/// Define HTTP actor -struct MyWs { - recv_buf_len: usize, - recv_buf: BytesMut, - send_queue: Arc>>>, - recv_queue: Arc>>>, -} - -impl Actor for MyWs { - type Context = ws::WebsocketContext; -} - -impl MyWs { - fn parse_pkt(&mut self, bin: &actix_web::web::Bytes) { - let buf_len = bin.len(); - if buf_len > 0 { - let mut already_read_bytes: usize = 0; - { - for i in 0..1024 * 64 * 2 { - self.recv_buf.put_i8(0); - } - } - while true { - let mut read_bytes = buf_len - already_read_bytes; - if read_bytes > MAX_PACKET_LEN - self.recv_buf_len { - read_bytes = MAX_PACKET_LEN - self.recv_buf_len; - } - if read_bytes > 0 { - self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin); - self.recv_buf_len += read_bytes; - already_read_bytes += read_bytes; - } - - let mut offset = 0; - let mut pre_offect = 0; - while true { - pre_offect = offset; - { - let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8); - let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8); - let seq_id = (self.recv_buf[4] as i32)+ ((self.recv_buf[5] as i32) << 8) + - ((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24); - let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8); - let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8); - let mut tmp_bytes = BytesMut::with_capacity((pack_len + 24) as usize); - { - tmp_bytes.put_u16_le(pack_len); - tmp_bytes.put_u16_le(msg_id); - tmp_bytes.put_i32_le(seq_id); - tmp_bytes.put_u16_le(magic_code); - tmp_bytes.put_u16_le(ext_len); - tmp_bytes.put_i32_le(0); - tmp_bytes.put_i64_le(0); - for i in 24..tmp_bytes.capacity() { - tmp_bytes.put_i8(0); - } - tmp_bytes[24 .. (24+ pack_len as usize)].copy_from_slice(&bin[12..(12 + pack_len as usize)]); - println!("recv buf_len:{0}", tmp_bytes.len()); - } - { - Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); - } - offset += (pack_len as usize) + 12; - } - if pre_offect >= offset || offset >= self.recv_buf_len { - break; - } - }//end while - if offset > 0 && offset < self.recv_buf_len { - self.recv_buf[..0].copy_from_slice(&bin); - } - self.recv_buf_len -= offset; - if self.recv_buf_len > 0 { - self.recv_buf_len = 0; - } - if already_read_bytes >= bin.len() { - already_read_bytes = 0; - break; - } - } - } - } - -} - -/// Handler for ws::Message message -impl StreamHandler> for MyWs { - - fn started(&mut self, ctx: &mut Self::Context) { - let a: Recipient = ctx.address().recipient(); - - let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); - let mut recv_buf_len = 0; - { - for i in 0..recv_buf.capacity() { - recv_buf.put_i8(0); - } - } - let mut recv_queue = self.recv_queue.clone(); - tokio::spawn(async move { - loop { - while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 { - let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); - let len = bytes.clone().unwrap().clone().len(); - { - //println!("sendmsg 1111111111111111111111111111111 len:{0}", len); - } - { - let data = bytes.unwrap(); - for i in 0..len { - recv_buf[recv_buf_len + i] = data[i]; - //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); - } - recv_buf_len += len; - while recv_buf_len >= 16 { - let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); - let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); - let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + - ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); - let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); - let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); - let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); - let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); - if ((pack_len + 16) as usize) > recv_buf_len { - break; - } - let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); - { - tmp_bytes.put_u16_le(pack_len); - tmp_bytes.put_u16_le(msg_id); - tmp_bytes.put_i32_le(seq_id); - tmp_bytes.put_u16_le(magic_code); - tmp_bytes.put_u16_le(ext_len); - for i in 0..pack_len { - tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); - } - let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); - let msg = GSResponse { - data: a_bytes - }; - a.do_send(msg); - /* - { - println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}", - pack_len, - msg_id, - seq_id, - magic_code, - ext_len); - }*/ - } - for i in (pack_len + 16) as usize..recv_buf_len { - recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; - } - recv_buf_len -= (pack_len + 16) as usize; - } - } - } - tokio::time::sleep(Duration::from_millis(10)).await; - } - }); - } - - - fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { - match msg { - Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), - Ok(ws::Message::Text(text)) => ctx.text(text), - Ok(ws::Message::Binary(bin)) => { - self.parse_pkt(&bin); - }, - _ => (), - } - } -} - -#[derive(Message)] -#[rtype(result = "()")] -pub struct GSResponse { - data: actix_web::web::Bytes, -} -impl Handler for MyWs { - type Result = (); - - fn handle(&mut self, msg: GSResponse, ctx: &mut Self::Context) -> Self::Result { - ctx.binary(msg.data); - } -} - -struct AppStateWithCounter { - counter: Mutex, // <- Mutex is necessary to mutate safely across threads -} - -async fn index(req: HttpRequest, stream: web::Payload, data: web::Data) -> Result { - let mut counter = data.counter.lock().unwrap(); // <- get counter's MutexGuard - *counter += 1; // <- access counter inside MutexGuard - - format!("Request number: {counter}"); // <- response with count - let ws_client = MyWs { - recv_buf_len: 0, - recv_buf: BytesMut::with_capacity(1024 * 64 * 2), - send_queue: Default::default(), - recv_queue: Default::default(), - //addr: Default::default(), - }; - { - let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?; - down_stream.set_nodelay(true); - //down_stream.set_nonblocking(true)?; - let mut send_queue = ws_client.send_queue.clone(); - let mut recv_queue = ws_client.recv_queue.clone(); - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_millis(10)).await; - let ready = down_stream.ready(Interest::READABLE | Interest::WRITABLE).await; - match ready { - Ok(r) => { - { - if r.is_readable() { - let mut data = vec![0; 1024 * 64]; - // Try to read data, this may still fail with `WouldBlock` - // if the readiness event is a false positive. - match down_stream.try_read(&mut data) { - Ok(n) => { - //println!("read {} bytes", n); - let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize); - for i in 0..n { - tmp_bytes.put_u8(data[i]); - } - Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - println!("read error 1"); - continue; - } - Err(e) => { - println!("read error 2"); - //return Err(e.into()); - } - } - } - } - - { - if r.is_writable() { - while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 { - //Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0] - let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop(); - let len = bytes.clone().unwrap().clone().len(); - { - //println!("sendmsg len:{0}", len); - } - { - down_stream.write_all(bytes.unwrap().as_ref()).await; - } - } - } - } - } - Err(e) => { - - } - } - } - }); - } - let resp = ws::start(ws_client, &req, stream); - println!("{:?}", resp); - resp -} - -fn main1() { - let t1 = thread::spawn(||{ - //let mut rt = tokio::runtime::Runtime::new().unwrap(); - let mut rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async { - let counter = web::Data::new(AppStateWithCounter { - counter: Mutex::new(0), - }); - HttpServer::new(move || { - // move counter into the closure - ActixApp::new() - .app_data(counter.clone()) // <- register the created data - .route("/", web::get().to(index)) - }) - .bind(("0.0.0.0", 8080))? - .run() - .await - }); - println!("hello2"); - }); - - println!("hello3"); - while true { - thread::sleep(Duration::from_secs(10)); - } -} - fn main() { App::init(UserApp::instance()); App::run(); diff --git a/third_party/librust b/third_party/librust index c950f6d..902db5e 160000 --- a/third_party/librust +++ b/third_party/librust @@ -1 +1 @@ -Subproject commit c950f6d2883a3213a17d514eded9fdcc86f400b5 +Subproject commit 902db5e1bf1789c5055b0e9015f513a409cac4c6