package fucoin.actions.transaction; import akka.actor.ActorRef; import akka.actor.UntypedActorContext; import fucoin.supervisor.DistributedCommittedTransferRequest; import fucoin.supervisor.SuperVisorImpl; public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransaction { private ActorRef source; private ActorRef target; private ActorRef observer; private int amount; public ActionInvokeDistributedCommittedTransfer(ActorRef source, ActorRef target, int amount){ this(source, target, amount, null); } public ActionInvokeDistributedCommittedTransfer(ActorRef source, ActorRef target, int amount, ActorRef observer) { this.source = source; this.target = target; this.observer = observer; this.amount = amount; } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { superVisor.addLogMsg("invoke transaction " + source.path().name() + " sends " + amount + " to " + target.path().name()); long timeout = System.currentTimeMillis() + 500; DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer); superVisor.addDistributedCommitedTransferRequest(ds); ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId()); for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) { neighbor.tell(apdct, self); } } }