This commit is contained in:
azw 2024-05-11 08:34:49 +08:00
parent 45a48a1471
commit 8ed90029b3
3 changed files with 95 additions and 3 deletions

View File

@ -19,6 +19,9 @@ pub struct DownStreamPack {
entry: Rc::<RefCell::<r9::ListHead<DownStreamPack>>>,
}
unsafe impl Send for DownStreamPack {}
unsafe impl Sync for DownStreamPack {}
#[derive(SharedFromSelf)]
#[derive(Singleton)]
pub struct UserApp {
@ -56,9 +59,8 @@ impl f9::app::UserApp for UserApp {
fn update(&mut self) {
self.net_msg_queue.borrow_mut().fetch();
let work_list = self.net_msg_queue.borrow_mut().work_list.clone();
let work_list = self.net_msg_queue.borrow().work_list.lock().unwrap().clone();
while !work_list.borrow().empty() {
println!("work_list exec");
let node = &work_list.borrow().first_entry();
node.upgrade().unwrap().borrow_mut().entry.borrow_mut().del_init();
}

View File

@ -17,11 +17,16 @@ use actix_web::{
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use actix::prelude::*;
use bytes::{BufMut, BytesMut};
use actix_web::web::Bytes;
use tokio::io::Interest;
use crate::app::UserApp;
use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use crate::app::user_app::DownStreamPack;
#[derive(SharedFromSelf)]
#[derive(Singleton)]
@ -30,6 +35,9 @@ pub struct WsListener {
work_thread: Option<JoinHandle<()>>,
socket_hash: HashMap<u16, Recipient<DownStreamMessage>>,
cur_socket_handle: u16,
upstream: Arc<Mutex<Option<TcpStream>>>,
tokio_rt: Runtime,
down_pack_queue: Arc<Mutex<r9::Queue<DownStreamPack>>>,
}
#[derive(Message)]
@ -168,6 +176,64 @@ async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, E
resp
}
async fn upstream_enter(stream: TcpStream, down_pack_queue: Arc<Mutex::<r9::Queue<DownStreamPack>>>,) {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await;
match ready {
Ok(r) => {
{
if r.is_readable() {
let mut data = vec![0; 1024 * 64];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match stream.try_read(&mut data) {
Ok(n) => {
//println!("read {} bytes", n);
let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize);
for i in 0..n {
tmp_bytes.put_u8(data[i]);
}
//Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes);
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
println!("read error 1");
continue;
}
Err(e) => {
println!("read error 2");
//return Err(e.into());
}
}
}
}
{
if r.is_writable() {
/*
while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
//Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0]
let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
let len = bytes.clone().unwrap().clone().len();
{
//println!("sendmsg len:{0}", len);
}
{
down_stream.write_all(bytes.unwrap().as_ref()).await;
}
}*/
}
}
}
Err(e) => {
}
}
}
});
}
impl WsListener {
pub fn new() -> Self {
@ -176,6 +242,12 @@ impl WsListener {
_self_wp: Default::default(),
work_thread: Default::default(),
socket_hash: Default::default(),
upstream: Default::default(),
tokio_rt: tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
down_pack_queue: r9::Queue::<DownStreamPack>::new_ex(),
};
return p;
}
@ -193,6 +265,7 @@ impl WsListener {
let counter = web::Data::new(AppStateWithCounter {
counter: Mutex::new(0),
});
println!("hello2333");
HttpServer::new(move || {
// move counter into the closure
ActixApp::new() // <- register the created data
@ -204,6 +277,23 @@ impl WsListener {
.await
});
}));
{
let upstream = self.upstream.clone();
let down_pack_queue = self.down_pack_queue.clone();
self.tokio_rt.spawn(async move {
let mut result = TcpStream::connect("192.168.100.39:7616").await;
match result {
Ok(v) => {
//upstream.lock().unwrap().replace(v);
println!("connect ok");
upstream_enter(v, down_pack_queue);
},
Err(e) => {
println!("connect err")
}
}
});
}
println!("hello3");
}
}

2
third_party/librust vendored

@ -1 +1 @@
Subproject commit cbca2a666f64fe55357179c1ddd13e1ed5a43c9c
Subproject commit c950f6d2883a3213a17d514eded9fdcc86f400b5