fdbserver/networktest.actor.cpp
// Flow actor that serves NetworkTestRequests in an endless loop and, each time
// the `logging` delay fires, prints the reply rate observed since the previous
// report to stderr. The loop has no exit path of its own.
ACTOR Future<Void> networkTestServer() {
	state NetworkTestInterface interf( g_network );
	state Future<Void> logging = delay( 1.0 );
	state double lastTime = now();
	state int sent = 0;

	loop {
		choose {
			when( NetworkTestRequest req = waitNext( interf.test.getFuture() ) ) {
				// Reply with a payload of req.replySize '.' characters and count it.
				std::string payload( req.replySize, '.' );
				req.reply.send( NetworkTestReply( Value( payload ) ) );
				++sent;
			}
			when( Void _ = wait( logging ) ) {
				// Report throughput for the interval that just ended.
				const auto elapsed = now() - lastTime;
				const auto spd = sent / elapsed;
				const auto usPerResponse = 1e6 / spd;
				fprintf( stderr, "responses per second: %f (%f us)\n", spd, usPerResponse );

				// Start a fresh measurement interval and re-arm the timer.
				lastTime = now();
				sent = 0;
				logging = delay( 1.0 );
			}
		}
	}
}
namespace {
// State-machine half of the networkTestServer actor. It holds the actor's
// `state` variables (interf, logging, lastTime, sent) as members and implements
// the actor body as a set of a_body1* continuation functions.
//
// The template parameter is the concrete actor class, which derives from this
// class; every static_cast<NetworkTestServerActor*>(this) below reaches the
// derived object to access actor_wait_state, the callback bases and
// sendErrorAndDelPromiseRef().
//
// Convention used by the a_body1* functions: each takes and returns a
// `loopDepth` int. A return value of 0 unwinds out of a_body1loopHead1's
// driving `while` loop.
template <class NetworkTestServerActor>
class NetworkTestServerActorState {
public:
// Initializes the state variables: the test interface bound to g_network,
// a delay(1.0) future for the first report, the interval start time, and a
// zeroed reply counter.
NetworkTestServerActorState()
: interf(g_network),
logging(delay( 1.0 )),
lastTime(now()),
sent(0)
{
}
// Entry point of the actor body: enters the loop and routes any exception
// thrown while running it to a_body1Catch1.
int a_body1(int loopDepth=0)
{
try {
;
loopDepth = a_body1loopHead1(loopDepth);
}
catch (Error& error) {
loopDepth = a_body1Catch1(error, loopDepth);
} catch (...) {
loopDepth = a_body1Catch1(unknown_error(), loopDepth);
}
return loopDepth;
}
// Terminal error path. Explicitly destroys this state object first, then hands
// `error` to the derived actor's sendErrorAndDelPromiseRef(). The state members
// must not be used after the destructor call. Always returns 0; the incoming
// loopDepth is ignored.
int a_body1Catch1(Error error,int loopDepth=0)
{
this->~NetworkTestServerActorState();
static_cast<NetworkTestServerActor*>(this)->sendErrorAndDelPromiseRef(error);
loopDepth = 0;
return loopDepth;
}
// Drives the `loop`: bumps loopDepth by one and keeps invoking the loop body
// for as long as the body returns that same depth. Any other return value
// (e.g. 0 when the body suspended or failed) ends the iteration.
int a_body1loopHead1(int loopDepth)
{
int oldLoopDepth = ++loopDepth;
while (loopDepth == oldLoopDepth) loopDepth = a_body1loopBody1(loopDepth);
return loopDepth;
}
// One pass through the `choose`. Checks the two awaited sources in order
// (request stream first, then the logging future) and handles whichever is
// already ready; if neither is ready, registers callbacks and suspends.
int a_body1loopBody1(int loopDepth)
{
FutureStream<NetworkTestRequest> __when_expr_0 = interf.test.getFuture();
// A negative wait state means the actor was cancelled (cancel() on the actor
// class sets it to -1): fail with actor_cancelled instead of waiting.
if (static_cast<NetworkTestServerActor*>(this)->actor_wait_state < 0) return a_body1Catch1(actor_cancelled(), std::max(0, loopDepth - 1));
// Fast path for the request stream: propagate its error, or pop the next
// request and run the first `when` body without suspending.
if (__when_expr_0.isReady()) { if (__when_expr_0.isError()) return a_body1Catch1(__when_expr_0.getError(), std::max(0, loopDepth - 1)); else return a_body1loopBody1when1(__when_expr_0.pop(), loopDepth); };
StrictFuture<Void> __when_expr_1 = logging;
// Fast path for the logging future: propagate its error, or run the second
// `when` body without suspending.
if (__when_expr_1.isReady()) { if (__when_expr_1.isError()) return a_body1Catch1(__when_expr_1.getError(), std::max(0, loopDepth - 1)); else return a_body1loopBody1when2(__when_expr_1.get(), loopDepth); };
// Nothing ready: mark the actor as waiting in choose #1 and register it as
// the callback for both sources. Returning 0 stops the driving while loop;
// execution resumes from one of the a_callback_* functions.
static_cast<NetworkTestServerActor*>(this)->actor_wait_state = 1;
__when_expr_0.addCallbackAndClear(static_cast<ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >*>(static_cast<NetworkTestServerActor*>(this)));
__when_expr_1.addCallbackAndClear(static_cast<ActorCallback< NetworkTestServerActor, 1, Void >*>(static_cast<NetworkTestServerActor*>(this)));
loopDepth = 0;
return loopDepth;
}
// Continuation after either `when` body. When reached from a callback
// (loopDepth == 0) it restarts the loop from the head; otherwise it returns
// the unchanged depth so the already-running while loop in a_body1loopHead1
// performs the next iteration.
int a_body1loopBody1cont1(int loopDepth)
{
if (loopDepth == 0) return a_body1loopHead1(0);
return loopDepth;
}
// First `when` body: answers the request with a NetworkTestReply whose value
// is req.replySize '.' characters, and counts the reply in `sent`.
int a_body1loopBody1when1(NetworkTestRequest const& req,int loopDepth)
{
req.reply.send( NetworkTestReply( Value( std::string( req.replySize, '.' ) ) ) );
sent++;
loopDepth = a_body1loopBody1cont1(loopDepth);
return loopDepth;
}
// Second `when` body: prints the reply rate since lastTime to stderr, then
// resets the interval start and the counter and re-arms `logging` with a new
// delay(1.0).
int a_body1loopBody1when2(Void const& _,int loopDepth)
{
// NOTE(review): when no replies were sent in the interval, spd is 0 and
// 1e6/spd divides by zero (assuming now() yields a floating-point value, the
// second field then prints as inf rather than trapping) - confirm acceptable.
auto spd = sent / (now() - lastTime);
fprintf( stderr, "responses per second: %f (%f us)\n", spd, 1e6/spd );
lastTime = now();
sent = 0;
logging = delay( 1.0 );
loopDepth = a_body1loopBody1cont1(loopDepth);
return loopDepth;
}
// Leaves the suspended choose: clears a positive wait state back to 0 (a
// negative, i.e. cancelled, state is left untouched) and unregisters the
// actor from both callback sources.
void a_exitChoose1()
{
if (static_cast<NetworkTestServerActor*>(this)->actor_wait_state > 0) static_cast<NetworkTestServerActor*>(this)->actor_wait_state = 0;
static_cast<NetworkTestServerActor*>(this)->ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >::remove();
static_cast<NetworkTestServerActor*>(this)->ActorCallback< NetworkTestServerActor, 1, Void >::remove();
}
// Callback: the request stream delivered a value while suspended. Exits the
// choose and runs the first `when` body at loopDepth 0; exceptions go to
// a_body1Catch1.
void a_callback_fire(ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >*,NetworkTestRequest value)
{
a_exitChoose1();
try {
a_body1loopBody1when1(value, 0);
}
catch (Error& error) {
a_body1Catch1(error, 0);
} catch (...) {
a_body1Catch1(unknown_error(), 0);
}
}
// Callback: the request stream delivered an error while suspended. Exits the
// choose and terminates the actor with that error. cancel() on the actor class
// also calls this overload, passing actor_cancelled().
void a_callback_error(ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >*,Error err)
{
a_exitChoose1();
try {
a_body1Catch1(err, 0);
}
catch (Error& error) {
a_body1Catch1(error, 0);
} catch (...) {
a_body1Catch1(unknown_error(), 0);
}
}
// Callback: the logging future became ready while suspended. Exits the choose
// and runs the second `when` body at loopDepth 0; exceptions go to
// a_body1Catch1.
void a_callback_fire(ActorCallback< NetworkTestServerActor, 1, Void >*,Void value)
{
a_exitChoose1();
try {
a_body1loopBody1when2(value, 0);
}
catch (Error& error) {
a_body1Catch1(error, 0);
} catch (...) {
a_body1Catch1(unknown_error(), 0);
}
}
// Callback: the logging future failed while suspended. Exits the choose and
// terminates the actor with that error.
void a_callback_error(ActorCallback< NetworkTestServerActor, 1, Void >*,Error err)
{
a_exitChoose1();
try {
a_body1Catch1(err, 0);
}
catch (Error& error) {
a_body1Catch1(error, 0);
} catch (...) {
a_body1Catch1(unknown_error(), 0);
}
}
// Actor state variables, initialized by the constructor above.
// Interface whose `test` stream supplies incoming requests.
NetworkTestInterface interf;
// Pending delay(1.0) future; readiness triggers the next throughput report.
Future<Void> logging;
// now() value captured at the start of the current reporting interval.
double lastTime;
// Number of replies sent during the current reporting interval.
int sent;
};
// Concrete actor object behind networkTestServer(). It combines the Actor<Void>
// result, the two callback bases (slot 0: request stream, slot 1: logging
// future), FastAllocated storage, and the state machine defined in
// NetworkTestServerActorState.
class NetworkTestServerActor : public Actor<Void>, public ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >, public ActorCallback< NetworkTestServerActor, 1, Void >, public FastAllocated<NetworkTestServerActor>, public NetworkTestServerActorState<NetworkTestServerActor> {
public:
	using FastAllocated<NetworkTestServerActor>::operator new;
	using FastAllocated<NetworkTestServerActor>::operator delete;

