This commit is contained in:
azw 2024-05-12 20:49:20 +08:00
parent 14beddb88a
commit a183d86152
4 changed files with 51 additions and 2 deletions

View File

@ -49,6 +49,7 @@ impl f9::app::UserApp for UserApp {
}
}
WsListener::instance().update();
UpStreamMgr::instance().update();
}
fn uninit(&self) {

View File

@ -338,6 +338,7 @@ impl WsListener {
.await
});
}));
/*
{
let down_pack_queue = self.down_pack_queue.clone();
let up_pack_queue = self.up_pack_queue.clone();
@ -355,12 +356,13 @@ impl WsListener {
}
}
});
}
}*/
println!("hello3");
}
}
pub fn update(&self) {
return;
let mut data = bytes::BytesMut::new();
let mut socket_handle = 0;
{
@ -415,4 +417,18 @@ impl WsListener {
println!("on_disconnect socket_handle:{0}", socket_handle);
}
pub fn send_to_downsteam(&self, socket_handle: u16, data: bytes::BytesMut) {
match self.socket_hash.lock().unwrap().get(&socket_handle){
Some(v) => {
let msg = DownStreamMessage{
data: actix_web::web::Bytes::copy_from_slice(data.as_ref())
};
v.do_send(msg);
},
None => {
}
}
}
}

View File

@ -6,7 +6,9 @@ use tokio::io::{AsyncWriteExt};
use bytes::{BufMut, BytesMut};
use std::time::Duration;
use r9;
use crate::common::{DownStreamPack, UpStreamPack};
use crate::common::{DownStreamMessage, DownStreamPack, UpStreamPack};
use crate::listener::WsListener;
pub struct UpStream {
pub instance_id: i32,
remote_ip: String,
@ -172,4 +174,28 @@ impl UpStream {
}
}
pub fn update(&self) {
let mut data = bytes::BytesMut::new();
let mut socket_handle = 0;
{
self.up_pack_queue.lock().unwrap().fetch();
//down_pack_queue.get_mut().fetch();
let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
while !work_list.lock().unwrap().empty() {
let node = &work_list.lock().unwrap().first_entry();
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
data = node.upgrade().unwrap().lock().unwrap().data.clone();
socket_handle = node.upgrade().unwrap().lock().unwrap().socket_handle;
break;
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
//stream.write_all(data.as_ref()).await;
}
}
if data.len() > 0 {
println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len());
WsListener::instance().send_to_downsteam(socket_handle, data);
}
}
}

View File

@ -55,4 +55,10 @@ impl UpStreamMgr {
}
}
pub fn update(&self) {
for (key, value) in self.id_hash.borrow().iter() {
value.update();
}
}
}