Source code for esorm.esorm

"""
ElasticSearch ORM main module
"""
from typing import Optional, cast, Union, List, Mapping

import asyncio

from elasticsearch import AsyncElasticsearch
from elastic_transport import NodeConfig

from .logger import logger


class _ESProxy:
    """
    ElasticSearch client proxy
    """
    __client__: Optional[AsyncElasticsearch] = None

    def set_client(self, client):
        self.__client__ = client

    def __getattr__(self, name):
        if self.__client__ is None:
            if name == '__wrapped__':  # This is needed for pdoc3
                return object.__getattribute__(self, name)
            raise ValueError(f"ElasticSearch client has not been set yet, please call connect! ({name})")
        return getattr(self.__client__, name)


# Global client proxy
es = cast(AsyncElasticsearch, _ESProxy())

__all__ = ['es', 'connect', 'get_es_version']


[docs]async def connect(hosts: Union[str, Union[List[Union[str, Mapping[str, Union[str, int]], NodeConfig]], None]], *args, wait=False, **kwargs) -> Optional[AsyncElasticsearch]: """ Connect to ElasticSearch :param hosts: ElasticSearch hosts to connect, either a list a mapping, or a single string :param args: Other AsyncElasticsearch arguments :param wait: Wait for AsyncElasticsearch to be ready :param kwargs: Other AsyncElasticsearch keyword arguments :return: AsyncElasticsearch client instance """ cast(_ESProxy, es).set_client(AsyncElasticsearch(hosts=hosts, *args, **kwargs)) try: logger.info(f"Connecting to ElasticSearch ({hosts})…") if wait: # Wait for ES to start, this will block until ES is ready to prevent routers start early while True: status = await es.ping() if not status: logger.info("Waiting for ElasticSearch to be ready…") await asyncio.sleep(2.0) else: break return es except asyncio.CancelledError: return None
[docs]async def get_es_version() -> str: """ Get ElasticSearch version :return: ElasticSearch version """ info = await es.info() return info['version']['number']