This commit is contained in:
aozhiwei 2019-06-20 19:09:27 +08:00
parent d38901a6b4
commit df2936127c

View File

@ -4,6 +4,9 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <curl/curl.h>
#include <curl/easy.h>
#include <a8/list.h> #include <a8/list.h>
#include <a8/timer.h> #include <a8/timer.h>
#include <a8/timer_attacher.h> #include <a8/timer_attacher.h>
@ -15,6 +18,8 @@
#include "framework/cpp/msgqueue.h" #include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h" #include "framework/cpp/utils.h"
#define F8_MUTLI_THREAD_HTTP 1
namespace f8 namespace f8
{ {
@ -233,6 +238,37 @@ namespace f8
std::mutex *msg_mutex_ = nullptr; std::mutex *msg_mutex_ = nullptr;
}; };
class AsyncCurl
{
public:
void Init()
{
curl_global_init(CURL_GLOBAL_ALL);
curl_handle_ = curl_multi_init();
curl_multi_setopt(curl_handle_, CURLMOPT_SOCKETFUNCTION, HandleSocket);
curl_multi_setopt(curl_handle_, CURLMOPT_TIMERFUNCTION, StartTimeout);
}
void UnInit()
{
curl_multi_cleanup(curl_handle_);
curl_handle_ = nullptr;
}
static int HandleSocket(CURL *easy, curl_socket_t s, int action, void *userp, void *socketp)
{
}
static void StartTimeout(CURLM *multi, long timeout_ms, void *userp)
{
}
private:
CURLM *curl_handle_ = nullptr;
};
class HttpClientPoolImpl class HttpClientPoolImpl
{ {
public: public:
@ -245,16 +281,23 @@ namespace f8
void UnInit() void UnInit()
{ {
#if F8_MUTLI_THREAD_HTTP
for (auto& itr : http_thread_pool) { for (auto& itr : http_thread_pool) {
HttpThread* thread = itr; HttpThread* thread = itr;
thread->UnInit(); thread->UnInit();
delete thread; delete thread;
} }
http_thread_pool.clear(); http_thread_pool.clear();
#else
async_curl_->UnInit();
delete async_curl_;
async_curl_ = nullptr;
#endif
} }
void SetThreadNum(int thread_num) void SetThreadNum(int thread_num)
{ {
#if F8_MUTLI_THREAD_HTTP
assert(thread_num > 0); assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) { for (int i = 0; i < thread_num; i++) {
HttpThread *http_thread = new HttpThread(); HttpThread *http_thread = new HttpThread();
@ -262,6 +305,7 @@ namespace f8
http_thread->Init(); http_thread->Init();
http_thread_pool.push_back(http_thread); http_thread_pool.push_back(http_thread);
} }
#endif
} }
AsyncHttpRequest* GetAsyncHttpRequest(long long seqid) AsyncHttpRequest* GetAsyncHttpRequest(long long seqid)
@ -311,11 +355,13 @@ namespace f8
p->on_error = on_error; p->on_error = on_error;
async_http_hash[p->context_id] = p; async_http_hash[p->context_id] = p;
} }
#if F8_MUTLI_THREAD_HTTP
HttpThread* http_thread = GetHttpThread(hash_code); HttpThread* http_thread = GetHttpThread(hash_code);
if (!http_thread) { if (!http_thread) {
abort(); abort();
return; return;
} }
#endif
{ {
AsyncHttpNode* node = new AsyncHttpNode(); AsyncHttpNode* node = new AsyncHttpNode();
node->socket_handle = 0; node->socket_handle = 0;
@ -327,7 +373,9 @@ namespace f8
if (headers) { if (headers) {
headers->DeepCopy(node->headers); headers->DeepCopy(node->headers);
} }
#if F8_MUTLI_THREAD_HTTP
http_thread->AddAsyncHttp(node); http_thread->AddAsyncHttp(node);
#endif
} }
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams() a8::XParams()
@ -340,6 +388,7 @@ namespace f8
&p->timer_attacher.timer_list_); &p->timer_attacher.timer_list_);
} }
#if F8_MUTLI_THREAD_HTTP
HttpThread* GetHttpThread(long long hash_code) HttpThread* GetHttpThread(long long hash_code)
{ {
if (http_thread_pool.empty() || hash_code < 0) { if (http_thread_pool.empty() || hash_code < 0) {
@ -347,13 +396,18 @@ namespace f8
} }
return http_thread_pool[hash_code % http_thread_pool.size()]; return http_thread_pool[hash_code % http_thread_pool.size()];
} }
#endif
public: public:
long long curr_seqid = 0; long long curr_seqid = 0;
std::map<long long, AsyncHttpRequest*> async_http_hash; std::map<long long, AsyncHttpRequest*> async_http_hash;
unsigned short exec_async_http_msgid = 0; unsigned short exec_async_http_msgid = 0;
#if F8_MUTLI_THREAD_HTTP
std::vector<HttpThread*> http_thread_pool; std::vector<HttpThread*> http_thread_pool;
#else
AsyncCurl* async_curl_ = nullptr;
#endif
}; };
void HttpClientPool::Init(int thread_num) void HttpClientPool::Init(int thread_num)