Skip to content
Snippets Groups Projects
Commit bf18ee3f authored by berleon's avatar berleon
Browse files

Implement snapshot algorithm

parent c780e79e
Branches
No related tags found
No related merge requests found
Showing
with 428 additions and 63 deletions
...@@ -3,13 +3,21 @@ package fucoin; ...@@ -3,13 +3,21 @@ package fucoin;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Address; import akka.actor.Address;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import fucoin.actions.snap.SnapShotBegin;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.supervisor.SuperVisorImpl;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
import scala.concurrent.Promise;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
public abstract class AbstractNode extends UntypedActor implements Serializable { public abstract class AbstractNode extends UntypedActor implements Serializable {
protected SnapshotToken currentSnapshotToken;
protected Snapshot snapshot;
/** /**
* Returns the akka-style address as String, * Returns the akka-style address as String,
...@@ -65,7 +73,41 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -65,7 +73,41 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
return false; return false;
} }
public SnapshotToken getCurrentSnapshotToken() {
return currentSnapshotToken;
}
public void setCurrentSnapshotToken(SnapshotToken currentSnapshotToken) {
this.currentSnapshotToken = currentSnapshotToken;
}
public Snapshot getSnapshot() {
return snapshot;
}
public HashMap<String, ActorRef> getKnownNeighbors() { public HashMap<String, ActorRef> getKnownNeighbors() {
return knownNeighbors; return knownNeighbors;
} }
public Promise<Snapshot> makeSnapshot() {
String prefix = "";
if (this instanceof SuperVisorImpl) {
prefix = "supervisor";
} else if (this instanceof AbstractWallet) {
AbstractWallet wallet = (AbstractWallet) this;
prefix = wallet.getName();
}
String snapName = prefix + "_" + String.valueOf(System.currentTimeMillis());
this.currentSnapshotToken = new SnapshotToken(snapName, self());
this.snapshot = new Snapshot();
if (this instanceof AbstractWallet) {
AbstractWallet wallet = (AbstractWallet) this;
this.snapshot.addState(wallet.getName(), wallet.getAmount());
}
for (ActorRef act : getKnownNeighbors().values()) {
act.tell(new SnapShotBegin(this.currentSnapshotToken), self());
}
return this.snapshot.promise();
}
} }
\ No newline at end of file
package fucoin;
import akka.dispatch.Futures;
import fucoin.actions.snap.SnapShotSaveState;
import org.json.JSONObject;
import scala.concurrent.Promise;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
public class Snapshot {
public HashMap<String, Integer> states = new HashMap<>();
public List<SnapshotTransaction> transactions = new ArrayList<>();
private Promise<Snapshot> complete = Futures.promise();
@Override
public String toString() {
return "Snapshot{" +
"states=" + states +
", transactions=" + transactions +
'}';
}
public JSONObject toJson() {
return null;
}
public void addState(String name, int amount) {
states.put(name, amount);
}
public void addState(SnapShotSaveState state) {
states.put(state.name, state.amount);
}
public void addTransaction(SnapshotTransaction snapshotTransaction) {
transactions.add(snapshotTransaction);
}
public void completed() {
this.complete.success(this);
}
public Promise<Snapshot> promise() {
return this.complete;
}
}
package fucoin;
import akka.actor.ActorRef;
public class SnapshotToken {
public String token;
public ActorRef observer;
public SnapshotToken(String token, ActorRef observer) {
this.token = token;
this.observer = observer;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof SnapshotToken)) return false;
SnapshotToken that = (SnapshotToken) o;
if (token != null ? !token.equals(that.token) : that.token != null) return false;
return observer != null ? observer.equals(that.observer) : that.observer == null;
}
@Override
public int hashCode() {
int result = token != null ? token.hashCode() : 0;
result = 31 * result + (observer != null ? observer.hashCode() : 0);
return result;
}
}
package fucoin;
public class SnapshotTransaction {
public String sender;
public String receiver;
public int amount;
public SnapshotTransaction(String sender, String receiver, int amount) {
this.sender = sender;
this.receiver = receiver;
this.amount = amount;
}
}
...@@ -3,6 +3,7 @@ package fucoin.actions; ...@@ -3,6 +3,7 @@ package fucoin.actions;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.SnapshotToken;
import java.io.Serializable; import java.io.Serializable;
...@@ -17,4 +18,6 @@ public abstract class Action<T extends AbstractNode> implements Serializable { ...@@ -17,4 +18,6 @@ public abstract class Action<T extends AbstractNode> implements Serializable {
protected abstract void onAction(ActorRef sender, ActorRef self, protected abstract void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, T abstractNode); UntypedActorContext context, T abstractNode);
} }
...@@ -2,6 +2,7 @@ package fucoin.actions.control; ...@@ -2,6 +2,7 @@ package fucoin.actions.control;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.SnapshotToken;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
......
package fucoin.actions.snap;
import fucoin.AbstractNode;
import fucoin.actions.Action;
public abstract class SnapAction extends Action<AbstractNode> {
}
package fucoin.actions.snap;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.SnapshotToken;
import fucoin.actions.Action;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
public class SnapShotBegin extends SnapAction {
private SnapshotToken snapshotToken;
public SnapShotBegin(SnapshotToken snapshotToken) {
this.snapshotToken = snapshotToken;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractNode abstractNode) {
abstractNode.setCurrentSnapshotToken(this.snapshotToken);
if (abstractNode instanceof AbstractWallet) {
AbstractWallet wallet = (AbstractWallet) abstractNode;
sender.tell(new SnapShotSaveState(wallet.getName(), wallet.getAmount()), self);
}
}
}
package fucoin.actions.snap;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
public class SnapShotEnd extends SnapAction {
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractNode abstractNode) {
abstractNode.setCurrentSnapshotToken(null);
}
}
package fucoin.actions.snap;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.Snapshot;
import fucoin.actions.Action;
import java.util.Set;
public class SnapShotSaveState extends SnapAction {
public String name;
public int amount;
public SnapShotSaveState(String name, int amount) {
this.name = name;
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context,
AbstractNode abstractNode) {
Snapshot snap = abstractNode.getSnapshot();
snap.addState(this);
Set<String> neighbors = abstractNode.getKnownNeighbors().keySet();
if (snap.states.keySet().containsAll(neighbors)) {
for (ActorRef node : abstractNode.getKnownNeighbors().values()) {
node.tell(new SnapShotEnd(), self);
}
snap.completed();
}
}
}
package fucoin.actions.snap;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.SnapshotTransaction;
import fucoin.actions.Action;
public class SnapShotSaveTransaction extends SnapAction {
public SnapshotTransaction transaction;
public SnapShotSaveTransaction(SnapshotTransaction transaction) {
this.transaction = transaction;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractNode abstractNode) {
abstractNode.getSnapshot().addTransaction(transaction);
}
}
...@@ -2,7 +2,10 @@ package fucoin.actions.transaction; ...@@ -2,7 +2,10 @@ package fucoin.actions.transaction;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.SnapshotToken;
import fucoin.SnapshotTransaction;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.actions.snap.SnapShotSaveTransaction;
import fucoin.supervisor.DistributedCommittedTransferRequest; import fucoin.supervisor.DistributedCommittedTransferRequest;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
...@@ -15,17 +18,23 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { ...@@ -15,17 +18,23 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
private long timestamp; private long timestamp;
private long id; private long id;
private ActorRef observer; private ActorRef observer;
private SnapshotToken snapshotToken;
public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target, public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target,
int amount, boolean granted, long timestamp, long id, int amount, boolean granted, long timestamp, long id,
ActorRef observer) { ActorRef observer) {
}
public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target,
int amount, boolean granted, long timestamp, long id,
SnapshotToken snapshotToken,
ActorRef observer) {
this.source = source; this.source = source;
this.target = target; this.target = target;
this.amount = amount; this.amount = amount;
this.granted = granted; this.granted = granted;
this.timestamp = timestamp; this.timestamp = timestamp;
this.id = id; this.id = id;
this.snapshotToken = snapshotToken;
this.observer = observer; this.observer = observer;
} }
...@@ -37,12 +46,24 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { ...@@ -37,12 +46,24 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
this.granted = false; this.granted = false;
this.timestamp = outdatedRequest.getTimeout(); this.timestamp = outdatedRequest.getTimeout();
this.id = outdatedRequest.getId(); this.id = outdatedRequest.getId();
this.snapshotToken = outdatedRequest.getSnapshotToken();
this.observer = outdatedRequest.getObserver(); this.observer = outdatedRequest.getObserver();
} }
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) { UntypedActorContext context, AbstractWallet wallet) {
boolean walletTokenSet = wallet.getCurrentSnapshotToken() != null;
if (granted && walletTokenSet && (this.snapshotToken == null
|| !this.snapshotToken.equals(wallet.getCurrentSnapshotToken())))
{
ActorRef snapObserver = wallet.getCurrentSnapshotToken().observer;
snapObserver.tell(new SnapShotSaveTransaction(
new SnapshotTransaction(
this.source.path().name(), this.target.path().name(), this.amount)), wallet.self());
}
//wallet.addLogMsg("ActionCommitDistributedCommittedTransfer is granted? " + granted); //wallet.addLogMsg("ActionCommitDistributedCommittedTransfer is granted? " + granted);
if (granted) { if (granted) {
......
...@@ -34,7 +34,7 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac ...@@ -34,7 +34,7 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac
long timeout = System.currentTimeMillis() + 500; long timeout = System.currentTimeMillis() + 500;
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer); DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, superVisor.getCurrentSnapshotToken(), observer);
superVisor.addDistributedCommitedTransferRequest(ds); superVisor.addDistributedCommitedTransferRequest(ds);
ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId()); ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId());
for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) { for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) {
......
...@@ -42,7 +42,8 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator ...@@ -42,7 +42,8 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator
int newCount = request.addPositiveAnswer(sender); int newCount = request.addPositiveAnswer(sender);
if (newCount == superVisor.getKnownNeighbors().size()) { if (newCount == superVisor.getKnownNeighbors().size()) {
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id, request.getObserver()); ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(
source, target, amount, true, timestamp, id, superVisor.getCurrentSnapshotToken(), request.getObserver());
superVisor.addTransactionLogMessageSuccess("Transfer of " + amount + " FUC from" + source.path().name() + " to " + target.path().name()); superVisor.addTransactionLogMessageSuccess("Transfer of " + amount + " FUC from" + source.path().name() + " to " + target.path().name());
for (ActorRef neighbor : request.getAnswers()) { for (ActorRef neighbor : request.getAnswers()) {
neighbor.tell(acdct, self); neighbor.tell(acdct, self);
...@@ -52,7 +53,8 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator ...@@ -52,7 +53,8 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator
} else { } else {
//A client wants to rollback //A client wants to rollback
superVisor.addTransactionLogMessageFail("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")"); superVisor.addTransactionLogMessageFail("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")");
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id, request.getObserver()); ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(
source, target, amount, false, timestamp, id, superVisor.getCurrentSnapshotToken(), request.getObserver());
for (ActorRef neighbor : request.getAnswers()) { for (ActorRef neighbor : request.getAnswers()) {
neighbor.tell(acdct, self); neighbor.tell(acdct, self);
} }
......
...@@ -78,5 +78,6 @@ public class EvilWalletConfiguration extends AbstractConfiguration { ...@@ -78,5 +78,6 @@ public class EvilWalletConfiguration extends AbstractConfiguration {
@Override @Override
public void onReceive(Object message) { public void onReceive(Object message) {
super.onReceive(message); super.onReceive(message);
System.out.println(message);
} }
} }
package fucoin.configurations; package fucoin.configurations;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess; import akka.dispatch.OnSuccess;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import fucoin.Snapshot;
import fucoin.actions.control.ActionWalletSendMoney; import fucoin.actions.control.ActionWalletSendMoney;
import fucoin.actions.snap.SnapAction;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.configurations.internal.ConfigurationName; import fucoin.configurations.internal.ConfigurationName;
import fucoin.supervisor.SuperVisorImpl;
import fucoin.supervisor.SuperVisorLogImpl; import fucoin.supervisor.SuperVisorLogImpl;
import org.apache.commons.math3.distribution.EnumeratedDistribution; import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.util.Pair; import org.apache.commons.math3.util.Pair;
import scala.Array; import scala.Function1;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
...@@ -21,7 +26,6 @@ import java.util.List; ...@@ -21,7 +26,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.batik.anim.timing.WallclockTimingSpecifier;
/** /**
* This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets * This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets
...@@ -34,7 +38,7 @@ class WalletSimulation { ...@@ -34,7 +38,7 @@ class WalletSimulation {
private double adaption_factor; private double adaption_factor;
private String name; private String name;
private Map<String, Double> p_receiver; private Map<String, Double> factorReceiver;
public WalletSimulation(String name, public WalletSimulation(String name,
List<String> walletNames, List<String> walletNames,
...@@ -43,15 +47,16 @@ class WalletSimulation { ...@@ -43,15 +47,16 @@ class WalletSimulation {
this.min_value = min_value; this.min_value = min_value;
this.max_value = max_value; this.max_value = max_value;
this.adaption_factor = adaption_factor; this.adaption_factor = adaption_factor;
this.name = name;
int n = walletNames.size(); int n = walletNames.size();
this.p_receiver = new HashMap<String, Double>(); this.factorReceiver = new HashMap<String, Double>();
for (String walletName : walletNames) { for (String walletName : walletNames) {
if (walletName.equals(this.name)) { if (walletName.equals(this.name)) {
continue; continue;
} }
this.p_receiver.put(walletName, 1./ (n - 1.)); this.factorReceiver.put(walletName, 1.);
} }
} }
...@@ -63,43 +68,71 @@ class WalletSimulation { ...@@ -63,43 +68,71 @@ class WalletSimulation {
return name; return name;
} }
public int sampleValue() { public int sampleValue(int currentAmount) {
double random = Math.random() * (this.max_value - this.min_value) + this.min_value; double random = Math.random() * (this.max_value - this.min_value) + this.min_value;
return (int) Math.round(random); int value = (int) Math.round(currentAmount * random);
assert value <= currentAmount;
return value;
}
private double total_factor() {
double sum = 0;
for (double value : this.factorReceiver.values()) {
sum += value;
}
return sum;
} }
public String sampleReceiver() { public String sampleReceiver() {
double normalization = total_factor();
List<Pair<String, Double>> distribution = new ArrayList<Pair<String, Double>>(); List<Pair<String, Double>> distribution = new ArrayList<Pair<String, Double>>();
for (String walletName : this.p_receiver.keySet()) { for (String walletName : this.factorReceiver.keySet()) {
// distribution.add( double prob = this.factorReceiver.get(walletName) / normalization;
// new Pair(walletName, this.p_receiver.get(this.p_receiver) distribution.add(new Pair(walletName, prob));
// );
} }
return new EnumeratedDistribution<>(distribution).sample(); return new EnumeratedDistribution<>(distribution).sample();
} }
public void updateProbabily(String walletName, double amountGotten) { public void updateProbabily(String walletName, int amountGotten) {
// this.adaption_factor double currentFactor = this.factorReceiver.get(walletName);
//adapt the probability for the specified wallet by considering the amount gotten and the this.factorReceiver.put(walletName, currentFactor + amountGotten * this.adaption_factor);
//adaptation factor
} }
/// TODO: adapted probabilites and normalize to 1.
} }
@ConfigurationName("GGBWallet") @ConfigurationName("GGBWallet")
public class GGBWalletConfiguration extends AbstractConfiguration { public class GGBWalletConfiguration extends AbstractConfiguration {
protected static int n_generous = 1; private static int n_generous = 1;
protected static int n_greedy = 1; private static int n_greedy = 1;
protected static int n_balanced = 1; private static int n_balanced = 1;
protected static double n_steps = 1000; private static double n_steps = 40000;
protected static String log_filename = "log.json"; private static String log_filename = "log.json";
private Map<String, WalletSimulation> simulations; private Map<String, WalletSimulation> simulations;
private HashMap<String,ActorRef> walletMap; private HashMap<String,ActorRef> walletMap;
private static final double GENEROUS_PROBABILITY = 0.02;
private static final double GENEROUS_MIN = 0.2;
private static final double GENEROUS_MAX = 0.8;
private static final double GENEROUS_ADAPTION = 0.1;
private static final double GREEDY_PROBABILITY = 0.005;
private static final double GREEDY_MIN = 0.05;
private static final double GREEDY_MAX = 0.5;
private static final double GREEDY_ADAPTION = 0.01;
private static final double BALANCED_PROBABILITY = 0.005;
private static final double BALANCED_MIN = 0.05;
private static final double BALANCED_MAX = 0.5;
private static final double BALANCED_ADAPTION = 0.01;
private int currentStep = 0;
private Promise<Void> simulationFinished = Futures.promise();
private double start;
@Override @Override
public void run() { public void run() {
simulations = new HashMap<>(); simulations = new HashMap<>();
ActorRef supervisor = initSupervisor(); initSupervisor();
List<String> generousWallets = new ArrayList<>(); List<String> generousWallets = new ArrayList<>();
List<String> greedyWallets = new ArrayList<>(); List<String> greedyWallets = new ArrayList<>();
...@@ -107,17 +140,17 @@ public class GGBWalletConfiguration extends AbstractConfiguration { ...@@ -107,17 +140,17 @@ public class GGBWalletConfiguration extends AbstractConfiguration {
walletMap = new HashMap<>(); walletMap = new HashMap<>();
try { try {
for (int i = 0; i <= n_generous; i++) { for (int i = 0; i < n_generous; i++) {
String name = "generous_" + i; String name = "generous_" + i;
walletMap.put(name, spawnWallet(name, false)); walletMap.put(name, spawnWallet(name, false));
generousWallets.add(name); generousWallets.add(name);
} }
for (int i = 0; i <= n_greedy; i++) { for (int i = 0; i < n_greedy; i++) {
String name = "greedy_" + i; String name = "greedy_" + i;
walletMap.put(name, spawnWallet(name, false)); walletMap.put(name, spawnWallet(name, false));
greedyWallets.add(name); greedyWallets.add(name);
} }
for (int i = 0; i <= n_balanced; i++) { for (int i = 0; i < n_balanced; i++) {
String name = "balanced_" + i; String name = "balanced_" + i;
walletMap.put(name, spawnWallet(name, false)); walletMap.put(name, spawnWallet(name, false));
balancedWallets.add(name); balancedWallets.add(name);
...@@ -126,11 +159,17 @@ public class GGBWalletConfiguration extends AbstractConfiguration { ...@@ -126,11 +159,17 @@ public class GGBWalletConfiguration extends AbstractConfiguration {
} catch (Exception e) { } catch (Exception e) {
System.out.println("Wallet spawning timed out!"); System.out.println("Wallet spawning timed out!");
} }
for (String name : walletMap.keySet()) {
this.addKnownNeighbor(name, walletMap.get(name));
}
List<String> allWalletNames = Stream.concat(generousWallets.stream(), List<String> allWalletNames = Stream.concat(generousWallets.stream(),
greedyWallets.stream()).collect(Collectors.toList()); greedyWallets.stream()).collect(Collectors.toList());
allWalletNames = Stream.concat(allWalletNames.stream(),
balancedWallets.stream()).collect(Collectors.toList());
for (String name : generousWallets) { for (String name : generousWallets) {
simulations.put(name, new WalletSimulation(name, allWalletNames, 0.02, 20, 80, 0.1)); simulations.put(name, new WalletSimulation(name, allWalletNames, GENEROUS_PROBABILITY,
GENEROUS_MIN, GENEROUS_MAX, GENEROUS_ADAPTION));
// initialize WalletSimulation // initialize WalletSimulation
// Money Probability Factor // Money Probability Factor
// greedy 20% 0.5% 1 % // greedy 20% 0.5% 1 %
...@@ -138,40 +177,70 @@ public class GGBWalletConfiguration extends AbstractConfiguration { ...@@ -138,40 +177,70 @@ public class GGBWalletConfiguration extends AbstractConfiguration {
// balanced 50% 1% 5 % // balanced 50% 1% 5 %
} }
for (String name : greedyWallets) { for (String name : greedyWallets) {
simulations.put(name, new WalletSimulation(name, allWalletNames, 0.005, 5, 50, 0.01)); simulations.put(name, new WalletSimulation(name, allWalletNames, GREEDY_PROBABILITY,
GREEDY_MIN, GREEDY_MAX, GREEDY_ADAPTION));
} }
for (String name : balancedWallets) { for (String name : balancedWallets) {
simulations.put(name, new WalletSimulation(name, allWalletNames, 0.01, 10, 65, 0.05)); simulations.put(name, new WalletSimulation(name, allWalletNames, BALANCED_PROBABILITY,
BALANCED_MIN, BALANCED_MAX, BALANCED_ADAPTION));
} }
double start = System.currentTimeMillis() / 1000.; start = System.currentTimeMillis() / 1000.;
this.nextTransaction();
for (int i = 0; i < n_steps; i++) { simulationFinished.future().onSuccess(new OnSuccess<Void>() {
this.step(); @Override
break; public void onSuccess(Void result) {
// You can start your algorithm here if you want to.
// Alternatively, you can also notify the user that all transactions are finished
System.out.println("All random transactions finished!");
} }
}, context().dispatcher());
} }
private void step() { private void nextTransaction() {
for(;currentStep < n_steps; currentStep++) {
for (WalletSimulation currentWallet : simulations.values()) { for (WalletSimulation currentWallet : simulations.values()) {
if(currentWallet.willSend()){ if(currentWallet.willSend()){
String receiverName = currentWallet.sampleReceiver(); String receiverName = currentWallet.sampleReceiver();
String senderName = currentWallet.getName(); String senderName = currentWallet.getName();
int value = currentWallet.sampleValue();
ActorRef senderWallet = walletMap.get(senderName); ActorRef senderWallet = walletMap.get(senderName);
ActorRef receiverWallet = walletMap.get(receiverName); ActorRef receiverWallet = walletMap.get(receiverName);
int value = currentWallet.sampleValue(getMoney(senderWallet));
sendMoney(value, receiverWallet, senderWallet); sendMoney(value, receiverWallet, senderWallet);
WalletSimulation receiverSimulation = simulations.get(receiverName); WalletSimulation receiverSimulation = simulations.get(receiverName);
receiverSimulation.updateProbabily(currentWallet.getName(), value); receiverSimulation.updateProbabily(currentWallet.getName(), value);
this.makeSnapshot().future().onSuccess(
new OnSuccess<Snapshot>() {
@Override
public void onSuccess(Snapshot snap) throws Throwable {
System.out.println("SNAPSHOT: " + snap);
}
} }
, context().dispatcher());
return;
} }
} }
}
}
private void sendMoney(int transferAmount,ActorRef receiverWallet, ActorRef senderWallet) { private void sendMoney(int transferAmount,ActorRef receiverWallet, ActorRef senderWallet) {
System.out.println(receiverWallet.path().name()); System.out.println(receiverWallet.path().name());
senderWallet.tell(new ActionWalletSendMoney(receiverWallet.path().name(), transferAmount, self()), self()); senderWallet.tell(new ActionWalletSendMoney(receiverWallet.path().name(), transferAmount, self()), self());
} }
private int getMoney(ActorRef sender) {
Future<Object> future = Patterns.ask(sender, new ActionGetAmount(), timeout);
try {
ActionGetAmountAnswer answer = (ActionGetAmountAnswer) Await.result(future, timeout.duration());
return answer.amount;
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
@Override @Override
ActorRef initSupervisor() { ActorRef initSupervisor() {
superVisor = context().actorOf( superVisor = context().actorOf(
...@@ -185,9 +254,31 @@ public class GGBWalletConfiguration extends AbstractConfiguration { ...@@ -185,9 +254,31 @@ public class GGBWalletConfiguration extends AbstractConfiguration {
return superVisor; return superVisor;
} }
@Override @Override
public void onReceive(Object message) { public void onReceive(Object message) {
super.onReceive(message); if (message instanceof ActionNotifyObserver) {
System.out.println(message); ActionNotifyObserver notification = (ActionNotifyObserver) message;
String status = "successful";
if (!notification.granted) {
status = "failed";
}
System.out.println("Observed a " + status + " transaction of " + notification.amount + " FUCs from " +
notification.source.path().name() + " to " + notification.target.path().name());
if (currentStep < n_steps) {
nextTransaction();
} else {
if (simulationFinished != null) {
simulationFinished.success(null);
simulationFinished = null;
}
}
}
if (message instanceof SnapAction) {
((SnapAction) message).doAction(this);
}
} }
} }
...@@ -13,13 +13,13 @@ public class MassWalletConfiguration extends AbstractConfiguration { ...@@ -13,13 +13,13 @@ public class MassWalletConfiguration extends AbstractConfiguration {
public void run() { public void run() {
initSupervisor(); initSupervisor();
try { try {
spawnWallets(2, false); spawnWallets(100, false);
System.out.println("Wallet spawning done!"); System.out.println("Wallet spawning done!");
} catch (Exception e) { } catch (Exception e) {
System.out.println("Wallet spawning timed out!"); System.out.println("Wallet spawning timed out!");
} }
randomTransactions(5, 2).onSuccess(new OnSuccess<Void>() { randomTransactions(5000, 200).onSuccess(new OnSuccess<Void>() {
@Override @Override
public void onSuccess(Void result) { public void onSuccess(Void result) {
// You can start your algorithm here if you want to. // You can start your algorithm here if you want to.
...@@ -33,4 +33,6 @@ public class MassWalletConfiguration extends AbstractConfiguration { ...@@ -33,4 +33,6 @@ public class MassWalletConfiguration extends AbstractConfiguration {
public void onReceive(Object message) { public void onReceive(Object message) {
super.onReceive(message); super.onReceive(message);
} }
} }
...@@ -37,13 +37,14 @@ public class NetworkInterfaceReader { ...@@ -37,13 +37,14 @@ public class NetworkInterfaceReader {
* As a fallback, we return 127.0.0.1, i.e. localhost * As a fallback, we return 127.0.0.1, i.e. localhost
*/ */
public static String readDefaultHostname() { public static String readDefaultHostname() {
String hostname = "127.0.0.1"; String hostname = "192.168.43.160";
try { try {
hostname = InetAddress.getLocalHost().getHostAddress(); hostname = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
e.printStackTrace(); e.printStackTrace();
} }
hostname = "127.0.0.1";
return hostname; return hostname;
} }
} }
...@@ -2,6 +2,7 @@ package fucoin.supervisor; ...@@ -2,6 +2,7 @@ package fucoin.supervisor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.SnapshotToken;
import fucoin.actions.transaction.Transaction; import fucoin.actions.transaction.Transaction;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
...@@ -19,17 +20,19 @@ public class DistributedCommittedTransferRequest extends Transaction { ...@@ -19,17 +20,19 @@ public class DistributedCommittedTransferRequest extends Transaction {
private long timeout; private long timeout;
private long id; private long id;
private final SnapshotToken snapshotToken;
private List<ActorRef> answers = new LinkedList<>(); private List<ActorRef> answers = new LinkedList<>();
public DistributedCommittedTransferRequest(ActorRef source, ActorRef target, int amount, public DistributedCommittedTransferRequest(ActorRef source, ActorRef target, int amount,
long timeout, ActorRef observer) { long timeout, SnapshotToken snapshotToken, ActorRef observer) {
this.source = source; this.source = source;
this.target = target; this.target = target;
this.amount = amount; this.amount = amount;
this.timeout = timeout; this.timeout = timeout;
this.id = random.nextLong(); this.id = random.nextLong();
this.snapshotToken = snapshotToken;
this.observer = observer; this.observer = observer;
} }
...@@ -69,7 +72,12 @@ public class DistributedCommittedTransferRequest extends Transaction { ...@@ -69,7 +72,12 @@ public class DistributedCommittedTransferRequest extends Transaction {
return amount; return amount;
} }
public SnapshotToken getSnapshotToken() {
return snapshotToken;
}
public ActorRef getObserver(){ public ActorRef getObserver(){
return observer; return observer;
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment