f8/cpp/httpclientpool.cc
aozhiwei a6cc881c1c 1
2019-08-30 09:09:29 +08:00

555 lines
18 KiB
C++

#include "precompile.h"
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <curl/curl.h>
#include <curl/easy.h>
#include <openssl/err.h>
#include <a8/list.h>
#include <a8/timer.h>
#include <a8/timer_attacher.h>
#include <a8/udplog.h>
#include <a8/curl.h>
#include <a8/mutable_xobject.h>
#include "framework/cpp/httpclientpool.h"
#include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h"
#define F8_MUTLI_THREAD_HTTP 1
namespace f8
{
enum AsyncHttpError
{
AHE_NO_ERROR = 0,
AHE_NO_CONN = 1,
};
struct AsyncHttpRequest
{
long long context_id = 0;
a8::XParams param;
time_t add_time = 0;
AsyncHttpOnOkFunc on_ok = nullptr;
AsyncHttpOnErrorFunc on_error = nullptr;
a8::TimerAttacher timer_attacher;
};
struct AsyncHttpNode
{
int socket_handle = 0;
long long context_id = 0;
int method = 0;
std::string url;
std::string url_params;
std::string content;
a8::XObject headers;
std::map<std::string, std::string> kv_params;
AsyncHttpNode* nextnode = nullptr;
};
class HttpThread
{
public:
void Init()
{
loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable();
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
msg_mutex_ = new std::mutex();
work_thread_ = new std::thread(&HttpThread::WorkThreadProc, this);
}
void UnInit()
{
terminated = true;
loop_cond_->notify_all();
work_thread_->join();
delete work_thread_;
work_thread_ = nullptr;
loop_mutex_->lock();
if (!work_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
while (work_node_) {
AsyncHttpNode* pdelnode = work_node_;
work_node_ = work_node_->nextnode;
if (!work_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
delete pdelnode;
}
loop_mutex_->unlock();
delete loop_cond_;
loop_cond_ = nullptr;
delete loop_mutex_;
loop_mutex_ = nullptr;
delete msg_mutex_;
msg_mutex_ = nullptr;
}
void AddAsyncHttp(AsyncHttpNode* p)
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
msg_mutex_->lock();
if (bot_node_) {
bot_node_->nextnode = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
msg_mutex_->unlock();
loop_cond_->notify_all();
}
private:
void WorkThreadProc()
{
while (!terminated) {
ProcessMsg();
WaitLoopCond();
}
}
void ProcessMsg()
{
if (!work_node_ && top_node_) {
msg_mutex_->lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
msg_mutex_->unlock();
}
while (work_node_) {
AsyncHttpNode *pdelnode = work_node_;
work_node_ = work_node_->nextnode;
ProcAsyncHttp(pdelnode);
delete pdelnode;
}
}
void WaitLoopCond()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
{
msg_mutex_->lock();
if (!work_node_ && top_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
msg_mutex_->unlock();
}
if (!work_node_) {
loop_cond_->wait_for(lk, std::chrono::seconds(10));
}
}
void ProcSign(AsyncHttpNode* node)
{
}
void ProcAsyncHttp(AsyncHttpNode* node)
{
ProcSign(node);
std::string finally_url;
if (node->url.find('?') != std::string::npos) {
finally_url = node->url + node->url_params;
} else {
finally_url = node->url + "?" + node->url_params;
}
std::string response;
bool ret = false;
switch (node->method) {
case 1:
{
long long begin_tick = a8::XGetTickCount();
ret = a8::http::Get(finally_url, response, &node->headers, 10);
long long end_tick = a8::XGetTickCount();
if (thread_id < f8::HttpClientPool::Instance()->sys_num) {
if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_sys_request_delay) {
f8::HttpClientPool::Instance()->max_sys_request_delay = end_tick - begin_tick;
}
} else {
if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_user_request_delay) {
f8::HttpClientPool::Instance()->max_user_request_delay = end_tick - begin_tick;
}
}
break;
}
case 2:
{
long long begin_tick = a8::XGetTickCount();
ret = a8::http::Post(finally_url.c_str(), node->content, response, &node->headers, 10);
long long end_tick = a8::XGetTickCount();
if (thread_id < f8::HttpClientPool::Instance()->sys_num) {
if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_sys_request_delay) {
f8::HttpClientPool::Instance()->max_sys_request_delay = end_tick - begin_tick;
}
} else {
if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_user_request_delay) {
f8::HttpClientPool::Instance()->max_user_request_delay = end_tick - begin_tick;
}
}
break;
}
default:
{
break;
}
}
if (ret) {
a8::XObject* xobj = new a8::XObject();
if (xobj->ReadFromJsonString(response)) {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_ERROR)
.SetParam2((void*)xobj)
);
} else {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_CONN)
.SetParam2(response)
);
delete xobj;
}
} else {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_CONN)
);
}
}
public:
int exec_async_http_msgid = 0;
int thread_id = 0;
private:
volatile bool terminated = false;
std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr;
std::thread *work_thread_ = nullptr;
AsyncHttpNode *top_node_ = nullptr;
AsyncHttpNode *bot_node_ = nullptr;
AsyncHttpNode *work_node_ = nullptr;
std::mutex *msg_mutex_ = nullptr;
};
class AsyncCurl
{
public:
void Init()
{
curl_global_init(CURL_GLOBAL_ALL);
curl_handle_ = curl_multi_init();
curl_multi_setopt(curl_handle_, CURLMOPT_SOCKETFUNCTION, HandleSocket);
curl_multi_setopt(curl_handle_, CURLMOPT_TIMERFUNCTION, StartTimeout);
}
void UnInit()
{
curl_multi_cleanup(curl_handle_);
curl_handle_ = nullptr;
}
static int HandleSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
{
return 0;
}
static void StartTimeout(CURLM *multi, long timeout_ms, void *userp)
{
}
private:
CURLM *curl_handle_ = nullptr;
};
class HttpClientPoolImpl
{
public:
void Init()
{
curr_seqid = 1000001;
exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(nullptr);
}
void UnInit()
{
#if F8_MUTLI_THREAD_HTTP
for (auto& itr : http_thread_pool) {
HttpThread* thread = itr;
thread->UnInit();
delete thread;
}
http_thread_pool.clear();
#else
async_curl_->UnInit();
delete async_curl_;
async_curl_ = nullptr;
#endif
}
void SetThreadNum(int thread_num)
{
#if F8_MUTLI_THREAD_HTTP
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
HttpThread *http_thread = new HttpThread();
http_thread->exec_async_http_msgid = exec_async_http_msgid;
http_thread->thread_id = i;
http_thread->Init();
http_thread_pool.push_back(http_thread);
}
#endif
}
AsyncHttpRequest* GetAsyncHttpRequest(long long seqid)
{
auto itr = async_http_hash.find(seqid);
return itr != async_http_hash.end() ? itr->second : nullptr;
}
void AsyncHttpOnOk(long long seqid, a8::XObject& data)
{
AsyncHttpRequest* request = GetAsyncHttpRequest(seqid);
if (!request) {
return;
}
if (request->on_ok) {
request->on_ok(request->param, data);
}
async_http_hash.erase(seqid);
delete request;
}
void AsyncHttpOnError(long long seqid, const std::string& response)
{
AsyncHttpRequest* request = GetAsyncHttpRequest(seqid);
if (!request) {
return;
}
if (request->on_error) {
request->on_error(request->param, response);
}
async_http_hash.erase(seqid);
delete request;
}
void InternalExecAsyncHttp(int method, const char* url, a8::XObject& url_params,
const char* content, a8::XObject* headers,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error,
long long hash_code)
{
++pending_num;
AsyncHttpRequest* p = new AsyncHttpRequest();
{
p->context_id = ++curr_seqid;
p->param = param;
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
async_http_hash[p->context_id] = p;
}
#if F8_MUTLI_THREAD_HTTP
HttpThread* http_thread = GetHttpThread(hash_code);
if (!http_thread) {
abort();
return;
}
#endif
{
AsyncHttpNode* node = new AsyncHttpNode();
node->socket_handle = 0;
node->context_id = p->context_id;
node->method = method;
node->url = url;
url_params.ToUrlEncodeStr(node->url_params);
//url_params.ToKVList(node->kv_params);
node->content = std::string(content);
if (headers) {
headers->DeepCopy(node->headers);
}
#if F8_MUTLI_THREAD_HTTP
http_thread->AddAsyncHttp(node);
#endif
}
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams()
.SetSender(this)
.SetParam1(p->context_id),
[] (const a8::XParams& param)
{
},
&p->timer_attacher.timer_list_);
}
#if F8_MUTLI_THREAD_HTTP
HttpThread* GetHttpThread(long long hash_code)
{
if (http_thread_pool.empty() || hash_code < 0) {
return nullptr;
}
return http_thread_pool[hash_code % http_thread_pool.size()];
}
#endif
public:
long long curr_seqid = 0;
std::atomic<long long> pending_num = {0};
std::map<long long, AsyncHttpRequest*> async_http_hash;
unsigned short exec_async_http_msgid = 0;
#if F8_MUTLI_THREAD_HTTP
std::vector<HttpThread*> http_thread_pool;
#else
AsyncCurl* async_curl_ = nullptr;
#endif
pthread_mutex_t* mutex_buf = nullptr;
size_t mutex_buf_size = 0;
};
static void locking_function(int mode, int n, const char *file, int line)
{
if (n != f8::HttpClientPool::Instance()->Impl()->mutex_buf_size) {
abort();
}
pthread_mutex_t* mutex = &f8::HttpClientPool::Instance()->Impl()->mutex_buf[n];
if(mode & CRYPTO_LOCK) {
pthread_mutex_lock(mutex);
} else {
pthread_mutex_unlock(mutex);
}
}
static unsigned long id_function(void)
{
return pthread_self();
}
void HttpClientPool::Init(int thread_num, int sys_num, int user_num)
{
if (thread_num < 0 || sys_num < 0 || user_num < 0) {
abort();
}
if (thread_num != sys_num + user_num) {
abort();
}
this->thread_num = thread_num;
this->sys_num = sys_num;
this->user_num = user_num;
#if 1
curl_global_init(CURL_GLOBAL_ALL);
#endif
impl_ = new HttpClientPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid,
[] (const a8::XParams& param)
{
--(HttpClientPool::Instance()->impl_->pending_num);
if (param.param1.GetInt() == AHE_NO_ERROR) {
a8::XObject* xobj = (a8::XObject*)param.param2.GetUserData();
HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, *xobj);
delete xobj;
} else {
HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender,
param.param2.GetString());
}
}
);
impl_->SetThreadNum(thread_num);
#if 1
{
if (CRYPTO_num_locks() > 0) {
impl_->mutex_buf = (pthread_mutex_t*)malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t));
impl_->mutex_buf_size = CRYPTO_num_locks();
for (size_t i = 0; i < impl_->mutex_buf_size; ++i) {
pthread_mutex_init(&impl_->mutex_buf[i], nullptr);
}
}
CRYPTO_set_id_callback(id_function);
CRYPTO_set_locking_callback(locking_function);
}
#endif
}
void HttpClientPool::UnInit()
{
impl_->UnInit();
#if 1
{
CRYPTO_set_id_callback(nullptr);
CRYPTO_set_locking_callback(nullptr);
if (impl_->mutex_buf) {
for (size_t i = 0; i < impl_->mutex_buf_size; ++i) {
pthread_mutex_destroy(&impl_->mutex_buf[i]);
}
free(impl_->mutex_buf);
impl_->mutex_buf = nullptr;
}
}
#endif
delete impl_;
impl_ = nullptr;
#if 1
curl_global_cleanup();
#endif
}
void HttpClientPool::HttpGet(a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error,
const char* url, a8::XObject url_params,
long long hash_code, a8::XObject* headers)
{
impl_->InternalExecAsyncHttp(1, url, url_params, "", headers, param, on_ok, on_error, hash_code);
}
void HttpClientPool::HttpPost(a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error,
const char* url, a8::XObject url_params, const std::string& content,
long long hash_code, a8::XObject* headers)
{
impl_->InternalExecAsyncHttp(2, url, url_params, content.c_str(), headers, param, on_ok, on_error, hash_code);
}
long long HttpClientPool::GetPendingNum()
{
return impl_->pending_num;
}
HttpClientPoolImpl* HttpClientPool::Impl()
{
return impl_;
}
}