1
This commit is contained in:
parent
32e21d71ba
commit
cb8007590e
@ -13,7 +13,7 @@ use r9::xtimer::TimerList;
|
|||||||
//use crate::ss;
|
//use crate::ss;
|
||||||
|
|
||||||
pub struct DownStreamPack {
|
pub struct DownStreamPack {
|
||||||
holder: Option<Rc::<RefCell::<DownStreamPack>>>,
|
pub holder: Option<Rc::<RefCell::<DownStreamPack>>>,
|
||||||
pub socket_handle: u16,
|
pub socket_handle: u16,
|
||||||
pub data: bytes::BytesMut,
|
pub data: bytes::BytesMut,
|
||||||
pub entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
|
pub entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
|
||||||
@ -65,6 +65,7 @@ impl f9::app::UserApp for UserApp {
|
|||||||
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
||||||
WsListener::instance().borrow().add_down_stream_pack(node.clone());
|
WsListener::instance().borrow().add_down_stream_pack(node.clone());
|
||||||
}
|
}
|
||||||
|
WsListener::instance().borrow().update();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn uninit(&mut self) {
|
fn uninit(&mut self) {
|
||||||
|
@ -41,6 +41,7 @@ pub struct WsListener {
|
|||||||
pub upstream: std::sync::Weak<Mutex<TcpStream>>,
|
pub upstream: std::sync::Weak<Mutex<TcpStream>>,
|
||||||
tokio_rt: Runtime,
|
tokio_rt: Runtime,
|
||||||
down_pack_queue: Arc<std::sync::Mutex<r9::Queue<DownStreamPack>>>,
|
down_pack_queue: Arc<std::sync::Mutex<r9::Queue<DownStreamPack>>>,
|
||||||
|
up_pack_queue: Arc<std::sync::Mutex<r9::Queue<DownStreamPack>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
@ -179,10 +180,17 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
|
|||||||
resp
|
resp
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::Queue<DownStreamPack>>>,) {
|
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::Queue<DownStreamPack>>>,
|
||||||
println!("upstream_enter1");
|
up_pack_queue: Arc<std::sync::Mutex::<r9::Queue<DownStreamPack>>>) {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
println!("upstream_enter2");
|
println!("upstream_enter2");
|
||||||
|
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
|
||||||
|
let mut recv_buf_len = 0;
|
||||||
|
{
|
||||||
|
for i in 0..recv_buf.capacity() {
|
||||||
|
recv_buf.put_i8(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
let mut stream = stream_arc.lock().await;
|
let mut stream = stream_arc.lock().await;
|
||||||
@ -191,17 +199,59 @@ async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<
|
|||||||
Ok(r) => {
|
Ok(r) => {
|
||||||
{
|
{
|
||||||
if r.is_readable() {
|
if r.is_readable() {
|
||||||
let mut data = vec![0; 1024 * 64];
|
|
||||||
// Try to read data, this may still fail with `WouldBlock`
|
// Try to read data, this may still fail with `WouldBlock`
|
||||||
// if the readiness event is a false positive.
|
// if the readiness event is a false positive.
|
||||||
|
let mut data = BytesMut::with_capacity(1024 * 64 * 3);
|
||||||
match stream.try_read(&mut data) {
|
match stream.try_read(&mut data) {
|
||||||
Ok(n) => {
|
Ok(len) => {
|
||||||
//println!("read {} bytes", n);
|
println!("read {} bytes",len);
|
||||||
let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize);
|
for i in 0..len {
|
||||||
for i in 0..n {
|
recv_buf[recv_buf_len + i] = data[i];
|
||||||
tmp_bytes.put_u8(data[i]);
|
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
|
||||||
|
}
|
||||||
|
recv_buf_len += len;
|
||||||
|
while recv_buf_len >= 16 {
|
||||||
|
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
|
||||||
|
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
|
||||||
|
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
|
||||||
|
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
|
||||||
|
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
|
||||||
|
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
|
||||||
|
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
|
||||||
|
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
|
||||||
|
if ((pack_len + 16) as usize) > recv_buf_len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
|
||||||
|
{
|
||||||
|
tmp_bytes.put_u16_le(pack_len);
|
||||||
|
tmp_bytes.put_u16_le(msg_id);
|
||||||
|
tmp_bytes.put_i32_le(seq_id);
|
||||||
|
tmp_bytes.put_u16_le(magic_code);
|
||||||
|
tmp_bytes.put_u16_le(ext_len);
|
||||||
|
for i in 0..pack_len {
|
||||||
|
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
|
||||||
|
}
|
||||||
|
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
|
||||||
|
let msg = GSResponse {
|
||||||
|
data: a_bytes
|
||||||
|
};
|
||||||
|
//a.do_send(msg);
|
||||||
|
/*
|
||||||
|
{
|
||||||
|
println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}",
|
||||||
|
pack_len,
|
||||||
|
msg_id,
|
||||||
|
seq_id,
|
||||||
|
magic_code,
|
||||||
|
ext_len);
|
||||||
|
}*/
|
||||||
|
}
|
||||||
|
for i in (pack_len + 16) as usize..recv_buf_len {
|
||||||
|
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
|
||||||
|
}
|
||||||
|
recv_buf_len -= (pack_len + 16) as usize;
|
||||||
}
|
}
|
||||||
//Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes);
|
|
||||||
}
|
}
|
||||||
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||||
println!("read error 1");
|
println!("read error 1");
|
||||||
@ -261,6 +311,7 @@ impl WsListener {
|
|||||||
.build()
|
.build()
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
down_pack_queue: r9::Queue::<DownStreamPack>::new_ex(),
|
down_pack_queue: r9::Queue::<DownStreamPack>::new_ex(),
|
||||||
|
up_pack_queue: r9::Queue::<DownStreamPack>::new_ex(),
|
||||||
};
|
};
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
@ -293,16 +344,15 @@ impl WsListener {
|
|||||||
{
|
{
|
||||||
let upstream = self.upstream.clone();
|
let upstream = self.upstream.clone();
|
||||||
let down_pack_queue = self.down_pack_queue.clone();
|
let down_pack_queue = self.down_pack_queue.clone();
|
||||||
|
let up_pack_queue = self.up_pack_queue.clone();
|
||||||
self.tokio_rt.spawn(async move {
|
self.tokio_rt.spawn(async move {
|
||||||
let mut result = TcpStream::connect("192.168.100.39:7616").await;
|
let mut result = TcpStream::connect("192.168.100.39:7616").await;
|
||||||
match result {
|
match result {
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
//upstream.lock().unwrap().replace(v);
|
|
||||||
let a = Arc::new(Mutex::new(v));
|
let a = Arc::new(Mutex::new(v));
|
||||||
//upstream = Arc::downgrade(&a);
|
|
||||||
WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a);
|
WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a);
|
||||||
println!("connect ok");
|
println!("connect ok");
|
||||||
upstream_enter(a, down_pack_queue).await;
|
upstream_enter(a, down_pack_queue, up_pack_queue).await;
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("connect err")
|
println!("connect err")
|
||||||
@ -314,6 +364,28 @@ impl WsListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn update(&self) {
|
||||||
|
let mut data = bytes::BytesMut::new();
|
||||||
|
{
|
||||||
|
self.up_pack_queue.lock().unwrap().fetch();
|
||||||
|
//down_pack_queue.get_mut().fetch();
|
||||||
|
let work_list = self.up_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
||||||
|
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
|
||||||
|
while !work_list.borrow().empty() {
|
||||||
|
let node = &work_list.borrow().first_entry();
|
||||||
|
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
||||||
|
data = node.upgrade().unwrap().borrow().data.clone();
|
||||||
|
break;
|
||||||
|
//stream.write_all(node.upgrade().unwrap().borrow().data.as_ref()).await;
|
||||||
|
//stream.write_all(data.as_ref()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if data.len() > 0 {
|
||||||
|
println!("write_all3 len:{0}", data.len());
|
||||||
|
//stream.write_all(data.as_ref()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn uninit(&mut self) {
|
pub fn uninit(&mut self) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user