From 13a4fb34d905ae78c72826575679f69417e6c9ee Mon Sep 17 00:00:00 2001 From: azw Date: Thu, 9 May 2024 08:46:49 +0800 Subject: [PATCH] 1 --- server/stat/src/listener/wslistener.rs | 120 +++++++++++++++++++++---- 1 file changed, 103 insertions(+), 17 deletions(-) diff --git a/server/stat/src/listener/wslistener.rs b/server/stat/src/listener/wslistener.rs index c7f489f..2394814 100644 --- a/server/stat/src/listener/wslistener.rs +++ b/server/stat/src/listener/wslistener.rs @@ -1,4 +1,3 @@ -use std::time::{Duration, Instant}; use std::rc::{Rc, Weak}; use std::cell::RefCell; use r9_macro::SharedFromSelf; @@ -7,23 +6,21 @@ use r9_macro_derive::Singleton; use actix::{Actor, StreamHandler}; use actix_web_actors::ws::{self, Message}; use actix_web::{ - dev::ServerHandle, - rt, web, App as ActixApp, - web::Data, - web::Query, HttpRequest, HttpResponse, HttpServer, - Error, - Responder}; + Error + }; use std::sync::{Arc, Mutex}; use std::thread; use std::thread::JoinHandle; use actix::prelude::*; -use bytes::BytesMut; -use crate::AppStateWithCounter; +use bytes::{BufMut, BytesMut}; +use actix_web::web::Bytes; +use crate::app::UserApp; +use crate::MAX_PACKET_LEN; #[derive(SharedFromSelf)] #[derive(Singleton)] @@ -32,6 +29,12 @@ pub struct WsListener { work_thread: Option>, } +#[derive(Message)] +#[rtype(result = "()")] +pub struct DownStreamMessage { + data: actix_web::web::Bytes, +} + /// Define HTTP actor pub struct WsConn { recv_buf_len: usize, @@ -42,18 +45,105 @@ impl Actor for WsConn { type Context = ws::WebsocketContext; } +impl WsConn { + fn parse_pkt(&mut self, bin: &actix_web::web::Bytes, ctx: &mut ::Context) { + let buf_len = bin.len(); + if buf_len > 0 { + let mut already_read_bytes: usize = 0; + { + for i in 0..1024 * 64 * 2 { + self.recv_buf.put_i8(0); + } + } + while true { + let mut read_bytes = buf_len - already_read_bytes; + if read_bytes > MAX_PACKET_LEN - self.recv_buf_len { + read_bytes = MAX_PACKET_LEN - self.recv_buf_len; + } + if read_bytes > 0 { + self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin); + self.recv_buf_len += read_bytes; + already_read_bytes += read_bytes; + } + + let mut offset = 0; + let mut pre_offect = 0; + while true { + pre_offect = offset; + { + let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8); + let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8); + let seq_id = (self.recv_buf[4] as i32)+ ((self.recv_buf[5] as i32) << 8) + + ((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24); + let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8); + let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8); + let mut tmp_bytes = BytesMut::with_capacity((pack_len + 24) as usize); + { + tmp_bytes.put_u16_le(pack_len); + tmp_bytes.put_u16_le(msg_id); + tmp_bytes.put_i32_le(seq_id); + tmp_bytes.put_u16_le(magic_code); + tmp_bytes.put_u16_le(ext_len); + tmp_bytes.put_i32_le(0); + tmp_bytes.put_i64_le(0); + for i in 24..tmp_bytes.capacity() { + tmp_bytes.put_i8(0); + } + tmp_bytes[24 .. (24+ pack_len as usize)].copy_from_slice(&bin[12..(12 + pack_len as usize)]); + println!("recv buf_len:{0}", tmp_bytes.len()); + } + { + UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes); + } + offset += (pack_len as usize) + 12; + } + if pre_offect >= offset || offset >= self.recv_buf_len { + break; + } + }//end while + if offset > 0 && offset < self.recv_buf_len { + self.recv_buf[..0].copy_from_slice(&bin); + } + self.recv_buf_len -= offset; + if self.recv_buf_len > 0 { + self.recv_buf_len = 0; + } + if already_read_bytes >= bin.len() { + already_read_bytes = 0; + break; + } + } + } + } + +} + impl StreamHandler> for WsConn { fn handle(&mut self, item: Result, ctx: &mut Self::Context) { match item { - Ok(Message::Text(text)) => ctx.text(text), - Ok(Message::Ping(msg)) => ctx.pong(&msg), - Ok(Message::Binary(bin)) => ctx.binary(bin), + Ok(Message::Text(text)) => { + + }, + Ok(Message::Ping(msg)) => { + ctx.pong(&msg) + }, + Ok(Message::Binary(bin)) => { + self.parse_pkt(&bin, ctx); + } Ok(Message::Close(reason)) => ctx.close(reason), _ => (), } } } +impl Handler for WsConn { + type Result = (); + + fn handle(&mut self, msg: DownStreamMessage, ctx: &mut Self::Context) -> Self::Result { + ctx.binary(msg.data); + } +} + async fn index(req: HttpRequest, stream: web::Payload) -> Result { let mut conn = WsConn{ recv_buf_len: 0, @@ -84,13 +174,9 @@ impl WsListener { .build() .unwrap(); rt.block_on(async { - let counter = web::Data::new(AppStateWithCounter { - counter: Mutex::new(0), - }); HttpServer::new(move || { // move counter into the closure - ActixApp::new() - .app_data(counter.clone()) // <- register the created data + ActixApp::new() // <- register the created data .route("/", web::get().to(crate::index)) }) .bind(("0.0.0.0", 8080))?