a8/a8/udpclient.cc
2018-08-26 20:34:01 +08:00

156 lines
4.4 KiB
C++

#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <memory.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <a8/a8.h>
#include <a8/udpclient.h>
namespace a8
{
struct UdpClientImpl
{
volatile int socket = -1;
std::mutex send_buffer_mutex;
a8::SendQueueNode* top_node = nullptr;
a8::SendQueueNode* bot_node = nullptr;
std::thread* sender_thread = nullptr;
struct sockaddr_in addr = {0};
std::mutex* send_cond_mutex = nullptr;
std::condition_variable* send_cond = nullptr;
volatile bool shutdown = false;
void NotifySendCond()
{
std::unique_lock<std::mutex> lk(*send_cond_mutex);
send_cond->notify_all();
}
void SenderThreadProc()
{
SendQueueNode* worknode = nullptr;
SendQueueNode* delnode = nullptr;
while (!shutdown) {
if (!worknode && top_node) {
send_buffer_mutex.lock();
worknode = top_node;
top_node = nullptr;
bot_node = nullptr;
send_buffer_mutex.unlock();
}
while (worknode && !shutdown) {
::sendto(socket,
worknode->buff,
worknode->bufflen,
0,
(struct sockaddr*)&addr, sizeof(addr));
delnode = worknode;
worknode = worknode->next;
free(delnode->buff);
free(delnode);
}
{
std::unique_lock<std::mutex> lk(*send_cond_mutex);
send_cond->wait_for(lk, std::chrono::seconds(1));
}
}
while (worknode) {
delnode = worknode;
worknode = worknode->next;
free(delnode->buff);
free(delnode);
}
}
};
UdpClient::UdpClient()
{
impl_ = new a8::UdpClientImpl();
bzero(&impl_->addr, sizeof(impl_->addr));
impl_->send_cond_mutex = new std::mutex();
impl_->send_cond = new std::condition_variable();
}
UdpClient::~UdpClient()
{
Close();
delete impl_->send_cond_mutex;
impl_->send_cond_mutex = nullptr;
delete impl_->send_cond;
impl_->send_cond = nullptr;
delete impl_;
impl_ = nullptr;
}
void UdpClient::Open()
{
if(!IsActive()){
Close();
impl_->socket = socket(AF_INET, SOCK_DGRAM, 0);
if(impl_->socket == -1){
// OnError(this, errno);
return;
}
bzero(&impl_->addr, sizeof(impl_->addr));
impl_->addr.sin_family = AF_INET;
impl_->addr.sin_port = htons(remote_port);
impl_->addr.sin_addr.s_addr = inet_addr(remote_address.c_str());
impl_->shutdown = false;
impl_->sender_thread = new std::thread(&a8::UdpClientImpl::SenderThreadProc, impl_);
}
}
void UdpClient::Close()
{
if(IsActive()){
::close(impl_->socket);
impl_->socket = -1;
if (impl_->sender_thread) {
impl_->shutdown = true;
impl_->sender_thread->join();
delete impl_->sender_thread;
impl_->sender_thread = NULL;
}
}
}
bool UdpClient::IsActive()
{
return impl_->socket != -1;
}
void UdpClient::SendBuff(const char* buff, unsigned int bufflen)
{
if (bufflen > 0){
SendQueueNode* p = (SendQueueNode*)malloc(sizeof(SendQueueNode));
memset(p, 0, sizeof(SendQueueNode));
p->buff = (char*)malloc(bufflen);
memmove(p->buff, buff, bufflen);
p->bufflen = bufflen;
impl_->send_buffer_mutex.lock();
if (impl_->bot_node){
impl_->bot_node->next = p;
impl_->bot_node = p;
}else{
impl_->top_node = p;
impl_->bot_node = p;
}
impl_->send_buffer_mutex.unlock();
impl_->NotifySendCond();
}
}
}