# Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyServiceServicer

import queue
import threading

import pythoncom
import win32com.client

from koapy.backend.daishin_cybos_plus.proxy import (
    CybosPlusProxyService_pb2,
    CybosPlusProxyService_pb2_grpc,
)
from koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyServiceMessageUtils import (
    AssignPrimitive,
    ExtractPrimitive,
)


class CybosPlusEvent:
    """Completion handshake for a single event placed on an iterator's queue.

    One side blocks in :meth:`wait_for_done` until the other side calls
    :meth:`done`. Completing the event also acknowledges the queue entry on
    the owning iterator with ``task_done()``.
    """

    def __init__(self, iterator):
        """Create an event that is not yet done.

        Args:
            iterator: The owning object. It must expose an ``_events`` queue;
                that queue's ``task_done()`` is called when this event
                completes.
        """
        self._iterator = iterator
        self._done = threading.Event()
        # Serializes the "already done?" check so task_done() runs only once.
        self._done_lock = threading.Lock()

    def __del__(self):
        # Safety net: release any waiter if the event is dropped unfinished.
        self.done()

    def done(self):
        """Mark the event as handled and acknowledge it on the owning queue.

        Safe to call more than once: only the first call has any effect.
        Without this guard an explicit ``done()`` followed by ``__del__``
        would call ``Queue.task_done()`` twice, which raises ``ValueError``.
        """
        with self._done_lock:
            if self._done.is_set():
                return
            self._done.set()
        self._iterator._events.task_done()

    def wait_for_done(self):
        """Block until :meth:`done` has been called."""
        self._done.wait()
class CybosPlusEventIterator:
    """Blocking iterator over the events fired by one event handler.

    On construction the iterator registers itself in the handler's
    ``_iterators`` list. That list holds a strong reference to the iterator,
    so ``__del__`` cannot run while the iterator is still registered; call
    :meth:`close` to unregister it deterministically.
    """

    def __init__(self, handler):
        """Register a new iterator with ``handler``.

        Args:
            handler: Object exposing a ``_lock`` context manager and an
                ``_iterators`` list, to which this iterator appends itself.
        """
        self._handler = handler
        self._events = queue.Queue()
        # Unique marker put on the queue to wake up a blocked __next__().
        self._sentinel = object()
        self._invalid = False

        with self._handler._lock:
            self._handler._iterators.append(self)

    def __del__(self):
        self.close()

    def close(self):
        """Invalidate the iterator and unregister it from its handler.

        A sentinel is queued so that a consumer blocked in ``__next__`` wakes
        up and stops. Calling this more than once is harmless.
        """
        if self._invalid:
            return
        self._invalid = True
        self._events.put(self._sentinel)
        with self._handler._lock:
            if self in self._handler._iterators:
                self._handler._iterators.remove(self)

    def notify(self):
        """Queue a new event for the consumer and return it.

        Returns:
            The newly created ``CybosPlusEvent``.

        Raises:
            ValueError: If the iterator has already been invalidated.
        """
        if self._invalid:
            raise ValueError("cannot notify an invalidated event iterator")
        event = CybosPlusEvent(self)
        self._events.put(event)
        return event

    def __iter__(self):
        # Completes the iterator protocol so iter(it) and nested for-loops work.
        return self

    def __next__(self):
        """Block until the next event is queued and return it.

        Raises:
            StopIteration: If the iterator is invalid or the sentinel is read.
        """
        if self._invalid:
            raise StopIteration
        event = self._events.get()
        if event is self._sentinel:
            raise StopIteration
        return event
class CybosPlusEventHandler:
    """Event sink that fans one fired event out to every registered iterator.

    Iterators register themselves in ``_iterators`` (guarded by ``_lock``)
    when they are created via ``iter(handler)``.
    """

    def __init__(self):
        self._lock = threading.RLock()
        self._iterators = []

    def OnReceived(self):
        """Notify every registered iterator, then wait for all of them.

        win32com looks up event callbacks by the name ``On<EventName>``, so
        this is the correctly spelled callback for an event named
        ``Received``.

        Iterators whose ``notify()`` raises ``ValueError`` (i.e. they were
        already invalidated) are skipped, so one stale iterator cannot stop
        the others from being notified.
        """
        events = []
        with self._lock:
            for iterator in self._iterators:
                try:
                    events.append(iterator.notify())
                except ValueError:
                    # Iterator was invalidated but is not yet unregistered.
                    continue
        # Wait after releasing the lock so iterators can still register or
        # unregister while consumers are processing the event.
        for event in events:
            event.wait_for_done()

    # Backward-compatible alias for the original (misspelled) method name.
    OnRecieved = OnReceived

    def __iter__(self):
        """Return a new ``CybosPlusEventIterator`` bound to this handler."""
        return CybosPlusEventIterator(self)
