a8/a8/tcplistener.cc
2019-05-29 14:23:47 +08:00

510 lines
16 KiB
C++

#include <memory.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
#include <mutex>
#include <thread>
#include <vector>
#include <a8/a8.h>
#include <a8/tcplistener.h>
#ifdef A8_TCP_SESSION2
#include <a8/tcpsession2.h>
#else
#include <a8/tcpsession.h>
#endif
#include <a8/tcpsessionpool.h>
namespace a8
{
struct TcpListenerImpl
{
// Public listener object that owns this implementation; assigned by the
// TcpListener constructor and used to reach the user callbacks.
a8::TcpListener* master = nullptr;
// Listening socket; a8::INVALID_SOCKET while the listener is not active.
int listen_socket = a8::INVALID_SOCKET;
// Runs AcceptThreadProc; created at the end of a successful ActiveStart().
std::thread* accept_thread = nullptr;
// Runs WorkerThreadProc; created by SetActive(true).
std::thread* worker_thread = nullptr;
// Stop flags polled by the two thread loops and raised by ActiveStop().
// NOTE(review): volatile does not provide inter-thread synchronization;
// std::atomic<bool> would be the correct type here.
volatile bool accept_thread_shutdown = false;
volatile bool worker_thread_shutdown = false;
// Taken around accesses to client_hash, client_handle_hash and
// free_client_pool in CreateSession/FreeClient and the TcpListener wrappers.
std::mutex clients_mutex;
// Registered sessions; key and value are the same pointer.
std::map<a8::TcpSession*, a8::TcpSession*> client_hash;
// Registered sessions indexed by their socket_handle.
std::map<unsigned short, a8::TcpSession*> client_handle_hash;
// Number of registered sessions; incremented in CreateSession and
// decremented in FreeClientWithNoLock.
volatile unsigned int client_count = 0;
// Most recently issued socket handle; numbering starts at 1000.
unsigned short curr_socket_handle = 1000;
// Connection cap (overwritten by the TcpListener constructor); also used as
// the listen backlog, the epoll_create size hint and the event buffer length.
unsigned short max_clients = 0xEFFF;
// Sessions released by FreeClientWithNoLock, reused by CreateSession.
a8::TcpSessionPool free_client_pool;
// epoll instance created in ActiveStart(); a8::INVALID_FD when closed.
volatile int epoll_fd = a8::INVALID_FD;
// Disabled intrusive session list and its scan cursor.
#if 0
list_head session_list;
#endif
#if 0
list_head* checking_node = nullptr;
#endif
// Reports whether the listener currently holds a listening socket.
bool IsActive()
{
    return !(listen_socket == a8::INVALID_SOCKET);
}
// Activates or deactivates the listener.
//
// Activation only spawns the worker thread; the listening socket itself is
// created on that thread (WorkerThreadProc calls ActiveStart), so IsActive()
// may still report false right after this call returns. Nothing happens when
// the listener is already active or a worker thread object already exists.
// Deactivation calls ActiveStop(), but only while IsActive() is true.
void SetActive(bool active)
{
    if (!active) {
        if (IsActive()) {
            ActiveStop();
        }
        return;
    }
    if (IsActive() || worker_thread) {
        return;
    }
    worker_thread_shutdown = false;
    worker_thread = new std::thread(&a8::TcpListenerImpl::WorkerThreadProc, this);
}
bool ActiveStart()
{
listen_socket = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if(listen_socket == a8::INVALID_SOCKET){
if (master->on_error){
master->on_error(master, a8::TCPLISTENER_E::TE_CREATE_ERR, errno);
}
return false;
}
// TIME_WAIT - argh
int on = 1;
if (::setsockopt(listen_socket,
SOL_SOCKET,
SO_REUSEADDR,
(const char*)&on,
sizeof(on)) < 0){
if (master->on_error){
master->on_error(master, a8::TCPLISTENER_E::TE_SETSOCKOPT_ERR, errno);
}
::close(listen_socket);
listen_socket = a8::INVALID_SOCKET;
return false;
}
sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = ::inet_addr(master->bind_address.c_str());
sa.sin_port = htons(master->bind_port);
if (::bind(listen_socket,
(sockaddr*)&sa,
sizeof(sa)) < 0) {
if (master->on_error) {
master->on_error(master, a8::TCPLISTENER_E::TE_BIND_ERR, errno);
}
::close(listen_socket);
listen_socket = a8::INVALID_SOCKET;
return false;
}
if (::listen(listen_socket, max_clients) < 0) {
if (master->on_error) {
master->on_error(master, a8::TCPLISTENER_E::TE_LISTEN_ERR, errno);
}
::close(listen_socket);
listen_socket = a8::INVALID_SOCKET;
return false;
}
epoll_fd = ::epoll_create(max_clients);
assert(epoll_fd != a8::INVALID_FD);
accept_thread_shutdown = false;
accept_thread = new std::thread(&a8::TcpListenerImpl::AcceptThreadProc, this);
return true;
}
void ActiveStop()
{
if (listen_socket != a8::INVALID_SOCKET) {
::shutdown(listen_socket, SHUT_RDWR);
::close(listen_socket);
listen_socket = a8::INVALID_SOCKET;
}
if(epoll_fd != a8::INVALID_FD) {
::close(epoll_fd);
epoll_fd = a8::INVALID_FD;
}
if (accept_thread) {
accept_thread_shutdown = true;
accept_thread->join();
delete accept_thread;
accept_thread = nullptr;
}
if (worker_thread) {
worker_thread_shutdown = true;
worker_thread->join();
delete worker_thread;
worker_thread = nullptr;
}
{
clients_mutex.lock();
std::vector<a8::TcpSession*> del_sessions;
for (auto& pair : client_hash) {
del_sessions.push_back(pair.second);
}
for (a8::TcpSession* session : del_sessions) {
session->_ForceClose();
}
client_hash.clear();
clients_mutex.unlock();
}
}
void ClearDeadSession()
{
#if 1
std::list<a8::TcpSession*> dead_session;
for (auto& pair : client_hash) {
if (!pair.first->Alive()) {
dead_session.push_back(pair.first);
}
}
for (auto& session : dead_session) {
session->_ForceClose();
}
#else
int try_cnt = 0;
while (!list_empty(checking_node) && try_cnt < 1024) {
a8::TcpSession* session = list_entry(checking_node, a8::TcpSession, session_entry);
checking_node = checking_node->next;
if (!session->Alive()) {
session->_ForceClose();
}
++try_cnt;
}
if (list_empty(checking_node)) {
checking_node = &session_list;
}
#endif
}
// Obtains a session object for a newly accepted connection, initialises it
// and registers it under a fresh socket handle.
//
// sock    accepted socket descriptor
// ipaddr  textual peer address (must not be null)
// port    peer port
// saddr   raw peer address
//
// Returns the registered session, or nullptr when no session object could be
// obtained, its receive buffer could not be allocated, or no free handle was
// found. The socket is not closed here on failure.
a8::TcpSession* CreateSession(int sock, const char* ipaddr, int port, unsigned long saddr)
{
    a8::TcpSession* session = nullptr;
    {
        std::lock_guard<std::mutex> guard(clients_mutex);
        ClearDeadSession();
        session = free_client_pool.Get();
    }
    if (!session && master->on_create_client_socket) {
        master->on_create_client_socket(&session);
        if (session && !session->AllocRecvBuf()) {
            delete session;
            session = nullptr;
        }
    }
    if (!session) {
        // Pool empty and the factory callback is missing or produced nothing.
        // The original code went on to dereference the null pointer here.
        return nullptr;
    }

    session->create_time = time(nullptr);
    session->master = master;
    session->Reset();
    session->SetSocket(sock);
    session->saddr = saddr;
    session->remote_address = std::string(ipaddr);
    session->remote_port = port;
    INIT_LIST_HEAD(&session->session_entry);
#if 0
    list_add_tail(&session->session_entry, &session_list);
#endif

    std::lock_guard<std::mutex> guard(clients_mutex);
    // Handles start at 1000. The counter is an unsigned short, so after it
    // wraps it is pushed back to 1000 before every lookup; previously a
    // wrapped value below 1000 could be handed out when it happened to be free.
    ++curr_socket_handle;
    if (curr_socket_handle < 1000) {
        curr_socket_handle = 1000;
    }
    int try_count = 0;
    while (master->GetClientSession(curr_socket_handle) && try_count < max_clients) {
        ++try_count;
        ++curr_socket_handle;
        if (curr_socket_handle < 1000) {
            curr_socket_handle = 1000;
        }
    }
    if (master->GetClientSession(curr_socket_handle)) {
        // Every probed handle is taken.
        delete session;
        return nullptr;
    }
    session->socket_handle = curr_socket_handle;
    client_count++;
    client_hash.insert(std::make_pair(session, session));
    client_handle_hash.insert(std::make_pair(session->socket_handle, session));
    return session;
}
bool SocketAccept(int sock, const char* ipaddr, int port, unsigned long saddr)
{
if (client_count >= max_clients) {
return false;
}
bool refuse = false;
if (master->on_client_connect) {
master->on_client_connect(ipaddr, port, refuse);
}
if (refuse) {
return false;
}
a8::TcpSession* p = CreateSession(sock, ipaddr, port, saddr);
if (!p) {
return false;
}
//set nodelay
{
int flag = 1;
int ret = ::setsockopt(sock,
IPPROTO_TCP,
TCP_NODELAY,
(char *)&flag,
sizeof(flag));
assert(ret >= 0);
if (ret < 0) {
abort();
}
}
//set nonblock
{
int flags = 0;
flags = ::fcntl(sock, F_GETFL, 0);
::fcntl(sock, F_SETFL, flags|O_NONBLOCK);
p->epoll_fd = epoll_fd;
}
//add epoll
{
struct epoll_event ev;
ev.data.fd = sock;
#ifdef NEW_NET
ev.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
#else
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
#endif
#ifdef A8_TCP_SESSION2
ev.events = EPOLLIN | EPOLLRDHUP;
#endif
ev.data.ptr = p;
int n = ::epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock, &ev);
assert(n == 0);
if (n != 0) {
abort();
}
}
p->OnConnect();
return true;
}
// Body of the accept thread: accepts connections until the stop flag is set
// or the listening socket becomes invalid, and hands each one to
// SocketAccept(). Sockets that are rejected, or that arrive after shutdown
// was requested, are closed immediately.
void AcceptThreadProc()
{
    while (!accept_thread_shutdown) {
        if (listen_socket == a8::INVALID_SOCKET) {
            break;
        }

        sockaddr_in addr;
        memset(&addr, 0, sizeof(addr));
        socklen_t addr_len = sizeof(addr);
        const int sock = ::accept(listen_socket, reinterpret_cast<sockaddr*>(&addr), &addr_len);
        if (sock == a8::INVALID_SOCKET) {
            continue;
        }
        if (accept_thread_shutdown) {
            ::close(sock);
            continue;
        }

        // inet_ntop writes into a local buffer; inet_ntoa returns a shared
        // static buffer that other threads could overwrite.
        char ipaddr[INET_ADDRSTRLEN] = {0};
        ::inet_ntop(AF_INET, &addr.sin_addr, ipaddr, sizeof(ipaddr));

        // sin_port is in network byte order; convert it so callbacks and the
        // session see the real port number.
        const int port = ntohs(addr.sin_port);
        if (!SocketAccept(sock, ipaddr, port, addr.sin_addr.s_addr)) {
            ::close(sock);
        }
    }
}
void WorkerThreadProc()
{
if (!ActiveStart()) {
return;
}
epoll_event *events = new epoll_event[max_clients];
while (!worker_thread_shutdown) {
int nfds = ::epoll_wait(epoll_fd, events, max_clients, 1000 * 10);
for (int i = 0; i < nfds; ++i) {
a8::TcpSession* session = (a8::TcpSession*)events[i].data.ptr;
if (events[i].events & EPOLLIN) {
session->DoClientRecv();
} else if (events[i].events & EPOLLOUT) {
session->DoClientSend();
} else if (events[i].events & EPOLLRDHUP ||
events[i].events & EPOLLHUP) {
FreeClient(session);
}
}
}
delete [] events;
}
// Unregisters a session and returns it to the free pool. The caller is
// expected to hold clients_mutex (FreeClient() does). Sessions that are not
// registered are ignored.
//
// The session is removed from client_hash before anything else, so a nested
// call for the same session finds nothing to do.
void FreeClientWithNoLock(a8::TcpSession *session)
{
    auto pos = client_hash.find(session);
    if (pos == client_hash.end()) {
        return;
    }
#if 0
    if (checking_node == &session->session_entry) {
        checking_node = session->session_entry.next;
    }
    list_del_init(&session->session_entry);
#endif
    client_hash.erase(pos);
    if (session->Socket() != a8::INVALID_SOCKET) {
        session->_ForceClose();
    }
    client_handle_hash.erase(session->socket_handle);
    session->ClearSendBuff();
    session->OnDisConnect();
    free_client_pool.Add(session);
    --client_count;
}
// Locked variant of FreeClientWithNoLock(): unregisters the session while
// holding clients_mutex. The guard releases the mutex even if a session
// callback throws.
void FreeClient(a8::TcpSession *session)
{
    std::lock_guard<std::mutex> guard(clients_mutex);
    FreeClientWithNoLock(session);
}
void LockClients()
{
clients_mutex.lock();
}
void UnLockClients()
{
clients_mutex.unlock();
}
};
// Builds the implementation object, links it back to this listener and
// applies the requested client limit. No socket or thread is created here.
TcpListener::TcpListener(unsigned short max_client_cnt)
{
    auto* impl = new a8::TcpListenerImpl();
    impl->master = this;
    impl->max_clients = max_client_cnt;
    impl_ = impl;
#if 0
    INIT_LIST_HEAD(&impl_->session_list);
#endif
#if 0
    impl_->checking_node = &impl_->session_list;
#endif
}
// Closes the listener and then destroys the implementation object.
TcpListener::~TcpListener()
{
    this->Close();
    delete this->impl_;
    this->impl_ = nullptr;
}
// Requests activation of the listener; does nothing when it is already active.
void TcpListener::Open()
{
    if (IsActive()) {
        return;
    }
    impl_->SetActive(true);
}
// Stops the listener and releases its threads and descriptors.
//
// Besides the active case, this also cleans up after an Open() whose start-up
// failed: the worker thread has then already exited, but its thread object
// was never joined or deleted, and SetActive(true) refuses to start a new
// worker while the pointer is set - so the listener could never be reopened.
void TcpListener::Close()
{
    if (!IsActive() && !impl_->worker_thread) {
        return;
    }
    impl_->ActiveStop();
    // If Close() raced with a start-up still in progress, the worker may have
    // finished creating the socket and accept thread while the first pass was
    // waiting for it. The worker is joined by now, so a second pass sees a
    // stable state and tears the remainder down.
    if (IsActive()) {
        impl_->ActiveStop();
    }
}
// True while the implementation holds a valid listening socket.
bool TcpListener::IsActive()
{
    return impl_->IsActive();
}
bool TcpListener::SendClientMsg(unsigned short sockhandle, const char *buff, int buffLen)
{
bool boSendOk = false;
impl_->clients_mutex.lock();
a8::TcpSession *p = GetClientSession(sockhandle);
if (p){
p->SendBuff(buff, buffLen);
boSendOk = true;
}
impl_->clients_mutex.unlock();
return boSendOk;
}
// Passes bufflen bytes from buff to SendBuff() of every registered session.
// clients_mutex is held for the whole loop; the guard releases it on every
// exit path, including exceptions.
// NOTE(review): the map is iterated while SendBuff() runs - confirm that
// SendBuff() never unregisters a session.
void TcpListener::BroadcastMsg(const char* buff, int bufflen)
{
    std::lock_guard<std::mutex> guard(impl_->clients_mutex);
    for (auto& entry : impl_->client_hash) {
        entry.second->SendBuff(buff, bufflen);
    }
}
void TcpListener::ForceCloseClient(unsigned short sockhandle)
{
impl_->clients_mutex.lock();
a8::TcpSession *p = GetClientSession(sockhandle);
if(p){
if(p->Socket() != a8::INVALID_SOCKET){
p->_ForceClose();
}
}
impl_->clients_mutex.unlock();
}
void TcpListener::MarkClient(unsigned short sockhandle, bool is_active)
{
impl_->clients_mutex.lock();
a8::TcpSession *p = GetClientSession(sockhandle);
if (p){
p->is_activite = is_active;
}
impl_->clients_mutex.unlock();
}
// Number of currently registered sessions (read without taking clients_mutex).
int TcpListener::GetClientSocketCount()
{
    const unsigned int registered = impl_->client_count;
    return registered;
}
// Count reported by the free-session pool (read without taking clients_mutex).
int TcpListener::GetPoolSocketCount()
{
    auto& pool = impl_->free_client_pool;
    return pool.Count();
}
// Acquires the client mutex; pair every call with UnLockClients().
void TcpListener::LockClients()
{
    impl_->clients_mutex.lock();
}
// Unregisters the session without taking the client mutex; intended to be
// called between LockClients() and UnLockClients().
void TcpListener::FreeClientWithNoLock(a8::TcpSession *session)
{
    auto& impl = *impl_;
    impl.FreeClientWithNoLock(session);
}
// Releases the client mutex taken by LockClients().
void TcpListener::UnLockClients()
{
    impl_->clients_mutex.unlock();
}
// Looks up the session registered under the given handle; returns nullptr
// when there is none. No lock is taken here - the callers in this file hold
// clients_mutex around the lookup.
a8::TcpSession* TcpListener::GetClientSession(unsigned short handle)
{
    const auto& by_handle = impl_->client_handle_hash;
    const auto found = by_handle.find(handle);
    if (found == by_handle.end()) {
        return nullptr;
    }
    return found->second;
}
}