[Twisted-Python] How to receive a big stream data?
Dear all,

I'm using Twisted to write a socket server, and I have a problem: I cannot receive a big piece of data when the data size is bigger than the connection buffer size.

The protocol description: First, the client sends a data head to the server. Head format: ABC,123456,XXX,XXX,XXX^ Second, the client sends the big data to the server as stream data. After the server has received the big data, it saves it to the database.

My problem source code:

================================================================================================

class MyProtocol(protocol.Protocol):
    is_big_data = False
    big_data = ''
    big_data_size = 0

    def connectionMade(self):
        self.big_data = ''
        self.big_data_size = 0
        self.is_big_data = False

    def dataReceived(self, data):
        if check_head_ok(data):
            # To receive the big size data
            if self.is_big_data:
                return
        # The big data will overflow the Twisted receive buffer size,
        # so loop to receive data
        if is_big_size_data:
            big_data += data
            if big_data_size == len(big_data):
                # To save data to database
                pass

    def connectionLost(self, reason):
        self.big_data = ''
        self.big_data_size = 0
        self.is_big_data = False

    def check_head_ok(self, head):
        if data.startswith('ABC'):
            values = data.split(',')
            self.big_data_size = int(values[1])
            self.is_big_data = self.big_data_size > 0
            return True
        else:
            return False

================================================================================================

If you know the answer, please tell me. Thank you very much! :)

Best Regards,
Steven Wang
You're missing "self." in several places. On 6/27/07, steven wang <steven.zdwang@gmail.com> wrote:

================================================================================================

    is_big_size_data                    should be:  self.is_big_size_data
    big_data += data                    should be:  self.big_data += data
    if big_data_size == len(big_data):  should be:  if self.big_data_size == len(self.big_data):
The check_head_ok also won't work like that. First off, its parameter is named head but its body references data, so you need to rename one to match the other - and since dataReceived is called multiple times, you need to make sure check_head_ok is only applied to the beginning of the stream. Arnar
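To make the fixes above concrete, here is a minimal sketch (not from the original posts) of the accumulate-and-check logic with the self. references and the header handling corrected. The header format (ABC,<size>,...^) is taken from Steven's post; the class name BigDataReceiver, the data_received method name, and the saved attribute standing in for "save to database" are illustrative inventions, and the framing logic is written as plain Python so it can be exercised without a reactor:

```python
class BigDataReceiver(object):
    """Sketch of the corrected buffering logic from this thread.

    Wire format: an ASCII header 'ABC,<size>,...^' followed by
    exactly <size> bytes of payload.  In a real protocol.Protocol
    subclass this body would live in dataReceived().
    """

    def __init__(self):
        self.big_data = ''
        self.big_data_size = 0
        self.is_big_data = False
        self.saved = None  # stands in for "save to database"

    def data_received(self, data):
        if not self.is_big_data:
            # First chunk: parse the header up to '^', and keep any
            # payload bytes that arrived in the same chunk.
            head, sep, rest = data.partition('^')
            values = head.split(',')
            if values[0] != 'ABC':
                raise ValueError('bad header: %r' % head)
            self.big_data_size = int(values[1])
            self.is_big_data = True
            data = rest
        # Accumulate across however many chunks TCP delivers.
        self.big_data += data
        if len(self.big_data) >= self.big_data_size:
            self.saved = self.big_data[:self.big_data_size]
```

The key difference from the original code is that the header check happens only once (guarded by is_big_data), and every mutation goes through self.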
========================================================

from twisted.internet import protocol, reactor, defer, interfaces

class MyProtocol(protocol.Protocol):
    is_big_data = False
    big_data = ''
    big_data_size = 0

    def connectionMade(self):
        self.big_data = ''
        self.big_data_size = 0
        self.is_big_data = False

    def dataReceived(self, data):
        if check_head_ok(data):
            # To receive the big size data
            if self.is_big_data:
                return
        # The big data will overflow the Twisted receive buffer size,
        # so loop to receive data
        if self.is_big_size_data:
            self.big_data += data
            if self.big_data_size == len(big_data):
                # To save data to database
                pass

    def connectionLost(self, reason):
        self.big_data = ''
        self.big_data_size = 0
        self.is_big_data = False

    def check_head_ok(self, data):
        if data.startswith('ABC'):
            values = data.split(',')
            self.big_data_size = int(values[1])
            self.is_big_data = self.big_data_size > 0
            return True
        else:
            return False

================================================================

I rewrote my source for MyProtocol, but I still cannot receive the complete data, only a part of it. :( Could you help me?
On 2 Jul 2007, at 21.40, steven wang wrote:
I rewrite my source about MyProtocol. But I can not receive complete data, only a part of data. :( Could you help me?
Have you taken a look at twisted.protocols.basic? There are some nice abstractions in there for building chunked protocols. I'd recommend NetstringReceiver if you want to keep your protocol text-based. You subclass protocols.basic.NetstringReceiver instead of protocol.Protocol, and override stringReceived(data). That gets called whenever a complete message is received (unlike dataReceived, which just gets a series of bytes that may or may not be a complete message).
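For reference, the wire format behind NetstringReceiver is simple enough to sketch directly. The decoder below is an illustrative stand-in (plain Python, not Twisted code) for what the Twisted class does: each message travels as '<length>:<payload>,', so the receiver can always tell where a message ends no matter how TCP chunks the bytes. The class and method names here are invented for the example:

```python
class NetstringDecoder(object):
    """Minimal sketch of netstring framing ('<len>:<payload>,'),
    the same wire format twisted.protocols.basic.NetstringReceiver
    parses.  Feed it arbitrary chunks; complete messages come out.
    """

    def __init__(self):
        self.buf = ''
        self.messages = []

    def data_received(self, data):
        self.buf += data
        while True:
            colon = self.buf.find(':')
            if colon == -1:
                return  # length prefix not complete yet
            length = int(self.buf[:colon])
            end = colon + 1 + length
            if len(self.buf) < end + 1:
                return  # payload (plus trailing comma) not all here yet
            if self.buf[end] != ',':
                raise ValueError('malformed netstring')
            self.messages.append(self.buf[colon + 1:end])
            self.buf = self.buf[end + 1:]
```

With the real NetstringReceiver you would instead override stringReceived(), and each element of messages above corresponds to one such call.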
"steven wang" <steven.zdwang@gmail.com> writes:
But I want to receive binary data in my protocol.
Even if you start with a non-binary header, you can switch to receiving binary information at any time by using the raw mode of most of the basic protocols. Having some sort of ASCII header prior to the raw data is often a very simple way to handle things (something in common with a tremendous number of standard TCP-based protocols). Your original post had a fairly straightforward ASCII header that I think would probably be fine. What you're probably missing is the concept of switching to a raw binary receive mode, which switches your protocol from getting data in its lineReceived method to having rawDataReceived called.

For example, here's a slightly stripped pair of protocols (server and client) that I'm currently using as part of a bigger project. Most of the communication is over a PB connection which the client uses to perform operations on the server, one of which is editing job information. But jobs contain attached files (often very large audio/video files), so committing changes to a job also involves transmitting up any newly added files. So after the client updates the server's metadata, it initiates a separate set of file transfers across a different port.

In my case, the header for a file transfer includes a session key (which the protocol uses to reference the original PB-based job session the client was using) along with a file key used for storage (which uniquely references a specific file in the job). The final element is the total file size. That is, upon connecting, the client transmits a line such as:

    <session_uuid> <file_uuid> #######

where the two uuids are specific to the transfer underway (and help with security since a random client isn't going to know the right ids), and ####### is the overall file length. After sending that line (e.g., right after its final newline), the client just blasts up the raw data.
The protocol is a simple LineReceiver based protocol that receives that information as an ASCII initial line, after which it switches to raw mode to receive the data. Although the data length could technically be inferred from when the client disconnects, having it up front ensures I can detect a transfer that gets interrupted. So on the server side you have:

- - - - - - - - - - - - - - - - - - - - - - - - -

class FileIOProtocol(LineReceiver):

    def __init__(self):
        self.info = None
        self.outfile = None
        self.remain = 0
        self.crc = 0

    def lineReceived(self, line):
        logger.debug('FileIOProtocol:lineReceived:%s', line)
        sess_key, file_key, self.size = line.split()
        file_key = uuid.UUID(file_key)
        try:
            session_uuid = uuid.UUID(sess_key)
        except:
            logger.debug('FileIOProtocol:lineReceived Invalid session')
            self.transport.loseConnection()
            return

        self.job_session = self.factory.sessions.get(session_uuid)
        if not self.job_session:
            logger.debug('FileIOProtocol:lineReceived Invalid session')
            self.transport.loseConnection()
            return
        if not self.job_session.active:
            logger.debug('FileIOProtocol:lineReceived Stale session')
            self.transport.loseConnection()
            return

        # [db3l] The original code validates the individual file uuid here
        # resulting in self.job_file as job file object from the session
        if not self.job_file:
            logger.debug('FileIOProtocol:lineReceived Invalid file key')
            self.transport.loseConnection()
            return

        # Create the upload directory if not already present
        if not os.path.isdir(self.job_session.upload_dir):
            os.makedirs(self.job_session.upload_dir)
        self.outfilename = os.path.join(self.job_session.upload_dir,
                                        self.job_file['uuid'].hex)
        logger.debug('FileIOProtocol:lineReceived Receiving into %s',
                     self.outfilename)
        try:
            self.outfile = open(self.outfilename, 'wb')
        except Exception, value:
            logger.debug('FileIOProtocol:lineReceived Unable to open file %s '
                         '(%s)', self.outfilename, value)
            self.transport.loseConnection()
            return

        self.remain = int(self.size)
        logger.debug('FileIOProtocol:lineReceived Entering raw mode: %s %s',
                     self.outfile, self.remain)
        self.setRawMode()

    def rawDataReceived(self, data):
        self.remain -= len(data)
        self.crc = crc32(data, self.crc)
        self.outfile.write(data)

    def connectionMade(self):
        LineReceiver.connectionMade(self)
        logger.debug('FileIOProtocol:connectionMade')

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        logger.debug('FileIOProtocol:connectionLost')
        if self.outfile:
            self.outfile.close()
            if self.remain != 0:
                # Problem uploading - discard
                logger.debug('FileIOProtocol:connectionLost remain(%d)!=0',
                             self.remain)
                os.remove(self.outfilename)
            else:
                # Update job object with upload status
                self.job_file['uploaded'] = datetime.utcnow()
                self.job_file['size'] = self.size
                self.job_file['crc'] = self.crc


class FileIOFactory(ServerFactory):
    protocol = FileIOProtocol

    def __init__(self, db, sessions, options):
        self.db = db
        self.options = options
        self.sessions = sessions

- - - - - - - - - - - - - - - - - - - - - - - - -

which is bound to an appropriate port on the server however you'd like. I use code like:

    self.fileio = FileIOFactory(db, self.sessions, options)
    reactor.listenTCP(self.options['file_port'], self.fileio)

On the client side, I have an equivalent protocol that transmits up the file. It's run beneath a GUI, so it keeps a reference to the GUI controller object that might indicate it needs to cancel a transfer mid-stream, as well as updating the controller during the transfer so it can update a progress bar on screen. It is also a LineReceiver based protocol, and uses the Twisted FileSender object to do the raw data transfer (which is implemented as a straight producer with the TCP socket being the consumer). The connectionMade method is where it transmits the ASCII header and then initiates the raw data transfer.
- - - - - - - - - - - - - - - - - - - - - - - - -

class TransferCancelled(Exception):
    """Exception for a user cancelling a transfer"""
    pass


class FileIOClient(LineReceiver):

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

        self.infile = open(self.path, 'rb')
        self.insize = os.stat(self.path).st_size
        self.result = None
        self.completed = False

        self.controller.file_sent = 0
        self.controller.file_size = self.insize

    def _monitor(self, data):
        self.controller.file_sent += len(data)
        self.controller.total_sent += len(data)

        # Check with controller to see if we've been cancelled and abort
        # if so.
        if self.controller.cancel:
            print 'FileIOClient._monitor Cancelling'

            # Need to unregister the producer with the transport or it will
            # wait for it to finish before breaking the connection
            self.transport.unregisterProducer()
            self.transport.loseConnection()

            # Indicate a user cancelled result
            self.result = TransferCancelled('User cancelled transfer')

        return data

    def cbTransferCompleted(self, lastsent):
        self.completed = True
        self.transport.loseConnection()

    def connectionMade(self):
        self.transport.write('%s %s %s\r\n' % (str(self.sess_key),
                                               str(self.file_key),
                                               self.insize))
        sender = FileSender()
        sender.CHUNK_SIZE = 2 ** 16
        d = sender.beginFileTransfer(self.infile, self.transport,
                                     self._monitor)
        d.addCallback(self.cbTransferCompleted)

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        print 'FileIOClient:connectionLost'
        self.infile.close()
        if self.completed:
            self.controller.completed.callback(self.result)
        else:
            self.controller.completed.errback(reason)


class FileIOClientFactory(ClientFactory):
    protocol = FileIOClient

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

    def clientConnectionFailed(self, connector, reason):
        ClientFactory.clientConnectionFailed(self, connector, reason)
        self.controller.completed.errback(reason)

    def buildProtocol(self, addr):
        print 'buildProtocol'
        p = self.protocol(self.path, self.sess_key,
                          self.file_key, self.controller)
        p.factory = self
        return p

- - - - - - - - - - - - - - - - - - - - - - - - -

Within the presentation layer controller on the client, initiating a transfer is done with:

    def _transmitOne(self, address, port, path, sess_key, file_key):
        self.completed = defer.Deferred()
        f = FileIOClientFactory(path, sess_key, file_key, self)
        reactor.connectTCP(address, port, f)
        return self.completed

and the result is that self.completed fires (callback or errback) when the transfer is done (which the controller uses to then initiate the next transfer when there is a list of files to go up for a job).

While probably not exactly what you're trying to do, perhaps it'll point you in the right direction.

-- David
Hello Steven,

I met a similar problem in the project I'm currently developing. Here is how I solved it: I delimit each message with a special char (0x1f in my case) which I'm sure will not occur inside any message. Every time a new chunk arrives (either a complete message or a part of one) I check whether it is delimited with this char. If yes, then I process it; if not, then I add the chunk to an internal buffer. This seems to work perfectly for me.

Here is a piece of code illustrating the concept (taken as is from the working code):

    class xooChatProtocol(Protocol):

        def __init__(self):
            self.buf = ''

        def dataReceived(self, data):
            commands = data.split('\x1f')
            if len(commands) > 0:
                i = 1
                num = len(commands)
                for command in commands:
                    if i < num:
                        request = self.buf + command
                        self.requestReceived(request)
                        self.buf = ''
                    else:
                        self.buf += command
                    i += 1

Hope this helps.

--
Good luck,
Pavel Bastov
xooChat Team Leader and xooChat Evangelist
http://www.xoochat.com/
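The same split-and-buffer idea can be expressed more compactly: everything after the last delimiter stays in the buffer, and everything before it is a complete message. This is an illustrative rewrite (not Pavel's code); the class name DelimitedBuffer and the requests list are invented for the example, and it is plain Python so the framing can be tested without a transport:

```python
DELIM = '\x1f'

class DelimitedBuffer(object):
    """Sketch of delimiter-based framing: split on the delimiter,
    emit every complete message, keep the trailing partial chunk."""

    def __init__(self):
        self.buf = ''
        self.requests = []

    def data_received(self, data):
        parts = (self.buf + data).split(DELIM)
        # The last element is either '' (the chunk ended exactly on a
        # delimiter) or the start of the next, still-incomplete message.
        self.buf = parts.pop()
        self.requests.extend(parts)
```

Note that, like Pavel's version, this only works when the delimiter can never appear inside a message, which is why the length-prefixed approaches elsewhere in this thread are preferable for binary payloads.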
participants (5)

- Adam Atlas
- Arnar Birgisson
- David Bolen
- Pavel Bastov
- steven wang