Skip to content
Snippets Groups Projects
Commit 1e3a38f7 authored by Simon Könnecke's avatar Simon Könnecke
Browse files

qdigst to statistics class and statistic class to abstractWallet, snapshot can...

qdigst to statistics class and statistic class to abstractWallet, snapshot can now load from previous sessions.
parent d19fc1dd
Branches
No related tags found
No related merge requests found
Showing
with 229 additions and 51 deletions
...@@ -3,13 +3,13 @@ package fucoin; ...@@ -3,13 +3,13 @@ package fucoin;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Address; import akka.actor.Address;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import com.google.gson.Gson;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
import java.io.File; import java.io.*;
import java.io.FileOutputStream; import java.nio.file.Files;
import java.io.OutputStreamWriter; import java.nio.file.Paths;
import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
...@@ -73,26 +73,82 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -73,26 +73,82 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
return knownNeighbors; return knownNeighbors;
} }
private HashMap<LocalDateTime, String> snapshot = new HashMap(); /**
* Stores all snapshots.
*/
private HashMap<LocalDateTime, String> snapshots = new HashMap();
/**
* Last loaded snapshot.
*/
protected LocalDateTime lastRetrievedSnapshot = null;
/**
* Return all Snapshots.
*
* @return Map of snapshot name to snapshot as string
*/
public HashMap<LocalDateTime, String> getSnapshots() {
return snapshots;
}
/**
* Checks if the last loaded snapshot was snapshot "time".
* @param time Snapshot name.
* @return true = was the last loaded snapshot, false = no wasn't the last loaded snapshot
*/
public boolean isLastLoadedSnapshot(LocalDateTime time) {
if (lastRetrievedSnapshot.equals(time)) {
return true;
}
return false;
}
/**
* Read a snapshot from memory or disk.
* @param name Node name
* @param time Snapshot name
* @return Empty or String with a valid json.
*/
public String readSnapshot(String name, LocalDateTime time) {
lastRetrievedSnapshot = time;
if (snapshots.containsKey(time)) {
return getSnapshots().get(time);
}
String[] identifier = getSnapshotDirAndFilename(name, time);
public HashMap<LocalDateTime, String> getSnapshot() { try {
return snapshot; File theDir = new File(identifier[1]);
if (!theDir.exists()) {
return "";
}
byte[] encoded = Files.readAllBytes(Paths.get(identifier[1]));
String content = new String(encoded);
//store snapshot to memory
snapshots.put(time, content);
return content;
} catch (IOException e) {
return "";
} }
public String readSnapshot(LocalDateTime time) {
return getSnapshot().get(time);
} }
/**
* Stores snapshot to memory and persistent to disk.
* @param name Name of the node.
* @param content Serialized object.
* @param time Snapshot name.
* @return if was successful or not.
*/
public boolean writeSnapshot(String name, String content, LocalDateTime time) { public boolean writeSnapshot(String name, String content, LocalDateTime time) {
if (getSnapshot().containsKey(time)) { if (getSnapshots().containsKey(time)) {
return false; return false;
} }
getSnapshot().put(time, content); getSnapshots().put(time, content);
String[] identifier = getSnapshotDirAndFilename(name, time);
String folder = time.getYear() + "-" + time.getMonthValue() + "-" + time.getDayOfMonth() + " " +
time.getHour() + "." + time.getMinute() + "." + time.getSecond();
String filename = "snapshots/" + folder + "/" + name + ".json";
try { try {
File theDir = new File("snapshots"); File theDir = new File("snapshots");
...@@ -100,12 +156,12 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -100,12 +156,12 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
if (!theDir.exists()) { if (!theDir.exists()) {
theDir.mkdir(); theDir.mkdir();
} }
theDir = new File("snapshots/" + folder); theDir = new File("snapshots/" + identifier[0]);
if (!theDir.exists()) { if (!theDir.exists()) {
theDir.mkdir(); theDir.mkdir();
} }
File myFile = new File(filename); File myFile = new File(identifier[1]);
myFile.createNewFile(); myFile.createNewFile();
FileOutputStream fOut = new FileOutputStream(myFile); FileOutputStream fOut = new FileOutputStream(myFile);
OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut); OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut);
...@@ -113,8 +169,26 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -113,8 +169,26 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
myOutWriter.close(); myOutWriter.close();
fOut.close(); fOut.close();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); return false;
} }
return true; return true;
} }
/**
* Create the file path to snapshot.
* @param name Name of the node
* @param time Snapshot name
* @return Array[snapshot folder name, path to snapshot include filename]
*/
private String[] getSnapshotDirAndFilename(String name, LocalDateTime time) {
String folder = time.getYear() + "-" + time.getMonthValue() + "-" + time.getDayOfMonth() + " " +
time.getHour() + "." + time.getMinute() + "." + time.getSecond();
return new String[]{folder, "snapshots/" + folder + "/" + name + ".json"};
}
private Statistics statistics = new Statistics();
public Statistics getStatistics() {
return statistics;
}
} }
\ No newline at end of file
package fucoin.wallet; package fucoin;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
......
package fucoin;
import java.io.Serializable;
public class Statistics implements Serializable {
/**
* Q-Digest Compression: When the node value at binary tree is higher then sCompression apply q-digest compress.
*/
private static final double sCompression = 5;
/**
* Q-Digest data structure to creating histograms or answer queries for mean or x-quantil.
*/
private transient QDigest qDigest = new QDigest(sCompression);
public QDigest getqDigest() {
return qDigest;
}
public void setqDigest(QDigest qDigest) {
this.qDigest = qDigest;
}
}
...@@ -21,7 +21,14 @@ public class ActionSuperVisorCreateSnapshot extends SuperVisorAction { ...@@ -21,7 +21,14 @@ public class ActionSuperVisorCreateSnapshot extends SuperVisorAction {
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
//write the snapshot //write the snapshot
if (!abstractNode.writeSnapshot(time)) { if (!abstractNode.writeSnapshot(time)) {
//snapshot already exists
return; return;
} }
//notify one in the network
for(ActorRef startPoint: abstractNode.getKnownNeighbors().values()) {
startPoint.tell(new ActionWalletCreateSnapshot(time), self);
break;
}
} }
} }
...@@ -50,7 +50,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { ...@@ -50,7 +50,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
wallet.setAmount(wallet.getAmount() - amount); wallet.setAmount(wallet.getAmount() - amount);
wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name()); wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name());
} else if (target.compareTo(self) == 0) { } else if (target.compareTo(self) == 0) {
wallet.getqDigest().offer(amount); wallet.getStatistics().getqDigest().offer(amount);
wallet.setAmount(wallet.getAmount() + amount); wallet.setAmount(wallet.getAmount() + amount);
wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name()); wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name());
} }
......
...@@ -117,12 +117,23 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -117,12 +117,23 @@ public abstract class AbstractConfiguration extends AbstractNode {
} }
protected void nextRandomTransaction() { protected void nextRandomTransaction() {
nextRandomTransaction(true);
}
protected void nextRandomTransaction(boolean startConcurrentTransactions) {
if (remainingTransactions <= 0) { if (remainingTransactions <= 0) {
return; return;
} }
System.out.println("Next Random Transaction, " + remainingTransactions + " left."); System.out.println("Next Random Transaction, " + remainingTransactions + " left.");
if (startConcurrentTransactions) {
int startTransactions = ThreadLocalRandom.current().nextInt(5);
for (int i = 0; i < startTransactions; i++) {
nextRandomTransaction(false);
}
}
remainingTransactions--; remainingTransactions--;
try { try {
randomTransaction(); randomTransaction();
......
...@@ -25,7 +25,6 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -25,7 +25,6 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
private List<Snapshot> loadedSnapshotsOfWallets = new LinkedList<>(); private List<Snapshot> loadedSnapshotsOfWallets = new LinkedList<>();
private LinkedList<LocalDateTime> snapshotTimes = new LinkedList<>();
private LocalDateTime loadedSnapshotDateTime; private LocalDateTime loadedSnapshotDateTime;
private SuperVisorThreadGUI threadGUI; private SuperVisorThreadGUI threadGUI;
...@@ -39,16 +38,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -39,16 +38,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
} }
private void init() { private void init() {
amountTableModel = new AmountTableModel(); amountTableModel = new AmountTableModel();
threadGUI = new SuperVisorThreadGUI(this); threadGUI = new SuperVisorThreadGUI(this);
threadGUI.init(); threadGUI.init();
} }
public void createSnapshot(LocalDateTime time) { public void createSnapshot(LocalDateTime time) {
snapshotTimes.add(time); superVisor.writeSnapshot(time);
for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) { for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) {
item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender()); item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender());
break; break;
...@@ -86,6 +83,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -86,6 +83,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
return loadedSnapshotsOfWallets; return loadedSnapshotsOfWallets;
} }
public fucoin.supervisor.Snapshot getSupervisorSnapshot(LocalDateTime time) {
return superVisor.readSnapShot(time);
}
public List<LocalDateTime> getSnapshotList() {
return superVisor.getSnapshotList();
}
public void guiTerminated() { public void guiTerminated() {
superVisor.exit(); superVisor.exit();
} }
......
package fucoin.gui; package fucoin.gui;
import fucoin.wallet.QDigest; import fucoin.QDigest;
import fucoin.wallet.Snapshot; import fucoin.wallet.Snapshot;
import org.jfree.chart.ChartFactory; import org.jfree.chart.ChartFactory;
import org.jfree.chart.ChartPanel; import org.jfree.chart.ChartPanel;
......
...@@ -68,6 +68,7 @@ public class SuperVisorThreadGUI { ...@@ -68,6 +68,7 @@ public class SuperVisorThreadGUI {
configPanel.add(showDebug); configPanel.add(showDebug);
JPanel snapshotPanel = new JPanel(); JPanel snapshotPanel = new JPanel();
superVisorGuiControl.getSnapshotList().forEach(time -> dropdown.addItem(time));
snapshotPanel.add(dropdown); snapshotPanel.add(dropdown);
//logPanel.add(activateLogging, BorderLayout.NORTH); //logPanel.add(activateLogging, BorderLayout.NORTH);
......
package fucoin.supervisor; package fucoin.supervisor;
import fucoin.QDigest;
import fucoin.Statistics;
import java.io.Serializable; import java.io.Serializable;
import java.util.List;
public class Snapshot implements Serializable { public class Snapshot implements Serializable {
private final AmountTableModel amountTableModel; private final AmountTableModel amountTableModel;
private final Statistics statistics;
private final List<long[]> qDigestVal;
private final byte[] qDigest;
public Snapshot(SuperVisorImpl superVisor) { public Snapshot(SuperVisorImpl superVisor) {
this.amountTableModel = superVisor.getGui().getAmountTableModel(); this.amountTableModel = superVisor.getGui().getAmountTableModel();
this.statistics = superVisor.getStatistics();
this.qDigest = QDigest.serialize(superVisor.getStatistics().getqDigest());
this.qDigestVal = superVisor.getStatistics().getqDigest().toAscRanges();
} }
public AmountTableModel getAmountTableModel() { public AmountTableModel getAmountTableModel() {
return amountTableModel; return amountTableModel;
} }
public Statistics getStatistics() {
return statistics;
}
public List<long[]> getqDigestVal() {
return qDigestVal;
}
public QDigest getqDigest() {
return QDigest.deserialize(this.qDigest);
}
} }
...@@ -3,6 +3,9 @@ package fucoin.supervisor; ...@@ -3,6 +3,9 @@ package fucoin.supervisor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.google.gson.stream.JsonReader;
import fucoin.AbstractNode;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
...@@ -10,16 +13,12 @@ import fucoin.actions.transaction.ActionGetAmountAnswer; ...@@ -10,16 +13,12 @@ import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.transaction.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.SuperVisorGuiControl;
import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import fucoin.supervisor.Snapshot;
import javax.swing.*; import javax.swing.*;
import java.io.*;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
public class SuperVisorImpl extends AbstractNode implements TransactionLogger { public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
...@@ -34,7 +33,7 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -34,7 +33,7 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
private ActorRef bankCommitObserver = null; private ActorRef bankCommitObserver = null;
public SuperVisorImpl() { public SuperVisorImpl() {
readSnapshotList();
} }
public void setGuiControl(SuperVisorGuiControl gui) { public void setGuiControl(SuperVisorGuiControl gui) {
...@@ -175,11 +174,54 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -175,11 +174,54 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
this.bankCommitObserver = bankCommitObserver; this.bankCommitObserver = bankCommitObserver;
} }
private List<LocalDateTime> snapshotList = new LinkedList<>();
private final static String sSnapshotListName = "snapshots/list.json";
public List<LocalDateTime> getSnapshotList() {
return snapshotList;
}
private void readSnapshotList() {
try {
File file = new File(sSnapshotListName);
if (!file.exists()) {
return;
}
Gson gson = new Gson();
JsonReader reader = new JsonReader(new FileReader(sSnapshotListName));
snapshotList = gson.fromJson(reader, new TypeToken<List<LocalDateTime>>() {}.getType());
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
private void writeSnapshotList() {
try {
File myFile = new File(sSnapshotListName);
myFile.createNewFile();
FileOutputStream fOut = new FileOutputStream(myFile);
OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut);
myOutWriter.append(new Gson().toJson(snapshotList));
myOutWriter.close();
fOut.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
private void addSnapshot(LocalDateTime time) {
snapshotList.add(time);
writeSnapshotList();
}
public boolean writeSnapshot(LocalDateTime time) { public boolean writeSnapshot(LocalDateTime time) {
addSnapshot(time);
return super.writeSnapshot("SuperVisor", new Gson().toJson(new Snapshot(this)), time); return super.writeSnapshot("SuperVisor", new Gson().toJson(new Snapshot(this)), time);
} }
public Snapshot readSnapShot(LocalDateTime time) { public Snapshot readSnapShot(LocalDateTime time) {
return new Gson().fromJson(super.readSnapshot(time), Snapshot.class); return new Gson().fromJson(super.readSnapshot("SuperVisor", time), Snapshot.class);
} }
} }
...@@ -4,6 +4,7 @@ import akka.actor.ActorRef; ...@@ -4,6 +4,7 @@ import akka.actor.ActorRef;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.QDigest;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import java.io.Serializable; import java.io.Serializable;
...@@ -13,10 +14,6 @@ import java.time.LocalDateTime; ...@@ -13,10 +14,6 @@ import java.time.LocalDateTime;
* *
*/ */
public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger {
private static final double sCompression = 5;
private QDigest qDigest = new QDigest(sCompression);
/** /**
* Currently amount of this wallet * Currently amount of this wallet
*/ */
...@@ -120,22 +117,12 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -120,22 +117,12 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
*/ */
public abstract void send(String address, int amount, ActorRef observer); public abstract void send(String address, int amount, ActorRef observer);
public QDigest getqDigest() {
return qDigest;
}
public void setqDigest(QDigest qDigest) {
this.qDigest = qDigest;
}
public boolean writeSnapshot(LocalDateTime time) { public boolean writeSnapshot(LocalDateTime time) {
return super.writeSnapshot(getName(), return super.writeSnapshot(getName(),
(new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this)), time); (new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this)), time);
} }
public Snapshot readSnapShot(LocalDateTime time) { public Snapshot readSnapShot(LocalDateTime time) {
return new Gson().fromJson(super.readSnapshot(time), Snapshot.class); return new Gson().fromJson(super.readSnapshot(getName(), time), Snapshot.class);
} }
} }
package fucoin.wallet; package fucoin.wallet;
import akka.actor.ActorRef; import fucoin.QDigest;
import fucoin.Statistics;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap; import java.util.HashMap;
...@@ -12,6 +13,7 @@ public class Snapshot implements Serializable { ...@@ -12,6 +13,7 @@ public class Snapshot implements Serializable {
private final int amount; private final int amount;
private final Set<String> knownNeighbour; private final Set<String> knownNeighbour;
private final HashMap<String, Integer> knownNeighbourAmounts = new HashMap<>(); private final HashMap<String, Integer> knownNeighbourAmounts = new HashMap<>();
private final Statistics statistics;
private final List<long[]> qDigestVal; private final List<long[]> qDigestVal;
private final byte[] qDigest; private final byte[] qDigest;
...@@ -19,12 +21,13 @@ public class Snapshot implements Serializable { ...@@ -19,12 +21,13 @@ public class Snapshot implements Serializable {
public Snapshot(AbstractWallet wallet) { public Snapshot(AbstractWallet wallet) {
this.name = wallet.getName(); this.name = wallet.getName();
this.amount = wallet.getAmount(); this.amount = wallet.getAmount();
this.qDigest = QDigest.serialize(wallet.getqDigest()); this.statistics = wallet.getStatistics();
this.qDigestVal = wallet.getqDigest().toAscRanges();
this.knownNeighbour = wallet.getKnownNeighbors().keySet(); this.knownNeighbour = wallet.getKnownNeighbors().keySet();
wallet.amounts.forEach((actorRef, amount) -> { wallet.amounts.forEach((actorRef, amount) -> {
knownNeighbourAmounts.put(actorRef.toString(), amount); knownNeighbourAmounts.put(actorRef.toString(), amount);
}); });
this.qDigest = QDigest.serialize(wallet.getStatistics().getqDigest());
this.qDigestVal = wallet.getStatistics().getqDigest().toAscRanges();
} }
public String getName() { public String getName() {
...@@ -35,6 +38,10 @@ public class Snapshot implements Serializable { ...@@ -35,6 +38,10 @@ public class Snapshot implements Serializable {
return amount; return amount;
} }
public Statistics getStatistics() {
return statistics;
}
public Set<String> getKnownNeighbour() { public Set<String> getKnownNeighbour() {
return knownNeighbour; return knownNeighbour;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment