Logo Stamkracht

Extending Haystack's Elasticsearch backend

Haystack provides an interface similar to Django’s QuerySet, which instead enables easy querying in one or more popular search backends.

Because the Haystack SearchQuerySet API is meant to hook up to several search backends, however, not all the functionality of the backends has been implemented in the API. In this article we show how Haystack's Elasticsearch backend can be extended with advanced querying functionality.

As an exemplary use case, we'll focus on implementing Elasticsearch's Nested Query in the SearchQuerySetAPI, to enable e.g. weighted tags on documents. The usage of this extended API will be shown first, after which we'll go through the necessary implementation steps.

ConfigurableSearchQuerySet API Usage

>>> import search.custom_elasticsearch as ces
>>> from files import FileObject

# Instantiate a ConfigurableSearchQuerySet
>>> sqs = ces.ConfigurableSearchQuerySet()

# A nested query for one tag
>>> sqs.nested(["milk"])
[<SearchResult: files.fileobject (pk=u'6')>, <SearchResult: files.fileobject (pk=u'10')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: wiki.wikiarticle (pk=u'5')>]

# Which we can combine with an ordinary keyword query
# Both queries, in this case, are wrapped in a bool "should" query
>>> sqs.nested(["milk"]).filter(text="yoghurt")
[<SearchResult: wiki.wikiarticle (pk=u'5')>, <SearchResult: files.fileobject (pk=u'6')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: files.fileobject (pk=u'10')>, <SearchResult: files.fileobject (pk=u'3')>, <SearchResult: files.fileobject (pk=u'5')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'13')>]

# We can use a custom score for a query...
>>> sqs.filter(text="isolation").custom_score("_score * doc['hotness']")
[<SearchResult: files.fileobject (pk=u'8')>, <SearchResult: wiki.wikiarticle (pk=u'1')>, <SearchResult: wiki.wikiarticle (pk=u'3')>, <SearchResult: files.fileobject (pk=u'16')>, <SearchResult: files.fileobject (pk=u'3')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: files.fileobject (pk=u'4')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'13')>]

# ...and still combine it with a nested (tag) query
>>> sqs.filter(text='fermented').custom_score('_score').nested(['heat'])
[<SearchResult: files.fileobject (pk=u'3')>, <SearchResult: wiki.wikiarticle (pk=u'15')>, <SearchResult: files.fileobject (pk=u'8')>, <SearchResult: files.fileobject (pk=u'16')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'6')>, <SearchResult: files.fileobject (pk=u'4')>, <SearchResult: files.fileobject (pk=u'13')>, <SearchResult: files.fileobject (pk=u'5')>]

# A filtered query can be added as outermost wrapping by using .models() and/or .narrow()
>>> sqs.nested(["heat"]).custom_score('_score').filter(text='dairy').models(FileObject)
[<SearchResult: files.fileobject (pk=u'3')>, <SearchResult: files.fileobject (pk=u'16')>, <SearchResult: files.fileobject (pk=u'8')>, <SearchResult: files.fileobject (pk=u'4')>, <SearchResult: files.fileobject (pk=u'15')>, <SearchResult: files.fileobject (pk=u'10')>, <SearchResult: files.fileobject (pk=u'13')>, <SearchResult: files.fileobject (pk=u'14')>, <SearchResult: files.fileobject (pk=u'6')>]

Extending Haystack's SearchQuerySet API

The basic approach for extending the API is to define some custom classes, which inherit from SearchQuerySet,SearchQuery, and SearchBackend and which override several existing methods and add a few new ones. Credits are due to Nathan Keilar, who was kind enough to share his initial solution to this problem.

It would be advisable to place the custom classes we will create in a new file, near your other search-related application code. In this article we will assume the location my_app.search.custom_elasticsearch.

Add a nested field to the mapping

The first thing to ensure is that our existing indexes can correctly update with nested fields. A TagsField needs to be defined, which will be picked up by ConfigurableElasticBackend.build_schema() so Elasticsearch can update its mapping and index nested fields properly. Unfortunately, we need to override build_schema() inelegantly to add a few lines at the right position. Finally, ConfigurableElasticsearchSearchQuery and ConfigurableElasticSearchEngine need to be defined, in order to change Haystack's backend in the configuration.

from haystack.fields import SearchField
from haystack.backends.elasticsearch_backend import ElasticsearchSearchBackend, ElasticsearchSearchQuery
from haystack.backends.elasticsearch_backend import ElasticsearchSearchEngine

class TagsField(SearchField):
    field_type = "nested"
    properties = {
                "tag": {
                    "type": "string",
                    "index": "not_analyzed",
                    "omit_norms": True,
                    "index_options": "docs"
                },
                "points": {
                    "type": "float"
                }
    }

