This commit is contained in:
aozhiwei 2022-12-18 14:36:36 +08:00
parent 8524855f92
commit 0a591f7913

View File

@ -25,8 +25,6 @@
#include <f8/utils.h> #include <f8/utils.h>
#include <f8/timer.h> #include <f8/timer.h>
#define F8_MUTLI_THREAD_HTTP 1
namespace f8 namespace f8
{ {
@ -39,14 +37,8 @@ namespace f8
struct AsyncHttpRequest struct AsyncHttpRequest
{ {
long long context_id = 0; long long context_id = 0;
#if 0 f8::HttpProxyCb cb;
a8::XParams param;
#endif
time_t add_time = 0; time_t add_time = 0;
#if 0
AsyncHttpOnOkFunc on_ok = nullptr;
AsyncHttpOnErrorFunc on_error = nullptr;
#endif
f8::Attacher timer_attacher; f8::Attacher timer_attacher;
}; };
@ -324,23 +316,16 @@ namespace f8
void UnInit() void UnInit()
{ {
#if F8_MUTLI_THREAD_HTTP
for (auto& itr : http_thread_pool) { for (auto& itr : http_thread_pool) {
HttpThread* thread = itr; HttpThread* thread = itr;
thread->UnInit(); thread->UnInit();
delete thread; delete thread;
} }
http_thread_pool.clear(); http_thread_pool.clear();
#else
async_curl_->UnInit();
delete async_curl_;
async_curl_ = nullptr;
#endif
} }
void SetThreadNum(int thread_num) void SetThreadNum(int thread_num)
{ {
#if F8_MUTLI_THREAD_HTTP
assert(thread_num > 0); assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) { for (int i = 0; i < thread_num; i++) {
HttpThread *http_thread = new HttpThread(); HttpThread *http_thread = new HttpThread();
@ -349,7 +334,6 @@ namespace f8
http_thread->Init(); http_thread->Init();
http_thread_pool.push_back(http_thread); http_thread_pool.push_back(http_thread);
} }
#endif
} }
AsyncHttpRequest* GetAsyncHttpRequest(long long seqid) AsyncHttpRequest* GetAsyncHttpRequest(long long seqid)
@ -388,10 +372,12 @@ namespace f8
delete request; delete request;
} }
#if 0 void InternalExecAsyncHttp(int method,
void InternalExecAsyncHttp(int method, const char* url, a8::XObject& url_params, const char* url,
const char* content, a8::XObject* headers, a8::XObject& url_params,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, const char* content,
a8::XObject* headers,
f8::HttpProxyCb cb,
long long hash_code) long long hash_code)
{ {
@ -399,19 +385,15 @@ namespace f8
AsyncHttpRequest* p = new AsyncHttpRequest(); AsyncHttpRequest* p = new AsyncHttpRequest();
{ {
p->context_id = ++curr_seqid; p->context_id = ++curr_seqid;
p->param = param;
p->add_time = time(nullptr); p->add_time = time(nullptr);
p->on_ok = on_ok; p->cb = cb;
p->on_error = on_error;
async_http_hash[p->context_id] = p; async_http_hash[p->context_id] = p;
} }
#if F8_MUTLI_THREAD_HTTP
HttpThread* http_thread = GetHttpThread(hash_code); HttpThread* http_thread = GetHttpThread(hash_code);
if (!http_thread) { if (!http_thread) {
abort(); abort();
return; return;
} }
#endif
{ {
AsyncHttpNode* node = new AsyncHttpNode(); AsyncHttpNode* node = new AsyncHttpNode();
node->socket_handle = 0; node->socket_handle = 0;
@ -424,9 +406,7 @@ namespace f8
if (headers) { if (headers) {
headers->DeepCopy(node->headers); headers->DeepCopy(node->headers);
} }
#if F8_MUTLI_THREAD_HTTP
http_thread->AddAsyncHttp(node); http_thread->AddAsyncHttp(node);
#endif
} }
f8::Timer::Instance()->SetTimeoutEx f8::Timer::Instance()->SetTimeoutEx
(1000 * 10, (1000 * 10,
@ -436,9 +416,7 @@ namespace f8
}, },
&p->timer_attacher); &p->timer_attacher);
} }
#endif
#if F8_MUTLI_THREAD_HTTP
HttpThread* GetHttpThread(long long hash_code) HttpThread* GetHttpThread(long long hash_code)
{ {
if (http_thread_pool.empty() || hash_code < 0) { if (http_thread_pool.empty() || hash_code < 0) {
@ -446,7 +424,6 @@ namespace f8
} }
return http_thread_pool[hash_code % http_thread_pool.size()]; return http_thread_pool[hash_code % http_thread_pool.size()];
} }
#endif
public: public:
long long curr_seqid = 0; long long curr_seqid = 0;
@ -454,33 +431,11 @@ namespace f8
std::map<long long, AsyncHttpRequest*> async_http_hash; std::map<long long, AsyncHttpRequest*> async_http_hash;
unsigned short exec_async_http_msgid = 0; unsigned short exec_async_http_msgid = 0;
#if F8_MUTLI_THREAD_HTTP
std::vector<HttpThread*> http_thread_pool; std::vector<HttpThread*> http_thread_pool;
#else
AsyncCurl* async_curl_ = nullptr;
#endif
pthread_mutex_t* mutex_buf = nullptr; pthread_mutex_t* mutex_buf = nullptr;
size_t mutex_buf_size = 0; size_t mutex_buf_size = 0;
}; };
static void locking_function(int mode, int n, const char *file, int line)
{
if (n != (int)f8::HttpClientPool::Instance()->Impl()->mutex_buf_size) {
abort();
}
pthread_mutex_t* mutex = &f8::HttpClientPool::Instance()->Impl()->mutex_buf[n];
if(mode & CRYPTO_LOCK) {
pthread_mutex_lock(mutex);
} else {
pthread_mutex_unlock(mutex);
}
}
static unsigned long id_function(void)
{
return pthread_self();
}
void HttpClientPool::Init(int thread_num, int sys_num, int user_num) void HttpClientPool::Init(int thread_num, int sys_num, int user_num)
{ {
if (thread_num < 0 || sys_num < 0 || user_num < 0) { if (thread_num < 0 || sys_num < 0 || user_num < 0) {
@ -560,20 +515,29 @@ namespace f8
a8::XObject* headers) a8::XObject* headers)
{ {
#if 0 impl_->InternalExecAsyncHttp(1,
impl_->InternalExecAsyncHttp(1, url, url_params, "", headers, param, on_ok, on_error, hash_code); url,
#endif url_params,
"",
headers,
cb,
hash_code);
} }
void HttpClientPool::HttpPost(f8::HttpProxyCb cb, void HttpClientPool::HttpPost(f8::HttpProxyCb cb,
const char* url, const char* url,
a8::XObject url_params, a8::XObject url_params,
const std::string& content, const std::string& content,
long long hash_code, a8::XObject* headers) long long hash_code,
a8::XObject* headers)
{ {
#if 0 impl_->InternalExecAsyncHttp(2,
impl_->InternalExecAsyncHttp(2, url, url_params, content.c_str(), headers, param, on_ok, on_error, hash_code); url,
#endif url_params,
content.c_str(),
headers,
cb,
hash_code);
} }
long long HttpClientPool::GetPendingNum() long long HttpClientPool::GetPendingNum()