Skip to content
Snippets Groups Projects
Unverified Commit dbba1e8c authored by David Bohn's avatar David Bohn
Browse files

Fixed wait for wallet creation

parent 7a9a92de
Branches
No related tags found
1 merge request!5Configuration system
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.transaction.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl;
/**
* Announce, that there will be some wallets spawned.
* Used by the configurator to wait, until all new wallets have their initial money.
* Send to supervisor.
*/
public class ActionAnnounceWalletCreation extends SuperVisorAction {
public int numOfWallets;
public ActorRef observer;
public ActionAnnounceWalletCreation(int numOfWallets, ActorRef observer) {
this.numOfWallets = numOfWallets;
this.observer = observer;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
System.out.println("Waiting for wallet transactions");
abstractNode.setPendingBankCommits(abstractNode.getPendingBankCommits() + numOfWallets);
abstractNode.setBankCommitObserver(sender);
}
}
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.Action;
import fucoin.configurations.AbstractConfiguration;
/**
* This message tells the configuration, that the wallets have been spawned successfully
*/
public class ActionWalletCreationDone extends Action<AbstractConfiguration> {
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractConfiguration abstractNode) {
}
}
...@@ -26,7 +26,7 @@ public class ServerActionJoin extends SuperVisorAction { ...@@ -26,7 +26,7 @@ public class ServerActionJoin extends SuperVisorAction {
sender.tell(aja, self); sender.tell(aja, self);
node.addKnownNeighbor(name, sender); node.addKnownNeighbor(name, sender);
self.tell( self.tell(
new ActionInvokeDistributedCommittedTransfer(self, sender, 100), new ActionInvokeDistributedCommittedTransfer(self, sender, 100, self),
sender); sender);
} }
} }
...@@ -2,11 +2,17 @@ package fucoin.configurations; ...@@ -2,11 +2,17 @@ package fucoin.configurations;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.actions.control.ActionAnnounceWalletCreation;
import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ActionTellSupervisor;
import fucoin.configurations.internal.ConfigurationCreator; import fucoin.configurations.internal.ConfigurationCreator;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
import fucoin.wallet.WalletImpl; import fucoin.wallet.WalletImpl;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -21,12 +27,28 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -21,12 +27,28 @@ public abstract class AbstractConfiguration extends AbstractNode {
protected final List<ActorRef> activeActors = new ArrayList<>(); protected final List<ActorRef> activeActors = new ArrayList<>();
protected Timeout timeout = new Timeout(Duration.create(10, "seconds"));
public static Props props(Class configurationClass) { public static Props props(Class configurationClass) {
return Props.create(new ConfigurationCreator(configurationClass)); return Props.create(new ConfigurationCreator(configurationClass));
} }
public ActorRef spawnWallet(String name, boolean createGUI) { /**
* Spawns a new wallet and blocks until it has received its initial money
* @throws Exception on timeout
*/
public ActorRef spawnWallet(String name, boolean createGUI) throws Exception {
Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout);
ActorRef wallet = createWallet(name, createGUI);
Await.result(future, timeout.duration());
return wallet;
}
/**
* Creates a wallet without blocking until the wallet was created
*/
public ActorRef createWallet(String name, boolean createGUI) {
Props props; Props props;
int numOfWallets = activeActors.size(); int numOfWallets = activeActors.size();
if (numOfWallets == 0) { if (numOfWallets == 0) {
...@@ -40,24 +62,37 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -40,24 +62,37 @@ public abstract class AbstractConfiguration extends AbstractNode {
activeActors.add(actorRef); activeActors.add(actorRef);
if (numOfWallets == 0) { if (numOfWallets == 0) {
//actorRef.tell(new ActionJoinAnswer(cSuperVisorActor), cSuperVisorActor);
actorRef.tell(new ActionTellSupervisor(superVisor), superVisor); actorRef.tell(new ActionTellSupervisor(superVisor), superVisor);
} }
return actorRef; return actorRef;
} }
public void spawnWallets(int n, boolean createGUI) { /**
* Spawn multiple wallets and wait until they all have their initial FUC
* @throws Exception on timeout
*/
public void spawnWallets(int n, boolean createGUI) throws Exception {
Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(n, self()), timeout);
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
String nameOfTheWallet = "Wallet" + String.valueOf(activeActors.size()); String nameOfTheWallet = "Wallet" + String.valueOf(activeActors.size());
spawnWallet(nameOfTheWallet, createGUI); createWallet(nameOfTheWallet, createGUI);
} }
Await.result(future, timeout.duration());
} }
/**
* Fetch a random wallet
* @return
*/
public ActorRef getRandomWallet() { public ActorRef getRandomWallet() {
return activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size())); return activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size()));
} }
/**
* Create the supervisor node
* @return
*/
public ActorRef initSupervisor() { public ActorRef initSupervisor() {
superVisor = context().actorOf(SuperVisorImpl.props(), "SuperVisorImpl"); superVisor = context().actorOf(SuperVisorImpl.props(), "SuperVisorImpl");
......
...@@ -16,17 +16,22 @@ public class DefaultConfiguration extends AbstractConfiguration { ...@@ -16,17 +16,22 @@ public class DefaultConfiguration extends AbstractConfiguration {
public void run() { public void run() {
initSupervisor(); initSupervisor();
ActorRef wallet1 = spawnWallet("Wallet0", false); ActorRef wallet1 = null;
ActorRef wallet2 = spawnWallet("Wallet1", false); ActorRef wallet2 = null;
try {
// TODO: this should be solved differently wallet1 = spawnWallet("Wallet0", false);
} catch (Exception e) {
System.out.println("Wallet0 spawning timed out");
}
try { try {
Thread.sleep(5000); wallet2 = spawnWallet("Wallet1", false);
} catch (InterruptedException e) { } catch (Exception e) {
e.printStackTrace(); System.out.println("Wallet1 spawning timed out");
} }
wallet1.tell(new ActionWalletSendMoney("Wallet1", 50, getSelf()), wallet1); if (wallet1 != null && wallet2 != null) {
wallet1.tell(new ActionWalletSendMoney(wallet2.path().name(), 50, getSelf()), wallet1);
}
} }
@Override @Override
......
...@@ -10,7 +10,12 @@ public class MassWalletConfiguration extends AbstractConfiguration { ...@@ -10,7 +10,12 @@ public class MassWalletConfiguration extends AbstractConfiguration {
@Override @Override
public void run() { public void run() {
initSupervisor(); initSupervisor();
spawnWallets(200, false); try {
spawnWallets(200, false);
System.out.println("Wallet spawning done!");
} catch (Exception e) {
System.out.println("Wallet spawning timed out!");
}
} }
@Override @Override
......
package fucoin.supervisor; package fucoin.supervisor;
import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.transaction.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.SuperVisorGuiControl;
import fucoin.AbstractNode; import fucoin.AbstractNode;
...@@ -24,6 +27,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -24,6 +27,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
private SuperVisorGuiControl gui; private SuperVisorGuiControl gui;
private int pendingBankCommits = 0;
private ActorRef bankCommitObserver = null;
public SuperVisorImpl() { public SuperVisorImpl() {
} }
...@@ -40,6 +46,19 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -40,6 +46,19 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
if (msg instanceof ActionGetAmountAnswer) { if (msg instanceof ActionGetAmountAnswer) {
ActionGetAmountAnswer answer = (ActionGetAmountAnswer) msg; ActionGetAmountAnswer answer = (ActionGetAmountAnswer) msg;
SwingUtilities.invokeLater(() -> gui.updateTable(answer.address, answer.name, answer.amount)); SwingUtilities.invokeLater(() -> gui.updateTable(answer.address, answer.name, answer.amount));
} else if (msg instanceof ActionNotifyObserver) {
ActionNotifyObserver notifyMsg = (ActionNotifyObserver) msg;
if (notifyMsg.granted) {
if (pendingBankCommits > 0) {
pendingBankCommits--;
} else {
return;
}
if (pendingBankCommits == 0) {
this.getBankCommitObserver().tell(new ActionWalletCreationDone(), self());
}
}
} /* TODO: Whats happened here?? Why we can invoke doAction of abstract class? */ else if (msg instanceof SuperVisorAction) { } /* TODO: Whats happened here?? Why we can invoke doAction of abstract class? */ else if (msg instanceof SuperVisorAction) {
((Action) msg).doAction(this); ((Action) msg).doAction(this);
} }
...@@ -132,4 +151,20 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -132,4 +151,20 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
System.out.println(message); System.out.println(message);
} }
} }
public int getPendingBankCommits() {
return pendingBankCommits;
}
public ActorRef getBankCommitObserver() {
return bankCommitObserver;
}
public void setPendingBankCommits(int pendingBankCommits) {
this.pendingBankCommits = pendingBankCommits;
}
public void setBankCommitObserver(ActorRef bankCommitObserver) {
this.bankCommitObserver = bankCommitObserver;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment