Heim  >  Artikel  >  Datenbank  >  Akka2使用探索5(Typed Actors)

Akka2使用探索5(Typed Actors)

WBOY
WBOYOriginal
2016-06-07 15:30:521402Durchsuche

Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。 有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。

Akka 中的有类型 Actor 是 Active Objects 模式的一种实现. Smalltalk诞生之时,就已经缺省地将方法调用从同步操作发为异步派发。

有类型 Actor 由两 “部分” 组成, 一个public接口和一个实现, 如果你有 “企业级” Java的开发经验, 这对你应该非常熟悉。 对普通actor来说,你拥有一个外部API (public接口的实例) 来将方法调用异步地委托给其实现类的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约, 你不需要定义你自己的消息, 它的劣势在于对你能做什么和不能做什么进行了一些限制,比如 你不能使用 become/unbecome.

有类型Actor是使用 JDK Proxies 实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意

和普通Akka actor一样,有类型actor也一次处理一个消息。

什么时候使用有类型的Actor

有类型的Actor很适合用在连接actor系统和非actor的代码,因为它可以使你能在外部编写正常的OO模式的代码。但切记不可滥用。

工具箱

返回有类型actor扩展 Returns the Typed Actor Extension
TypedActorExtension extension =
TypedActor.get(system); //system is an instance of ActorSystem

判断一个引用是否是有类型actor代理 Returns whether the reference is a Typed Actor Proxy or not
TypedActor.get(system).isTypedActor(someReference);

返回一个外部有类型actor代理所代表的Akka actor Returns the backing Akka Actor behind an external Typed Actor Proxy
TypedActor.get(system).getActorRefFor(someReference);

返回当前的ActorContext//Returns the current ActorContext,
此方法仅在一个TypedActor 实现的方法中有效 // method only valid within methods of a TypedActor implementation
ActorContext context = TypedActor.context();

返回当前有类型actor的外部代理//Returns the external proxy of the current Typed Actor,
此方法仅在一个TypedActor 实现的方法中有效// method only valid within methods of a TypedActor implementation
Squarer sq = TypedActor.self();


返回一个有类型Actor扩展的上下文实例//Returns a contextual instance of the Typed Actor Extension
这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor//this means that if you create other Typed Actors with this,
//they will become children to the current Typed Actor.
TypedActor.get(TypedActor.context());

具体例子及说明

<span>package</span> practise.akka.typedactors

<span>import</span> akka.dispatch.Future
<span>import</span> akka.japi.Option

<span>/**
 * 这个就是对外的接口,各函数就是Typed Actor的接口方法
 */</span>
<span>public</span> <span>interface</span> Squarer {
    <span>void</span> squareDontCare(<span>int</span> i); <span>//fire-forget</span>

    Future<integer> square(<span>int</span> i); <span>//non-blocking send-request-reply</span>

    Option<integer> squareNowPlease(<span>int</span> i);<span>//blocking send-request-reply</span>

    <span>int</span> squareNow(<span>int</span> i); <span>//blocking send-request-reply</span>
}</integer></integer>

<span>package</span> practise.akka.typedactors

<span>import</span> akka.dispatch.Future
<span>import</span> akka.dispatch.Futures
<span>import</span> akka.actor.TypedActor
<span>import</span> akka.japi.Option
<span>import</span> akka.actor.ActorContext
<span>import</span> groovy.util.logging.Log4j
<span>import</span> akka.actor.ActorRef

<span>/**
 * 这个是接口实现。(实现akka.actor.TypedActor.Receiver接口就能接收actor发来的普通消息(非函数调用消息)。)
 */</span>
@Log4j
<span>class</span> SquarerImpl <span>implements</span> Squarer, akka.actor.TypedActor.Receiver {
    <span>private</span> String name;

    <span>public</span> SquarerImpl() {
        <span>this</span>.name = "<span>default</span>";
    }

    <span>public</span> SquarerImpl(String name) {
        <span>this</span>.name = name;
    }

    <span>public</span> <span>void</span> squareDontCare(<span>int</span> i) {
        log.debug("<span>squareDontCare,fire-and-forget只接收不返回结果,与ActorRef.tell完全一致----</span>" + i)   <span>//可以从线程号看出是异步处理的</span>
        <span>int</span> sq = i * i; <span>//Nobody cares :(</span>

        <span>//返回当前的ActorContext,</span>
        <span>// 此方法仅在一个TypedActor 实现的方法中有效</span>
        ActorContext context = TypedActor.context();
        println "<span>context ---- </span>" + context

        <span>//返回当前有类型actor的外部代理,</span>
        <span>// 此方法仅在一个TypedActor 实现的方法中有效</span>
        Squarer mysq = TypedActor.<squarer> self();
        println "<span>--self --</span>" + mysq

    }

    <span>public</span> Future<integer> square(<span>int</span> i) {
        log.debug("<span>square send-request-reply Future----</span>" + i)   <span>//可以从线程号看出是异步处理的</span>
        <span>return</span> Futures.successful(i * i, TypedActor.dispatcher());
    }

    <span>public</span> Option<integer> squareNowPlease(<span>int</span> i) {
        log.debug("<span>squareNowPlease send-request-reply Option----</span>" + i)   <span>//可以从线程号看出是异步处理的</span>
        <span>return</span> Option.some(i * i);
    }

    <span>public</span> <span>int</span> squareNow(<span>int</span> i) {
        log.debug("<span>squareNow send-request-reply result----</span>" + i)   <span>//可以从线程号看出是异步处理的</span>
        <span>return</span> i * i;
    }

    @Override
    <span>void</span> onReceive(Object o, ActorRef actorRef) {
        log.debug("<span>TypedActor收到消息----${o}---from:${actorRef}</span>")
    }
}</integer></integer></squarer>


