1
This commit is contained in:
parent
a183d86152
commit
28c89a6d50
@ -48,7 +48,6 @@ impl f9::app::UserApp for UserApp {
|
||||
None => {}
|
||||
}
|
||||
}
|
||||
WsListener::instance().update();
|
||||
UpStreamMgr::instance().update();
|
||||
}
|
||||
|
||||
|
@ -13,16 +13,13 @@ use std::sync::{Arc};
|
||||
use tokio::sync::Mutex;
|
||||
use std::thread;
|
||||
use std::thread::JoinHandle;
|
||||
use std::time::Duration;
|
||||
use actix::prelude::*;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use tokio::io::{Interest};
|
||||
use crate::app::UserApp;
|
||||
use crate::constant;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio::runtime::Runtime;
|
||||
use crate::common::types::{DownStreamPack, DownStreamMessage, UpStreamPack};
|
||||
use tokio::io::{AsyncWriteExt};
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::atomic::{AtomicU16, Ordering};
|
||||
|
||||
@ -172,125 +169,6 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
|
||||
resp
|
||||
}
|
||||
|
||||
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
|
||||
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>) {
|
||||
tokio::spawn(async move {
|
||||
println!("upstream_enter2");
|
||||
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
|
||||
let mut recv_buf_len = 0;
|
||||
{
|
||||
for i in 0..recv_buf.capacity() {
|
||||
recv_buf.put_i8(0);
|
||||
}
|
||||
}
|
||||
loop {
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
let mut stream = stream_arc.lock().await;
|
||||
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await;
|
||||
match ready {
|
||||
Ok(r) => {
|
||||
{
|
||||
if r.is_readable() {
|
||||
// Try to read data, this may still fail with `WouldBlock`
|
||||
// if the readiness event is a false positive.
|
||||
let mut data = vec![0; 1024 * 64];
|
||||
match stream.try_read(&mut data) {
|
||||
Ok(len) => {
|
||||
println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len);
|
||||
for i in 0..len {
|
||||
recv_buf[recv_buf_len + i] = data[i];
|
||||
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
||||
}
|
||||
recv_buf_len += len;
|
||||
while recv_buf_len >= 16 {
|
||||
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
|
||||
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
|
||||
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
|
||||
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
|
||||
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
|
||||
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
|
||||
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
|
||||
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
|
||||
if ((pack_len + 16) as usize) > recv_buf_len {
|
||||
break;
|
||||
}
|
||||
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
|
||||
{
|
||||
tmp_bytes.put_u16_le(pack_len);
|
||||
tmp_bytes.put_u16_le(msg_id);
|
||||
tmp_bytes.put_i32_le(seq_id);
|
||||
tmp_bytes.put_u16_le(magic_code);
|
||||
tmp_bytes.put_u16_le(ext_len);
|
||||
for i in 0..pack_len {
|
||||
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
||||
}
|
||||
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
||||
{
|
||||
let node = UpStreamPack::new(socket_handle, tmp_bytes);
|
||||
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
|
||||
}
|
||||
//a.do_send(msg);
|
||||
|
||||
{
|
||||
println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}",
|
||||
socket_handle,
|
||||
pack_len,
|
||||
msg_id,
|
||||
seq_id,
|
||||
magic_code,
|
||||
ext_len);
|
||||
}
|
||||
}
|
||||
for i in (pack_len + 16) as usize..recv_buf_len {
|
||||
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
||||
}
|
||||
recv_buf_len -= (pack_len + 16) as usize;
|
||||
}
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
println!("read error 1");
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
println!("read error 2");
|
||||
//return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
if r.is_writable() {
|
||||
let mut data = bytes::BytesMut::new();
|
||||
{
|
||||
down_pack_queue.lock().unwrap().fetch();
|
||||
//down_pack_queue.get_mut().fetch();
|
||||
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
||||
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
|
||||
while !work_list.lock().unwrap().empty() {
|
||||
let node = &work_list.lock().unwrap().first_entry();
|
||||
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
|
||||
data = node.upgrade().unwrap().lock().unwrap().data.clone();
|
||||
break;
|
||||
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
||||
//stream.write_all(data.as_ref()).await;
|
||||
}
|
||||
}
|
||||
if data.len() > 0 {
|
||||
println!("write_all2 len:{0}", data.len());
|
||||
stream.write_all(data.as_ref()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
impl WsListener {
|
||||
|
||||
pub fn instance() -> &'static Self {
|
||||
@ -338,73 +216,14 @@ impl WsListener {
|
||||
.await
|
||||
});
|
||||
}));
|
||||
/*
|
||||
{
|
||||
let down_pack_queue = self.down_pack_queue.clone();
|
||||
let up_pack_queue = self.up_pack_queue.clone();
|
||||
self.tokio_rt.spawn(async move {
|
||||
let mut result = TcpStream::connect("192.168.100.39:7616").await;
|
||||
match result {
|
||||
Ok(v) => {
|
||||
let a = Arc::new(Mutex::new(v));
|
||||
WsListener::instance().upstream.lock().unwrap().replace(a.clone());
|
||||
println!("connect ok");
|
||||
upstream_enter(a, down_pack_queue, up_pack_queue).await;
|
||||
},
|
||||
Err(e) => {
|
||||
println!("connect err")
|
||||
}
|
||||
}
|
||||
});
|
||||
}*/
|
||||
println!("hello3");
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update(&self) {
|
||||
return;
|
||||
let mut data = bytes::BytesMut::new();
|
||||
let mut socket_handle = 0;
|
||||
{
|
||||
self.up_pack_queue.lock().unwrap().fetch();
|
||||
//down_pack_queue.get_mut().fetch();
|
||||
let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
||||
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
|
||||
while !work_list.lock().unwrap().empty() {
|
||||
let node = &work_list.lock().unwrap().first_entry();
|
||||
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
|
||||
data = node.upgrade().unwrap().lock().unwrap().data.clone();
|
||||
socket_handle = node.upgrade().unwrap().lock().unwrap().socket_handle;
|
||||
break;
|
||||
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
||||
//stream.write_all(data.as_ref()).await;
|
||||
}
|
||||
}
|
||||
if data.len() > 0 {
|
||||
println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len());
|
||||
match self.socket_hash.lock().unwrap().get(&socket_handle){
|
||||
Some(v) => {
|
||||
let msg = DownStreamMessage{
|
||||
data: actix_web::web::Bytes::copy_from_slice(data.as_ref())
|
||||
};
|
||||
v.do_send(msg);
|
||||
},
|
||||
None => {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn uninit(&self) {
|
||||
|
||||
}
|
||||
|
||||
fn add_down_stream_pack(&self, data: std::sync::Weak::<std::sync::Mutex::<DownStreamPack>>) {
|
||||
println!("add_down_stream_pack2 len:{0}", data.upgrade().unwrap().lock().unwrap().data.len());
|
||||
let socket_handle = data.upgrade().unwrap().lock().unwrap().socket_handle;
|
||||
self.down_pack_queue.lock().as_ref().unwrap().push(&data.upgrade().unwrap().lock().unwrap().entry);
|
||||
}
|
||||
fn on_connect(&self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) {
|
||||
self.cur_socket_handle.fetch_add(1, Ordering::Relaxed);
|
||||
conn.socket_handle = self.cur_socket_handle.load(Ordering::Relaxed);
|
||||
|
Loading…
x
Reference in New Issue
Block a user