Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 37 additions & 7 deletions smpplib/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import socket
import struct

from time import sleep
from smpplib import consts, exceptions, smpp


Expand Down Expand Up @@ -61,12 +62,14 @@ class Client(object):
vendor = None
_socket = None
sequence_generator = None

def __init__(self, host, port, timeout=5, sequence_generator=None, logger_name=None):
def __init__(self, host, port, timeout=5, sequence_generator=None, max_outstanding_operations=None, logger_name=None):
self.host = host
self.port = int(port)
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.timeout = timeout
self.outstanding_operations = set()
self.max_outstanding_operations = max_outstanding_operations
self.logger = logging.getLogger(logger_name or 'smpp.Client.{}'.format(id(self)))
if sequence_generator is None:
sequence_generator = SimpleSequenceGenerator()
Expand Down Expand Up @@ -167,18 +170,41 @@ def unbind(self):
except socket.timeout:
raise exceptions.ConnectionError()

def send_pdu(self, p):
def manage_outstanding_operations(self, sending, pdu):
if not self.max_outstanding_operations:
return

if pdu.is_response():
operation = '{}{}'.format('S' if sending else 'C', pdu.sequence)
try:
self.outstanding_operations.remove(operation)
self.logger.debug('Unregistering outstanding operation {}'.format(operation))
except KeyError:
self.logger.warning('Failed to unregistered outstanding operation {}'.format(operation))
else:
operation = '{}{}'.format('C' if sending else 'S', pdu.sequence)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated line which is used in both branches. Could you move it right above the if?

if pdu.sequence in self.outstanding_operations:
self.logger.warning('Outstanding operation {} already registered'.format(operation))
else:
self.outstanding_operations.add(operation)
self.logger.debug('Registering outstanding operation {}'.format(operation))

def send_pdu(self, pdu):
"""Send PDU to the SMSC"""

if self.state not in consts.COMMAND_STATES[p.command]:
if self.state not in consts.COMMAND_STATES[pdu.command]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for renaming that :)

raise exceptions.PDUError("Command %s failed: %s" % (
p.command,
pdu.command,
consts.DESCRIPTIONS[consts.SMPP_ESME_RINVBNDSTS],
))

self.logger.debug('Sending %s PDU', p.command)
if self.max_outstanding_operations and pdu.is_request():
while len(self.outstanding_operations) >= self.max_outstanding_operations:
sleep(.1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably use threading.Event.wait() to block here. Then response handling code could flag the event to unblock the loop here. This way I wouldn't need the sleep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never used events before, will take a look a that, thank you.


self.logger.debug('Sending %s PDU', pdu.command)

generated = p.generate()
generated = pdu.generate()

self.logger.debug('>>%s (%d bytes)', binascii.b2a_hex(generated), len(generated))

Expand All @@ -194,6 +220,8 @@ def send_pdu(self, p):
raise exceptions.ConnectionError()
sent += sent_last

self.manage_outstanding_operations(sending=True, pdu=pdu)

return True

def read_pdu(self):
Expand Down Expand Up @@ -227,6 +255,8 @@ def read_pdu(self):

self.logger.debug('Read %s PDU', pdu.command)

self.manage_outstanding_operations(sending=False, pdu=pdu)

if pdu.is_error():
return pdu

Expand Down