<span>package</span> practise.akka.typedactors

<span>import</span> akka.actor.ActorSystem
<span>import</span> akka.actor.TypedActor
<span>import</span> akka.actor.TypedProps
<span>import</span> com.typesafe.config.ConfigFactory
<span>import</span> akka.japi.Creator
<span>import</span> groovy.util.logging.Log4j
<span>import</span> akka.actor.ActorContext

<span>/**
 * 这里创建Typed Actor.
 */</span>
@Log4j
<span>class</span> TypedActorsFactory {

    ActorSystem system

    <span>private</span> <span>final</span> String config = """akka {
    loglevel = "<span>${log?.debugEnabled ? </span>"DEBUG"<span> : </span>"INFO"<span>}</span>"
    actor.provider = "<span>akka.remote.RemoteActorRefProvider</span>"
    remote.netty.hostname = "<span>127.0.0.1</span>"
    remote.netty.port = 2552
    remote.log-received-messages = on
    remote.log-sent-messages = on
}"""

    TypedActorsFactory(String sysName) {
        <span>this</span>.system = ActorSystem.create(sysName, ConfigFactory.parseString(config))
    }


    Squarer getTypedActorDefault() {
        Squarer mySquarer =
            TypedActor.get(system).typedActorOf(<span>new</span> TypedProps<squarerimpl>(Squarer.<span>class</span>, SquarerImpl.<span>class</span>));
        <span>//这里创建的是代理类型</span>
        <span>return</span> mySquarer
    }

    Squarer getTypedActor(String name) {
        Squarer otherSquarer =
            TypedActor.get(system).typedActorOf(<span>new</span> TypedProps<squarerimpl>(Squarer.<span>class</span>,
                    <span>new</span> Creator<squarerimpl>() {
                        <span>public</span> SquarerImpl create() { <span>return</span> <span>new</span> SquarerImpl(name); }  <span>//这里创建的是具体的实现类型</span>
                    }),
                    name);  <span>//这个name是actor的name:akka//sys@host:port/user/name</span>
        <span>return</span> otherSquarer
    }

}</squarerimpl></squarerimpl></squarerimpl>

下面用几个测试用例实验一下

<span>package</span> practise.akka.typedactors

<span>import</span> akka.actor.ActorRef
<span>import</span> akka.actor.TypedActor
<span>import</span> akka.actor.UntypedActorContext
<span>import</span> akka.dispatch.Future
<span>import</span> com.baoxian.akka.AkkaClientNoReply
<span>import</span> com.baoxian.akka.AkkaServerApp

<span>class</span> TestTypedActors <span>extends</span> GroovyTestCase {

    def testTypeActor() {
        println("<span>----</span>")
        TypedActorsFactory factory = <span>new</span> TypedActorsFactory("<span>typedServer</span>")
<span>//        Squarer squarer = factory?.getTypedActorDefault()   //创建代理</span>
        Squarer squarer = factory?.getTypedActor("<span>serv</span>")      <span>//具体实现</span>
        squarer?.squareDontCare(10)
        Future future = squarer?.square(10)
        AkkaServerApp app = <span>new</span> AkkaServerApp("<span>tmp</span>", "<span>127.0.0.1</span>", 6666, "<span>result</span>")   <span>//这是我自己构建的接收器</span>
        app.messageProcessor = {msg, UntypedActorContext context ->
            log.info("<span>结果为</span>" + msg)
        }
        app.startup()

        akka.pattern.Patterns.pipe(future).to(app.serverActor)    <span>//Future的返回结果pipe到接收器中了,在log中能看到结果</span>
        println "<span>----</span>" + squarer?.squareNowPlease(10)?.get()
        println "<span>----</span>" + squarer?.squareNow(10)

        <span>//返回有类型actor扩展</span>
        TypedActor.get(factory.system)

        <span>//返回一个外部有类型actor代理所代表的Akka actor</span>
        ActorRef actor = TypedActor.get(factory.system).getActorRefFor(squarer);
        actor.tell("<span>消息</span>")     <span>//这个消息将会在SquarerImpl的onReceive方法中接收到</span>

        sleep(1000 * 60 * 10)
<span>//        TypedActor.get(factory.system).stop(squarer);   //这将会尽快地异步终止与指定的代理关联的有类型Actor</span>
        TypedActor.get(factory.system).poisonPill(squarer);<span>//这将会在有类型actor完成所有在当前调用之前对它的调用后异步地终止它</span>
    }


    def testRemoteTypedActor() {
        AkkaClientNoReply client = <span>new</span> AkkaClientNoReply("<span>akka://typedServer@127.0.0.1:2552/user/serv</span>")
        client.send("<span>远程消息</span>")      <span>//这将会在SquarerImpl的onReceive方法中接收到</span>
        sleep(1000)
        client.shutdown()
    }

}
Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn