Maison > Article > base de données > Akka2使用探索4(Actors)
ask 异步发送一条消息并返回一个 Future 代表一个可能的回应。需要采用Future的处理模式。 每一个消息发送者分别保证自己的消息的次序. try { String result = operation(); getSender().tell(result); } catch (Exception e) { getSender().tell(new akka.a
ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。每一个消息发送者分别保证自己的消息的次序. try {
String result = operation();
getSender().tell(result);
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e));
throw e;
}
ask使用方式如下:
List<future>> futures = [] AkkaClientNoReply client = <span>new</span> AkkaClientNoReply("<span>akka://xw@127.0.0.1:8888/user/server</span>") client.send("<span>hello</span>") 0.upto(15) { futures <pre class="brush:php;toolbar:false"> //模拟客户端给服务端发0——15消息,服务器处理(把数值+1返回给客户端) } <span>final</span> Future<iterable>> aggregate = Futures.sequence(futures, client.system.dispatcher()); <span>final</span> Future<integer> transformed = aggregate.map(<span>new</span> Mapper<iterable>, Integer>() { <span>public</span> Integer apply(Iterable<object> coll) { <span>final</span> Iterator<object> it = coll.iterator(); <span>int</span> count = 0; <span>while</span> (it.hasNext()) { <span>int</span> x = (Integer) it.next(); count = count + x } <span>return</span> <span>new</span> Integer(count); } }); AkkaServerApp app = <span>new</span> AkkaServerApp("<span>resultHandler</span>", "<span>127.0.0.1</span>", 6666, "<span>result</span>") app.messageProcessor = {msg, UntypedActorContext context -> log.info("<span>1到16之和为</span>" + msg) } app.startup() akka.pattern.Patterns.pipe(transformed).to(app.serverActor)</object></object></iterable></integer></iterable>
如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。
Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。
警告
在使用future回调如 onComplete, onSuccess, and onFailure时, 在actor内部你要小心避免捕捉该actor的引用, i.e. 不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。
转发消息
你可以将消息从一个actor转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类似路由器、负载均衡器、备份等的actor时会很有用。
myActor.forward(message, getContext());
回应消息
getSender().tell(replyMsg)
如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为“dead-letter” actor的引用.
初始化接收消息超时
设置receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。
public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}
终止Actor
通过调用ActorRefFactory i.e. ActorContext 或 ActorSystem 的 stop 方法来终止一个actor , 通常 context 用来终止子actor,而 system 用来终止顶级actor. 实际的终止操作是异步执行的, i.e. stop 可能在actor被终止之前返回。
如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem 的 死信, 但是这取决于邮箱的实现。
actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己 (调用 postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated, 通知其监管者). 这个过程保证actor系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应 (i.e. 由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。
在 ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。
postStop hook 是在actor被完全终止以后调用。
PoisonPill
你也可以向actor发送 akka.actor.PoisonPill 消息, 这个消息处理完成后actor会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。
优雅地终止
如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:
try {
Future
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
// the actor has been stopped
} catch (ActorTimeoutException e) {
// the actor wasn't stopped within 5 seconds
}
热拔插 Become/Unbecome
升级 Upgrade
Akka支持在运行时对Actor消息循环 (e.g. 的实现)进行实时替换: 在actor中调用 context.become 方法。
Become 要求一个 akka.japi.Procedure 参数作为新的消息处理实现。 被替换的代码被存在一个栈中,可以被push和pop。
降级
由于被热替换掉的代码存在栈中,你也可以对代码进行降级,只需要在actor中调用 context.unbecome 方法。
Killing actor
发送Kill消息给actor
Actor 与 异常
在消息被actor处理的过程中可能会抛出异常,例如数据库异常。
如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。
如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。
如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。