diff --git a/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java b/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java index 1480d1666f28b11c6c33e8beab2c4adc8841c338..5b7faaf6b720e806ffd02ca311f50b745ab7d9e7 100644 --- a/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java +++ b/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java @@ -13,13 +13,20 @@ public class ActionWalletSendMoney extends ClientAction { protected String address; protected int amount; - public ActionWalletSendMoney(String address, int amount) { + protected ActorRef observer; + + public ActionWalletSendMoney(String address, int amount, ActorRef observer) { this.address = address; this.amount = amount; + this.observer = observer; + } + + public ActionWalletSendMoney(String address, int amount){ + this(address, amount, null); } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { - abstractNode.send(address, amount); + abstractNode.send(address, amount, observer); } } diff --git a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java index 1d1a7b2cb376b5ba83ffa9f6708d049b8a350a02..54ab5b06d2869bbbc7670aa67f4eac76b125380d 100644 --- a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java @@ -14,16 +14,19 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { private boolean granted; private long timestamp; private long id; + private ActorRef observer; public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target, - int amount, boolean granted, long timestamp, long id) { + int amount, boolean granted, long timestamp, long id, + ActorRef observer) { this.source = source; this.target = target; this.amount = amount; this.granted = granted; this.timestamp = timestamp; this.id = id; + this.observer = observer; } public ActionCommitDistributedCommittedTransfer( @@ -34,6 +37,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { this.granted = false; this.timestamp = outdatedRequest.getTimeout(); this.id = outdatedRequest.getId(); + this.observer = outdatedRequest.getObserver(); } @Override @@ -48,6 +52,11 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { } else if (target.compareTo(self) == 0) { wallet.setAmount(wallet.getAmount() + amount); wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name()); + // recipient should notify a possible observer + if (observer != null){ + observer.tell(new ActionNotifyObserver(source,target,amount,granted,timestamp, id), self); + } + } } else { @@ -61,6 +70,9 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { if (source.compareTo(self) == 0) { wallet.addTransactionLogMessageFail("Failed to send " + amount + " FUC to " + target.path().name() + " (Commit has not been granted)"); + if (observer != null){ + observer.tell(new ActionNotifyObserver(source,target,amount,granted,timestamp, id), self); + } } } diff --git a/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java index 13f2ca11b0663cf70685086579966e1ae21e0ef2..b277c64a9c613d842769089275996769947c0f60 100644 --- a/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java @@ -9,12 +9,19 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac private ActorRef source; private ActorRef target; + private ActorRef observer; private int amount; public ActionInvokeDistributedCommittedTransfer(ActorRef source, - ActorRef target, int amount) { + ActorRef target, int amount){ + this(source, target, amount, null); + } + + public ActionInvokeDistributedCommittedTransfer(ActorRef source, + ActorRef target, int amount, ActorRef observer) { this.source = source; this.target = target; + this.observer = observer; this.amount = amount; } @@ -27,7 +34,7 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac long timeout = System.currentTimeMillis() + 500; - DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout); + DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer); superVisor.addDistributedCommitedTransferRequest(ds); ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId()); for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) { diff --git a/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java b/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java index 0327d8c05394d2272c0aa16ed0bd338619969086..9f7c2f25a5b8202f92506c9800de0218fe810dd1 100644 --- a/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java +++ b/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java @@ -9,10 +9,12 @@ import fucoin.wallet.AbstractWallet; public class ActionInvokeSentMoney extends Transaction { public final String name; public final int amount; + public final ActorRef observer; - public ActionInvokeSentMoney(String name, int amount) { + public ActionInvokeSentMoney(String name, int amount, ActorRef observer) { this.name = name; this.amount = amount; + this.observer = observer; } @Override @@ -21,7 +23,7 @@ public class ActionInvokeSentMoney extends Transaction { wallet.addLogMsg(wallet.getKnownNeighbors() + ""); if (wallet.getKnownNeighbors().containsKey(name)) { wallet.getRemoteSuperVisorActor().tell( - new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount), sender); + new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount, observer), sender); } else { ActionSearchWalletReference aswr = new ActionSearchWalletReference(name); for (ActorRef neighbor : wallet.getKnownNeighbors().values()) { diff --git a/src/main/java/fucoin/actions/transaction/ActionNotifyObserver.java b/src/main/java/fucoin/actions/transaction/ActionNotifyObserver.java new file mode 100644 index 0000000000000000000000000000000000000000..46b4666d026f679491a65597d71dfd0620c5873d --- /dev/null +++ b/src/main/java/fucoin/actions/transaction/ActionNotifyObserver.java @@ -0,0 +1,33 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; +import fucoin.actions.Action; + +/** + * Used to notify an observer of a transaction when the transaction is finished. + */ +public class ActionNotifyObserver extends Action<AbstractNode> { + + public ActorRef source; + public ActorRef target; + public int amount; + public boolean granted; + public long timestamp; + public long id; + + public ActionNotifyObserver(ActorRef source, ActorRef target, + int amount, boolean granted, long timestamp, long id){ + this.source = source; + this.target = target; + this.amount = amount; + this.granted = granted; + this.timestamp = timestamp; + this.id = id; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractNode abstractNode) { + } +} diff --git a/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java b/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java index 01e90b33fb28a47a40d5f29d86a2a0721019c62c..db21831de14d3e95c2d30f335586f35052dc307c 100644 --- a/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java +++ b/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java @@ -27,16 +27,22 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { - //superVisor.addLogMsg("" + superVisor.getKnownNeighbors()); + superVisor.addLogMsg("granted?" + granted); + DistributedCommittedTransferRequest request = superVisor.getRequest(id); + + // ignore unknown requests + if (request == null){ + return; + } + if (granted) { - if (request == null)//unknown DistributedCommittedTransferRequest ignore - return; + int newCount = request.addPositiveAnswer(sender); if (newCount == superVisor.getKnownNeighbors().size()) { - ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id); + ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id, request.getObserver()); superVisor.addTransactionLogMessageSuccess("Transfer of " + amount + " FUC from" + source.path().name() + " to " + target.path().name()); for (ActorRef neighbor : request.getAnswers()) { neighbor.tell(acdct, self); @@ -45,13 +51,11 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator } } else { //A client wants to rollback - if (request != null) { superVisor.addTransactionLogMessageFail("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")"); - ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id); + ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id, request.getObserver()); for (ActorRef neighbor : request.getAnswers()) { neighbor.tell(acdct, self); } - } } } diff --git a/src/main/java/fucoin/configurations/DefaultConfiguration.java b/src/main/java/fucoin/configurations/DefaultConfiguration.java index 81e59cc97c32b41a34b241e172cdf13352839ff1..779cdaf400d826d2ce6e1cf404b1ebf0c0e75de3 100644 --- a/src/main/java/fucoin/configurations/DefaultConfiguration.java +++ b/src/main/java/fucoin/configurations/DefaultConfiguration.java @@ -1,7 +1,9 @@ package fucoin.configurations; import akka.actor.ActorRef; +import fucoin.actions.Action; import fucoin.actions.control.ActionWalletSendMoney; +import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.configurations.internal.ConfigurationName; /** @@ -24,11 +26,22 @@ public class DefaultConfiguration extends AbstractConfiguration { e.printStackTrace(); } - wallet1.tell(new ActionWalletSendMoney("Wallet1", 50), wallet1); + wallet1.tell(new ActionWalletSendMoney("Wallet1", 50, getSelf()), wallet1); } @Override public void onReceive(Object message) { + if (message instanceof ActionNotifyObserver){ + ActionNotifyObserver notification = (ActionNotifyObserver) message; + + String status = "successful"; + if(! notification.granted){ + status = "failed"; + } + + System.out.println("Observed a " + status + " transaction of " + notification.amount+" FUCs from " + + notification.source.path().name() + " to " + notification.target.path().name()); + } } } diff --git a/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java b/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java index d4ebd400373f9103c1ed6a5fa8b156b90e613003..26a53bb79c7a8f1660854e3c97d548839301468b 100644 --- a/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java +++ b/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java @@ -13,6 +13,8 @@ public class DistributedCommittedTransferRequest extends Transaction { private final static Random random = new Random(System.currentTimeMillis() + System.nanoTime()); private ActorRef source; private ActorRef target; + private ActorRef observer; + private int amount; private long timeout; @@ -21,13 +23,15 @@ public class DistributedCommittedTransferRequest extends Transaction { private List<ActorRef> answers = new LinkedList<>(); public DistributedCommittedTransferRequest(ActorRef source, ActorRef target, int amount, - long timeout) { + long timeout, ActorRef observer) { this.source = source; this.target = target; this.amount = amount; this.timeout = timeout; this.id = random.nextLong(); + + this.observer = observer; } @Override @@ -64,4 +68,8 @@ public class DistributedCommittedTransferRequest extends Transaction { public int getAmount() { return amount; } + + public ActorRef getObserver(){ + return observer; + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index d9f99f719ad6bf7799d802143a670b7a80cf26e5..882c2a547b9060a576589281eaaea8c9d3e078ba 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -99,10 +99,19 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl public abstract void deferSendOfSuperVisorActor(ActorRef destinationWallet); /** - * Sends amount FUCs to the wallet with the address adress + * Sends amount FUCs to the wallet with the address address * * @param address Recipients address * @param amount Amount to send */ public abstract void send(String address, int amount); + + /** + * Sends amount FUCs to the wallet with the address address and the observer observer + * + * @param address + * @param amount + * @param observer + */ + public abstract void send(String address, int amount, ActorRef observer); } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index ef38e97f476e337e25c86b3e5b80548f54b5979d..951a1519f91fa609e143fce37a554279eebcecbb 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -211,7 +211,12 @@ public class WalletImpl extends AbstractWallet { } public void send(String address, int amount) { - getSelf().tell(new ActionInvokeSentMoney(address, amount), getSelf()); + send(address, amount, null); + } + + @Override + public void send(String address, int amount, ActorRef observer) { + getSelf().tell(new ActionInvokeSentMoney(address, amount, observer), getSelf()); } @Override