diff --git a/server/stat/Cargo.toml b/server/stat/Cargo.toml index 4c095df..ec542b4 100644 --- a/server/stat/Cargo.toml +++ b/server/stat/Cargo.toml @@ -20,6 +20,7 @@ r9 = { path = "../../third_party/librust/r9" } r9_macro = { path = "../../third_party/librust/r9_macro" } r9_macro_derive = { path = "../../third_party/librust/r9_macro_derive" } f9 = { path = "../../third_party/librust/f9" } +bytes = "0.5.6" [build-dependencies] protobuf-codegen-pure = "2.9.0" diff --git a/server/stat/src/main.rs b/server/stat/src/main.rs index 9b79ade..677e3a7 100755 --- a/server/stat/src/main.rs +++ b/server/stat/src/main.rs @@ -1,3 +1,4 @@ +use std::future::IntoFuture; use f9::app::App; mod app; @@ -12,10 +13,20 @@ use std::thread; use actix_rt::System; use std::time::Duration; use actix_web_actors::ws; -use actix::{Actor, StreamHandler}; +use actix::{Actor, AsyncContext, StreamHandler}; +use std::sync::Mutex; +use actix::prelude::*; +use bytes::{Buf, BufMut, Bytes}; +use bytes::BytesMut; +//use serde::de::Unexpected::Bytes; + +const MAX_PACKET_LEN: usize = 1024 * 64; /// Define HTTP actor -struct MyWs; +struct MyWs { + recv_buf_len: usize, + recv_buf: BytesMut, +} impl Actor for MyWs { type Context = ws::WebsocketContext; @@ -23,27 +34,106 @@ impl Actor for MyWs { /// Handler for ws::Message message impl StreamHandler> for MyWs { + + fn started(&mut self, ctx: &mut Self::Context) { + let a = ctx.address().recipient(); + //let b = 100; + } + fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { match msg { Ok(ws::Message::Ping(msg)) => ctx.pong(&msg), Ok(ws::Message::Text(text)) => ctx.text(text), - Ok(ws::Message::Binary(bin)) => ctx.binary(bin), + Ok(ws::Message::Binary(bin)) => { + let buf_len = bin.len(); + if buf_len > 0 { + let mut already_read_bytes: usize = 0; + { + for i in 0..1024*64*2 { + self.recv_buf.put_i8(0); + } + } + while true { + let mut read_bytes = buf_len - already_read_bytes; + if read_bytes > MAX_PACKET_LEN - self.recv_buf_len { + read_bytes = MAX_PACKET_LEN - self.recv_buf_len; + } + if read_bytes > 0 { + self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin); + self.recv_buf_len += read_bytes; + already_read_bytes += read_bytes; + } + + let mut offset = 0; + let mut pre_offect = 0; + while true { + pre_offect = offset; + { + let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8); + let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8); + let seq_id = self.recv_buf.get_u16(); + } + if pre_offect >= offset || offset >= self.recv_buf_len { + break; + } + }//end while + if offset > 0 && offset < self.recv_buf_len { + //self.recv_buf[..0].copy_from_slice(&bin); + } + } + } + }, _ => (), } } } -async fn index(req: HttpRequest, stream: web::Payload) -> Result { - let resp = ws::start(MyWs {}, &req, stream); +#[derive(Message)] +#[rtype(result = "()")] +pub struct Message(pub String); + + +impl Handler for MyWs { + type Result = (); + + + fn handle(&mut self, msg: Message, _: &mut ws::WebsocketContext) { + //self.send_message(&msg.room, msg.msg.as_str(), msg.id); + } +} + + + +struct AppStateWithCounter { + counter: Mutex, // <- Mutex is necessary to mutate safely across threads +} + +async fn index(req: HttpRequest, stream: web::Payload, data: web::Data) -> Result { + let mut counter = data.counter.lock().unwrap(); // <- get counter's MutexGuard + *counter += 1; // <- access counter inside MutexGuard + + format!("Request number: {counter}"); // <- response with count + let resp = ws::start(MyWs { + recv_buf_len: 0, + recv_buf: BytesMut::with_capacity(1024 * 64 * 2) + }, &req, stream); println!("{:?}", resp); resp } -fn main() { +fn main() { let t1 = thread::spawn(||{ let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { - HttpServer::new(|| ActixApp::new().route("/ws/", web::get().to(index))) + let counter = web::Data::new(AppStateWithCounter { + counter: Mutex::new(0), + }); + HttpServer::new(move || { + // move counter into the closure + ActixApp::new() + .app_data(counter.clone()) // <- register the created data + .route("/", web::get().to(index)) + }) .bind(("127.0.0.1", 8080))? .run() .await @@ -51,6 +141,7 @@ fn main() { println!("hello2"); }); + //App::run(); //t1.join().unwrap(); println!("hello3"); while true {