From 28c89a6d50905686935c030a42eeeed096d44a95 Mon Sep 17 00:00:00 2001 From: azw Date: Sun, 12 May 2024 20:52:27 +0800 Subject: [PATCH] 1 --- server/stat/src/app/user_app.rs | 1 - server/stat/src/listener/wslistener.rs | 181 ------------------------- 2 files changed, 182 deletions(-) diff --git a/server/stat/src/app/user_app.rs b/server/stat/src/app/user_app.rs index f68b276..1f1bc40 100644 --- a/server/stat/src/app/user_app.rs +++ b/server/stat/src/app/user_app.rs @@ -48,7 +48,6 @@ impl f9::app::UserApp for UserApp { None => {} } } - WsListener::instance().update(); UpStreamMgr::instance().update(); } diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index 076232b..8aab5a2 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -13,16 +13,13 @@ use std::sync::{Arc}; use tokio::sync::Mutex; use std::thread; use std::thread::JoinHandle; -use std::time::Duration; use actix::prelude::*; use bytes::{BufMut, BytesMut}; -use tokio::io::{Interest}; use crate::app::UserApp; use crate::constant; use tokio::net::TcpStream; use tokio::runtime::Runtime; use crate::common::types::{DownStreamPack, DownStreamMessage, UpStreamPack}; -use tokio::io::{AsyncWriteExt}; use std::sync::OnceLock; use std::sync::atomic::{AtomicU16, Ordering}; @@ -172,125 +169,6 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result>, down_pack_queue: Arc>>, - up_pack_queue: Arc>>) { - tokio::spawn(async move { - println!("upstream_enter2"); - let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3); - let mut recv_buf_len = 0; - { - for i in 0..recv_buf.capacity() { - recv_buf.put_i8(0); - } - } - loop { - tokio::time::sleep(Duration::from_millis(10)).await; - let mut stream = stream_arc.lock().await; - let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await; - match ready { - Ok(r) => { - { - if r.is_readable() { - // Try to read data, this may still fail with `WouldBlock` - // if the readiness event is a false positive. - let mut data = vec![0; 1024 * 64]; - match stream.try_read(&mut data) { - Ok(len) => { - println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len); - for i in 0..len { - recv_buf[recv_buf_len + i] = data[i]; - //println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len); - } - recv_buf_len += len; - while recv_buf_len >= 16 { - let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8); - let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8); - let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) + - ((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24); - let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8); - let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8); - let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8); - let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8); - if ((pack_len + 16) as usize) > recv_buf_len { - break; - } - let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize); - { - tmp_bytes.put_u16_le(pack_len); - tmp_bytes.put_u16_le(msg_id); - tmp_bytes.put_i32_le(seq_id); - tmp_bytes.put_u16_le(magic_code); - tmp_bytes.put_u16_le(ext_len); - for i in 0..pack_len { - tmp_bytes.put_u8(recv_buf[(16 + i) as usize]); - } - let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref()); - { - let node = UpStreamPack::new(socket_handle, tmp_bytes); - up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry); - } - //a.do_send(msg); - - { - println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}", - socket_handle, - pack_len, - msg_id, - seq_id, - magic_code, - ext_len); - } - } - for i in (pack_len + 16) as usize..recv_buf_len { - recv_buf[i - (pack_len + 16) as usize] = recv_buf[i]; - } - recv_buf_len -= (pack_len + 16) as usize; - } - } - Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => { - println!("read error 1"); - continue; - } - Err(e) => { - println!("read error 2"); - //return Err(e.into()); - } - } - } - } - - { - if r.is_writable() { - let mut data = bytes::BytesMut::new(); - { - down_pack_queue.lock().unwrap().fetch(); - //down_pack_queue.get_mut().fetch(); - let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); - //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); - while !work_list.lock().unwrap().empty() { - let node = &work_list.lock().unwrap().first_entry(); - node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); - data = node.upgrade().unwrap().lock().unwrap().data.clone(); - break; - //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; - //stream.write_all(data.as_ref()).await; - } - } - if data.len() > 0 { - println!("write_all2 len:{0}", data.len()); - stream.write_all(data.as_ref()).await; - } - } - } - } - Err(e) => { - - } - } - } - }); -} - impl WsListener { pub fn instance() -> &'static Self { @@ -338,73 +216,14 @@ impl WsListener { .await }); })); - /* - { - let down_pack_queue = self.down_pack_queue.clone(); - let up_pack_queue = self.up_pack_queue.clone(); - self.tokio_rt.spawn(async move { - let mut result = TcpStream::connect("192.168.100.39:7616").await; - match result { - Ok(v) => { - let a = Arc::new(Mutex::new(v)); - WsListener::instance().upstream.lock().unwrap().replace(a.clone()); - println!("connect ok"); - upstream_enter(a, down_pack_queue, up_pack_queue).await; - }, - Err(e) => { - println!("connect err") - } - } - }); - }*/ println!("hello3"); } } - pub fn update(&self) { - return; - let mut data = bytes::BytesMut::new(); - let mut socket_handle = 0; - { - self.up_pack_queue.lock().unwrap().fetch(); - //down_pack_queue.get_mut().fetch(); - let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone(); - //let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone(); - while !work_list.lock().unwrap().empty() { - let node = &work_list.lock().unwrap().first_entry(); - node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init(); - data = node.upgrade().unwrap().lock().unwrap().data.clone(); - socket_handle = node.upgrade().unwrap().lock().unwrap().socket_handle; - break; - //stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await; - //stream.write_all(data.as_ref()).await; - } - } - if data.len() > 0 { - println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len()); - match self.socket_hash.lock().unwrap().get(&socket_handle){ - Some(v) => { - let msg = DownStreamMessage{ - data: actix_web::web::Bytes::copy_from_slice(data.as_ref()) - }; - v.do_send(msg); - }, - None => { - - } - } - } - } - pub fn uninit(&self) { } - fn add_down_stream_pack(&self, data: std::sync::Weak::>) { - println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len()); - let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle; - self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry); - } fn on_connect(&self, conn: &mut WsConn, socket: Recipient) { self.cur_socket_handle.fetch_add(1, Ordering::Relaxed); conn.socket_handle = self.cur_socket_handle.load(Ordering::Relaxed);