1
This commit is contained in:
parent
2f8a373bdf
commit
13a4fb34d9
@ -1,4 +1,3 @@
|
||||
use std::time::{Duration, Instant};
|
||||
use std::rc::{Rc, Weak};
|
||||
use std::cell::RefCell;
|
||||
use r9_macro::SharedFromSelf;
|
||||
@ -7,23 +6,21 @@ use r9_macro_derive::Singleton;
|
||||
use actix::{Actor, StreamHandler};
|
||||
use actix_web_actors::ws::{self, Message};
|
||||
use actix_web::{
|
||||
dev::ServerHandle,
|
||||
rt,
|
||||
web,
|
||||
App as ActixApp,
|
||||
web::Data,
|
||||
web::Query,
|
||||
HttpRequest,
|
||||
HttpResponse,
|
||||
HttpServer,
|
||||
Error,
|
||||
Responder};
|
||||
Error
|
||||
};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use actix::prelude::*;
|
||||
use bytes::BytesMut;
|
||||
use crate::AppStateWithCounter;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use actix_web::web::Bytes;
|
||||
use crate::app::UserApp;
|
||||
use crate::MAX_PACKET_LEN;
|
||||
|
||||
#[derive(SharedFromSelf)]
|
||||
#[derive(Singleton)]
|
||||
@ -32,6 +29,12 @@ pub struct WsListener {
|
||||
work_thread: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct DownStreamMessage {
|
||||
data: actix_web::web::Bytes,
|
||||
}
|
||||
|
||||
/// Define HTTP actor
|
||||
pub struct WsConn {
|
||||
recv_buf_len: usize,
|
||||
@ -42,18 +45,105 @@ impl Actor for WsConn {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
}
|
||||
|
||||
impl WsConn {
|
||||
fn parse_pkt(&mut self, bin: &actix_web::web::Bytes, ctx: &mut <WsConn as Actor>::Context) {
|
||||
let buf_len = bin.len();
|
||||
if buf_len > 0 {
|
||||
let mut already_read_bytes: usize = 0;
|
||||
{
|
||||
for i in 0..1024 * 64 * 2 {
|
||||
self.recv_buf.put_i8(0);
|
||||
}
|
||||
}
|
||||
while true {
|
||||
let mut read_bytes = buf_len - already_read_bytes;
|
||||
if read_bytes > MAX_PACKET_LEN - self.recv_buf_len {
|
||||
read_bytes = MAX_PACKET_LEN - self.recv_buf_len;
|
||||
}
|
||||
if read_bytes > 0 {
|
||||
self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin);
|
||||
self.recv_buf_len += read_bytes;
|
||||
already_read_bytes += read_bytes;
|
||||
}
|
||||
|
||||
let mut offset = 0;
|
||||
let mut pre_offect = 0;
|
||||
while true {
|
||||
pre_offect = offset;
|
||||
{
|
||||
let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8);
|
||||
let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8);
|
||||
let seq_id = (self.recv_buf[4] as i32)+ ((self.recv_buf[5] as i32) << 8) +
|
||||
((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24);
|
||||
let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8);
|
||||
let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8);
|
||||
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 24) as usize);
|
||||
{
|
||||
tmp_bytes.put_u16_le(pack_len);
|
||||
tmp_bytes.put_u16_le(msg_id);
|
||||
tmp_bytes.put_i32_le(seq_id);
|
||||
tmp_bytes.put_u16_le(magic_code);
|
||||
tmp_bytes.put_u16_le(ext_len);
|
||||
tmp_bytes.put_i32_le(0);
|
||||
tmp_bytes.put_i64_le(0);
|
||||
for i in 24..tmp_bytes.capacity() {
|
||||
tmp_bytes.put_i8(0);
|
||||
}
|
||||
tmp_bytes[24 .. (24+ pack_len as usize)].copy_from_slice(&bin[12..(12 + pack_len as usize)]);
|
||||
println!("recv buf_len:{0}", tmp_bytes.len());
|
||||
}
|
||||
{
|
||||
UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes);
|
||||
}
|
||||
offset += (pack_len as usize) + 12;
|
||||
}
|
||||
if pre_offect >= offset || offset >= self.recv_buf_len {
|
||||
break;
|
||||
}
|
||||
}//end while
|
||||
if offset > 0 && offset < self.recv_buf_len {
|
||||
self.recv_buf[..0].copy_from_slice(&bin);
|
||||
}
|
||||
self.recv_buf_len -= offset;
|
||||
if self.recv_buf_len > 0 {
|
||||
self.recv_buf_len = 0;
|
||||
}
|
||||
if already_read_bytes >= bin.len() {
|
||||
already_read_bytes = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<Message, ws::ProtocolError>> for WsConn {
|
||||
fn handle(&mut self, item: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
match item {
|
||||
Ok(Message::Text(text)) => ctx.text(text),
|
||||
Ok(Message::Ping(msg)) => ctx.pong(&msg),
|
||||
Ok(Message::Binary(bin)) => ctx.binary(bin),
|
||||
Ok(Message::Text(text)) => {
|
||||
|
||||
},
|
||||
Ok(Message::Ping(msg)) => {
|
||||
ctx.pong(&msg)
|
||||
},
|
||||
Ok(Message::Binary(bin)) => {
|
||||
self.parse_pkt(&bin, ctx);
|
||||
}
|
||||
Ok(Message::Close(reason)) => ctx.close(reason),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<DownStreamMessage> for WsConn {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: DownStreamMessage, ctx: &mut Self::Context) -> Self::Result {
|
||||
ctx.binary(msg.data);
|
||||
}
|
||||
}
|
||||
|
||||
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
|
||||
let mut conn = WsConn{
|
||||
recv_buf_len: 0,
|
||||
@ -84,13 +174,9 @@ impl WsListener {
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(async {
|
||||
let counter = web::Data::new(AppStateWithCounter {
|
||||
counter: Mutex::new(0),
|
||||
});
|
||||
HttpServer::new(move || {
|
||||
// move counter into the closure
|
||||
ActixApp::new()
|
||||
.app_data(counter.clone()) // <- register the created data
|
||||
ActixApp::new() // <- register the created data
|
||||
.route("/", web::get().to(crate::index))
|
||||
})
|
||||
.bind(("0.0.0.0", 8080))?
|
||||
|
Loading…
x
Reference in New Issue
Block a user