# Source code for koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyServiceServicer
import queue
import threading
import pythoncom
import win32com.client
from koapy.backend.daishin_cybos_plus.proxy import (
CybosPlusProxyService_pb2,
CybosPlusProxyService_pb2_grpc,
)
from koapy.backend.daishin_cybos_plus.proxy.CybosPlusProxyServiceMessageUtils import (
AssignPrimitive,
ExtractPrimitive,
)
class CybosPlusEvent:
    """One event notification placed on a single iterator's queue.

    The producer obtains the event from the iterator's ``notify()`` and may
    block in ``wait_for_done()`` until the consumer acknowledges the event
    by calling ``done()``.
    """

    def __init__(self, iterator):
        """Bind the event to the iterator that owns the queue it sits on.

        Args:
            iterator: Object exposing an ``_events`` queue; ``done()`` calls
                ``task_done()`` on that queue.
        """
        self._iterator = iterator
        self._done = threading.Event()

    def __del__(self):
        # Make sure a waiter is released even if the consumer dropped the
        # event without acknowledging it.
        self.done()

    def done(self):
        """Acknowledge the event and wake every ``wait_for_done()`` caller.

        Idempotent: only the first call touches the queue.  Previously an
        explicit ``done()`` followed by ``__del__`` called ``task_done()``
        twice, which makes ``queue.Queue`` raise ``ValueError``.
        """
        if self._done.is_set():
            return
        self._done.set()
        self._iterator._events.task_done()

    def wait_for_done(self):
        """Block until ``done()`` has been called on this event."""
        self._done.wait()
class CybosPlusEventIterator:
    """Blocking iterator over the events emitted by one event handler.

    On construction the iterator registers itself in the handler's
    ``_iterators`` list.  Because that list holds a strong reference,
    ``__del__`` cannot run while the iterator is still registered, so
    consumers should call ``close()`` explicitly when they stop iterating.
    """

    def __init__(self, handler):
        """Create the iterator and register it with ``handler``.

        Args:
            handler: Object exposing ``_lock`` (a lock usable as a context
                manager) and ``_iterators`` (a list of registered iterators).
        """
        self._handler = handler
        self._events = queue.Queue()
        # Unique marker pushed onto the queue to wake a blocked ``__next__``.
        self._sentinel = object()
        self._invalid = False
        with self._handler._lock:
            self._handler._iterators.append(self)

    def __del__(self):
        self.close()

    def __iter__(self):
        """Return ``self`` so the object satisfies the iterator protocol."""
        return self

    def close(self):
        """Unregister from the handler and stop iteration.  Idempotent.

        Events still waiting in the queue are acknowledged so that a
        producer blocked on them is released, and a sentinel is queued to
        wake a consumer blocked in ``__next__``.
        """
        # Flip the flag and unregister under the handler lock so a producer
        # that notifies while holding the same lock never sees a registered
        # but already-invalid iterator.
        with self._handler._lock:
            if self._invalid:
                return
            self._invalid = True
            if self in self._handler._iterators:
                self._handler._iterators.remove(self)
        while True:
            try:
                pending = self._events.get_nowait()
            except queue.Empty:
                break
            pending.done()
        self._events.put(self._sentinel)

    def notify(self):
        """Queue a new event for the consumer and return it.

        Returns:
            The newly created ``CybosPlusEvent``.

        Raises:
            ValueError: If the iterator has already been closed.
        """
        if self._invalid:
            raise ValueError("cannot notify a closed event iterator")
        event = CybosPlusEvent(self)
        self._events.put(event)
        return event

    def __next__(self):
        """Block until the next event arrives and return it.

        Raises:
            StopIteration: If the iterator is closed, or is closed while
                waiting for an event.
        """
        if self._invalid:
            raise StopIteration
        event = self._events.get()
        if event is self._sentinel:
            raise StopIteration
        return event
class CybosPlusEventHandler:
    """Event sink that fans each COM event out to every registered iterator.

    Iterating over the handler creates a ``CybosPlusEventIterator`` that
    registers itself in ``_iterators``.
    """

    def __init__(self):
        # Guards ``_iterators``; iterators take the same lock to register
        # and unregister themselves.
        self._lock = threading.RLock()
        self._iterators = []

    def OnReceived(self):
        """Notify every registered iterator, then wait for all of them.

        pywin32 binds event callbacks by the name ``On<EventName>``; the
        method was previously only available under the misspelled name
        ``OnRecieved``, which would never be invoked for an event called
        ``Received``.
        NOTE(review): confirm ``Received`` is the event name of the COM
        objects this handler is attached to.
        """
        with self._lock:
            events = [iterator.notify() for iterator in self._iterators]
        # Wait outside the lock so iterators can still register/unregister
        # while consumers are processing the event.
        for event in events:
            event.wait_for_done()

    def OnRecieved(self):
        """Backward-compatible alias for the original misspelled name."""
        self.OnReceived()

    def __iter__(self):
        """Return a new iterator registered with this handler."""
        return CybosPlusEventIterator(self)
class CybosPlusProxyServiceServicer(
    CybosPlusProxyService_pb2_grpc.CybosPlusProxyServiceServicer
):
    """gRPC servicer that proxies COM objects identified by a ``prog`` string.

    Dispatch objects (and, once enabled, their event handlers) are created
    lazily and cached per ``prog``.
    """

    # Class-level state: the caches and their lock are shared by every
    # servicer instance.
    _lock = threading.RLock()
    _dispatches = {}
    _handlers = {}

    def _EnsureDispatch(self, prog):
        """Return the cached COM dispatch for ``prog``, creating it on first use.

        Args:
            prog: Identifier passed to ``win32com.client.gencache.EnsureDispatch``.

        Returns:
            The dispatch object cached for ``prog``.
        """
        # Check, lock, check again: the lock is only taken on a cache miss.
        if prog not in self._dispatches:
            with self._lock:
                if prog not in self._dispatches:
                    # NOTE(review): COM is initialised only on the thread that
                    # creates the dispatch; confirm other worker threads may
                    # use the cached object safely.
                    pythoncom.CoInitialize()
                    dispatch = win32com.client.gencache.EnsureDispatch(prog)
                    self._dispatches[prog] = dispatch
                    if False:  # TODO: event handling has not been developed yet
                        handler = win32com.client.WithEvents(
                            dispatch, CybosPlusEventHandler
                        )
                        self._handlers[prog] = handler
        return self._dispatches[prog]

    def _GetHandler(self, prog):
        """Return the event handler registered for ``prog``, or ``None``.

        The dispatch is created first so that a handler, if event support
        is enabled, has had the chance to be registered.
        """
        self._EnsureDispatch(prog)
        return self._handlers.get(prog)

    def Dispatch(self, request, context):
        """Describe the COM object named by ``request.prog``.

        Returns:
            A ``DispatchResponse`` carrying ``prog``, the keys of the
            dispatch's ``_prop_map_get_`` as ``properties``, and every
            public name from ``dir(dispatch)`` (except ``CLSID`` and
            ``coclass_clsid``) as ``methods``.
        """
        prog = request.prog
        dispatch = self._EnsureDispatch(prog)
        properties = list(dispatch._prop_map_get_)
        excluded = ("CLSID", "coclass_clsid")
        methods = [
            name
            for name in dir(dispatch)
            if not name.startswith("_") and name not in excluded
        ]
        response = CybosPlusProxyService_pb2.DispatchResponse()
        response.prog = prog
        response.properties.extend(properties)
        response.methods.extend(methods)
        return response

    def Property(self, request, context):
        """Read attribute ``request.name`` from the dispatch of ``request.prog``.

        Returns:
            A ``PropertyResponse`` whose ``value`` is filled in by
            ``AssignPrimitive``.
        """
        dispatch = self._EnsureDispatch(request.prog)
        value = getattr(dispatch, request.name)
        response = CybosPlusProxyService_pb2.PropertyResponse()
        AssignPrimitive(response.value, value)
        return response

    def Method(self, request, context):
        """Call method ``request.name`` on the dispatch of ``request.prog``.

        Each entry of ``request.arguments`` is converted with
        ``ExtractPrimitive`` and passed positionally.

        Returns:
            A ``MethodResponse`` whose ``return_value`` is filled in by
            ``AssignPrimitive``.
        """
        dispatch = self._EnsureDispatch(request.prog)
        method = getattr(dispatch, request.name)
        arguments = [ExtractPrimitive(arg.value) for arg in request.arguments]
        return_value = method(*arguments)
        response = CybosPlusProxyService_pb2.MethodResponse()
        AssignPrimitive(response.return_value, return_value)
        return response

    def Event(self, request_iterator, context):
        """Stream event notifications for the ``prog`` named in the first request.

        Protocol: the first request must carry ``start``.  A ``started``
        response is sent, then one ``fired`` response per event.  After
        each ``fired`` response the next request is read; the event is then
        acknowledged, and the stream ends when that request carries ``stop``
        or the client stops sending requests.

        Raises:
            ValueError: If the first request is missing or has no ``start``.
            NotImplementedError: If no event handler exists for ``prog``.
        """
        # ``next`` with a default: a bare ``next()`` inside a generator
        # turns an exhausted request stream into ``RuntimeError`` (PEP 479).
        start_request = next(request_iterator, None)
        # Explicit check instead of ``assert``, which is stripped under ``-O``.
        if start_request is None or not start_request.HasField("start"):
            raise ValueError("the first event request must carry a 'start' message")
        prog = start_request.start.prog
        handler = self._GetHandler(prog)
        if handler is None:
            # Previously this surfaced as an opaque
            # "'NoneType' object is not iterable" after ``started`` was sent.
            raise NotImplementedError(
                "event handling is not available for prog %r" % (prog,)
            )
        events = iter(handler)
        try:
            response = CybosPlusProxyService_pb2.EventResponse()
            # Merely reading a message field does not mark it as present.
            # NOTE(review): assumes ``started``/``fired`` are message-typed
            # fields -- confirm against the .proto definition.
            response.started.SetInParent()
            yield response
            while True:
                # ``next`` with a default works even if the iterator only
                # implements ``__next__``.
                event = next(events, None)
                if event is None:
                    break
                try:
                    response = CybosPlusProxyService_pb2.EventResponse()
                    response.fired.SetInParent()
                    yield response
                    done_request = next(request_iterator, None)
                finally:
                    # Always release the producer waiting on this event,
                    # even if the client went away mid-exchange.
                    event.done()
                if done_request is None or done_request.HasField("stop"):
                    break
        finally:
            # Unregister the iterator so later events are not delivered to
            # (and waited on for) a consumer that no longer exists.
            with handler._lock:
                if events in handler._iterators:
                    handler._iterators.remove(events)