b-site/vendor/github.com/go-ego/riot/indexer_worker.go
2019-04-22 02:59:20 +00:00

167 lines
4.2 KiB
Go

// Copyright 2013 Hui Chen
// Copyright 2016 ego authors
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package riot
import (
"sync/atomic"
"github.com/go-ego/riot/types"
)
type indexerAddDocReq struct {
doc *types.DocIndex
forceUpdate bool
}
type indexerLookupReq struct {
countDocsOnly bool
tokens []string
labels []string
docIds map[uint64]bool
options types.RankOpts
rankerReturnChan chan rankerReturnReq
orderless bool
logic types.Logic
}
type indexerRemoveDocReq struct {
docId uint64
forceUpdate bool
}
func (engine *Engine) indexerAddDocWorker(shard int) {
for {
request := <-engine.indexerAddDocChans[shard]
engine.indexers[shard].AddDocToCache(request.doc, request.forceUpdate)
if request.doc != nil {
atomic.AddUint64(&engine.numTokenIndexAdded,
uint64(len(request.doc.Keywords)))
engine.loc.Lock()
atomic.AddUint64(&engine.numDocsIndexed, 1)
// engine.numDocsIndexed++
engine.loc.Unlock()
}
if request.forceUpdate {
engine.loc.Lock()
atomic.AddUint64(&engine.numDocsForceUpdated, 1)
engine.loc.Unlock()
}
}
}
func (engine *Engine) indexerRemoveDocWorker(shard int) {
for {
request := <-engine.indexerRemoveDocChans[shard]
engine.indexers[shard].RemoveDocToCache(request.docId, request.forceUpdate)
if request.docId != 0 {
engine.loc.Lock()
atomic.AddUint64(&engine.numDocsRemoved, 1)
engine.loc.Unlock()
}
if request.forceUpdate {
engine.loc.Lock()
atomic.AddUint64(&engine.numDocsForceUpdated, 1)
engine.loc.Unlock()
}
}
}
func (engine *Engine) orderLess(
request indexerLookupReq, docs []types.IndexedDoc) {
if engine.initOptions.IDOnly {
var outputDocs []types.ScoredID
// var outputDocs types.ScoredIDs
for _, d := range docs {
outputDocs = append(outputDocs, types.ScoredID{
DocId: d.DocId,
TokenSnippetLocs: d.TokenSnippetLocs,
TokenLocs: d.TokenLocs})
}
request.rankerReturnChan <- rankerReturnReq{
docs: types.ScoredIDs(outputDocs),
numDocs: len(outputDocs),
}
return
}
var outputDocs []types.ScoredDoc
// var outputDocs types.ScoredDocs
for _, d := range docs {
outputDocs = append(outputDocs, types.ScoredDoc{
DocId: d.DocId,
TokenSnippetLocs: d.TokenSnippetLocs,
TokenLocs: d.TokenLocs})
}
request.rankerReturnChan <- rankerReturnReq{
docs: types.ScoredDocs(outputDocs),
numDocs: len(outputDocs),
}
}
func (engine *Engine) indexerLookupWorker(shard int) {
for {
request := <-engine.indexerLookupChans[shard]
var (
docs []types.IndexedDoc
numDocs int
)
if request.docIds == nil {
docs, numDocs = engine.indexers[shard].Lookup(
request.tokens, request.labels,
nil, request.countDocsOnly, request.logic)
// docs, numDocs = engine.indexers[shard].Lookup(request.tokens,
// request.labels, nil, request.countDocsOnly)
} else {
docs, numDocs = engine.indexers[shard].Lookup(
request.tokens, request.labels,
request.docIds, request.countDocsOnly, request.logic)
// docs, numDocs = engine.indexers[shard].Lookup(request.tokens,
// request.labels, request.docIds, request.countDocsOnly)
}
if request.countDocsOnly {
request.rankerReturnChan <- rankerReturnReq{numDocs: numDocs}
continue
}
if len(docs) == 0 {
request.rankerReturnChan <- rankerReturnReq{}
continue
}
if request.orderless {
// var outputDocs interface{}
engine.orderLess(request, docs)
continue
}
rankerRequest := rankerRankReq{
countDocsOnly: request.countDocsOnly,
docs: docs,
options: request.options,
rankerReturnChan: request.rankerReturnChan,
}
engine.rankerRankChans[shard] <- rankerRequest
}
}