diff --git a/pom.xml b/pom.xml index 9bf2c6d5319c4ce47d79e13325a52750cc823b3b..1a37142e64dcde75bec6b23cb852f4fcb0089deb 100644 --- a/pom.xml +++ b/pom.xml @@ -51,5 +51,10 @@ <artifactId>akka-remote_2.11</artifactId> <version>2.4.7</version> </dependency> + <dependency> + <groupId>org.reflections</groupId> + <artifactId>reflections</artifactId> + <version>0.9.10</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/src/main/java/fucoin/AbstractNode.java b/src/main/java/fucoin/AbstractNode.java index 437262c5c1f4f7472674e58e445abb1104136d56..9777980aa3f1880b274b162f251bf001d5cb61ef 100644 --- a/src/main/java/fucoin/AbstractNode.java +++ b/src/main/java/fucoin/AbstractNode.java @@ -68,9 +68,4 @@ public abstract class AbstractNode extends UntypedActor implements Serializable public HashMap<String, ActorRef> getKnownNeighbors() { return knownNeighbors; } - - public void log(String string) { - System.out.println(getSelf().path().name() + ": " + string); - } - } \ No newline at end of file diff --git a/src/main/java/fucoin/Main.java b/src/main/java/fucoin/Main.java index c715d6ffd66799df36f686e6607c96a3c2ec9a43..a42c3888e50de17e55c10e477f7d9b213576ed56 100644 --- a/src/main/java/fucoin/Main.java +++ b/src/main/java/fucoin/Main.java @@ -1,30 +1,32 @@ package fucoin; -import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import com.google.common.collect.Sets; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import fucoin.actions.join.ActionJoinAnswer; +import fucoin.configurations.AbstractConfiguration; +import fucoin.configurations.internal.ConfigurationSelection; import fucoin.setup.NetworkInterfaceReader; -import fucoin.supervisor.SuperVisorImpl; -import fucoin.wallet.WalletImpl; - +import org.reflections.ReflectionUtils; +import org.reflections.Reflections; +import org.reflections.scanners.ResourcesScanner; +import org.reflections.scanners.SubTypesScanner; +import org.reflections.scanners.TypeElementsScanner; +import org.reflections.util.ClasspathHelper; +import org.reflections.util.ConfigurationBuilder; +import org.reflections.util.FilterBuilder; + +import javax.swing.*; import java.io.File; -import java.util.ArrayList; -import java.util.List; +import java.lang.reflect.Modifier; +import java.util.*; public class Main { - private static int numberOfWallets = 2; - private static ActorSystem cSystem; - private static ActorRef cSuperVisorActor; - - private static List<ActorRef> cActiveActors = new ArrayList<>(); - static { String hostname = NetworkInterfaceReader.readDefaultHostname(); @@ -40,36 +42,65 @@ public class Main { //Init System Actor System cSystem = ActorSystem.create("Core", ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname).withFallback(config)); - cSuperVisorActor = cSystem.actorOf(SuperVisorImpl.props(), "SuperVisorImpl"); - } - - public static void main(String[] args) throws InterruptedException { - createWallets(); } - private static void createWallets() { - //Init Wallets - for (int i = 0; i < numberOfWallets; i++) { - - String nameOfTheWallet = "Wallet" + String.valueOf(i); - Props props; + public static void main(String[] args) throws InterruptedException, IllegalAccessException, InstantiationException { + List<ConfigurationSelection> configurations = getAbstractConfigurations(); - if (i > 0) { - //chain the wallets. wallet2 knows wallet1, wallet3 knows wallet2 and so on. - props = WalletImpl.props(cActiveActors.get(i - 1), nameOfTheWallet); - } else { - props = WalletImpl.props(null, nameOfTheWallet); - } + ConfigurationSelection[] configs = new ConfigurationSelection[configurations.size()]; + configurations.toArray(configs); - ActorRef actorRef = cSystem.actorOf(props, nameOfTheWallet); + // Display the selection dialog to select a configuration + ConfigurationSelection selectedConfig = (ConfigurationSelection) JOptionPane.showInputDialog( + null, + "Select a configuration to run", + "Configuration Selection", + JOptionPane.QUESTION_MESSAGE, + null, + configs, + configurations.get(0) + ); - // first wallet does not have a neighbour, so it can't send a ActionJoin to anybody - // instead we send directly an ActionJoinAnswer with the supervisor reference - if (i == 0) { - actorRef.tell(new ActionJoinAnswer(cSuperVisorActor), cSuperVisorActor); - } + if (selectedConfig != null) { + // The Configuration will be an actor in the system, so we tell akka to create the actor + Props theProps = AbstractConfiguration.props(selectedConfig.getConfigurationClass()); - cActiveActors.add(actorRef); + cSystem.actorOf(theProps, "Configuration"); + } else { + cSystem.terminate(); } } + + /** + * This method crawls the fucoin.configurations package for classes extending the AbstractConfiguration. + * + * @return The list contains all non abstract extensions of AbstractConfiguration in the namespace, wrapped in ConfigurationSelection objects + * @throws InstantiationException + * @throws IllegalAccessException + */ + private static List<ConfigurationSelection> getAbstractConfigurations() throws InstantiationException, IllegalAccessException { + List<ClassLoader> classLoadersList = new LinkedList<>(); + // Lots of reflection magic happening here! + classLoadersList.add(ClasspathHelper.contextClassLoader()); + classLoadersList.add(ClasspathHelper.staticClassLoader()); + + Reflections reflections = new Reflections(new ConfigurationBuilder() + .setScanners(new SubTypesScanner(false), new ResourcesScanner(), new TypeElementsScanner()) + .setUrls(ClasspathHelper.forClassLoader(classLoadersList.toArray(new ClassLoader[0]))) + .filterInputsBy(new FilterBuilder().include(FilterBuilder.prefix("fucoin.configurations")))); + + Set<String> typeSet = reflections.getStore().get("TypeElementsScanner").keySet(); + HashSet<Class<?>> allClasses = Sets.newHashSet(ReflectionUtils.forNames(typeSet, reflections + .getConfiguration().getClassLoaders())); + + List<ConfigurationSelection> configurations = new ArrayList<>(); + + // Filter the found classes for non abstract classes, that inherit from AbstractConfiguration + allClasses.stream().filter(oneClass -> !Modifier.isAbstract(oneClass.getModifiers()) && AbstractConfiguration.class.isAssignableFrom(oneClass)).forEach(oneClass -> { + ConfigurationSelection cfg = new ConfigurationSelection((Class<AbstractConfiguration>) oneClass); + configurations.add(cfg); + }); + + return configurations; + } } diff --git a/src/main/java/fucoin/MainRemote.java b/src/main/java/fucoin/MainRemote.java index 67c8520b0547a598a0cdc321c450324e1efca881..dd1c1aa035f0973020f833c1088657c53294bd9a 100644 --- a/src/main/java/fucoin/MainRemote.java +++ b/src/main/java/fucoin/MainRemote.java @@ -91,7 +91,7 @@ public class MainRemote { } // spawn wallet - system.actorOf(WalletImpl.props(preknownNeighbour, walletName), walletName); + system.actorOf(WalletImpl.props(preknownNeighbour, walletName, true), walletName); } } diff --git a/src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java b/src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java new file mode 100644 index 0000000000000000000000000000000000000000..ee897f78bf0f3926ad283762364c360fb3096010 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionAnnounceWalletCreation.java @@ -0,0 +1,30 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; + +/** + * Announce, that there will be some wallets spawned. + * Used by the configurator to wait, until all new wallets have their initial money. + * Send to supervisor. + */ +public class ActionAnnounceWalletCreation extends SuperVisorAction { + + public int numOfWallets; + public ActorRef observer; + + public ActionAnnounceWalletCreation(int numOfWallets, ActorRef observer) { + + this.numOfWallets = numOfWallets; + this.observer = observer; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { + System.out.println("Waiting for wallet transactions"); + abstractNode.setPendingBankCommits(abstractNode.getPendingBankCommits() + numOfWallets); + abstractNode.setBankCommitObserver(sender); + } +} diff --git a/src/main/java/fucoin/actions/control/ActionWalletCreationDone.java b/src/main/java/fucoin/actions/control/ActionWalletCreationDone.java new file mode 100644 index 0000000000000000000000000000000000000000..1184aca89d216b62c28c6db7f1fdcb909fc0e9a5 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionWalletCreationDone.java @@ -0,0 +1,16 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.Action; +import fucoin.configurations.AbstractConfiguration; + +/** + * This message tells the configuration, that the wallets have been spawned successfully + */ +public class ActionWalletCreationDone extends Action<AbstractConfiguration> { + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractConfiguration abstractNode) { + + } +} diff --git a/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java b/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java new file mode 100644 index 0000000000000000000000000000000000000000..5b7faaf6b720e806ffd02ca311f50b745ab7d9e7 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionWalletSendMoney.java @@ -0,0 +1,32 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.wallet.AbstractWallet; + +/** + * + */ +public class ActionWalletSendMoney extends ClientAction { + + protected String address; + protected int amount; + + protected ActorRef observer; + + public ActionWalletSendMoney(String address, int amount, ActorRef observer) { + this.address = address; + this.amount = amount; + this.observer = observer; + } + + public ActionWalletSendMoney(String address, int amount){ + this(address, amount, null); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { + abstractNode.send(address, amount, observer); + } +} diff --git a/src/main/java/fucoin/actions/join/ActionJoin.java b/src/main/java/fucoin/actions/join/ActionJoin.java index e7bd81419911b9291a7d4da154d227020f1828a8..205d09a5e5c36c25f3588606cfbccad9b4fe24b7 100644 --- a/src/main/java/fucoin/actions/join/ActionJoin.java +++ b/src/main/java/fucoin/actions/join/ActionJoin.java @@ -14,8 +14,14 @@ public class ActionJoin extends ClientAction { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet node) { // send the joined node all known neighbours from node and a reference to the supervisor - ActionJoinAnswer aja = new ActionJoinAnswer(node.getRemoteSuperVisorActor()); + ActionJoinAnswer aja = new ActionJoinAnswer(); aja.someNeighbors.putAll(node.getKnownNeighbors()); sender.tell(aja, self); + + if (node.getRemoteSuperVisorActor() == null) { + node.deferSendOfSuperVisorActor(sender); + } else { + sender.tell(new ActionTellSupervisor(node.getRemoteSuperVisorActor()), self); + } } } diff --git a/src/main/java/fucoin/actions/join/ActionJoinAnswer.java b/src/main/java/fucoin/actions/join/ActionJoinAnswer.java index 49cecf1d89a416a85ed8c6749cdfa841da57d232..89c4aadba026ce04ef9b2190b21566fdb7e68874 100644 --- a/src/main/java/fucoin/actions/join/ActionJoinAnswer.java +++ b/src/main/java/fucoin/actions/join/ActionJoinAnswer.java @@ -2,9 +2,13 @@ package fucoin.actions.join; import akka.actor.ActorRef; import akka.actor.UntypedActorContext; +import akka.dispatch.OnSuccess; import fucoin.actions.ClientAction; import fucoin.actions.persist.ActionSearchMyWallet; import fucoin.wallet.AbstractWallet; +import scala.Function1; +import scala.concurrent.Future; +import scala.runtime.BoxedUnit; import java.util.HashMap; import java.util.Map.Entry; @@ -20,16 +24,11 @@ import java.util.Map.Entry; */ public class ActionJoinAnswer extends ClientAction { public final HashMap<String, ActorRef> someNeighbors = new HashMap<>(); - public final ActorRef supervisor; - - public ActionJoinAnswer(ActorRef supervisor) { - this.supervisor = supervisor; - } protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - wallet.log("Addressed to " + self.path().name() + " from " + sender.path().name() + ": someNeighbors:" + someNeighbors); + wallet.addLogMsg("Addressed to " + self.path().name() + " from " + sender.path().name() + ": someNeighbors:" + someNeighbors); // your neighbours? my neighbours! for (Entry<String, ActorRef> neighbor : someNeighbors.entrySet()) { @@ -38,11 +37,6 @@ public class ActionJoinAnswer extends ClientAction { for (Entry<String, ActorRef> neighbor : someNeighbors.entrySet()) { neighbor.getValue().tell(new ActionSearchMyWallet(wallet.getName()), self); } - - // register at the supervisor if the wallet just learned about it - if (wallet.getRemoteSuperVisorActor() == null) { - wallet.setRemoteSuperVisorActor(supervisor); - } } } diff --git a/src/main/java/fucoin/actions/join/ActionTellSupervisor.java b/src/main/java/fucoin/actions/join/ActionTellSupervisor.java new file mode 100644 index 0000000000000000000000000000000000000000..3c7ef7ae9079976d4212a4c193d5202d45ffc2ac --- /dev/null +++ b/src/main/java/fucoin/actions/join/ActionTellSupervisor.java @@ -0,0 +1,23 @@ +package fucoin.actions.join; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.wallet.AbstractWallet; + +/** + * Tell the joining node the supervisor + */ +public class ActionTellSupervisor extends ClientAction { + + public final ActorRef supervisor; + + public ActionTellSupervisor(ActorRef supervisor) { + this.supervisor = supervisor; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { + abstractNode.setRemoteSuperVisorActor(supervisor); + } +} diff --git a/src/main/java/fucoin/actions/join/ServerActionJoin.java b/src/main/java/fucoin/actions/join/ServerActionJoin.java index 4c57b136f65958aa606bb75b7f404ca8e6edaf70..18a1cf66174035d224dbacb0744f4fd851218034 100644 --- a/src/main/java/fucoin/actions/join/ServerActionJoin.java +++ b/src/main/java/fucoin/actions/join/ServerActionJoin.java @@ -21,12 +21,12 @@ public class ServerActionJoin extends SuperVisorAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl node) { - ActionJoinAnswer aja = new ActionJoinAnswer(node.getSelf()); + ActionJoinAnswer aja = new ActionJoinAnswer(); // TODO: Might need added TellSupervisor aja.someNeighbors.putAll(node.getKnownNeighbors()); sender.tell(aja, self); node.addKnownNeighbor(name, sender); self.tell( - new ActionInvokeDistributedCommittedTransfer(self, sender, 100), + new ActionInvokeDistributedCommittedTransfer(self, sender, 100, self), sender); } } diff --git a/src/main/java/fucoin/actions/search/ActionSearchWalletReference.java b/src/main/java/fucoin/actions/search/ActionSearchWalletReference.java index fe514e2802102260c18970ab58ce7e85f2e40324..9d0727acf966413757518c861ea2870ea7a67b22 100644 --- a/src/main/java/fucoin/actions/search/ActionSearchWalletReference.java +++ b/src/main/java/fucoin/actions/search/ActionSearchWalletReference.java @@ -23,7 +23,7 @@ public class ActionSearchWalletReference extends Search { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - wallet.log(wallet.getKnownNeighbors() + "contains " + name + "?"); + wallet.addLogMsg(wallet.getKnownNeighbors() + "contains " + name + "?"); ttl.add(self); ActionSearchWalletReferenceAnswer answer = null; if (this.name.equals(wallet.getName())) { diff --git a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java index 884c132f99f58f41c58ce6de15df90ca162fbfec..641fc121d4a38b970df45fb99e3d9c179bc56ad6 100644 --- a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java @@ -14,16 +14,19 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { private boolean granted; private long timestamp; private long id; + private ActorRef observer; public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target, - int amount, boolean granted, long timestamp, long id) { + int amount, boolean granted, long timestamp, long id, + ActorRef observer) { this.source = source; this.target = target; this.amount = amount; this.granted = granted; this.timestamp = timestamp; this.id = id; + this.observer = observer; } public ActionCommitDistributedCommittedTransfer( @@ -34,36 +37,45 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { this.granted = false; this.timestamp = outdatedRequest.getTimeout(); this.id = outdatedRequest.getId(); + this.observer = outdatedRequest.getObserver(); } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - wallet.log("ActionCommitDistributedCommittedTransfer is granted? " + granted); + wallet.addLogMsg("ActionCommitDistributedCommittedTransfer is granted? " + granted); if (granted) { - Integer sourceAmount = wallet.amounts.getOrDefault(source, 0); - Integer targetAmount = wallet.amounts.getOrDefault(target, 0); - wallet.amounts.put(source, sourceAmount - amount); - wallet.amounts.put(target, targetAmount + amount); - if (source.compareTo(self) == 0) { wallet.setAmount(wallet.getAmount() - amount); - wallet.logTransactionSuccess("Sent " + amount + " FUC to " + target.path().name()); + wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name()); } else if (target.compareTo(self) == 0) { wallet.setAmount(wallet.getAmount() + amount); - wallet.logTransactionSuccess("Received " + amount + " FUC from " + source.path().name()); + wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name()); + // recipient should notify a possible observer + if (observer != null) { + observer.tell(new ActionNotifyObserver(source, target, amount, granted, timestamp, id), self); + } } } else { - wallet.log("abort transaction with id" + id); + wallet.addLogMsg("abort transaction with id" + id); + + // rollback + Integer sourceAmount = wallet.amounts.getOrDefault(source, 0); + Integer targetAmount = wallet.amounts.getOrDefault(target, 0); + wallet.amounts.put(source, sourceAmount + amount); + wallet.amounts.put(target, targetAmount - amount); if (source.compareTo(self) == 0) { - wallet.logTransactionFail("Failed to send " + amount + " FUC to " + target.path().name() + " (Commit has not been granted)"); + wallet.addTransactionLogMessageFail("Failed to send " + amount + " FUC to " + target.path().name() + " (Commit has not been granted)"); + if (observer != null) { + observer.tell(new ActionNotifyObserver(source, target, amount, granted, timestamp, id), self); + } } } - wallet.log("wallet.amounts:" + wallet.amounts); + //wallet.addLogMsg("wallet.amounts:" + wallet.amounts); } } diff --git a/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java index f3bf02db907f58dbe852398313c0bbc1e3cbbbd3..b277c64a9c613d842769089275996769947c0f60 100644 --- a/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionInvokeDistributedCommittedTransfer.java @@ -9,25 +9,32 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac private ActorRef source; private ActorRef target; + private ActorRef observer; private int amount; public ActionInvokeDistributedCommittedTransfer(ActorRef source, - ActorRef target, int amount) { + ActorRef target, int amount){ + this(source, target, amount, null); + } + + public ActionInvokeDistributedCommittedTransfer(ActorRef source, + ActorRef target, int amount, ActorRef observer) { this.source = source; this.target = target; + this.observer = observer; this.amount = amount; } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { - superVisor.log("invoke transaction " + source.path().name() + + superVisor.addLogMsg("invoke transaction " + source.path().name() + " sends " + amount + " to " + target.path().name()); long timeout = System.currentTimeMillis() + 500; - DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout); + DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer); superVisor.addDistributedCommitedTransferRequest(ds); ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId()); for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) { diff --git a/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java b/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java index 6f3bd3e2817b47e8d3df213f150fa45a5efe04bf..9f7c2f25a5b8202f92506c9800de0218fe810dd1 100644 --- a/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java +++ b/src/main/java/fucoin/actions/transaction/ActionInvokeSentMoney.java @@ -9,19 +9,21 @@ import fucoin.wallet.AbstractWallet; public class ActionInvokeSentMoney extends Transaction { public final String name; public final int amount; + public final ActorRef observer; - public ActionInvokeSentMoney(String name, int amount) { + public ActionInvokeSentMoney(String name, int amount, ActorRef observer) { this.name = name; this.amount = amount; + this.observer = observer; } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - wallet.log(wallet.getKnownNeighbors() + ""); + wallet.addLogMsg(wallet.getKnownNeighbors() + ""); if (wallet.getKnownNeighbors().containsKey(name)) { wallet.getRemoteSuperVisorActor().tell( - new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount), sender); + new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount, observer), sender); } else { ActionSearchWalletReference aswr = new ActionSearchWalletReference(name); for (ActorRef neighbor : wallet.getKnownNeighbors().values()) { diff --git a/src/main/java/fucoin/actions/transaction/ActionNotifyObserver.java b/src/main/java/fucoin/actions/transaction/ActionNotifyObserver.java new file mode 100644 index 0000000000000000000000000000000000000000..46b4666d026f679491a65597d71dfd0620c5873d --- /dev/null +++ b/src/main/java/fucoin/actions/transaction/ActionNotifyObserver.java @@ -0,0 +1,33 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; +import fucoin.actions.Action; + +/** + * Used to notify an observer of a transaction when the transaction is finished. + */ +public class ActionNotifyObserver extends Action<AbstractNode> { + + public ActorRef source; + public ActorRef target; + public int amount; + public boolean granted; + public long timestamp; + public long id; + + public ActionNotifyObserver(ActorRef source, ActorRef target, + int amount, boolean granted, long timestamp, long id){ + this.source = source; + this.target = target; + this.amount = amount; + this.granted = granted; + this.timestamp = timestamp; + this.id = id; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractNode abstractNode) { + } +} diff --git a/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransfer.java index 7e7de88bf5a58168b49d67a333dedd266fe90cb7..997178633677c81e8cbfbab8a35e11f9d3b6c86b 100644 --- a/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransfer.java @@ -32,6 +32,12 @@ public class ActionPrepareDistributedCommittedTransfer extends Transaction { //sender have enough money && wallet.amounts.getOrDefault(source, 0) >= amount)); + // precautionly update own ledger to prevent double spending (respectively agreeing) + Integer sourceAmount = wallet.amounts.getOrDefault(source, 0); + Integer targetAmount = wallet.amounts.getOrDefault(target, 0); + wallet.amounts.put(source, sourceAmount - amount); + wallet.amounts.put(target, targetAmount + amount); + sender.tell(new ActionPrepareDistributedCommittedTransferAnswer(source, target, amount, timestamp, granted, id), self); } diff --git a/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java b/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java index 3d2a0c094016678fe8d9ec18d64fed148be73eaa..db21831de14d3e95c2d30f335586f35052dc307c 100644 --- a/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java +++ b/src/main/java/fucoin/actions/transaction/ActionPrepareDistributedCommittedTransferAnswer.java @@ -27,16 +27,23 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { - superVisor.log("" + superVisor.getKnownNeighbors()); - superVisor.log("granted?" + granted); + + superVisor.addLogMsg("granted?" + granted); + DistributedCommittedTransferRequest request = superVisor.getRequest(id); + + // ignore unknown requests + if (request == null){ + return; + } + if (granted) { - if (request == null)//unknown DistributedCommittedTransferRequest ignore - return; + int newCount = request.addPositiveAnswer(sender); if (newCount == superVisor.getKnownNeighbors().size()) { - ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id); + ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id, request.getObserver()); + superVisor.addTransactionLogMessageSuccess("Transfer of " + amount + " FUC from" + source.path().name() + " to " + target.path().name()); for (ActorRef neighbor : request.getAnswers()) { neighbor.tell(acdct, self); } @@ -44,13 +51,11 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator } } else { //A client wants to rollback - if (request != null) { - superVisor.log("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")"); - ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id); + superVisor.addTransactionLogMessageFail("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")"); + ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id, request.getObserver()); for (ActorRef neighbor : request.getAnswers()) { neighbor.tell(acdct, self); } - } } } diff --git a/src/main/java/fucoin/configurations/AbstractConfiguration.java b/src/main/java/fucoin/configurations/AbstractConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..8493704af98937d47688ad0b57323902394e0d8d --- /dev/null +++ b/src/main/java/fucoin/configurations/AbstractConfiguration.java @@ -0,0 +1,184 @@ +package fucoin.configurations; + +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.util.Timeout; +import fucoin.AbstractNode; +import fucoin.actions.control.ActionAnnounceWalletCreation; +import fucoin.actions.control.ActionWalletSendMoney; +import fucoin.actions.join.ActionTellSupervisor; +import fucoin.actions.transaction.ActionGetAmount; +import fucoin.actions.transaction.ActionGetAmountAnswer; +import fucoin.actions.transaction.ActionNotifyObserver; +import fucoin.configurations.internal.ConfigurationCreator; +import fucoin.supervisor.SuperVisorImpl; +import fucoin.wallet.WalletImpl; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * + */ +public abstract class AbstractConfiguration extends AbstractNode { + + private ActorRef superVisor; + + private final List<ActorRef> activeActors = new ArrayList<>(); + + private Timeout timeout = new Timeout(Duration.create(10, "seconds")); + + private int remainingTransactions; + + public static Props props(Class configurationClass) { + + return Props.create(new ConfigurationCreator(configurationClass)); + } + + /** + * Spawns a new wallet and blocks until it has received its initial money + * + * @throws Exception on timeout + */ + ActorRef spawnWallet(String name, boolean createGUI) throws Exception { + Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout); + ActorRef wallet = createWallet(name, createGUI); + Await.result(future, timeout.duration()); + return wallet; + } + + /** + * Creates a wallet without blocking until the wallet was created + */ + private ActorRef createWallet(String name, boolean createGUI) { + Props props; + int numOfWallets = activeActors.size(); + if (numOfWallets == 0) { + props = WalletImpl.props(null, name, createGUI); + } else { + props = WalletImpl.props(activeActors.get(numOfWallets - 1), name, createGUI); + } + + ActorRef actorRef = context().actorOf(props, name); + + activeActors.add(actorRef); + + if (numOfWallets == 0) { + actorRef.tell(new ActionTellSupervisor(superVisor), superVisor); + } + + return actorRef; + } + + /** + * Spawn multiple wallets and wait until they all have their initial FUC + * + * @throws Exception on timeout + */ + public void spawnWallets(int n, boolean createGUI) throws Exception { + Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(n, self()), timeout); + for (int i = 0; i < n; i++) { + String nameOfTheWallet = "Wallet" + String.valueOf(activeActors.size()); + createWallet(nameOfTheWallet, createGUI); + } + Await.result(future, timeout.duration()); + } + + /** + * Fetch a random wallet + */ + public ActorRef getRandomWallet() { + return activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size())); + } + + public List<ActorRef> wallets() { + return this.activeActors; + } + + protected void randomTransactions(int number, int maxTransactionsAtTheSameTime) { + + remainingTransactions = number; + + for (int i = 0; i < Math.min(number, maxTransactionsAtTheSameTime); i++) { + nextRandomTransaction(); + } + + } + + private void nextRandomTransaction() { + remainingTransactions--; + try { + randomTransaction(); + } catch (Exception e) { + System.err.println("Error while trying to perform a random transaction: "+e.getMessage()); + remainingTransactions = 0; + } + } + + private void randomTransaction() throws Exception { + List<ActorRef> wallets = wallets(); + Collections.shuffle(wallets); + + ActorRef sender = wallets.get(0); + ActorRef recipient = wallets.get(1); + + + Future<Object> future = Patterns.ask(sender, new ActionGetAmount(), timeout); + ActionGetAmountAnswer answer = (ActionGetAmountAnswer) Await.result(future, timeout.duration()); + + int transferAmount = 1 + ThreadLocalRandom.current().nextInt(answer.amount); + + sender.tell(new ActionWalletSendMoney(recipient.path().name(), transferAmount, self()), self()); + } + + + /** + * Create the supervisor node + */ + ActorRef initSupervisor() { + superVisor = context().actorOf(SuperVisorImpl.props(), "SuperVisorImpl"); + + // Don't ask. + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return superVisor; + } + + @Override + public void onReceive(Object message) { + if (message instanceof ActionNotifyObserver) { + + ActionNotifyObserver notification = (ActionNotifyObserver) message; + + String status = "successful"; + if (!notification.granted) { + status = "failed"; + } + + System.out.println("Observed a " + status + " transaction of " + notification.amount + " FUCs from " + + notification.source.path().name() + " to " + notification.target.path().name()); + + if (remainingTransactions > 0) { + nextRandomTransaction(); + } + } + } + + @Override + public void preStart() throws Exception { + super.preStart(); + + this.run(); + } + + public abstract void run(); +} diff --git a/src/main/java/fucoin/configurations/DefaultConfiguration.java b/src/main/java/fucoin/configurations/DefaultConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..403d071f351c51d413d9e38fd07331f52625a13f --- /dev/null +++ b/src/main/java/fucoin/configurations/DefaultConfiguration.java @@ -0,0 +1,60 @@ +package fucoin.configurations; + +import akka.actor.ActorRef; +import akka.pattern.Patterns; +import akka.util.Timeout; +import fucoin.actions.control.ActionWalletSendMoney; +import fucoin.actions.transaction.ActionGetAmount; +import fucoin.actions.transaction.ActionGetAmountAnswer; +import fucoin.actions.transaction.ActionNotifyObserver; +import fucoin.configurations.internal.ConfigurationName; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * This configuration is the previous default of 2 wallets with GUI and a supervisor. + */ +@ConfigurationName("Default Configuration") +public class DefaultConfiguration extends AbstractConfiguration { + + private ThreadLocalRandom rand; + private Timeout timeout = new Timeout(Duration.create(10, "seconds")); + + public DefaultConfiguration(){ + rand = ThreadLocalRandom.current(); + } + + @Override + public void run() { + initSupervisor(); + + ActorRef wallet1 = null; + ActorRef wallet2 = null; + try { + wallet1 = spawnWallet("Wallet0", true); + } catch (Exception e) { + System.out.println("Wallet0 spawning timed out"); + } + try { + wallet2 = spawnWallet("Wallet1", true); + } catch (Exception e) { + System.out.println("Wallet1 spawning timed out"); + } + + if (wallet1 != null && wallet2 != null) { + wallet1.tell(new ActionWalletSendMoney(wallet2.path().name(), 50, getSelf()), wallet1); + } + + } + + + @Override + public void onReceive(Object message) { + super.onReceive(message); + } +} diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..fcaaadf3cea173e7b0cf3fa6fc038fbee5dcecae --- /dev/null +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -0,0 +1,27 @@ +package fucoin.configurations; + +import fucoin.configurations.internal.ConfigurationName; + +/** + * This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets + */ +@ConfigurationName("Lots of Wallets") +public class MassWalletConfiguration extends AbstractConfiguration { + @Override + public void run() { + initSupervisor(); + try { + spawnWallets(200, false); + System.out.println("Wallet spawning done!"); + } catch (Exception e) { + System.out.println("Wallet spawning timed out!"); + } + + randomTransactions(100, 10); + } + + @Override + public void onReceive(Object message) { + super.onReceive(message); + } +} diff --git a/src/main/java/fucoin/configurations/internal/ConfigurationCreator.java b/src/main/java/fucoin/configurations/internal/ConfigurationCreator.java new file mode 100644 index 0000000000000000000000000000000000000000..46e7f0af2d91c5fc631ac15a999dde91a0a9c6ab --- /dev/null +++ b/src/main/java/fucoin/configurations/internal/ConfigurationCreator.java @@ -0,0 +1,22 @@ +package fucoin.configurations.internal; + +import akka.japi.Creator; +import fucoin.configurations.AbstractConfiguration; + +/** + * This is used by Akka to spawn the Configuration Actor + */ +public class ConfigurationCreator implements Creator<AbstractConfiguration> { + + private Class configurationClass; + + public ConfigurationCreator(Class configurationClass) { + + this.configurationClass = configurationClass; + } + + @Override + public AbstractConfiguration create() throws Exception { + return (AbstractConfiguration) this.configurationClass.newInstance(); + } +} diff --git a/src/main/java/fucoin/configurations/internal/ConfigurationName.java b/src/main/java/fucoin/configurations/internal/ConfigurationName.java new file mode 100644 index 0000000000000000000000000000000000000000..9f18a0bc1b62767944ec94542a43cb95fc0ad883 --- /dev/null +++ b/src/main/java/fucoin/configurations/internal/ConfigurationName.java @@ -0,0 +1,13 @@ +package fucoin.configurations.internal; + +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; + +/** + * This Annotation can be used by Configurations to display + * a more friendly title than the class name in the startup dialog + */ +@Retention(RetentionPolicy.RUNTIME) +public @interface ConfigurationName { + String value(); +} diff --git a/src/main/java/fucoin/configurations/internal/ConfigurationSelection.java b/src/main/java/fucoin/configurations/internal/ConfigurationSelection.java new file mode 100644 index 0000000000000000000000000000000000000000..f43ec8aa58299cbbf68863162d7005423b05db72 --- /dev/null +++ b/src/main/java/fucoin/configurations/internal/ConfigurationSelection.java @@ -0,0 +1,27 @@ +package fucoin.configurations.internal; + +import fucoin.configurations.AbstractConfiguration; + +/** + * A ConfigurationSelection is a wrapper around the configuration class objects + * to display a custom text in the configuration selection dropdown. + */ +public class ConfigurationSelection { + private Class<AbstractConfiguration> configurationClass; + + public ConfigurationSelection(Class<AbstractConfiguration> configurationClass) { + this.configurationClass = configurationClass; + } + + public Class<AbstractConfiguration> getConfigurationClass() { + return configurationClass; + } + + @Override + public String toString() { + if (configurationClass.isAnnotationPresent(ConfigurationName.class)) { + return configurationClass.getAnnotation(ConfigurationName.class).value(); + } + return configurationClass.getSimpleName(); + } +} diff --git a/src/main/java/fucoin/gui/FilteredLogModel.java b/src/main/java/fucoin/gui/FilteredLogModel.java new file mode 100644 index 0000000000000000000000000000000000000000..97ae91cf4a60f3269352192bbbfcfc3a2ca2f744 --- /dev/null +++ b/src/main/java/fucoin/gui/FilteredLogModel.java @@ -0,0 +1,76 @@ +package fucoin.gui; + +import javax.swing.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + + +public class FilteredLogModel extends AbstractListModel<LogMessage> { + + private List<LogMessage> log; + private List<LogMessage> filteredLog; + + private boolean filterTransactions = false; + + public FilteredLogModel() { + super(); + this.log = new ArrayList<>(); + filteredLog = new ArrayList<>(); + } + + public void setTransactionFilter() { + filterTransactions = true; + refilter(); + } + + public void clearFilter(){ + filterTransactions = false; + refilter(); + } + + private void refilter() { + filteredLog.clear(); + for (LogMessage msg : log) { + if (matchesFilter(msg)) { + filteredLog.add(msg); + + } + } + fireContentsChanged(this, 0, getSize()-1); + } + + private boolean matchesFilter(LogMessage msg){ + return !filterTransactions + || (msg.getContext() == LogMessage.Context.TRANSACTION_SUCCESS + || msg.getContext() == LogMessage.Context.TRANSACTION_FAIL); + } + + @Override + public int getSize() { + return filteredLog.size(); + } + + @Override + public LogMessage getElementAt(int index) { + return filteredLog.get(index); + } + + + public void addElement(LogMessage message) { + + log.add(message); + + if(matchesFilter(message)){ + filteredLog.add(message); + int index = getSize() - 1; + fireIntervalAdded(this, index, index); + } + } + + public void emptyLog() { + log.removeAll(log); + filteredLog.removeAll(filteredLog); + refilter(); + } +} diff --git a/src/main/java/fucoin/gui/LogCellRenderer.java b/src/main/java/fucoin/gui/LogCellRenderer.java index 5087daec62868bb8cfd661da6f56a049fcfd4ca0..1f787a5988f8622a589391af95d174453a279d05 100644 --- a/src/main/java/fucoin/gui/LogCellRenderer.java +++ b/src/main/java/fucoin/gui/LogCellRenderer.java @@ -28,7 +28,7 @@ public class LogCellRenderer extends DefaultListCellRenderer { } } - if(isSelected){ + if (isSelected) { setForeground(Color.WHITE); } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControl.java b/src/main/java/fucoin/gui/SuperVisorGuiControl.java index bea5e7ae5004ddbd003beb1b5fe82dbbdb3620b1..b8b29e555f906ae8de36d8b1edd625fe7acc824d 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControl.java @@ -1,10 +1,12 @@ package fucoin.gui; -public interface SuperVisorGuiControl { +public interface SuperVisorGuiControl extends TransactionLogger { + /** * Call from SuperVisorImpl after poison pill or kill */ void onLeave(); - void log(String message); + public void updateTable(String address, String name, int amount); + } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 81a379ae96d0230c5fb3bfe4b7296d7ce75ba7ab..b0c9a125e68a2cadce66c418126fa563e9aec6bc 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -1,19 +1,24 @@ package fucoin.gui; +import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.SuperVisorImpl; import javax.swing.*; +import javax.swing.event.TableModelEvent; +import javax.swing.event.TableModelListener; import java.awt.*; +import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private SuperVisorImpl superVisor; - private JFrame frame; - private DefaultListModel<String> log = new DefaultListModel<>(); - private JList<String> txtLog = new JList<>(log); - private JScrollPane logPane = new JScrollPane(txtLog); + private AmountTableModel amountTableModel; + + private SuperVisorThreadGUI threadGUI; + + private boolean logActive = false; public SuperVisorGuiControlImpl(SuperVisorImpl sv) { superVisor = sv; @@ -21,55 +26,66 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { } private void init() { - //Show AWT window for runtime information - frame = new JFrame("Server"); - JPanel contentPanel = new JPanel(); - contentPanel.setLayout(new GridLayout(2, 1)); - - //Init Amount Table and SuperVisorImpl - - JTable amountListView = new JTable(superVisor.getAmountTableModel()); - contentPanel.add(new JScrollPane(amountListView)); - - contentPanel.add(logPane); - - frame.add(contentPanel, BorderLayout.CENTER); - - //Exit Button and shutdown supervisor - JButton exitBtn = new JButton("Stop Supervisor"); - exitBtn.addActionListener(e -> { - superVisor.exit(); - frame.setVisible(false); - frame.dispose(); - }); - frame.add(exitBtn, BorderLayout.PAGE_END); - frame.setSize(800, 600); - frame.setVisible(true); - - frame.addWindowListener(new WindowAdapter() { - @Override - public void windowClosing(WindowEvent e) { - super.windowClosing(e); - superVisor.exit(); - } - }); + + amountTableModel = new AmountTableModel(); + + threadGUI = new SuperVisorThreadGUI(this); + threadGUI.init(); + + } + + public void guiTerminated() { + superVisor.exit(); } @Override public void onLeave() { - frame.dispose(); + threadGUI.dispose(); + } + + @Override + public void updateTable(String address, String name, int amount) { + this.amountTableModel.updateTable(address, name, amount); + } + + private void log(LogMessage logMessage) { + if (logActive) { + threadGUI.log(logMessage); + } + } + + @Override + public void addLogMsg(String msg) { + log(new LogMessage(msg)); + } + + @Override + public void addTransactionLogMessageSuccess(String message) { + log(new LogMessage(message, LogMessage.Context.TRANSACTION_SUCCESS)); } @Override - public void log(String message) { - // One day, we may have a server log GUI as well.. - // Until then, we just print it to the console - //System.out.println(message); - SwingUtilities.invokeLater(() -> { - log.addElement(message); - - // auto scroll to the bottom - txtLog.ensureIndexIsVisible(log.size() - 1); - }); + public void addTransactionLogMessageFail(String message) { + log(new LogMessage(message, LogMessage.Context.TRANSACTION_FAIL)); + } + + public AmountTableModel getAmountTableModel() { + return amountTableModel; + } + + public void setAmountTableModel(AmountTableModel amountTableModel) { + this.amountTableModel = amountTableModel; + } + + public void activateLogging() { + this.logActive = true; + } + + public void disableLogging() { + this.logActive = false; + } + + public boolean isLogActive() { + return logActive; } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java new file mode 100644 index 0000000000000000000000000000000000000000..d5d7a1bced7a9b4047993970268255c0ce08f765 --- /dev/null +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -0,0 +1,113 @@ +package fucoin.gui; + +import javax.swing.*; +import java.awt.*; +import java.awt.event.ItemEvent; +import java.awt.event.WindowAdapter; +import java.awt.event.WindowEvent; + +/** + * + */ +public class SuperVisorThreadGUI { + private JFrame frame; + + private FilteredLogModel log = new FilteredLogModel(); + private JList<LogMessage> txtLog = new JList<>(log); + private JScrollPane logPane = new JScrollPane(txtLog); + private JCheckBox showDebug; + private JCheckBox activateLogging; + private SuperVisorGuiControlImpl superVisorGuiControl; + + public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { + + this.superVisorGuiControl = superVisorGuiControl; + } + + public void init() { + new Thread(() -> { + //Show AWT window for runtime information + frame = new JFrame("Server"); + JPanel contentPanel = new JPanel(); + contentPanel.setLayout(new GridLayout(2, 1)); + + JTable amountListView = new JTable(superVisorGuiControl.getAmountTableModel()); + superVisorGuiControl.getAmountTableModel().addTableModelListener(e -> SwingUtilities.invokeLater(() -> frame.setTitle("Server (" + superVisorGuiControl.getAmountTableModel().getRowCount() + " Wallets)"))); + contentPanel.add(new JScrollPane(amountListView)); + + JPanel logPanel = new JPanel(new BorderLayout()); + + txtLog.setCellRenderer(new LogCellRenderer()); + + showDebug = new JCheckBox("Show debug messages in transaction log"); + showDebug.setSelected(true); + showDebug.addItemListener(e -> { + if (e.getStateChange() == ItemEvent.SELECTED) { + log.clearFilter(); + } else { + log.setTransactionFilter(); + } + }); + + activateLogging = new JCheckBox("Activate logging"); + activateLogging.setSelected(superVisorGuiControl.isLogActive()); + activateLogging.addItemListener(e -> { + if (e.getStateChange() == ItemEvent.SELECTED) { + superVisorGuiControl.activateLogging(); + log.emptyLog(); + } else { + superVisorGuiControl.disableLogging(); + } + }); + + JPanel configPanel = new JPanel(); + + configPanel.add(activateLogging); + configPanel.add(showDebug); + + //logPanel.add(activateLogging, BorderLayout.NORTH); + logPanel.add(configPanel, BorderLayout.NORTH); + logPanel.add(logPane, BorderLayout.CENTER); + contentPanel.add(logPanel); + + frame.add(contentPanel, BorderLayout.CENTER); + + //Exit Button and shutdown supervisor + JButton exitBtn = new JButton("Stop Supervisor"); + exitBtn.addActionListener(e -> { + superVisorGuiControl.guiTerminated(); + frame.setVisible(false); + frame.dispose(); + }); + + if (!superVisorGuiControl.isLogActive()) { + this.log(new LogMessage("Logging is currently disabled.")); + } + + frame.add(exitBtn, BorderLayout.PAGE_END); + frame.setSize(800, 600); + frame.setVisible(true); + + frame.addWindowListener(new WindowAdapter() { + @Override + public void windowClosing(WindowEvent e) { + super.windowClosing(e); + superVisorGuiControl.guiTerminated(); + } + }); + }).start(); + } + + public void dispose() { + frame.dispose(); + } + + public void log(LogMessage logMessage) { + SwingUtilities.invokeLater(() -> { + log.addElement(logMessage); + + // auto scroll to the bottom + txtLog.ensureIndexIsVisible(log.getSize() - 1); + }); + } +} diff --git a/src/main/java/fucoin/gui/TransactionLogger.java b/src/main/java/fucoin/gui/TransactionLogger.java new file mode 100644 index 0000000000000000000000000000000000000000..4a3a00db665abdf4075f93a854a7a208afa516dd --- /dev/null +++ b/src/main/java/fucoin/gui/TransactionLogger.java @@ -0,0 +1,25 @@ +package fucoin.gui; + +/** + * All nodes implementing some kind of logging should implement this interface. + */ +public interface TransactionLogger { + + /** + * Adds a debug log message or a generic log message to the log + * @param message + */ + public void addLogMsg(String message); + + /** + * Adds a log message concerning a successful transaction + * @param message + */ + public void addTransactionLogMessageSuccess(String message); + + /** + * Adds a log message concerning a failed transaction + * @param message + */ + public void addTransactionLogMessageFail(String message); +} diff --git a/src/main/java/fucoin/gui/WalletGuiControl.java b/src/main/java/fucoin/gui/WalletGuiControl.java index 90bdbdb4e43b6d8fe346d0ebce46996304200696..301e22dcfb142234ee8053aa0e77e2b374169757 100644 --- a/src/main/java/fucoin/gui/WalletGuiControl.java +++ b/src/main/java/fucoin/gui/WalletGuiControl.java @@ -1,19 +1,13 @@ package fucoin.gui; -public interface WalletGuiControl { +public interface WalletGuiControl extends TransactionLogger { void setAddress(String address); void setAmount(int amount); void addKnownAddress(String address); - void addLogMsg(String msg); - - void addTransactionLogMessageSuccess(String message); - - void addTransactionLogMessageFail(String message); - /** * Tell the GUI, that the wallet is a remote wallet. */ diff --git a/src/main/java/fucoin/gui/WalletGuiControlImpl.java b/src/main/java/fucoin/gui/WalletGuiControlImpl.java index 2a2e008f3ba57d8e55cf49c6597acba091220366..49c69b9408d320ac404f5194b4e856c50c80c480 100644 --- a/src/main/java/fucoin/gui/WalletGuiControlImpl.java +++ b/src/main/java/fucoin/gui/WalletGuiControlImpl.java @@ -10,7 +10,7 @@ import java.util.Enumeration; public class WalletGuiControlImpl implements WalletGuiControl { - private DefaultListModel<LogMessage> log = new DefaultListModel<>(); + private FilteredLogModel log = new FilteredLogModel(); private JFrame window = new JFrame("test"); private JPanel topPanel = new JPanel(); @@ -110,12 +110,14 @@ public class WalletGuiControlImpl implements WalletGuiControl { bottomPanel.setLayout(new BorderLayout()); + log.setTransactionFilter(); + showDebug = new JCheckBox("Show debug messages in transaction log"); showDebug.addItemListener(e -> { if (e.getStateChange() == ItemEvent.SELECTED) { - txtLog.setModel(log); + log.clearFilter(); } else { - updateFilteredLog(); + log.setTransactionFilter(); } }); @@ -222,27 +224,8 @@ public class WalletGuiControlImpl implements WalletGuiControl { SwingUtilities.invokeLater(() -> { log.addElement(logMessage); - if (!showDebug.isSelected()) { - updateFilteredLog(); - } - // auto scroll to the bottom - txtLog.ensureIndexIsVisible(log.size() - 1); + txtLog.ensureIndexIsVisible(log.getSize() - 1); }); } - - private void updateFilteredLog() { - - DefaultListModel<LogMessage> filteredLog = new DefaultListModel<>(); - Enumeration<LogMessage> elements = log.elements(); - while (elements.hasMoreElements()) { - LogMessage logMessage = elements.nextElement(); - LogMessage.Context context = logMessage.getContext(); - if (context == LogMessage.Context.TRANSACTION_FAIL || context == LogMessage.Context.TRANSACTION_SUCCESS) { - filteredLog.addElement(logMessage); - } - } - - txtLog.setModel(filteredLog); - } } diff --git a/src/main/java/fucoin/supervisor/AmountTableModel.java b/src/main/java/fucoin/supervisor/AmountTableModel.java index b25a22f139b2b4f37749b33604d65c55e5b4d65a..80378e916040dac5fc3fb816250206cefb424ff5 100644 --- a/src/main/java/fucoin/supervisor/AmountTableModel.java +++ b/src/main/java/fucoin/supervisor/AmountTableModel.java @@ -1,5 +1,6 @@ package fucoin.supervisor; +import javax.swing.*; import javax.swing.table.DefaultTableModel; import java.util.Vector; diff --git a/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java b/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java index d4ebd400373f9103c1ed6a5fa8b156b90e613003..26a53bb79c7a8f1660854e3c97d548839301468b 100644 --- a/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java +++ b/src/main/java/fucoin/supervisor/DistributedCommittedTransferRequest.java @@ -13,6 +13,8 @@ public class DistributedCommittedTransferRequest extends Transaction { private final static Random random = new Random(System.currentTimeMillis() + System.nanoTime()); private ActorRef source; private ActorRef target; + private ActorRef observer; + private int amount; private long timeout; @@ -21,13 +23,15 @@ public class DistributedCommittedTransferRequest extends Transaction { private List<ActorRef> answers = new LinkedList<>(); public DistributedCommittedTransferRequest(ActorRef source, ActorRef target, int amount, - long timeout) { + long timeout, ActorRef observer) { this.source = source; this.target = target; this.amount = amount; this.timeout = timeout; this.id = random.nextLong(); + + this.observer = observer; } @Override @@ -64,4 +68,8 @@ public class DistributedCommittedTransferRequest extends Transaction { public int getAmount() { return amount; } + + public ActorRef getObserver(){ + return observer; + } } diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 1252f1c9c925c5e9e4fad02fcafd3c69ae2caebd..8b4ec173daab141c1172927ecaf34da83d5376c0 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -1,39 +1,43 @@ package fucoin.supervisor; +import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.Action; +import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; +import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; import fucoin.AbstractNode; +import fucoin.gui.TransactionLogger; +import javax.swing.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; -public class SuperVisorImpl extends AbstractNode { +public class SuperVisorImpl extends AbstractNode implements TransactionLogger { - private AmountTableModel amountTableModel; + //private AmountTableModel amountTableModel; private Map<Long, DistributedCommittedTransferRequest> requestQueue; private SuperVisorGuiControl gui; + private int pendingBankCommits = 0; + private ActorRef bankCommitObserver = null; + public SuperVisorImpl() { - this.amountTableModel = new AmountTableModel(); + } public void setGuiControl(SuperVisorGuiControl gui) { this.gui = gui; } - public SuperVisorImpl(AmountTableModel amountTableModel) { - this.amountTableModel = amountTableModel; - } - @Override public void onReceive(Object msg) { @@ -41,7 +45,20 @@ public class SuperVisorImpl extends AbstractNode { // ClientAction for some reason if (msg instanceof ActionGetAmountAnswer) { ActionGetAmountAnswer answer = (ActionGetAmountAnswer) msg; - amountTableModel.updateTable(answer.address, answer.name, answer.amount); + SwingUtilities.invokeLater(() -> gui.updateTable(answer.address, answer.name, answer.amount)); + } else if (msg instanceof ActionNotifyObserver) { + ActionNotifyObserver notifyMsg = (ActionNotifyObserver) msg; + if (notifyMsg.granted) { + if (pendingBankCommits > 0) { + pendingBankCommits--; + } else { + return; + } + + if (pendingBankCommits == 0) { + this.getBankCommitObserver().tell(new ActionWalletCreationDone(), self()); + } + } } /* TODO: Whats happened here?? Why we can invoke doAction of abstract class? */ else if (msg instanceof SuperVisorAction) { ((Action) msg).doAction(this); } @@ -108,19 +125,46 @@ public class SuperVisorImpl extends AbstractNode { requestQueue.remove(request.getId()); } - public AmountTableModel getAmountTableModel() { - return amountTableModel; + @Override + public void addLogMsg(String message) { + if (gui != null) { + gui.addLogMsg(message); + } else { + System.out.println(message); + } } - public void setAmountTableModel(AmountTableModel amountTableModel) { - this.amountTableModel = amountTableModel; + @Override + public void addTransactionLogMessageSuccess(String message) { + if (gui != null) { + gui.addTransactionLogMessageSuccess(message); + } else { + System.out.println(message); + } } - public void log(String message) { - if (this.gui != null) { - this.gui.log(message); + @Override + public void addTransactionLogMessageFail(String message) { + if (gui != null) { + gui.addTransactionLogMessageFail(message); } else { System.out.println(message); } } + + public int getPendingBankCommits() { + return pendingBankCommits; + } + + public ActorRef getBankCommitObserver() { + return bankCommitObserver; + } + + public void setPendingBankCommits(int pendingBankCommits) { + this.pendingBankCommits = pendingBankCommits; + } + + public void setBankCommitObserver(ActorRef bankCommitObserver) { + this.bankCommitObserver = bankCommitObserver; + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index bc44d051ec250a68db11f9e677b683f985a4c5b8..882c2a547b9060a576589281eaaea8c9d3e078ba 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -2,13 +2,15 @@ package fucoin.wallet; import akka.actor.ActorRef; import fucoin.AbstractNode; +import fucoin.gui.TransactionLogger; +import scala.concurrent.Future; import java.io.Serializable; /** * */ -public abstract class AbstractWallet extends AbstractNode implements Serializable { +public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { /** * Currently amount of this wallet @@ -94,25 +96,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl */ public abstract void setRemoteSuperVisorActor(ActorRef remoteSuperVisorActor); - /** - * Appends a message related to a successful transaction to the log - * - * @param msg - */ - public abstract void logTransactionSuccess(String msg); + public abstract void deferSendOfSuperVisorActor(ActorRef destinationWallet); /** - * Appends a message related to a successful transaction to the log + * Sends amount FUCs to the wallet with the address address * - * @param msg + * @param address Recipients address + * @param amount Amount to send */ - public abstract void logTransactionFail(String msg); + public abstract void send(String address, int amount); /** - * Sends amount FUCs to the wallet with the address adress + * Sends amount FUCs to the wallet with the address address and the observer observer * - * @param address Recipients address - * @param amount Amount to send + * @param address + * @param amount + * @param observer */ - public abstract void send(String address, int amount); + public abstract void send(String address, int amount, ActorRef observer); } diff --git a/src/main/java/fucoin/wallet/WalletCreator.java b/src/main/java/fucoin/wallet/WalletCreator.java index 2a512b9f797529e1a24a0508fe91194fb3431226..f8f6e4d425a39db9cb25b9ff3cd9bac7327b3616 100644 --- a/src/main/java/fucoin/wallet/WalletCreator.java +++ b/src/main/java/fucoin/wallet/WalletCreator.java @@ -7,18 +7,24 @@ import fucoin.gui.WalletGuiControlImpl; public class WalletCreator implements Creator<AbstractWallet> { private ActorRef preKnownNeighbour; private String walletName; + private boolean createGUI; - public WalletCreator(ActorRef preKnownNeighbour, String walletName) { + public WalletCreator(ActorRef preKnownNeighbour, String walletName, boolean createGUI) { this.preKnownNeighbour = preKnownNeighbour; this.walletName = walletName; + this.createGUI = createGUI; } @Override public WalletImpl create() throws Exception { + WalletImpl wallet = new WalletImpl(preKnownNeighbour, walletName); - WalletGuiControlImpl gui = new WalletGuiControlImpl(wallet); - wallet.setGui(gui); + if (createGUI) { + WalletGuiControlImpl gui = new WalletGuiControlImpl(wallet); + wallet.setGui(gui); + } + return wallet; } } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index f35d620fb87a8629bb4238682c119e208b9158b2..951a1519f91fa609e143fce37a554279eebcecbb 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -5,12 +5,18 @@ import akka.actor.Props; import fucoin.actions.ClientAction; import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoinAnswer; +import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ServerActionJoin; import fucoin.actions.persist.ActionInvokeLeave; import fucoin.actions.persist.ActionInvokeRevive; import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionInvokeSentMoney; import fucoin.gui.WalletGuiControl; +import scala.concurrent.Future; + +import static akka.dispatch.Futures.future; + +import java.util.concurrent.ConcurrentLinkedQueue; public class WalletImpl extends AbstractWallet { @@ -19,6 +25,7 @@ public class WalletImpl extends AbstractWallet { private transient WalletGuiControl gui; private String preKnownNeighbourName; private boolean isActive; + private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>(); public WalletImpl(String name) { super(name); @@ -32,8 +39,8 @@ public class WalletImpl extends AbstractWallet { } } - public static Props props(ActorRef preKnownNeighbour, String walletName) { - return Props.create(new WalletCreator(preKnownNeighbour, walletName)); + public static Props props(ActorRef preKnownNeighbour, String walletName, boolean createGUI) { + return Props.create(new WalletCreator(preKnownNeighbour, walletName, createGUI)); } /** @@ -43,7 +50,7 @@ public class WalletImpl extends AbstractWallet { */ public void addAmount(int amount) { setAmount(this.getAmount() + amount); - log(" My amount is now " + this.getAmount()); + addLogMsg(" My amount is now " + this.getAmount()); } @Override @@ -54,7 +61,7 @@ public class WalletImpl extends AbstractWallet { @Override public void onReceive(Object message) { - log(getSender().path().name() + " invokes " + getSelf().path().name() + " to do " + message.getClass().getSimpleName()); + addLogMsg(getSender().path().name() + " invokes " + getSelf().path().name() + " to do " + message.getClass().getSimpleName()); if (message instanceof ActionInvokeRevive) { ((ActionInvokeRevive) message).doAction(this); } @@ -82,11 +89,15 @@ public class WalletImpl extends AbstractWallet { if (preKnownNeighbour != null) { addKnownNeighbor(preKnownNeighbourName, preKnownNeighbour); preKnownNeighbour.tell(new ActionJoin(), getSelf()); - ActionJoinAnswer aja = new ActionJoinAnswer(this.getRemoteSuperVisorActor()); + ActionJoinAnswer aja = new ActionJoinAnswer(); aja.someNeighbors.putAll(getKnownNeighbors()); aja.someNeighbors.put(name, getSelf()); preKnownNeighbour.tell(aja, getSelf()); + if (this.getRemoteSuperVisorActor() != null) { + preKnownNeighbour.tell(new ActionTellSupervisor(this.getRemoteSuperVisorActor()), self()); + } + } } @@ -145,32 +156,40 @@ public class WalletImpl extends AbstractWallet { return remoteSuperVisorActor; } + public Future<ActorRef> resolveSuperVisorActor() { + // TODO: this should return only, if getRemoteSuperVisorActor() != null + return future(() -> getRemoteSuperVisorActor(), context().system().dispatcher()); + } + @Override public void setRemoteSuperVisorActor(ActorRef remoteSuperVisorActor) { if (this.remoteSuperVisorActor == null) { this.remoteSuperVisorActor = remoteSuperVisorActor; this.remoteSuperVisorActor.tell(new ServerActionJoin(getName()), getSelf()); + this.tellDeferedSuperVisorReceivers(remoteSuperVisorActor); } } - public WalletGuiControl getGui() { - return gui; - } - - public void setGui(WalletGuiControl gui) { - this.gui = gui; + @Override + public void deferSendOfSuperVisorActor(ActorRef destinationWallet) { + deferedSupervisorReceivers.add(destinationWallet); } - public String getPreKnownNeighbourName() { - return preKnownNeighbourName; + protected void tellDeferedSuperVisorReceivers(ActorRef supervisor) { + while (deferedSupervisorReceivers.size() > 0) { + ActorRef receiver = deferedSupervisorReceivers.poll(); + if (receiver != null) { + receiver.tell(new ActionTellSupervisor(supervisor), self()); + } + } } - public void setPreKnownNeighbourName(String preKnownNeighbourName) { - this.preKnownNeighbourName = preKnownNeighbourName; + public WalletGuiControl getGui() { + return gui; } - public boolean isActive() { - return isActive; + public void setGui(WalletGuiControl gui) { + this.gui = gui; } public void setActive(boolean isActive) { @@ -179,7 +198,7 @@ public class WalletImpl extends AbstractWallet { @Override public boolean addKnownNeighbor(String key, ActorRef value) { - log(key + " is newNeighbor of " + name + "?" + !getKnownNeighbors().containsKey(key)); + addLogMsg(key + " is newNeighbor of " + name + "?" + !getKnownNeighbors().containsKey(key)); if (getKnownNeighbors().containsKey(key) || key.equals(name)) { return false; } @@ -191,38 +210,39 @@ public class WalletImpl extends AbstractWallet { return newNeighbor; } - @Override - public void log(String msg) { - if (gui != null) { - gui.addLogMsg(msg); - } else { - System.out.println(msg); - } + public void send(String address, int amount) { + send(address, amount, null); } + @Override + public void send(String address, int amount, ActorRef observer) { + getSelf().tell(new ActionInvokeSentMoney(address, amount, observer), getSelf()); + } @Override - public void logTransactionSuccess(String msg) { + public void addLogMsg(String message) { if (gui != null) { - gui.addTransactionLogMessageSuccess(msg); + gui.addLogMsg(message); } else { - System.out.println(msg); + System.out.println(message); } } - @Override - public void logTransactionFail(String msg) { + public void addTransactionLogMessageSuccess(String message) { if (gui != null) { - gui.addTransactionLogMessageFail(msg); + gui.addTransactionLogMessageSuccess(message); } else { - System.out.println(msg); + System.out.println(message); } } @Override - public void send(String address, int amount) { - getSelf().tell(new ActionInvokeSentMoney(address, amount), getSelf()); + public void addTransactionLogMessageFail(String message) { + if (gui != null) { + gui.addTransactionLogMessageFail(message); + } else { + System.out.println(message); + } } - }