Skip to content
Snippets Groups Projects
Commit d2fa98d7 authored by Danni Otterbach's avatar Danni Otterbach
Browse files

implement distributed snapshot for wallets and add x-quantile to histogram view

parent 1e3a38f7
No related branches found
No related tags found
No related merge requests found
Showing
with 215 additions and 33 deletions
...@@ -19,16 +19,16 @@ public class ActionSuperVisorCreateSnapshot extends SuperVisorAction { ...@@ -19,16 +19,16 @@ public class ActionSuperVisorCreateSnapshot extends SuperVisorAction {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) {
//write the snapshot // //write the snapshot
if (!abstractNode.writeSnapshot(time)) { // if (!abstractNode.writeSnapshot(time)) {
//snapshot already exists // //snapshot already exists
return; // return;
} // }
//
//notify one in the network // //notify one in the network
for(ActorRef startPoint: abstractNode.getKnownNeighbors().values()) { // for(ActorRef startPoint: abstractNode.getKnownNeighbors().values()) {
startPoint.tell(new ActionWalletCreateSnapshot(time), self); // startPoint.tell(new ActionWalletCreateSnapshot(time), self);
break; // break;
} // }
} }
} }
...@@ -10,21 +10,31 @@ import java.util.Map; ...@@ -10,21 +10,31 @@ import java.util.Map;
public class ActionWalletCreateSnapshot extends ClientAction { public class ActionWalletCreateSnapshot extends ClientAction {
private final LocalDateTime time; private final LocalDateTime time;
private final ActorRef originator;
public ActionWalletCreateSnapshot(LocalDateTime time) { public ActionWalletCreateSnapshot(LocalDateTime time, ActorRef originator) {
this.time = time; this.time = time;
this.originator = originator;
} }
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
//write the snapshot //write the snapshot
if (!abstractNode.writeSnapshot(time)) { if (!abstractNode.writeSnapshot(time)) {
abstractNode.disableMessageForwarding(time, sender);
return; return;
} }
// send own state to initiator
originator.tell(new ActionWalletGetSnapshotAnswer(abstractNode, time), self);
// enable recording of incoming messages
abstractNode.enableMessageForwarding(time, originator);
//tell the neighbours //tell the neighbours
for (Map.Entry<String, ActorRef> node : abstractNode.getKnownNeighbors().entrySet()) { abstractNode.getKnownNeighbors()
node.getValue().tell(this, self); .values()
} .forEach(actorRef -> actorRef.tell(this, self));
} }
} }
...@@ -23,7 +23,7 @@ public class ActionWalletGetSnapshotAnswer extends ClientAction { ...@@ -23,7 +23,7 @@ public class ActionWalletGetSnapshotAnswer extends ClientAction {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.addSnapshot(snapshot);
} }
public Snapshot getSnapshot() { public Snapshot getSnapshot() {
......
...@@ -37,7 +37,7 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -37,7 +37,7 @@ public abstract class AbstractConfiguration extends AbstractNode {
protected int remainingWalletsToSpawn; protected int remainingWalletsToSpawn;
protected boolean walletGuiSupport = false; protected boolean walletGuiSupport = true;
public static Props props(Class configurationClass) { public static Props props(Class configurationClass) {
...@@ -64,7 +64,7 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -64,7 +64,7 @@ public abstract class AbstractConfiguration extends AbstractNode {
String name = "Wallet" + remainingWalletsToSpawn--; String name = "Wallet" + remainingWalletsToSpawn--;
//superVisor.tell(new ActionAnnounceWalletCreation(), self()); //superVisor.tell(new ActionAnnounceWalletCreation(), self());
//Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout); //Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout);
ActorRef wallet = createWallet(name, walletGuiSupport); ActorRef wallet = createWallet(name, remainingWalletsToSpawn == 0);//walletGuiSupport);
//Await.result(future, timeout.duration()); //Await.result(future, timeout.duration());
return wallet; return wallet;
} }
...@@ -117,7 +117,7 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -117,7 +117,7 @@ public abstract class AbstractConfiguration extends AbstractNode {
} }
protected void nextRandomTransaction() { protected void nextRandomTransaction() {
nextRandomTransaction(true); nextRandomTransaction(false);
} }
protected void nextRandomTransaction(boolean startConcurrentTransactions) { protected void nextRandomTransaction(boolean startConcurrentTransactions) {
......
...@@ -47,7 +47,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -47,7 +47,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
public void createSnapshot(LocalDateTime time) { public void createSnapshot(LocalDateTime time) {
superVisor.writeSnapshot(time); superVisor.writeSnapshot(time);
for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) { for (Map.Entry<String, ActorRef> item : superVisor.getKnownNeighbors().entrySet()) {
item.getValue().tell(new ActionWalletCreateSnapshot(time), ActorRef.noSender()); item.getValue().tell(new ActionWalletCreateSnapshot(time, superVisor.getSelf()), ActorRef.noSender());
break; break;
} }
} }
......
...@@ -12,25 +12,31 @@ import org.jfree.ui.RectangleAnchor; ...@@ -12,25 +12,31 @@ import org.jfree.ui.RectangleAnchor;
import org.jfree.ui.TextAnchor; import org.jfree.ui.TextAnchor;
import javax.swing.*; import javax.swing.*;
import javax.swing.border.EmptyBorder;
import javax.swing.event.ChangeEvent;
import javax.swing.event.ChangeListener;
import java.awt.*; import java.awt.*;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Set;
/** /**
* *
*/ */
public class SuperVisorHistogramGUI extends JFrame { public class SuperVisorHistogramGUI extends JFrame {
private final SuperVisorGuiControlImpl superVisorGuiControl; private final Set<Snapshot> snapshots;
private final LocalDateTime time; private final LocalDateTime time;
private static final int width = 1200; private static final int width = 1200;
private static final int height = 800; private static final int height = 800;
public SuperVisorHistogramGUI(SuperVisorGuiControlImpl superVisorGuiControl, LocalDateTime time) { public SuperVisorHistogramGUI(Set<Snapshot> snapshots, LocalDateTime time) {
super("Histogram of distributed granted commit (Snapshot " + time + ")"); super("Histogram of distributed granted commit (Snapshot " + time + ")");
this.superVisorGuiControl = superVisorGuiControl;
this.time = time; this.time = time;
this.snapshots = new HashSet<>(snapshots);
setVisible(true); setVisible(true);
setSize(width, height); setSize(width, height);
...@@ -59,7 +65,7 @@ public class SuperVisorHistogramGUI extends JFrame { ...@@ -59,7 +65,7 @@ public class SuperVisorHistogramGUI extends JFrame {
public void init() { public void init() {
QDigest qDigest = null; QDigest qDigest = null;
for (Snapshot item : superVisorGuiControl.getSnapshots()) { for (Snapshot item : snapshots) {
if (qDigest == null) { if (qDigest == null) {
qDigest = item.getqDigest(); qDigest = item.getqDigest();
} else { } else {
...@@ -79,7 +85,40 @@ public class SuperVisorHistogramGUI extends JFrame { ...@@ -79,7 +85,40 @@ public class SuperVisorHistogramGUI extends JFrame {
JFreeChart chart = createChart(dataset); JFreeChart chart = createChart(dataset);
final ChartPanel chartPanel = new ChartPanel(chart); final ChartPanel chartPanel = new ChartPanel(chart);
setContentPane(chartPanel);
JPanel myPanel = new JPanel(new BorderLayout());
myPanel.add(chartPanel, BorderLayout.CENTER);
JPanel quantilePanel = new JPanel();
final JSlider slider = new JSlider(0, 100);
Hashtable<Integer, JComponent> labelTable = new Hashtable<>();
labelTable.put(0, new JLabel("0.0") );
labelTable.put(5, new JLabel("0.5") );
labelTable.put(10, new JLabel("1.0") );
slider.setLabelTable( labelTable );
slider.setValue(50);
final JTextField quantileValueField = new JTextField();
quantileValueField.setEditable(false);
quantileValueField.setPreferredSize(new Dimension(40, 20));
QDigest finalQDigest = qDigest;
slider.addChangeListener(e -> {
double val = slider.getValue() / 100.;
assert val <= 1.0;
quantileValueField.setText(String.format("%1$d", finalQDigest.getQuantile(val)));
});
slider.getChangeListeners()[0].stateChanged(null);
quantilePanel.add(slider);
quantilePanel.add(quantileValueField);
quantilePanel.setBorder(new EmptyBorder(10, 10, 10, 10));
myPanel.add(quantilePanel, BorderLayout.PAGE_END);
setContentPane(myPanel);
} }
} }
package fucoin.gui; package fucoin.gui;
import fucoin.wallet.Snapshot;
import javax.swing.*; import javax.swing.*;
import java.awt.*; import java.awt.*;
import java.awt.event.ItemEvent; import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter; import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent; import java.awt.event.WindowEvent;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.HashSet;
/** /**
* *
...@@ -95,7 +98,7 @@ public class SuperVisorThreadGUI { ...@@ -95,7 +98,7 @@ public class SuperVisorThreadGUI {
showHistogramBtn.addActionListener(e -> { showHistogramBtn.addActionListener(e -> {
LocalDateTime time = (LocalDateTime) dropdown.getSelectedItem(); LocalDateTime time = (LocalDateTime) dropdown.getSelectedItem();
superVisorGuiControl.loadSnapshot(time); superVisorGuiControl.loadSnapshot(time);
new SuperVisorHistogramGUI(superVisorGuiControl, time).init(); new SuperVisorHistogramGUI(new HashSet<>(superVisorGuiControl.getSnapshots()), time).init();
}); });
btnPanel.add(showHistogramBtn); btnPanel.add(showHistogramBtn);
......
package fucoin.gui; package fucoin.gui;
import akka.actor.ActorSelection; import akka.actor.ActorSelection;
import fucoin.actions.control.ActionWalletCreateSnapshot;
import fucoin.wallet.AbstractWallet; import fucoin.wallet.AbstractWallet;
import fucoin.wallet.Snapshot;
import javax.swing.*; import javax.swing.*;
import java.awt.*; import java.awt.*;
import java.awt.event.*; import java.awt.event.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Set;
public class WalletGuiControlImpl implements WalletGuiControl { public class WalletGuiControlImpl implements WalletGuiControl {
...@@ -27,10 +32,13 @@ public class WalletGuiControlImpl implements WalletGuiControl { ...@@ -27,10 +32,13 @@ public class WalletGuiControlImpl implements WalletGuiControl {
private JButton btnSearch = new JButton("Search"); private JButton btnSearch = new JButton("Search");
private JButton btnStore = new JButton("Store"); private JButton btnStore = new JButton("Store");
private JButton btnExit = new JButton("Exit"); private JButton btnExit = new JButton("Exit");
private JButton btnCreateSnapshot = new JButton("Create Snapshot");
private JPanel bottomPanel = new JPanel(); private JPanel bottomPanel = new JPanel();
private JList<LogMessage> txtLog = new JList<>(log); private JList<LogMessage> txtLog = new JList<>(log);
private JScrollPane logPane = new JScrollPane(txtLog); private JScrollPane logPane = new JScrollPane(txtLog);
private JCheckBox showDebug; private JCheckBox showDebug;
private JComboBox<LocalDateTime> dropdown = new JComboBox<>();
private JButton btnShowHistogram = new JButton("Show Histogram");
public WalletGuiControlImpl(AbstractWallet wallet) { public WalletGuiControlImpl(AbstractWallet wallet) {
...@@ -70,7 +78,7 @@ public class WalletGuiControlImpl implements WalletGuiControl { ...@@ -70,7 +78,7 @@ public class WalletGuiControlImpl implements WalletGuiControl {
topPanel.add(row2); topPanel.add(row2);
window.add(topPanel); window.add(topPanel);
//<hr> //<hr>
centerPanel.setLayout(new GridLayout(4, 1)); centerPanel.setLayout(new GridLayout(5, 1));
// Row 1 // Row 1
JPanel centerup = new JPanel(); JPanel centerup = new JPanel();
centerup.setLayout(new BorderLayout()); centerup.setLayout(new BorderLayout());
...@@ -105,9 +113,19 @@ public class WalletGuiControlImpl implements WalletGuiControl { ...@@ -105,9 +113,19 @@ public class WalletGuiControlImpl implements WalletGuiControl {
centerdown2.add(btnStore); centerdown2.add(btnStore);
centerdown2.add(btnExit); centerdown2.add(btnExit);
centerPanel.add(centerdown2); centerPanel.add(centerdown2);
// Row 4
JPanel centerdown3 = new JPanel();
centerdown3.setLayout(new GridLayout(1, 3));
centerdown3.add(btnCreateSnapshot);
centerdown3.add(dropdown);
centerdown3.add(btnShowHistogram);
centerPanel.add(centerdown3);
window.add(centerPanel); window.add(centerPanel);
bottomPanel.setLayout(new BorderLayout()); bottomPanel.setLayout(new BorderLayout());
log.setTransactionFilter(); log.setTransactionFilter();
...@@ -136,6 +154,23 @@ public class WalletGuiControlImpl implements WalletGuiControl { ...@@ -136,6 +154,23 @@ public class WalletGuiControlImpl implements WalletGuiControl {
}); });
btnShowHistogram.setEnabled(dropdown.getSelectedIndex() != -1);
btnShowHistogram.addActionListener(e -> {
LocalDateTime time = (LocalDateTime)dropdown.getSelectedItem();
Set<Snapshot> snapshots = wallet.getSnapshots(time);
new SuperVisorHistogramGUI(snapshots, time).init();
});
dropdown.addActionListener(e -> {
btnShowHistogram.setEnabled(dropdown.getSelectedIndex() != -1);
});
btnCreateSnapshot.addActionListener(e -> {
LocalDateTime now = LocalDateTime.now();
dropdown.addItem(now);
wallet.createDistributedSnapshot(now);
});
txtLog.setCellRenderer(new LogCellRenderer()); txtLog.setCellRenderer(new LogCellRenderer());
...@@ -154,7 +189,6 @@ public class WalletGuiControlImpl implements WalletGuiControl { ...@@ -154,7 +189,6 @@ public class WalletGuiControlImpl implements WalletGuiControl {
System.out.println("window closing"); System.out.println("window closing");
wallet.leave(); wallet.leave();
super.windowClosing(e); super.windowClosing(e);
} }
@Override @Override
......
...@@ -9,6 +9,7 @@ import fucoin.gui.TransactionLogger; ...@@ -9,6 +9,7 @@ import fucoin.gui.TransactionLogger;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Set;
/** /**
* *
...@@ -119,10 +120,20 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -119,10 +120,20 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
public boolean writeSnapshot(LocalDateTime time) { public boolean writeSnapshot(LocalDateTime time) {
return super.writeSnapshot(getName(), return super.writeSnapshot(getName(),
(new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this)), time); (new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this, time)), time);
} }
public Snapshot readSnapShot(LocalDateTime time) { public Snapshot readSnapShot(LocalDateTime time) {
return new Gson().fromJson(super.readSnapshot(getName(), time), Snapshot.class); return new Gson().fromJson(super.readSnapshot(getName(), time), Snapshot.class);
} }
public abstract void createDistributedSnapshot(LocalDateTime dateTime);
public abstract void enableMessageForwarding(LocalDateTime time, ActorRef originator);
public abstract void disableMessageForwarding(LocalDateTime time, ActorRef sender);
public abstract void addSnapshot(Snapshot snapshot);
public abstract Set<Snapshot> getSnapshots(LocalDateTime token);
} }
...@@ -4,6 +4,8 @@ import fucoin.QDigest; ...@@ -4,6 +4,8 @@ import fucoin.QDigest;
import fucoin.Statistics; import fucoin.Statistics;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
...@@ -16,10 +18,12 @@ public class Snapshot implements Serializable { ...@@ -16,10 +18,12 @@ public class Snapshot implements Serializable {
private final Statistics statistics; private final Statistics statistics;
private final List<long[]> qDigestVal; private final List<long[]> qDigestVal;
private final byte[] qDigest; private final byte[] qDigest;
private final LocalDateTime time;
public Snapshot(AbstractWallet wallet) { public Snapshot(AbstractWallet wallet, LocalDateTime time) {
this.name = wallet.getName(); this.name = wallet.getName();
this.time = time;
this.amount = wallet.getAmount(); this.amount = wallet.getAmount();
this.statistics = wallet.getStatistics(); this.statistics = wallet.getStatistics();
this.knownNeighbour = wallet.getKnownNeighbors().keySet(); this.knownNeighbour = wallet.getKnownNeighbors().keySet();
...@@ -57,4 +61,8 @@ public class Snapshot implements Serializable { ...@@ -57,4 +61,8 @@ public class Snapshot implements Serializable {
public QDigest getqDigest() { public QDigest getqDigest() {
return QDigest.deserialize(this.qDigest); return QDigest.deserialize(this.qDigest);
} }
public LocalDateTime getTime() {
return time;
}
} }
package fucoin.wallet; package fucoin.wallet;
import akka.actor.ActorRef; import akka.actor.*;
import akka.actor.Props;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.actions.control.ActionWalletCreateSnapshot;
import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoin;
import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ActionTellSupervisor;
import fucoin.actions.join.ServerActionJoin; import fucoin.actions.join.ServerActionJoin;
...@@ -12,8 +12,17 @@ import fucoin.actions.transaction.ActionGetAmountAnswer; ...@@ -12,8 +12,17 @@ import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionInvokeSentMoney; import fucoin.actions.transaction.ActionInvokeSentMoney;
import fucoin.gui.WalletGuiControl; import fucoin.gui.WalletGuiControl;
import scala.concurrent.Future; import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static akka.dispatch.Futures.future; import static akka.dispatch.Futures.future;
...@@ -250,4 +259,72 @@ public class WalletImpl extends AbstractWallet { ...@@ -250,4 +259,72 @@ public class WalletImpl extends AbstractWallet {
} }
} }
// keeps track of what channels we're recording for a particular snapshot token
Map<LocalDateTime, Set<ActorRef>> activeChannels = new HashMap<>();
// keeps track of who the originator for the snapshot token was (needed for forwarding)
Map<LocalDateTime, ActorRef> token2Originator = new HashMap<>();
// collects all snapshot responses
Map<LocalDateTime, Set<Snapshot>> collectedSnapshots = new HashMap<>();
@Override
public void enableMessageForwarding(LocalDateTime time, ActorRef originator) {
token2Originator.put(time, originator);
Set<ActorRef> refs = new HashSet<>();
activeChannels.put(time, refs);
getKnownNeighbors()
.values()
.stream()
.filter(actorRef -> actorRef != originator)
.forEach(refs::add);
}
@Override
public void disableMessageForwarding(LocalDateTime time, ActorRef sender) {
Set<ActorRef> remainingChannels = activeChannels.get(time);
if(remainingChannels == null) {
// we're finished already
return;
}
remainingChannels.remove(sender);
// stop recording messages
if(remainingChannels.isEmpty()) {
activeChannels.remove(time);
token2Originator.remove(time);
}
}
@Override
public void createDistributedSnapshot(LocalDateTime dateTime) {
LocalDateTime token = dateTime;
enableMessageForwarding(token, self());
getKnownNeighbors().values().forEach(actorRef -> {
actorRef.tell(new ActionWalletCreateSnapshot(token, self()), self());
});
// wait for answers
getContext().system().scheduler().scheduleOnce(Duration.create(1, TimeUnit.SECONDS), () -> {
}, getContext().system().dispatcher());
}
@Override
public void addSnapshot(Snapshot snapshot) {
collectedSnapshots.putIfAbsent(snapshot.getTime(), new HashSet<>());
collectedSnapshots.get(snapshot.getTime()).add(snapshot);
}
public Set<Snapshot> getSnapshots(LocalDateTime token) {
return new HashSet<>(collectedSnapshots.get(token));
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment