Source code for koapy.backend.kiwoom_open_api_plus.utils.queue.QueueBasedIterableObserver

from queue import Empty, Queue

from rx.core.typing import Observer

from koapy.backend.kiwoom_open_api_plus.utils.queue.QueueIterator import (
    BufferedQueueIterator,
)


[docs]class QueueBasedIterableObserverIterator(BufferedQueueIterator): def __init__(self, queue, sentinel): self._queue = queue self._sentinel = sentinel super().__init__(self._queue)
[docs] def next(self, block=True, timeout=None): value, error = super().next(block, timeout) if value == self._sentinel: raise StopIteration elif error is not None and isinstance(error, Exception): raise error return value
[docs] def head(self): value, error = super().head() if value == self._sentinel: raise Empty elif error is not None and isinstance(error, Exception): raise error return value
[docs]class QueueBasedIterableObserver(Observer): _default_maxsize = 0 _queue_get_timeout = 2 def __init__(self, queue=None, maxsize=None): if queue is None: if maxsize is None: maxsize = self._default_maxsize queue = Queue(maxsize) self._queue = queue self._maxsize = maxsize self._sentinel = object() self._iterator = QueueBasedIterableObserverIterator(self._queue, self._sentinel) @property
[docs] def queue(self): return self._queue
[docs] def on_next(self, value): self._queue.put((value, None))
[docs] def on_error(self, error): self._queue.put((None, error))
[docs] def on_completed(self): self._queue.put((self._sentinel, None))
def __iter__(self): return self._iterator
[docs] def stop(self): return self._iterator.stop()