Skip to content
Snippets Groups Projects
ActionInvokeDistributedCommittedTransfer.java 1.73 KiB
Newer Older
package fucoin.actions.transaction;

import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.supervisor.DistributedCommittedTransferRequest;
import fucoin.supervisor.SuperVisorImpl;

public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransaction {

    private ActorRef source;
    private ActorRef target;
    private int amount;

    public ActionInvokeDistributedCommittedTransfer(ActorRef source,
                                                    ActorRef target, int amount){
        this(source, target, amount, null);
    }

    public ActionInvokeDistributedCommittedTransfer(ActorRef source,
                                                    ActorRef target, int amount, ActorRef observer) {
        this.source = source;
        this.target = target;
        this.amount = amount;
    }

    @Override
    protected void onAction(ActorRef sender, ActorRef self,
                            UntypedActorContext context, SuperVisorImpl superVisor) {
        superVisor.addLogMsg("invoke transaction " + source.path().name() +
                " sends " + amount +
                " to " + target.path().name());

        long timeout = System.currentTimeMillis() + 500;

        DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer);
        superVisor.addDistributedCommitedTransferRequest(ds);
        ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId());
        for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) {
            neighbor.tell(apdct, self);
        }
    }

}