Source code for pyzor.message
"""This module contains the various messages used in the pyzor client-server
communication.
"""
import random
import email.message
import pyzor
[docs]
class Message(email.message.Message):
    """Base class for the messages defined in this module.

    Construction calls :meth:`setup`, which subclasses may override to
    pre-populate headers.
    """

    def __init__(self):
        email.message.Message.__init__(self)
        self.setup()

    def setup(self):
        """Hook invoked at the end of ``__init__``.

        The base implementation does nothing. It has to exist because
        ``__init__`` calls it unconditionally and subclasses chain up to it.
        """
        pass

    def init_for_sending(self):
        """Prepare the message for sending by validating it.

        Raises whatever :meth:`ensure_complete` raises.
        """
        self.ensure_complete()

    def __str__(self):
        # The parent class adds the unix From header, so serialize through
        # as_string() with its defaults instead of the inherited __str__.
        return self.as_string()

    def ensure_complete(self):
        """Validate that required headers are present.

        The base message has no required headers, so this is a no-op.
        """
        pass
[docs]
class ThreadedMessage(Message):
    """Message carrying a protocol version ('PV') and a 'Thread' header."""

    def init_for_sending(self):
        """Fill in the 'Thread' and 'PV' headers, then validate the message.

        A thread id is generated only when the message does not already have
        one. 'PV' is always set to ``pyzor.proto_version``.
        """
        if 'Thread' not in self:
            self.set_thread(ThreadId.generate())
        assert 'Thread' in self
        # Header assignment appends rather than replaces, so drop any
        # existing 'PV' first. Deleting a missing header is a no-op.
        del self["PV"]
        self["PV"] = str(pyzor.proto_version)
        Message.init_for_sending(self)

    def ensure_complete(self):
        """Check that both 'PV' and 'Thread' are present.

        Raises:
            pyzor.IncompleteMessageError: if either header is missing.
        """
        if 'PV' not in self or 'Thread' not in self:
            raise pyzor.IncompleteMessageError("Doesn't have fields for a "
                                               "ThreadedMessage.")
        Message.ensure_complete(self)

    def get_protocol_version(self):
        """Return the 'PV' header converted to a float."""
        return float(self['PV'])

    def get_thread(self):
        """Return the 'Thread' header wrapped in a :class:`ThreadId`."""
        return ThreadId(self['Thread'])

    def set_thread(self, i):
        """Set the 'Thread' header to ``str(i)``, replacing any previous one."""
        # Assignment alone would add a second 'Thread' header and leave
        # get_thread() returning the old value; delete first to really set.
        del self['Thread']
        self['Thread'] = str(i)
[docs]
class Response(ThreadedMessage):
    """Threaded message that carries a 'Code' and a 'Diag' header."""

    # Value of the 'Code' header that is_ok() treats as success.
    ok_code = 200

    def ensure_complete(self):
        """Require 'Code' and 'Diag', then run the ThreadedMessage checks.

        Raises:
            pyzor.IncompleteMessageError: if either header is missing.
        """
        has_required = 'Code' in self and 'Diag' in self
        if not has_required:
            raise pyzor.IncompleteMessageError(
                "doesn't have fields for a Response")
        ThreadedMessage.ensure_complete(self)

    def is_ok(self):
        """Return True when the response code equals ``ok_code``."""
        code = self.get_code()
        return code == self.ok_code

    def get_code(self):
        """Return the 'Code' header as an int."""
        raw_code = self['Code']
        return int(raw_code)

    def get_diag(self):
        """Return the 'Diag' header as stored."""
        return self['Diag']

    def head_tuple(self):
        """Return ``(code, diag)`` for this response."""
        code = self.get_code()
        diag = self.get_diag()
        return code, diag
[docs]
class Request(ThreadedMessage):
    """Class to use when reading in a Request of any type.

    Subclasses that generate a message are responsible for setting 'Op'.
    """

    def get_op(self):
        """Return the 'Op' header as stored."""
        return self['Op']

    def ensure_complete(self):
        """Require 'Op', then run the ThreadedMessage checks.

        Raises:
            pyzor.IncompleteMessageError: if the 'Op' header is missing.
        """
        has_op = 'Op' in self
        if not has_op:
            raise pyzor.IncompleteMessageError(
                "doesn't have fields for a Request")
        ThreadedMessage.ensure_complete(self)
[docs]
class ClientSideRequest(Request):
    """Request that writes its class-level ``op`` into the 'Op' header."""

    # Operation name; concrete subclasses override this.
    op = None

    def setup(self):
        """Run the parent setup, then store ``self.op`` under 'Op'."""
        Request.setup(self)
        operation = self.op
        self["Op"] = operation
[docs]
class SimpleDigestBasedRequest(ClientSideRequest):
    """Client-side request carrying 'Op-Digest' headers.

    ``digest_count`` tracks how many digests were added via add_digest().
    """

    def __init__(self, digest=None):
        ClientSideRequest.__init__(self)
        self.digest_count = 0
        if not digest:
            # Nothing to attach yet; digests can be added later.
            return
        self.add_digest(digest)

    def add_digest(self, digest):
        """Append ``digest`` as an 'Op-Digest' header and bump the counter."""
        self.add_header("Op-Digest", digest)
        self.digest_count = self.digest_count + 1
[docs]
class SimpleDigestSpecBasedRequest(SimpleDigestBasedRequest):
    """Digest-based request that can also carry an 'Op-Spec' header.

    ``spec`` is an iterable of iterables. Its items are flattened, converted
    with ``str`` and joined with commas. A falsy ``spec`` adds no header.
    """

    def __init__(self, digest=None, spec=None):
        SimpleDigestBasedRequest.__init__(self, digest)
        if not spec:
            return
        # Flatten one nesting level while stringifying each item.
        parts = [str(item) for group in spec for item in group]
        self["Op-Spec"] = ",".join(parts)
[docs]
class PingRequest(ClientSideRequest):
    """Client-side request whose operation name is "ping"."""

    op = "ping"
[docs]
class PongRequest(SimpleDigestBasedRequest):
    """Digest-based request whose operation name is "pong"."""

    op = "pong"
[docs]
class CheckRequest(SimpleDigestBasedRequest):
    """Digest-based request whose operation name is "check"."""

    op = "check"
[docs]
class InfoRequest(SimpleDigestBasedRequest):
    """Digest-based request whose operation name is "info"."""

    op = "info"
[docs]
class ReportRequest(SimpleDigestSpecBasedRequest):
    """Digest-and-spec request whose operation name is "report"."""

    op = "report"
[docs]
class WhitelistRequest(SimpleDigestSpecBasedRequest):
    """Digest-and-spec request whose operation name is "whitelist"."""

    op = "whitelist"
[docs]
class ThreadId(int):
    """Integer thread identifier restricted to ``full_range``.

    Construction raises ValueError for values outside ``full_range``
    (lower bound inclusive, upper bound exclusive).
    """

    # (0, 1024) is reserved
    full_range = (0, 2 ** 16)
    ok_range = (1024, full_range[1])
    error_value = 0

    def __new__(cls, i):
        value = int.__new__(cls, i)
        lowest, limit = cls.full_range
        if value < lowest or value >= limit:
            raise ValueError("value outside of range")
        return value

    @classmethod
    def generate(cls):
        """Return a random id drawn from ``ok_range``."""
        start, stop = cls.ok_range
        return cls(random.randrange(start, stop))

    def in_ok_range(self):
        """Return True when this id lies inside ``ok_range``."""
        start, stop = self.ok_range
        return start <= self < stop