class ConfigurableElasticBackend(ElasticsearchSearchBackend):

    def build_schema(self, fields):
        content_field_name = ''
        mapping = {}

        for field_name, field_class in fields.items():
            field_mapping = {
                'boost': field_class.boost,
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
            }

            if field_class.document is True:
                content_field_name = field_class.index_fieldname

            if field_class.field_type in ['date', 'datetime']:
                field_mapping['type'] = 'date'
            elif field_class.field_type == 'integer':
                field_mapping['type'] = 'long'
            elif field_class.field_type == 'float':
                field_mapping['type'] = 'float'
            elif field_class.field_type == 'boolean':
                field_mapping['type'] = 'boolean'
            elif field_class.field_type == 'nested':
                field_mapping['type'] = 'nested'
                try:
                    field_mapping['properties'] = field_class.properties
                except AttributeError:
                    pass
            elif field_class.field_type == 'ngram':
                field_mapping['analyzer'] = "ngram_analyzer"
            elif field_class.field_type == 'edge_ngram':
                field_mapping['analyzer'] = "edgengram_analyzer"
            elif field_class.field_type == 'location':
                field_mapping['type'] = 'geo_point'

            if field_class.stored is False:
                field_mapping['store'] = 'no'

            # Do this last to override `text` fields.
            if field_class.indexed is False or hasattr(field_class, 'facet_for'):
                field_mapping['index'] = 'not_analyzed'

            if field_mapping['type'] == 'string' and field_class.indexed:
                field_mapping["term_vector"] = "with_positions_offsets"

                if not hasattr(field_class, 'facet_for') and not field_class.field_type in('ngram', 'edge_ngram'):
                    field_mapping["analyzer"] = "snowball"

            mapping[field_class.index_fieldname] = field_mapping

        return (content_field_name, mapping)

class ConfigurableElasticsearchSearchQuery(ElasticsearchSearchQuery):
    pass

class ConfigurableElasticSearchEngine(ElasticsearchSearchEngine):
    backend = ConfigurableElasticBackend
    query = ConfigurableElasticsearchSearchQuery

Once the custom classes are in place, it can be used by pointing HAYSTACK_CONNECTIONS to it:

HAYSTACK_CONNECTIONS = {
    'default': {
         'ENGINE': 'search.custom_elasticsearch.ConfigurableElasticSearchEngine',
         'URL': 'http://127.0.0.1:9200',
         'INDEX_NAME': 'haystack',
        }
}

Because we have only extended the indexing functionality at this point, all querying should work as normally (and will still use the default SearchQuerySet). But by using TagsField in your indexes and adjusting your prepare methods to output lists of dictionaries, these fields will now fully function as nested fields within Elasticsearch.

Let's illustrate this with a short example:

class TaggableIndex(indexes.SearchIndex, indexes.Indexable):
    tags = search.custom_elasticsearch.TagsField(indexed=False, stored=True, boost=1.5)

    def prepare_tags(self, obj):
        return [{'tag': tag.name, 'points': tag.point_total} for tag in obj.get_tags()]

Add nested methods to SearchQuerySet, SearchQuery, and SearchBackend

You might now say: Elasticsearch is "you know, for search", not just "for indexing", and you would be entirely correct. Extending the Elasticsearch backend will require us to subclass SearchQuerySet and to add some more methods to the subclasses we already defined.

from haystack.query import SearchQuerySet
from haystack.constants import DEFAULT_ALIAS, DJANGO_CT

class ConfigurableSearchQuerySet(SearchQuerySet):
    def custom_score(self, score_query_string=None, params=None):
        """Adds arguments for custom_score to the query"""
        clone = self._clone()
        clone.query.add_custom_score(score_query_string, params)
        return clone

    def nested(self, terms=None, path="tags", field="tag"):
        """Adds arguments for nested to the query"""
        clone = self._clone()
        clone.query.add_nested(terms, path, field)
        return clone

