aozhiwei 99bce9d286 1
2024-06-28 14:45:13 +08:00

253 lines
7.2 KiB
Go

package task
import (
"q5"
"f5"
"mt"
"jccommon"
"main/constant"
"main/service"
"time"
"strings"
)
// webHook groups the webhook-event polling and handling methods of this
// package. It carries no fields; all state lives in the database tables the
// methods read and write.
type webHook struct {
}
// init starts one background poller goroutine per supported webhook event
// type, pairing each event name with the callback that processes its rows.
func (this *webHook) init() {
	pollers := []struct {
		eventName string
		handler   func(ds *f5.DataSet) bool
	}{
		{jccommon.ORDER_UPDATE_EVENT, this.orderUpdatedCb},
		{jccommon.ACTIVITY_SALE_EVENT, this.activitySaleCb},
	}
	for _, poller := range pollers {
		// Arguments of a go statement are evaluated here, so each goroutine
		// receives its own event name and handler.
		go this.loadWebHookEvent(poller.eventName, poller.handler)
	}
}
// unInit is the teardown counterpart of init. It currently does nothing; in
// particular it does not stop the poller goroutines that init started.
func (this* webHook) unInit() {
}
// loadWebHookEvent polls t_webhook_event forever for rows of eventName whose
// idx is greater than the stored checkpoint, and feeds each row to cb.
//
// Rows are fetched in ascending idx order, at most 1000 per round. cb returns
// true when a row was handled; the first false stops the current round so the
// failed row (and everything after it) is fetched again on the next round.
// After each round the highest handled idx is persisted through saveLastIdx.
// The loop sleeps 3 seconds between rounds and writes a progress line to the
// system log once more than 15000 ticks have elapsed since the previous one
// (presumably milliseconds; confirm against q5.GetTickCount).
//
// This method never returns and is meant to be run in its own goroutine.
func (this *webHook) loadWebHookEvent(eventName string, cb func(ds *f5.DataSet) bool) {
	time.Sleep(time.Second * 3)
	lastOutTick := q5.GetTickCount()
	lastSyncIdx := this.getLastIdx(eventName)
	for {
		if lastSyncIdx < 0 {
			// A negative value means the checkpoint could not be read; keep
			// retrying and start processing immediately once it is available.
			lastSyncIdx = this.getLastIdx(eventName)
			if lastSyncIdx >= 0 {
				continue
			}
		} else {
			var newLastSyncIdx int64
			f5.GetGoStyleDb().RawQuery(
				constant.BCEVENT_DB,
				// ORDER BY is required: the checkpoint logic below assumes rows
				// arrive in ascending idx order, which LIMIT alone does not
				// guarantee.
				"SELECT * FROM t_webhook_event WHERE idx > ? AND event_name = ? ORDER BY idx ASC LIMIT 1000",
				[]string{
					q5.ToString(lastSyncIdx),
					eventName,
				},
				func(err error, ds *f5.DataSet) {
					if err != nil {
						f5.GetSysLog().Warning("webhook %s query error:%s", eventName, err)
						return
					}
					for ds.Next() {
						idx := q5.ToInt64(ds.GetByName("idx"))
						if !cb(ds) {
							// Stop at the first failure so this row is retried
							// on the next round instead of being skipped.
							break
						}
						if idx > newLastSyncIdx {
							newLastSyncIdx = idx
						}
					}
				})
			if newLastSyncIdx > lastSyncIdx {
				lastSyncIdx = newLastSyncIdx
				if !this.saveLastIdx(eventName, lastSyncIdx) {
					// The in-memory checkpoint still advances; after a restart
					// the rows since the last persisted value are replayed.
					f5.GetSysLog().Warning("webhook %s save last_idx:%d failed", eventName, lastSyncIdx)
				}
			}
		}
		if q5.GetTickCount()-lastOutTick > 1000*15 {
			lastOutTick = q5.GetTickCount()
			f5.GetSysLog().Info("webhook %s last_idx:%d", eventName, lastSyncIdx)
		}
		time.Sleep(time.Second * 3)
	}
}
// getLastIdx reads the processing checkpoint for eventName from
// t_webhook_process_last_idx.
//
// It returns the stored last_idx when a row exists, 0 when the lookup
// succeeded but found no row, and -1 when the lookup reported an error.
func (this *webHook) getLastIdx(eventName string) int64 {
	checkpoint := int64(-1)
	filter := [][]string{
		{"event_name", eventName},
	}
	onRow := func(err error, ds *f5.DataSet) {
		switch {
		case err != nil:
			// Keep -1 so the caller can tell the read failed.
		case ds.Next():
			checkpoint = q5.ToInt64(ds.GetByName("last_idx"))
		default:
			checkpoint = 0
		}
	}
	f5.GetGoStyleDb().OrmSelectOne(
		constant.BCEVENT_DB,
		"t_webhook_process_last_idx",
		filter,
		onRow)
	return checkpoint
}
// saveLastIdx upserts the processing checkpoint of eventName in
// t_webhook_process_last_idx, keyed by event_name. The second field set
// carries only last_idx and the third a full row including createtime and
// modifytime (presumably the update and insert sets respectively; confirm
// against f5's Upsert).
//
// It returns true when the upsert callback reported no error.
func (this *webHook) saveLastIdx(eventName string, lastIdx int64) bool {
	now := q5.ToString(f5.GetApp().GetRealSeconds())
	idxText := q5.ToString(lastIdx)

	keyFields := [][]string{
		{"event_name", eventName},
	}
	changedFields := [][]string{
		{"last_idx", idxText},
	}
	newRowFields := [][]string{
		{"event_name", eventName},
		{"last_idx", idxText},
		{"createtime", now},
		{"modifytime", now},
	}

	saved := false
	f5.GetGoStyleDb().Upsert(
		constant.BCEVENT_DB,
		"t_webhook_process_last_idx",
		keyFields,
		changedFields,
		newRowFields,
		func(err error, lastInsertId int64, rowsAffected int64) {
			saved = err == nil
		})
	return saved
}
// orderUpdatedCb handles one order-updated webhook row. It decodes the row's
// raw_data into a jccommon.OrderUpdatedEvent, resolves the item id and hero
// quality of the first sell item, converts the first buy item's price through
// service.BcCurrency.ExchangeUSD, and upserts the result into t_order keyed by
// (net_id, contract_address, token_id).
//
// It returns true when the upsert reported no error. It returns false when the
// payload cannot be decoded, when it carries no sell or buy item, or when the
// upsert fails; the caller then stops its current batch and retries this row.
func (this *webHook) orderUpdatedCb(ds *f5.DataSet) bool {
	rowEventId := ds.GetByName("event_id")
	f5.GetSysLog().Info("orderUpdate event_id:%s start", rowEventId)
	nowTime := f5.GetApp().GetRealSeconds()
	rawData := ds.GetByName("raw_data")
	p := new(jccommon.OrderUpdatedEvent)
	if err := q5.DecodeJson(rawData, &p); err != nil {
		f5.GetSysLog().Warning("orderUpdate event_id:%s event_data parse error :%s",
			rowEventId, err)
		return false
	}
	// Guard the [0] accesses below: a payload without sell/buy entries (or one
	// that decoded to a nil pointer) would otherwise panic this goroutine.
	if p == nil || len(p.Data.Sell) == 0 || len(p.Data.Buy) == 0 {
		f5.GetSysLog().Warning("orderUpdate event_id:%s event_data has no sell or buy item",
			rowEventId)
		return false
	}
	netId := mt.Table.Config.GetById(0).GetNetId()
	sell := p.Data.Sell[0]
	buy := p.Data.Buy[0]

	var itemId int32
	var heroQuality int32
	service.GetItemIdHeroQuality(netId, sell.ContractAddress, sell.TokenId,
		&itemId, &heroQuality)

	startTime := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.StartAt))
	endTime := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.EndAt))
	createdAt := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.CreatedAt))
	updatedAt := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.UpdatedAt))

	srcPriceAmount := buy.Amount
	srcPriceContractAddress := buy.ContractAddress
	srcPriceItemType := buy.ItemType
	srcPriceExchangeRate, price := service.BcCurrency.ExchangeUSD(
		srcPriceAmount, srcPriceItemType, srcPriceContractAddress)

	// Fields written both when the order row is created and when it is updated.
	commonFields := [][]string{
		{"order_id", p.Data.Id},
		{"chain", p.Chain},
		{"status", p.Data.Status.Name},
		{"seller_address", strings.ToLower(p.Data.AccountAddress)},
		{"item_id", q5.ToString(itemId)},
		{"hero_quality", q5.ToString(heroQuality)},
		{"start_time", q5.ToString(startTime)},
		{"end_time", q5.ToString(endTime)},
		{"created_at", q5.ToString(createdAt)},
		{"updated_at", q5.ToString(updatedAt)},
		{"event_id", p.EventId},
		{"event_data", rawData},
		{"modifytime", q5.ToString(nowTime)},
		{"src_price_amount", srcPriceAmount},
		{"src_price_contract_address", srcPriceContractAddress},
		{"src_price_item_type", srcPriceItemType},
		{"price", price},
		{"price_len", q5.ToString(len(price))},
		{"src_price_exchange_rate", q5.ToString(srcPriceExchangeRate)},
	}
	updateFields := commonFields
	// Build the insert set in its own backing array so the appends below can
	// never write into the array shared with updateFields.
	insertFields := append(make([][]string, 0, len(commonFields)+4), commonFields...)
	q5.AppendSlice(&insertFields, []string{"net_id", q5.ToString(netId)})
	q5.AppendSlice(&insertFields, []string{"token_id", sell.TokenId})
	q5.AppendSlice(&insertFields, []string{"contract_address",
		strings.ToLower(sell.ContractAddress)})
	q5.AppendSlice(&insertFields, []string{"createtime", q5.ToString(nowTime)})

	var resultErr error
	f5.GetGoStyleDb().UpsertEx(
		constant.BCNFT_DB,
		"t_order",
		[][]string{
			{"net_id", q5.ToString(netId)},
			{"contract_address", strings.ToLower(sell.ContractAddress)},
			{"token_id", sell.TokenId},
		},
		updateFields,
		insertFields,
		func(err error, lastInsertId int64, rowsAffected int64) {
			resultErr = err
		},
		// Presumably decides whether an existing row is overwritten: true only
		// when this event's id compares greater (as a string) than the stored
		// one. Confirm against f5's UpsertEx.
		func(ds *f5.DataSet) bool {
			return p.EventId > ds.GetByName("event_id")
		})
	if resultErr != nil {
		f5.GetSysLog().Warning("orderUpdate event_id:%s upsert error :%s",
			rowEventId, resultErr)
	}
	f5.GetSysLog().Info("orderUpdate event_id:%s end", rowEventId)
	return resultErr == nil
}
// activitySaleCb handles one activity-sale webhook row. It decodes the row's
// raw_data into a jccommon.ActivitySaleEvent, resolves the item id and hero
// quality of the first sold asset, and upserts a row into t_sale keyed by
// order_id. The update field set is empty, so only the insert set carries
// data.
//
// It returns true when the upsert reported no error. It returns false when the
// payload cannot be decoded, when it carries no asset, or when the upsert
// fails; the caller then stops its current batch and retries this row.
func (this *webHook) activitySaleCb(ds *f5.DataSet) bool {
	rowEventId := ds.GetByName("event_id")
	f5.GetSysLog().Info("activitySale event_id:%s start", rowEventId)
	nowTime := f5.GetApp().GetRealSeconds()
	rawData := ds.GetByName("raw_data")
	p := new(jccommon.ActivitySaleEvent)
	if err := q5.DecodeJson(rawData, &p); err != nil {
		f5.GetSysLog().Warning("activitySale event_id:%s event_data parse error :%s",
			rowEventId, err)
		return false
	}
	// Guard the [0] accesses below: a payload without assets (or one that
	// decoded to a nil pointer) would otherwise panic this goroutine.
	if p == nil || len(p.Data.Details.Asset) == 0 {
		f5.GetSysLog().Warning("activitySale event_id:%s event_data has no asset",
			rowEventId)
		return false
	}
	netId := mt.Table.Config.GetById(0).GetNetId()
	asset := p.Data.Details.Asset[0]

	var itemId int32
	var heroQuality int32
	service.GetItemIdHeroQuality(netId,
		asset.ContractAddress,
		asset.TokenId,
		&itemId, &heroQuality)
	indexedAt := q5.SmartParseTimeToMills(q5.SafeToString(p.Data.IndexedAt))

	updateFields := [][]string{}
	insertFields := [][]string{
		{"order_id", p.Data.Details.OrderId},
		{"chain", p.Chain},
		{"net_id", q5.ToString(netId)},
		{"token_id", asset.TokenId},
		{"contract_address", strings.ToLower(asset.ContractAddress)},
		{"item_id", q5.ToString(itemId)},
		{"hero_quality", q5.ToString(heroQuality)},
		// NOTE(review): buyer is taken from Details.From and seller from
		// Details.To, and neither is lower-cased unlike contract_address.
		// Confirm both against the upstream event schema.
		{"buyer", p.Data.Details.From},
		{"seller", p.Data.Details.To},
		{"indexed_at", q5.ToString(indexedAt)},
		{"event_id", p.EventId},
		{"event_data", rawData},
		{"createtime", q5.ToString(nowTime)},
		{"modifytime", q5.ToString(nowTime)},
	}

	var resultErr error
	f5.GetGoStyleDb().UpsertEx(
		constant.BCNFT_DB,
		"t_sale",
		[][]string{
			{"order_id", p.Data.Details.OrderId},
		},
		updateFields,
		insertFields,
		func(err error, lastInsertId int64, rowsAffected int64) {
			resultErr = err
		},
		// Presumably decides whether an existing row is overwritten: true only
		// when this event's id compares greater (as a string) than the stored
		// one. Confirm against f5's UpsertEx.
		func(ds *f5.DataSet) bool {
			return p.EventId > ds.GetByName("event_id")
		})
	if resultErr != nil {
		f5.GetSysLog().Warning("activitySale event_id:%s upsert error :%s",
			rowEventId, resultErr)
	}
	f5.GetSysLog().Info("activitySale event_id:%s end", rowEventId)
	return resultErr == nil
}