Source code for koapy.backend.kiwoom_open_api_plus.utils.grpc.PipeableMultiThreadedRendezvous

from collections.abc import Iterator

from grpc._channel import _MultiThreadedRendezvous as MultiThreadedRendezvous


[docs]class PipeableMultiThreadedRendezvous(Iterator): def __init__(self, rendezvous, iterator=None): super().__init__() self._rendezvous = rendezvous self._iterator = iterator or self._rendezvous assert isinstance(self._rendezvous, MultiThreadedRendezvous) assert isinstance(self._iterator, Iterator) def __next__(self): return self._iterator.__next__()
[docs] def pipe(self, func): rendezvous = self._rendezvous iterator = func(self._iterator) return type(self)(rendezvous, iterator)
def __getattr__(self, name): try: return getattr(self._rendezvous, name) except AttributeError: return getattr(self._iterator, name)