class ConfigurableElasticsearchSearchQuery(ElasticsearchSearchQuery):
    def __init__(self, using=DEFAULT_ALIAS):
        out = super(ConfigurableElasticsearchSearchQuery, self).__init__(using)
        self.custom_score = {}
        self.nested = {}

    def add_custom_score(self, score_query_string=None, params=None):
        """Adds arguments for custom_score to the query"""
        self.custom_score = {
            'score_query_string': score_query_string,
            'score_query_params': params,
            }

    def add_nested(self, terms=None, path=None, field=None):
        """Adds arguments for nested to the query"""
        self.nested = {
            'nested_query_terms': terms,
            'nested_query_path': path,
            'nested_query_field': field
        }

    def build_params(self, spelling_query=None, **kwargs):
        """
        Add custom_score and/or nested parameters
        """
        search_kwargs = super(ConfigurableElasticsearchSearchQuery, self).build_params(spelling_query, **kwargs)
        if self.custom_score:
            search_kwargs['custom_score'] = self.custom_score
        if self.nested:
            search_kwargs['nested'] = self.nested

        return search_kwargs

    def _clone(self, klass=None, using=None):
        clone = super(ConfigurableElasticsearchSearchQuery, self)._clone(klass, using)
        clone.custom_score = self.custom_score
        clone.nested = self.nested
        return clone

Basically, these extensions allow for the additional methods custom_score(score_query_string=None, params=None) andnested(terms=None, path="tags", field="tag") to be used from a ConfigurableSearchQuerySet instance. Their arguments are, via ConfigurableElasticsearchSearchQuery passed to ConfigurableElasticBackend which can finally build a dictionary of the query. The code that adds custom_score() is Nathan Keilar's and nested() follows its example.

class ConfigurableElasticBackend(ElasticsearchSearchBackend):

    def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
                        fields='', highlight=False, facets=None,
                        date_facets=None, query_facets=None,
                        narrow_queries=None, spelling_query=None,
                        within=None, dwithin=None, distance_point=None,
                        models=None, limit_to_registered_models=None,
                        result_class=None,custom_score=None,nested=None):

        out = super(ConfigurableElasticBackend, self).build_search_kwargs(query_string, sort_by, start_offset, end_offset,
                                                               fields, highlight, facets,
                                                               date_facets, query_facets,
                                                               narrow_queries, spelling_query,
                                                               within, dwithin, distance_point,
                                                               models, limit_to_registered_models,
                                                               result_class)

        if nested:
            out['query'] = self.nested_query_factory(nested)

        elif custom_score:
            out['query'] = { "custom_score": {
                                    "script": custom_score['score_query_string'],
                                    "query": out['query']
                                    }
                                }
            if custom_score['score_query_params']:
                out['query']['custom_score']['params'] = custom_score['score_query_params']

        return out

    def nested_query_factory(self, nested):
        score_script = "(doc['%s.points'].empty ? 0 : doc['%s.points'].value)" % \
                       (nested['nested_query_path'],nested['nested_query_path'])
        query = {"nested": {
                        "path": nested['nested_query_path'],
                        "score_mode": "total",
                        "query": {
                            "function_score": {
                                "query": {
                                    "terms": {
                                        nested['nested_query_field']: nested['nested_query_terms'],
                                        "minimum_match" : 1
                                    }
                                },
                                "script_score": {
                                    "script" : score_script,
                                    "lang": "mvel"
                                },
                                "boost_mode": "replace"
                            }
                        }
                    }
                }
        return query

    def build_schema(self, fields):
        # as defined above
        ....
        return (content_field_name, mapping)

As it is defined now, a sqs.nested() query replaces any original queries that may be called on the search query set. The nested query is set up by nested_query_factory() in such a way that additional queries are not easily inserted in this structure. A sqs.custom_score() query, in contrast, wraps around any other queries on sqs, incorporating them into the final query that is requested from Elasticsearch.

The query that is built by nested_query_factory() does include a function score query (the successor of custom_score), to offer fine-grained control over how a score for the parent document is computed from its nested children. We have hardcoded a score script to demonstrate the weighted tag use case, but implementing it as an attribute toConfigurableSearchQuerySet.nested() is a small step to take.

Customize the query wrapping order

Being able to do a nested query that replaces any original query is not sufficient. We want to combine queries conveniently, in a way that makes sense for our application.

This creates the need to decide on an order in which the queries are wrapped together. There's no need to customize all the ordering though; we're mostly interested in the wrapping of the outer levels of the query. To do this, let's override several keyword arguments on super(ConfigurableElasticBackend, self), to move them to the outer wrapping and to customize their order.

