1
This commit is contained in:
parent
cb8007590e
commit
436c47ea3f
@ -48,6 +48,7 @@ pub struct WsListener {
|
|||||||
#[rtype(result = "()")]
|
#[rtype(result = "()")]
|
||||||
pub struct DownStreamMessage {
|
pub struct DownStreamMessage {
|
||||||
data: actix_web::web::Bytes,
|
data: actix_web::web::Bytes,
|
||||||
|
//data: BytesMut,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Define HTTP actor
|
/// Define HTTP actor
|
||||||
@ -99,7 +100,7 @@ impl WsConn {
|
|||||||
tmp_bytes.put_u16_le(msg_id);
|
tmp_bytes.put_u16_le(msg_id);
|
||||||
tmp_bytes.put_i32_le(seq_id);
|
tmp_bytes.put_i32_le(seq_id);
|
||||||
tmp_bytes.put_u16_le(magic_code);
|
tmp_bytes.put_u16_le(magic_code);
|
||||||
tmp_bytes.put_u16_le(ext_len);
|
tmp_bytes.put_u16_le(self.socket_handle); //tmp_bytes.put_u16_le(ext_len);
|
||||||
tmp_bytes.put_i32_le(0);
|
tmp_bytes.put_i32_le(0);
|
||||||
tmp_bytes.put_i64_le(0);
|
tmp_bytes.put_i64_le(0);
|
||||||
for i in 24..tmp_bytes.capacity() {
|
for i in 24..tmp_bytes.capacity() {
|
||||||
@ -201,10 +202,10 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
|
|||||||
if r.is_readable() {
|
if r.is_readable() {
|
||||||
// Try to read data, this may still fail with `WouldBlock`
|
// Try to read data, this may still fail with `WouldBlock`
|
||||||
// if the readiness event is a false positive.
|
// if the readiness event is a false positive.
|
||||||
let mut data = BytesMut::with_capacity(1024 * 64 * 3);
|
let mut data = vec![0; 1024 * 64];
|
||||||
match stream.try_read(&mut data) {
|
match stream.try_read(&mut data) {
|
||||||
Ok(len) => {
|
Ok(len) => {
|
||||||
println!("read {} bytes",len);
|
println!("read {0} bytes",len);
|
||||||
for i in 0..len {
|
for i in 0..len {
|
||||||
recv_buf[recv_buf_len + i] = data[i];
|
recv_buf[recv_buf_len + i] = data[i];
|
||||||
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
||||||
@ -233,19 +234,28 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
|
|||||||
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
||||||
}
|
}
|
||||||
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
||||||
let msg = GSResponse {
|
|
||||||
data: a_bytes
|
|
||||||
};
|
|
||||||
//a.do_send(msg);
|
|
||||||
/*
|
|
||||||
{
|
{
|
||||||
println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}",
|
let node = Rc::new(RefCell::new(DownStreamPack{
|
||||||
|
holder: Default::default(),
|
||||||
|
socket_handle: socket_handle,
|
||||||
|
data: tmp_bytes,
|
||||||
|
entry: r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Default::default()),
|
||||||
|
}));
|
||||||
|
node.borrow_mut().holder = Some(node.clone());
|
||||||
|
node.borrow_mut().entry = r9::ListHead::< crate::app::user_app::DownStreamPack>::new_node(Rc::downgrade(&node));
|
||||||
|
up_pack_queue.lock().unwrap().push(&node.borrow_mut().entry);
|
||||||
|
}
|
||||||
|
//a.do_send(msg);
|
||||||
|
|
||||||
|
{
|
||||||
|
println!("sendmsg socket_handle:{5} pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}",
|
||||||
|
socket_handle,
|
||||||
pack_len,
|
pack_len,
|
||||||
msg_id,
|
msg_id,
|
||||||
seq_id,
|
seq_id,
|
||||||
magic_code,
|
magic_code,
|
||||||
ext_len);
|
ext_len);
|
||||||
}*/
|
}
|
||||||
}
|
}
|
||||||
for i in (pack_len + 16) as usize..recv_buf_len {
|
for i in (pack_len + 16) as usize..recv_buf_len {
|
||||||
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
||||||
@ -366,6 +376,7 @@ impl WsListener {
|
|||||||
|
|
||||||
pub fn update(&self) {
|
pub fn update(&self) {
|
||||||
let mut data = bytes::BytesMut::new();
|
let mut data = bytes::BytesMut::new();
|
||||||
|
let mut socket_handle = 0;
|
||||||
{
|
{
|
||||||
self.up_pack_queue.lock().unwrap().fetch();
|
self.up_pack_queue.lock().unwrap().fetch();
|
||||||
//down_pack_queue.get_mut().fetch();
|
//down_pack_queue.get_mut().fetch();
|
||||||
@ -375,14 +386,25 @@ impl WsListener {
|
|||||||
let node = &work_list.borrow().first_entry();
|
let node = &work_list.borrow().first_entry();
|
||||||
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
||||||
data = node.upgrade().unwrap().borrow().data.clone();
|
data = node.upgrade().unwrap().borrow().data.clone();
|
||||||
|
socket_handle = node.upgrade().unwrap().borrow().socket_handle;
|
||||||
break;
|
break;
|
||||||
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
||||||
//stream.write_all(data.as_ref()).await;
|
//stream.write_all(data.as_ref()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if data.len() > 0 {
|
if data.len() > 0 {
|
||||||
println!("write_all3 len:{0}", data.len());
|
println!("write_all3 socket_handle:{0} len:{1}", socket_handle, data.len());
|
||||||
//stream.write_all(data.as_ref()).await;
|
match self.socket_hash.get(&socket_handle){
|
||||||
|
Some(v) => {
|
||||||
|
let msg = DownStreamMessage{
|
||||||
|
data: actix_web::web::Bytes::copy_from_slice(data.as_ref())
|
||||||
|
};
|
||||||
|
v.do_send(msg);
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user