httpclientpool ok

This commit is contained in:
aozhiwei 2018-12-22 14:09:11 +08:00
parent 1c89ff944c
commit 6313ff569f

View File

@ -8,6 +8,7 @@
#include <a8/timer.h> #include <a8/timer.h>
#include <a8/timer_attacher.h> #include <a8/timer_attacher.h>
#include <a8/udplog.h> #include <a8/udplog.h>
#include <a8/curl.h>
#include <a8/mutable_xobject.h> #include <a8/mutable_xobject.h>
#include "framework/cpp/httpclientpool.h" #include "framework/cpp/httpclientpool.h"
@ -20,6 +21,7 @@ namespace f8
enum AsyncHttpError enum AsyncHttpError
{ {
AHE_NO_ERROR = 0, AHE_NO_ERROR = 0,
AHE_NO_CONN = 1,
}; };
struct AsyncHttpRequest struct AsyncHttpRequest
@ -27,6 +29,8 @@ namespace f8
long long context_id = 0; long long context_id = 0;
a8::XParams param; a8::XParams param;
time_t add_time = 0; time_t add_time = 0;
std::string url;
std::string url_params;
AsyncHttpOnOkFunc on_ok = nullptr; AsyncHttpOnOkFunc on_ok = nullptr;
AsyncHttpOnErrorFunc on_error = nullptr; AsyncHttpOnErrorFunc on_error = nullptr;
a8::TimerAttacher timer_attacher; a8::TimerAttacher timer_attacher;
@ -36,6 +40,8 @@ namespace f8
{ {
int socket_handle = 0; int socket_handle = 0;
long long context_id = 0; long long context_id = 0;
std::string url;
std::string url_params;
AsyncHttpNode* nextnode = nullptr; AsyncHttpNode* nextnode = nullptr;
}; };
@ -117,6 +123,38 @@ namespace f8
void ProcAsyncHttp(AsyncHttpNode* node) void ProcAsyncHttp(AsyncHttpNode* node)
{ {
std::string finally_url;
if (node->url.find('?') != std::string::npos) {
finally_url = node->url + node->url_params;
} else {
finally_url = node->url + "?" + node->url_params;
}
std::string response;
if (a8::http::Get(finally_url, response, 10)) {
a8::XObject* xobj = new a8::XObject();
if (xobj->ReadFromJsonString(response)) {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_ERROR)
.SetParam2((void*)xobj)
);
} else {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_CONN)
.SetParam2(response)
);
delete xobj;
}
} else {
f8::MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid,
a8::XParams()
.SetSender(node->context_id)
.SetParam1(AHE_NO_CONN)
);
}
} }
public: public:
@ -186,7 +224,7 @@ namespace f8
delete request; delete request;
} }
void InternalExecAsyncHttp(const char* url, a8::XObject url_params, void InternalExecAsyncHttp(const char* url, a8::XObject& url_params,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code) a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code)
{ {
@ -195,24 +233,23 @@ namespace f8
p->context_id = ++curr_seqid; p->context_id = ++curr_seqid;
p->param = param; p->param = param;
p->add_time = time(nullptr); p->add_time = time(nullptr);
p->url = url;
url_params.ToUrlEncodeStr(p->url_params);
p->on_ok = on_ok; p->on_ok = on_ok;
p->on_error = on_error; p->on_error = on_error;
async_http_hash[p->context_id] = p; async_http_hash[p->context_id] = p;
} }
HttpThread* http_thread = GetHttpThread(hash_code); HttpThread* http_thread = GetHttpThread(hash_code);
#if 0
if (!http_thread) { if (!http_thread) {
MsgQueue::Instance()->PostMsg_r(exec_async_http_msgid, abort();
a8::XParams()
.SetSender(p->context_id)
.SetParam1(AQE_CONN_ERROR));
return; return;
} }
#endif
{ {
AsyncHttpNode* node = new AsyncHttpNode(); AsyncHttpNode* node = new AsyncHttpNode();
node->socket_handle = 0; node->socket_handle = 0;
node->context_id = p->context_id; node->context_id = p->context_id;
node->url = url;
url_params.ToUrlEncodeStr(node->url_params);
http_thread->AddAsyncHttp(node); http_thread->AddAsyncHttp(node);
} }
a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10, a8::Timer::Instance()->AddDeadLineTimerAndAttach(1000 * 10,
@ -249,17 +286,14 @@ namespace f8
MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid, MsgQueue::Instance()->RegisterCallBack(impl_->exec_async_http_msgid,
[] (const a8::XParams& param) [] (const a8::XParams& param)
{ {
#if 0 if (param.param1.GetInt() == AHE_NO_ERROR) {
if (param.param1.GetInt() == AQE_NO_ERROR) { a8::XObject* xobj = (a8::XObject*)param.param2.GetUserData();
DataSet* data_set = (DataSet*)param.param2.GetUserData(); HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, *xobj);
HttpClientPool::Instance()->impl_->AsyncHttpOnOk(param.sender, data_set); delete xobj;
delete data_set;
} else { } else {
HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender, HttpClientPool::Instance()->impl_->AsyncHttpOnError(param.sender,
param.param1, param.param2.GetString());
param.param2);
} }
#endif
} }
); );
} }
@ -278,7 +312,7 @@ namespace f8
void HttpClientPool::HttpGet(const char* url, a8::XObject url_params, void HttpClientPool::HttpGet(const char* url, a8::XObject url_params,
a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code) a8::XParams param, AsyncHttpOnOkFunc on_ok, AsyncHttpOnErrorFunc on_error, long long hash_code)
{ {
impl_->InternalExecAsyncHttp(url, url_params, param, on_ok, on_error, hash_code);
} }
} }