Integrating with Akka
Akka 使用Actor模型來(lái)提升抽象等級(jí)并且提供一個(gè)更好的平臺(tái)去構(gòu)建正確的并發(fā)和可擴(kuò)展的應(yīng)用原环。對(duì)于容錯(cuò)它采用了Let it crash
模型曙聂,這是很成功的應(yīng)用在電信行業(yè)構(gòu)建一個(gè)永不停止的自愈系統(tǒng)。Actor提供了對(duì)于transparent distribution和基本的真正可拓展和容錯(cuò)的應(yīng)用冤吨。
The application actor system
Akka可以使用一些稱作actor systems的容器。一個(gè)actor system 管理它的配置資源用于運(yùn)行他所包含的actors.
一個(gè)Play應(yīng)用定義了一個(gè)特殊的actor system被自身使用舰蟆。這個(gè)actor system跟隨著這個(gè)應(yīng)用的生命周期并且會(huì)自動(dòng)的重啟當(dāng)應(yīng)用重啟的時(shí)候讶凉。
Writing actors
想要使用Akka辙谜,你需要寫一個(gè)actor.下面是一個(gè)簡(jiǎn)單的actor.
package actors;
import akka.actor.*;
import actors.HelloActorProtocol.*;
public class HelloActor extends UntypedActor {
public static Props props = Props.create(HelloActor.class);
public void onReceive(Object msg) throws Exception {
if (msg instanceof SayHello) {
sender().tell("Hello, " + ((SayHello) msg).name, self());
}
}
}
注意這里的HelloActor定義了一個(gè)static field叫做props
问欠,它返回一個(gè)Props
對(duì)象用于描述如何創(chuàng)建這個(gè)actor肝匆。這是一個(gè)非常好的Akka慣例,用來(lái)分離實(shí)例化的邏輯從創(chuàng)建actor的代碼中顺献。
這里有一個(gè)best practice术唬。把HelloActor
發(fā)送和接收定義為static inner classes叫做HelloActorProtocol
:
package actors;
public class HelloActorProtocol {
public static class SayHello {
public final String name;
public SayHello(String name) {
this.name = name;
}
}
}
Creating and using actors
創(chuàng)建或是使用一個(gè)actor
,你需要一個(gè)ActorSystem
.可以通過(guò)申明一個(gè)依賴來(lái)獲得,然后你可以使用actorOf
方法去創(chuàng)建一個(gè)新的actor.
最基本的事情就是你可以給一個(gè)actor發(fā)送一個(gè)message.當(dāng)你發(fā)送一個(gè)message給一個(gè)actor滚澜,沒(méi)有響應(yīng), it`s fire and forget.這也被稱作tell
模式.
然而在一個(gè)web應(yīng)用中,tell
模式通常是沒(méi)有用的,因?yàn)镠TTP協(xié)議是一個(gè)request和responses.在這種情況下嫁怀,你可能想要的是一個(gè)ask
模式.這個(gè)ask
模式返回一個(gè)Scala的Future
,你可以通過(guò)使用scala.compat.java8.FutureConverts.toJava
來(lái)轉(zhuǎn)換為Java的CompletionStage
.
下面是一個(gè)使用ask
模式的HelloActor
的例子:
import akka.actor.*;
import play.mvc.*;
import scala.compat.java8.FutureConverters;
import javax.inject.*;
import java.util.concurrent.CompletionStage;
import static akka.pattern.Patterns.ask;//need imported
@Singleton
public class Application extends Controller {
final ActorRef helloActor;
@Inject public Application(ActorSystem system) {
helloActor = system.actorOf(HelloActor.props);
}
public CompletionStage<Result> sayHello(String name) {
return FutureConverters.toJava(ask(helloActor, new SayHello(name), 1000))
.thenApply(response -> ok((String) response));
}
}
有一些需要注意的地方:
-
ask
模式需要被導(dǎo)入设捐,靜態(tài)導(dǎo)入ask
通常是最方便的方式。 - 返回的
future
被轉(zhuǎn)換為CompletionStage
.這導(dǎo)致promise是一個(gè)CompletionStage<Object>
,當(dāng)你訪問(wèn)呢這個(gè)值的時(shí)候塘淑,你需要轉(zhuǎn)換為你希望從actor返回的類型萝招。 - 這個(gè)
ask
模式需要一個(gè)timeout,我們提供了1000 milliseconds.如果actor花費(fèi)的時(shí)間超過(guò)這個(gè)響應(yīng)時(shí)間。返回的promise
將成為一個(gè)timeout error存捺。 - 由于我們創(chuàng)建了一個(gè)actor在這個(gè)構(gòu)造器中槐沼,我們需要我們的controller作為一個(gè)
Singleton
,這樣每次controller被使用時(shí)不會(huì)創(chuàng)建一個(gè)性的actor。
Dependency injecting actors
如果你愿意捌治,你可以讓Guice
實(shí)例化你的actors并綁定引用到你的controllers和components依賴岗钩。
比如,如果你想要一個(gè)actor依賴于 Play configuration肖油,你可以這樣做:
import akka.actor.UntypedActor;
import play.Configuration;
import javax.inject.Inject;
public class ConfiguredActor extends UntypedActor {
private Configuration configuration;
@Inject
public ConfiguredActor(Configuration configuration) {
this.configuration = configuration;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ConfiguredActorProtocol.GetConfig) {
sender().tell(configuration.getString("my.config"), self());
}
}
}
Play提供了一些helpers用于幫助acotr bindings兼吓。他們?cè)试Sactor自身依賴注入,并且允許actor引用自身被注入到其他的組件森枪。綁定actor使用這些helpers视搏,創(chuàng)建一個(gè)module參考dependency injection documentation
,配合AkkaGuiceSupport
接口和使用bindActor
方法去綁定到actor:
import com.google.inject.AbstractModule;
import play.libs.akka.AkkaGuiceSupport;
public class MyModule extends AbstractModule implements AkkaGuiceSupport {
@Override
protected void configure() {
bindActor(ConfiguredActor.class, "configured-actor");
}
}
actor同時(shí)被命名為configured-actor
,并且還將使用configured-actor
被注入∩竽酰現(xiàn)在你可以依賴actor在你的controllers和其他的components:
import akka.actor.ActorRef;
import play.mvc.*;
import scala.compat.java8.FutureConverters;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.concurrent.CompletionStage;
import static akka.pattern.Patterns.ask;
public class Application extends Controller {
private ActorRef configuredActor;
@Inject
public Application(@Named("configured-actor") ActorRef configuredActor) {
this.configuredActor = configuredActor;
}
public CompletionStage<Result> getConfig() {
return FutureConverters.toJava(ask(configuredActor,
new ConfiguredActorProtocol.GetConfig(), 1000)
).thenApply(response -> ok((String) response));
}
}
Dependency injecting child actors
上面是關(guān)于root actors的注入,但是你創(chuàng)建的很多actor是沒(méi)有被Play應(yīng)用的生命周期束縛的浑娜,并且可能會(huì)有一些額外的狀態(tài)傳遞佑力。
為了幫助注入child actors,Play利用Guice的AssistedInject
支持.
假設(shè)你有下面的actor,依賴configuration被注入筋遭,添加一個(gè)key:
import akka.actor.UntypedActor;
import com.google.inject.assistedinject.Assisted;
import play.Configuration;
import javax.inject.Inject;
public class ConfiguredChildActor extends UntypedActor {
private final Configuration configuration;
private final String key;
@Inject
public ConfiguredChildActor(Configuration configuration, @Assisted String key) {
this.configuration = configuration;
this.key = key;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ConfiguredChildActorProtocol.GetConfig) {
sender().tell(configuration.getString(key), self());
}
}
}
在這種情況下我們使用構(gòu)造函數(shù)注入打颤,Guice的注入支持僅僅兼容構(gòu)造器注入。參數(shù)key
在創(chuàng)建時(shí)提供宛畦,不是通過(guò)容器瘸洛,我們使用了@Assisted
注解。
現(xiàn)在在child協(xié)議中次和,我們定義了一個(gè)Factory
接口:
import akka.actor.Actor;
public class ConfiguredChildActorProtocol {
public static class GetConfig {}
public interface Factory {
public Actor create(String key);
}
}
我們不會(huì)去實(shí)現(xiàn)這個(gè)接口反肋,Guice會(huì)為我們做這些,提供一個(gè)實(shí)現(xiàn)踏施,不僅傳遞我們的key參數(shù)石蔗,而且還定位Configuration依賴并且注入他。由于只是返回一個(gè)Actor畅形,當(dāng)測(cè)試這個(gè)actor時(shí)养距,我們可以注入一個(gè)factor返回任意的actor,比如它允許我們注入一個(gè)家的child actor日熬,來(lái)替代一個(gè)真實(shí)的actor棍厌。
現(xiàn)在actor依賴可以被InjectedActorSupport
拓展,他可以依賴于我們創(chuàng)建的factory:
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import play.libs.akka.InjectedActorSupport;
import javax.inject.Inject;
public class ParentActor extends UntypedActor implements InjectedActorSupport {
private ConfiguredChildActorProtocol.Factory childFactory;
@Inject
public ParentActor(ConfiguredChildActorProtocol.Factory childFactory) {
this.childFactory = childFactory;
}
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ParentActorProtocol.GetChild) {
String key = ((ParentActorProtocol.GetChild) message).key;
ActorRef child = injectedChild(() -> childFactory.create(key), key);
sender().tell(child, self());
}
}
}
它使用injectedChild
創(chuàng)建并獲取child actor引用,通過(guò)key竖席。第二個(gè)參數(shù)將會(huì)被用作child actor的name耘纱。
最終,我們需要綁定我們的actors毕荐。在我們的模塊中束析。我們使用bindActorFactory
去綁定parent actor并且綁定child factory到child實(shí)現(xiàn):
import com.google.inject.AbstractModule;
import play.libs.akka.AkkaGuiceSupport;
public class MyModule extends AbstractModule implements AkkaGuiceSupport {
@Override
protected void configure() {
bindActor(ParentActor.class, "parent-actor");
bindActorFactory(ConfiguredChildActor.class,
ConfiguredChildActorProtocol.Factory.class);
}
}
這將是Guice自動(dòng)綁定ConfiguredChildActorProtocol.Factory的實(shí)例,該實(shí)例將在配置為實(shí)例化時(shí)配置為ConfiguredChildActor憎亚。
Configuration
默認(rèn)的actor系統(tǒng)配置是讀取自Play application configuration文件员寇。比如,配置默認(rèn)的application actor system dispatcher,將這些行添加到conf/application.conf
文件:
akka.actor.default-dispatcher.fork-join-executor.pool-size-max = 64
akka.actor.debug.receive = on
關(guān)于Akka的日志配置第美,參考configuring logging
.
Changing configuration prefix
如果你想使用akka.*配置其他的Akka actor system, 你可以告訴Play加載他的配置從其他的位置蝶锋。
play.akka.config = "my-akka"
現(xiàn)在配置將讀取my-akka前綴替代akka前綴:
my-akka.actor.default-dispatcher.fork-join-executor.pool-size-max = 64
my-akka.actor.debug.receive = on
Built-in actor system name
默認(rèn)的Play actor system 的name是application
。你可以改變他通過(guò)conf/application.conf
play.akka.actor-system = "custom-name"
Note: This feature is useful if you want to put your play application ActorSystem in an akka cluster.
Executing a block of code asynchronously
一個(gè)常見(jiàn)的Akka用例是并發(fā)的計(jì)算什往,不需要···牲览。如果你發(fā)現(xiàn)你創(chuàng)建了一個(gè)Actors pool僅僅只是為了執(zhí)行一些并行的計(jì)算,這里有一些更簡(jiǎn)單更快的方式:
import play.mvc.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
public class Application extends Controller {
public CompletionStage<Result> index() {
return CompletableFuture.supplyAsync(this::longComputation)
.thenApply((Integer i) -> ok("Got " + i));
}
}
Scheduling asynchronous tasks
你可以定時(shí)發(fā)送消息給一個(gè)actor或是執(zhí)行任務(wù)(functions or Runnable instances).你可以得到一個(gè)Cancellable,你可以調(diào)用cancel來(lái)取消任務(wù)的執(zhí)行第献。
比如贡必,你可以發(fā)送一個(gè)消息到testActor
every 30 minutes:
system.scheduler().schedule(
Duration.create(0, TimeUnit.MILLISECONDS), //Initial delay 0 milliseconds
Duration.create(30, TimeUnit.MINUTES), //Frequency 30 minutes
testActor,
"tick",
system.dispatcher(),
null
);
或則運(yùn)行一個(gè)代碼塊10 milliseconds立即:
system.scheduler().scheduleOnce(
Duration.create(10, TimeUnit.MILLISECONDS),
() -> file.delete(),
system.dispatcher()
);