Skip to content
Snippets Groups Projects
Commit 8fcac86d authored by Simon Könnecke's avatar Simon Könnecke
Browse files

snapshot

parent 4f825c32
Branches
No related tags found
No related merge requests found
Showing
with 236 additions and 10 deletions
...@@ -67,5 +67,10 @@ ...@@ -67,5 +67,10 @@
<artifactId>fastutil</artifactId> <artifactId>fastutil</artifactId>
<version>7.0.12</version> <version>7.0.12</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -6,7 +6,11 @@ import akka.actor.UntypedActor; ...@@ -6,7 +6,11 @@ import akka.actor.UntypedActor;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
public abstract class AbstractNode extends UntypedActor implements Serializable { public abstract class AbstractNode extends UntypedActor implements Serializable {
...@@ -68,4 +72,49 @@ public abstract class AbstractNode extends UntypedActor implements Serializable ...@@ -68,4 +72,49 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
public HashMap<String, ActorRef> getKnownNeighbors() { public HashMap<String, ActorRef> getKnownNeighbors() {
return knownNeighbors; return knownNeighbors;
} }
private HashMap<LocalDateTime, String> snapshot = new HashMap();
public HashMap<LocalDateTime, String> getSnapshot() {
return snapshot;
}
public String readSnapshot(LocalDateTime time) {
return getSnapshot().get(time);
}
public boolean writeSnapshot(String name, String content, LocalDateTime time) {
if (getSnapshot().containsKey(time)) {
return false;
}
getSnapshot().put(time, content);
String folder = time.getYear() + "-" + time.getMonthValue() + "-" + time.getDayOfMonth() + " " +
time.getHour() + "." + time.getMinute() + "." + time.getSecond();
String filename = "snapshots/" + folder + "/" + name + ".json";
try {
File theDir = new File("snapshots");
// if the directory does not exist, create it
if (!theDir.exists()) {
theDir.mkdir();
}
theDir = new File("snapshots/" + folder);
if (!theDir.exists()) {
theDir.mkdir();
}
File myFile = new File(filename);
myFile.createNewFile();
FileOutputStream fOut = new FileOutputStream(myFile);
OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut);
myOutWriter.append(content);
myOutWriter.close();
fOut.close();
} catch (Exception e) {
e.printStackTrace();
}
return true;
}
} }
\ No newline at end of file
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.actions.transaction.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl;
import fucoin.wallet.AbstractWallet;
import java.time.LocalDateTime;
import java.util.Map;
public class ActionSuperVisorCreateSnapshot extends SuperVisorAction {
private final LocalDateTime time;
public ActionSuperVisorCreateSnapshot(LocalDateTime time) {
this.time = time;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
//write the snapshot
if (!abstractNode.writeSnapshot(time)) {
return;
}
}
}
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
import java.time.LocalDateTime;
import java.util.Map;
public class ActionWalletCreateSnapshot extends ClientAction {
private final LocalDateTime time;
public ActionWalletCreateSnapshot(LocalDateTime time) {
this.time = time;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
//write the snapshot
if (!abstractNode.writeSnapshot(time)) {
return;
}
//tell the neighbours
for (Map.Entry<String, ActorRef> node : abstractNode.getKnownNeighbors().entrySet()) {
node.getValue().tell(this, self);
}
}
}
...@@ -47,6 +47,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { ...@@ -47,6 +47,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
if (granted) { if (granted) {
if (source.compareTo(self) == 0) { if (source.compareTo(self) == 0) {
wallet.getqDigest().offer(amount);
wallet.setAmount(wallet.getAmount() - amount); wallet.setAmount(wallet.getAmount() - amount);
wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name()); wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name());
} else if (target.compareTo(self) == 0) { } else if (target.compareTo(self) == 0) {
......
package fucoin.gui; package fucoin.gui;
import fucoin.supervisor.AmountTableModel;
public interface SuperVisorGuiControl extends TransactionLogger { public interface SuperVisorGuiControl extends TransactionLogger {
/** /**
...@@ -7,6 +9,8 @@ public interface SuperVisorGuiControl extends TransactionLogger { ...@@ -7,6 +9,8 @@ public interface SuperVisorGuiControl extends TransactionLogger {
*/ */
void onLeave(); void onLeave();
public void updateTable(String address, String name, int amount); void updateTable(String address, String name, int amount);
AmountTableModel getAmountTableModel();
} }
...@@ -3,6 +3,7 @@ package fucoin.gui; ...@@ -3,6 +3,7 @@ package fucoin.gui;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import akka.util.Timeout; import akka.util.Timeout;
import fucoin.actions.control.ActionWalletCreateSnapshot;
import fucoin.actions.control.ActionWalletGetNeighbours; import fucoin.actions.control.ActionWalletGetNeighbours;
import fucoin.actions.control.ActionWalletGetNeighboursAnswer; import fucoin.actions.control.ActionWalletGetNeighboursAnswer;
import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.AmountTableModel;
...@@ -11,9 +12,10 @@ import scala.concurrent.Await; ...@@ -11,9 +12,10 @@ import scala.concurrent.Await;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration; import scala.concurrent.duration.Duration;
import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Set;
public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
private SuperVisorImpl superVisor; private SuperVisorImpl superVisor;
...@@ -22,6 +24,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -22,6 +24,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
private HashMap<String, HashMap<String, ActorRef>> nodeNeighbours = new HashMap<>(); private HashMap<String, HashMap<String, ActorRef>> nodeNeighbours = new HashMap<>();
private LinkedList<LocalDateTime> snapshotTimes = new LinkedList<>();
private SuperVisorThreadGUI threadGUI; private SuperVisorThreadGUI threadGUI;
private boolean logActive = false; private boolean logActive = false;
...@@ -40,6 +44,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -40,6 +44,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
} }
public void createSnapshot(LocalDateTime time) {
snapshotTimes.add(time);
for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) {
item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender());
break;
}
}
public void updateNodeNeighbourList() { public void updateNodeNeighbourList() {
try { try {
Timeout timeout = new Timeout(Duration.create(10, "seconds")); Timeout timeout = new Timeout(Duration.create(10, "seconds"));
...@@ -62,6 +74,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -62,6 +74,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
public HashMap<String, HashMap<String, ActorRef>> getNodeNeighbourList() { public HashMap<String, HashMap<String, ActorRef>> getNodeNeighbourList() {
return nodeNeighbours; return nodeNeighbours;
} }
public void guiTerminated() { public void guiTerminated() {
superVisor.exit(); superVisor.exit();
} }
......
...@@ -5,6 +5,7 @@ import java.awt.*; ...@@ -5,6 +5,7 @@ import java.awt.*;
import java.awt.event.ItemEvent; import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter; import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent; import java.awt.event.WindowEvent;
import java.time.LocalDateTime;
/** /**
* *
...@@ -17,6 +18,7 @@ public class SuperVisorThreadGUI { ...@@ -17,6 +18,7 @@ public class SuperVisorThreadGUI {
private JScrollPane logPane = new JScrollPane(txtLog); private JScrollPane logPane = new JScrollPane(txtLog);
private JCheckBox showDebug; private JCheckBox showDebug;
private JCheckBox activateLogging; private JCheckBox activateLogging;
private JComboBox<LocalDateTime> dropdown = new JComboBox<LocalDateTime>();
private SuperVisorGuiControlImpl superVisorGuiControl; private SuperVisorGuiControlImpl superVisorGuiControl;
public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) {
...@@ -65,13 +67,19 @@ public class SuperVisorThreadGUI { ...@@ -65,13 +67,19 @@ public class SuperVisorThreadGUI {
configPanel.add(activateLogging); configPanel.add(activateLogging);
configPanel.add(showDebug); configPanel.add(showDebug);
JPanel snapshotPanel = new JPanel();
snapshotPanel.add(dropdown);
//logPanel.add(activateLogging, BorderLayout.NORTH); //logPanel.add(activateLogging, BorderLayout.NORTH);
logPanel.add(configPanel, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH);
logPanel.add(snapshotPanel, BorderLayout.SOUTH);
logPanel.add(logPane, BorderLayout.CENTER); logPanel.add(logPane, BorderLayout.CENTER);
contentPanel.add(logPanel); contentPanel.add(logPanel);
frame.add(contentPanel, BorderLayout.CENTER); frame.add(contentPanel, BorderLayout.CENTER);
JPanel btnPanel = new JPanel(); JPanel btnPanel = new JPanel();
JButton showGraphBtn = new JButton("Show Graph"); JButton showGraphBtn = new JButton("Show Graph");
...@@ -81,6 +89,14 @@ public class SuperVisorThreadGUI { ...@@ -81,6 +89,14 @@ public class SuperVisorThreadGUI {
}); });
btnPanel.add(showGraphBtn); btnPanel.add(showGraphBtn);
JButton createSnapshot = new JButton("Create Snapshot");
createSnapshot.addActionListener(e -> {
LocalDateTime time = LocalDateTime.now();
superVisorGuiControl.createSnapshot(time);
dropdown.addItem(time);
});
btnPanel.add(createSnapshot);
//Exit Button and shutdown supervisor //Exit Button and shutdown supervisor
JButton exitBtn = new JButton("Stop Supervisor"); JButton exitBtn = new JButton("Stop Supervisor");
...@@ -106,6 +122,7 @@ public class SuperVisorThreadGUI { ...@@ -106,6 +122,7 @@ public class SuperVisorThreadGUI {
public void windowClosing(WindowEvent e) { public void windowClosing(WindowEvent e) {
super.windowClosing(e); super.windowClosing(e);
superVisorGuiControl.guiTerminated(); superVisorGuiControl.guiTerminated();
System.exit(0);
} }
}); });
}).start(); }).start();
......
package fucoin.supervisor;
import java.io.Serializable;
public class Snapshot implements Serializable {
private final AmountTableModel amountTableModel;
public Snapshot(SuperVisorImpl superVisor) {
this.amountTableModel = superVisor.getGui().getAmountTableModel();
}
public AmountTableModel getAmountTableModel() {
return amountTableModel;
}
}
...@@ -2,6 +2,7 @@ package fucoin.supervisor; ...@@ -2,6 +2,7 @@ package fucoin.supervisor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import com.google.gson.Gson;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
...@@ -11,8 +12,10 @@ import fucoin.actions.transaction.SuperVisorAction; ...@@ -11,8 +12,10 @@ import fucoin.actions.transaction.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.SuperVisorGuiControl;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import fucoin.supervisor.Snapshot;
import javax.swing.*; import javax.swing.*;
import java.time.LocalDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
...@@ -38,6 +41,10 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -38,6 +41,10 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
this.gui = gui; this.gui = gui;
} }
public SuperVisorGuiControl getGui() {
return gui;
}
@Override @Override
public void onReceive(Object msg) { public void onReceive(Object msg) {
...@@ -167,4 +174,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -167,4 +174,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
public void setBankCommitObserver(ActorRef bankCommitObserver) { public void setBankCommitObserver(ActorRef bankCommitObserver) {
this.bankCommitObserver = bankCommitObserver; this.bankCommitObserver = bankCommitObserver;
} }
public boolean writeSnapshot(LocalDateTime time) {
return super.writeSnapshot("SuperVisor", new Gson().toJson(new Snapshot(this)), time);
}
public Snapshot readSnapShot(LocalDateTime time) {
return new Gson().fromJson(super.readSnapshot(time), Snapshot.class);
}
} }
package fucoin.wallet; package fucoin.wallet;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.google.gson.Gson;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import scala.concurrent.Future; import scala.concurrent.Future;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime;
/** /**
* *
*/ */
public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger {
private static final double sCompression = 0.7;
private QDigest qDigest = new QDigest(sCompression);
/** /**
* Currently amount of this wallet * Currently amount of this wallet
*/ */
...@@ -114,4 +119,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -114,4 +119,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
* @param observer * @param observer
*/ */
public abstract void send(String address, int amount, ActorRef observer); public abstract void send(String address, int amount, ActorRef observer);
public QDigest getqDigest() {
return qDigest;
}
public void setqDigest(QDigest qDigest) {
this.qDigest = qDigest;
}
public boolean writeSnapshot(LocalDateTime time) {
return super.writeSnapshot(getName(), new Gson().toJson(new Snapshot(this)), time);
}
public Snapshot readSnapShot(LocalDateTime time) {
return new Gson().fromJson(super.readSnapshot(time), Snapshot.class);
}
} }
...@@ -17,6 +17,7 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; ...@@ -17,6 +17,7 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue;
/** /**
* Source: https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/quantile/QDigest.java
* Q-Digest datastructure. * Q-Digest datastructure.
* <p/> * <p/>
* Answers approximate quantile queries: actual rank of the result of query(q) * Answers approximate quantile queries: actual rank of the result of query(q)
......
package fucoin.wallet;
import java.io.Serializable;
import java.util.List;
public class Snapshot implements Serializable{
private final String name;
private final int amount;
private final byte[] qDigest;
private final List<long[]> qDigestVal;
public Snapshot(AbstractWallet wallet) {
this.name = wallet.getName();
this.amount = wallet.getAmount();
this.qDigest = QDigest.serialize(wallet.getqDigest());
this.qDigestVal = wallet.getqDigest().toAscRanges();
}
public String getName() {
return name;
}
public int getAmount() {
return amount;
}
public QDigest getqDigest() {
return QDigest.deserialize(this.qDigest);
}
}
...@@ -4,7 +4,6 @@ import akka.actor.ActorRef; ...@@ -4,7 +4,6 @@ import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoin;
import fucoin.actions.join.ActionJoinAnswer;
import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ActionTellSupervisor;
import fucoin.actions.join.ServerActionJoin; import fucoin.actions.join.ServerActionJoin;
import fucoin.actions.persist.ActionInvokeLeave; import fucoin.actions.persist.ActionInvokeLeave;
...@@ -14,15 +13,11 @@ import fucoin.actions.transaction.ActionInvokeSentMoney; ...@@ -14,15 +13,11 @@ import fucoin.actions.transaction.ActionInvokeSentMoney;
import fucoin.gui.WalletGuiControl; import fucoin.gui.WalletGuiControl;
import scala.concurrent.Future; import scala.concurrent.Future;
import static akka.dispatch.Futures.future;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
public class WalletImpl extends AbstractWallet { import static akka.dispatch.Futures.future;
private static final double sCompression = 0.7; public class WalletImpl extends AbstractWallet {
private QDigest qDigest = new QDigest(sCompression);
private ActorRef preKnownNeighbour; private ActorRef preKnownNeighbour;
private ActorRef remoteSuperVisorActor; private ActorRef remoteSuperVisorActor;
...@@ -54,7 +49,6 @@ public class WalletImpl extends AbstractWallet { ...@@ -54,7 +49,6 @@ public class WalletImpl extends AbstractWallet {
*/ */
public void addAmount(int amount) { public void addAmount(int amount) {
setAmount(this.getAmount() + amount); setAmount(this.getAmount() + amount);
qDigest.offer(amount);
addLogMsg(" My amount is now " + this.getAmount()); addLogMsg(" My amount is now " + this.getAmount());
} }
...@@ -255,4 +249,5 @@ public class WalletImpl extends AbstractWallet { ...@@ -255,4 +249,5 @@ public class WalletImpl extends AbstractWallet {
System.out.println(message); System.out.println(message);
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment