1
This commit is contained in:
parent
13204031ca
commit
e7ef4d9050
@ -19,64 +19,86 @@ pub struct UpStream {
|
|||||||
pub up_pack_queue: Arc<Mutex::<r9::QueueLock<UpStreamPack>>>,
|
pub up_pack_queue: Arc<Mutex::<r9::QueueLock<UpStreamPack>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn on_write(stream: &mut TcpStream, down_pack_queue: &Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>) {
|
||||||
|
let mut data = bytes::BytesMut::new();
|
||||||
|
{
|
||||||
|
down_pack_queue.lock().unwrap().fetch();
|
||||||
|
//down_pack_queue.get_mut().fetch();
|
||||||
|
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
||||||
|
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
|
||||||
|
while !work_list.lock().unwrap().empty() {
|
||||||
|
let node = &work_list.lock().unwrap().first_entry();
|
||||||
|
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
|
||||||
|
data = node.upgrade().unwrap().lock().unwrap().data.clone();
|
||||||
|
break;
|
||||||
|
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
||||||
|
//stream.write_all(data.as_ref()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if data.len() > 0 {
|
||||||
|
println!("write_all2 len:{0}", data.len());
|
||||||
|
stream.write_all(data.as_ref()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn on_read(len: usize,
|
||||||
|
recv_buf_len: &mut usize,
|
||||||
|
recv_buf: &mut BytesMut,
|
||||||
|
data: &Vec<u8>,
|
||||||
|
up_pack_queue: &Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>) {
|
||||||
|
println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len);
|
||||||
|
for i in 0..len {
|
||||||
|
recv_buf[*recv_buf_len + i] = data[i];
|
||||||
|
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
||||||
|
}
|
||||||
|
*recv_buf_len += len;
|
||||||
|
while *recv_buf_len >= 16 {
|
||||||
|
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
|
||||||
|
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
|
||||||
|
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
|
||||||
|
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
|
||||||
|
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
|
||||||
|
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
|
||||||
|
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
|
||||||
|
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
|
||||||
|
if ((pack_len + 16) as usize) > *recv_buf_len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
|
||||||
|
{
|
||||||
|
tmp_bytes.put_u16_le(pack_len);
|
||||||
|
tmp_bytes.put_u16_le(msg_id);
|
||||||
|
tmp_bytes.put_i32_le(seq_id);
|
||||||
|
tmp_bytes.put_u16_le(magic_code);
|
||||||
|
tmp_bytes.put_u16_le(ext_len);
|
||||||
|
for i in 0..pack_len {
|
||||||
|
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
||||||
|
}
|
||||||
|
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
||||||
|
{
|
||||||
|
let node = UpStreamPack::new(socket_handle, tmp_bytes);
|
||||||
|
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
|
||||||
|
}
|
||||||
|
//a.do_send(msg);
|
||||||
|
|
||||||
|
{
|
||||||
|
println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}",
|
||||||
|
socket_handle,
|
||||||
|
pack_len,
|
||||||
|
msg_id,
|
||||||
|
seq_id,
|
||||||
|
magic_code,
|
||||||
|
ext_len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i in (pack_len + 16) as usize..*recv_buf_len {
|
||||||
|
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
||||||
|
}
|
||||||
|
*recv_buf_len -= (pack_len + 16) as usize;
|
||||||
|
}
|
||||||
|
}
|
||||||
async fn upstream_enter(stream_arc: Arc<tokio::sync::Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
|
async fn upstream_enter(stream_arc: Arc<tokio::sync::Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<DownStreamPack>>>,
|
||||||
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>) {
|
up_pack_queue: Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>) {
|
||||||
let on_read = |len: usize,
|
|
||||||
recv_buf_len: &mut usize,
|
|
||||||
recv_buf: &mut BytesMut,
|
|
||||||
data: &Vec<u8>,
|
|
||||||
up_pack_queue: &Arc<std::sync::Mutex::<r9::QueueLock<UpStreamPack>>>| {
|
|
||||||
println!("read {0} bytes recv_buf_len:{1}",len, recv_buf_len);
|
|
||||||
for i in 0..len {
|
|
||||||
recv_buf[*recv_buf_len + i] = data[i];
|
|
||||||
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
|
||||||
}
|
|
||||||
*recv_buf_len += len;
|
|
||||||
while *recv_buf_len >= 16 {
|
|
||||||
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
|
|
||||||
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
|
|
||||||
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
|
|
||||||
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
|
|
||||||
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
|
|
||||||
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
|
|
||||||
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
|
|
||||||
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
|
|
||||||
if ((pack_len + 16) as usize) > *recv_buf_len {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
|
|
||||||
{
|
|
||||||
tmp_bytes.put_u16_le(pack_len);
|
|
||||||
tmp_bytes.put_u16_le(msg_id);
|
|
||||||
tmp_bytes.put_i32_le(seq_id);
|
|
||||||
tmp_bytes.put_u16_le(magic_code);
|
|
||||||
tmp_bytes.put_u16_le(ext_len);
|
|
||||||
for i in 0..pack_len {
|
|
||||||
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
|
||||||
}
|
|
||||||
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
|
||||||
{
|
|
||||||
let node = UpStreamPack::new(socket_handle, tmp_bytes);
|
|
||||||
up_pack_queue.lock().unwrap().push(&node.lock().unwrap().entry);
|
|
||||||
}
|
|
||||||
//a.do_send(msg);
|
|
||||||
|
|
||||||
{
|
|
||||||
println!("sendmsg socket_handle:{0} pack_len:{1} msg_id:{2} seq_id:{3} magic_code:{4} ext_len:{5}",
|
|
||||||
socket_handle,
|
|
||||||
pack_len,
|
|
||||||
msg_id,
|
|
||||||
seq_id,
|
|
||||||
magic_code,
|
|
||||||
ext_len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for i in (pack_len + 16) as usize..*recv_buf_len {
|
|
||||||
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
|
||||||
}
|
|
||||||
*recv_buf_len -= (pack_len + 16) as usize;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
println!("upstream_enter2");
|
println!("upstream_enter2");
|
||||||
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
|
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
|
||||||
@ -115,25 +137,7 @@ async fn upstream_enter(stream_arc: Arc<tokio::sync::Mutex<TcpStream>>, down_pac
|
|||||||
|
|
||||||
{
|
{
|
||||||
if r.is_writable() {
|
if r.is_writable() {
|
||||||
let mut data = bytes::BytesMut::new();
|
on_write(&mut stream, &down_pack_queue).await;
|
||||||
{
|
|
||||||
down_pack_queue.lock().unwrap().fetch();
|
|
||||||
//down_pack_queue.get_mut().fetch();
|
|
||||||
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
|
||||||
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
|
|
||||||
while !work_list.lock().unwrap().empty() {
|
|
||||||
let node = &work_list.lock().unwrap().first_entry();
|
|
||||||
node.upgrade().unwrap().lock().unwrap().entry.lock().unwrap().del_init();
|
|
||||||
data = node.upgrade().unwrap().lock().unwrap().data.clone();
|
|
||||||
break;
|
|
||||||
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
|
||||||
//stream.write_all(data.as_ref()).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if data.len() > 0 {
|
|
||||||
println!("write_all2 len:{0}", data.len());
|
|
||||||
stream.write_all(data.as_ref()).await;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user