Skip to content
Snippets Groups Projects
Commit 1c413a97 authored by Simon Könnecke's avatar Simon Könnecke
Browse files

moved SuperVisorAction class, create statistic class and added to wallet, add...

moved SuperVisorAction class, create statistic class and added to wallet, add snapshot (as json files), create new configuration called Statistics, automated communication between wallets over StatisticsWalletConfiguration
parent e9ea7f29
Branches
No related tags found
No related merge requests found
Showing
with 307 additions and 19 deletions
...@@ -3,3 +3,4 @@ ...@@ -3,3 +3,4 @@
.idea/ .idea/
*.iml *.iml
.DS_Store .DS_Store
snapshots
\ No newline at end of file
...@@ -56,5 +56,12 @@ ...@@ -56,5 +56,12 @@
<artifactId>reflections</artifactId> <artifactId>reflections</artifactId>
<version>0.9.10</version> <version>0.9.10</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -3,10 +3,18 @@ package fucoin; ...@@ -3,10 +3,18 @@ package fucoin;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Address; import akka.actor.Address;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalTime;
import java.util.HashMap; import java.util.HashMap;
public abstract class AbstractNode extends UntypedActor implements Serializable { public abstract class AbstractNode extends UntypedActor implements Serializable {
...@@ -68,4 +76,28 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -68,4 +76,28 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
public HashMap<String, ActorRef> getKnownNeighbors() { public HashMap<String, ActorRef> getKnownNeighbors() {
return knownNeighbors; return knownNeighbors;
} }
public void createDump(String name, String content, LocalTime time) {
String filename = "snapshots/Dump_" + time.getNano() + "_" + name + ".json";
try {
File theDir = new File("snapshots");
// if the directory does not exist, create it
if (!theDir.exists()) {
theDir.mkdir();
}
File myFile = new File(filename);
myFile.createNewFile();
FileOutputStream fOut = new FileOutputStream(myFile);
OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut);
myOutWriter.append(content);
myOutWriter.close();
fOut.close();
} catch (Exception e) {
e.printStackTrace();
}
} }
}
package fucoin.actions.transaction; package fucoin.actions;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
......
...@@ -2,7 +2,7 @@ package fucoin.actions.control; ...@@ -2,7 +2,7 @@ package fucoin.actions.control;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
/** /**
......
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl;
import java.time.LocalTime;
public class ActionCreateSuperVisorSnapshot extends SuperVisorAction {
/**
* Timestamp for the file name
*/
private final LocalTime time;
public ActionCreateSuperVisorSnapshot(LocalTime time) {
this.time = time;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
abstractNode.createDump(time);
}
}
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import com.google.gson.Gson;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.time.LocalTime;
public class ActionCreateWalletSnapshot extends ClientAction {
/**
* Timestamp for the file name
*/
private final LocalTime time;
public ActionCreateWalletSnapshot(LocalTime time) {
this.time = time;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.createDump(time);
}
}
...@@ -3,7 +3,7 @@ package fucoin.actions.join; ...@@ -3,7 +3,7 @@ package fucoin.actions.join;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.actions.transaction.ActionInvokeDistributedCommittedTransfer; import fucoin.actions.transaction.ActionInvokeDistributedCommittedTransfer;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
/** /**
......
package fucoin.actions.statistics;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
import fucoin.wallet.WalletStatistics;
/**
* Wallet A sends a Wallet B current Amount and statistic's.
* Wallet B updates his statistic's.
* @see WalletStatistics
*/
public class ActionInterchangeState extends ClientAction {
private final int currentAmount;
public ActionInterchangeState(int currentAmount) {
this.currentAmount = currentAmount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.getStatistics().update(currentAmount);
}
}
...@@ -2,6 +2,7 @@ package fucoin.actions.transaction; ...@@ -2,6 +2,7 @@ package fucoin.actions.transaction;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.actions.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
public abstract class CoordinatorTransaction extends SuperVisorAction { public abstract class CoordinatorTransaction extends SuperVisorAction {
......
...@@ -28,11 +28,11 @@ import java.util.concurrent.ThreadLocalRandom; ...@@ -28,11 +28,11 @@ import java.util.concurrent.ThreadLocalRandom;
*/ */
public abstract class AbstractConfiguration extends AbstractNode { public abstract class AbstractConfiguration extends AbstractNode {
private ActorRef superVisor; protected ActorRef superVisor;
private final List<ActorRef> activeActors = new ArrayList<>(); protected final List<ActorRef> activeActors = new ArrayList<>();
private Timeout timeout = new Timeout(Duration.create(10, "seconds")); protected Timeout timeout = new Timeout(Duration.create(10, "seconds"));
private int remainingTransactions; private int remainingTransactions;
......
...@@ -11,7 +11,7 @@ public class MassWalletConfiguration extends AbstractConfiguration { ...@@ -11,7 +11,7 @@ public class MassWalletConfiguration extends AbstractConfiguration {
public void run() { public void run() {
initSupervisor(); initSupervisor();
try { try {
spawnWallets(200, false); spawnWallets(20, false);
System.out.println("Wallet spawning done!"); System.out.println("Wallet spawning done!");
} catch (Exception e) { } catch (Exception e) {
System.out.println("Wallet spawning timed out!"); System.out.println("Wallet spawning timed out!");
......
package fucoin.configurations;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import fucoin.actions.control.ActionCreateSuperVisorSnapshot;
import fucoin.actions.control.ActionCreateWalletSnapshot;
import fucoin.actions.statistics.ActionInterchangeState;
import fucoin.actions.transaction.ActionGetAmount;
import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.configurations.internal.ConfigurationName;
import scala.concurrent.Await;
import scala.concurrent.Future;
import java.time.LocalTime;
import java.util.Collections;
import java.util.List;
/**
* This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets
*/
@ConfigurationName("Statistics")
public class StatisticsWalletConfiguration extends AbstractConfiguration {
/**
* Spawn the number of wallets.
*/
private final static int sNumberOfWallets = 10;
private int numberOfInterchangeStatistic = 20;
@Override
public void run() {
initSupervisor();
try {
spawnWallets(sNumberOfWallets, false);
System.out.println("Wallet spawning done!");
} catch (Exception e) {
System.out.println("Wallet spawning timed out!");
}
randomTransactions(10, 10);
try {
//Busy wait
while (activeActors.size() < sNumberOfWallets) {
}
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("=========== Stats runs");
interchangeStatistic();
} catch (Exception e) {
e.printStackTrace();
}
}
private void interchangeStatistic() throws Exception {
if (!(numberOfInterchangeStatistic > 0)) {
//stop interchange statistics
return;
}
List<ActorRef> wallets = wallets();
Collections.shuffle(wallets);
ActorRef sender = wallets.get(0);
ActorRef recipient = wallets.get(1);
Future<Object> future = Patterns.ask(sender, new ActionGetAmount(), timeout);
ActionGetAmountAnswer answer = (ActionGetAmountAnswer) Await.result(future, timeout.duration());
recipient.tell(new ActionInterchangeState(answer.amount), sender);
numberOfInterchangeStatistic--;
if (numberOfInterchangeStatistic == 0) {
createDump();
} else {
interchangeStatistic();
}
}
/**
* Create a Dump of the SuperVisor and all active Wallets.
*/
private void createDump() {
System.out.println("Create Dumps");
LocalTime time = LocalTime.now();
superVisor.tell(new ActionCreateSuperVisorSnapshot(time), superVisor);
for (ActorRef wallet: activeActors) {
wallet.tell(new ActionCreateWalletSnapshot(time), wallet);
}
}
@Override
public void onReceive(Object message) {
super.onReceive(message);
}
}
package fucoin.gui; package fucoin.gui;
import fucoin.supervisor.AmountTableModel;
public interface SuperVisorGuiControl extends TransactionLogger { public interface SuperVisorGuiControl extends TransactionLogger {
/** /**
...@@ -7,6 +9,8 @@ public interface SuperVisorGuiControl extends TransactionLogger { ...@@ -7,6 +9,8 @@ public interface SuperVisorGuiControl extends TransactionLogger {
*/ */
void onLeave(); void onLeave();
public void updateTable(String address, String name, int amount); void updateTable(String address, String name, int amount);
AmountTableModel getTable();
} }
...@@ -48,6 +48,11 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -48,6 +48,11 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
this.amountTableModel.updateTable(address, name, amount); this.amountTableModel.updateTable(address, name, amount);
} }
@Override
public AmountTableModel getTable() {
return this.amountTableModel;
}
private void log(LogMessage logMessage) { private void log(LogMessage logMessage) {
if (logActive) { if (logActive) {
threadGUI.log(logMessage); threadGUI.log(logMessage);
......
...@@ -3,7 +3,7 @@ package fucoin.supervisor; ...@@ -3,7 +3,7 @@ package fucoin.supervisor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.actions.transaction.ActionCommitDistributedCommittedTransfer; import fucoin.actions.transaction.ActionCommitDistributedCommittedTransfer;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.SuperVisorAction;
import java.util.List; import java.util.List;
......
package fucoin.supervisor; package fucoin.supervisor;
import javax.swing.*;
import javax.swing.table.DefaultTableModel; import javax.swing.table.DefaultTableModel;
import java.io.Serializable;
import java.util.Vector; import java.util.Vector;
public class AmountTableModel extends DefaultTableModel { public class AmountTableModel extends DefaultTableModel implements Serializable {
public AmountTableModel() { public AmountTableModel() {
super(new Object[]{"Address", "Name", "Amount"}, 0); super(new Object[]{"Address", "Name", "Amount"}, 0);
......
...@@ -2,17 +2,19 @@ package fucoin.supervisor; ...@@ -2,17 +2,19 @@ package fucoin.supervisor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import com.google.gson.Gson;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.SuperVisorGuiControl;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import javax.swing.*; import javax.swing.*;
import java.time.LocalTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -23,12 +25,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -23,12 +25,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
//private AmountTableModel amountTableModel; //private AmountTableModel amountTableModel;
private Map<Long, DistributedCommittedTransferRequest> requestQueue; private transient Map<Long, DistributedCommittedTransferRequest> requestQueue;
private SuperVisorGuiControl gui; private transient SuperVisorGuiControl gui;
private int pendingBankCommits = 0; private int pendingBankCommits = 0;
private ActorRef bankCommitObserver = null; private transient ActorRef bankCommitObserver = null;
public SuperVisorImpl() { public SuperVisorImpl() {
...@@ -167,4 +169,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -167,4 +169,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
public void setBankCommitObserver(ActorRef bankCommitObserver) { public void setBankCommitObserver(ActorRef bankCommitObserver) {
this.bankCommitObserver = bankCommitObserver; this.bankCommitObserver = bankCommitObserver;
} }
public void createDump(LocalTime time) {
String str = new Gson().toJson(this.gui.getTable());
super.createDump("SuperVisor", str, time);
}
} }
...@@ -6,12 +6,17 @@ import fucoin.gui.TransactionLogger; ...@@ -6,12 +6,17 @@ import fucoin.gui.TransactionLogger;
import scala.concurrent.Future; import scala.concurrent.Future;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalTime;
/** /**
* *
*/ */
public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger {
/**
*
*/
protected WalletStatistics statistics = new WalletStatistics();
/** /**
* Currently amount of this wallet * Currently amount of this wallet
*/ */
...@@ -77,6 +82,14 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -77,6 +82,14 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
*/ */
public abstract void setActive(boolean isActive); public abstract void setActive(boolean isActive);
public WalletStatistics getStatistics() {
return statistics;
}
public void setStatistics(WalletStatistics statistics) {
this.statistics = statistics;
}
/** /**
* Returns the * Returns the
* *
...@@ -114,4 +127,6 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -114,4 +127,6 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
* @param observer * @param observer
*/ */
public abstract void send(String address, int amount, ActorRef observer); public abstract void send(String address, int amount, ActorRef observer);
public abstract void createDump(LocalTime time);
} }
...@@ -2,6 +2,10 @@ package fucoin.wallet; ...@@ -2,6 +2,10 @@ package fucoin.wallet;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoin;
import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ActionJoinAnswer;
...@@ -16,16 +20,17 @@ import scala.concurrent.Future; ...@@ -16,16 +20,17 @@ import scala.concurrent.Future;
import static akka.dispatch.Futures.future; import static akka.dispatch.Futures.future;
import java.time.LocalTime;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
public class WalletImpl extends AbstractWallet { public class WalletImpl extends AbstractWallet {
private ActorRef preKnownNeighbour; private transient ActorRef preKnownNeighbour;
private ActorRef remoteSuperVisorActor; private transient ActorRef remoteSuperVisorActor;
private transient WalletGuiControl gui; private transient WalletGuiControl gui;
private String preKnownNeighbourName; private String preKnownNeighbourName;
private boolean isActive; private boolean isActive;
private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>(); private transient ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
public WalletImpl(String name) { public WalletImpl(String name) {
super(name); super(name);
...@@ -245,4 +250,32 @@ public class WalletImpl extends AbstractWallet { ...@@ -245,4 +250,32 @@ public class WalletImpl extends AbstractWallet {
System.out.println(message); System.out.println(message);
} }
} }
public void createDump(LocalTime time){
Gson gson = new GsonBuilder()
.setExclusionStrategies(new ExcludeAttributes())
//.serializeNulls() <-- uncomment to serialize NULL fields as well
.create();
String s = gson.toJson(this);
super.createDump(getName(), s, time);
}
class ExcludeAttributes implements ExclusionStrategy {
public boolean shouldSkipClass(Class<?> arg0) {
return false;
}
public boolean shouldSkipField(FieldAttributes f) {
return !(f.getName().equals("statistics") ||
f.getName().equals("min") ||
f.getName().equals("max") ||
f.getName().equals("sum") ||
f.getName().equals("count") ||
f.getName().equals("amount") ||
f.getName().equals("name"));
}
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment