add httpclientpool

This commit is contained in:
aozhiwei 2018-12-22 11:55:09 +08:00
parent 031d41eef1
commit 1c89ff944c
2 changed files with 313 additions and 0 deletions

284
cpp/httpclientpool.cc Normal file
View File

@ -0,0 +1,284 @@
#include "precompile.h"
#include <thread>
#include <mutex>
#include <condition_variable>
#include <a8/list.h>
#include <a8/timer.h>
#include <a8/timer_attacher.h>
#include <a8/udplog.h>
#include <a8/mutable_xobject.h>
#include "framework/cpp/httpclientpool.h"
#include "framework/cpp/msgqueue.h"
#include "framework/cpp/utils.h"
namespace f8
{
enum AsyncHttpError
{
AHE_NO_ERROR = 0,
};
struct AsyncHttpRequest
{
long long context_id = 0;
a8::XParams param;
time_t add_time = 0;
AsyncHttpOnOkFunc on_ok = nullptr;
AsyncHttpOnErrorFunc on_error = nullptr;
a8::TimerAttacher timer_attacher;
};
struct AsyncHttpNode
{
int socket_handle = 0;
long long context_id = 0;
AsyncHttpNode* nextnode = nullptr;
};
class HttpThread
{
public:
void Init()
{
loop_mutex_ = new std::mutex();
loop_cond_ = new std::condition_variable();
top_node_ = nullptr;
bot_node_ = nullptr;
work_node_ = nullptr;
msg_mutex_ = new std::mutex();
work_thread_ = new std::thread(&HttpThread::WorkThreadProc, this);
}
void AddAsyncHttp(AsyncHttpNode* p)
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
msg_mutex_->lock();
if (bot_node_) {
bot_node_->nextnode = p;
bot_node_ = p;
} else {
top_node_ = p;
bot_node_ = p;
}
msg_mutex_->unlock();
loop_cond_->notify_all();
}
private:
void WorkThreadProc()
{
while (true) {
ProcessMsg();
WaitLoopCond();
}
}
void ProcessMsg()
{
if (!work_node_ && top_node_) {
msg_mutex_->lock();
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
msg_mutex_->unlock();
}
while (work_node_) {
AsyncHttpNode *pdelnode = work_node_;
work_node_ = work_node_->nextnode;
ProcAsyncHttp(pdelnode);
delete pdelnode;
}
}
void WaitLoopCond()
{
std::unique_lock<std::mutex> lk(*loop_mutex_);
{
msg_mutex_->lock();
if (!work_node_ && top_node_) {
work_node_ = top_node_;
top_node_ = nullptr;
bot_node_ = nullptr;
}
msg_mutex_->unlock();
}
if (!work_node_) {
loop_cond_->wait_for(lk, std::chrono::seconds(10));
}
}
void ProcAsyncHttp(AsyncHttpNode* node)
{
}
public:
int exec_async_http_msgid = 0;
private:
std::mutex *loop_mutex_ = nullptr;
std::condition_variable *loop_cond_ = nullptr;
std::thread *work_thread_ = nullptr;
AsyncHttpNode *top_node_ = nullptr;
AsyncHttpNode *bot_node_ = nullptr;
AsyncHttpNode *work_node_ = nullptr;
std::mutex *msg_mutex_ = nullptr;
};
class HttpClientPoolImpl
{
public:
void Init()
{
curr_seqid = 1000001;
exec_async_http_msgid = MsgQueue::Instance()->AllocIMMsgId();
}
void SetThreadNum(int thread_num)
{
assert(thread_num > 0);
for (int i = 0; i < thread_num; i++) {
HttpThread *http_thread = new HttpThread();
http_thread->exec_async_http_msgid = exec_async_http_msgid;
http_thread->Init();
http_thread_pool.push_back(http_thread);
}
}
AsyncHttpRequest* GetAsyncHttpRequest(long long seqid)
{
auto itr = async_http_hash.find(seqid);
return itr != async_http_hash.end() ? itr->second : nullptr;
}
void AsyncHttpOnOk(long long seqid, a8::XObject& data)
{
AsyncHttpRequest* request = GetAsyncHttpRequest(seqid);
if (!request) {
return;
}
if (request->on_ok) {
request->on_ok(request->param, data);
}
async_http_hash.erase(seqid);
delete request;
}
void AsyncHttpOnError(long long seqid, const std::string& response)
{
AsyncHttpRequest* request = GetAsyncHttpRequest(seqid);
if (!request) {
return;
}
if (request->on_error) {
request->on_error(request->param, response);
}
async_http_hash.erase(seqid);
delete request;
}
void InternalExecAsyncHttp(const char* url, a8::XObject url_params,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code)
{
AsyncHttpRequest* p = new AsyncHttpRequest();
{
p->context_id = ++curr_seqid;
p->param = param;
p->add_time = time(nullptr);
p->on_ok = on_ok;
p->on_error = on_error;
async_http_hash[p->context_id] = p;
}
HttpThread* http_thread = GetHttpThread(hash_code);
#if 0
if (!http_thread) {
MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(p->context_id)
.SetParam1(AQE_CONN_ERROR));
return;
}
#endif
{
AsyncHttpNode* node = new AsyncHttpNode();
node->socket_handle = 0;
node->context_id = p->context_id;
http_thread->AddAsyncHttp(node);
}
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
a8::XParams()
.SetSender(this)
.SetParam1(p->context_id),
[] (const a8::XParams& param)
{
},
&p->timer_attacher.timer_list_);
}
HttpThread* GetHttpThread(long long hash_code)
{
if (http_thread_pool.empty() || hash_code < 0) {
return nullptr;
}
return http_thread_pool[hash_code % http_thread_pool.size()];
}
public:
long long curr_seqid = 0;
std::map<long long, AsyncHttpRequest*> async_http_hash;
unsigned short exec_async_http_msgid = 0;
std::vector<HttpThread*> http_thread_pool;
};
void HttpClientPool::Init()
{
impl_ = new HttpClientPoolImpl();
impl_->Init();
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid,
[] (const a8::XParams& param)
{
#if 0
if (param.param1.GetInt() == AQE_NO_ERROR) {
DataSet* data_set = (DataSet*)param.param2.GetUserData();
HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, data_set);
delete data_set;
} else {
HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender,
param.param1,
param.param2);
}
#endif
}
);
}
void HttpClientPool::UnInit()
{
delete impl_;
impl_ = nullptr;
}
void HttpClientPool::SetThreadNum(int thread_num)
{
impl_->SetThreadNum(thread_num);
}
void HttpClientPool::HttpGet(const char* url, a8::XObject url_params,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code)
{
}
}

29
cpp/httpclientpool.h Normal file
View File

@ -0,0 +1,29 @@
#pragma once
namespace f8
{
typedef void (*AsyncHttpOnOkFunc)(a8::XParams& param, a8::XObject& data);
typedef void (*AsyncHttpOnErrorFunc)(a8::XParams& param, const std::string& response);
class HttpClientPoolImpl;
class HttpClientPool : public a8::Singleton<HttpClientPool>
{
private:
HttpClientPool() {};
friend class a8::Singleton<HttpClientPool>;
public:
void Init();
void UnInit();
void SetThreadNum(int thread_num);
//执行异步http get
void HttpGet(const char* url, a8::XObject url_params,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code = 0);
private:
HttpClientPoolImpl* impl_ = nullptr;
};
}