Newer
Older
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.supervisor.DistributedCommittedTransferRequest;
import fucoin.supervisor.SuperVisorImpl;
public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransaction {
private ActorRef source;
private ActorRef target;
Luca Keidel
committed
private ActorRef observer;
private int amount;
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
Luca Keidel
committed
ActorRef target, int amount){
this(source, target, amount, null);
}
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
ActorRef target, int amount, ActorRef observer) {
this.source = source;
this.target = target;
Luca Keidel
committed
this.observer = observer;
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl superVisor) {
superVisor.addLogMsg("invoke transaction " + source.path().name() +
" sends " + amount +
" to " + target.path().name());
long timeout = System.currentTimeMillis() + 500;
Luca Keidel
committed
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer);
superVisor.addDistributedCommitedTransferRequest(ds);
ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId());
for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) {
neighbor.tell(apdct, self);
}
}
}