#include "precompile.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include "framework/cpp/httpclientpool.h" #include "framework/cpp/msgqueue.h" #include "framework/cpp/utils.h" #define F8_MUTLI_THREAD_HTTP 1 namespace f8 { enum AsyncHttpError { AHE_NO_ERROR = 0, AHE_NO_CONN = 1, }; struct AsyncHttpRequest { long long context_id = 0; a8::XParams param; time_t add_time = 0; AsyncHttpOnOkFunc on_ok = nullptr; AsyncHttpOnErrorFunc on_error = nullptr; a8::TimerAttacher timer_attacher; }; struct AsyncHttpNode { int socket_handle = 0; long long context_id = 0; int method = 0; std::string url; std::string url_params; std::string content; a8::XObject headers; std::map kv_params; AsyncHttpNode* nextnode = nullptr; }; class HttpThread { public: void Init() { loop_mutex_ = new std::mutex(); loop_cond_ = new std::condition_variable(); top_node_ = nullptr; bot_node_ = nullptr; work_node_ = nullptr; msg_mutex_ = new std::mutex(); work_thread_ = new std::thread(&HttpThread::WorkThreadProc, this); } void UnInit() { terminated = true; loop_cond_->notify_all(); work_thread_->join(); delete work_thread_; work_thread_ = nullptr; loop_mutex_->lock(); if (!work_node_) { work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; } while (work_node_) { AsyncHttpNode* pdelnode = work_node_; work_node_ = work_node_->nextnode; if (!work_node_) { work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; } delete pdelnode; } loop_mutex_->unlock(); delete loop_cond_; loop_cond_ = nullptr; delete loop_mutex_; loop_mutex_ = nullptr; delete msg_mutex_; msg_mutex_ = nullptr; } void AddAsyncHttp(AsyncHttpNode* p) { std::unique_lock lk(*loop_mutex_); msg_mutex_->lock(); if (bot_node_) { bot_node_->nextnode = p; bot_node_ = p; } else { top_node_ = p; bot_node_ = p; } msg_mutex_->unlock(); loop_cond_->notify_all(); } private: void WorkThreadProc() { while (!terminated) { ProcessMsg(); WaitLoopCond(); } } void ProcessMsg() { if (!work_node_ && top_node_) { msg_mutex_->lock(); work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; msg_mutex_->unlock(); } while (work_node_) { AsyncHttpNode *pdelnode = work_node_; work_node_ = work_node_->nextnode; ProcAsyncHttp(pdelnode); delete pdelnode; } } void WaitLoopCond() { std::unique_lock lk(*loop_mutex_); { msg_mutex_->lock(); if (!work_node_ && top_node_) { work_node_ = top_node_; top_node_ = nullptr; bot_node_ = nullptr; } msg_mutex_->unlock(); } if (!work_node_) { loop_cond_->wait_for(lk, std::chrono::seconds(10)); } } void ProcSign(AsyncHttpNode* node) { } void ProcAsyncHttp(AsyncHttpNode* node) { ProcSign(node); std::string finally_url; if (node->url.find('?') != std::string::npos) { finally_url = node->url + node->url_params; } else { finally_url = node->url + "?" + node->url_params; } std::string response; bool ret = false; switch (node->method) { case 1: { long long begin_tick = a8::XGetTickCount(); ret = a8::http::Get(finally_url, response, &node->headers, 10); long long end_tick = a8::XGetTickCount(); if (thread_id < f8::HttpClientPool::Instance()->sys_num) { if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_sys_request_delay) { f8::HttpClientPool::Instance()->max_sys_request_delay = end_tick - begin_tick; } } else { if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_user_request_delay) { f8::HttpClientPool::Instance()->max_user_request_delay = end_tick - begin_tick; } } break; } case 2: { long long begin_tick = a8::XGetTickCount(); ret = a8::http::Post(finally_url.c_str(), node->content, response, &node->headers, 10); long long end_tick = a8::XGetTickCount(); if (thread_id < f8::HttpClientPool::Instance()->sys_num) { if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_sys_request_delay) { f8::HttpClientPool::Instance()->max_sys_request_delay = end_tick - begin_tick; } } else { if (end_tick - begin_tick > f8::HttpClientPool::Instance()->max_user_request_delay) { f8::HttpClientPool::Instance()->max_user_request_delay = end_tick - begin_tick; } } break; } default: { break; } } if (ret) { a8::XObject* xobj = new a8::XObject(); if (xobj->ReadFromJsonString(response)) { f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, a8::XParams() .SetSender(node->context_id) .SetParam1(AHE_NO_ERROR) .SetParam2((void*)xobj) ); } else { f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, a8::XParams() .SetSender(node->context_id) .SetParam1(AHE_NO_CONN) .SetParam2(response) ); delete xobj; } } else { f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, a8::XParams() .SetSender(node->context_id) .SetParam1(AHE_NO_CONN) ); } } public: int exec_async_http_msgid = 0; int thread_id = 0; private: volatile bool terminated = false; std::mutex *loop_mutex_ = nullptr; std::condition_variable *loop_cond_ = nullptr; std::thread *work_thread_ = nullptr; AsyncHttpNode *top_node_ = nullptr; AsyncHttpNode *bot_node_ = nullptr; AsyncHttpNode *work_node_ = nullptr; std::mutex *msg_mutex_ = nullptr; }; class AsyncCurl { public: void Init() { curl_global_init(CURL_GLOBAL_ALL); curl_handle_ = curl_multi_init(); curl_multi_setopt(curl_handle_, CURLMOPT_SOCKETFUNCTION, HandleSocket); curl_multi_setopt(curl_handle_, CURLMOPT_TIMERFUNCTION, StartTimeout); } void UnInit() { curl_multi_cleanup(curl_handle_); curl_handle_ = nullptr; } static int HandleSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) { return 0; } static void StartTimeout(CURLM *multi, long timeout_ms, void *userp) { } private: CURLM *curl_handle_ = nullptr; }; class HttpClientPoolImpl { public: void Init() { curr_seqid = 1000001; exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(nullptr); } void UnInit() { #if F8_MUTLI_THREAD_HTTP for (auto& itr : http_thread_pool) { HttpThread* thread = itr; thread->UnInit(); delete thread; } http_thread_pool.clear(); #else async_curl_->UnInit(); delete async_curl_; async_curl_ = nullptr; #endif } void SetThreadNum(int thread_num) { #if F8_MUTLI_THREAD_HTTP assert(thread_num > 0); for (int i = 0; i < thread_num; i++) { HttpThread *http_thread = new HttpThread(); http_thread->exec_async_http_msgid = exec_async_http_msgid; http_thread->thread_id = i; http_thread->Init(); http_thread_pool.push_back(http_thread); } #endif } AsyncHttpRequest* GetAsyncHttpRequest(long long seqid) { auto itr = async_http_hash.find(seqid); return itr != async_http_hash.end() ? itr->second : nullptr; } void AsyncHttpOnOk(long long seqid, a8::XObject& data) { AsyncHttpRequest* request = GetAsyncHttpRequest(seqid); if (!request) { return; } if (request->on_ok) { request->on_ok(request->param, data); } async_http_hash.erase(seqid); delete request; } void AsyncHttpOnError(long long seqid, const std::string& response) { AsyncHttpRequest* request = GetAsyncHttpRequest(seqid); if (!request) { return; } if (request->on_error) { request->on_error(request->param, response); } async_http_hash.erase(seqid); delete request; } void InternalExecAsyncHttp(int method, const char* url, a8::XObject& url_params, const char* content, a8::XObject* headers, a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code) { ++pending_num; AsyncHttpRequest* p = new AsyncHttpRequest(); { p->context_id = ++curr_seqid; p->param = param; p->add_time = time(nullptr); p->on_ok = on_ok; p->on_error = on_error; async_http_hash[p->context_id] = p; } #if F8_MUTLI_THREAD_HTTP HttpThread* http_thread = GetHttpThread(hash_code); if (!http_thread) { abort(); return; } #endif { AsyncHttpNode* node = new AsyncHttpNode(); node->socket_handle = 0; node->context_id = p->context_id; node->method = method; node->url = url; url_params.ToUrlEncodeStr(node->url_params); //url_params.ToKVList(node->kv_params); node->content = std::string(content); if (headers) { headers->DeepCopy(node->headers); } #if F8_MUTLI_THREAD_HTTP http_thread->AddAsyncHttp(node); #endif } a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, a8::XParams() .SetSender(this) .SetParam1(p->context_id), [] (const a8::XParams& param) { }, &p->timer_attacher.timer_list_); } #if F8_MUTLI_THREAD_HTTP HttpThread* GetHttpThread(long long hash_code) { if (http_thread_pool.empty() || hash_code < 0) { return nullptr; } return http_thread_pool[hash_code % http_thread_pool.size()]; } #endif public: long long curr_seqid = 0; std::atomic pending_num = {0}; std::map async_http_hash; unsigned short exec_async_http_msgid = 0; #if F8_MUTLI_THREAD_HTTP std::vector http_thread_pool; #else AsyncCurl* async_curl_ = nullptr; #endif pthread_mutex_t* mutex_buf = nullptr; size_t mutex_buf_size = 0; }; static void locking_function(int mode, int n, const char *file, int line) { if (n != (int)f8::HttpClientPool::Instance()->Impl()->mutex_buf_size) { abort(); } pthread_mutex_t* mutex = &f8::HttpClientPool::Instance()->Impl()->mutex_buf[n]; if(mode & CRYPTO_LOCK) { pthread_mutex_lock(mutex); } else { pthread_mutex_unlock(mutex); } } static unsigned long id_function(void) { return pthread_self(); } void HttpClientPool::Init(int thread_num, int sys_num, int user_num) { if (thread_num < 0 || sys_num < 0 || user_num < 0) { abort(); } if (thread_num != sys_num + user_num) { abort(); } this->thread_num = thread_num; this->sys_num = sys_num; this->user_num = user_num; #if 1 curl_global_init(CURL_GLOBAL_ALL); #endif impl_ = new HttpClientPoolImpl(); impl_->Init(); MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid, [] (const a8::XParams& param) { --(HttpClientPool::Instance()->impl_->pending_num); if (param.param1.GetInt() == AHE_NO_ERROR) { a8::XObject* xobj = (a8::XObject*)param.param2.GetUserData(); HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, *xobj); delete xobj; } else { HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender, param.param2.GetString()); } } ); impl_->SetThreadNum(thread_num); #if 1 { if (CRYPTO_num_locks() > 0) { impl_->mutex_buf = (pthread_mutex_t*)malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t)); impl_->mutex_buf_size = CRYPTO_num_locks(); for (size_t i = 0; i < impl_->mutex_buf_size; ++i) { pthread_mutex_init(&impl_->mutex_buf[i], nullptr); } } CRYPTO_set_id_callback(id_function); CRYPTO_set_locking_callback(locking_function); } #endif } void HttpClientPool::UnInit() { impl_->UnInit(); #if 1 { CRYPTO_set_id_callback(nullptr); CRYPTO_set_locking_callback(nullptr); if (impl_->mutex_buf) { for (size_t i = 0; i < impl_->mutex_buf_size; ++i) { pthread_mutex_destroy(&impl_->mutex_buf[i]); } free(impl_->mutex_buf); impl_->mutex_buf = nullptr; } } #endif delete impl_; impl_ = nullptr; #if 1 curl_global_cleanup(); #endif } void HttpClientPool::HttpGet(a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, const char* url, a8::XObject url_params, long long hash_code, a8::XObject* headers) { impl_->InternalExecAsyncHttp(1, url, url_params, "", headers, param, on_ok, on_error, hash_code); } void HttpClientPool::HttpPost(a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, const char* url, a8::XObject url_params, const std::string& content, long long hash_code, a8::XObject* headers) { impl_->InternalExecAsyncHttp(2, url, url_params, content.c_str(), headers, param, on_ok, on_error, hash_code); } long long HttpClientPool::GetPendingNum() { return impl_->pending_num; } HttpClientPoolImpl* HttpClientPool::Impl() { return impl_; } }