	// Runs only the Actor<Void> base destructor and then releases the storage.
	// The state part is not destroyed here; a_body1Catch1 in the state class
	// destroys it explicitly.
	virtual void destroy() {
		static_cast<Actor<Void>*>(this)->~Actor();
		operator delete(this);
	}

	friend struct ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >;
	friend struct ActorCallback< NetworkTestServerActor, 1, Void >;

	// Constructs the bases and immediately starts executing the actor body.
	NetworkTestServerActor()
	  : Actor<Void>(),
	    NetworkTestServerActorState<NetworkTestServerActor>()
	{
		this->a_body1();
	}

	// Marks the actor as cancelled (wait state -1). If it was suspended in the
	// choose (wait state 1), it is resumed through the request-stream error
	// callback with actor_cancelled(). The callback pointer argument is unused
	// by the handler, so a null pointer is passed.
	void cancel()
	{
		const auto previousWaitState = this->actor_wait_state;
		this->actor_wait_state = -1;
		if (previousWaitState == 1) {
			this->a_callback_error(static_cast<ActorSingleCallback< NetworkTestServerActor, 0, NetworkTestRequest >*>(nullptr), actor_cancelled());
		}
	}
};
}
// Public entry point: allocates the actor (its constructor starts running the
// actor body right away) and returns a future wrapping it.
Future<Void> networkTestServer( ) {
	NetworkTestServerActor* actor = new NetworkTestServerActor();
	return Future<Void>(actor);
}