This commit is contained in:
azw 2024-05-12 22:36:16 +08:00
parent 5726c3ad04
commit 13204031ca
2 changed files with 58 additions and 51 deletions

View File

@ -15,7 +15,7 @@ use bytes::{BufMut, BytesMut};
use crate::app::UserApp;
use crate::constant;
use tokio::runtime::Runtime;
use crate::common::types::{DownStreamPack, DownStreamMessage, UpStreamPack};
use crate::common::types::{DownStreamMessage};
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU16, Ordering};

View File

@ -21,6 +21,62 @@ pub struct UpStream {
async fn upstream_enter(stream_arc: Arc<tokio::sync::Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>) {
let on_read = |len: usize,
recv_buf_len: &mut usize,
recv_buf: &mut BytesMut,
data: &Vec<u8>,
up_pack_queue: &Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>| {
println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len);
for i in 0..len {
recv_buf[*recv_buf_len + i] = data[i];
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
}
*recv_buf_len += len;
while *recv_buf_len >= 16 {
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
if ((pack_len + 16) as usize) > *recv_buf_len {
break;
}
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
{
tmp_bytes.put_u16_le(pack_len);
tmp_bytes.put_u16_le(msg_id);
tmp_bytes.put_i32_le(seq_id);
tmp_bytes.put_u16_le(magic_code);
tmp_bytes.put_u16_le(ext_len);
for i in 0..pack_len {
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
}
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
{
let node = UpStreamPack::new(socket_handle, tmp_bytes);
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
}
//a.do_send(msg);
{
println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}",
socket_handle,
pack_len,
msg_id,
seq_id,
magic_code,
ext_len);
}
}
for i in (pack_len + 16) as usize..*recv_buf_len {
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
}
*recv_buf_len -= (pack_len + 16) as usize;
}
};
tokio::spawn(async move {
println!("upstream_enter2");
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
@ -43,56 +99,7 @@ async fn upstream_enter(stream_arc: Arc<tokio::sync::Mutex<TcpStream>>, down_pac
let mut data = vec![0; 1024 * 64];
match stream.try_read(&mut data) {
Ok(len) => {
println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len);
for i in 0..len {
recv_buf[recv_buf_len + i] = data[i];
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
}
recv_buf_len += len;
while recv_buf_len >= 16 {
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
if ((pack_len + 16) as usize) > recv_buf_len {
break;
}
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
{
tmp_bytes.put_u16_le(pack_len);
tmp_bytes.put_u16_le(msg_id);
tmp_bytes.put_i32_le(seq_id);
tmp_bytes.put_u16_le(magic_code);
tmp_bytes.put_u16_le(ext_len);
for i in 0..pack_len {
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
}
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
{
let node = UpStreamPack::new(socket_handle, tmp_bytes);
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
}
//a.do_send(msg);
{
println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}",
socket_handle,
pack_len,
msg_id,
seq_id,
magic_code,
ext_len);
}
}
for i in (pack_len + 16) as usize..recv_buf_len {
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
}
recv_buf_len -= (pack_len + 16) as usize;
}
on_read(len, &mut recv_buf_len, &mut recv_buf, &data, &up_pack_queue);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
println!("read error 1");