cybertools/catalog/query.py

265 lines
6.6 KiB
Python

#
# Copyright (c) 2011 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Catalog query terms and their logical combinations.
This is mainly a simplified version of Martijn Faassen's hurry.query
(http://cheeseshop.python.org/pypi/hurry.query).
$Id$
"""
from BTrees.IFBTree import weightedIntersection, weightedUnion
from BTrees.IFBTree import difference, IFBTree, IFBucket, IFSet
from BTrees.IIBTree import IISet, union
from zope.app.catalog.catalog import ResultSet
from zope.app.catalog.field import IFieldIndex
from zope.app.catalog.text import ITextIndex
from zope.app.catalog.interfaces import ICatalog
from zope import component
from zope.intid.interfaces import IIntIds
from cybertools.catalog.keyword import IKeywordIndex
class Term(object):
def __and__(self, other):
return And(self, other)
def __rand__(self, other):
return And(other, self)
def __or__(self, other):
return Or(self, other)
def __ror__(self, other):
return Or(other, self)
def __invert__(self):
return Not(self)
class And(Term):
def __init__(self, *terms):
self.terms = terms
def apply(self):
results = []
for term in self.terms:
r = term.apply()
if not r:
# empty results
return r
results.append((len(r), r))
if not results:
# no applicable terms at all
return IFBucket()
results.sort()
_, result = results.pop(0)
for _, r in results:
w, result = weightedIntersection(result, r)
return result
class Or(Term):
def __init__(self, *terms):
self.terms = terms
def apply(self):
results = []
for term in self.terms:
r = term.apply()
# empty results
if not r:
continue
results.append(r)
if not results:
# no applicable terms at all
return IFBucket()
result = results.pop(0)
for r in results:
w, result = weightedUnion(result, r)
return result
class Not(Term):
def __init__(self, term):
self.term = term
def apply(self):
return difference(self._all(), self.term.apply())
def _all(self):
# XXX may not work well/be efficient with extentcatalog
# XXX not very efficient in general, better to use internal
# IntIds datastructure but that would break abstraction..
intids = component.getUtility(IIntIds)
result = IFBucket()
for uid in intids:
result[uid] = 0
return result
class IndexTerm(Term):
def __init__(self, (catalog_name, index_name)):
self.catalog_name = catalog_name
self.index_name = index_name
def getIndex(self):
catalog = component.getUtility(ICatalog, self.catalog_name)
index = catalog[self.index_name]
return index
# field index
class FieldTerm(IndexTerm):
def getIndex(self):
index = super(FieldTerm, self).getIndex()
assert IFieldIndex.providedBy(index)
return index
class Eq(FieldTerm):
def __init__(self, index_id, value):
assert value is not None
super(Eq, self).__init__(index_id)
self.value = value
def apply(self):
return self.getIndex().apply((self.value, self.value))
class NotEq(FieldTerm):
def __init__(self, index_id, not_value):
super(NotEq, self).__init__(index_id)
self.not_value = not_value
def apply(self):
index = self.getIndex()
all = index.apply((None, None))
r = index.apply((self.not_value, self.not_value))
return difference(all, r)
class Between(FieldTerm):
def __init__(self, index_id, min_value, max_value):
super(Between, self).__init__(index_id)
self.min_value = min_value
self.max_value = max_value
def apply(self):
return self.getIndex().apply((self.min_value, self.max_value))
class Ge(Between):
def __init__(self, index_id, min_value):
super(Ge, self).__init__(index_id, min_value, None)
class Le(Between):
def __init__(self, index_id, max_value):
super(Le, self).__init__(index_id, None, max_value)
class In(FieldTerm):
def __init__(self, index_id, values):
assert None not in values
super(In, self).__init__(index_id)
self.values = values
def apply(self):
results = []
index = self.getIndex()
for value in self.values:
r = index.apply((value, value))
# empty results
if not r:
continue
results.append(r)
if not results:
# no applicable terms at all
return IFBucket()
result = results.pop(0)
for r in results:
w, result = weightedUnion(result, r)
return result
# text index
class Text(IndexTerm):
def __init__(self, index_id, text):
super(Text, self).__init__(index_id)
self.text = text
def getIndex(self):
index = super(Text, self).getIndex()
assert ITextIndex.providedBy(index)
return index
def apply(self):
index = self.getIndex()
return index.apply(self.text)
# keyword index
class KeywordTerm(IndexTerm):
def __init__(self, index_id, values):
super(KeywordTerm, self).__init__(index_id)
self.values = values
def getIndex(self):
index = super(KeywordTerm, self).getIndex()
#assert IKeywordIndex.providedBy(index)
return index
class AnyOf(KeywordTerm):
def apply(self):
result = self.getIndex().search(self.values, 'or')
if isinstance(result, IFSet):
return result
return IFSet(result)
class AllOf(KeywordTerm):
def apply(self):
result = self.getIndex().search(self.values, 'and')
if isinstance(result, IFSet):
return result
return IFSet(result)