diff --git a/cpp/httpclientpool.cc b/cpp/httpclientpool.cc new file mode 100644 index 0000000..d4ad9f9 --- /dev/null +++ b/cpp/httpclientpool.cc @@ -0,0 +1,284 @@ +#include "precompile.h" + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "framework/cpp/httpclientpool.h" +#include "framework/cpp/msgqueue.h" +#include "framework/cpp/utils.h" + +namespace f8 +{ + + enum AsyncHttpError + { + AHE_NO_ERROR = 0, + }; + + struct AsyncHttpRequest + { + long long context_id = 0; + a8::XParams param; + time_t add_time = 0; + AsyncHttpOnOkFunc on_ok = nullptr; + AsyncHttpOnErrorFunc on_error = nullptr; + a8::TimerAttacher timer_attacher; + }; + + struct AsyncHttpNode + { + int socket_handle = 0; + long long context_id = 0; + AsyncHttpNode* nextnode = nullptr; + }; + + class HttpThread + { + public: + + void Init() + { + loop_mutex_ = new std::mutex(); + loop_cond_ = new std::condition_variable(); + + top_node_ = nullptr; + bot_node_ = nullptr; + work_node_ = nullptr; + msg_mutex_ = new std::mutex(); + work_thread_ = new std::thread(&HttpThread::WorkThreadProc, this); + } + + void AddAsyncHttp(AsyncHttpNode* p) + { + std::unique_lock lk(*loop_mutex_); + msg_mutex_->lock(); + if (bot_node_) { + bot_node_->nextnode = p; + bot_node_ = p; + } else { + top_node_ = p; + bot_node_ = p; + } + msg_mutex_->unlock(); + loop_cond_->notify_all(); + } + + private: + + void WorkThreadProc() + { + while (true) { + ProcessMsg(); + WaitLoopCond(); + } + } + + void ProcessMsg() + { + if (!work_node_ && top_node_) { + msg_mutex_->lock(); + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + msg_mutex_->unlock(); + } + while (work_node_) { + AsyncHttpNode *pdelnode = work_node_; + work_node_ = work_node_->nextnode; + ProcAsyncHttp(pdelnode); + delete pdelnode; + } + } + + void WaitLoopCond() + { + std::unique_lock lk(*loop_mutex_); + { + msg_mutex_->lock(); + if (!work_node_ && top_node_) { + work_node_ = top_node_; + top_node_ = nullptr; + bot_node_ = nullptr; + } + msg_mutex_->unlock(); + } + if (!work_node_) { + loop_cond_->wait_for(lk, std::chrono::seconds(10)); + } + } + + + void ProcAsyncHttp(AsyncHttpNode* node) + { + } + + public: + int exec_async_http_msgid = 0; + + private: + std::mutex *loop_mutex_ = nullptr; + std::condition_variable *loop_cond_ = nullptr; + + std::thread *work_thread_ = nullptr; + AsyncHttpNode *top_node_ = nullptr; + AsyncHttpNode *bot_node_ = nullptr; + AsyncHttpNode *work_node_ = nullptr; + std::mutex *msg_mutex_ = nullptr; + }; + + class HttpClientPoolImpl + { + public: + + void Init() + { + curr_seqid = 1000001; + exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId(); + } + + void SetThreadNum(int thread_num) + { + assert(thread_num > 0); + for (int i = 0; i < thread_num; i++) { + HttpThread *http_thread = new HttpThread(); + http_thread->exec_async_http_msgid = exec_async_http_msgid; + http_thread->Init(); + http_thread_pool.push_back(http_thread); + } + } + + AsyncHttpRequest* GetAsyncHttpRequest(long long seqid) + { + auto itr = async_http_hash.find(seqid); + return itr != async_http_hash.end() ? itr->second : nullptr; + } + + void AsyncHttpOnOk(long long seqid, a8::XObject& data) + { + AsyncHttpRequest* request = GetAsyncHttpRequest(seqid); + if (!request) { + return; + } + if (request->on_ok) { + request->on_ok(request->param, data); + } + async_http_hash.erase(seqid); + delete request; + } + + void AsyncHttpOnError(long long seqid, const std::string& response) + { + AsyncHttpRequest* request = GetAsyncHttpRequest(seqid); + if (!request) { + return; + } + if (request->on_error) { + request->on_error(request->param, response); + } + async_http_hash.erase(seqid); + delete request; + } + + void InternalExecAsyncHttp(const char* url, a8::XObject url_params, + a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code) + + { + AsyncHttpRequest* p = new AsyncHttpRequest(); + { + p->context_id = ++curr_seqid; + p->param = param; + p->add_time = time(nullptr); + p->on_ok = on_ok; + p->on_error = on_error; + async_http_hash[p->context_id] = p; + } + HttpThread* http_thread = GetHttpThread(hash_code); + #if 0 + if (!http_thread) { + MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, + a8::XParams() + .SetSender(p->context_id) + .SetParam1(AQE_CONN_ERROR)); + return; + } + #endif + { + AsyncHttpNode* node = new AsyncHttpNode(); + node->socket_handle = 0; + node->context_id = p->context_id; + http_thread->AddAsyncHttp(node); + } + a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, + a8::XParams() + .SetSender(this) + .SetParam1(p->context_id), + [] (const a8::XParams& param) + { + + }, + &p->timer_attacher.timer_list_); + } + + HttpThread* GetHttpThread(long long hash_code) + { + if (http_thread_pool.empty() || hash_code < 0) { + return nullptr; + } + return http_thread_pool[hash_code % http_thread_pool.size()]; + } + + public: + long long curr_seqid = 0; + std::map async_http_hash; + + unsigned short exec_async_http_msgid = 0; + std::vector http_thread_pool; + }; + + void HttpClientPool::Init() + { + impl_ = new HttpClientPoolImpl(); + impl_->Init(); + MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid, + [] (const a8::XParams& param) + { + #if 0 + if (param.param1.GetInt() == AQE_NO_ERROR) { + DataSet* data_set = (DataSet*)param.param2.GetUserData(); + HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, data_set); + delete data_set; + } else { + HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender, + param.param1, + param.param2); + } + #endif + } + ); + } + + void HttpClientPool::UnInit() + { + delete impl_; + impl_ = nullptr; + } + + void HttpClientPool::SetThreadNum(int thread_num) + { + impl_->SetThreadNum(thread_num); + } + + void HttpClientPool::HttpGet(const char* url, a8::XObject url_params, + a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code) + { + + } + +} diff --git a/cpp/httpclientpool.h b/cpp/httpclientpool.h new file mode 100644 index 0000000..dad94d8 --- /dev/null +++ b/cpp/httpclientpool.h @@ -0,0 +1,29 @@ +#pragma once + +namespace f8 +{ + + typedef void (*AsyncHttpOnOkFunc)(a8::XParams& param, a8::XObject& data); + typedef void (*AsyncHttpOnErrorFunc)(a8::XParams& param, const std::string& response); + + class HttpClientPoolImpl; + class HttpClientPool : public a8::Singleton + { + private: + HttpClientPool() {}; + friend class a8::Singleton; + + public: + void Init(); + void UnInit(); + void SetThreadNum(int thread_num); + + //执行异步http get + void HttpGet(const char* url, a8::XObject url_params, + a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code = 0); + + private: + HttpClientPoolImpl* impl_ = nullptr; + }; + +}