Skip to content
Snippets Groups Projects
Commit d8ea09da authored by Kim Kern's avatar Kim Kern
Browse files

add gui support for several aggregation methods

parent 768b8c18
Branches
No related tags found
No related merge requests found
Showing
with 200 additions and 24 deletions
......@@ -35,7 +35,7 @@ public abstract class ActionAggregation extends ClientAction {
} else {
wallet.clearExcludedNeighbors();
sendAggregatedResult(wallet, aggregationContext);
sendEndMessage(wallet);
}
}
......@@ -46,6 +46,13 @@ public abstract class ActionAggregation extends ClientAction {
.tell(result, wallet.getSelf());
}
protected void sendEndMessage(AbstractWallet wallet) {
ActionAggregationEnd end = new ActionAggregationEnd();
wallet.getKnownNeighbors()
.values()
.forEach(w -> w.tell(end, wallet.getSelf()));
}
private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) {
ActorRef randomNeighbor = wallet.getRandomNeighbor();
Thread sleepingRequest = new Thread() {
......
......@@ -36,6 +36,7 @@ public class ActionAggregationCancel extends ActionAggregation {
continueAggregation(wallet);
} else if (wallet.hasAggregationContext()) {
sendAggregatedResult(wallet, wallet.getAggregationContext());
sendEndMessage(wallet);
}
}
......
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
/**
* When a node gets an ActionAggregationEnd message, it immediately sends back
* its AggregationContext to the supervisor.
*
* @author Kim
*
*/
public class ActionAggregationEnd extends ClientAction {
private static final long serialVersionUID = 6353251813735942634L;
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
ActorRef superVisor = wallet.getRemoteSuperVisorActor();
ActionAggregationResult result = new ActionAggregationResult(wallet.getAggregationContext());
superVisor.tell(result, self);
}
}
......@@ -13,19 +13,31 @@ import fucoin.wallet.AbstractWallet;
*/
public class ActionAggregationInit extends ActionAggregation {
public ActionAggregationInit(AggregationContext context) {
private final boolean useValueExtractor;
public ActionAggregationInit(AggregationContext context, boolean useValueExtractor) {
super(context);
this.useValueExtractor = useValueExtractor;
}
private static final long serialVersionUID = -7416863664917157007L;
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
AggregationContext initContext = getContext().initContext(wallet);
AggregationContext initContext;
if (useValueExtractor) {
initContext = getContext().initContext(wallet);
} else {
initContext = getContext();
}
wallet.setAggregationContext(initContext);
continueAggregation(wallet);
}
public boolean isUseValueExtractor() {
return useValueExtractor;
}
}
......@@ -24,7 +24,7 @@ public class ActionAggregationResult extends SuperVisorAction {
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) {
// TODO add to supervisor result list
superVisor.addAggregationResult(sender, getContext());
}
public AggregationContext getContext() {
......
package fucoin.actions.aggregation;
import java.util.function.Function;
import fucoin.wallet.AbstractWallet;
public enum AggregationMethod {
Average(x -> new Double(x.getAmount()), (x, y) -> (x + y) / 2), Maximum(x -> new Double(x.getAmount()),
(x, y) -> Math.max(x, y)), Minimum(x -> new Double(x.getAmount()),
(x, y) -> Math.min(x, y)), Count(x -> 0.0, (x, y) -> (x + y) / 2, false, 1);
private final Function<AbstractWallet, Double> valueExtractor;
private final AggregationFunction function;
private final boolean useValueExtractor;
private double value;
AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function) {
this.valueExtractor = valueExtractor;
this.function = function;
this.useValueExtractor = true;
this.value = 0;
}
AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function,
boolean useValueExtractor, double value) {
this.valueExtractor = valueExtractor;
this.function = function;
this.useValueExtractor = useValueExtractor;
this.value = value;
}
public Function<AbstractWallet, Double> getValueExtractor() {
return valueExtractor;
}
public AggregationFunction getFunction() {
return function;
}
public boolean isUseValueExtractor() {
return useValueExtractor;
}
public double getValue() {
return value;
}
}
......@@ -12,13 +12,13 @@ public class MassWalletConfiguration extends AbstractConfiguration {
public void run() {
ActorRef supervisor = initSupervisor();
try {
spawnWallets(3, false);
spawnWallets(10, false);
System.out.println("Wallet spawning done!");
} catch (Exception e) {
System.out.println("Wallet spawning timed out!");
}
randomTransactions(3, 3);
randomTransactions(10, 10);
}
@Override
......
......@@ -9,4 +9,6 @@ public interface SuperVisorGuiControl extends TransactionLogger {
public void updateTable(String address, String name, int amount);
public void updateAggregationValue(String name, double d);
}
package fucoin.gui;
import fucoin.actions.aggregation.AggregationMethod;
import fucoin.supervisor.AmountTableModel;
import fucoin.supervisor.SuperVisorImpl;
......@@ -40,6 +41,11 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
this.amountTableModel.updateTable(address, name, amount);
}
@Override
public void updateAggregationValue(String name, double value) {
this.amountTableModel.updateAggregationValue(name, value);
}
private void log(LogMessage logMessage) {
if (logActive) {
threadGUI.log(logMessage);
......@@ -81,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
return logActive;
}
public void startAggregation() {
superVisor.startAggregation();
public void startAggregation(AggregationMethod method, int exchanges) {
superVisor.startAggregation(method, exchanges);
}
}
......@@ -8,13 +8,18 @@ import java.awt.event.WindowEvent;
import javax.swing.JButton;
import javax.swing.JCheckBox;
import javax.swing.JComboBox;
import javax.swing.JFrame;
import javax.swing.JList;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JSpinner;
import javax.swing.JTable;
import javax.swing.SpinnerNumberModel;
import javax.swing.SwingUtilities;
import fucoin.actions.aggregation.AggregationMethod;
/**
*
*/
......@@ -30,6 +35,10 @@ public class SuperVisorThreadGUI {
private JButton startAggregationButton;
private JComboBox<AggregationMethod> aggregationMethodsBox;
private JSpinner aggregationExchangesSpinner;
public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) {
this.superVisorGuiControl = superVisorGuiControl;
......@@ -50,9 +59,15 @@ public class SuperVisorThreadGUI {
txtLog.setCellRenderer(new LogCellRenderer());
aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values());
aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1));
startAggregationButton = new JButton("Start Aggregation");
startAggregationButton.addActionListener(e -> {
superVisorGuiControl.startAggregation();
AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem();
int exchanges = (int) aggregationExchangesSpinner.getModel()
.getValue();
superVisorGuiControl.startAggregation(method, exchanges);
});
showDebug = new JCheckBox("Show debug messages in transaction log");
......@@ -76,11 +91,18 @@ public class SuperVisorThreadGUI {
}
});
JPanel configPanel = new JPanel();
JPanel configPanel = new JPanel(new BorderLayout());
JPanel logConfigPanel = new JPanel();
JPanel aggregationPanel = new JPanel();
configPanel.add(logConfigPanel, BorderLayout.NORTH);
configPanel.add(aggregationPanel, BorderLayout.SOUTH);
configPanel.add(activateLogging);
configPanel.add(showDebug);
configPanel.add(startAggregationButton);
logConfigPanel.add(activateLogging);
logConfigPanel.add(showDebug);
aggregationPanel.add(aggregationExchangesSpinner);
aggregationPanel.add(aggregationMethodsBox);
aggregationPanel.add(startAggregationButton);
//logPanel.add(activateLogging, BorderLayout.NORTH);
logPanel.add(configPanel, BorderLayout.NORTH);
......
package fucoin.supervisor;
import javax.swing.*;
import javax.swing.table.DefaultTableModel;
import java.util.Vector;
import javax.swing.table.DefaultTableModel;
public class AmountTableModel extends DefaultTableModel {
public AmountTableModel() {
super(new Object[]{"Address", "Name", "Amount"}, 0);
super(new Object[] { "Address", "Name", "Amount", "Aggregation Value" }, 0);
}
public void clear() {
......@@ -29,6 +29,20 @@ public class AmountTableModel extends DefaultTableModel {
}
}
this.addRow(new Object[]{address, name, amount});
this.addRow(new Object[] { address, name, amount, null });
}
public void updateAggregationValue(String name, double value) {
Vector rows = this.getDataVector();
for (int i = 0; i < rows.size(); i++) {
if (rows.get(i) instanceof Vector) {
Vector<Object> row = (Vector<Object>) rows.get(i);
if (row.get(1)
.equals(name)) {
setValueAt(value, i, 3);
return;
}
}
}
}
}
......@@ -17,6 +17,7 @@ import fucoin.actions.aggregation.ActionAggregationInit;
import fucoin.actions.aggregation.AggregationContext;
import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder;
import fucoin.actions.aggregation.AggregationFunction;
import fucoin.actions.aggregation.AggregationMethod;
import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer;
......@@ -37,6 +38,8 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
private int pendingBankCommits = 0;
private ActorRef bankCommitObserver = null;
private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>();
public SuperVisorImpl() {
}
......@@ -175,14 +178,49 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
this.bankCommitObserver = bankCommitObserver;
}
public void startAggregation() {
Function<AbstractWallet, Double> valueExtractor = x -> new Double(x.getAmount());
AggregationFunction function = (x, y) -> (x + y) / 2;
AggregationContext context = new AggregationContextBuilder(function, valueExtractor, 0)
.setMaxExchanges(5)
public void startAggregation(AggregationMethod method, int exchanges) {
clearAggregationResults();
Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor();
AggregationFunction function = method.getFunction();
double value = method.getValue();
AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value)
.setMaxExchanges(exchanges)
.build();
ActorRef randomNode = getRandomNeighbor();
ActionAggregationInit init = new ActionAggregationInit(context);
ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor());
randomNode.tell(init, getSelf());
}
/**
* Returns the results of an aggregation operation that have been sent back
* to the supervisor.
*
* @return aggregation results
*/
public Map<ActorRef, AggregationContext> getAggregationResults() {
return aggregationResults;
}
/**
* Clears previous aggregation results, such that a new aggregation can be
* started.
*/
public void clearAggregationResults() {
getAggregationResults().clear();
}
/**
* Adds a new aggregation result, consisting of the ActorRef of the wallet
* and the corresponding AggregationContext.
*
* @param wallet
* @param context
* AggregationContext, that holds the aggregated value
*/
public void addAggregationResult(ActorRef wallet, AggregationContext context) {
System.out.println("add: " + context.toString());
getAggregationResults().put(wallet, context);
gui.updateAggregationValue(wallet.path()
.name(), context.getValue());
}
}
......@@ -32,7 +32,7 @@ public class WalletImpl extends AbstractWallet {
private AggregationContext aggregationContext;
private boolean hasPendingAggregationRequest = false;
private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
private final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
private transient final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
public WalletImpl(String name) {
super(name);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment