From 8fcac86dd4e076b5b3a3df870a251ffcc6b040b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20K=C3=B6nnecke?= <simonkoennecke@gmail.com> Date: Fri, 8 Jul 2016 01:34:11 +0200 Subject: [PATCH] snapshot --- pom.xml | 5 ++ src/main/java/fucoin/AbstractNode.java | 49 +++++++++++++++++++ .../ActionSuperVisorCreateSnapshot.java | 27 ++++++++++ .../control/ActionWalletCreateSnapshot.java | 30 ++++++++++++ ...ionCommitDistributedCommittedTransfer.java | 1 + .../java/fucoin/gui/SuperVisorGuiControl.java | 6 ++- .../fucoin/gui/SuperVisorGuiControlImpl.java | 15 +++++- .../java/fucoin/gui/SuperVisorThreadGUI.java | 17 +++++++ src/main/java/fucoin/supervisor/Snapshot.java | 16 ++++++ .../fucoin/supervisor/SuperVisorImpl.java | 15 ++++++ .../java/fucoin/wallet/AbstractWallet.java | 23 +++++++++ src/main/java/fucoin/wallet/QDigest.java | 1 + src/main/java/fucoin/wallet/Snapshot.java | 30 ++++++++++++ src/main/java/fucoin/wallet/WalletImpl.java | 11 ++--- 14 files changed, 236 insertions(+), 10 deletions(-) create mode 100644 src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java create mode 100644 src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java create mode 100644 src/main/java/fucoin/supervisor/Snapshot.java create mode 100644 src/main/java/fucoin/wallet/Snapshot.java diff --git a/pom.xml b/pom.xml index f81f424..c923f4a 100644 --- a/pom.xml +++ b/pom.xml @@ -67,5 +67,10 @@ <artifactId>fastutil</artifactId> <version>7.0.12</version> </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.7</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/src/main/java/fucoin/AbstractNode.java b/src/main/java/fucoin/AbstractNode.java index 9777980..7fd042f 100644 --- a/src/main/java/fucoin/AbstractNode.java +++ b/src/main/java/fucoin/AbstractNode.java @@ -6,7 +6,11 @@ import akka.actor.UntypedActor; import fucoin.actions.transaction.ActionGetAmount; import fucoin.wallet.AbstractWallet; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStreamWriter; import java.io.Serializable; +import java.time.LocalDateTime; import java.util.HashMap; public abstract class AbstractNode extends UntypedActor implements Serializable { @@ -68,4 +72,49 @@ public abstract class AbstractNode extends UntypedActor implements Serializable public HashMap<String, ActorRef> getKnownNeighbors() { return knownNeighbors; } + + private HashMap<LocalDateTime, String> snapshot = new HashMap(); + + public HashMap<LocalDateTime, String> getSnapshot() { + return snapshot; + } + + public String readSnapshot(LocalDateTime time) { + return getSnapshot().get(time); + } + + public boolean writeSnapshot(String name, String content, LocalDateTime time) { + if (getSnapshot().containsKey(time)) { + return false; + } + + getSnapshot().put(time, content); + + String folder = time.getYear() + "-" + time.getMonthValue() + "-" + time.getDayOfMonth() + " " + + time.getHour() + "." + time.getMinute() + "." + time.getSecond(); + String filename = "snapshots/" + folder + "/" + name + ".json"; + + try { + File theDir = new File("snapshots"); + // if the directory does not exist, create it + if (!theDir.exists()) { + theDir.mkdir(); + } + theDir = new File("snapshots/" + folder); + if (!theDir.exists()) { + theDir.mkdir(); + } + + File myFile = new File(filename); + myFile.createNewFile(); + FileOutputStream fOut = new FileOutputStream(myFile); + OutputStreamWriter myOutWriter = new OutputStreamWriter(fOut); + myOutWriter.append(content); + myOutWriter.close(); + fOut.close(); + } catch (Exception e) { + e.printStackTrace(); + } + return true; + } } \ No newline at end of file diff --git a/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java b/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java new file mode 100644 index 0000000..2684027 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java @@ -0,0 +1,27 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; +import fucoin.wallet.AbstractWallet; + +import java.time.LocalDateTime; +import java.util.Map; + +public class ActionSuperVisorCreateSnapshot extends SuperVisorAction { + private final LocalDateTime time; + + public ActionSuperVisorCreateSnapshot(LocalDateTime time) { + this.time = time; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { + //write the snapshot + if (!abstractNode.writeSnapshot(time)) { + return; + } + } +} diff --git a/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java b/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java new file mode 100644 index 0000000..db9a299 --- /dev/null +++ b/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java @@ -0,0 +1,30 @@ +package fucoin.actions.control; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.wallet.AbstractWallet; + +import java.time.LocalDateTime; +import java.util.Map; + +public class ActionWalletCreateSnapshot extends ClientAction { + private final LocalDateTime time; + + public ActionWalletCreateSnapshot(LocalDateTime time) { + this.time = time; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { + //write the snapshot + if (!abstractNode.writeSnapshot(time)) { + return; + } + + //tell the neighbours + for (Map.Entry<String, ActorRef> node : abstractNode.getKnownNeighbors().entrySet()) { + node.getValue().tell(this, self); + } + } +} diff --git a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java index f44d084..2605a2f 100644 --- a/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java +++ b/src/main/java/fucoin/actions/transaction/ActionCommitDistributedCommittedTransfer.java @@ -47,6 +47,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { if (granted) { if (source.compareTo(self) == 0) { + wallet.getqDigest().offer(amount); wallet.setAmount(wallet.getAmount() - amount); wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name()); } else if (target.compareTo(self) == 0) { diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControl.java b/src/main/java/fucoin/gui/SuperVisorGuiControl.java index b8b29e5..b37177b 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControl.java @@ -1,5 +1,7 @@ package fucoin.gui; +import fucoin.supervisor.AmountTableModel; + public interface SuperVisorGuiControl extends TransactionLogger { /** @@ -7,6 +9,8 @@ public interface SuperVisorGuiControl extends TransactionLogger { */ void onLeave(); - public void updateTable(String address, String name, int amount); + void updateTable(String address, String name, int amount); + + AmountTableModel getAmountTableModel(); } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 333ba6c..ca7526e 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -3,6 +3,7 @@ package fucoin.gui; import akka.actor.ActorRef; import akka.pattern.Patterns; import akka.util.Timeout; +import fucoin.actions.control.ActionWalletCreateSnapshot; import fucoin.actions.control.ActionWalletGetNeighbours; import fucoin.actions.control.ActionWalletGetNeighboursAnswer; import fucoin.supervisor.AmountTableModel; @@ -11,9 +12,10 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; +import java.time.LocalDateTime; import java.util.HashMap; +import java.util.LinkedList; import java.util.Map; -import java.util.Set; public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private SuperVisorImpl superVisor; @@ -22,6 +24,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private HashMap<String, HashMap<String, ActorRef>> nodeNeighbours = new HashMap<>(); + private LinkedList<LocalDateTime> snapshotTimes = new LinkedList<>(); + private SuperVisorThreadGUI threadGUI; private boolean logActive = false; @@ -40,6 +44,14 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { } + public void createSnapshot(LocalDateTime time) { + snapshotTimes.add(time); + for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) { + item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender()); + break; + } + } + public void updateNodeNeighbourList() { try { Timeout timeout = new Timeout(Duration.create(10, "seconds")); @@ -62,6 +74,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public HashMap<String, HashMap<String, ActorRef>> getNodeNeighbourList() { return nodeNeighbours; } + public void guiTerminated() { superVisor.exit(); } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index 6f6b4b0..c8a8721 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -5,6 +5,7 @@ import java.awt.*; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import java.time.LocalDateTime; /** * @@ -17,6 +18,7 @@ public class SuperVisorThreadGUI { private JScrollPane logPane = new JScrollPane(txtLog); private JCheckBox showDebug; private JCheckBox activateLogging; + private JComboBox<LocalDateTime> dropdown = new JComboBox<LocalDateTime>(); private SuperVisorGuiControlImpl superVisorGuiControl; public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { @@ -65,13 +67,19 @@ public class SuperVisorThreadGUI { configPanel.add(activateLogging); configPanel.add(showDebug); + JPanel snapshotPanel = new JPanel(); + snapshotPanel.add(dropdown); + //logPanel.add(activateLogging, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH); + logPanel.add(snapshotPanel, BorderLayout.SOUTH); logPanel.add(logPane, BorderLayout.CENTER); contentPanel.add(logPanel); frame.add(contentPanel, BorderLayout.CENTER); + + JPanel btnPanel = new JPanel(); JButton showGraphBtn = new JButton("Show Graph"); @@ -81,6 +89,14 @@ public class SuperVisorThreadGUI { }); btnPanel.add(showGraphBtn); + JButton createSnapshot = new JButton("Create Snapshot"); + createSnapshot.addActionListener(e -> { + LocalDateTime time = LocalDateTime.now(); + superVisorGuiControl.createSnapshot(time); + dropdown.addItem(time); + }); + btnPanel.add(createSnapshot); + //Exit Button and shutdown supervisor JButton exitBtn = new JButton("Stop Supervisor"); @@ -106,6 +122,7 @@ public class SuperVisorThreadGUI { public void windowClosing(WindowEvent e) { super.windowClosing(e); superVisorGuiControl.guiTerminated(); + System.exit(0); } }); }).start(); diff --git a/src/main/java/fucoin/supervisor/Snapshot.java b/src/main/java/fucoin/supervisor/Snapshot.java new file mode 100644 index 0000000..f616d7d --- /dev/null +++ b/src/main/java/fucoin/supervisor/Snapshot.java @@ -0,0 +1,16 @@ +package fucoin.supervisor; + + +import java.io.Serializable; + +public class Snapshot implements Serializable { + private final AmountTableModel amountTableModel; + + public Snapshot(SuperVisorImpl superVisor) { + this.amountTableModel = superVisor.getGui().getAmountTableModel(); + } + + public AmountTableModel getAmountTableModel() { + return amountTableModel; + } +} diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 8b4ec17..b7b85a1 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -2,6 +2,7 @@ package fucoin.supervisor; import akka.actor.ActorRef; import akka.actor.Props; +import com.google.gson.Gson; import fucoin.actions.Action; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; @@ -11,8 +12,10 @@ import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; import fucoin.AbstractNode; import fucoin.gui.TransactionLogger; +import fucoin.supervisor.Snapshot; import javax.swing.*; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,6 +41,10 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.gui = gui; } + public SuperVisorGuiControl getGui() { + return gui; + } + @Override public void onReceive(Object msg) { @@ -167,4 +174,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { public void setBankCommitObserver(ActorRef bankCommitObserver) { this.bankCommitObserver = bankCommitObserver; } + + public boolean writeSnapshot(LocalDateTime time) { + return super.writeSnapshot("SuperVisor", new Gson().toJson(new Snapshot(this)), time); + } + + public Snapshot readSnapShot(LocalDateTime time) { + return new Gson().fromJson(super.readSnapshot(time), Snapshot.class); + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index 882c2a5..561e300 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -1,17 +1,22 @@ package fucoin.wallet; import akka.actor.ActorRef; +import com.google.gson.Gson; import fucoin.AbstractNode; import fucoin.gui.TransactionLogger; import scala.concurrent.Future; import java.io.Serializable; +import java.time.LocalDateTime; /** * */ public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { + private static final double sCompression = 0.7; + private QDigest qDigest = new QDigest(sCompression); + /** * Currently amount of this wallet */ @@ -114,4 +119,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl * @param observer */ public abstract void send(String address, int amount, ActorRef observer); + + + public QDigest getqDigest() { + return qDigest; + } + + public void setqDigest(QDigest qDigest) { + this.qDigest = qDigest; + } + + + public boolean writeSnapshot(LocalDateTime time) { + return super.writeSnapshot(getName(), new Gson().toJson(new Snapshot(this)), time); + } + + public Snapshot readSnapShot(LocalDateTime time) { + return new Gson().fromJson(super.readSnapshot(time), Snapshot.class); + } } diff --git a/src/main/java/fucoin/wallet/QDigest.java b/src/main/java/fucoin/wallet/QDigest.java index 7dcc824..36bc7fa 100644 --- a/src/main/java/fucoin/wallet/QDigest.java +++ b/src/main/java/fucoin/wallet/QDigest.java @@ -17,6 +17,7 @@ import it.unimi.dsi.fastutil.longs.Long2LongOpenHashMap; import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; /** + * Source: https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/quantile/QDigest.java * Q-Digest datastructure. * <p/> * Answers approximate quantile queries: actual rank of the result of query(q) diff --git a/src/main/java/fucoin/wallet/Snapshot.java b/src/main/java/fucoin/wallet/Snapshot.java new file mode 100644 index 0000000..0e4a686 --- /dev/null +++ b/src/main/java/fucoin/wallet/Snapshot.java @@ -0,0 +1,30 @@ +package fucoin.wallet; + +import java.io.Serializable; +import java.util.List; + +public class Snapshot implements Serializable{ + private final String name; + private final int amount; + private final byte[] qDigest; + private final List<long[]> qDigestVal; + + public Snapshot(AbstractWallet wallet) { + this.name = wallet.getName(); + this.amount = wallet.getAmount(); + this.qDigest = QDigest.serialize(wallet.getqDigest()); + this.qDigestVal = wallet.getqDigest().toAscRanges(); + } + + public String getName() { + return name; + } + + public int getAmount() { + return amount; + } + + public QDigest getqDigest() { + return QDigest.deserialize(this.qDigest); + } +} diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 88e64c9..28f15ab 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -4,7 +4,6 @@ import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; import fucoin.actions.join.ActionJoin; -import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ServerActionJoin; import fucoin.actions.persist.ActionInvokeLeave; @@ -14,15 +13,11 @@ import fucoin.actions.transaction.ActionInvokeSentMoney; import fucoin.gui.WalletGuiControl; import scala.concurrent.Future; -import static akka.dispatch.Futures.future; - -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; -public class WalletImpl extends AbstractWallet { +import static akka.dispatch.Futures.future; - private static final double sCompression = 0.7; - private QDigest qDigest = new QDigest(sCompression); +public class WalletImpl extends AbstractWallet { private ActorRef preKnownNeighbour; private ActorRef remoteSuperVisorActor; @@ -54,7 +49,6 @@ public class WalletImpl extends AbstractWallet { */ public void addAmount(int amount) { setAmount(this.getAmount() + amount); - qDigest.offer(amount); addLogMsg(" My amount is now " + this.getAmount()); } @@ -255,4 +249,5 @@ public class WalletImpl extends AbstractWallet { System.out.println(message); } } + } -- GitLab