diff --git a/cpp/httpclientpool.cc b/cpp/httpclientpool.cc index 6bc6097..ce95b4d 100644 --- a/cpp/httpclientpool.cc +++ b/cpp/httpclientpool.cc @@ -4,6 +4,9 @@ #include #include +#include +#include + #include #include #include @@ -15,6 +18,8 @@ #include "framework/cpp/msgqueue.h" #include "framework/cpp/utils.h" +#define F8_MUTLI_THREAD_HTTP 1 + namespace f8 { @@ -233,6 +238,37 @@ namespace f8 std::mutex *msg_mutex_ = nullptr; }; + class AsyncCurl + { + public: + + void Init() + { + curl_global_init(CURL_GLOBAL_ALL); + curl_handle_ = curl_multi_init(); + curl_multi_setopt(curl_handle_, CURLMOPT_SOCKETFUNCTION, HandleSocket); + curl_multi_setopt(curl_handle_, CURLMOPT_TIMERFUNCTION, StartTimeout); + } + + void UnInit() + { + curl_multi_cleanup(curl_handle_); + curl_handle_ = nullptr; + } + + static int HandleSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp) + { + } + + static void StartTimeout(CURLM *multi, long timeout_ms, void *userp) + { + + } + + private: + CURLM *curl_handle_ = nullptr; + }; + class HttpClientPoolImpl { public: @@ -245,16 +281,23 @@ namespace f8 void UnInit() { +#if F8_MUTLI_THREAD_HTTP for (auto& itr : http_thread_pool) { HttpThread* thread = itr; thread->UnInit(); delete thread; } http_thread_pool.clear(); +#else + async_curl_->UnInit(); + delete async_curl_; + async_curl_ = nullptr; +#endif } void SetThreadNum(int thread_num) { +#if F8_MUTLI_THREAD_HTTP assert(thread_num > 0); for (int i = 0; i < thread_num; i++) { HttpThread *http_thread = new HttpThread(); @@ -262,6 +305,7 @@ namespace f8 http_thread->Init(); http_thread_pool.push_back(http_thread); } +#endif } AsyncHttpRequest* GetAsyncHttpRequest(long long seqid) @@ -311,11 +355,13 @@ namespace f8 p->on_error = on_error; async_http_hash[p->context_id] = p; } +#if F8_MUTLI_THREAD_HTTP HttpThread* http_thread = GetHttpThread(hash_code); if (!http_thread) { abort(); return; } +#endif { AsyncHttpNode* node = new AsyncHttpNode(); node->socket_handle = 0; @@ -327,7 +373,9 @@ namespace f8 if (headers) { headers->DeepCopy(node->headers); } +#if F8_MUTLI_THREAD_HTTP http_thread->AddAsyncHttp(node); +#endif } a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, a8::XParams() @@ -340,6 +388,7 @@ namespace f8 &p->timer_attacher.timer_list_); } +#if F8_MUTLI_THREAD_HTTP HttpThread* GetHttpThread(long long hash_code) { if (http_thread_pool.empty() || hash_code < 0) { @@ -347,13 +396,18 @@ namespace f8 } return http_thread_pool[hash_code % http_thread_pool.size()]; } +#endif public: long long curr_seqid = 0; std::map async_http_hash; unsigned short exec_async_http_msgid = 0; +#if F8_MUTLI_THREAD_HTTP std::vector http_thread_pool; +#else + AsyncCurl* async_curl_ = nullptr; +#endif }; void HttpClientPool::Init(int thread_num)