Skip to content
Snippets Groups Projects
Commit e9ea7f29 authored by lkeidel's avatar lkeidel
Browse files

Merge branch 'dev-group3' into 'master'

Configuration system

State of the system as presented last week. Closes #18 

See merge request !5
parents f8c61f42 c7efb949
No related branches found
No related tags found
1 merge request!5Configuration system
Showing
with 526 additions and 85 deletions
......@@ -51,5 +51,10 @@
<artifactId>akka-remote_2.11</artifactId>
<version>2.4.7</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -68,9 +68,4 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
public HashMap<String, ActorRef> getKnownNeighbors() {
return knownNeighbors;
}
public void log(String string) {
System.out.println(getSelf().path().name() + ": " + string);
}
}
\ No newline at end of file
package fucoin;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import fucoin.actions.join.ActionJoinAnswer;
import fucoin.configurations.AbstractConfiguration;
import fucoin.configurations.internal.ConfigurationSelection;
import fucoin.setup.NetworkInterfaceReader;
import fucoin.supervisor.SuperVisorImpl;
import fucoin.wallet.WalletImpl;
import org.reflections.ReflectionUtils;
import org.reflections.Reflections;
import org.reflections.scanners.ResourcesScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.scanners.TypeElementsScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.reflections.util.FilterBuilder;
import javax.swing.*;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.lang.reflect.Modifier;
import java.util.*;
public class Main {
private static int numberOfWallets = 2;
private static ActorSystem cSystem;
private static ActorRef cSuperVisorActor;
private static List<ActorRef> cActiveActors = new ArrayList<>();
static {
String hostname = NetworkInterfaceReader.readDefaultHostname();
......@@ -40,36 +42,65 @@ public class Main {
//Init System Actor System
cSystem = ActorSystem.create("Core", ConfigFactory.parseString("akka.remote.netty.tcp.hostname=" + hostname).withFallback(config));
cSuperVisorActor = cSystem.actorOf(SuperVisorImpl.props(), "SuperVisorImpl");
}
public static void main(String[] args) throws InterruptedException {
createWallets();
}
private static void createWallets() {
//Init Wallets
for (int i = 0; i < numberOfWallets; i++) {
String nameOfTheWallet = "Wallet" + String.valueOf(i);
Props props;
public static void main(String[] args) throws InterruptedException, IllegalAccessException, InstantiationException {
List<ConfigurationSelection> configurations = getAbstractConfigurations();
if (i > 0) {
//chain the wallets. wallet2 knows wallet1, wallet3 knows wallet2 and so on.
props = WalletImpl.props(cActiveActors.get(i - 1), nameOfTheWallet);
} else {
props = WalletImpl.props(null, nameOfTheWallet);
}
ConfigurationSelection[] configs = new ConfigurationSelection[configurations.size()];
configurations.toArray(configs);
ActorRef actorRef = cSystem.actorOf(props, nameOfTheWallet);
// Display the selection dialog to select a configuration
ConfigurationSelection selectedConfig = (ConfigurationSelection) JOptionPane.showInputDialog(
null,
"Select a configuration to run",
"Configuration Selection",
JOptionPane.QUESTION_MESSAGE,
null,
configs,
configurations.get(0)
);
// first wallet does not have a neighbour, so it can't send a ActionJoin to anybody
// instead we send directly an ActionJoinAnswer with the supervisor reference
if (i == 0) {
actorRef.tell(new ActionJoinAnswer(cSuperVisorActor), cSuperVisorActor);
}
if (selectedConfig != null) {
// The Configuration will be an actor in the system, so we tell akka to create the actor
Props theProps = AbstractConfiguration.props(selectedConfig.getConfigurationClass());
cActiveActors.add(actorRef);
cSystem.actorOf(theProps, "Configuration");
} else {
cSystem.terminate();
}
}
/**
* This method crawls the fucoin.configurations package for classes extending the AbstractConfiguration.
*
* @return The list contains all non abstract extensions of AbstractConfiguration in the namespace, wrapped in ConfigurationSelection objects
* @throws InstantiationException
* @throws IllegalAccessException
*/
private static List<ConfigurationSelection> getAbstractConfigurations() throws InstantiationException, IllegalAccessException {
List<ClassLoader> classLoadersList = new LinkedList<>();
// Lots of reflection magic happening here!
classLoadersList.add(ClasspathHelper.contextClassLoader());
classLoadersList.add(ClasspathHelper.staticClassLoader());
Reflections reflections = new Reflections(new ConfigurationBuilder()
.setScanners(new SubTypesScanner(false), new ResourcesScanner(), new TypeElementsScanner())
.setUrls(ClasspathHelper.forClassLoader(classLoadersList.toArray(new ClassLoader[0])))
.filterInputsBy(new FilterBuilder().include(FilterBuilder.prefix("fucoin.configurations"))));
Set<String> typeSet = reflections.getStore().get("TypeElementsScanner").keySet();
HashSet<Class<?>> allClasses = Sets.newHashSet(ReflectionUtils.forNames(typeSet, reflections
.getConfiguration().getClassLoaders()));
List<ConfigurationSelection> configurations = new ArrayList<>();
// Filter the found classes for non abstract classes, that inherit from AbstractConfiguration
allClasses.stream().filter(oneClass -> !Modifier.isAbstract(oneClass.getModifiers()) && AbstractConfiguration.class.isAssignableFrom(oneClass)).forEach(oneClass -> {
ConfigurationSelection cfg = new ConfigurationSelection((Class<AbstractConfiguration>) oneClass);
configurations.add(cfg);
});
return configurations;
}
}
......@@ -91,7 +91,7 @@ public class MainRemote {
}
// spawn wallet
system.actorOf(WalletImpl.props(preknownNeighbour, walletName), walletName);
system.actorOf(WalletImpl.props(preknownNeighbour, walletName, true), walletName);
}
}
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.transaction.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl;
/**
* Announce, that there will be some wallets spawned.
* Used by the configurator to wait, until all new wallets have their initial money.
* Send to supervisor.
*/
public class ActionAnnounceWalletCreation extends SuperVisorAction {
public int numOfWallets;
public ActorRef observer;
public ActionAnnounceWalletCreation(int numOfWallets, ActorRef observer) {
this.numOfWallets = numOfWallets;
this.observer = observer;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
System.out.println("Waiting for wallet transactions");
abstractNode.setPendingBankCommits(abstractNode.getPendingBankCommits() + numOfWallets);
abstractNode.setBankCommitObserver(sender);
}
}
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.Action;
import fucoin.configurations.AbstractConfiguration;
/**
* This message tells the configuration, that the wallets have been spawned successfully
*/
public class ActionWalletCreationDone extends Action<AbstractConfiguration> {
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractConfiguration abstractNode) {
}
}
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
/**
*
*/
public class ActionWalletSendMoney extends ClientAction {
protected String address;
protected int amount;
protected ActorRef observer;
public ActionWalletSendMoney(String address, int amount, ActorRef observer) {
this.address = address;
this.amount = amount;
this.observer = observer;
}
public ActionWalletSendMoney(String address, int amount){
this(address, amount, null);
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.send(address, amount, observer);
}
}
......@@ -14,8 +14,14 @@ public class ActionJoin extends ClientAction {
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet node) {
// send the joined node all known neighbours from node and a reference to the supervisor
ActionJoinAnswer aja = new ActionJoinAnswer(node.getRemoteSuperVisorActor());
ActionJoinAnswer aja = new ActionJoinAnswer();
aja.someNeighbors.putAll(node.getKnownNeighbors());
sender.tell(aja, self);
if (node.getRemoteSuperVisorActor() == null) {
node.deferSendOfSuperVisorActor(sender);
} else {
sender.tell(new ActionTellSupervisor(node.getRemoteSuperVisorActor()), self);
}
}
}
......@@ -2,9 +2,13 @@ package fucoin.actions.join;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import akka.dispatch.OnSuccess;
import fucoin.actions.ClientAction;
import fucoin.actions.persist.ActionSearchMyWallet;
import fucoin.wallet.AbstractWallet;
import scala.Function1;
import scala.concurrent.Future;
import scala.runtime.BoxedUnit;
import java.util.HashMap;
import java.util.Map.Entry;
......@@ -20,16 +24,11 @@ import java.util.Map.Entry;
*/
public class ActionJoinAnswer extends ClientAction {
public final HashMap<String, ActorRef> someNeighbors = new HashMap<>();
public final ActorRef supervisor;
public ActionJoinAnswer(ActorRef supervisor) {
this.supervisor = supervisor;
}
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) {
wallet.log("Addressed to " + self.path().name() + " from " + sender.path().name() + ": someNeighbors:" + someNeighbors);
wallet.addLogMsg("Addressed to " + self.path().name() + " from " + sender.path().name() + ": someNeighbors:" + someNeighbors);
// your neighbours? my neighbours!
for (Entry<String, ActorRef> neighbor : someNeighbors.entrySet()) {
......@@ -38,11 +37,6 @@ public class ActionJoinAnswer extends ClientAction {
for (Entry<String, ActorRef> neighbor : someNeighbors.entrySet()) {
neighbor.getValue().tell(new ActionSearchMyWallet(wallet.getName()), self);
}
// register at the supervisor if the wallet just learned about it
if (wallet.getRemoteSuperVisorActor() == null) {
wallet.setRemoteSuperVisorActor(supervisor);
}
}
}
package fucoin.actions.join;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
/**
* Tell the joining node the supervisor
*/
public class ActionTellSupervisor extends ClientAction {
public final ActorRef supervisor;
public ActionTellSupervisor(ActorRef supervisor) {
this.supervisor = supervisor;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.setRemoteSuperVisorActor(supervisor);
}
}
......@@ -21,12 +21,12 @@ public class ServerActionJoin extends SuperVisorAction {
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl node) {
ActionJoinAnswer aja = new ActionJoinAnswer(node.getSelf());
ActionJoinAnswer aja = new ActionJoinAnswer(); // TODO: Might need added TellSupervisor
aja.someNeighbors.putAll(node.getKnownNeighbors());
sender.tell(aja, self);
node.addKnownNeighbor(name, sender);
self.tell(
new ActionInvokeDistributedCommittedTransfer(self, sender, 100),
new ActionInvokeDistributedCommittedTransfer(self, sender, 100, self),
sender);
}
}
......@@ -23,7 +23,7 @@ public class ActionSearchWalletReference extends Search {
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) {
wallet.log(wallet.getKnownNeighbors() + "contains " + name + "?");
wallet.addLogMsg(wallet.getKnownNeighbors() + "contains " + name + "?");
ttl.add(self);
ActionSearchWalletReferenceAnswer answer = null;
if (this.name.equals(wallet.getName())) {
......
......@@ -14,16 +14,19 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
private boolean granted;
private long timestamp;
private long id;
private ActorRef observer;
public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target,
int amount, boolean granted, long timestamp, long id) {
int amount, boolean granted, long timestamp, long id,
ActorRef observer) {
this.source = source;
this.target = target;
this.amount = amount;
this.granted = granted;
this.timestamp = timestamp;
this.id = id;
this.observer = observer;
}
public ActionCommitDistributedCommittedTransfer(
......@@ -34,36 +37,45 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
this.granted = false;
this.timestamp = outdatedRequest.getTimeout();
this.id = outdatedRequest.getId();
this.observer = outdatedRequest.getObserver();
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) {
wallet.log("ActionCommitDistributedCommittedTransfer is granted? " + granted);
wallet.addLogMsg("ActionCommitDistributedCommittedTransfer is granted? " + granted);
if (granted) {
Integer sourceAmount = wallet.amounts.getOrDefault(source, 0);
Integer targetAmount = wallet.amounts.getOrDefault(target, 0);
wallet.amounts.put(source, sourceAmount - amount);
wallet.amounts.put(target, targetAmount + amount);
if (source.compareTo(self) == 0) {
wallet.setAmount(wallet.getAmount() - amount);
wallet.logTransactionSuccess("Sent " + amount + " FUC to " + target.path().name());
wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name());
} else if (target.compareTo(self) == 0) {
wallet.setAmount(wallet.getAmount() + amount);
wallet.logTransactionSuccess("Received " + amount + " FUC from " + source.path().name());
wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name());
// recipient should notify a possible observer
if (observer != null) {
observer.tell(new ActionNotifyObserver(source, target, amount, granted, timestamp, id), self);
}
}
} else {
wallet.log("abort transaction with id" + id);
wallet.addLogMsg("abort transaction with id" + id);
// rollback
Integer sourceAmount = wallet.amounts.getOrDefault(source, 0);
Integer targetAmount = wallet.amounts.getOrDefault(target, 0);
wallet.amounts.put(source, sourceAmount + amount);
wallet.amounts.put(target, targetAmount - amount);
if (source.compareTo(self) == 0) {
wallet.logTransactionFail("Failed to send " + amount + " FUC to " + target.path().name() + " (Commit has not been granted)");
wallet.addTransactionLogMessageFail("Failed to send " + amount + " FUC to " + target.path().name() + " (Commit has not been granted)");
if (observer != null) {
observer.tell(new ActionNotifyObserver(source, target, amount, granted, timestamp, id), self);
}
}
}
wallet.log("wallet.amounts:" + wallet.amounts);
//wallet.addLogMsg("wallet.amounts:" + wallet.amounts);
}
}
......@@ -9,25 +9,32 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac
private ActorRef source;
private ActorRef target;
private ActorRef observer;
private int amount;
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
ActorRef target, int amount) {
ActorRef target, int amount){
this(source, target, amount, null);
}
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
ActorRef target, int amount, ActorRef observer) {
this.source = source;
this.target = target;
this.observer = observer;
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl superVisor) {
superVisor.log("invoke transaction " + source.path().name() +
superVisor.addLogMsg("invoke transaction " + source.path().name() +
" sends " + amount +
" to " + target.path().name());
long timeout = System.currentTimeMillis() + 500;
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout);
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer);
superVisor.addDistributedCommitedTransferRequest(ds);
ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId());
for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) {
......
......@@ -9,19 +9,21 @@ import fucoin.wallet.AbstractWallet;
public class ActionInvokeSentMoney extends Transaction {
public final String name;
public final int amount;
public final ActorRef observer;
public ActionInvokeSentMoney(String name, int amount) {
public ActionInvokeSentMoney(String name, int amount, ActorRef observer) {
this.name = name;
this.amount = amount;
this.observer = observer;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) {
wallet.log(wallet.getKnownNeighbors() + "");
wallet.addLogMsg(wallet.getKnownNeighbors() + "");
if (wallet.getKnownNeighbors().containsKey(name)) {
wallet.getRemoteSuperVisorActor().tell(
new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount), sender);
new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount, observer), sender);
} else {
ActionSearchWalletReference aswr = new ActionSearchWalletReference(name);
for (ActorRef neighbor : wallet.getKnownNeighbors().values()) {
......
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.actions.Action;
/**
* Used to notify an observer of a transaction when the transaction is finished.
*/
public class ActionNotifyObserver extends Action<AbstractNode> {
public ActorRef source;
public ActorRef target;
public int amount;
public boolean granted;
public long timestamp;
public long id;
public ActionNotifyObserver(ActorRef source, ActorRef target,
int amount, boolean granted, long timestamp, long id){
this.source = source;
this.target = target;
this.amount = amount;
this.granted = granted;
this.timestamp = timestamp;
this.id = id;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractNode abstractNode) {
}
}
......@@ -32,6 +32,12 @@ public class ActionPrepareDistributedCommittedTransfer extends Transaction {
//sender have enough money
&& wallet.amounts.getOrDefault(source, 0) >= amount));
// precautionly update own ledger to prevent double spending (respectively agreeing)
Integer sourceAmount = wallet.amounts.getOrDefault(source, 0);
Integer targetAmount = wallet.amounts.getOrDefault(target, 0);
wallet.amounts.put(source, sourceAmount - amount);
wallet.amounts.put(target, targetAmount + amount);
sender.tell(new ActionPrepareDistributedCommittedTransferAnswer(source, target, amount, timestamp, granted, id), self);
}
......
......@@ -27,16 +27,23 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl superVisor) {
superVisor.log("" + superVisor.getKnownNeighbors());
superVisor.log("granted?" + granted);
superVisor.addLogMsg("granted?" + granted);
DistributedCommittedTransferRequest request = superVisor.getRequest(id);
// ignore unknown requests
if (request == null){
return;
}
if (granted) {
if (request == null)//unknown DistributedCommittedTransferRequest ignore
return;
int newCount = request.addPositiveAnswer(sender);
if (newCount == superVisor.getKnownNeighbors().size()) {
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id);
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id, request.getObserver());
superVisor.addTransactionLogMessageSuccess("Transfer of " + amount + " FUC from" + source.path().name() + " to " + target.path().name());
for (ActorRef neighbor : request.getAnswers()) {
neighbor.tell(acdct, self);
}
......@@ -44,13 +51,11 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator
}
} else {
//A client wants to rollback
if (request != null) {
superVisor.log("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")");
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id);
superVisor.addTransactionLogMessageFail("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")");
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id, request.getObserver());
for (ActorRef neighbor : request.getAnswers()) {
neighbor.tell(acdct, self);
}
}
}
}
......
package fucoin.configurations;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.util.Timeout;
import fucoin.AbstractNode;
import fucoin.actions.control.ActionAnnounceWalletCreation;
import fucoin.actions.control.ActionWalletSendMoney;
import fucoin.actions.join.ActionTellSupervisor;
import fucoin.actions.transaction.ActionGetAmount;
import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.configurations.internal.ConfigurationCreator;
import fucoin.supervisor.SuperVisorImpl;
import fucoin.wallet.WalletImpl;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
*
*/
public abstract class AbstractConfiguration extends AbstractNode {
private ActorRef superVisor;
private final List<ActorRef> activeActors = new ArrayList<>();
private Timeout timeout = new Timeout(Duration.create(10, "seconds"));
private int remainingTransactions;
public static Props props(Class configurationClass) {
return Props.create(new ConfigurationCreator(configurationClass));
}
/**
* Spawns a new wallet and blocks until it has received its initial money
*
* @throws Exception on timeout
*/
ActorRef spawnWallet(String name, boolean createGUI) throws Exception {
Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout);
ActorRef wallet = createWallet(name, createGUI);
Await.result(future, timeout.duration());
return wallet;
}
/**
* Creates a wallet without blocking until the wallet was created
*/
private ActorRef createWallet(String name, boolean createGUI) {
Props props;
int numOfWallets = activeActors.size();
if (numOfWallets == 0) {
props = WalletImpl.props(null, name, createGUI);
} else {
props = WalletImpl.props(activeActors.get(numOfWallets - 1), name, createGUI);
}
ActorRef actorRef = context().actorOf(props, name);
activeActors.add(actorRef);
if (numOfWallets == 0) {
actorRef.tell(new ActionTellSupervisor(superVisor), superVisor);
}
return actorRef;
}
/**
* Spawn multiple wallets and wait until they all have their initial FUC
*
* @throws Exception on timeout
*/
public void spawnWallets(int n, boolean createGUI) throws Exception {
Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(n, self()), timeout);
for (int i = 0; i < n; i++) {
String nameOfTheWallet = "Wallet" + String.valueOf(activeActors.size());
createWallet(nameOfTheWallet, createGUI);
}
Await.result(future, timeout.duration());
}
/**
* Fetch a random wallet
*/
public ActorRef getRandomWallet() {
return activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size()));
}
public List<ActorRef> wallets() {
return this.activeActors;
}
protected void randomTransactions(int number, int maxTransactionsAtTheSameTime) {
remainingTransactions = number;
for (int i = 0; i < Math.min(number, maxTransactionsAtTheSameTime); i++) {
nextRandomTransaction();
}
}
private void nextRandomTransaction() {
remainingTransactions--;
try {
randomTransaction();
} catch (Exception e) {
System.err.println("Error while trying to perform a random transaction: "+e.getMessage());
remainingTransactions = 0;
}
}
private void randomTransaction() throws Exception {
List<ActorRef> wallets = wallets();
Collections.shuffle(wallets);
ActorRef sender = wallets.get(0);
ActorRef recipient = wallets.get(1);
Future<Object> future = Patterns.ask(sender, new ActionGetAmount(), timeout);
ActionGetAmountAnswer answer = (ActionGetAmountAnswer) Await.result(future, timeout.duration());
int transferAmount = 1 + ThreadLocalRandom.current().nextInt(answer.amount);
sender.tell(new ActionWalletSendMoney(recipient.path().name(), transferAmount, self()), self());
}
/**
* Create the supervisor node
*/
ActorRef initSupervisor() {
superVisor = context().actorOf(SuperVisorImpl.props(), "SuperVisorImpl");
// Don't ask.
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return superVisor;
}
@Override
public void onReceive(Object message) {
if (message instanceof ActionNotifyObserver) {
ActionNotifyObserver notification = (ActionNotifyObserver) message;
String status = "successful";
if (!notification.granted) {
status = "failed";
}
System.out.println("Observed a " + status + " transaction of " + notification.amount + " FUCs from " +
notification.source.path().name() + " to " + notification.target.path().name());
if (remainingTransactions > 0) {
nextRandomTransaction();
}
}
}
@Override
public void preStart() throws Exception {
super.preStart();
this.run();
}
public abstract void run();
}
package fucoin.configurations;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
import fucoin.actions.control.ActionWalletSendMoney;
import fucoin.actions.transaction.ActionGetAmount;
import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.configurations.internal.ConfigurationName;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
* This configuration is the previous default of 2 wallets with GUI and a supervisor.
*/
@ConfigurationName("Default Configuration")
public class DefaultConfiguration extends AbstractConfiguration {
private ThreadLocalRandom rand;
private Timeout timeout = new Timeout(Duration.create(10, "seconds"));
public DefaultConfiguration(){
rand = ThreadLocalRandom.current();
}
@Override
public void run() {
initSupervisor();
ActorRef wallet1 = null;
ActorRef wallet2 = null;
try {
wallet1 = spawnWallet("Wallet0", true);
} catch (Exception e) {
System.out.println("Wallet0 spawning timed out");
}
try {
wallet2 = spawnWallet("Wallet1", true);
} catch (Exception e) {
System.out.println("Wallet1 spawning timed out");
}
if (wallet1 != null && wallet2 != null) {
wallet1.tell(new ActionWalletSendMoney(wallet2.path().name(), 50, getSelf()), wallet1);
}
}
@Override
public void onReceive(Object message) {
super.onReceive(message);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment