a8/a8/websocketclient.cc
aozhiwei af7a9093a0 1
2023-12-08 10:44:16 +08:00

298 lines
9.7 KiB
C++

#include <a8/a8.h>
#include <a8/websocketclient.h>
#ifdef USE_BOOST
// ---- Bits of the first frame header byte -----------------------------------
static constexpr unsigned char FIN = 0x80;
static constexpr unsigned char RSV1 = 0x40;
static constexpr unsigned char RSV2 = 0x20;
static constexpr unsigned char RSV3 = 0x10;
static constexpr unsigned char RSV_MASK = RSV1 | RSV2 | RSV3;
static constexpr unsigned char OPCODE_MASK = 0x0F;

// ---- Payload modes (same numeric values as the TEXT / BINARY opcodes) ------
static constexpr unsigned char TEXT_MODE = 0x01;
static constexpr unsigned char BINARY_MODE = 0x02;

// ---- Frame opcodes ---------------------------------------------------------
static constexpr unsigned char WEBSOCKET_OPCODE = 0x0F;
static constexpr unsigned char WEBSOCKET_FRAME_CONTINUE = 0x0;
static constexpr unsigned char WEBSOCKET_FRAME_TEXT = 0x1;
static constexpr unsigned char WEBSOCKET_FRAME_BINARY = 0x2;
static constexpr unsigned char WEBSOCKET_FRAME_CLOSE = 0x8;
static constexpr unsigned char WEBSOCKET_FRAME_PING = 0x9;
static constexpr unsigned char WEBSOCKET_FRAME_PONG = 0xA;

// ---- Bits / marker values of the second frame header byte ------------------
static constexpr unsigned char WEBSOCKET_MASK = 0x80;
static constexpr unsigned char WEBSOCKET_PAYLOAD_LEN = 0x7F;
// Length markers: 126 announces a 16-bit length field, 127 a 64-bit one.
static constexpr unsigned char WEBSOCKET_PAYLOAD_LEN_UINT16 = 126;
static constexpr unsigned char WEBSOCKET_PAYLOAD_LEN_UINT64 = 127;

// ---- Handshake header prefixes (two capitalisation variants) ---------------
static const char* WEB_SOCKET_KEY = "Sec-WebSocket-Key: ";
static const char* WEB_SOCKET_KEY2 = "Sec-Websocket-Key: ";

