This commit is contained in:
azw 2024-05-09 09:08:58 +08:00
parent 13a4fb34d9
commit cba1ec0d84
3 changed files with 34 additions and 9 deletions

View File

@ -12,6 +12,7 @@ use bytes::BytesMut;
//use crate::ss; //use crate::ss;
pub struct DownStreamPack { pub struct DownStreamPack {
socket_handle: u16,
data: bytes::BytesMut, data: bytes::BytesMut,
entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>, entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
} }
@ -70,8 +71,9 @@ impl f9::app::UserApp for UserApp {
impl UserApp { impl UserApp {
pub fn add_down_stream_pack(&mut self, data: bytes::BytesMut) { pub fn add_down_stream_pack(&mut self, socket_handle: u16, data: bytes::BytesMut) {
let node = Rc::new(RefCell::new(DownStreamPack{ let node = Rc::new(RefCell::new(DownStreamPack{
socket_handle: socket_handle,
data: data, data: data,
entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()), entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()),
})); }));

View File

@ -1,5 +1,6 @@
use std::rc::{Rc, Weak}; use std::rc::{Rc, Weak};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap;
use r9_macro::SharedFromSelf; use r9_macro::SharedFromSelf;
use r9_macro_derive::SharedFromSelf; use r9_macro_derive::SharedFromSelf;
use r9_macro_derive::Singleton; use r9_macro_derive::Singleton;
@ -20,13 +21,15 @@ use actix::prelude::*;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
use actix_web::web::Bytes; use actix_web::web::Bytes;
use crate::app::UserApp; use crate::app::UserApp;
use crate::MAX_PACKET_LEN; use crate::{GSResponse, MAX_PACKET_LEN};
#[derive(SharedFromSelf)] #[derive(SharedFromSelf)]
#[derive(Singleton)] #[derive(Singleton)]
pub struct WsListener { pub struct WsListener {
_self_wp: Weak::<RefCell::<Self>>, _self_wp: Weak::<RefCell::<Self>>,
work_thread: Option<JoinHandle<()>>, work_thread: Option<JoinHandle<()>>,
socket_hash: HashMap<u16, Recipient<DownStreamMessage>>,
cur_socket_handle: u16,
} }
#[derive(Message)] #[derive(Message)]
@ -37,6 +40,7 @@ pub struct DownStreamMessage {
/// Define HTTP actor /// Define HTTP actor
pub struct WsConn { pub struct WsConn {
socket_handle: u16,
recv_buf_len: usize, recv_buf_len: usize,
recv_buf: BytesMut, recv_buf: BytesMut,
} }
@ -93,7 +97,7 @@ impl WsConn {
println!("recv buf_len:{0}", tmp_bytes.len()); println!("recv buf_len:{0}", tmp_bytes.len());
} }
{ {
UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes); UserApp::instance().borrow_mut().add_down_stream_pack(self.socket_handle, tmp_bytes);
} }
offset += (pack_len as usize) + 12; offset += (pack_len as usize) + 12;
} }
@ -119,6 +123,11 @@ impl WsConn {
} }
impl StreamHandler<Result<Message, ws::ProtocolError>> for WsConn { impl StreamHandler<Result<Message, ws::ProtocolError>> for WsConn {
fn started(&mut self, ctx: &mut Self::Context) {
let a: Recipient<DownStreamMessage> = ctx.address().recipient();
WsListener::instance().borrow_mut().on_connect(self, a);
}
fn handle(&mut self, item: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) { fn handle(&mut self, item: Result<Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match item { match item {
Ok(Message::Text(text)) => { Ok(Message::Text(text)) => {
@ -130,8 +139,11 @@ impl StreamHandler<Result<Message, ws::ProtocolError>> for WsConn {
Ok(Message::Binary(bin)) => { Ok(Message::Binary(bin)) => {
self.parse_pkt(&bin, ctx); self.parse_pkt(&bin, ctx);
} }
Ok(Message::Close(reason)) => ctx.close(reason), Ok(Message::Close(reason)) => {
_ => (), ctx.close(reason);
WsListener::instance().borrow_mut().on_disconnect(self.socket_handle);
},
_ => {},
} }
} }
} }
@ -146,6 +158,7 @@ impl Handler<DownStreamMessage> for WsConn {
async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> { async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
let mut conn = WsConn{ let mut conn = WsConn{
socket_handle: 0,
recv_buf_len: 0, recv_buf_len: 0,
recv_buf: BytesMut::with_capacity(1024 * 64 * 2) recv_buf: BytesMut::with_capacity(1024 * 64 * 2)
}; };
@ -159,8 +172,10 @@ impl WsListener {
pub fn new() -> Self { pub fn new() -> Self {
let p = Self{ let p = Self{
cur_socket_handle: 0,
_self_wp: Default::default(), _self_wp: Default::default(),
work_thread: Default::default(), work_thread: Default::default(),
socket_hash: Default::default(),
}; };
return p; return p;
} }
@ -193,4 +208,16 @@ impl WsListener {
} }
fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) {
self.cur_socket_handle += 1;
conn.socket_handle = self.cur_socket_handle;
self.socket_hash.insert(conn.socket_handle, socket);
println!("on_connect socket_handle:{0}", conn.socket_handle);
}
fn on_disconnect(&mut self, socket_handle: u16) {
self.socket_hash.remove(&socket_handle);
println!("on_disconnect socket_handle:{0}", socket_handle);
}
} }

View File

@ -235,10 +235,6 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateW
recv_queue: Default::default(), recv_queue: Default::default(),
//addr: Default::default(), //addr: Default::default(),
}; };
{
let mut tmp_bytes = BytesMut::with_capacity((16) as usize);
UserApp::instance().borrow_mut().add_down_stream_pack(tmp_bytes);
}
{ {
let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?; let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?;
down_stream.set_nodelay(true); down_stream.set_nodelay(true);