class CybosPlusProxyServiceServicer(
    CybosPlusProxyService_pb2_grpc.CybosPlusProxyServiceServicer
):
    """gRPC servicer that forwards requests to COM objects identified by prog id.

    COM dispatch objects are created on first use and cached per ``prog``.
    """

    # Class-level state: shared by every instance of the servicer.
    _lock = threading.RLock()
    _dispatches = {}
    _handlers = {}

    def _EnsureDispatch(self, prog):
        """Return the cached dispatch object for ``prog``, creating it if needed.

        Args:
            prog: Identifier passed to ``win32com.client.gencache.EnsureDispatch``.

        Returns:
            The dispatch object stored in ``_dispatches`` for ``prog``.
        """
        if prog not in self._dispatches:
            with self._lock:
                # Checked again under the lock: another thread may have
                # created the dispatch while this one was waiting.
                if prog not in self._dispatches:
                    pythoncom.CoInitialize()
                    dispatch = win32com.client.gencache.EnsureDispatch(prog)
                    self._dispatches[prog] = dispatch
                    if False:  # TODO: event handling has not been implemented yet
                        handler = win32com.client.WithEvents(
                            dispatch, CybosPlusEventHandler
                        )
                        self._handlers[prog] = handler
        return self._dispatches[prog]

    def _GetHandler(self, prog):
        """Return the event handler registered for ``prog``, or ``None``.

        The dispatch object is ensured first, because that is the only place
        where handlers are registered.
        """
        self._EnsureDispatch(prog)
        return self._handlers.get(prog)

    def Dispatch(self, request, context):
        """Describe the COM object named by ``request.prog``.

        Returns:
            A ``DispatchResponse`` carrying the prog id, the keys of the
            dispatch object's ``_prop_map_get_`` as properties, and the public
            names from ``dir(dispatch)`` (excluding ``CLSID`` and
            ``coclass_clsid``) as methods.
        """
        prog = request.prog
        dispatch = self._EnsureDispatch(prog)
        properties = list(dispatch._prop_map_get_)
        excluded = ("CLSID", "coclass_clsid")
        methods = [
            name
            for name in dir(dispatch)
            if not name.startswith("_") and name not in excluded
        ]
        response = CybosPlusProxyService_pb2.DispatchResponse()
        response.prog = prog
        response.properties.extend(properties)
        response.methods.extend(methods)
        return response

    def Property(self, request, context):
        """Read attribute ``request.name`` from the COM object ``request.prog``.

        Returns:
            A ``PropertyResponse`` whose ``value`` was filled in by
            ``AssignPrimitive``.
        """
        dispatch = self._EnsureDispatch(request.prog)
        value = getattr(dispatch, request.name)
        response = CybosPlusProxyService_pb2.PropertyResponse()
        AssignPrimitive(response.value, value)
        return response

    def Method(self, request, context):
        """Call method ``request.name`` on the COM object ``request.prog``.

        Each entry of ``request.arguments`` is converted with
        ``ExtractPrimitive`` and passed positionally.

        Returns:
            A ``MethodResponse`` whose ``return_value`` was filled in by
            ``AssignPrimitive``.
        """
        dispatch = self._EnsureDispatch(request.prog)
        method = getattr(dispatch, request.name)
        arguments = [ExtractPrimitive(arg.value) for arg in request.arguments]
        return_value = method(*arguments)
        response = CybosPlusProxyService_pb2.MethodResponse()
        AssignPrimitive(response.return_value, return_value)
        return response

    def Event(self, request_iterator, context):
        """Stream event notifications for the prog named in the first request.

        The first request must have its ``start`` field set. One response is
        yielded to acknowledge the start, then one response per fired event.
        After each event response the next request is read; the event is
        marked done, and streaming ends if that request has ``stop`` set or
        the request stream has ended.

        Raises:
            ValueError: If the first request does not have ``start`` set.
            NotImplementedError: If no event handler is registered for the
                requested prog.
        """
        # next() with a default: a bare StopIteration inside a generator
        # would be converted into RuntimeError.
        start_request = next(request_iterator, None)
        if start_request is None:
            return
        if not start_request.HasField("start"):
            raise ValueError('first Event request must have the "start" field set')

        prog = start_request.start.prog
        handler = self._GetHandler(prog)
        if handler is None:
            raise NotImplementedError(
                "no event handler is registered for {!r}".format(prog)
            )

        response = CybosPlusProxyService_pb2.EventResponse()
        # NOTE(review): bare attribute access, kept as in the original. If
        # `started` is a message-typed field this does not mark it as set;
        # confirm against the .proto whether SetInParent() was intended.
        response.started
        yield response

        # NOTE(review): the iterator created here stays in the handler's
        # `_iterators` list after this loop exits — confirm how it should be
        # unregistered once event handling is enabled.
        for event in handler:
            try:
                response = CybosPlusProxyService_pb2.EventResponse()
                # NOTE(review): same no-op access pattern as `started` above.
                response.fired
                yield response
                done_request = next(request_iterator, None)
            finally:
                # Always release the producer waiting on this event, even if
                # the stream failed or the generator was closed.
                event.done()
            if done_request is None or done_request.HasField("stop"):
                break