This commit is contained in:
azw 2024-05-12 09:56:44 +08:00
parent 436c47ea3f
commit acb8e1a470
3 changed files with 6 additions and 324 deletions

View File

@ -25,12 +25,16 @@ use actix_web::web::Bytes;
use futures::FutureExt;
use tokio::io::{AsyncReadExt, Interest};
use crate::app::UserApp;
use crate::{AppStateWithCounter, GSResponse, MAX_PACKET_LEN};
use crate::{MAX_PACKET_LEN};
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
use crate::app::user_app::DownStreamPack;
use tokio::io::{AsyncWriteExt, Ready};
struct AppStateWithCounter {
counter: std::sync::Mutex<i32>, // <- Mutex is necessary to mutate safely across threadsds
}
#[derive(SharedFromSelf)]
#[derive(Singleton)]
pub struct WsListener {

View File

@ -1,4 +1,3 @@
use std::future::IntoFuture;
use f9::app::App;
mod app;
@ -7,330 +6,9 @@ mod upstream;
mod downstream;
mod ss;
use crate::app::UserApp;
use actix_web::{web, App as ActixApp, get, post, Responder, Error, HttpServer, HttpRequest, HttpResponse};
use std::sync::mpsc;
use std::thread;
use actix_rt::System;
use std::time::Duration;
use actix_web_actors::ws;
use actix::{Actor, AsyncContext, StreamHandler};
use std::sync::Mutex;
use actix::prelude::*;
use bytes::{Buf, BufMut, Bytes};
use bytes::BytesMut;
use tokio::io::{AsyncWriteExt, Ready};
use tokio::net::TcpStream;
use std::sync::Arc;
use tokio::io::Interest;
use std::vec::Vec;
use std::cell::RefCell;
use std::io::Read;
use std::io;
use std::cell::Cell;
const MAX_PACKET_LEN: usize = 1024 * 64;
/// Define HTTP actor
struct MyWs {
recv_buf_len: usize,
recv_buf: BytesMut,
send_queue: Arc<Mutex<RefCell<Vec::<BytesMut>>>>,
recv_queue: Arc<Mutex<RefCell<Vec::<BytesMut>>>>,
}
impl Actor for MyWs {
type Context = ws::WebsocketContext<Self>;
}
impl MyWs {
fn parse_pkt(&mut self, bin: &actix_web::web::Bytes) {
let buf_len = bin.len();
if buf_len > 0 {
let mut already_read_bytes: usize = 0;
{
for i in 0..1024 * 64 * 2 {
self.recv_buf.put_i8(0);
}
}
while true {
let mut read_bytes = buf_len - already_read_bytes;
if read_bytes > MAX_PACKET_LEN - self.recv_buf_len {
read_bytes = MAX_PACKET_LEN - self.recv_buf_len;
}
if read_bytes > 0 {
self.recv_buf[self.recv_buf_len..bin.len()].copy_from_slice(&bin);
self.recv_buf_len += read_bytes;
already_read_bytes += read_bytes;
}
let mut offset = 0;
let mut pre_offect = 0;
while true {
pre_offect = offset;
{
let pack_len = (self.recv_buf[0] as u16)+ ((self.recv_buf[1] as u16) << 8);
let msg_id = (self.recv_buf[2] as u16)+ ((self.recv_buf[3] as u16) << 8);
let seq_id = (self.recv_buf[4] as i32)+ ((self.recv_buf[5] as i32) << 8) +
((self.recv_buf[7] as i32) << 16)+ ((self.recv_buf[7] as i32) << 24);
let magic_code = (self.recv_buf[8] as u16)+ ((self.recv_buf[9] as u16) << 8);
let ext_len = (self.recv_buf[10] as u16)+ ((self.recv_buf[11] as u16) << 8);
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 24) as usize);
{
tmp_bytes.put_u16_le(pack_len);
tmp_bytes.put_u16_le(msg_id);
tmp_bytes.put_i32_le(seq_id);
tmp_bytes.put_u16_le(magic_code);
tmp_bytes.put_u16_le(ext_len);
tmp_bytes.put_i32_le(0);
tmp_bytes.put_i64_le(0);
for i in 24..tmp_bytes.capacity() {
tmp_bytes.put_i8(0);
}
tmp_bytes[24 .. (24+ pack_len as usize)].copy_from_slice(&bin[12..(12 + pack_len as usize)]);
println!("recv buf_len:{0}", tmp_bytes.len());
}
{
Arc::downgrade(&mut self.send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes);
}
offset += (pack_len as usize) + 12;
}
if pre_offect >= offset || offset >= self.recv_buf_len {
break;
}
}//end while
if offset > 0 && offset < self.recv_buf_len {
self.recv_buf[..0].copy_from_slice(&bin);
}
self.recv_buf_len -= offset;
if self.recv_buf_len > 0 {
self.recv_buf_len = 0;
}
if already_read_bytes >= bin.len() {
already_read_bytes = 0;
break;
}
}
}
}
}
/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
fn started(&mut self, ctx: &mut Self::Context) {
let a: Recipient<GSResponse> = ctx.address().recipient();
let mut recv_buf = BytesMut::with_capacity(1024 * 64 * 3);
let mut recv_buf_len = 0;
{
for i in 0..recv_buf.capacity() {
recv_buf.put_i8(0);
}
}
let mut recv_queue = self.recv_queue.clone();
tokio::spawn(async move {
loop {
while Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
let bytes = Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
let len = bytes.clone().unwrap().clone().len();
{
//println!("sendmsg 1111111111111111111111111111111 len:{0}", len);
}
{
let data = bytes.unwrap();
for i in 0..len {
recv_buf[recv_buf_len + i] = data[i];
//println!("sendmsg xxxxx idx:{0} val:{1} buf_val:{2} recv_buf_len:{3}", i, data[i], recv_buf[recv_buf_len + 1], recv_buf_len);
}
recv_buf_len += len;
while recv_buf_len >= 16 {
let pack_len = (recv_buf[0] as u16) + ((recv_buf[1] as u16) << 8);
let msg_id = (recv_buf[2] as u16) + ((recv_buf[3] as u16) << 8);
let seq_id = (recv_buf[4] as i32) + ((recv_buf[5] as i32) << 8) +
((recv_buf[7] as i32) << 16) + ((recv_buf[7] as i32) << 24);
let magic_code = (recv_buf[8] as u16) + ((recv_buf[9] as u16) << 8);
let rpc_error_code = (recv_buf[10] as u16) + ((recv_buf[11] as u16) << 8);
let socket_handle = (recv_buf[12] as u16) + ((recv_buf[13] as u16) << 8);
let ext_len = (recv_buf[14] as u16) + ((recv_buf[15] as u16) << 8);
if ((pack_len + 16) as usize) > recv_buf_len {
break;
}
let mut tmp_bytes = BytesMut::with_capacity((pack_len + 16) as usize);
{
tmp_bytes.put_u16_le(pack_len);
tmp_bytes.put_u16_le(msg_id);
tmp_bytes.put_i32_le(seq_id);
tmp_bytes.put_u16_le(magic_code);
tmp_bytes.put_u16_le(ext_len);
for i in 0..pack_len {
tmp_bytes.put_u8(recv_buf[(16 + i) as usize]);
}
let mut a_bytes = actix_web::web::Bytes::copy_from_slice(tmp_bytes.as_ref());
let msg = GSResponse {
data: a_bytes
};
a.do_send(msg);
/*
{
println!("sendmsg pack_len:{0} msg_id:{1} seq_id:{2} magic_code:{3} ext_len:{4}",
pack_len,
msg_id,
seq_id,
magic_code,
ext_len);
}*/
}
for i in (pack_len + 16) as usize..recv_buf_len {
recv_buf[i - (pack_len + 16) as usize] = recv_buf[i];
}
recv_buf_len -= (pack_len + 16) as usize;
}
}
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
});
}
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
Ok(ws::Message::Text(text)) => ctx.text(text),
Ok(ws::Message::Binary(bin)) => {
self.parse_pkt(&bin);
},
_ => (),
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct GSResponse {
data: actix_web::web::Bytes,
}
impl Handler<GSResponse> for MyWs {
type Result = ();
fn handle(&mut self, msg: GSResponse, ctx: &mut Self::Context) -> Self::Result {
ctx.binary(msg.data);
}
}
struct AppStateWithCounter {
counter: Mutex<i32>, // <- Mutex is necessary to mutate safely across threads
}
async fn index(req: HttpRequest, stream: web::Payload, data: web::Data<AppStateWithCounter>) -> Result<HttpResponse, Error> {
let mut counter = data.counter.lock().unwrap(); // <- get counter's MutexGuard
*counter += 1; // <- access counter inside MutexGuard
format!("Request number: {counter}"); // <- response with count
let ws_client = MyWs {
recv_buf_len: 0,
recv_buf: BytesMut::with_capacity(1024 * 64 * 2),
send_queue: Default::default(),
recv_queue: Default::default(),
//addr: Default::default(),
};
{
let mut down_stream = TcpStream::connect("192.168.100.39:7616").await?;
down_stream.set_nodelay(true);
//down_stream.set_nonblocking(true)?;
let mut send_queue = ws_client.send_queue.clone();
let mut recv_queue = ws_client.recv_queue.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(10)).await;
let ready = down_stream.ready(Interest::READABLE | Interest::WRITABLE).await;
match ready {
Ok(r) => {
{
if r.is_readable() {
let mut data = vec![0; 1024 * 64];
// Try to read data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match down_stream.try_read(&mut data) {
Ok(n) => {
//println!("read {} bytes", n);
let mut tmp_bytes = BytesMut::with_capacity((n + 0) as usize);
for i in 0..n {
tmp_bytes.put_u8(data[i]);
}
Arc::downgrade(&mut recv_queue).upgrade().unwrap().lock().unwrap().borrow_mut().insert(0, tmp_bytes);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
println!("read error 1");
continue;
}
Err(e) => {
println!("read error 2");
//return Err(e.into());
}
}
}
}
{
if r.is_writable() {
while Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow().len() > 0 {
//Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut()[0]
let bytes = Arc::downgrade(&mut send_queue).upgrade().unwrap().lock().unwrap().borrow_mut().pop();
let len = bytes.clone().unwrap().clone().len();
{
//println!("sendmsg len:{0}", len);
}
{
down_stream.write_all(bytes.unwrap().as_ref()).await;
}
}
}
}
}
Err(e) => {
}
}
}
});
}
let resp = ws::start(ws_client, &req, stream);
println!("{:?}", resp);
resp
}
fn main1() {
let t1 = thread::spawn(||{
//let mut rt = tokio::runtime::Runtime::new().unwrap();
let mut rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let counter = web::Data::new(AppStateWithCounter {
counter: Mutex::new(0),
});
HttpServer::new(move || {
// move counter into the closure
ActixApp::new()
.app_data(counter.clone()) // <- register the created data
.route("/", web::get().to(index))
})
.bind(("0.0.0.0", 8080))?
.run()
.await
});
println!("hello2");
});
println!("hello3");
while true {
thread::sleep(Duration::from_secs(10));
}
}
fn main() {
App::init(UserApp::instance());
App::run();

2
third_party/librust vendored

@ -1 +1 @@
Subproject commit c950f6d2883a3213a17d514eded9fdcc86f400b5
Subproject commit 902db5e1bf1789c5055b0e9015f513a409cac4c6