Dear list
I may need som advise to get som testing wokring.
I have a server for service a, which creates a client for service b to get
som data:
class StateHandler(object):
implements(State.Iface)
def __init__(self):
#do stuff
self.loaddata()
@inlineCallbacks
def loaddata(self):
try:
conn = yield ClientCreator(
reactor,
TTwisted.ThriftClientProtocol,
dsClient,
TBinaryProtocol.TBinaryProtocolFactory()).connectTCP("10.70.10.30", 7246)
data = yield conn.client.getAllAuthenticationData()
#process data
conn.transport.loseConnection()
I use trail to test this service, code is bolow:
class TestStateStore(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
self.handler = StateHandler()
self.processor = State.Processor(self.handler)
self.pfactory = TBinaryProtocol.TBinaryProtocolFactory()
self.server = reactor.listenTCP(
0,
TTwisted.ThriftServerFactory(self.processor, self.pfactory),
interface="127.0.0.1")
self.portNo = self.server.getHost().port
self.txclient = yield ClientCreator(
reactor,
TTwisted.ThriftClientProtocol,
State.Client,
self.pfactory).connectTCP("127.0.0.1", self.portNo)
self.client = self.txclient.client
@defer.inlineCallbacks
def tearDown(self):
self.txclient.transport.loseConnection()
yield self.server.stopListening()
def test_dummy(self):
self.assertEquals(True, True)
When I run test it fails with the following error message.
ERROR]
Traceback (most recent call last):
Failure: twisted.trial.util.DirtyReactorAggregateError: Reactor was unclean.
DelayedCalls: (set twisted.internet.base.DelayedCall.debug = True to debug)