Module contents

ESORM is an Elasticsearch ORM for Python, based on Pydantic.

class esorm.ESBaseModel(**data)[source]

Bases: BaseModel

Base class for Elasticsearch models

It is useful for nested models, if you don’t need the model in ES mappings

class ESConfig[source]

Bases: object

ESBaseModel Config

This exists only for lazy properties, to make ESBaseModel compatible with them

lazy_property_max_recursion_depth: int = 1

Maximum recursion depth of lazy properties

async calc_lazy_properties()[source]

(re)Calculate lazy properties

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'ser_json_bytes': 'base64', 'str_strip_whitespace': True, 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class esorm.ESBulk(wait_for=False, **bulk_kwargs)[source]

Bases: object

Bulk operation for ElasticSearch

async delete(model)[source]

Add the model to the bulk for deletion

Parameters:

model (TypeVar(TModel, bound= ESModel)) – The model to add for deletion

async save(model)[source]

Add the model to the bulk for saving

If the model was loaded from ES (via get or search, so it has _seq_no and _primary_term), an optimistic concurrency check is used: the document is updated only if its _seq_no and _primary_term still match those of the document in the index.

If the model is an ESModelTimestamp, it will update the modified_at field to the current time and if the created_at field is not already set, it will set it to the current time too.

Parameters:

model (TypeVar(TModel, bound= ESModel)) – The model to add for saving
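
The optimistic concurrency check described above can be sketched at the level of the Elasticsearch bulk API. This is a minimal standalone illustration, not ESORM's implementation: bulk_index_lines is a hypothetical helper, while if_seq_no and if_primary_term are the Elasticsearch bulk action parameters for conditional writes.

```python
import json
from typing import Any, Dict, List, Optional


def bulk_index_lines(index: str, doc_id: str, source: Dict[str, Any],
                     seq_no: Optional[int] = None,
                     primary_term: Optional[int] = None) -> List[str]:
    """Build the two NDJSON lines of one bulk "index" action (hypothetical helper)."""
    meta: Dict[str, Any] = {"_index": index, "_id": doc_id}
    # Only documents that were loaded from Elasticsearch carry both values.
    if seq_no is not None and primary_term is not None:
        meta["if_seq_no"] = seq_no
        meta["if_primary_term"] = primary_term
    return [json.dumps({"index": meta}), json.dumps(source)]


# A new document: no concurrency condition is attached.
fresh = bulk_index_lines("users", "1", {"name": "Ada"})
# A document read earlier: the write only succeeds if nobody changed it since.
loaded = bulk_index_lines("users", "1", {"name": "Ada"}, seq_no=7, primary_term=1)
```

If the stored document has moved on to a newer _seq_no, Elasticsearch rejects the conditional action with a version conflict instead of overwriting it.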

class esorm.ESModel(**data)[source]

Bases: ESBaseModel

ElasticSearch Base Model

class ESConfig[source]

Bases: object

ESModel Config

default_sort: Optional[List[Dict[str, Dict[str, str]]]] = None

Default sort

id_field: Optional[str] = None

The name of the ‘id’ field

index: Optional[str] = None

The index name

lazy_property_max_recursion_depth: int = 1

Maximum recursion depth of lazy properties

settings: Optional[Dict[str, Any]] = None

Index settings

async classmethod aggregate(aggs, *, query=None, routing=None, **kwargs)[source]

Aggregate Model with an aggregation dict. Before aggregation, the model can be filtered by a query dict.

Parameters:
  • aggs (Dict[str, ESAgg]) – Aggregation dict

  • query (Optional[ESQuery]) – ElasticSearch query dict

  • routing (Optional[str]) – Shard routing value

  • kwargs – Other search API params

Return type:

Dict[str, Union[ESAggValueResponse, ESAggTermsResponse, ESAggHistogramResponse]]

Returns:

The aggregation results, keyed by aggregation name

async classmethod all(**kwargs)[source]

Get all documents

Parameters:

kwargs – Other search API params

Return type:

List[TypeVar(TModel, bound= ESModel)]

Returns:

The result list

async classmethod call(method_name, *, wait_for=None, **kwargs)[source]

Call an elasticsearch method

This is a low-level ES method call; using it directly is not recommended.

Parameters:
  • method_name – The name of the method to call

  • wait_for – Waits for all shards to sync before returning response

  • kwargs – The arguments to pass to the method

Return type:

dict

Returns:

The result dictionary from ElasticSearch

static create_query_from_dict(fields)[source]

Creates a query dict from a dictionary of fields and values

Parameters:

fields (Dict[str, Union[str, int, float]]) – A dictionary of fields and values to search by

Return type:

ESQuery

Returns:

A query dict
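
As a rough illustration of turning field/value pairs into a query dict, the sketch below builds a bool query in which every field must match its value. The helper name query_from_fields and the choice of match clauses are assumptions; the exact query ESORM generates is not specified in this reference.

```python
from typing import Any, Dict, Union

Scalar = Union[str, int, float]


def query_from_fields(fields: Dict[str, Scalar]) -> Dict[str, Any]:
    """Hypothetical stand-in: require every field to match its value."""
    return {
        "bool": {
            "must": [
                {"match": {name: {"query": value}}}
                for name, value in fields.items()
            ]
        }
    }


query = query_from_fields({"name": "Ada", "age": 36})
```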

async delete(*, wait_for=False, routing=None)[source]

Deletes document from ElasticSearch.

Parameters:
  • wait_for – Waits for all shards to sync before returning response - useful when writing tests. Defaults to False.

  • routing (Optional[str]) – Shard routing value

Raises:
  • esorm.error.NotFoundError – Raised if the document is not found

  • ValueError – Raised when the id attribute is missing from the instance

classmethod from_es(data)[source]

Returns an ESModel from an elasticsearch document that has _id, _source

Parameters:

data (Dict[str, Any]) – Elasticsearch document that has _id, _source

Raises:

esorm.error.InvalidResponseError – Raised when _id or _source is missing from data

Return type:

Optional[TypeVar(TModel, bound= ESModel)]

Returns:

The ESModel instance
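
The check on _id and _source can be pictured with plain dictionaries. This is a sketch under assumptions: split_hit is a hypothetical helper and the exception class is a local stand-in for esorm.error.InvalidResponseError, so the block runs without ESORM installed.

```python
from typing import Any, Dict, Tuple


class InvalidResponseError(Exception):
    """Local stand-in for esorm.error.InvalidResponseError."""


def split_hit(data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (id, source) from a hit, or raise if either part is missing."""
    for key in ("_id", "_source"):
        if key not in data:
            raise InvalidResponseError(f"missing {key!r} in Elasticsearch document")
    return str(data["_id"]), dict(data["_source"])


doc_id, source = split_hit({"_id": "42", "_source": {"name": "Ada"}})

# A hit without _source is rejected.
try:
    split_hit({"_id": "42"})
    rejected = False
except InvalidResponseError:
    rejected = True
```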

async classmethod get(id, *, routing=None)[source]

Fetches document and returns ESModel instance populated with properties.

Parameters:
  • id (Union[str, int, float]) – Document id

  • routing (Optional[str]) – Shard routing value

Raises:

esorm.error.NotFoundError – Raised if the document is not found

Return type:

TypeVar(TModel, bound= ESModel)

Returns:

ESModel object

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'ser_json_bytes': 'base64', 'str_strip_whitespace': True, 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Return type:

None

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

async reload(*, routing=None)[source]

Reloads the document from ElasticSearch

Parameters:

routing (Optional[str]) – Shard routing value

Raises:

esorm.error.NotFoundError – Raised if the document is not found

Return type:

TypeVar(TModel, bound= ESModel)

async save(*, wait_for=False, pipeline=None, routing=None)[source]

Save document into elasticsearch.

If the document already exists, it will be updated as per the native Elasticsearch index operation. If the model has an id (ESConfig.id_field or __id__), it will be used as the Elasticsearch _id. The id field will be removed from the document before indexing. If no id is provided, the document will be indexed and Elasticsearch will generate a suitable id, which will be populated on the returned model.

Parameters:
  • wait_for – Waits for all shards to sync before returning response - useful when writing tests. Defaults to False.

  • pipeline (Optional[str]) – Pipeline to use for indexing

  • routing (Optional[str]) – Shard routing value

Return type:

str

Returns:

The new document’s ID. It is always a string, even if the id field is an integer.

async classmethod search(query, *, page_size=None, page=None, sort=None, routing=None, res_dict=False, **kwargs)[source]

Search Model with query dict

Parameters:
  • query (ESQuery) – ElasticSearch query dict

  • page_size (Optional[int]) – Pagination page size

  • page (Optional[int]) – Pagination page num, 1st page is 1

  • sort (Union[list, str, None]) – Name of field to be sorted, or sort term list of dict, if not specified, model’s default sort will be used, or no sorting

  • routing (Optional[str]) – Shard routing value

  • res_dict (bool) – If the result should be a dict with id as key and model as value instead of a list of models

  • kwargs – Other search API params

Return type:

Union[List[TypeVar(TModel, bound= ESModel)], Dict[str, TypeVar(TModel, bound= ESModel)]]

Returns:

The result list
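
The page and page_size parameters are 1-based pagination values, whereas the Elasticsearch search API works with from and size. The sketch below shows the usual conversion, plus the two result shapes selected by res_dict. The helper page_to_from_size and the sample hits are illustrative assumptions, not ESORM internals.

```python
from typing import Any, Dict, List


def page_to_from_size(page: int, page_size: int) -> Dict[str, int]:
    """Convert a 1-based page number into Elasticsearch from/size values."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be >= 1")
    return {"from": (page - 1) * page_size, "size": page_size}


first = page_to_from_size(1, 10)
third = page_to_from_size(3, 25)

# Illustrative hits, written by hand in the shape Elasticsearch returns.
hits: List[Dict[str, Any]] = [
    {"_id": "a", "_source": {"name": "Ada"}},
    {"_id": "b", "_source": {"name": "Bob"}},
]
as_list = [hit["_source"] for hit in hits]              # like res_dict=False
as_dict = {hit["_id"]: hit["_source"] for hit in hits}  # like res_dict=True
```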

async classmethod search_by_fields(fields, *, page_size=None, page=None, sort=None, routing=None, aggs=None, res_dict=False, **kwargs)[source]

Search Model by fields as key-value pairs

Parameters:
  • fields (Dict[str, Union[str, int, float]]) – A dictionary of fields and values to search by

  • page_size (Optional[int]) – Pagination page size

  • page (Optional[int]) – Pagination page num, 1st page is 1

  • sort (Union[list, str, None]) – Name of field to be sorted, or sort term list of dict, if not specified, model’s default sort will be used, or no sorting

  • routing (Optional[str]) – Shard routing value

  • aggs (Optional[Dict[str, ESAgg]]) – Aggregations

  • res_dict (bool) – If the result should be a dict with id as key and model as value instead of a list of models

  • kwargs – Other search API params

Return type:

List[TypeVar(TModel, bound= ESModel)]

Returns:

The result list

async classmethod search_one(query, *, routing=None, **kwargs)[source]

Search Model and return the first result

Parameters:
  • query (ESQuery) – ElasticSearch query dict

  • routing (Optional[str]) – Shard routing value

  • kwargs – Other search API params

Return type:

Optional[TypeVar(TModel, bound= ESModel)]

Returns:

The first result or None if no result

async classmethod search_one_by_fields(fields, *, routing=None, aggs=None, **kwargs)[source]

Search Model by fields as key-value pairs and return the first result

Parameters:
  • fields (Dict[str, Union[str, int, float]]) – A dictionary of fields and values to search by

  • routing (Optional[str]) – Shard routing value

  • aggs (Optional[Dict[str, ESAgg]]) – Aggregations

  • kwargs – Other search API params

Return type:

Optional[TypeVar(TModel, bound= ESModel)]

Returns:

The first result or None if no result

to_es(**kwargs)[source]

Generates a dictionary equivalent to what ElasticSearch returns in the ‘_source’ property of a response.

It automatically removes the id field from the document if it is set in ESConfig.id_field to prevent duplication of the id field.

Parameters:

kwargs – Pydantic’s model_dump parameters

Return type:

dict

Returns:

The dictionary for ElasticSearch
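
The removal of the id field can be illustrated with a plain dictionary. This is a sketch only: to_source is a hypothetical helper standing in for the documented behaviour, and the field name user_id is an example.

```python
from typing import Any, Dict, Optional


def to_source(fields: Dict[str, Any], id_field: Optional[str] = None) -> Dict[str, Any]:
    """Copy the fields and drop the id field, which travels as _id instead."""
    source = dict(fields)
    if id_field is not None:
        source.pop(id_field, None)
    return source


user = {"user_id": 7, "name": "Ada"}
source = to_source(user, id_field="user_id")
# Elasticsearch document ids are strings, even when the field is an integer.
es_id = str(user["user_id"])
```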

update_from_es(data)[source]

Update the model from ElasticSearch data

Parameters:

data (Dict[str, Any]) – The ElasticSearch data

Raises:

esorm.error.InvalidResponseError – Raised when _id or _source is missing from data

class esorm.ESModelTimestamp(**data)[source]

Bases: ESModel

Model which stores created_at and modified_at fields automatically.

created_at: Optional[datetime]

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'ser_json_bytes': 'base64', 'str_strip_whitespace': True, 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=Union[datetime, NoneType], required=False, default=None, description='Creation date and time'), 'modified_at': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow, description='Modification date and time')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(_ModelMetaclass__context: Any) → None

We need to both initialize private attributes and call the user-defined model_post_init method.

Return type:

None

modified_at: Optional[datetime]

async save(*, wait_for=False, force_new=False, pipeline=None, routing=None)[source]

Save document into elasticsearch.

If the document already exists, it will be updated as per the native Elasticsearch index operation. If the model has an id (ESConfig.id_field or __id__), it will be used as the Elasticsearch _id. The id field will be removed from the document before indexing. If no id is provided, the document will be indexed and Elasticsearch will generate a suitable id, which will be populated on the returned model.

Parameters:
  • wait_for – Waits for all shards to sync before returning response - useful when writing tests. Defaults to False.

  • force_new – Treat the document as new, so created_at is set to the current time. This is no longer necessary, because created_at is set to the current time whenever it is None; the parameter is kept for backward compatibility.

  • pipeline (Optional[str]) – Pipeline to use for indexing

  • routing (Optional[str]) – Shard routing value

Return type:

str

Returns:

The new document’s ID
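
The timestamp rule (modified_at is always refreshed, created_at is only filled in when it is None) can be sketched without Elasticsearch. Stamped and touch are hypothetical names used for illustration; they are not part of ESORM.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Stamped:
    """Stand-in for a model with the two timestamp fields."""
    created_at: Optional[datetime] = None
    modified_at: Optional[datetime] = None


def touch(doc: Stamped, now: Optional[datetime] = None) -> Stamped:
    """Apply the save-time timestamp rule to doc."""
    now = now or datetime.now(timezone.utc)
    doc.modified_at = now          # always refreshed
    if doc.created_at is None:     # only set on the first save
        doc.created_at = now
    return doc


t1 = datetime(2024, 1, 1, tzinfo=timezone.utc)
t2 = datetime(2024, 6, 1, tzinfo=timezone.utc)
doc = touch(Stamped(), now=t1)   # first save sets both fields
doc = touch(doc, now=t2)         # later save only moves modified_at
```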

esorm.Field(default, *, index=True, alias=None, title=None, description=None, exclude=None, include=None, frozen=False, **extra)[source]

Basic Field Info

Parameters:
  • default (Any) – since this is replacing the field’s default, its first argument is used to set the default, use ellipsis (...) to indicate the field is required

  • index (bool) – if this field should be indexed or not

  • alias (Optional[str]) – the public name of the field

  • title (Optional[str]) – can be any string, used in the schema

  • description (Optional[str]) – can be any string, used in the schema

  • exclude (Optional[bool]) – exclude this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • include (Optional[bool]) – include this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • frozen (bool) – if this field should be frozen or not

  • extra – any additional keyword arguments will be added as is to the schema

Return type:

FieldInfo

Returns:

A field info object

exception esorm.InvalidModelError[source]

Bases: Exception

Raised when a model is invalid.

exception esorm.InvalidResponseError[source]

Bases: Exception

Raised when the response from Elasticsearch is invalid.

exception esorm.NotFoundError[source]

Bases: Exception

Raised when a model is not found.

class esorm.Pagination(**data)[source]

Bases: BaseModel

Pagination parameters

callback: Optional[Callable[[int], Awaitable[None]]]

Callback after the search is done with the total number of hits

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'callback': FieldInfo(annotation=Union[Callable[[int], Awaitable[NoneType]], NoneType], required=False, default=None, description='Callback after the search is done with the total number of hits'), 'page': FieldInfo(annotation=int, required=False, default=1, description='The page number'), 'page_size': FieldInfo(annotation=int, required=False, default=10, description='The page size')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

page: int

The page number

page_size: int

The page size

class esorm.Sort(**data)[source]

Bases: BaseModel

Sort parameters

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'sort': FieldInfo(annotation=Union[List[Dict[str, esorm.model.SortOrder]], str, NoneType], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

sort: Union[List[Dict[str, SortOrder]], str, None]

async esorm.connect(hosts, *args, wait=False, **kwargs)[source]

Connect to ElasticSearch

Parameters:
  • hosts (Union[str, List[Union[str, Mapping[str, Union[str, int]], NodeConfig]]]) – ElasticSearch hosts to connect to: either a list, a mapping, or a single string

  • args – Other AsyncElasticsearch arguments

  • wait – Wait for AsyncElasticsearch to be ready

  • kwargs – Other AsyncElasticsearch keyword arguments

Return type:

Optional[AsyncElasticsearch]

Returns:

AsyncElasticsearch client instance

async esorm.get_es_version()[source]

Get ElasticSearch version

Return type:

str

Returns:

ElasticSearch version

esorm.lazy_property(func)[source]

Decorator for lazy properties

Lazy properties are computed after the model is loaded by a search from ES

Parameters:

func (Callable[[], Awaitable[Any]]) – The async function to decorate

Returns:

The decorated function

esorm.retry_on_conflict(max_retries=-1)[source]

Decorator for optimistic concurrency control

Parameters:

max_retries – The maximum number of retries, -1 for infinite

Returns:

The decorated function
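
The retry behaviour, including the meaning of -1 as "retry forever", can be sketched as a plain asyncio decorator. Everything here is an assumption made for illustration: ConflictError stands in for whatever version-conflict error the real decorator catches, and retry_on_conflict_sketch is not ESORM's code.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


class ConflictError(Exception):
    """Hypothetical stand-in for an Elasticsearch version conflict."""


def retry_on_conflict_sketch(max_retries: int = -1):
    """Re-run the coroutine on conflict; -1 means no retry limit."""
    def decorator(func: Callable[..., Awaitable[Any]]):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            retries = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except ConflictError:
                    if max_retries >= 0 and retries >= max_retries:
                        raise  # retry budget exhausted
                    retries += 1
        return wrapper
    return decorator


state = {"attempts": 0}


@retry_on_conflict_sketch(max_retries=3)
async def update_counter() -> str:
    state["attempts"] += 1
    if state["attempts"] < 3:  # the first two attempts simulate a conflict
        raise ConflictError("document changed since it was read")
    return "saved"


result = asyncio.run(update_counter())
```

In real use the decorated function would typically reload the document, reapply the change and save again, so that each retry works on fresh _seq_no and _primary_term values.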

async esorm.setup_mappings(*_, debug=False)[source]

Create mappings for indices, or try to extend them if there are new fields

Submodules

esorm.aggs module

ElasticSearch aggregation type definitions for ESORM

class esorm.aggs.ESAgg(*args, **kwargs)[source]

Bases: dict

Holds all types of aggregations supported

avg: ESAggFieldParams

Average aggregation

histogram: ESAggHistogramParams

Histogram aggregation

max: ESAggFieldParams

Maximum aggregation

min: ESAggFieldParams

Minimum aggregation

sum: ESAggFieldParams

Sum aggregation

terms: ESAggTermParams

Terms aggregation

class esorm.aggs.ESAggBucketResponse(*args, **kwargs)[source]

Bases: dict

Represents a single bucket in a bucket aggregation.

doc_count: int

The number of documents in this bucket.

key: str

The key of the bucket.

class esorm.aggs.ESAggExtendedBounds(*args, **kwargs)[source]

Bases: dict

Represents the parameters for extended bounds in Elasticsearch.

max: int

The maximum value.

min: int

The minimum value.

class esorm.aggs.ESAggFieldParams(*args, **kwargs)[source]

Bases: dict

Represents field parameter in Elasticsearch.

field: str

The field to aggregate on.

class esorm.aggs.ESAggHistogramBucketresponse(*args, **kwargs)[source]

Bases: dict

Represents a bucket in a histogram aggregation.

doc_count: int

The number of documents in this bucket.

key: float

Numeric key corresponding to the bucket’s range.

class esorm.aggs.ESAggHistogramParams(*args, **kwargs)[source]

Bases: dict

Represents the parameters for a histogram aggregation in Elasticsearch.

extended_bounds: ESAggExtendedBounds

The extended bounds of the histogram.

field: str

The field to aggregate on.

interval: int

The interval of the histogram.

min_doc_count: int

The minimum number of documents in a bucket.

class esorm.aggs.ESAggHistogramResponse(*args, **kwargs)[source]

Bases: dict

Represents the response for a histogram aggregation.

buckets: List[ESAggHistogramBucketresponse]

A list of buckets in the histogram aggregation.

class esorm.aggs.ESAggTermParams(*args, **kwargs)[source]

Bases: dict

Represents the parameters for a terms aggregation in Elasticsearch.

field: str

The field to aggregate on.

order: Dict[str, str]

The order of the buckets.

size: int

The number of buckets to return.

class esorm.aggs.ESAggTermsResponse(*args, **kwargs)[source]

Bases: dict

Represents the response for a terms aggregation.

buckets: List[ESAggBucketResponse]

A list of buckets in the terms aggregation.

class esorm.aggs.ESAggValueResponse(*args, **kwargs)[source]

Bases: dict

Represents the response for an average, min, or max aggregation.

value: float

The average, min, or max value.

esorm.aggs.ESAggs

ElasticSearch aggregations type definition

alias of Dict[str, ESAgg]

esorm.aggs.ESAggsResponse

ElasticSearch aggregations response type definition

alias of Dict[str, Union[ESAggValueResponse, ESAggTermsResponse, ESAggHistogramResponse]]
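
The aggregation types above are dictionaries, so an aggregation request and its response can be written as plain dicts. The example below uses only the keys documented in this module. The aggregation names, field names and the response values are made up for illustration and are not real query results.

```python
from typing import Any, Dict

# Request in the shape of Dict[str, ESAgg]: one entry per named aggregation.
aggs: Dict[str, Dict[str, Any]] = {
    "avg_price": {"avg": {"field": "price"}},
    "by_category": {
        "terms": {"field": "category", "size": 5, "order": {"_count": "desc"}}
    },
    "price_histogram": {
        "histogram": {
            "field": "price",
            "interval": 50,
            "min_doc_count": 0,
            "extended_bounds": {"min": 0, "max": 200},
        }
    },
}

# Hand-written response in the documented response shapes (illustrative only).
response: Dict[str, Dict[str, Any]] = {
    "avg_price": {"value": 75.0},
    "by_category": {"buckets": [{"key": "books", "doc_count": 3},
                                {"key": "games", "doc_count": 1}]},
    "price_histogram": {"buckets": [{"key": 0.0, "doc_count": 1},
                                    {"key": 50.0, "doc_count": 3}]},
}

# Reading the results: value responses expose "value", bucket responses "buckets".
top_category = max(response["by_category"]["buckets"],
                   key=lambda bucket: bucket["doc_count"])["key"]
total_in_histogram = sum(bucket["doc_count"]
                         for bucket in response["price_histogram"]["buckets"])
```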

esorm.bulk module

Bulk operation for ElasticSearch

class esorm.bulk.ESBulk(wait_for=False, **bulk_kwargs)[source]

Bases: object

Bulk operation for ElasticSearch

async delete(model)[source]

Add the model to the bulk for deletion

Parameters:

model (TypeVar(TModel, bound= ESModel)) – The model to add for deletion

async save(model)[source]

Add the model to the bulk for saving

If the model was loaded from ES (via get or search, so it has _seq_no and _primary_term), an optimistic concurrency check is used: the document is updated only if its _seq_no and _primary_term still match those of the document in the index.

If the model is an ESModelTimestamp, it will update the modified_at field to the current time and if the created_at field is not already set, it will set it to the current time too.

Parameters:

model (TypeVar(TModel, bound= ESModel)) – The model to add for saving

esorm.error module

This module contains all the exceptions that can be raised by ESORM.

exception esorm.error.BulkError(failed_operations)[source]

Bases: Exception

Exception for handling bulk operation errors.

failed_operations: List[BulkOperationError]

class esorm.error.BulkOperationError(*args, **kwargs)[source]

Bases: dict

A dictionary type to represent an error in a bulk operation response from Elasticsearch.

model: ESModel

reason: str

status: int

type: str

exception esorm.error.IndexDoesNotFoundError[source]

Bases: Exception

Raised when an index does not exist.

exception esorm.error.InvalidModelError[source]

Bases: Exception

Raised when a model is invalid.

exception esorm.error.InvalidResponseError[source]

Bases: Exception

Raised when the response from Elasticsearch is invalid.

exception esorm.error.NotFoundError[source]

Bases: Exception

Raised when a model is not found.

esorm.esorm module

ElasticSearch ORM main module

async esorm.esorm.connect(hosts, *args, wait=False, **kwargs)[source]

Connect to ElasticSearch

Parameters:
  • hosts (Union[str, List[Union[str, Mapping[str, Union[str, int]], NodeConfig]]]) – ElasticSearch hosts to connect to: either a list, a mapping, or a single string

  • args – Other AsyncElasticsearch arguments

  • wait – Wait for AsyncElasticsearch to be ready

  • kwargs – Other AsyncElasticsearch keyword arguments

Return type:

Optional[AsyncElasticsearch]

Returns:

AsyncElasticsearch client instance

async esorm.esorm.get_es_version()[source]

Get ElasticSearch version

Return type:

str

Returns:

ElasticSearch version

esorm.fastapi module

FastAPI utilities for ESORM

esorm.fastapi.make_dep_pagination(default_page=1, default_page_size=10, set_headers=True)[source]

Create a pagination dependency with default values

Parameters:
  • default_page (int) – Default page number, the first page is 1

  • default_page_size (int) – Default page size, the default is 10

  • set_headers (bool) – Set X-Total-Hits header after search

Return type:

callable

Returns:

Pagination dependency

esorm.fastapi.make_dep_sort(**kwargs)[source]

Create a sort dependency with sort definitions

Parameters:

kwargs (Union[List[Dict[str, dict]], Dict[str, any]]) – Sort definitions

Return type:

callable

Returns:

Sort dependency

esorm.fastapi.set_max_page_size(max_page_size)[source]

Set the maximum page size for queries

Parameters:

max_page_size (int) – The maximum page size

Returns:

None

esorm.fields module

class esorm.fields.Binary[source]

Bases: str

Stores binary data as base64 encoded strings

property bytes: bytes

classmethod validate_binary(v, _)[source]

Return type:

str
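
The idea of a string type that stores binary data as base64 and exposes the decoded value through a bytes property can be sketched with the standard library. BinarySketch and its from_bytes constructor are hypothetical; only the str base class and the bytes property mirror the entry above.

```python
import base64


class BinarySketch(str):
    """A str holding base64 text, with the raw data available as .bytes."""

    @classmethod
    def from_bytes(cls, raw: bytes) -> "BinarySketch":
        # Hypothetical convenience constructor: encode raw data to base64 text.
        return cls(base64.b64encode(raw).decode("ascii"))

    @property
    def bytes(self) -> bytes:
        # Decode the stored base64 text back into the original data.
        return base64.b64decode(self)


encoded = BinarySketch.from_bytes(b"\x00\x01hello")
decoded = encoded.bytes
```

Because the value is an ordinary string, it serialises to JSON without special handling, which matches how Elasticsearch expects binary field values to be sent.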

class esorm.fields.Byte[source]

Bases: int

Byte Field

class esorm.fields.Double(x=0, /)[source]

Bases: float

Double Field

esorm.fields.Field(default, *, index=True, alias=None, title=None, description=None, exclude=None, include=None, frozen=False, **extra)[source]

Basic Field Info

Parameters:
  • default (Any) – since this is replacing the field’s default, its first argument is used to set the default, use ellipsis (...) to indicate the field is required

  • index (bool) – if this field should be indexed or not

  • alias (Optional[str]) – the public name of the field

  • title (Optional[str]) – can be any string, used in the schema

  • description (Optional[str]) – can be any string, used in the schema

  • exclude (Optional[bool]) – exclude this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • include (Optional[bool]) – include this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • frozen (bool) – if this field should be frozen or not

  • extra – any additional keyword arguments will be added as is to the schema

Return type:

FieldInfo

Returns:

A field info object

class esorm.fields.Float(x=0, /)[source]

Bases: float

Float Field

class esorm.fields.HalfFloat(x=0, /)[source]

Bases: float

Half Float Field

class esorm.fields.Integer[source]

Bases: int

Integer Field

class esorm.fields.Keyword[source]

Bases: str

Keyword Field

class esorm.fields.LatLon(**data)[source]

Bases: BaseModel

Geo Point Field - Latitude and Longitude

lat: float

Latitude Coordinate

lon: float

Longitude Coordinate

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'lat': FieldInfo(annotation=float, required=True, description='Latitude Coordinate'), 'lon': FieldInfo(annotation=float, required=True, description='Longitude Coordinate')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class esorm.fields.Long[source]

Bases: int

Long Field

esorm.fields.NumericField(default, *, index=None, alias=None, gt=None, ge=None, lt=None, le=None, multiple_of=None, allow_inf_nan=None, max_digits=None, decimal_places=None, title=None, description=None, exclude=None, include=None, frozen=False, **extra)[source]

Numeric Field Info

Parameters:
  • default (Union[int, float]) – since this is replacing the field’s default, its first argument is used to set the default, use ellipsis (...) to indicate the field is required

  • index (Optional[bool]) – if this field should be indexed or not

  • alias (Optional[str]) – the public name of the field

  • gt (Optional[float]) – only applies to numbers, requires the field to be “greater than”. The schema will have an exclusiveMinimum validation keyword

  • ge (Optional[float]) – only applies to numbers, requires the field to be “greater than or equal to”. The schema will have a minimum validation keyword

  • lt (Optional[float]) – only applies to numbers, requires the field to be “less than”. The schema will have an exclusiveMaximum validation keyword

  • le (Optional[float]) – only applies to numbers, requires the field to be “less than or equal to”. The schema will have a maximum validation keyword

  • multiple_of (Optional[float]) – only applies to numbers, requires the field to be “a multiple of”. The schema will have a multipleOf validation keyword

  • allow_inf_nan (Optional[bool]) – only applies to numbers, allows the field to be NaN or infinity (+inf or -inf), which is a valid Python float. Default True, set to False for compatibility with JSON.

  • max_digits (Optional[int]) – only applies to Decimals, requires the field to have a maximum number of digits within the decimal. It does not include a zero before the decimal point or trailing decimal zeroes.

  • decimal_places (Optional[int]) – only applies to Decimals, requires the field to have at most a number of decimal places allowed. It does not include trailing decimal zeroes.

  • title (Optional[str]) – can be any string, used in the schema

  • description (Optional[str]) – can be any string, used in the schema

  • exclude (Optional[bool]) – exclude this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • include (Optional[bool]) – include this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • frozen (bool) – if this field should be frozen or not

  • extra – any additional keyword arguments will be added as is to the schema

Return type:

FieldInfo

Returns:

A field info object

class esorm.fields.Short[source]

Bases: int

Short Field

class esorm.fields.Text[source]

Bases: str

Text Field

esorm.fields.TextField(default, *, index=True, alias=None, min_length=None, max_length=None, regex=None, title=None, description=None, exclude=None, include=None, frozen=False, **extra)[source]

Text Field Info

Parameters:
  • default (str) – since this is replacing the field’s default, its first argument is used to set the default, use ellipsis (...) to indicate the field is required

  • index (bool) – if this field should be indexed or not

  • alias (Optional[str]) – the public name of the field

  • min_length (Optional[int]) – only applies to strings, requires the field to have a minimum length. The schema will have a minLength validation keyword

  • max_length (Optional[int]) – only applies to strings, requires the field to have a maximum length. The schema will have a maxLength validation keyword

  • regex (Optional[str]) – only applies to strings, requires the field match against a regular expression pattern string. The schema will have a pattern validation keyword

  • title (Optional[str]) – can be any string, used in the schema

  • description (Optional[str]) – can be any string, used in the schema

  • exclude (Optional[bool]) – exclude this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • include (Optional[bool]) – include this field while dumping. Takes same values as the include and exclude arguments on the .dict method.

  • frozen (bool) – if this field should be frozen or not

  • extra – any additional keyword arguments will be added as is to the schema

Return type:

FieldInfo

Returns:

A field info object

esorm.fields.binary

Binary type

alias of Union[Binary, str]

esorm.fields.boolean

alias of bool

esorm.fields.byte

Byte type

alias of Union[Byte, int]

esorm.fields.double

64 bit float (double) type

alias of Union[Double, float]

esorm.fields.float16

16 bit float type

alias of Union[HalfFloat, float]

esorm.fields.float32

32 bit float type

alias of Union[Float, float]

esorm.fields.geo_point

Geo Point type

esorm.fields.int32

32 bit integer type

alias of Union[Integer, int]

esorm.fields.keyword

Keyword type

alias of Union[Keyword, str]

esorm.fields.long

64 bit integer (long) type

alias of Union[Long, int]

esorm.fields.short

Short type

alias of Union[Short, int]

esorm.fields.text

Text type

alias of Union[Text, str]
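
The integer aliases differ in the value range of the underlying Elasticsearch type: byte, short, integer and long are signed 8, 16, 32 and 64 bit integers. The lookup below is a standalone sketch; INTEGER_RANGES and smallest_integer_type are illustrative names, and the helper is not part of ESORM.

```python
from typing import Dict, Tuple

# Inclusive value ranges of the signed integer types, smallest first.
INTEGER_RANGES: Dict[str, Tuple[int, int]] = {
    "byte": (-2**7, 2**7 - 1),
    "short": (-2**15, 2**15 - 1),
    "int32": (-2**31, 2**31 - 1),
    "long": (-2**63, 2**63 - 1),
}


def smallest_integer_type(value: int) -> str:
    """Return the name of the narrowest alias that can hold value."""
    for name, (low, high) in INTEGER_RANGES.items():
        if low <= value <= high:
            return name
    raise ValueError(f"{value} does not fit in a 64-bit integer")


small = smallest_integer_type(127)
medium = smallest_integer_type(128)
large = smallest_integer_type(2**31)
```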

esorm.logger module

esorm.model module

This module contains the ESModel classes and related functions

class esorm.model.ESBaseModel(**data)[source]

Bases: BaseModel

Base class for Elasticsearch models

It is useful for nested models, if you don’t need the model in ES mappings

class ESConfig[source]

Bases: object

ESBaseModel Config

This exists only for lazy properties, to make ESBaseModel compatible with them

lazy_property_max_recursion_depth: int = 1

Maximum recursion depth of lazy properties

async calc_lazy_properties()[source]

(re)Calculate lazy properties

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'ser_json_bytes': 'base64', 'str_strip_whitespace': True, 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class esorm.model.ESModel(**data)[source]

Bases: ESBaseModel

ElasticSearch Base Model

class ESConfig[source]

Bases: object

ESModel Config

default_sort: Optional[List[Dict[str, Dict[str, str]]]] = None

Default sort

id_field: Optional[str] = None

The name of the ‘id’ field

index: Optional[str] = None

The index name

lazy_property_max_recursion_depth: int = 1

Maximum recursion depth of lazy properties

settings: Optional[Dict[str, Any]] = None

Index settings

async classmethod aggregate(aggs, *, query=None, routing=None, **kwargs)[source]

Aggregate Model with aggregation dict. Before aggregation, the model can be filtered by a query dict.

Parameters:
  • aggs (Dict[str, ESAgg]) – Aggregation dict

  • query (Optional[ESQuery]) – ElasticSearch query dict

  • routing (Optional[str]) – Shard routing value

  • kwargs – Other search API params

Return type:

Dict[str, Union[ESAggValueResponse, ESAggTermsResponse, ESAggHistogramResponse]]

Returns:

The aggregation results, keyed by aggregation name
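
As a rough illustration of the two arguments, the sketch below builds an aggregation dict and an optional filter query as plain Python dicts in Elasticsearch's query DSL. The aggregation names (by_status, avg_price), the field names and the Product model mentioned in the comment are hypothetical, and the call itself is only shown in a comment because it needs a configured model and a running cluster.

```python
# Plain dicts in Elasticsearch query DSL; all names here are hypothetical.
aggs = {
    "by_status": {"terms": {"field": "status", "size": 10}},
    "avg_price": {"avg": {"field": "price"}},
}

# Optional filter applied before aggregating.
query = {"bool": {"filter": [{"term": {"category": {"value": "books"}}}]}}

# With a hypothetical model class `Product` these would be passed as:
#   result = await Product.aggregate(aggs, query=query)
# Elasticsearch keys its aggregation results by the names chosen above.
expected_keys = set(aggs)
```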

async classmethod all(**kwargs)[source]

Get all documents

Parameters:

kwargs – Other search API params

Return type:

List[TypeVar(TModel, bound= ESModel)]

Returns:

The result list

async classmethod call(method_name, *, wait_for=None, **kwargs)[source]

Call an elasticsearch method

This is a low-level ES method call; using it directly is not recommended.

Parameters:
  • method_name – The name of the method to call

  • wait_for – Waits for all shards to sync before returning response

  • kwargs – The arguments to pass to the method

Return type:

dict

Returns:

The result dictionary from ElasticSearch

static create_query_from_dict(fields)[source]

Creates a query dict from a dictionary of fields and values

Parameters:

fields (Dict[str, Union[str, int, float]]) – A dictionary of fields and values to search by

Return type:

ESQuery

Returns:

A query dict
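
The exact query this method produces is not documented here. The sketch below shows one plausible shape, assuming each field becomes a match clause inside a bool must list. The helper name query_from_fields and the choice of match with operator "and" are assumptions, not the library's confirmed behaviour.

```python
from typing import Dict, List, Union

Value = Union[str, int, float]


def query_from_fields(fields: Dict[str, Value]) -> dict:
    """Build a bool query that requires every field to match its value.

    Hypothetical helper: the real create_query_from_dict may choose a
    different clause type or different options.
    """
    must: List[dict] = [
        {"match": {name: {"query": value, "operator": "and"}}}
        for name, value in fields.items()
    ]
    return {"bool": {"must": must}}


query = query_from_fields({"name": "John", "age": 42})
```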

async delete(*, wait_for=False, routing=None)[source]

Deletes document from ElasticSearch.

Parameters:
  • wait_for – Waits for all shards to sync before returning response - useful when writing tests. Defaults to False.

  • routing (Optional[str]) – Shard routing value

Raises:
  • esorm.error.NotFoundError – Raised if the document is not found

  • ValueError – Raised when the id attribute is missing from the instance

classmethod from_es(data)[source]

Returns an ESModel from an elasticsearch document that has _id, _source

Parameters:

data (Dict[str, Any]) – Elasticsearch document that has _id, _source

Raises:

esorm.error.InvalidResponseError – Raised when _id or _source is missing from data

Return type:

Optional[TypeVar(TModel, bound= ESModel)]

Returns:

The ESModel instance
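
The validation described above can be sketched without the library. The example below is a minimal stand-in that only separates _id from _source and raises when either is missing. The local InvalidResponseError class and the split_hit helper are hypothetical, and building the actual model instance is left out.

```python
from typing import Any, Dict, Tuple


class InvalidResponseError(Exception):
    """Local stand-in for esorm.error.InvalidResponseError."""


def split_hit(data: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (_id, _source) from a raw hit, or raise if either is missing."""
    if "_id" not in data or "_source" not in data:
        raise InvalidResponseError("document must contain _id and _source")
    return data["_id"], data["_source"]


doc_id, source = split_hit({"_id": "42", "_source": {"name": "John"}})
```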

async classmethod get(id, *, routing=None)[source]

Fetches document and returns ESModel instance populated with properties.

Parameters:
  • id (Union[str, int, float]) – Document id

  • routing (Optional[str]) – Shard routing value

Raises:

esorm.error.NotFoundError – Raised if the document is not found

Return type:

TypeVar(TModel, bound= ESModel)

Returns:

ESModel object

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'ser_json_bytes': 'base64', 'str_strip_whitespace': True, 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(__context)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Return type:

None

Parameters:
  • self – The BaseModel instance.

  • __context – The context.

async reload(*, routing=None)[source]

Reloads the document from ElasticSearch

Parameters:

routing (Optional[str]) – Shard routing value

Raises:

esorm.error.NotFoundError – Raised if the document is not found

Return type:

TypeVar(TModel, bound= ESModel)

async save(*, wait_for=False, pipeline=None, routing=None)[source]

Save document into elasticsearch.

If the document already exists, it will be updated as per the native elasticsearch index operation. If the model has an id (ESConfig.id_field or __id__), this will be used as the elasticsearch _id. The id field will be removed from the document before indexing. If no id is provided, the document will be indexed and elasticsearch will generate a suitable id that will be populated on the returned model.

Parameters:
  • wait_for – Waits for all shards to sync before returning response - useful when writing tests. Defaults to False.

  • pipeline (Optional[str]) – Pipeline to use for indexing

  • routing (Optional[str]) – Shard routing value

Return type:

str

Returns:

The new document’s ID. It is always a string, even if the id field is an integer

async classmethod search(query, *, page_size=None, page=None, sort=None, routing=None, res_dict=False, **kwargs)[source]

Search Model with query dict

Parameters:
  • query (ESQuery) – ElasticSearch query dict

  • page_size (Optional[int]) – Pagination page size

  • page (Optional[int]) – Pagination page num, 1st page is 1

  • sort (Union[list, str, None]) – Name of the field to sort by, or a list of sort term dicts. If not specified, the model’s default sort is used; if there is none, results are not sorted

  • routing (Optional[str]) – Shard routing value

  • res_dict (bool) – If the result should be a dict with id as key and model as value instead of a list of models

  • kwargs – Other search API params

Return type:

Union[List[TypeVar(TModel, bound= ESModel)], Dict[str, TypeVar(TModel, bound= ESModel)]]

Returns:

The result list, or a dict keyed by document id when res_dict is True
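
Two of the parameters above lend themselves to a small sketch. It assumes that the 1-based page number maps to Elasticsearch's from/size offsets in the usual way, and it uses raw hit dicts in place of model instances for the res_dict case. Both helper names are hypothetical.

```python
from typing import Dict, List, Optional


def page_to_from_size(page: Optional[int], page_size: Optional[int]) -> Dict[str, int]:
    """Translate a 1-based page number into Elasticsearch from/size values."""
    if page_size is None:
        return {}  # no pagination requested
    page = page or 1
    if page < 1:
        raise ValueError("page numbering starts at 1")
    return {"from": (page - 1) * page_size, "size": page_size}


def hits_by_id(hits: List[dict]) -> Dict[str, dict]:
    """Mimic res_dict=True: key each hit's source by its document id."""
    return {hit["_id"]: hit["_source"] for hit in hits}


params = page_to_from_size(page=3, page_size=20)
by_id = hits_by_id([
    {"_id": "a", "_source": {"n": 1}},
    {"_id": "b", "_source": {"n": 2}},
])
```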

async classmethod search_by_fields(fields, *, page_size=None, page=None, sort=None, routing=None, aggs=None, res_dict=False, **kwargs)[source]

Search Model by fields as key-value pairs

Parameters:
  • fields (Dict[str, Union[str, int, float]]) – A dictionary of fields and values to search by

  • page_size (Optional[int]) – Pagination page size

  • page (Optional[int]) – Pagination page num, 1st page is 1

  • sort (Union[list, str, None]) – Name of the field to sort by, or a list of sort term dicts. If not specified, the model’s default sort is used; if there is none, results are not sorted

  • routing (Optional[str]) – Shard routing value

  • aggs (Optional[Dict[str, ESAgg]]) – Aggregations

  • res_dict (bool) – If the result should be a dict with id as key and model as value instead of a list of models

  • kwargs – Other search API params

Return type:

List[TypeVar(TModel, bound= ESModel)]

Returns:

The result list

async classmethod search_one(query, *, routing=None, **kwargs)[source]

Search Model and return the first result

Parameters:
  • query (ESQuery) – ElasticSearch query dict

  • routing (Optional[str]) – Shard routing value

  • kwargs – Other search API params

Return type:

Optional[TypeVar(TModel, bound= ESModel)]

Returns:

The first result or None if no result

async classmethod search_one_by_fields(fields, *, routing=None, aggs=None, **kwargs)[source]

Search Model by fields as key-value pairs and return the first result

Parameters:
  • fields (Dict[str, Union[str, int, float]]) – A dictionary of fields and values to search by

  • routing (Optional[str]) – Shard routing value

  • aggs (Optional[Dict[str, ESAgg]]) – Aggregations

  • kwargs – Other search API params

Return type:

Optional[TypeVar(TModel, bound= ESModel)]

Returns:

The first result or None if no result

to_es(**kwargs)[source]

Generates a dictionary equivalent to what ElasticSearch returns in the ‘_source’ property of a response.

It automatically removes the id field from the document if it is set in ESConfig.id_field to prevent duplication of the id field.

Parameters:

kwargs – Pydantic’s model_dump parameters

Return type:

dict

Returns:

The dictionary for ElasticSearch
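
The id handling described for to_es and save can be illustrated on plain dicts. This is a minimal sketch assuming the document has already been dumped to a dict. The split_id helper and the user_id field are hypothetical and stand in for the model's own logic.

```python
from typing import Any, Dict, Optional, Tuple


def split_id(doc: Dict[str, Any], id_field: Optional[str]) -> Tuple[Optional[str], Dict[str, Any]]:
    """Return (document id as a string, source without the id field)."""
    if id_field is None or id_field not in doc:
        return None, dict(doc)
    source = {key: value for key, value in doc.items() if key != id_field}
    return str(doc[id_field]), source


doc_id, source = split_id({"user_id": 7, "name": "John"}, id_field="user_id")
```

The id is converted to a string here to mirror the documented return value of save, which is always a string even when the id field is an integer.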

update_from_es(data)[source]

Update the model from ElasticSearch data

Parameters:

data (Dict[str, Any]) – The ElasticSearch data

Raises:

esorm.error.InvalidResponseError – Raised when _id or _source is missing from data

class esorm.model.ESModelTimestamp(**data)[source]

Bases: ESModel

Model which stores created_at and modified_at fields automatically.

created_at: Optional[datetime]

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'ser_json_bytes': 'base64', 'str_strip_whitespace': True, 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'created_at': FieldInfo(annotation=Union[datetime, NoneType], required=False, default=None, description='Creation date and time'), 'modified_at': FieldInfo(annotation=Union[datetime, NoneType], required=False, default_factory=utcnow, description='Modification date and time')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

model_post_init(_ModelMetaclass__context: Any) -> None

We need to both initialize private attributes and call the user-defined model_post_init method.

Return type:

None

modified_at: Optional[datetime]

async save(*, wait_for=False, force_new=False, pipeline=None, routing=None)[source]

Save document into elasticsearch.

If the document already exists, it will be updated as per the native elasticsearch index operation. If the model has an id (ESConfig.id_field or __id__), this will be used as the elasticsearch _id. The id field will be removed from the document before indexing. If no id is provided, the document will be indexed and elasticsearch will generate a suitable id that will be populated on the returned model.

Parameters:
  • wait_for – Waits for all shards to sync before returning response - useful when writing tests. Defaults to False.

  • force_new – Treat the document as new, so created_at will be set to the current time. This is no longer necessary, because created_at is set to the current time whenever it is None; the parameter is kept for backward compatibility

  • pipeline (Optional[str]) – Pipeline to use for indexing

  • routing (Optional[str]) – Shard routing value

Return type:

str

Returns:

The new document’s ID
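
The timestamp rule can be sketched in isolation: created_at is filled in only when it is still None, while modified_at is refreshed on every save. The Stamped dataclass and the touch helper below are hypothetical stand-ins, and the injectable now argument exists only to make the example deterministic.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class Stamped:
    """Hypothetical stand-in for a model with timestamp fields."""
    created_at: Optional[datetime] = None
    modified_at: Optional[datetime] = None


def touch(doc: Stamped, now: Optional[datetime] = None) -> Stamped:
    """Apply the timestamp rules before a save."""
    now = now or datetime.now(timezone.utc)
    if doc.created_at is None:  # only set on the first save
        doc.created_at = now
    doc.modified_at = now  # refreshed on every save
    return doc


first = datetime(2024, 1, 1, tzinfo=timezone.utc)
later = datetime(2024, 1, 2, tzinfo=timezone.utc)
doc = touch(Stamped(), now=first)
doc = touch(doc, now=later)
```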

class esorm.model.Pagination(**data)[source]

Bases: BaseModel

Pagination parameters

callback: Optional[Callable[[int], Awaitable[None]]]

Callback after the search is done with the total number of hits

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'callback': FieldInfo(annotation=Union[Callable[[int], Awaitable[NoneType]], NoneType], required=False, default=None, description='Callback after the search is done with the total number of hits'), 'page': FieldInfo(annotation=int, required=False, default=1, description='The page number'), 'page_size': FieldInfo(annotation=int, required=False, default=10, description='The page size')}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

page: int

The page number

page_size: int

The page size

class esorm.model.Sort(**data)[source]

Bases: BaseModel

Sort parameters

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'sort': FieldInfo(annotation=Union[List[Dict[str, esorm.model.SortOrder]], str, NoneType], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

sort: Union[List[Dict[str, SortOrder]], str, None]

async esorm.model.create_index_template(name, *, prefix_name, shards=1, replicas=0, **other_settings)[source]

Create index template

Parameters:
  • name (str) – The name of the template

  • prefix_name (str) – The prefix of index pattern

  • shards – Number of shards

  • replicas – Number of replicas

  • other_settings (Any) – Other settings

Return type:

object

Returns:

The result object from ES
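
The request body this function sends is not shown in this reference. As a sketch only, the parameters could map onto an Elasticsearch composable index template roughly as follows. The helper name, the trailing wildcard on the prefix and the placement of other_settings are all assumptions.

```python
from typing import Any, Dict


def index_template_body(prefix_name: str, shards: int = 1, replicas: int = 0,
                        **other_settings: Any) -> Dict[str, Any]:
    """Build a template body matching every index that starts with prefix_name."""
    settings: Dict[str, Any] = {
        "number_of_shards": shards,
        "number_of_replicas": replicas,
        **other_settings,  # assumed to be merged into the index settings
    }
    return {
        "index_patterns": [f"{prefix_name}*"],
        "template": {"settings": settings},
    }


body = index_template_body("logs-", shards=3, replicas=1, refresh_interval="30s")
```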

esorm.model.lazy_property(func)[source]

Decorator for lazy properties

Lazy properties are computed after a search from ES

Parameters:

func (Callable[[], Awaitable[Any]]) – The async function to decorate

Returns:

The decorated function

esorm.model.retry_on_conflict(max_retries=-1)[source]

Decorator for optimistic concurrency control

Parameters:

max_retries – The maximum number of retries, -1 for infinite

Returns:

The decorated function
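
To show the general idea of retrying on a version conflict, here is a self-contained sketch of such a decorator. It is not the library's implementation: ConflictError is a local stand-in for whatever conflict exception is raised, and the real decorator may do additional work between attempts, such as reloading the document.

```python
import asyncio
import functools


class ConflictError(Exception):
    """Hypothetical stand-in for a version-conflict error."""


def retry_on_conflict_sketch(max_retries: int = -1):
    """Retry the wrapped coroutine while it raises ConflictError."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return await func(*args, **kwargs)
                except ConflictError:
                    attempt += 1
                    # -1 means retry forever; otherwise give up after max_retries
                    if max_retries != -1 and attempt > max_retries:
                        raise
        return wrapper
    return decorator


calls = {"count": 0}


@retry_on_conflict_sketch(max_retries=3)
async def flaky_update() -> str:
    calls["count"] += 1
    if calls["count"] < 3:  # fail twice, then succeed
        raise ConflictError("version conflict")
    return "saved"


result = asyncio.run(flaky_update())
```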

esorm.model.set_default_index_prefix(default_index_prefix)[source]

Set default index prefix we use for model and index creation

Parameters:

default_index_prefix (str) – The default index prefix

esorm.model.set_max_lazy_property_concurrency(concurrency)[source]

Set the maximum concurrency of processing lazy properties

If this is not set, the default is 16.

Parameters:

concurrency (int) – The maximum concurrency
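
Bounding concurrency of async work is commonly done with a semaphore. The sketch below shows that general technique with the documented default of 16 as the limit. Whether ESORM uses a semaphore internally is an assumption, and run_bounded and compute are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, List

MAX_CONCURRENCY = 16  # mirrors the documented default


async def run_bounded(tasks: List[Callable[[], Awaitable[int]]],
                      limit: int = MAX_CONCURRENCY) -> List[int]:
    """Run async callables with at most `limit` of them in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(task: Callable[[], Awaitable[int]]) -> int:
        async with semaphore:
            return await task()

    return await asyncio.gather(*(guarded(task) for task in tasks))


stats = {"running": 0, "peak": 0}


async def compute() -> int:
    """Hypothetical lazy property body that tracks how many run at once."""
    stats["running"] += 1
    stats["peak"] = max(stats["peak"], stats["running"])
    await asyncio.sleep(0)  # yield so other tasks get a chance to start
    stats["running"] -= 1
    return 1


results = asyncio.run(run_bounded([compute] * 10, limit=4))
```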

async esorm.model.setup_mappings(*_, debug=False)[source]

Create mappings for indices, or try to extend them if there are new fields

esorm.query module

Elasticsearch query type definitions for ESORM

class esorm.query.ESBool(*args, **kwargs)[source]

Bases: dict

Bool query structure

boost: float

Boosting value for the query

filter: List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

Filter queries

minimum_should_match: Union[int, str]

Minimum number of should queries to match

must: List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

Must queries

must_not: List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

Must not queries

should: List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

Should queries
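
Since the query types here are dict-based, a bool query can be written as a plain dict. The following sketch combines the four clause lists with minimum_should_match and boost. All field names and values are hypothetical.

```python
# A bool query as a plain dict; field names and values are hypothetical.
bool_query = {
    "bool": {
        "must": [{"match": {"title": {"query": "python orm"}}}],
        "filter": [{"term": {"status": {"value": "published"}}}],
        "must_not": [{"exists": {"field": "deleted_at"}}],
        "should": [
            {"prefix": {"tags": {"value": "elastic"}}},
            {"wildcard": {"author": {"value": "j*n"}}},
        ],
        "minimum_should_match": 1,
        "boost": 1.0,
    }
}

# The four list-valued keys are the clause groups of the bool query.
clause_kinds = sorted(
    key for key, value in bool_query["bool"].items() if isinstance(value, list)
)
```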

class esorm.query.ESExists(*args, **kwargs)[source]

Bases: dict

Represents an exists query to check if a field exists.

field: str

The field to check.

esorm.query.ESFilter

Represents filter queries in Elasticsearch

alias of List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

class esorm.query.ESFuzzy(*args, **kwargs)[source]

Bases: dict

Represents a fuzzy query for approximate matching in Elasticsearch.

boost: float

Optional boosting value for the query

fuzziness: Union[int, str]

Fuzziness value for the query

max_expansions: int

Maximum number of expansions for the query

prefix_length: int

Prefix length for the query

transpositions: bool

Whether to allow transpositions for the query

value: str

The value to search for.

class esorm.query.ESGeoDistance(*args, **kwargs)[source]

Bases: dict

Represents a geo_distance query for distance-based geospatial queries in Elasticsearch.

distance: Union[str, float]

The distance to search for.

distance_type: str

The distance type to use for the query.

location: Union[Dict[str, float], str]

The location to search from.

location_field: str

The field containing the location to search from.

validation_method: str

The validation method to use for the query.

class esorm.query.ESMatch(*args, **kwargs)[source]

Bases: dict

Represents the parameters for a match query in Elasticsearch.

analyzer: str

Optional analyzer to use for the query.

boost: Union[int, float]

Optional boosting value for the query.

fuzziness: Union[int, str]

Optional fuzziness value for the query.

max_expansions: int

Optional maximum number of expansions for the query.

operator: str

The operator to use for the query.

prefix_length: int

Optional prefix length for the query.

query: Union[str, int, float]

The value to search for.

zero_terms_query: str

Optional zero terms query for the query.

class esorm.query.ESMatchAll(*args, **kwargs)[source]

Bases: dict

Represents a match_all query for matching all documents in Elasticsearch.

boost: float

Optional boosting value for the query

class esorm.query.ESMatchNone(*args, **kwargs)[source]

Bases: dict

Represents a match_none query for matching no documents in Elasticsearch.

class esorm.query.ESMatchPhrase(*args, **kwargs)[source]

Bases: dict

Represents the parameters for a match_phrase query in Elasticsearch.

analyzer: str

Optional analyzer to use for the query.

boost: Union[int, float]

Optional boosting value for the query.

query: str

The value to search for.

slop: int

Optional slop value for the query.

esorm.query.ESMust

Represents must queries in Elasticsearch

alias of List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

esorm.query.ESMustNot

Represents must_not queries in Elasticsearch

alias of List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

class esorm.query.ESPrefix(*args, **kwargs)[source]

Bases: dict

Represents a prefix query for prefix matching in Elasticsearch.

boost: float

Optional boosting value for the query

rewrite: str

Optional, method used to rewrite the query (e.g., “constant_score”, “scoring_boolean”)

value: str

The prefix to search for.

class esorm.query.ESQuery(*args, **kwargs)[source]

Bases: dict

Elasticsearch query structure

aggs: Dict[str, ESAgg]

Aggregations query structure

bool: ESBool

Bool query structure

exists: ESExists

Exists query structure

fuzzy: Dict[str, ESFuzzy]

Fuzzy query structure

geo_distance: Dict[str, ESGeoDistance]

Geo distance query structure

match: Dict[str, ESMatch]

Match query structure

match_all: ESMatchAll

Match all query structure

match_none: ESMatchNone

Match none query structure

match_phrase: Dict[str, ESMatchPhrase]

Match phrase query structure

prefix: Dict[str, ESPrefix]

Prefix query structure

term: Dict[str, ESTerm]

Term query structure

wildcard: Dict[str, ESWildcard]

Wildcard query structure

class esorm.query.ESRange(*args, **kwargs)[source]

Bases: dict

Range query structure

gt: Union[int, float, str]

Greater than

gte: Union[int, float, str]

Greater than or equal

lt: Union[int, float, str]

Less than

lte: Union[int, float, str]

Less than or equal

esorm.query.ESShould

Represents should queries in Elasticsearch

alias of List[Union[FieldRange, FieldTerm, FieldTerms, FieldMatch, FieldMatchPhrase, FieldExists, FieldWildcard, FieldPrefix, FieldFuzzy, FieldGeoDistance, FieldMatchAll, FieldESMatchNone, FieldBool]]

class esorm.query.ESTerm(*args, **kwargs)[source]

Bases: dict

Represents the parameters for a term query in Elasticsearch.

boost: Union[int, float]

Optional boosting value for the query.

value: Union[str, int, float]

The value to search for.

class esorm.query.ESWildcard(*args, **kwargs)[source]

Bases: dict

Represents a wildcard query for pattern matching in Elasticsearch.

boost: float

Optional boosting value for the query

case_insensitive: bool

Optional, whether the query is case insensitive.

rewrite: str

Optional, method used to rewrite the query (e.g., “constant_score”, “scoring_boolean”)

value: str

The pattern to search for. e.g., “te?t” or “test*”

class esorm.query.FieldBool(*args, **kwargs)[source]

Bases: dict

Represents a bool query for combining other queries in Elasticsearch.

bool: ESBool

Bool query structure

class esorm.query.FieldESMatchNone(*args, **kwargs)[source]

Bases: dict

Represents a match_none query for matching no documents in Elasticsearch.

match_none: ESMatchNone

Match none query structure

class esorm.query.FieldExists(*args, **kwargs)[source]

Bases: dict

Represents an exists query to check if a field exists in Elasticsearch.

exists: ESExists

Exists query structure

class esorm.query.FieldFuzzy(*args, **kwargs)[source]

Bases: dict

Represents a fuzzy query for approximate matching in Elasticsearch.

fuzzy: Dict[str, ESFuzzy]

Fuzzy query structure

class esorm.query.FieldGeoDistance(*args, **kwargs)[source]

Bases: dict

Represents a geo_distance query for distance-based geospatial queries in Elasticsearch.

geo_distance: Dict[str, ESGeoDistance]

Geo distance query structure

class esorm.query.FieldMatch(*args, **kwargs)[source]

Bases: dict

Represents a match query for matching based on the provided text in Elasticsearch.

match: Dict[str, ESMatch]

Match query structure

class esorm.query.FieldMatchAll(*args, **kwargs)[source]

Bases: dict

Represents a match_all query for matching all documents in Elasticsearch.

match_all: ESMatchAll

Match all query structure

class esorm.query.FieldMatchPhrase(*args, **kwargs)[source]

Bases: dict

Represents a match_phrase query for exact phrase matching in Elasticsearch.

match_phrase: Dict[str, ESMatchPhrase]

Match phrase query structure

class esorm.query.FieldPrefix(*args, **kwargs)[source]

Bases: dict

Represents a prefix query for prefix matching in Elasticsearch.

prefix: Dict[str, ESPrefix]

Prefix query structure

class esorm.query.FieldRange(*args, **kwargs)[source]

Bases: dict

Range query field

range: Dict[str, ESRange]

Range query structure
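
A range clause maps a field name to its bounds. The sketch below writes two such clauses as plain dicts and places them in a bool filter. The created and price fields are hypothetical.

```python
from datetime import date

# Range clauses on a hypothetical date field and a hypothetical numeric field.
created_range = {
    "range": {"created": {"gte": date(2024, 1, 1).isoformat(), "lt": "2025-01-01"}}
}
price_range = {"range": {"price": {"gt": 10, "lte": 99.5}}}

# Both clauses combined in a bool filter.
query = {"bool": {"filter": [created_range, price_range]}}
```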

class esorm.query.FieldTerm(*args, **kwargs)[source]

Bases: dict

Represents a term query for exact value matching in Elasticsearch.

term: Dict[str, ESTerm]

Term query structure

class esorm.query.FieldTerms(*args, **kwargs)[source]

Bases: dict

Represents a terms query for exact value matching in Elasticsearch.

terms: Dict[str, List[Union[str, int, float]]]

Terms query structure
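
The term and terms structures differ in shape: term wraps a single value, terms takes a list. The sketch below shows both as plain dicts, along with a tiny local evaluator that only illustrates the intended semantics on an in-memory document. The matches helper is not part of ESORM and ignores analysis and mapping details.

```python
# Exact match on one value (term) versus any of several values (terms).
single = {"term": {"status": {"value": "active"}}}
several = {"terms": {"status": ["active", "pending"]}}


def matches(doc: dict, clause: dict) -> bool:
    """Tiny local evaluator to illustrate the semantics; not part of ESORM."""
    if "term" in clause:
        (field, spec), = clause["term"].items()
        return doc.get(field) == spec["value"]
    (field, values), = clause["terms"].items()
    return doc.get(field) in values


doc = {"status": "pending"}
results = (matches(doc, single), matches(doc, several))
```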

class esorm.query.FieldWildcard(*args, **kwargs)[source]

Bases: dict

Represents a wildcard query for pattern matching in Elasticsearch.

wildcard: Dict[str, ESWildcard]

Wildcard query structure

esorm.response module

This module contains type definitions for the response from Elasticsearch.

class esorm.response.ESResponse(*args, **kwargs)[source]

Bases: dict

Represents the overall structure of an Elasticsearch response.

aggregations: Dict[str, Union[ESAggValueResponse, ESAggTermsResponse, ESAggHistogramResponse]]

The aggregations section of the response.

hits: Hits

The hits section of the response.

timed_out: bool

Whether the query timed out.

took: int

The time in milliseconds it took to execute the query.

class esorm.response.Hit(*args, **kwargs)[source]

Bases: dict

Represents a single hit (result) from Elasticsearch.

class esorm.response.Hits(*args, **kwargs)[source]

Bases: dict

Represents the hits section of the Elasticsearch response.

hits: List[Hit]

List of hits.

max_score: Optional[float]

The maximum score of the hits.

total: Dict[str, int]

The total number of hits.
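
Because these response types are dict-based, a search response can be read with ordinary key access. The sketch below uses a made-up response in the shape Elasticsearch returns and pulls out the total hit count and the sources.

```python
from typing import Any, Dict, List

# A response-shaped dict; the values are made up for illustration.
response: Dict[str, Any] = {
    "took": 3,
    "timed_out": False,
    "hits": {
        "total": {"value": 2, "relation": "eq"},
        "max_score": 1.0,
        "hits": [
            {"_id": "1", "_source": {"name": "Ann"}},
            {"_id": "2", "_source": {"name": "Bob"}},
        ],
    },
}

total: int = response["hits"]["total"]["value"]
names: List[str] = [hit["_source"]["name"] for hit in response["hits"]["hits"]]
```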

esorm.utils module

Utility functions

esorm.utils.camel_case(snake_str, capitalize_first=False)[source]

Convert to camel case

Parameters:
  • snake_str (str) – The string to convert to camel case

  • capitalize_first (bool) – Capitalize the first letter

Returns:

Converted string

esorm.utils.snake_case(camel_str)[source]

Convert to snake case

Parameters:

camel_str (str) – The string to convert to snake case

Returns:

Converted string
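
For orientation, here is one common way to implement such conversions. It is a sketch with hypothetical names (to_camel, to_snake), and the library's own functions may treat edge cases such as acronyms or leading underscores differently.

```python
import re


def to_camel(snake_str: str, capitalize_first: bool = False) -> str:
    """Convert snake_case to camelCase (or CamelCase if capitalize_first)."""
    head, *rest = snake_str.split("_")
    if capitalize_first:
        head = head.capitalize()
    return head + "".join(part.capitalize() for part in rest)


def to_snake(camel_str: str) -> str:
    """Convert camelCase or CamelCase to snake_case."""
    # Insert an underscore before every capital letter except the first character.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", camel_str).lower()


examples = (to_camel("page_size"), to_camel("page_size", True), to_snake("PageSize"))
```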

esorm.utils.utcnow()[source]

Get current UTC time

Returns:

Current UTC time

esorm.watcher module

ElasticSearch Watcher support for ESORM

class esorm.watcher.Action(*args, **kwargs)[source]

Bases: dict

Action definition

email: Dict[str, Any]
index: Dict[str, Any]
logging: Dict[str, Any]
pagerduty: Dict[str, Any]
slack: Dict[str, Any]
throttle_period: str
transform: Dict[str, Transform]
webhook: ActionWebhook

class esorm.watcher.ActionWebhook(*args, **kwargs)[source]

Bases: dict

Action webhook definition

auth: Dict[str, Any]
body: str
connection_timeout: str
headers: Dict[str, Any]
host: str
method: str
params: Dict[str, Any]
path: str
port: int
proxy: Dict[str, Any]
read_timeout: str
retries: int
retry_on_status: List[int]
scheme: str
ssl: Dict[str, Any]
timeout: str
webhook: Dict[str, Any]

class esorm.watcher.ArrayCompare(*args, **kwargs)[source]

Bases: dict

Array compare definition

eq: Any
gt: Union[int, float, str, Dict[str, Any]]
gte: Union[int, float, str, Dict[str, Any]]
lt: Union[int, float, str, Dict[str, Any]]
lte: Union[int, float, str, Dict[str, Any]]
not_eq: Any
path: str

class esorm.watcher.Body(*args, **kwargs)[source]

Bases: dict

Body definition

query: ESQuery
size: int
sort: Dict[str, Order]

class esorm.watcher.Compare(*args, **kwargs)[source]

Bases: dict

Compare definition

eq: Any
gt: Union[int, float, str, Dict[str, Any]]
gte: Union[int, float, str, Dict[str, Any]]
lt: Union[int, float, str, Dict[str, Any]]
lte: Union[int, float, str, Dict[str, Any]]
not_eq: Any

class esorm.watcher.Condition(*args, **kwargs)[source]

Bases: dict

Condition definition

always: EmptyDict
array_compare: Dict[str, Any]
compare: Dict[str, Compare]
never: EmptyDict
script: Dict[str, Any]

class esorm.watcher.DeleteWatcher[source]

Bases: Watcher

Watcher for deleting documents matching a query

class esorm.watcher.EmptyDict(*args, **kwargs)[source]

Bases: dict

Empty dict definition

class esorm.watcher.Order(*args, **kwargs)[source]

Bases: dict

Order definition

order: str

class esorm.watcher.Request(*args, **kwargs)[source]

Bases: dict

Request definition

body: ESQuery
indices: Union[List[str], str]
tepmlate: Dict[str, Any]

class esorm.watcher.Schedule(*args, **kwargs)[source]

Bases: dict

Schedule definition

interval: str

class esorm.watcher.Search(*args, **kwargs)[source]

Bases: dict

Search definition

extract: List[str]
request: Request

class esorm.watcher.Transform(*args, **kwargs)[source]

Bases: dict

Transform definition

chain: List[Dict[str, Any]]
script: Dict[str, Any]
search: Dict[str, Any]

class esorm.watcher.Trigger(*args, **kwargs)[source]

Bases: dict

Trigger definition

schedule: Schedule

class esorm.watcher.Watcher[source]

Bases: object

Watcher definition

actions: Optional[Dict[str, Action]] = None
condition: Optional[Condition] = None
input: Optional[Search] = None
metadata: Optional[Dict[str, Any]] = None
to_es()[source]
trigger: Optional[Trigger] = None
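
The attribute names of Watcher mirror the sections of an Elasticsearch watch body. As a sketch, such a body could look like the plain dict below, written in the form Elasticsearch's Watcher API accepts. The index pattern, action name and values are hypothetical, and how to_es() assembles the final body from the class attributes is not shown here.

```python
import json

# A watch body in the form the Elasticsearch Watcher API accepts;
# names and values are hypothetical.
watch = {
    "trigger": {"schedule": {"interval": "5m"}},
    "input": {
        "search": {
            "request": {
                "indices": ["logs-*"],
                "body": {"query": {"match": {"level": {"query": "error"}}}, "size": 0},
            }
        }
    },
    "condition": {"compare": {"ctx.payload.hits.total": {"gt": 0}}},
    "actions": {
        "log_errors": {
            "throttle_period": "15m",
            "logging": {"text": "Found {{ctx.payload.hits.total}} error documents"},
        }
    },
    "metadata": {"owner": "ops"},
}

payload = json.dumps(watch)  # the body is plain JSON-serialisable data
```
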
class esorm.watcher.WatcherMeta(name, bases, attrs)[source]

Bases: type

Watcher metaclass

async esorm.watcher.setup_watchers(*_, debug=False)[source]

Set up watchers

Parameters:
  • _ – Unused

  • debug – Whether to print the watcher definition