diff --git a/pom.xml b/pom.xml index f81f4249ee39a7b829b333b750e79b16fab42777..c923f4a50a954b4bd29da388ef889fae5fb5fe7a 100644 --- a/pom.xml +++ b/pom.xml @@ -67,5 +67,10 @@ <artifactId>fastutil</artifactId> <version>7.0.12</version> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.7</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/src/main/java/fucoin/AbstractNode.java b/src/main/java/fucoin/AbstractNode.java index 9777980aa3f1880b274b162f251bf001d5cb61ef..7fd042fe65bac4f7e1d544290f1e6d8c1733d232 100644 --- a/src/main/java/fucoin/AbstractNode.java +++ b/src/main/java/fucoin/AbstractNode.java @@ -6,7 +6,11 @@ import akka.actor.UntypedActor; import fucoin.actions.transaction.ActionGetAmount; import fucoin.wallet.AbstractWallet; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; import java.io.Serializable; +import java.time.LocalDateTime; import java.util.HashMap; public abstract class AbstractNode extends UntypedActor implements Serializable { @@ -68,4 +72,49 @@ public abstract class AbstractNode extends UntypedActor implements Serializable public HashMap<String, ActorRef> getKnownNeighbors() { return knownNeighbors; } + + private HashMap<LocalDateTime, String> snapshot = new HashMap(); + + public HashMap<LocalDateTime, String> getSnapshot() { + return snapshot; + } + + public String readSnapshot(LocalDateTime time) { + return getSnapshot().get(time); + } + + public boolean writeSnapshot(String name, String content, LocalDateTime time) { + if (getSnapshot().containsKey(time)) { + return false; + } + + getSnapshot().put(time, content); + + String folder = time.getYear() + "-" + time.getMonthValue() + "-" + time.getDayOfMonth() + " " + + time.getHour() + "." + time.getMinute() + "." + time.getSecond(); + String filename = "snapshots/" + folder + "/" + name + ".json"; + + try { + File theDir = new File("snapshots"); + // if the directory does not exist, create it + if (!theDir.exists()) { + theDir.mkdir(); + } + theDir = new File("snapshots/" + folder); + if (!theDir.exists()) { + theDir.mkdir(); + } + + File myFile = new File(filename); + myFile.createNewFile(); + FileOutputStream fOut = new FileOutputStream(myFile); + OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut); + myOutWriter.append(content); + myOutWriter.close(); + fOut.close(); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } } \ No newline at end of file diff --git a/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java b/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java new file mode 100644 index 0000000000000000000000000000000000000000..26840270ff6ba4b1ca51fd952404029efc0613a3 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java @@ -0,0 +1,27 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; +import fucoin.wallet.AbstractWallet; + +import java.time.LocalDateTime; +import java.util.Map; + +public class ActionSuperVisorCreateSnapshot extends SuperVisorAction { + private final LocalDateTime time; + + public ActionSuperVisorCreateSnapshot(LocalDateTime time) { + this.time = time; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { + //write the snapshot + if (!abstractNode.writeSnapshot(time)) { + return; + } + } +} diff --git a/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java b/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java new file mode 100644 index 0000000000000000000000000000000000000000..db9a2992f32ecfa595bb15710d8040f61f311ab0 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java @@ -0,0 +1,30 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.wallet.AbstractWallet; + +import java.time.LocalDateTime; +import java.util.Map; + +public class ActionWalletCreateSnapshot extends ClientAction { + private final LocalDateTime time; + + public ActionWalletCreateSnapshot(LocalDateTime time) { + this.time = time; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { + //write the snapshot + if (!abstractNode.writeSnapshot(time)) { + return; + } + + //tell the neighbours + for (Map.Entry<String, ActorRef> node : abstractNode.getKnownNeighbors().entrySet()) { + node.getValue().tell(this, self); + } + } +} diff --git a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java index f44d08498f311f0aae27f00d2ce150c47a11ebd3..2605a2f2ce687183f409e4059d77ef95cec537cf 100644 --- a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java @@ -47,6 +47,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { if (granted) { if (source.compareTo(self) == 0) { + wallet.getqDigest().offer(amount); wallet.setAmount(wallet.getAmount() - amount); wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name()); } else if (target.compareTo(self) == 0) { diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControl.java b/src/main/java/fucoin/gui/SuperVisorGuiControl.java index b8b29e555f906ae8de36d8b1edd625fe7acc824d..b37177ba615168a786c078c38037f4b2f6c0725b 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControl.java @@ -1,5 +1,7 @@ package fucoin.gui; +import fucoin.supervisor.AmountTableModel; + public interface SuperVisorGuiControl extends TransactionLogger { /** @@ -7,6 +9,8 @@ public interface SuperVisorGuiControl extends TransactionLogger { */ void onLeave(); - public void updateTable(String address, String name, int amount); + void updateTable(String address, String name, int amount); + + AmountTableModel getAmountTableModel(); } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 333ba6cd1427fc0008fa8f5bb9aa1df2725b10fe..ca7526e93df62b574f306e76974d880d8c2c4cf2 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -3,6 +3,7 @@ package fucoin.gui; import akka.actor.ActorRef; import akka.pattern.Patterns; import akka.util.Timeout; +import fucoin.actions.control.ActionWalletCreateSnapshot; import fucoin.actions.control.ActionWalletGetNeighbours; import fucoin.actions.control.ActionWalletGetNeighboursAnswer; import fucoin.supervisor.AmountTableModel; @@ -11,9 +12,10 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import java.time.LocalDateTime; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; -import java.util.Set; public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private SuperVisorImpl superVisor; @@ -22,6 +24,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private HashMap<String, HashMap<String, ActorRef>> nodeNeighbours = new HashMap<>(); + private LinkedList<LocalDateTime> snapshotTimes = new LinkedList<>(); + private SuperVisorThreadGUI threadGUI; private boolean logActive = false; @@ -40,6 +44,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { } + public void createSnapshot(LocalDateTime time) { + snapshotTimes.add(time); + for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) { + item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender()); + break; + } + } + public void updateNodeNeighbourList() { try { Timeout timeout = new Timeout(Duration.create(10, "seconds")); @@ -62,6 +74,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public HashMap<String, HashMap<String, ActorRef>> getNodeNeighbourList() { return nodeNeighbours; } + public void guiTerminated() { superVisor.exit(); } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index 6f6b4b064c931260664cdc8587352650307ad208..c8a87216decdf8b486ac6f2677dc3ba31a914e9b 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -5,6 +5,7 @@ import java.awt.*; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import java.time.LocalDateTime; /** * @@ -17,6 +18,7 @@ public class SuperVisorThreadGUI { private JScrollPane logPane = new JScrollPane(txtLog); private JCheckBox showDebug; private JCheckBox activateLogging; + private JComboBox<LocalDateTime> dropdown = new JComboBox<LocalDateTime>(); private SuperVisorGuiControlImpl superVisorGuiControl; public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { @@ -65,13 +67,19 @@ public class SuperVisorThreadGUI { configPanel.add(activateLogging); configPanel.add(showDebug); + JPanel snapshotPanel = new JPanel(); + snapshotPanel.add(dropdown); + //logPanel.add(activateLogging, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH); + logPanel.add(snapshotPanel, BorderLayout.SOUTH); logPanel.add(logPane, BorderLayout.CENTER); contentPanel.add(logPanel); frame.add(contentPanel, BorderLayout.CENTER); + + JPanel btnPanel = new JPanel(); JButton showGraphBtn = new JButton("Show Graph"); @@ -81,6 +89,14 @@ public class SuperVisorThreadGUI { }); btnPanel.add(showGraphBtn); + JButton createSnapshot = new JButton("Create Snapshot"); + createSnapshot.addActionListener(e -> { + LocalDateTime time = LocalDateTime.now(); + superVisorGuiControl.createSnapshot(time); + dropdown.addItem(time); + }); + btnPanel.add(createSnapshot); + //Exit Button and shutdown supervisor JButton exitBtn = new JButton("Stop Supervisor"); @@ -106,6 +122,7 @@ public class SuperVisorThreadGUI { public void windowClosing(WindowEvent e) { super.windowClosing(e); superVisorGuiControl.guiTerminated(); + System.exit(0); } }); }).start(); diff --git a/src/main/java/fucoin/supervisor/Snapshot.java b/src/main/java/fucoin/supervisor/Snapshot.java new file mode 100644 index 0000000000000000000000000000000000000000..f616d7d6d43b3fde9c896a028f93a3c1b5d4dfb1 --- /dev/null +++ b/src/main/java/fucoin/supervisor/Snapshot.java @@ -0,0 +1,16 @@ +package fucoin.supervisor; + + +import java.io.Serializable; + +public class Snapshot implements Serializable { + private final AmountTableModel amountTableModel; + + public Snapshot(SuperVisorImpl superVisor) { + this.amountTableModel = superVisor.getGui().getAmountTableModel(); + } + + public AmountTableModel getAmountTableModel() { + return amountTableModel; + } +} diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 8b4ec173daab141c1172927ecaf34da83d5376c0..b7b85a16b160824b01184cc1055ded1e99a6ddcf 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -2,6 +2,7 @@ package fucoin.supervisor; import akka.actor.ActorRef; import akka.actor.Props; +import com.google.gson.Gson; import fucoin.actions.Action; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; @@ -11,8 +12,10 @@ import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; import fucoin.AbstractNode; import fucoin.gui.TransactionLogger; +import fucoin.supervisor.Snapshot; import javax.swing.*; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,6 +41,10 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.gui = gui; } + public SuperVisorGuiControl getGui() { + return gui; + } + @Override public void onReceive(Object msg) { @@ -167,4 +174,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { public void setBankCommitObserver(ActorRef bankCommitObserver) { this.bankCommitObserver = bankCommitObserver; } + + public boolean writeSnapshot(LocalDateTime time) { + return super.writeSnapshot("SuperVisor", new Gson().toJson(new Snapshot(this)), time); + } + + public Snapshot readSnapShot(LocalDateTime time) { + return new Gson().fromJson(super.readSnapshot(time), Snapshot.class); + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index 882c2a547b9060a576589281eaaea8c9d3e078ba..561e3008ebeba574a33738905509268190cf6e62 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -1,17 +1,22 @@ package fucoin.wallet; import akka.actor.ActorRef; +import com.google.gson.Gson; import fucoin.AbstractNode; import fucoin.gui.TransactionLogger; import scala.concurrent.Future; import java.io.Serializable; +import java.time.LocalDateTime; /** * */ public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { + private static final double sCompression = 0.7; + private QDigest qDigest = new QDigest(sCompression); + /** * Currently amount of this wallet */ @@ -114,4 +119,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl * @param observer */ public abstract void send(String address, int amount, ActorRef observer); + + + public QDigest getqDigest() { + return qDigest; + } + + public void setqDigest(QDigest qDigest) { + this.qDigest = qDigest; + } + + + public boolean writeSnapshot(LocalDateTime time) { + return super.writeSnapshot(getName(), new Gson().toJson(new Snapshot(this)), time); + } + + public Snapshot readSnapShot(LocalDateTime time) { + return new Gson().fromJson(super.readSnapshot(time), Snapshot.class); + } } diff --git a/src/main/java/fucoin/wallet/QDigest.java b/src/main/java/fucoin/wallet/QDigest.java index 7dcc82462305aaf9cefb3f8d042460276c91e5fe..36bc7fad9e6c54c168adaa768ae19dc4a00c5cc8 100644 --- a/src/main/java/fucoin/wallet/QDigest.java +++ b/src/main/java/fucoin/wallet/QDigest.java @@ -17,6 +17,7 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; /** + * Source: https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/quantile/QDigest.java * Q-Digest datastructure. * <p/> * Answers approximate quantile queries: actual rank of the result of query(q) diff --git a/src/main/java/fucoin/wallet/Snapshot.java b/src/main/java/fucoin/wallet/Snapshot.java new file mode 100644 index 0000000000000000000000000000000000000000..0e4a6864ca7c2cc87992e176ce661ef973f78068 --- /dev/null +++ b/src/main/java/fucoin/wallet/Snapshot.java @@ -0,0 +1,30 @@ +package fucoin.wallet; + +import java.io.Serializable; +import java.util.List; + +public class Snapshot implements Serializable{ + private final String name; + private final int amount; + private final byte[] qDigest; + private final List<long[]> qDigestVal; + + public Snapshot(AbstractWallet wallet) { + this.name = wallet.getName(); + this.amount = wallet.getAmount(); + this.qDigest = QDigest.serialize(wallet.getqDigest()); + this.qDigestVal = wallet.getqDigest().toAscRanges(); + } + + public String getName() { + return name; + } + + public int getAmount() { + return amount; + } + + public QDigest getqDigest() { + return QDigest.deserialize(this.qDigest); + } +} diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 88e64c9eb7c9242632a59ae0e957d01866be02d4..28f15ab5265b62ec8dbac958cee527d7c5edf19c 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -4,7 +4,6 @@ import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; import fucoin.actions.join.ActionJoin; -import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ServerActionJoin; import fucoin.actions.persist.ActionInvokeLeave; @@ -14,15 +13,11 @@ import fucoin.actions.transaction.ActionInvokeSentMoney; import fucoin.gui.WalletGuiControl; import scala.concurrent.Future; -import static akka.dispatch.Futures.future; - -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; -public class WalletImpl extends AbstractWallet { +import static akka.dispatch.Futures.future; - private static final double sCompression = 0.7; - private QDigest qDigest = new QDigest(sCompression); +public class WalletImpl extends AbstractWallet { private ActorRef preKnownNeighbour; private ActorRef remoteSuperVisorActor; @@ -54,7 +49,6 @@ public class WalletImpl extends AbstractWallet { */ public void addAmount(int amount) { setAmount(this.getAmount() + amount); - qDigest.offer(amount); addLogMsg(" My amount is now " + this.getAmount()); } @@ -255,4 +249,5 @@ public class WalletImpl extends AbstractWallet { System.out.println(message); } } + }