forked from 0rpc/zerorpc-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontext.py
More file actions
118 lines (100 loc) · 4.37 KB
/
context.py
File metadata and controls
118 lines (100 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# -*- coding: utf-8 -*-
# Open Source Initiative OSI - The MIT License (MIT):Licensing
#
# The MIT License (MIT)
# Copyright (c) 2012 DotCloud Inc ([email protected])
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
# of the Software, and to permit persons to whom the Software is furnished to do
# so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import uuid
import functools
import random
import gevent_zmq as zmq
class Context(zmq.Context):
    """zmq context extended with middleware hooks and message-id generation.

    On top of the ``zmq.Context`` base class this keeps:

    * a list of registered middleware objects plus, per hook name, the list
      of callables those middlewares contributed;
    * the state needed to hand out message ids (see ``new_msgid``).

    A lazily created shared instance is available via ``get_instance()``.
    """

    # Shared instance handed out by get_instance(); created on first use.
    _instance = None

    def __init__(self):
        """Start with no middleware registered and a fresh message-id state."""
        # NOTE(review): the base-class initializer is not invoked here --
        # confirm this is intentional for the zmq binding in use.
        hook_names = (
            'resolve_endpoint',
            'raise_error',
            'call_procedure',
            'load_task_context',
            'get_task_context',
            'inspect_error',
        )
        self._middlewares = []
        # One (initially empty) list of callables per supported hook name.
        self._middlewares_hooks = dict((name, []) for name in hook_names)
        self._reset_msgid()

    @staticmethod
    def get_instance():
        """Return the shared ``Context``, creating it on the first call."""
        shared = Context._instance
        if shared is None:
            shared = Context._instance = Context()
        return shared

    def _reset_msgid(self):
        """Draw a new id suffix, a new counter start and a new counter stop.

        The suffix is a random UUID string with its first 8 characters
        removed; the counter starts at a random 32-bit value and the stop
        value is drawn at random between that start and 2**32 (exclusive).
        """
        upper_bound = 2**32
        self._msg_id_base = str(uuid.uuid4())[8:]
        start = random.randrange(0, upper_bound)
        self._msg_id_counter = start
        self._msg_id_counter_stop = random.randrange(start, upper_bound)

    def new_msgid(self):
        """Return the next message id as a string.

        The id is the counter rendered as at least 8 lowercase hex digits
        followed by the current suffix. Once the counter has reached its
        stop value the whole state is re-drawn instead of incrementing.
        """
        if self._msg_id_counter < self._msg_id_counter_stop:
            self._msg_id_counter = (self._msg_id_counter + 1) & 0xffffffff
        else:
            self._reset_msgid()
        return '{0:08x}{1}'.format(self._msg_id_counter, self._msg_id_base)

    def register_middleware(self, middleware_instance):
        """Register ``middleware_instance`` and return how many hooks it filled.

        For every supported hook name the callable is looked up first as an
        attribute of the middleware and, failing that, through the
        middleware's ``get(name, None)`` method (so mapping-like objects
        work too). Hooks the middleware does not provide are skipped.
        """
        self._middlewares.append(middleware_instance)
        hooked = 0
        for hook_name, functors in self._middlewares_hooks.items():
            functor = getattr(middleware_instance, hook_name, None)
            if functor is None:
                try:
                    functor = middleware_instance.get(hook_name, None)
                except AttributeError:
                    # No usable ``get``: this hook is simply not provided.
                    continue
            if functor is None:
                continue
            functors.append(functor)
            hooked += 1
        return hooked

    def middleware_resolve_endpoint(self, endpoint):
        """Pass ``endpoint`` through every 'resolve_endpoint' hook in turn.

        Each hook receives the previous hook's result; the final value is
        returned (``endpoint`` itself when no hook is registered).
        """
        return functools.reduce(
            lambda current, functor: functor(current),
            self._middlewares_hooks['resolve_endpoint'],
            endpoint,
        )

    def middleware_inspect_error(self, exc_type, exc_value, exc_traceback):
        """Call every 'inspect_error' hook with the task context and exc info.

        The task context is collected once via
        ``middleware_get_task_context()``; the exception triple is passed as
        a single tuple. Hook return values are ignored.
        """
        task_context = self.middleware_get_task_context()
        exc_info = (exc_type, exc_value, exc_traceback)
        for inspect in self._middlewares_hooks['inspect_error']:
            inspect(task_context, exc_info)

    def middleware_raise_error(self, event):
        """Call every 'raise_error' hook with ``event``; results are ignored."""
        for on_error in self._middlewares_hooks['raise_error']:
            on_error(event)

    def middleware_call_procedure(self, procedure, *args, **kwargs):
        """Invoke ``procedure`` through the 'call_procedure' hook chain.

        Every hook is called as ``hook(next_callable, *args, **kwargs)`` and
        decides itself whether and how to call ``next_callable``. Hooks are
        wrapped in registration order, so the most recently registered hook
        is the outermost one. Returns whatever the outermost callable
        returns.
        """
        class chain(object):
            """Callable binding one hook to the callable it wraps."""

            def __init__(self, fct, following):
                # Copy the wrapped callable's metadata first, so that the
                # attributes assigned below always take precedence.
                functools.update_wrapper(self, following)
                self.fct = fct
                self.next = following

            def __call__(self, *call_args, **call_kwargs):
                return self.fct(self.next, *call_args, **call_kwargs)

        wrapped = procedure
        for hook in self._middlewares_hooks['call_procedure']:
            wrapped = chain(hook, wrapped)
        return wrapped(*args, **kwargs)

    def middleware_load_task_context(self, event_header):
        """Call every 'load_task_context' hook with ``event_header``."""
        for load in self._middlewares_hooks['load_task_context']:
            load(event_header)

    def middleware_get_task_context(self):
        """Merge the dicts returned by all 'get_task_context' hooks.

        Hooks are called without arguments, in registration order; keys from
        later hooks overwrite those from earlier ones. Returns the merged
        dict (empty when no hook is registered).
        """
        merged = {}
        for provide in self._middlewares_hooks['get_task_context']:
            merged.update(provide())
        return merged