diff --git a/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java b/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java index faa271b6611f92a602d6f7033442b1ec46570ce6..15d66e3999a9782e9ceedd2a77ab1e47a0d33048 100644 --- a/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java +++ b/src/main/java/fucoin/actions/control/ActionSuperVisorCreateSnapshot.java @@ -19,16 +19,16 @@ public class ActionSuperVisorCreateSnapshot extends SuperVisorAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { - //write the snapshot - if (!abstractNode.writeSnapshot(time)) { - //snapshot already exists - return; - } - - //notify one in the network - for(ActorRef startPoint: abstractNode.getKnownNeighbors().values()) { - startPoint.tell(new ActionWalletCreateSnapshot(time), self); - break; - } +// //write the snapshot +// if (!abstractNode.writeSnapshot(time)) { +// //snapshot already exists +// return; +// } +// +// //notify one in the network +// for(ActorRef startPoint: abstractNode.getKnownNeighbors().values()) { +// startPoint.tell(new ActionWalletCreateSnapshot(time), self); +// break; +// } } } diff --git a/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java b/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java index db9a2992f32ecfa595bb15710d8040f61f311ab0..ba8cb948adb6843556bf7220d5ab4b9b26cc47ff 100644 --- a/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java +++ b/src/main/java/fucoin/actions/control/ActionWalletCreateSnapshot.java @@ -10,21 +10,31 @@ import java.util.Map; public class ActionWalletCreateSnapshot extends ClientAction { private final LocalDateTime time; + private final ActorRef originator; - public ActionWalletCreateSnapshot(LocalDateTime time) { + public ActionWalletCreateSnapshot(LocalDateTime time, ActorRef originator) { this.time = time; + this.originator = originator; } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { //write the snapshot if (!abstractNode.writeSnapshot(time)) { + abstractNode.disableMessageForwarding(time, sender); + return; } + // send own state to initiator + originator.tell(new ActionWalletGetSnapshotAnswer(abstractNode, time), self); + + // enable recording of incoming messages + abstractNode.enableMessageForwarding(time, originator); + //tell the neighbours - for (Map.Entry<String, ActorRef> node : abstractNode.getKnownNeighbors().entrySet()) { - node.getValue().tell(this, self); - } + abstractNode.getKnownNeighbors() + .values() + .forEach(actorRef -> actorRef.tell(this, self)); } } diff --git a/src/main/java/fucoin/actions/control/ActionWalletGetSnapshotAnswer.java b/src/main/java/fucoin/actions/control/ActionWalletGetSnapshotAnswer.java index 00cf8077132aa4b043a0625b0bb837a27a341879..32a55c367066a3ccbfdddcc5c95da07337f7bd6f 100644 --- a/src/main/java/fucoin/actions/control/ActionWalletGetSnapshotAnswer.java +++ b/src/main/java/fucoin/actions/control/ActionWalletGetSnapshotAnswer.java @@ -23,7 +23,7 @@ public class ActionWalletGetSnapshotAnswer extends ClientAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { - + abstractNode.addSnapshot(snapshot); } public Snapshot getSnapshot() { diff --git a/src/main/java/fucoin/configurations/AbstractConfiguration.java b/src/main/java/fucoin/configurations/AbstractConfiguration.java index f391a7ed9d4c98e8d62743f0fff1ab3a4b5b5421..f9fb77784d71d57ef91a3afd641ff87ecdd4d30b 100644 --- a/src/main/java/fucoin/configurations/AbstractConfiguration.java +++ b/src/main/java/fucoin/configurations/AbstractConfiguration.java @@ -37,7 +37,7 @@ public abstract class AbstractConfiguration extends AbstractNode { protected int remainingWalletsToSpawn; - protected boolean walletGuiSupport = false; + protected boolean walletGuiSupport = true; public static Props props(Class configurationClass) { @@ -64,7 +64,7 @@ public abstract class AbstractConfiguration extends AbstractNode { String name = "Wallet" + remainingWalletsToSpawn--; //superVisor.tell(new ActionAnnounceWalletCreation(), self()); //Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout); - ActorRef wallet = createWallet(name, walletGuiSupport); + ActorRef wallet = createWallet(name, remainingWalletsToSpawn == 0);//walletGuiSupport); //Await.result(future, timeout.duration()); return wallet; } @@ -117,7 +117,7 @@ public abstract class AbstractConfiguration extends AbstractNode { } protected void nextRandomTransaction() { - nextRandomTransaction(true); + nextRandomTransaction(false); } protected void nextRandomTransaction(boolean startConcurrentTransactions) { diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 429b2e1e53099341a7e3c9b746ef7fb7c522c8e0..13be49d5f2e102671cd1e586fceed58cbec352e6 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -47,7 +47,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public void createSnapshot(LocalDateTime time) { superVisor.writeSnapshot(time); for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) { - item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender()); + item.getValue().tell(new ActionWalletCreateSnapshot(time, superVisor.getSelf()), ActorRef.noSender()); break; } } diff --git a/src/main/java/fucoin/gui/SuperVisorHistogramGUI.java b/src/main/java/fucoin/gui/SuperVisorHistogramGUI.java index 1c704a9b37a6c4cfc9b260ff7ac2bb9f79007db7..53feb98fa68d1ceaba0c63effadd18e66293a1a1 100644 --- a/src/main/java/fucoin/gui/SuperVisorHistogramGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorHistogramGUI.java @@ -12,25 +12,31 @@ import org.jfree.ui.RectangleAnchor; import org.jfree.ui.TextAnchor; import javax.swing.*; +import javax.swing.border.EmptyBorder; +import javax.swing.event.ChangeEvent; +import javax.swing.event.ChangeListener; import java.awt.*; import java.time.LocalDateTime; +import java.util.HashSet; +import java.util.Hashtable; +import java.util.Set; /** * */ public class SuperVisorHistogramGUI extends JFrame { - private final SuperVisorGuiControlImpl superVisorGuiControl; + private final Set<Snapshot> snapshots; private final LocalDateTime time; private static final int width = 1200; private static final int height = 800; - public SuperVisorHistogramGUI(SuperVisorGuiControlImpl superVisorGuiControl, LocalDateTime time) { + public SuperVisorHistogramGUI(Set<Snapshot> snapshots, LocalDateTime time) { super("Histogram of distributed granted commit (Snapshot " + time + ")"); - this.superVisorGuiControl = superVisorGuiControl; this.time = time; + this.snapshots = new HashSet<>(snapshots); setVisible(true); setSize(width, height); @@ -59,7 +65,7 @@ public class SuperVisorHistogramGUI extends JFrame { public void init() { QDigest qDigest = null; - for (Snapshot item : superVisorGuiControl.getSnapshots()) { + for (Snapshot item : snapshots) { if (qDigest == null) { qDigest = item.getqDigest(); } else { @@ -79,7 +85,40 @@ public class SuperVisorHistogramGUI extends JFrame { JFreeChart chart = createChart(dataset); final ChartPanel chartPanel = new ChartPanel(chart); - setContentPane(chartPanel); + + JPanel myPanel = new JPanel(new BorderLayout()); + myPanel.add(chartPanel, BorderLayout.CENTER); + + JPanel quantilePanel = new JPanel(); + + final JSlider slider = new JSlider(0, 100); + Hashtable<Integer, JComponent> labelTable = new Hashtable<>(); + labelTable.put(0, new JLabel("0.0") ); + labelTable.put(5, new JLabel("0.5") ); + labelTable.put(10, new JLabel("1.0") ); + slider.setLabelTable( labelTable ); + slider.setValue(50); + + final JTextField quantileValueField = new JTextField(); + quantileValueField.setEditable(false); + quantileValueField.setPreferredSize(new Dimension(40, 20)); + + QDigest finalQDigest = qDigest; + slider.addChangeListener(e -> { + double val = slider.getValue() / 100.; + assert val <= 1.0; + quantileValueField.setText(String.format("%1$d", finalQDigest.getQuantile(val))); + }); + + slider.getChangeListeners()[0].stateChanged(null); + + quantilePanel.add(slider); + quantilePanel.add(quantileValueField); + quantilePanel.setBorder(new EmptyBorder(10, 10, 10, 10)); + + myPanel.add(quantilePanel, BorderLayout.PAGE_END); + + setContentPane(myPanel); } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index e33d89f1d4af98153e330f3a3f2f0110a05b23f1..f4cc75cdf50ea42534792a01da4c097c024419a8 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -1,11 +1,14 @@ package fucoin.gui; +import fucoin.wallet.Snapshot; + import javax.swing.*; import java.awt.*; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; import java.time.LocalDateTime; +import java.util.HashSet; /** * @@ -95,7 +98,7 @@ public class SuperVisorThreadGUI { showHistogramBtn.addActionListener(e -> { LocalDateTime time = (LocalDateTime) dropdown.getSelectedItem(); superVisorGuiControl.loadSnapshot(time); - new SuperVisorHistogramGUI(superVisorGuiControl, time).init(); + new SuperVisorHistogramGUI(new HashSet<>(superVisorGuiControl.getSnapshots()), time).init(); }); btnPanel.add(showHistogramBtn); diff --git a/src/main/java/fucoin/gui/WalletGuiControlImpl.java b/src/main/java/fucoin/gui/WalletGuiControlImpl.java index 49c69b9408d320ac404f5194b4e856c50c80c480..7ceecb14f99b1a6f4095a8c71ae9911ba1970e18 100644 --- a/src/main/java/fucoin/gui/WalletGuiControlImpl.java +++ b/src/main/java/fucoin/gui/WalletGuiControlImpl.java @@ -1,12 +1,17 @@ package fucoin.gui; import akka.actor.ActorSelection; +import fucoin.actions.control.ActionWalletCreateSnapshot; import fucoin.wallet.AbstractWallet; +import fucoin.wallet.Snapshot; import javax.swing.*; import java.awt.*; import java.awt.event.*; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.Enumeration; +import java.util.Set; public class WalletGuiControlImpl implements WalletGuiControl { @@ -27,10 +32,13 @@ public class WalletGuiControlImpl implements WalletGuiControl { private JButton btnSearch = new JButton("Search"); private JButton btnStore = new JButton("Store"); private JButton btnExit = new JButton("Exit"); + private JButton btnCreateSnapshot = new JButton("Create Snapshot"); private JPanel bottomPanel = new JPanel(); private JList<LogMessage> txtLog = new JList<>(log); private JScrollPane logPane = new JScrollPane(txtLog); private JCheckBox showDebug; + private JComboBox<LocalDateTime> dropdown = new JComboBox<>(); + private JButton btnShowHistogram = new JButton("Show Histogram"); public WalletGuiControlImpl(AbstractWallet wallet) { @@ -70,7 +78,7 @@ public class WalletGuiControlImpl implements WalletGuiControl { topPanel.add(row2); window.add(topPanel); //<hr> - centerPanel.setLayout(new GridLayout(4, 1)); + centerPanel.setLayout(new GridLayout(5, 1)); // Row 1 JPanel centerup = new JPanel(); centerup.setLayout(new BorderLayout()); @@ -105,9 +113,19 @@ public class WalletGuiControlImpl implements WalletGuiControl { centerdown2.add(btnStore); centerdown2.add(btnExit); centerPanel.add(centerdown2); + + // Row 4 + JPanel centerdown3 = new JPanel(); + centerdown3.setLayout(new GridLayout(1, 3)); + centerdown3.add(btnCreateSnapshot); + centerdown3.add(dropdown); + centerdown3.add(btnShowHistogram); + centerPanel.add(centerdown3); + window.add(centerPanel); + bottomPanel.setLayout(new BorderLayout()); log.setTransactionFilter(); @@ -136,6 +154,23 @@ public class WalletGuiControlImpl implements WalletGuiControl { }); + btnShowHistogram.setEnabled(dropdown.getSelectedIndex() != -1); + btnShowHistogram.addActionListener(e -> { + LocalDateTime time = (LocalDateTime)dropdown.getSelectedItem(); + Set<Snapshot> snapshots = wallet.getSnapshots(time); + new SuperVisorHistogramGUI(snapshots, time).init(); + }); + + dropdown.addActionListener(e -> { + btnShowHistogram.setEnabled(dropdown.getSelectedIndex() != -1); + }); + + btnCreateSnapshot.addActionListener(e -> { + LocalDateTime now = LocalDateTime.now(); + dropdown.addItem(now); + wallet.createDistributedSnapshot(now); + }); + txtLog.setCellRenderer(new LogCellRenderer()); @@ -154,7 +189,6 @@ public class WalletGuiControlImpl implements WalletGuiControl { System.out.println("window closing"); wallet.leave(); super.windowClosing(e); - } @Override diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index 9b09de9d5521fef3d539018495edb97a8da56afa..8fde67458d41fbea150215718233de6310486930 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -9,6 +9,7 @@ import fucoin.gui.TransactionLogger; import java.io.Serializable; import java.time.LocalDateTime; +import java.util.Set; /** * @@ -119,10 +120,20 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl public boolean writeSnapshot(LocalDateTime time) { return super.writeSnapshot(getName(), - (new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this)), time); + (new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this, time)), time); } public Snapshot readSnapShot(LocalDateTime time) { return new Gson().fromJson(super.readSnapshot(getName(), time), Snapshot.class); } + + public abstract void createDistributedSnapshot(LocalDateTime dateTime); + + public abstract void enableMessageForwarding(LocalDateTime time, ActorRef originator); + + public abstract void disableMessageForwarding(LocalDateTime time, ActorRef sender); + + public abstract void addSnapshot(Snapshot snapshot); + + public abstract Set<Snapshot> getSnapshots(LocalDateTime token); } diff --git a/src/main/java/fucoin/wallet/Snapshot.java b/src/main/java/fucoin/wallet/Snapshot.java index cb2d2f4366a34866f0e70da7a46500c8a6109112..fe8992ad19ffcef034af2b3e7a210f803b96c3ea 100644 --- a/src/main/java/fucoin/wallet/Snapshot.java +++ b/src/main/java/fucoin/wallet/Snapshot.java @@ -4,6 +4,8 @@ import fucoin.QDigest; import fucoin.Statistics; import java.io.Serializable; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.util.HashMap; import java.util.List; import java.util.Set; @@ -16,10 +18,12 @@ public class Snapshot implements Serializable { private final Statistics statistics; private final List<long[]> qDigestVal; private final byte[] qDigest; + private final LocalDateTime time; - public Snapshot(AbstractWallet wallet) { + public Snapshot(AbstractWallet wallet, LocalDateTime time) { this.name = wallet.getName(); + this.time = time; this.amount = wallet.getAmount(); this.statistics = wallet.getStatistics(); this.knownNeighbour = wallet.getKnownNeighbors().keySet(); @@ -57,4 +61,8 @@ public class Snapshot implements Serializable { public QDigest getqDigest() { return QDigest.deserialize(this.qDigest); } + + public LocalDateTime getTime() { + return time; + } } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 28f15ab5265b62ec8dbac958cee527d7c5edf19c..fed79109bc711fa362e65a58d79dd7a56a93a520 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -1,8 +1,8 @@ package fucoin.wallet; -import akka.actor.ActorRef; -import akka.actor.Props; +import akka.actor.*; import fucoin.actions.ClientAction; +import fucoin.actions.control.ActionWalletCreateSnapshot; import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ServerActionJoin; @@ -12,8 +12,17 @@ import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionInvokeSentMoney; import fucoin.gui.WalletGuiControl; import scala.concurrent.Future; - +import scala.concurrent.duration.Duration; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static akka.dispatch.Futures.future; @@ -250,4 +259,72 @@ public class WalletImpl extends AbstractWallet { } } + // keeps track of what channels we're recording for a particular snapshot token + Map<LocalDateTime, Set<ActorRef>> activeChannels = new HashMap<>(); + + // keeps track of who the originator for the snapshot token was (needed for forwarding) + Map<LocalDateTime, ActorRef> token2Originator = new HashMap<>(); + + // collects all snapshot responses + Map<LocalDateTime, Set<Snapshot>> collectedSnapshots = new HashMap<>(); + + @Override + public void enableMessageForwarding(LocalDateTime time, ActorRef originator) { + token2Originator.put(time, originator); + + Set<ActorRef> refs = new HashSet<>(); + + activeChannels.put(time, refs); + + getKnownNeighbors() + .values() + .stream() + .filter(actorRef -> actorRef != originator) + .forEach(refs::add); + } + + @Override + public void disableMessageForwarding(LocalDateTime time, ActorRef sender) { + Set<ActorRef> remainingChannels = activeChannels.get(time); + + if(remainingChannels == null) { + // we're finished already + return; + } + + remainingChannels.remove(sender); + + // stop recording messages + if(remainingChannels.isEmpty()) { + activeChannels.remove(time); + token2Originator.remove(time); + } + } + + @Override + public void createDistributedSnapshot(LocalDateTime dateTime) { + LocalDateTime token = dateTime; + + enableMessageForwarding(token, self()); + + getKnownNeighbors().values().forEach(actorRef -> { + actorRef.tell(new ActionWalletCreateSnapshot(token, self()), self()); + }); + + + // wait for answers + getContext().system().scheduler().scheduleOnce(Duration.create(1, TimeUnit.SECONDS), () -> { + + }, getContext().system().dispatcher()); + } + + @Override + public void addSnapshot(Snapshot snapshot) { + collectedSnapshots.putIfAbsent(snapshot.getTime(), new HashSet<>()); + collectedSnapshots.get(snapshot.getTime()).add(snapshot); + } + + public Set<Snapshot> getSnapshots(LocalDateTime token) { + return new HashSet<>(collectedSnapshots.get(token)); + } }