"steven wang" steven.zdwang@gmail.com writes:
But I want to receive binary data in my protocol.
Even if you start with a non-binary header, you can switch to receiving binary information at any time by using the raw mode of most of the basic protocols. And having some sort of ASCII header prior to the raw data is often a very simple way to handle things (an approach it has in common with a tremendous number of standard TCP-based protocols).
Your original post had a fairly straightforward ASCII header that I think would probably be fine. What you're probably missing is the concept of switching to a raw binary receive mode, which moves your protocol from getting data in its lineReceived method to having rawDataReceived called.
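To make that switch concrete, here's a minimal sketch (the class and method names here are made up for illustration, not taken from my code below) of a LineReceiver that reads a single ASCII length line and then flips into raw mode:

- - - - - - - - - - - - - - - - - - - - - - - - -

from twisted.protocols.basic import LineReceiver

class LengthPrefixedReceiver(LineReceiver):
    """Hypothetical example: one ASCII header line, then raw bytes"""

    def __init__(self):
        self.remain = 0
        self.chunks = []

    def lineReceived(self, line):
        # The header is just the payload length.  Everything arriving
        # after this line is handed to rawDataReceived, not lineReceived.
        self.remain = int(line)
        self.setRawMode()

    def rawDataReceived(self, data):
        self.chunks.append(data)
        self.remain -= len(data)
        if self.remain <= 0:
            self.payloadReceived(''.join(self.chunks))
            self.transport.loseConnection()

    def payloadReceived(self, payload):
        # Do whatever you need with the complete binary payload
        pass

- - - - - - - - - - - - - - - - - - - - - - - - -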
For example, here's a slightly stripped pair of protocols (server and client) that I'm currently using as part of a bigger project. Most of the communication is over a PB connection which the client uses to perform operations on the server, one of which is editing job information. But jobs contain attached files (often very large audio/video files), so committing changes to a job also involves transmitting up any newly added files. So after the client updates the server's metadata, it initiates a separate set of file transfers across a different port.
In my case, the header for a file transfer includes a session key (which the protocol uses to reference the original PB-based job session the client was using) along with a file key used for storage (which uniquely references a specific file in the job). The final element is the total file size. That is, upon connecting, the client transmits a line such as:
<session_uuid> <file_uuid> #######
where the two uuids are specific to the transfer underway (and help with security since a random client isn't going to know the right ids), and ####### is the overall file length. After sending that line (e.g., right after its final newline), the client just blasts up the raw data.
The protocol is a simple LineReceiver-based protocol that receives that information as an ASCII initial line, after which it switches to raw mode to receive the data. Although the data length could technically be inferred from when the client disconnects, having it up front ensures I can detect a transfer that gets interrupted.
So on the server side you have:
- - - - - - - - - - - - - - - - - - - - - - - - -
import logging
import os
import uuid
from datetime import datetime
from zlib import crc32

from twisted.protocols.basic import LineReceiver
from twisted.internet.protocol import ServerFactory

logger = logging.getLogger(__name__)


class FileIOProtocol(LineReceiver):

    def __init__(self):
        self.info = None
        self.outfile = None
        self.remain = 0
        self.crc = 0

    def lineReceived(self, line):
        logger.debug('FileIOProtocol:lineReceived:%s', line)
        sess_key, file_key, self.size = line.split()
        file_key = uuid.UUID(file_key)

        try:
            session_uuid = uuid.UUID(sess_key)
        except:
            logger.debug('FileIOProtocol:lineReceived Invalid session')
            self.transport.loseConnection()
            return

        self.job_session = self.factory.sessions.get(session_uuid)
        if not self.job_session:
            logger.debug('FileIOProtocol:lineReceived Invalid session')
            self.transport.loseConnection()
            return

        if not self.job_session.active:
            logger.debug('FileIOProtocol:lineReceived Stale session')
            self.transport.loseConnection()
            return

        # [db3l] The original code validates the individual file uuid here,
        # resulting in self.job_file as the job file object from the session
        if not self.job_file:
            logger.debug('FileIOProtocol:lineReceived Invalid file key')
            self.transport.loseConnection()
            return

        # Create the upload directory if not already present
        if not os.path.isdir(self.job_session.upload_dir):
            os.makedirs(self.job_session.upload_dir)

        self.outfilename = os.path.join(self.job_session.upload_dir,
                                        self.job_file['uuid'].hex)

        logger.debug('FileIOProtocol:lineReceived Receiving into %s',
                     self.outfilename)
        try:
            self.outfile = open(self.outfilename, 'wb')
        except Exception, value:
            logger.debug('FileIOProtocol:lineReceived Unable to open file %s '
                         '(%s)', self.outfilename, value)
            self.transport.loseConnection()
            return

        self.remain = int(self.size)
        logger.debug('FileIOProtocol:lineReceived Entering raw mode: %s %s',
                     self.outfile, self.remain)
        self.setRawMode()

    def rawDataReceived(self, data):
        self.remain -= len(data)
        self.crc = crc32(data, self.crc)
        self.outfile.write(data)

    def connectionMade(self):
        LineReceiver.connectionMade(self)
        logger.debug('FileIOProtocol:connectionMade')

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        logger.debug('FileIOProtocol:connectionLost')
        if self.outfile:
            self.outfile.close()

        if self.remain != 0:
            # Problem uploading - discard
            logger.debug('FileIOProtocol:connectionLost remain(%d)!=0',
                         self.remain)
            os.remove(self.outfilename)
        else:
            # Update job object with upload status
            self.job_file['uploaded'] = datetime.utcnow()
            self.job_file['size'] = self.size
            self.job_file['crc'] = self.crc


class FileIOFactory(ServerFactory):
    protocol = FileIOProtocol

    def __init__(self, db, sessions, options):
        self.db = db
        self.options = options
        self.sessions = sessions
- - - - - - - - - - - - - - - - - - - - - - - - -
which is bound to an appropriate port on the server however you'd like. I use code like:
self.fileio = FileIOFactory(db, self.sessions, options)
reactor.listenTCP(self.options['file_port'], self.fileio)
On the client side, I have an equivalent protocol that transmits up the file. It's run beneath a GUI, so it keeps a reference to the GUI controller object, which may indicate that it needs to cancel a transfer mid-stream, and it updates the controller during the transfer so a progress bar can be kept current on screen.

It is also a LineReceiver-based protocol, and uses the Twisted FileSender object to do the raw data transfer (which is implemented as a straight producer, with the TCP socket being the consumer). The connectionMade method is where it transmits the ASCII header and then initiates the raw data transfer.
- - - - - - - - - - - - - - - - - - - - - - - - -
import os

from twisted.protocols.basic import LineReceiver, FileSender
from twisted.internet.protocol import ClientFactory


class TransferCancelled(Exception):
    """Exception for a user cancelling a transfer"""
    pass


class FileIOClient(LineReceiver):

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

        self.infile = open(self.path, 'rb')
        self.insize = os.stat(self.path).st_size

        self.result = None
        self.completed = False

        self.controller.file_sent = 0
        self.controller.file_size = self.insize

    def _monitor(self, data):
        self.controller.file_sent += len(data)
        self.controller.total_sent += len(data)

        # Check with the controller to see if we've been cancelled and
        # abort if so
        if self.controller.cancel:
            print 'FileIOClient._monitor Cancelling'

            # Need to unregister the producer with the transport or it will
            # wait for it to finish before breaking the connection
            self.transport.unregisterProducer()
            self.transport.loseConnection()

            # Indicate a user cancelled result
            self.result = TransferCancelled('User cancelled transfer')

        return data

    def cbTransferCompleted(self, lastsent):
        self.completed = True
        self.transport.loseConnection()

    def connectionMade(self):
        self.transport.write('%s %s %s\r\n' % (str(self.sess_key),
                                               str(self.file_key),
                                               self.insize))

        sender = FileSender()
        sender.CHUNK_SIZE = 2 ** 16
        d = sender.beginFileTransfer(self.infile, self.transport,
                                     self._monitor)
        d.addCallback(self.cbTransferCompleted)

    def connectionLost(self, reason):
        LineReceiver.connectionLost(self, reason)
        print 'FileIOClient:connectionLost'
        self.infile.close()
        if self.completed:
            self.controller.completed.callback(self.result)
        else:
            self.controller.completed.errback(reason)


class FileIOClientFactory(ClientFactory):
    protocol = FileIOClient

    def __init__(self, path, sess_key, file_key, controller):
        self.path = path
        self.sess_key = sess_key
        self.file_key = file_key
        self.controller = controller

    def clientConnectionFailed(self, connector, reason):
        ClientFactory.clientConnectionFailed(self, connector, reason)
        self.controller.completed.errback(reason)

    def buildProtocol(self, addr):
        print 'buildProtocol'
        p = self.protocol(self.path, self.sess_key, self.file_key,
                          self.controller)
        p.factory = self
        return p
- - - - - - - - - - - - - - - - - - - - - - - - -
Within the presentation layer controller on the client, initiating a transfer is done with:
def _transmitOne(self, address, port, path, sess_key, file_key):
    self.completed = defer.Deferred()
    f = FileIOClientFactory(path, sess_key, file_key, self)
    reactor.connectTCP(address, port, f)
    return self.completed
and the result is that self.completed fires (callback or errback) when the transfer is done (which the controller uses to initiate the next transfer when there is a list of files to go up for a job).
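As an aside, the only things the client protocol above assumes about the controller are a few attributes (file_sent, file_size, total_sent, cancel) and the per-transfer completed deferred. If it helps, here's a rough, hypothetical sketch (UploadController and transmitAll are invented names, not part of my code) of a minimal controller that chains a list of files using that deferred:

- - - - - - - - - - - - - - - - - - - - - - - - -

from twisted.internet import defer, reactor

class UploadController(object):
    """Hypothetical stand-in for the GUI controller the client expects"""

    def __init__(self):
        self.cancel = False      # set True to abort the current transfer
        self.total_sent = 0      # running total across all files
        self.file_sent = 0       # updated by FileIOClient during a transfer
        self.file_size = 0       # set by FileIOClient when a transfer starts
        self.completed = None    # per-transfer deferred (see _transmitOne)

    def _transmitOne(self, address, port, path, sess_key, file_key):
        self.completed = defer.Deferred()
        f = FileIOClientFactory(path, sess_key, file_key, self)
        reactor.connectTCP(address, port, f)
        return self.completed

    def transmitAll(self, address, port, files):
        # files is a list of (path, sess_key, file_key) tuples; transfer
        # them one at a time, starting each only after the prior completes
        d = defer.succeed(None)
        for path, sess_key, file_key in files:
            d.addCallback(lambda _, p=path, s=sess_key, k=file_key:
                          self._transmitOne(address, port, p, s, k))
        return d

- - - - - - - - - - - - - - - - - - - - - - - - -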
While probably not exactly what you're trying to do, perhaps it'll point you in the right direction.
-- David