Skip to content
Snippets Groups Projects
Commit a290d0a7 authored by Simon Könnecke's avatar Simon Könnecke
Browse files

stop supervisor exit the program, random transaction and InterchangeState

parent 643f21ad
Branches
No related tags found
No related merge requests found
...@@ -3,10 +3,6 @@ package fucoin; ...@@ -3,10 +3,6 @@ package fucoin;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Address; import akka.actor.Address;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import com.google.gson.ExclusionStrategy;
import com.google.gson.FieldAttributes;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
...@@ -14,7 +10,7 @@ import java.io.File; ...@@ -14,7 +10,7 @@ import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalTime; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
public abstract class AbstractNode extends UntypedActor implements Serializable { public abstract class AbstractNode extends UntypedActor implements Serializable {
...@@ -77,16 +73,21 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -77,16 +73,21 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
return knownNeighbors; return knownNeighbors;
} }
public void createDump(String name, String content, LocalTime time) { public void createDump(String name, String content, LocalDateTime time) {
String filename = "snapshots/Dump_" + time.getNano() + "_" + name + ".json"; String folder = time.getYear() + "-" + time.getMonthValue() + "-" + time.getDayOfMonth() + " " +
time.getHour() + "." + time.getMinute() + "." + time.getSecond();
String filename = "snapshots/" + folder + "/" + name + ".json";
try { try {
File theDir = new File("snapshots"); File theDir = new File("snapshots");
// if the directory does not exist, create it // if the directory does not exist, create it
if (!theDir.exists()) { if (!theDir.exists()) {
theDir.mkdir(); theDir.mkdir();
} }
theDir = new File("snapshots/" + folder);
if (!theDir.exists()) {
theDir.mkdir();
}
File myFile = new File(filename); File myFile = new File(filename);
myFile.createNewFile(); myFile.createNewFile();
......
...@@ -5,16 +5,21 @@ import akka.actor.UntypedActorContext; ...@@ -5,16 +5,21 @@ import akka.actor.UntypedActorContext;
import fucoin.actions.SuperVisorAction; import fucoin.actions.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.util.Map;
/**
* SuperVisor create a snapshot by sending all Wallets a Snapshot Msg.
*/
public class ActionCreateSuperVisorSnapshot extends SuperVisorAction { public class ActionCreateSuperVisorSnapshot extends SuperVisorAction {
/** /**
* Timestamp for the file name * Timestamp for the file name
*/ */
private final LocalTime time; private final LocalDateTime time;
public ActionCreateSuperVisorSnapshot(LocalTime time) { public ActionCreateSuperVisorSnapshot(LocalDateTime time) {
this.time = time; this.time = time;
} }
...@@ -22,5 +27,9 @@ public class ActionCreateSuperVisorSnapshot extends SuperVisorAction { ...@@ -22,5 +27,9 @@ public class ActionCreateSuperVisorSnapshot extends SuperVisorAction {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
abstractNode.createDump(time); abstractNode.createDump(time);
System.out.println("Create Dumps");
for (Map.Entry<String, ActorRef> wallet: abstractNode.getKnownNeighbors().entrySet()) {
wallet.getValue().tell(new ActionCreateWalletSnapshot(time), self);
}
} }
} }
...@@ -2,23 +2,19 @@ package fucoin.actions.control; ...@@ -2,23 +2,19 @@ package fucoin.actions.control;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import com.google.gson.Gson;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
import java.io.File; import java.time.LocalDateTime;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.time.LocalTime;
public class ActionCreateWalletSnapshot extends ClientAction { public class ActionCreateWalletSnapshot extends ClientAction {
/** /**
* Timestamp for the file name * Timestamp for the file name
*/ */
private final LocalTime time; private final LocalDateTime time;
public ActionCreateWalletSnapshot(LocalTime time) { public ActionCreateWalletSnapshot(LocalDateTime time) {
this.time = time; this.time = time;
} }
......
package fucoin.actions.persist; package fucoin.actions.persist;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.UntypedActorContext; import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
......
...@@ -3,7 +3,6 @@ package fucoin.configurations; ...@@ -3,7 +3,6 @@ package fucoin.configurations;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import fucoin.actions.control.ActionCreateSuperVisorSnapshot; import fucoin.actions.control.ActionCreateSuperVisorSnapshot;
import fucoin.actions.control.ActionCreateWalletSnapshot;
import fucoin.actions.statistics.ActionInterchangeState; import fucoin.actions.statistics.ActionInterchangeState;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
...@@ -11,9 +10,10 @@ import fucoin.configurations.internal.ConfigurationName; ...@@ -11,9 +10,10 @@ import fucoin.configurations.internal.ConfigurationName;
import scala.concurrent.Await; import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import java.time.LocalTime; import java.time.LocalDateTime;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/** /**
* This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets * This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets
...@@ -25,7 +25,7 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration { ...@@ -25,7 +25,7 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration {
*/ */
private final static int sNumberOfWallets = 10; private final static int sNumberOfWallets = 10;
private int numberOfInterchangeStatistic = 20; //private int numberOfInterchangeStatistic = 20;
@Override @Override
public void run() { public void run() {
...@@ -37,26 +37,37 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration { ...@@ -37,26 +37,37 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration {
System.out.println("Wallet spawning timed out!"); System.out.println("Wallet spawning timed out!");
} }
randomTransactions(10, 10); //Wait until init money was send to wallets
try { try {
//Busy wait Thread.sleep(1000);
while (activeActors.size() < sNumberOfWallets) { } catch (InterruptedException e) {
e.printStackTrace();
} }
//Send InterchangeState messages between wallets
new Thread(() -> {
System.out.println("Randomly interchange message process started.");
boolean toggle = true;
while (true) {
try { try {
Thread.sleep(4000); if (toggle) {
randomTransactions(wallets().size() / 4, 10);
} else {
interchangeStatistic(wallets().size() * 4);
}
toggle = !toggle;
Thread.sleep(ThreadLocalRandom.current().nextInt(10) * 1000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
System.out.println("=========== Stats runs");
interchangeStatistic();
} catch (Exception e) {
e.printStackTrace();
} }
}).run();
//Transfer random many between wallets
} }
private void interchangeStatistic() throws Exception { private void interchangeStatistic(int numberOfInterchangeStatistic) {
if (!(numberOfInterchangeStatistic > 0)) { if (!(numberOfInterchangeStatistic > 0)) {
//stop interchange statistics //stop interchange statistics
return; return;
...@@ -67,17 +78,20 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration { ...@@ -67,17 +78,20 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration {
ActorRef sender = wallets.get(0); ActorRef sender = wallets.get(0);
ActorRef recipient = wallets.get(1); ActorRef recipient = wallets.get(1);
try {
Future<Object> future = Patterns.ask(sender, new ActionGetAmount(), timeout); Future<Object> future = Patterns.ask(sender, new ActionGetAmount(), timeout);
ActionGetAmountAnswer answer = (ActionGetAmountAnswer) Await.result(future, timeout.duration()); ActionGetAmountAnswer answer = (ActionGetAmountAnswer) Await.result(future, timeout.duration());
recipient.tell(new ActionInterchangeState(answer.amount), sender); recipient.tell(new ActionInterchangeState(answer.amount), sender);
} catch (Exception e) {
e.printStackTrace();
}
numberOfInterchangeStatistic--; numberOfInterchangeStatistic--;
if (numberOfInterchangeStatistic == 0) { if (numberOfInterchangeStatistic == 0) {
createDump(); createDump();
} else { } else {
interchangeStatistic(); interchangeStatistic(numberOfInterchangeStatistic);
} }
} }
...@@ -85,12 +99,7 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration { ...@@ -85,12 +99,7 @@ public class StatisticsWalletConfiguration extends AbstractConfiguration {
* Create a Dump of the SuperVisor and all active Wallets. * Create a Dump of the SuperVisor and all active Wallets.
*/ */
private void createDump() { private void createDump() {
System.out.println("Create Dumps"); superVisor.tell(new ActionCreateSuperVisorSnapshot(LocalDateTime.now()), self());
LocalTime time = LocalTime.now();
superVisor.tell(new ActionCreateSuperVisorSnapshot(time), superVisor);
for (ActorRef wallet: activeActors) {
wallet.tell(new ActionCreateWalletSnapshot(time), wallet);
}
} }
@Override @Override
......
package fucoin.gui; package fucoin.gui;
import akka.actor.ActorRef;
import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.AmountTableModel;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
...@@ -10,6 +11,7 @@ import java.awt.*; ...@@ -10,6 +11,7 @@ import java.awt.*;
import java.awt.event.ItemEvent; import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter; import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent; import java.awt.event.WindowEvent;
import java.util.Map;
public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
private SuperVisorImpl superVisor; private SuperVisorImpl superVisor;
...@@ -93,4 +95,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -93,4 +95,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
public boolean isLogActive() { public boolean isLogActive() {
return logActive; return logActive;
} }
public void createSnapshot() {
}
} }
package fucoin.supervisor; package fucoin.supervisor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props; import akka.actor.Props;
import com.google.gson.Gson; import com.google.gson.Gson;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeLeave;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.ActionNotifyObserver;
...@@ -15,6 +17,7 @@ import fucoin.gui.TransactionLogger; ...@@ -15,6 +17,7 @@ import fucoin.gui.TransactionLogger;
import fucoin.wallet.WalletStatistics; import fucoin.wallet.WalletStatistics;
import javax.swing.*; import javax.swing.*;
import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
import java.util.*; import java.util.*;
import java.util.Map.Entry; import java.util.Map.Entry;
...@@ -73,10 +76,19 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -73,10 +76,19 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
} }
public void exit() { public void exit() {
//Kill all Wallets
for(Map.Entry<String, ActorRef> wallet: getKnownNeighbors().entrySet()) {
wallet.getValue().tell(PoisonPill.getInstance(), self());
}
//Stop supervisor
getContext().stop(getSelf()); getContext().stop(getSelf());
if (gui != null) { if (gui != null) {
gui.onLeave(); gui.onLeave();
} }
System.exit(0);
} }
@Override @Override
...@@ -168,30 +180,41 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -168,30 +180,41 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
this.bankCommitObserver = bankCommitObserver; this.bankCommitObserver = bankCommitObserver;
} }
public void createDump(LocalTime time) { public WalletStatistics getStatistics() {
Snapshot snapshot = new Snapshot(); WalletStatistics statistics = new WalletStatistics();
snapshot.amountTableModel = this.gui.getTable();
snapshot.statistics = new WalletStatistics();
int sum = 0; int sum = 0;
Vector rows = snapshot.amountTableModel.getDataVector(); Vector rows = gui.getTable().getDataVector();
for (int i = 0; i < rows.size(); i++) { for (int i = 0; i < rows.size(); i++) {
if (rows.get(i) instanceof Vector) { if (rows.get(i) instanceof Vector) {
Vector<Object> row = (Vector<Object>) rows.get(i); Vector<Object> row = (Vector<Object>) rows.get(i);
int amount = (Integer) row.get(2); int amount = (Integer) row.get(2);
snapshot.statistics.setMin(amount); statistics.setMin(amount);
snapshot.statistics.setMax(amount); statistics.setMax(amount);
sum += amount; sum += amount;
} }
} }
snapshot.statistics.setSum(sum); statistics.setSum(sum);
snapshot.statistics.setCount(rows.size()); statistics.setCount(rows.size());
snapshot.statistics.setAvg(sum / rows.size()); statistics.setAvg(sum / rows.size());
return statistics;
}
public void createDump(LocalDateTime time) {
Snapshot snapshot = new Snapshot();
snapshot.amountTableModel = this.gui.getTable();
snapshot.statistics = getStatistics();
String str = new Gson().toJson(snapshot); String str = new Gson().toJson(snapshot);
super.createDump("SuperVisor", str, time); super.createDump("SuperVisor", str, time);
} }
class Snapshot { class Snapshot {
public Snapshot() {
time = LocalDateTime.now();
}
WalletStatistics statistics; WalletStatistics statistics;
AmountTableModel amountTableModel; AmountTableModel amountTableModel;
LocalDateTime time;
} }
} }
...@@ -6,6 +6,7 @@ import fucoin.gui.TransactionLogger; ...@@ -6,6 +6,7 @@ import fucoin.gui.TransactionLogger;
import scala.concurrent.Future; import scala.concurrent.Future;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.LocalTime; import java.time.LocalTime;
/** /**
...@@ -128,5 +129,5 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -128,5 +129,5 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
*/ */
public abstract void send(String address, int amount, ActorRef observer); public abstract void send(String address, int amount, ActorRef observer);
public abstract void createDump(LocalTime time); public abstract void createDump(LocalDateTime time);
} }
...@@ -18,11 +18,11 @@ import fucoin.actions.transaction.ActionInvokeSentMoney; ...@@ -18,11 +18,11 @@ import fucoin.actions.transaction.ActionInvokeSentMoney;
import fucoin.gui.WalletGuiControl; import fucoin.gui.WalletGuiControl;
import scala.concurrent.Future; import scala.concurrent.Future;
import static akka.dispatch.Futures.future; import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import static akka.dispatch.Futures.future;
public class WalletImpl extends AbstractWallet { public class WalletImpl extends AbstractWallet {
private transient ActorRef preKnownNeighbour; private transient ActorRef preKnownNeighbour;
...@@ -251,7 +251,7 @@ public class WalletImpl extends AbstractWallet { ...@@ -251,7 +251,7 @@ public class WalletImpl extends AbstractWallet {
} }
} }
public void createDump(LocalTime time){ public void createDump(LocalDateTime time) {
Gson gson = new GsonBuilder() Gson gson = new GsonBuilder()
.setExclusionStrategies(new ExcludeAttributes()) .setExclusionStrategies(new ExcludeAttributes())
//.serializeNulls() <-- uncomment to serialize NULL fields as well //.serializeNulls() <-- uncomment to serialize NULL fields as well
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment