Source code for koapy.utils.grpc
import contextlib
import threading
import warnings
from pprint import PrettyPrinter
import grpc
from google.protobuf.json_format import MessageToDict
@contextlib.contextmanager
[docs]def warn_on_rpc_error_context():
try:
yield
except grpc.RpcError as e:
warnings.warn(str(e))
[docs]def warn_on_rpc_error(stream):
with warn_on_rpc_error_context():
for event in stream:
yield event
[docs]def cancel_after(stream, after):
timer = threading.Timer(after, stream.cancel)
timer.start()
return warn_on_rpc_error(stream)
[docs]def pprint_message(message):
pp = PrettyPrinter()
pp.pprint(MessageToDict(message, preserving_proto_field_name=True))