# Source code for anybot.action
from abc import abstractmethod
from collections import deque
from contextlib import contextmanager
from functools import partial
from heapq import heappop, heappush
from time import sleep, monotonic as time
from .utils import ensure_time, only_daylight_hours, random_intervals
class Action:
    """Continuous action that executes in steps over time.

    Subclasses implement :meth:`step`. :meth:`run` calls :meth:`begin` once,
    then calls :meth:`step` repeatedly until :attr:`done` is true, and
    finally calls :meth:`end`. Every step is executed inside the context
    manager selected in :meth:`__init__`.
    """

    # Loop flag read by run(); set by cancel() and cleared by restart().
    done = False
    # Values handed to only_daylight_hours when daylight gating is enabled.
    # NOTE(review): presumably hours of the day -- confirm against .utils.
    day_begins_at = 7
    day_ends_at = 24
    random_interval = 0

    def __init__(self, only_during_daylight=False):
        """
        Choose the context manager that wraps every step.

        :param only_during_daylight: when true, each step runs inside
            ``only_daylight_hours`` called with ``day_begins_at``,
            ``day_ends_at`` and ``random_interval``; otherwise a no-op
            context is used.
        """
        if only_during_daylight:
            # The attribute values are captured here, at construction time;
            # changing them on the instance afterwards has no effect.
            self._context = partial(
                only_daylight_hours,
                self.day_begins_at,
                self.day_ends_at,
                self.random_interval,
            )
        else:
            self._context = _dummy_context

    @abstractmethod
    def step(self, api):
        """
        Perform one unit of work; must be overridden by subclasses.

        :param api: object passed through unchanged from :meth:`run`
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def begin(self, api):
        """Hook called once by :meth:`run` before the first step."""
        pass

    def end(self, api):
        """Hook called once by :meth:`run` after the step loop finishes."""
        pass

    def cancel(self):
        """Request that :meth:`run` stops before starting another step."""
        self.done = True

    def restart(self):
        """Clear the cancellation flag so :meth:`run` can loop again."""
        self.done = False

    def run(self, api):
        """
        Run the action until it is cancelled.

        :param api: object handed to :meth:`begin`, :meth:`step` and
            :meth:`end`
        """
        self.begin(api)
        while not self.done:
            # A fresh context is entered for every single step.
            with self._context():
                self.step(api)
        self.end(api)
class QueuedActions(Action):
    """
    Action that can queue commands and then execute them sequentially
    at a rate of 'per_interval' commands in the time 'interval'.

    :param per_interval: number of actions to execute in one interval
    :param interval: time in seconds for interval
    :param max_queued: maximum number of pending commands; once the queue
        is full, appending a new command discards the oldest one
    :param kwargs: forwarded unchanged to :class:`Action`
    """

    def __init__(self, per_interval=30, interval=3600, *, max_queued=100,
                 **kwargs):
        super(QueuedActions, self).__init__(**kwargs)
        self.interval = interval
        self.per_interval = per_interval
        # Bounded deque: append() on a full deque drops from the left end.
        self._queue = deque(maxlen=max_queued)

    @abstractmethod
    def update(self, api):
        """
        Called once at the start of every :meth:`step`, before any queued
        command is executed. Subclasses are expected to use it to
        :meth:`add` commands to the queue.

        :param api: object passed through unchanged from :meth:`step`
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def add(self, action, *args):
        """
        Add an action onto the queue to be executed later

        A command that is already pending (same callable and same
        arguments) is not queued a second time.

        :param action: callable to invoke later as ``action(*args)``
        :param args: positional arguments for ``action``
        """
        command = (action, *args)
        if command not in self._queue:
            self._queue.append(command)

    def step(self, api):
        """
        Run :meth:`update`, then execute queued commands one at a time.

        NOTE(review): ``ensure_time`` and ``random_intervals`` come from
        ``.utils`` and are not visible here; presumably the former pads the
        enclosed block to a duration and the latter yields ``per_interval``
        delays summing to the remaining time -- confirm.

        :param api: object handed to :meth:`update`
        """
        with ensure_time(self.interval):
            begin_update = time()
            self.update(api)
            # Whatever update() consumed is subtracted from the budget that
            # is spread over the queued commands.
            time_left = self.interval - (time() - begin_update)
            for sleep_time in random_intervals(self.per_interval, time_left):
                with ensure_time(sleep_time):
                    while not self._queue:
                        # Without this check a cancelled action with an
                        # empty queue would poll here forever and run()
                        # would never observe ``done``.
                        if self.done:
                            return
                        sleep(1e-3)
                    action, *args = self._queue.popleft()
                    action(*args)
class CandidateActions(QueuedActions):
    """
    Queued action that draws candidates from a generator, scores them and
    queues a method of the top-of-heap candidate on every update.

    :param action: name of the attribute looked up on the chosen candidate
        and queued as the command to run
    :param per_hour: number of commands to execute per 3600 seconds
    :param min_candidates: the candidate heap is refilled from the
        generator while it holds this many entries or fewer
    :param kwargs: forwarded to :class:`QueuedActions` (for example
        ``only_during_daylight``)
    """

    def __init__(self, action, per_hour=30, min_candidates=100, **kwargs):
        super(CandidateActions, self).__init__(per_hour, 3600, **kwargs)
        self.action = action
        self.min_candidates = min_candidates
        # Heap of (score, candidate) tuples maintained with heapq.
        self.candidates = []
        # Candidate source; created in begin().
        self.gen = None

    @abstractmethod
    def create_gen(self, api):
        """
        Return the iterator that yields candidates.

        :param api: object passed through unchanged from :meth:`begin`
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    @abstractmethod
    def get_score(self, api, candidate):
        """
        Return the score for ``candidate``, or ``None`` to discard it.

        :param api: object passed through unchanged from :meth:`update`
        :param candidate: item produced by the candidate generator
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError

    def before_update(self, api):
        """Hook called at the very start of every :meth:`update`."""
        pass

    def begin(self, api):
        """Create the candidate generator via :meth:`create_gen`."""
        self.gen = self.create_gen(api)

    def update(self, api):
        """
        Top up the candidate heap and queue the action of one candidate.

        The exception raised by ``next(self.gen)`` when the generator is
        exhausted (``StopIteration`` for a regular iterator) is not caught
        here and propagates to the caller.

        :param api: object handed to :meth:`before_update` and
            :meth:`get_score`
        """
        self.before_update(api)
        while len(self.candidates) <= self.min_candidates:
            item = next(self.gen)
            score = self.get_score(api, item)
            if score is not None:
                # NOTE(review): with equal scores the tuples fall back to
                # comparing the items themselves -- confirm candidates are
                # orderable or that scores never tie.
                heappush(self.candidates, (score, item))
        # heapq is a min-heap, so the entry with the smallest score is taken.
        _, best_candidate = heappop(self.candidates)
        self.add(getattr(best_candidate, self.action))
@contextmanager
def _dummy_context():
    """Dummy context to replace only_daylight_hours in Action.

    Does nothing on entry or exit and yields ``None``; exceptions raised in
    the ``with`` body propagate unchanged.
    """
    yield