1
This commit is contained in:
parent
da9b5aff3f
commit
e0d85a137a
@ -21,6 +21,7 @@ r9_macro = { path = "../../third_party/librust/r9_macro" }
|
|||||||
r9_macro_derive = { path = "../../third_party/librust/r9_macro_derive" }
|
r9_macro_derive = { path = "../../third_party/librust/r9_macro_derive" }
|
||||||
f9 = { path = "../../third_party/librust/f9" }
|
f9 = { path = "../../third_party/librust/f9" }
|
||||||
bytes = "0.5.6"
|
bytes = "0.5.6"
|
||||||
|
futures = "0.3.30"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
protobuf-codegen-pure = "2.9.0"
|
protobuf-codegen-pure = "2.9.0"
|
||||||
|
@ -14,7 +14,7 @@ use r9::xtimer::TimerList;
|
|||||||
|
|
||||||
pub struct DownStreamPack {
|
pub struct DownStreamPack {
|
||||||
holder: Option<Rc::<RefCell::<DownStreamPack>>>,
|
holder: Option<Rc::<RefCell::<DownStreamPack>>>,
|
||||||
socket_handle: u16,
|
pub socket_handle: u16,
|
||||||
pub data: bytes::BytesMut,
|
pub data: bytes::BytesMut,
|
||||||
pub entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
|
pub entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
|
||||||
}
|
}
|
||||||
@ -63,6 +63,7 @@ impl f9::app::UserApp for UserApp {
|
|||||||
while !work_list.borrow().empty() {
|
while !work_list.borrow().empty() {
|
||||||
let node = &work_list.borrow().first_entry();
|
let node = &work_list.borrow().first_entry();
|
||||||
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
||||||
|
WsListener::instance().borrow().add_down_stream_pack(node.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,14 +14,16 @@ use actix_web::{
|
|||||||
HttpServer,
|
HttpServer,
|
||||||
Error
|
Error
|
||||||
};
|
};
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::thread::JoinHandle;
|
use std::thread::JoinHandle;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use bytes::{BufMut, BytesMut};
|
use bytes::{BufMut, BytesMut};
|
||||||
use actix_web::web::Bytes;
|
use actix_web::web::Bytes;
|
||||||
use tokio::io::Interest;
|
use futures::FutureExt;
|
||||||
|
use tokio::io::{AsyncReadExt, Interest};
|
||||||
use crate::app::UserApp;
|
use crate::app::UserApp;
|
||||||
use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN};
|
use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
@ -36,9 +38,9 @@ pub struct WsListener {
|
|||||||
work_thread: Option<JoinHandle<()>>,
|
work_thread: Option<JoinHandle<()>>,
|
||||||
socket_hash: HashMap<u16, Recipient<DownStreamMessage>>,
|
socket_hash: HashMap<u16, Recipient<DownStreamMessage>>,
|
||||||
cur_socket_handle: u16,
|
cur_socket_handle: u16,
|
||||||
upstream: Arc<Mutex<Option<TcpStream>>>,
|
pub upstream: std::sync::Weak<Mutex<TcpStream>>,
|
||||||
tokio_rt: Runtime,
|
tokio_rt: Runtime,
|
||||||
down_pack_queue: Arc<Mutex<r9::Queue<DownStreamPack>>>,
|
down_pack_queue: Arc<std::sync::Mutex<r9::Queue<DownStreamPack>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
@ -177,12 +179,13 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
|
|||||||
resp
|
resp
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn upstream_enter(mut stream: TcpStream, down_pack_queue: Arc<Mutex::<r9::Queue<DownStreamPack>>>,) {
|
async fn upstream_enter(stream_arc: Arc<Mutex<TcpStream>>, down_pack_queue: Arc<std::sync::Mutex::<r9::Queue<DownStreamPack>>>,) {
|
||||||
println!("upstream_enter1");
|
println!("upstream_enter1");
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
println!("upstream_enter2");
|
println!("upstream_enter2");
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||||
|
let mut stream = stream_arc.lock().await;
|
||||||
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await;
|
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await;
|
||||||
match ready {
|
match ready {
|
||||||
Ok(r) => {
|
Ok(r) => {
|
||||||
@ -217,7 +220,9 @@ async fn upstream_enter(mut stream: TcpStream, down_pack_queue: Arc<Mutex::<r9::
|
|||||||
let mut data = bytes::BytesMut::new();
|
let mut data = bytes::BytesMut::new();
|
||||||
{
|
{
|
||||||
down_pack_queue.lock().unwrap().fetch();
|
down_pack_queue.lock().unwrap().fetch();
|
||||||
|
//down_pack_queue.get_mut().fetch();
|
||||||
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
let work_list = down_pack_queue.lock().unwrap().work_list.lock().unwrap().clone();
|
||||||
|
//let work_list = down_pack_queue.get_mut().work_list.lock().unwrap().clone();
|
||||||
while !work_list.borrow().empty() {
|
while !work_list.borrow().empty() {
|
||||||
let node = &work_list.borrow().first_entry();
|
let node = &work_list.borrow().first_entry();
|
||||||
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
|
||||||
@ -270,7 +275,7 @@ impl WsListener {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
rt.block_on(async {
|
rt.block_on(async {
|
||||||
let counter = web::Data::new(AppStateWithCounter {
|
let counter = web::Data::new(AppStateWithCounter {
|
||||||
counter: Mutex::new(0),
|
counter: std::sync::Mutex::new(0),
|
||||||
});
|
});
|
||||||
println!("hello2333");
|
println!("hello2333");
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
@ -292,8 +297,11 @@ impl WsListener {
|
|||||||
match result {
|
match result {
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
//upstream.lock().unwrap().replace(v);
|
//upstream.lock().unwrap().replace(v);
|
||||||
|
let a = Arc::new(Mutex::new(v));
|
||||||
|
//upstream = Arc::downgrade(&a);
|
||||||
|
WsListener::instance().borrow_mut().upstream = Arc::downgrade(&a);
|
||||||
println!("connect ok");
|
println!("connect ok");
|
||||||
upstream_enter(v, down_pack_queue).await;
|
upstream_enter(a, down_pack_queue).await;
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
println!("connect err")
|
println!("connect err")
|
||||||
@ -309,6 +317,17 @@ impl WsListener {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn add_down_stream_pack(&self, data: Weak::<RefCell::<DownStreamPack>>) {
|
||||||
|
let socket_handle = data.upgrade().unwrap().borrow().socket_handle;
|
||||||
|
match self.socket_hash.get(&socket_handle) {
|
||||||
|
Some(socket) => {
|
||||||
|
let a = 0;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
let a = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) {
|
fn on_connect(&mut self, conn: &mut WsConn, socket: Recipient<DownStreamMessage>) {
|
||||||
self.cur_socket_handle += 1;
|
self.cur_socket_handle += 1;
|
||||||
conn.socket_handle = self.cur_socket_handle;
|
conn.socket_handle = self.cur_socket_handle;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user