package fucoin.actions.transaction; import fucoin.supervisor.DistributedCommittedTransferRequest; import fucoin.supervisor.SuperVisorImpl; import akka.actor.ActorRef; import akka.actor.UntypedActorContext; public class ActionPrepareDistributedCommittedTransferAnswer extends CoordinatorTransaction { private ActorRef source; private ActorRef target; private int amount; private boolean granted; private long timestamp; private long id; public ActionPrepareDistributedCommittedTransferAnswer(ActorRef source, ActorRef target, int amount, long timestamp, boolean granted, long id) { this.source=source; this.target=target; this.amount=amount; this.granted=granted; this.timestamp=timestamp; this.id=id; } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { log(""+superVisor.getKnownNeighbors()); log("granted?"+granted); DistributedCommittedTransferRequest request = superVisor.getRequest(id); if(granted){ if(request==null)//unknown DistributedCommittedTransferRequest ignore return; int newCount = request.addPositiveAnswer(sender); System.out.println(newCount+" have agreed on request"+id); if(newCount == superVisor.getKnownNeighbors().size()){ ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source,target,amount,true,timestamp,id); for(ActorRef neighbor : request.getAnswers()){ neighbor.tell(acdct, self); } superVisor.deleteRequest(request); } }else{ //A client wants to rollback if(request!=null){ ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source,target,amount,false,timestamp,id); for(ActorRef neighbor : request.getAnswers()){ neighbor.tell(acdct, self); } } } } }