This commit is contained in:
azw 2024-05-05 15:38:31 +08:00
parent 2c399e2d92
commit f4868cdfcf
2 changed files with 99 additions and 7 deletions

View File

@ -20,6 +20,7 @@ r9 = { path = "../../third_party/librust/r9" }
r9_macro = { path = "../../third_party/librust/r9_macro" }
r9_macro_derive = { path = "../../third_party/librust/r9_macro_derive" }
f9 = { path = "../../third_party/librust/f9" }
bytes = "0.5.6"
[build-dependencies]
protobuf-codegen-pure = "2.9.0"

View File

@ -1,3 +1,4 @@
use std::future::IntoFuture;
use f9::app::App;
mod app;
@ -12,10 +13,20 @@ use std::thread;
use actix_rt::System;
use std::time::Duration;
use actix_web_actors::ws;
use actix::{Actor, StreamHandler};
use actix::{Actor, AsyncContext, StreamHandler};
use std::sync::Mutex;
use actix::prelude::*;
use bytes::{Buf, BufMut, Bytes};
use bytes::BytesMut;
//use serde::de::Unexpected::Bytes;
const MAX_PACKET_LEN: usize = 1024 * 64;
/// Define HTTP actor
struct MyWs;
struct MyWs {
recv_buf_len: usize,
recv_buf: BytesMut,
}
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
@ -23,27 +34,106 @@ impl Actor for MyWs {
/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn started(&mut self, ctx: &mut Self::Context) {
let a = ctx.address().recipient();
//let b = 100;
}
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Binary(bin)) => {
let buf_len = bin.len();
if buf_len > 0 {
let mut already_read_bytes: usize = 0;
{
for i in 0..1024*64*2 {
self.recv_buf.put_i8(0);
}
}
while true {
let mut read_bytes = buf_len - already_read_bytes;
if read_bytes > MAX_PACKET_LEN - self.recv_buf_len {
read_bytes = MAX_PACKET_LEN - self.recv_buf_len;
}
if read_bytes > 0 {
self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin);
self.recv_buf_len += read_bytes;
already_read_bytes += read_bytes;
}
let mut offset = 0;
let mut pre_offect = 0;
while true {
pre_offect = offset;
{
let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8);
let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8);
let seq_id = self.recv_buf.get_u16();
}
if pre_offect >= offset || offset >= self.recv_buf_len {
break;
}
}//end while
if offset > 0 && offset < self.recv_buf_len {
//self.recv_buf[..0].copy_from_slice(&bin);
}
}
}
},
_ => (),
}
}
}
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let resp = ws::start(MyWs {}, &req, stream);
#[derive(Message)]
#[rtype(result = "()")]
pub struct Message(pub String);
impl Handler<Message> for MyWs {
type Result = ();
fn handle(&mut self, msg: Message, _: &mut ws::WebsocketContext<Self>) {
//self.send_message(&msg.room, msg.msg.as_str(), msg.id);
}
}
struct AppStateWithCounter {
counter: Mutex<i32>, // <- Mutex is necessary to mutate safely across threads
}
async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateWithCounter>) -> Result<HttpResponse, Error> {
let mut counter = data.counter.lock().unwrap(); // <- get counter's MutexGuard
*counter += 1; // <- access counter inside MutexGuard
format!("Request number: {counter}"); // <- response with count
let resp = ws::start(MyWs {
recv_buf_len: 0,
recv_buf: BytesMut::with_capacity(1024 * 64 * 2)
}, &req, stream);
println!("{:?}", resp);
resp
}
fn main() {
fn main() {
let t1 = thread::spawn(||{
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
HttpServer::new(|| ActixApp::new().route("/ws/", web::get().to(index)))
let counter = web::Data::new(AppStateWithCounter {
counter: Mutex::new(0),
});
HttpServer::new(move || {
// move counter into the closure
ActixApp::new()
.app_data(counter.clone()) // <- register the created data
.route("/", web::get().to(index))
})
.bind(("127.0.0.1", 8080))?
.run()
.await
@ -51,6 +141,7 @@ fn main() {
println!("hello2");
});
//App::run();
//t1.join().unwrap();
println!("hello3");
while true {