From dbba1e8cec3171984fc0e181c5ffd81441f77ea9 Mon Sep 17 00:00:00 2001 From: David Bohn <davbohn@googlemail.com> Date: Thu, 23 Jun 2016 00:13:45 +0200 Subject: [PATCH] Fixed wait for wallet creation --- .../control/ActionAnnounceWalletCreation.java | 30 +++++++++++++ .../control/ActionWalletCreationDone.java | 16 +++++++ .../fucoin/actions/join/ServerActionJoin.java | 2 +- .../configurations/AbstractConfiguration.java | 43 +++++++++++++++++-- .../configurations/DefaultConfiguration.java | 21 +++++---- .../MassWalletConfiguration.java | 7 ++- .../fucoin/supervisor/SuperVisorImpl.java | 35 +++++++++++++++ 7 files changed, 140 insertions(+), 14 deletions(-) create mode 100644 src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java create mode 100644 src/main/java/fucoin/actions/control/ActionWalletCreationDone.java diff --git a/src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java b/src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java new file mode 100644 index 0000000..ee897f7 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java @@ -0,0 +1,30 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; + +/** + * Announce, that there will be some wallets spawned. + * Used by the configurator to wait, until all new wallets have their initial money. + * Send to supervisor. + */ +public class ActionAnnounceWalletCreation extends SuperVisorAction { + + public int numOfWallets; + public ActorRef observer; + + public ActionAnnounceWalletCreation(int numOfWallets, ActorRef observer) { + + this.numOfWallets = numOfWallets; + this.observer = observer; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { + System.out.println("Waiting for wallet transactions"); + abstractNode.setPendingBankCommits(abstractNode.getPendingBankCommits() + numOfWallets); + abstractNode.setBankCommitObserver(sender); + } +} diff --git a/src/main/java/fucoin/actions/control/ActionWalletCreationDone.java b/src/main/java/fucoin/actions/control/ActionWalletCreationDone.java new file mode 100644 index 0000000..1184aca --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionWalletCreationDone.java @@ -0,0 +1,16 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.Action; +import fucoin.configurations.AbstractConfiguration; + +/** + * This message tells the configuration, that the wallets have been spawned successfully + */ +public class ActionWalletCreationDone extends Action<AbstractConfiguration> { + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractConfiguration abstractNode) { + + } +} diff --git a/src/main/java/fucoin/actions/join/ServerActionJoin.java b/src/main/java/fucoin/actions/join/ServerActionJoin.java index abc8ea1..18a1cf6 100644 --- a/src/main/java/fucoin/actions/join/ServerActionJoin.java +++ b/src/main/java/fucoin/actions/join/ServerActionJoin.java @@ -26,7 +26,7 @@ public class ServerActionJoin extends SuperVisorAction { sender.tell(aja, self); node.addKnownNeighbor(name, sender); self.tell( - new ActionInvokeDistributedCommittedTransfer(self, sender, 100), + new ActionInvokeDistributedCommittedTransfer(self, sender, 100, self), sender); } } diff --git a/src/main/java/fucoin/configurations/AbstractConfiguration.java b/src/main/java/fucoin/configurations/AbstractConfiguration.java index a3ac5a4..61347b1 100644 --- a/src/main/java/fucoin/configurations/AbstractConfiguration.java +++ b/src/main/java/fucoin/configurations/AbstractConfiguration.java @@ -2,11 +2,17 @@ package fucoin.configurations; import akka.actor.ActorRef; import akka.actor.Props; +import akka.pattern.Patterns; +import akka.util.Timeout; import fucoin.AbstractNode; +import fucoin.actions.control.ActionAnnounceWalletCreation; import fucoin.actions.join.ActionTellSupervisor; import fucoin.configurations.internal.ConfigurationCreator; import fucoin.supervisor.SuperVisorImpl; import fucoin.wallet.WalletImpl; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import java.util.ArrayList; import java.util.List; @@ -21,12 +27,28 @@ public abstract class AbstractConfiguration extends AbstractNode { protected final List<ActorRef> activeActors = new ArrayList<>(); + protected Timeout timeout = new Timeout(Duration.create(10, "seconds")); + public static Props props(Class configurationClass) { return Props.create(new ConfigurationCreator(configurationClass)); } - public ActorRef spawnWallet(String name, boolean createGUI) { + /** + * Spawns a new wallet and blocks until it has received its initial money + * @throws Exception on timeout + */ + public ActorRef spawnWallet(String name, boolean createGUI) throws Exception { + Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout); + ActorRef wallet = createWallet(name, createGUI); + Await.result(future, timeout.duration()); + return wallet; + } + + /** + * Creates a wallet without blocking until the wallet was created + */ + public ActorRef createWallet(String name, boolean createGUI) { Props props; int numOfWallets = activeActors.size(); if (numOfWallets == 0) { @@ -40,24 +62,37 @@ public abstract class AbstractConfiguration extends AbstractNode { activeActors.add(actorRef); if (numOfWallets == 0) { - //actorRef.tell(new ActionJoinAnswer(cSuperVisorActor), cSuperVisorActor); actorRef.tell(new ActionTellSupervisor(superVisor), superVisor); } return actorRef; } - public void spawnWallets(int n, boolean createGUI) { + /** + * Spawn multiple wallets and wait until they all have their initial FUC + * @throws Exception on timeout + */ + public void spawnWallets(int n, boolean createGUI) throws Exception { + Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(n, self()), timeout); for (int i = 0; i < n; i++) { String nameOfTheWallet = "Wallet" + String.valueOf(activeActors.size()); - spawnWallet(nameOfTheWallet, createGUI); + createWallet(nameOfTheWallet, createGUI); } + Await.result(future, timeout.duration()); } + /** + * Fetch a random wallet + * @return + */ public ActorRef getRandomWallet() { return activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size())); } + /** + * Create the supervisor node + * @return + */ public ActorRef initSupervisor() { superVisor = context().actorOf(SuperVisorImpl.props(), "SuperVisorImpl"); diff --git a/src/main/java/fucoin/configurations/DefaultConfiguration.java b/src/main/java/fucoin/configurations/DefaultConfiguration.java index 779cdaf..555e6ef 100644 --- a/src/main/java/fucoin/configurations/DefaultConfiguration.java +++ b/src/main/java/fucoin/configurations/DefaultConfiguration.java @@ -16,17 +16,22 @@ public class DefaultConfiguration extends AbstractConfiguration { public void run() { initSupervisor(); - ActorRef wallet1 = spawnWallet("Wallet0", false); - ActorRef wallet2 = spawnWallet("Wallet1", false); - - // TODO: this should be solved differently + ActorRef wallet1 = null; + ActorRef wallet2 = null; + try { + wallet1 = spawnWallet("Wallet0", false); + } catch (Exception e) { + System.out.println("Wallet0 spawning timed out"); + } try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); + wallet2 = spawnWallet("Wallet1", false); + } catch (Exception e) { + System.out.println("Wallet1 spawning timed out"); } - wallet1.tell(new ActionWalletSendMoney("Wallet1", 50, getSelf()), wallet1); + if (wallet1 != null && wallet2 != null) { + wallet1.tell(new ActionWalletSendMoney(wallet2.path().name(), 50, getSelf()), wallet1); + } } @Override diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java index 1ebb3ee..e2d3c0a 100644 --- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -10,7 +10,12 @@ public class MassWalletConfiguration extends AbstractConfiguration { @Override public void run() { initSupervisor(); - spawnWallets(200, false); + try { + spawnWallets(200, false); + System.out.println("Wallet spawning done!"); + } catch (Exception e) { + System.out.println("Wallet spawning timed out!"); + } } @Override diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index ee0d68c..8b4ec17 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -1,9 +1,12 @@ package fucoin.supervisor; +import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.Action; +import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; +import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; import fucoin.AbstractNode; @@ -24,6 +27,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { private SuperVisorGuiControl gui; + private int pendingBankCommits = 0; + private ActorRef bankCommitObserver = null; + public SuperVisorImpl() { } @@ -40,6 +46,19 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { if (msg instanceof ActionGetAmountAnswer) { ActionGetAmountAnswer answer = (ActionGetAmountAnswer) msg; SwingUtilities.invokeLater(() -> gui.updateTable(answer.address, answer.name, answer.amount)); + } else if (msg instanceof ActionNotifyObserver) { + ActionNotifyObserver notifyMsg = (ActionNotifyObserver) msg; + if (notifyMsg.granted) { + if (pendingBankCommits > 0) { + pendingBankCommits--; + } else { + return; + } + + if (pendingBankCommits == 0) { + this.getBankCommitObserver().tell(new ActionWalletCreationDone(), self()); + } + } } /* TODO: Whats happened here?? Why we can invoke doAction of abstract class? */ else if (msg instanceof SuperVisorAction) { ((Action) msg).doAction(this); } @@ -132,4 +151,20 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { System.out.println(message); } } + + public int getPendingBankCommits() { + return pendingBankCommits; + } + + public ActorRef getBankCommitObserver() { + return bankCommitObserver; + } + + public void setPendingBankCommits(int pendingBankCommits) { + this.pendingBankCommits = pendingBankCommits; + } + + public void setBankCommitObserver(ActorRef bankCommitObserver) { + this.bankCommitObserver = bankCommitObserver; + } } -- GitLab