Once more, the final ConfigurableElasticBackend (don't forget the additional import!):

from django.conf import settings

class ConfigurableElasticBackend(ElasticsearchSearchBackend):

    def build_search_kwargs(self, query_string, sort_by=None, start_offset=0, end_offset=None,
                        fields='', highlight=False, facets=None,
                        date_facets=None, query_facets=None,
                        narrow_queries=None, spelling_query=None,
                        within=None, dwithin=None, distance_point=None,
                        models=None, limit_to_registered_models=None,
                        result_class=None,custom_score=None,nested=None):

        out = super(ConfigurableElasticBackend, self).build_search_kwargs(query_string, sort_by, start_offset, end_offset,
                                                               fields, highlight, facets,
                                                               date_facets, query_facets,
                                                               None, spelling_query, #narrow_queries==None
                                                               within, dwithin, distance_point,
                                                               None, False, #models==None, limit_..._models==False
                                                               result_class)

        # Wrapping order TOP: inner query -> DOWN: outer queries
        if custom_score:
            out['query'] = { "custom_score": {
                                    "script": custom_score['score_query_string'],
                                    "query": out['query']
                                    }
                                }
            if custom_score['score_query_params']:
                out['query']['custom_score']['params'] = custom_score['score_query_params']

        if nested:
            # check if there is an original query
            if 'match_all' not in out['query']:
                out['query'] = self.bool_query_factory(out['query'], nested)
            else:
                out['query'] = self.nested_query_factory(nested)

        ## START outer wrapping of filter(s): narrow(access/models/etc.)
        if limit_to_registered_models is None:
            limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

        if models and len(models):
            model_choices = sorted(['%s.%s' % (model._meta.app_label, model._meta.module_name) for model in models])
        elif limit_to_registered_models:
            # Using narrow queries, limit the results to only models handled
            # with the current routers.
            model_choices = self.build_models_list()
        else:
            model_choices = []

        if len(model_choices) > 0:
            if narrow_queries is None:
                narrow_queries = set()

            narrow_queries.add('%s:(%s)' % (DJANGO_CT, ' OR '.join(model_choices)))

        if narrow_queries:
            out['query'] = {
                'filtered': {
                    'query': out['query'],
                    'filter': {
                        'fquery': {
                            'query': {
                                'query_string': {
                                    'query': u' AND '.join(list(narrow_queries)),
                                },
                            },
                            '_cache': True,
                        }
                    }
                }
            }
        ## END outer wrapping of model filter(s)

        return out

    def nested_query_factory(self, nested):
        score_script = "(doc['%s.points'].empty ? 0 : doc['%s.points'].value)" % \
                       (nested['nested_query_path'],nested['nested_query_path'])
        query = {"nested": {
                        "path": nested['nested_query_path'],
                        "score_mode": "total",
                        "query": {
                            "function_score": {
                                "query": {
                                    "terms": {
                                        nested['nested_query_field']: nested['nested_query_terms'],
                                        "minimum_match" : 1
                                    }
                                },
                                "script_score": {
                                    "script" : score_script,
                                    "lang": "mvel"
                                },
                            "boost_mode": "replace"
                            }
                        }
                    }
                }
        return query

    def bool_query_factory(self, original_query, nested):
        query = {"bool": {
                    "should": [
                       self.nested_query_factory(nested),
                       original_query
                    ],
                    "minimum_should_match": 1
                    }
                }
        return query

    def build_schema(self, fields):
        content_field_name = ''
        mapping = {}

        for field_name, field_class in fields.items():
            field_mapping = {
                'boost': field_class.boost,
                'index': 'analyzed',
                'store': 'yes',
                'type': 'string',
            }

            if field_class.document is True:
                content_field_name = field_class.index_fieldname

            if field_class.field_type in ['date', 'datetime']:
                field_mapping['type'] = 'date'
            elif field_class.field_type == 'integer':
                field_mapping['type'] = 'long'
            elif field_class.field_type == 'float':
                field_mapping['type'] = 'float'
            elif field_class.field_type == 'boolean':
                field_mapping['type'] = 'boolean'
            elif field_class.field_type == 'nested':
                field_mapping['type'] = 'nested'
                try:
                    field_mapping['properties'] = field_class.properties
                except AttributeError:
                    pass
            elif field_class.field_type == 'ngram':
                field_mapping['analyzer'] = "ngram_analyzer"
            elif field_class.field_type == 'edge_ngram':
                field_mapping['analyzer'] = "edgengram_analyzer"
            elif field_class.field_type == 'location':
                field_mapping['type'] = 'geo_point'

            if field_class.stored is False:
                field_mapping['store'] = 'no'

            # Do this last to override `text` fields.
            if field_class.indexed is False or hasattr(field_class, 'facet_for'):
                field_mapping['index'] = 'not_analyzed'

            if field_mapping['type'] == 'string' and field_class.indexed:
                field_mapping["term_vector"] = "with_positions_offsets"

                if not hasattr(field_class, 'facet_for') and not field_class.field_type in('ngram', 'edge_ngram'):
                    field_mapping["analyzer"] = "snowball"

            mapping[field_class.index_fieldname] = field_mapping

        return (content_field_name, mapping)