// ---- Buffer sizes, in bytes ------------------------------------------------
static constexpr int DEFAULT_MAX_PACKET_LEN = 10 * 1024;
static constexpr int DEFAULT_MAX_RECV_BUFFERSIZE = 64 * 1024;
namespace a8
{
/**
 * Creates a WebSocket client layered on an AsioTcpClient.
 *
 * Allocates the raw receive buffer and the decoded-payload buffer, creates
 * the TCP client and installs the four TCP callbacks that drive the
 * WebSocket state machine (error, connect, disconnect, read).
 *
 * @param io_context  asio context handed to the underlying AsioTcpClient.
 * @param remote_ip   address of the server, forwarded to AsioTcpClient.
 * @param remote_port port of the server, forwarded to AsioTcpClient.
 *
 * Aborts the process if either buffer cannot be allocated.
 */
WebSocketClient::WebSocketClient(std::shared_ptr<asio::io_context> io_context, const std::string& remote_ip, int remote_port)
{
    max_packet_len_ = DEFAULT_MAX_PACKET_LEN;
    // The extra byte leaves room for the NUL terminator that DecodePacket()
    // writes at buf[buflen] while the handshake response is being parsed.
    recv_buff_ = (char *)malloc(max_packet_len_ + 1);
    recv_bufflen_ = 0;
    decoded_buff_ = (char *)malloc(DEFAULT_MAX_RECV_BUFFERSIZE + 1);
    decoded_bufflen_ = 0;
    if (!recv_buff_ || !decoded_buff_) {
        // Every later code path dereferences these buffers unconditionally.
        abort();
    }
    tcp_client_ = std::make_shared<AsioTcpClient>(io_context, remote_ip, remote_port);

    // Forward transport errors to the user callback, if one is installed.
    tcp_client_->on_error =
        [this] (a8::AsioTcpClient* socket, int err)
        {
            if (on_error) {
                on_error(this, err);
            }
        };

    // TCP connection established: start a fresh WebSocket opening handshake.
    tcp_client_->on_connect =
        [this] (a8::AsioTcpClient* socket)
        {
            // Drop whatever a previous connection left behind so that a client
            // re-opened after a disconnect does not treat the new handshake
            // response as a frame or prepend stale bytes to new data.
            handshook_ = false;
            recv_bufflen_ = 0;
            decoded_bufflen_ = 0;

            // The request line needs an HTTP version and the request needs a
            // Host header to be a valid HTTP/1.1 upgrade request.
            std::string data = a8::Format("GET / HTTP/1.1\r\nHost: %s:%d\r\n",
                                          {socket->GetRemoteAddress(),
                                           socket->GetRemotePort()});
            data += "Upgrade: websocket\r\n";
            data += "Connection: Upgrade\r\n";
            data += "Sec-WebSocket-Version: 13\r\n";
            // NOTE(review): the key is a fixed nonce and the server's
            // Sec-WebSocket-Accept answer is never verified by this client.
            data += "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n";
            data += "\r\n";
            socket->SendBuff(data.data(), data.size());
        };

    // Forward disconnects to the user callback, if one is installed.
    tcp_client_->on_disconnect =
        [this] (a8::AsioTcpClient* socket, int err)
        {
            if (on_disconnect) {
                on_disconnect(this, err);
            }
        };

    // Raw bytes arrived: stage them in recv_buff_ and decode as much as possible.
    tcp_client_->on_socketread =
        [this] (a8::AsioTcpClient* socket, char* buf, unsigned int buflen)
        {
            unsigned int already_read_bytes = 0;
            do {
                // Copy as much of the incoming chunk as still fits.
                if (already_read_bytes < buflen) {
                    int read_bytes = std::min(buflen - already_read_bytes,
                                              (unsigned int)max_packet_len_ - recv_bufflen_);
                    if (read_bytes > 0) {
                        memmove(&recv_buff_[recv_bufflen_], buf + already_read_bytes, read_bytes);
                        recv_bufflen_ += read_bytes;
                        already_read_bytes += read_bytes;
                    }
                }
                // Decode until a pass makes no progress or the buffer is drained.
                int offset = 0;
                int prev_offset = 0;
                do {
                    prev_offset = offset;
                    DecodePacket(recv_buff_, offset, recv_bufflen_);
                } while (prev_offset < offset && offset < recv_bufflen_);
                // Move the undecoded tail to the front of the buffer.
                if (offset > 0 && offset < recv_bufflen_){
                    memmove(recv_buff_, recv_buff_ + offset, recv_bufflen_ - offset);
                }
                recv_bufflen_ -= offset;
                if (recv_bufflen_ >= max_packet_len_) {
                    // Buffer is full and still holds no complete packet:
                    // the peer sent an over-long packet.
                    Close();
                    return;
                }
            } while (already_read_bytes < buflen);
        };
}
/**
 * Releases the two malloc'ed buffers and clears the associated bookkeeping.
 *
 * NOTE(review): the callbacks installed on tcp_client_ by the constructor
 * capture `this`; nothing here detaches them, so confirm that the TCP client
 * cannot invoke them after this object is destroyed.
 */
WebSocketClient::~WebSocketClient()
{
    free(recv_buff_);
    free(decoded_buff_);

    recv_buff_ = nullptr;
    decoded_buff_ = nullptr;

    recv_bufflen_ = 0;
    decoded_bufflen_ = 0;
}
/// Opens the underlying TCP client. The WebSocket handshake request is sent
/// from the on_connect callback installed by the constructor.
void WebSocketClient::Open()
{
    auto& client = *tcp_client_;
    client.Open();
}
/// Closes the underlying TCP client.
void WebSocketClient::Close()
{
    auto& client = *tcp_client_;
    client.Close();
}
/// Reports the underlying TCP client's IsActive() state.
bool WebSocketClient::IsActive()
{
    auto& client = *tcp_client_;
    return client.IsActive();
}
/// Reports the underlying TCP client's Connected() state. This reflects the
/// TCP layer only; it does not say whether the WebSocket handshake finished.
bool WebSocketClient::Connected()
{
    auto& client = *tcp_client_;
    return client.Connected();
}
/**
 * Sends one payload as a single masked, final, binary WebSocket frame.
 *
 * @param buff    payload bytes (may be null only when bufflen is 0).
 * @param bufflen payload size in bytes; must not exceed 0xFFFF.
 *
 * Aborts the process when called before the handshake has completed or when
 * the payload is larger than 0xFFFF bytes (the 64-bit length form is not
 * implemented for sending).
 */
void WebSocketClient::SendBuff(const char* buff, unsigned int bufflen)
{
    if (!handshook_) {
        abort();
    }
    // Validate on the unsigned value and before touching the buffer: the
    // length must never be reinterpreted as a (possibly negative) int, or an
    // oversized request could slip past this check and overrun szbuff.
    if (bufflen > 0xFFFF) {
        abort();
    }

    // Large enough for 0xFFFF payload bytes + 4 header bytes + 4 mask bytes.
    unsigned char szbuff [1024 * 65];
    szbuff[0] = FIN | BINARY_MODE;

    unsigned int mask_offset = 2;
    if (bufflen < 126) {
        // Short form: the length lives in the low 7 bits; 0x80 = MASK bit.
        szbuff[1] = (unsigned char)(bufflen | 0x80);
        mask_offset = 2;
    } else {
        // 126 announces a 16-bit big-endian length in the next two bytes.
        szbuff[1] = 126 | 0x80;
        szbuff[2] = (bufflen >> 8) & 0xFF;
        szbuff[3] = bufflen & 0xFF;
        mask_offset = 4;
    }

    // Store the masking key with memcpy: the key position is not int-aligned,
    // so writing through an int* would be undefined behaviour.
    const int mask = rand();
    static_assert(sizeof(mask) == 4, "masking key must be exactly 4 bytes");
    memcpy(szbuff + mask_offset, &mask, sizeof(mask));

    const unsigned char* mask_key = szbuff + mask_offset;
    unsigned char* payload = szbuff + mask_offset + 4;
    for (unsigned int i = 0; i < bufflen; ++i) {
        payload[i] = ((unsigned char)buff[i]) ^ mask_key[i % 4];
    }
    tcp_client_->SendBuff((char*)szbuff, bufflen + mask_offset + 4);
}
/**
 * Entry point for handshake parsing. Does nothing until the blank line that
 * terminates the response headers is present in the buffer, then hands over
 * to ProcessWsHandShake().
 *
 * @param buf    NUL-terminated receive buffer.
 * @param offset in/out read position inside buf.
 * @param buflen number of valid bytes in buf.
 */
void WebSocketClient::ProcessHandShake(char* buf, int& offset, unsigned int buflen)
{
    const char* header_end = strstr(buf + offset, "\r\n\r\n");
    if (header_end != nullptr) {
        ProcessWsHandShake(buf, offset, buflen);
    }
}
/**
 * Completes the handshake once the full response header block has arrived:
 * marks the connection as handshook, moves offset just past the terminating
 * blank line and fires on_connect if it is set. The response content itself
 * is not inspected.
 *
 * @param buf    NUL-terminated receive buffer.
 * @param offset in/out read position inside buf; advanced past the headers.
 * @param buflen number of valid bytes in buf (unused here).
 */
void WebSocketClient::ProcessWsHandShake(char* buf, int& offset, unsigned int buflen)
{
    static const char kHeaderTerminator[] = "\r\n\r\n";

    char* header_end = strstr(buf + offset, kHeaderTerminator);
    if (header_end == nullptr) {
        return;
    }

    handshook_ = true;
    // New position = index of the terminator + its length.
    offset = static_cast<int>((header_end - buf) + strlen(kHeaderTerminator));

    if (on_connect) {
        on_connect(this);
    }
}
/**
 * Feeds the decoded payload buffer to the user's packet decoder.
 *
 * on_decode_userpacket is called repeatedly with the buffer, an in/out
 * offset and the buffer length for as long as it keeps advancing the offset
 * and data remains. Consumed bytes are then dropped from the front of the
 * buffer. If the unconsumed remainder reaches max_packet_len_, the
 * connection is closed.
 */
void WebSocketClient::ProcessUserPacket()
{
    int offset = 0;
    // Guard the callback like on_error / on_connect / on_disconnect are
    // guarded; without a decoder nothing is consumed and the length check
    // below eventually closes the connection.
    if (on_decode_userpacket) {
        int prev_offset = 0;
        do {
            prev_offset = offset;
            on_decode_userpacket(decoded_buff_, offset, decoded_bufflen_);
        } while (prev_offset < offset && offset < decoded_bufflen_);
    }
    // Shift the unconsumed tail to the start of the buffer.
    if (offset > 0 && offset < decoded_bufflen_){
        memmove(decoded_buff_, decoded_buff_ + offset, decoded_bufflen_ - offset);
    }
    decoded_bufflen_ -= offset;
    if (decoded_bufflen_ >= max_packet_len_) {
        // Received an over-long user packet.
        Close();
        return;
    }
}
void WebSocketClient::DecodeFrame(char* buf, int& offset, unsigned int buflen)
{
if (offset + 2 > (int)buflen) {
return;
}
char* real_buf = buf + offset;
unsigned int ava_len = buflen - offset;
unsigned char header = real_buf[0];
unsigned char mask_payloadlen = real_buf[1];
bool is_final_frame = (header & FIN) == FIN;
#if 0
bool reserved_bits = (header & FIN) == RSV_MASK;
#endif
unsigned char opcode = header & OPCODE_MASK;
#if 0
bool opcode_is_control = opcode & 0x8;
#endif
if (opcode == WEBSOCKET_FRAME_CLOSE) {
Close();
return;
}
if (opcode != BINARY_MODE) {
if (opcode != WEBSOCKET_FRAME_PING) {
Close();
return;
}
}
if (!is_final_frame) {
Close();
return;
}
bool is_masked = (mask_payloadlen & 0x80) == 0x80;
unsigned char payloadlen = mask_payloadlen & 0x7F;
unsigned int framelen = 0;
int mask_offset = 0;
if (payloadlen < 126) {
framelen = payloadlen;
mask_offset = 2;
} else if (payloadlen == 126 && ava_len >= 4) {
framelen = ntohs( *(u_short*) (real_buf + 2) );
mask_offset = 4;
} else if (payloadlen == 127 && ava_len >= 8) {
//int32 or int64?
framelen = ntohl( *(u_long*) (real_buf + 2) );
mask_offset = 8;
} else {
return;
}
unsigned int real_pkg_len = mask_offset + framelen + (is_masked ? 4 : 0);
if (ava_len < real_pkg_len) {
return;
}
if (is_masked) {
unsigned char *frame_mask = (unsigned char*)(real_buf + mask_offset);
memcpy(&decoded_buff_[decoded_bufflen_], real_buf + mask_offset + 4, framelen);
for (unsigned int i = 0; i < framelen; i++) {
decoded_buff_[decoded_bufflen_ + i] =
(decoded_buff_[ decoded_bufflen_ + i] ^ frame_mask[i%4]);
}
} else {
memcpy(&decoded_buff_[decoded_bufflen_], real_buf + mask_offset, framelen);
}
decoded_bufflen_ += framelen;
ProcessUserPacket();
offset += real_pkg_len;
}
/**
 * Routes buffered bytes to the right parser: the handshake parser until the
 * handshake has completed, the frame decoder afterwards.
 *
 * @param buf    receive buffer; must have room for one byte past buflen,
 *               because a NUL terminator is written there during the
 *               handshake phase.
 * @param offset in/out read position inside buf.
 * @param buflen number of valid bytes in buf.
 */
void WebSocketClient::DecodePacket(char* buf, int& offset, unsigned int buflen)
{
    if (handshook_) {
        DecodeFrame(buf, offset, buflen);
        return;
    }
    // The handshake is parsed with C string functions, so terminate the data.
    buf[buflen] = '\0';
    ProcessHandShake(buf, offset, buflen);
}
}
#endif