1
This commit is contained in:
parent
b87ad6eaa6
commit
ec5cfbdeb3
@ -122,37 +122,72 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
|
|||||||
fn started(&mut self, ctx: &mut Self::Context) {
|
fn started(&mut self, ctx: &mut Self::Context) {
|
||||||
let a: Recipient<GSResponse> = ctx.address().recipient();
|
let a: Recipient<GSResponse> = ctx.address().recipient();
|
||||||
|
|
||||||
|
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
|
||||||
|
let mut recv_buf_len = 0;
|
||||||
|
{
|
||||||
|
for i in 0..recv_buf.capacity() {
|
||||||
|
recv_buf.put_i8(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
let mut recv_queue = self.recv_queue.clone();
|
let mut recv_queue = self.recv_queue.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
|
while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
|
||||||
let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop().clone();
|
let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
|
||||||
let len = bytes.clone().unwrap().clone().len();
|
let len = bytes.clone().unwrap().clone().len();
|
||||||
{
|
{
|
||||||
println!("sendmsg len:{0}", len);
|
//println!("sendmsg 1111111111111111111111111111111 len:{0}", len);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let data = bytes.unwrap();
|
let data = bytes.unwrap();
|
||||||
let pack_len = (data.as_ref()[0] as u16)+ ((data.as_ref()[1] as u16) << 8);
|
for i in 0..len {
|
||||||
let msg_id = (data.as_ref()[2] as u16)+ ((data.as_ref()[3] as u16) << 8);
|
recv_buf[recv_buf_len + i] = data[i];
|
||||||
let seq_id = (data.as_ref()[4] as i32)+ ((data.as_ref()[5] as i32) << 8) +
|
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
||||||
((data.as_ref()[7] as i32) << 16)+ ((data.as_ref()[7] as i32) << 24);
|
}
|
||||||
let magic_code = (data.as_ref()[8] as u16)+ ((data.as_ref()[9] as u16) << 8);
|
recv_buf_len += len;
|
||||||
let rpc_error_code = (data.as_ref()[10] as u16)+ ((data.as_ref()[11] as u16) << 8);
|
while recv_buf_len >= 16 {
|
||||||
let socket_handle = (data.as_ref()[12] as u16)+ ((data.as_ref()[13] as u16) << 8);
|
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
|
||||||
let ext_len = (data.as_ref()[14] as u16)+ ((data.as_ref()[15] as u16) << 8);
|
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
|
||||||
if (pack_len + 16) as usize != data.len() {
|
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
|
||||||
panic!("errro len");
|
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
|
||||||
|
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
|
||||||
|
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
|
||||||
|
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
|
||||||
|
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
|
||||||
|
if ((pack_len + 16) as usize) > recv_buf_len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
|
||||||
|
{
|
||||||
|
tmp_bytes.put_u16_le(pack_len);
|
||||||
|
tmp_bytes.put_u16_le(msg_id);
|
||||||
|
tmp_bytes.put_i32_le(seq_id);
|
||||||
|
tmp_bytes.put_u16_le(magic_code);
|
||||||
|
tmp_bytes.put_u16_le(ext_len);
|
||||||
|
for i in 0..pack_len {
|
||||||
|
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
||||||
|
}
|
||||||
|
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
||||||
|
let msg = GSResponse {
|
||||||
|
data: a_bytes
|
||||||
|
};
|
||||||
|
a.do_send(msg);
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}",
|
||||||
|
pack_len,
|
||||||
|
msg_id,
|
||||||
|
seq_id,
|
||||||
|
magic_code,
|
||||||
|
ext_len);
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
for i in (pack_len + 16) as usize..recv_buf_len {
|
||||||
|
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
||||||
|
}
|
||||||
|
recv_buf_len -= (pack_len + 16) as usize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
{
|
|
||||||
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(bytes.unwrap().as_ref());
|
|
||||||
let msg = GSResponse {
|
|
||||||
data: a_bytes
|
|
||||||
};
|
|
||||||
a.send(msg);
|
|
||||||
}*/
|
|
||||||
}
|
}
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
}
|
}
|
||||||
@ -165,37 +200,18 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
|
|||||||
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
||||||
Ok(ws::Message::Text(text)) => ctx.text(text),
|
Ok(ws::Message::Text(text)) => ctx.text(text),
|
||||||
Ok(ws::Message::Binary(bin)) => {
|
Ok(ws::Message::Binary(bin)) => {
|
||||||
///Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(bin.clone());
|
|
||||||
println!("recv_buf:{0}", bin.len());
|
|
||||||
//ctx.binary(bin);
|
|
||||||
//self.down_stream.write_all(&bin);
|
|
||||||
self.parse_pkt(&bin);
|
self.parse_pkt(&bin);
|
||||||
//ctx.binary(bin);
|
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
|
||||||
#[rtype(result = "()")]
|
|
||||||
pub struct Message(pub String);
|
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "()")]
|
#[rtype(result = "()")]
|
||||||
pub struct GSResponse {
|
pub struct GSResponse {
|
||||||
data: actix_web::web::Bytes,
|
data: actix_web::web::Bytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Handler<Message> for MyWs {
|
|
||||||
type Result = ();
|
|
||||||
|
|
||||||
|
|
||||||
fn handle(&mut self, msg: Message, _: &mut ws::WebsocketContext<Self>) {
|
|
||||||
//self.send_message(&msg.room, msg.msg.as_str(), msg.id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler<GSResponse> for MyWs {
|
impl Handler<GSResponse> for MyWs {
|
||||||
type Result = ();
|
type Result = ();
|
||||||
|
|
||||||
@ -239,7 +255,7 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateW
|
|||||||
// if the readiness event is a false positive.
|
// if the readiness event is a false positive.
|
||||||
match down_stream.try_read(&mut data) {
|
match down_stream.try_read(&mut data) {
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
println!("read {} bytes", n);
|
//println!("read {} bytes", n);
|
||||||
let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize);
|
let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize);
|
||||||
for i in 0..n {
|
for i in 0..n {
|
||||||
tmp_bytes.put_u8(data[i]);
|
tmp_bytes.put_u8(data[i]);
|
||||||
@ -265,7 +281,7 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateW
|
|||||||
let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
|
let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
|
||||||
let len = bytes.clone().unwrap().clone().len();
|
let len = bytes.clone().unwrap().clone().len();
|
||||||
{
|
{
|
||||||
println!("sendmsg len:{0}", len);
|
//println!("sendmsg len:{0}", len);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
down_stream.write_all(bytes.unwrap().as_ref()).await;
|
down_stream.write_all(bytes.unwrap().as_ref()).await;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user