"""
ElasticSearch Watcher support for ESORM
"""
from typing import TypedDict, Tuple, Dict, List, Any, Union, Type, Optional, cast
import json
from .logger import logger
from .query import ESQuery
from .esorm import es
[docs]class Schedule(TypedDict):
""" Schedule definition """
interval: str
[docs]class Trigger(TypedDict):
""" Trigger definition """
schedule: Schedule
[docs]class Order(TypedDict):
""" Order definition """
order: str
[docs]class Body(TypedDict, total=False):
""" Body definition """
size: int
sort: Dict[str, Order]
query: ESQuery
[docs]class Request(TypedDict, total=False):
""" Request definition """
indices: Union[List[str], str]
body: ESQuery
tepmlate: Dict[str, Any]
[docs]class Search(TypedDict, total=False):
""" Search definition """
request: Request
extract: List[str]
[docs]class EmptyDict(TypedDict, total=False):
""" Empty dict definition """
[docs]class Compare(TypedDict, total=False):
""" Compare definition """
eq: Any
not_eq: Any
gt: Union[int, float, str, Dict[str, Any]]
gte: Union[int, float, str, Dict[str, Any]]
lt: Union[int, float, str, Dict[str, Any]]
lte: Union[int, float, str, Dict[str, Any]]
[docs]class ArrayCompare(TypedDict, total=False):
""" Array compare definition """
path: str
eq: Any
not_eq: Any
gt: Union[int, float, str, Dict[str, Any]]
gte: Union[int, float, str, Dict[str, Any]]
lt: Union[int, float, str, Dict[str, Any]]
lte: Union[int, float, str, Dict[str, Any]]
[docs]class Condition(TypedDict, total=False):
""" Condition definition """
compare: Dict[str, Compare]
array_compare: Dict[str, Any]
never: EmptyDict
always: EmptyDict
script: Dict[str, Any]
[docs]class ActionWebhook(TypedDict, total=False):
""" Action webhook definition """
scheme: str
host: str
port: int
method: str
path: str
params: Dict[str, Any]
headers: Dict[str, Any]
body: str
auth: Dict[str, Any]
proxy: Dict[str, Any]
timeout: str
connection_timeout: str
read_timeout: str
retries: int
retry_on_status: List[int]
ssl: Dict[str, Any]
webhook: Dict[str, Any]
[docs]class Action(TypedDict, total=False):
""" Action definition """
transform: Dict[str, Transform]
throttle_period: str
email: Dict[str, Any]
index: Dict[str, Any]
logging: Dict[str, Any]
pagerduty: Dict[str, Any]
slack: Dict[str, Any]
webhook: ActionWebhook
[docs]class Watcher(metaclass=WatcherMeta):
"""
Watcher definition
"""
metadata: Optional[Dict[str, Any]] = None
trigger: Optional[Trigger] = None
input: Optional[Search] = None
condition: Optional[Condition] = None
actions: Optional[Dict[str, Action]] = None
def __init__(self):
assert self.trigger is not None, "Trigger is not defined"
[docs] def to_es(self):
res = {}
if self.metadata is not None:
res['metadata'] = self.metadata
if self.trigger is not None:
res['trigger'] = self.trigger
if self.input is not None:
res['input'] = self.input
if self.condition is not None:
res['condition'] = self.condition
if self.actions is not None:
res['actions'] = self.actions
return res
[docs]class DeleteWatcher(Watcher):
"""
Watcher for deleting documents matching a query
"""
_scheme: str = "http"
""" ES scheme """
_host: str = "127.0.0.1"
""" ES host """
_port: int = 9200
""" ES port """
_index: Optional[str] = None
""" Target index """
_query: Optional[ESQuery] = None
""" Query to match documents to delete """
def __init__(self):
super().__init__()
assert self._index is not None, "Index is not defined"
assert self._query is not None, "Query is not defined"
self.actions = {
"delete_doc": {
"webhook": {
"scheme": self._scheme,
"method": "POST",
"host": self._host,
"port": self._port,
"path": f"/{self._index}/_delete_by_query",
"body": f'{{"query": {json.dumps(self._query)}}}',
}
}
}
[docs]async def setup_watchers(*_, debug=False):
"""
Setup watchers
:param _: Unused
:param debug: Whether to print the watcher definition
"""
for WatcherClass in WatcherMeta.__watchers__:
watcher = WatcherClass()
if debug:
from pprint import pformat
logger.debug(
f"`{Watcher.__name__}` watcer:\n {pformat(watcher.to_es(), indent=2, width=100, compact=False, sort_dicts=False)}")
await es.watcher.put_watch(id=Watcher.__name__, **watcher.to_es())
logger.info(f"Watcher {Watcher.__name__} created.")