1
This commit is contained in:
parent
c2fe49bcd0
commit
f9053e8740
@ -22,6 +22,9 @@ use tokio::io::{AsyncWriteExt, Ready};
|
|||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::io::Interest;
|
use tokio::io::Interest;
|
||||||
|
use std::vec::Vec;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
const MAX_PACKET_LEN: usize = 1024 * 64;
|
const MAX_PACKET_LEN: usize = 1024 * 64;
|
||||||
|
|
||||||
@ -29,12 +32,60 @@ const MAX_PACKET_LEN: usize = 1024 * 64;
|
|||||||
struct MyWs {
|
struct MyWs {
|
||||||
recv_buf_len: usize,
|
recv_buf_len: usize,
|
||||||
recv_buf: BytesMut,
|
recv_buf: BytesMut,
|
||||||
|
send_queue: Arc<Mutex<RefCell<Vec::<actix_web::web::Bytes>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor for MyWs {
|
impl Actor for MyWs {
|
||||||
type Context = ws::WebsocketContext<Self>;
|
type Context = ws::WebsocketContext<Self>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl MyWs {
|
||||||
|
fn parse_pkt(&mut self, bin: &actix_web::web::Bytes) {
|
||||||
|
let buf_len = bin.len();
|
||||||
|
if buf_len > 0 {
|
||||||
|
let mut already_read_bytes: usize = 0;
|
||||||
|
{
|
||||||
|
for i in 0..1024 * 64 * 2 {
|
||||||
|
self.recv_buf.put_i8(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
while true {
|
||||||
|
let mut read_bytes = buf_len - already_read_bytes;
|
||||||
|
if read_bytes > MAX_PACKET_LEN - self.recv_buf_len {
|
||||||
|
read_bytes = MAX_PACKET_LEN - self.recv_buf_len;
|
||||||
|
}
|
||||||
|
if read_bytes > 0 {
|
||||||
|
self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin);
|
||||||
|
self.recv_buf_len += read_bytes;
|
||||||
|
already_read_bytes += read_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut offset = 0;
|
||||||
|
let mut pre_offect = 0;
|
||||||
|
while true {
|
||||||
|
pre_offect = offset;
|
||||||
|
{
|
||||||
|
let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8);
|
||||||
|
let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8);
|
||||||
|
let seq_id = (self.recv_buf[4] as i32)+ ((self.recv_buf[5] as i32) << 8) +
|
||||||
|
((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24);
|
||||||
|
let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8);
|
||||||
|
let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8);
|
||||||
|
let a = 30;
|
||||||
|
}
|
||||||
|
if pre_offect >= offset || offset >= self.recv_buf_len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}//end while
|
||||||
|
if offset > 0 && offset < self.recv_buf_len {
|
||||||
|
//self.recv_buf[..0].copy_from_slice(&bin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/// Handler for ws::Message message
|
/// Handler for ws::Message message
|
||||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
|
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
|
||||||
|
|
||||||
@ -43,51 +94,18 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
|
|||||||
//let b = 100;
|
//let b = 100;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||||
match msg {
|
match msg {
|
||||||
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
|
||||||
Ok(ws::Message::Text(text)) => ctx.text(text),
|
Ok(ws::Message::Text(text)) => ctx.text(text),
|
||||||
Ok(ws::Message::Binary(bin)) => {
|
Ok(ws::Message::Binary(bin)) => {
|
||||||
println!("recv_buf:{0}", bin.len());
|
///Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().push(bin.clone());
|
||||||
|
//println!("recv_buf:{0}", bin.len());
|
||||||
|
//ctx.binary(bin);
|
||||||
//self.down_stream.write_all(&bin);
|
//self.down_stream.write_all(&bin);
|
||||||
/*
|
self.parse_pkt(&bin);
|
||||||
let buf_len = bin.len();
|
ctx.binary(bin);
|
||||||
if buf_len > 0 {
|
|
||||||
let mut already_read_bytes: usize = 0;
|
|
||||||
{
|
|
||||||
for i in 0..1024*64*2 {
|
|
||||||
self.recv_buf.put_i8(0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
while true {
|
|
||||||
let mut read_bytes = buf_len - already_read_bytes;
|
|
||||||
if read_bytes > MAX_PACKET_LEN - self.recv_buf_len {
|
|
||||||
read_bytes = MAX_PACKET_LEN - self.recv_buf_len;
|
|
||||||
}
|
|
||||||
if read_bytes > 0 {
|
|
||||||
self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin);
|
|
||||||
self.recv_buf_len += read_bytes;
|
|
||||||
already_read_bytes += read_bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut offset = 0;
|
|
||||||
let mut pre_offect = 0;
|
|
||||||
while true {
|
|
||||||
pre_offect = offset;
|
|
||||||
{
|
|
||||||
let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8);
|
|
||||||
let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8);
|
|
||||||
let seq_id = self.recv_buf.get_u16();
|
|
||||||
}
|
|
||||||
if pre_offect >= offset || offset >= self.recv_buf_len {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}//end while
|
|
||||||
if offset > 0 && offset < self.recv_buf_len {
|
|
||||||
//self.recv_buf[..0].copy_from_slice(&bin);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
},
|
},
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
@ -122,23 +140,32 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateW
|
|||||||
let ws_client = MyWs {
|
let ws_client = MyWs {
|
||||||
recv_buf_len: 0,
|
recv_buf_len: 0,
|
||||||
recv_buf: BytesMut::with_capacity(1024 * 64 * 2),
|
recv_buf: BytesMut::with_capacity(1024 * 64 * 2),
|
||||||
|
send_queue: Default::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?;
|
let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?;
|
||||||
down_stream.set_nodelay(true);
|
down_stream.set_nodelay(true);
|
||||||
//down_stream.set_nonblocking(true)?;
|
//down_stream.set_nonblocking(true)?;
|
||||||
|
let mut send_queue = ws_client.send_queue.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||||
let ready = down_stream.ready(Interest::READABLE | Interest::WRITABLE).await;
|
let ready = down_stream.ready(Interest::READABLE | Interest::WRITABLE).await;
|
||||||
match ready {
|
match ready {
|
||||||
Ok(r) => {
|
Ok(r) => {
|
||||||
{
|
{
|
||||||
if r.is_readable() {}
|
if r.is_readable() {
|
||||||
|
let a = 100;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
if r.is_writable() {
|
if r.is_writable() {
|
||||||
down_stream.write_all(b"hello world!").await;
|
while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
|
||||||
|
let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
|
||||||
|
down_stream.write_all(bytes.unwrap().as_ref()).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -156,7 +183,11 @@ async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateW
|
|||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let t1 = thread::spawn(||{
|
let t1 = thread::spawn(||{
|
||||||
let mut rt = tokio::runtime::Runtime::new().unwrap();
|
//let mut rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
let mut rt = tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
rt.block_on(async {
|
rt.block_on(async {
|
||||||
let counter = web::Data::new(AppStateWithCounter {
|
let counter = web::Data::new(AppStateWithCounter {
|
||||||
counter: Mutex::new(0),
|
counter: Mutex::new(0),
|
||||||
@ -173,7 +204,7 @@ fn main() {
|
|||||||
});
|
});
|
||||||
println!("hello2");
|
println!("hello2");
|
||||||
});
|
});
|
||||||
|
|
||||||
println!("hello3");
|
println!("hello3");
|
||||||
while true {
|
while true {
|
||||||
thread::sleep(Duration::from_secs(10));
|
thread::sleep(Duration::from_secs(10));
|
||||||
|
Loading…
x
Reference in New Issue
Block a user