diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index 5624ff79f202c84d8d5668aec6d8c5af58fe46cf..723768db906a8a74e3ecb6ce8aa172bf7043d866 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -35,7 +35,7 @@ public abstract class ActionAggregation extends ClientAction { } else { wallet.clearExcludedNeighbors(); sendAggregatedResult(wallet, aggregationContext); - + sendEndMessage(wallet); } } @@ -46,6 +46,13 @@ public abstract class ActionAggregation extends ClientAction { .tell(result, wallet.getSelf()); } + protected void sendEndMessage(AbstractWallet wallet) { + ActionAggregationEnd end = new ActionAggregationEnd(); + wallet.getKnownNeighbors() + .values() + .forEach(w -> w.tell(end, wallet.getSelf())); + } + private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { ActorRef randomNeighbor = wallet.getRandomNeighbor(); Thread sleepingRequest = new Thread() { diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java index e9901550badcfd9f86a3b1934a577085eafd5dd7..49e4478508b5ed7313d5ac019736c43401ce7d81 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java @@ -36,6 +36,7 @@ public class ActionAggregationCancel extends ActionAggregation { continueAggregation(wallet); } else if (wallet.hasAggregationContext()) { sendAggregatedResult(wallet, wallet.getAggregationContext()); + sendEndMessage(wallet); } } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java new file mode 100644 index 0000000000000000000000000000000000000000..ea8c72045a84e52aa85545e96850fa93a79e13db --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java @@ -0,0 +1,26 @@ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.ClientAction; +import fucoin.wallet.AbstractWallet; + +/** + * When a node gets an ActionAggregationEnd message, it immediately sends back + * its AggregationContext to the supervisor. + * + * @author Kim + * + */ +public class ActionAggregationEnd extends ClientAction { + + private static final long serialVersionUID = 6353251813735942634L; + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { + ActorRef superVisor = wallet.getRemoteSuperVisorActor(); + ActionAggregationResult result = new ActionAggregationResult(wallet.getAggregationContext()); + superVisor.tell(result, self); + } + +} diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java index e5d701426af53901a97a0259a7f011da737ae746..42b7736d24d05fcbef99c64052d174d14365c729 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java @@ -13,19 +13,31 @@ import fucoin.wallet.AbstractWallet; */ public class ActionAggregationInit extends ActionAggregation { - public ActionAggregationInit(AggregationContext context) { + private final boolean useValueExtractor; + + public ActionAggregationInit(AggregationContext context, boolean useValueExtractor) { super(context); + this.useValueExtractor = useValueExtractor; } private static final long serialVersionUID = -7416863664917157007L; @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - AggregationContext initContext = getContext().initContext(wallet); + AggregationContext initContext; + + if (useValueExtractor) { + initContext = getContext().initContext(wallet); + } else { + initContext = getContext(); + } wallet.setAggregationContext(initContext); continueAggregation(wallet); } + public boolean isUseValueExtractor() { + return useValueExtractor; + } } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java index 8375a4883c8eb34c379474ad53fcbe8f76168514..9a74e2bf3dddd672615d5f52dbc844d3aaac269a 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java @@ -24,7 +24,7 @@ public class ActionAggregationResult extends SuperVisorAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { - // TODO add to supervisor result list + superVisor.addAggregationResult(sender, getContext()); } public AggregationContext getContext() { diff --git a/src/main/java/fucoin/actions/aggregation/AggregationMethod.java b/src/main/java/fucoin/actions/aggregation/AggregationMethod.java new file mode 100644 index 0000000000000000000000000000000000000000..85fffe1df35cad995ee2f64db3df76b3e3d32a46 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/AggregationMethod.java @@ -0,0 +1,48 @@ +package fucoin.actions.aggregation; + +import java.util.function.Function; + +import fucoin.wallet.AbstractWallet; + +public enum AggregationMethod { + Average(x -> new Double(x.getAmount()), (x, y) -> (x + y) / 2), Maximum(x -> new Double(x.getAmount()), + (x, y) -> Math.max(x, y)), Minimum(x -> new Double(x.getAmount()), + (x, y) -> Math.min(x, y)), Count(x -> 0.0, (x, y) -> (x + y) / 2, false, 1); + + private final Function<AbstractWallet, Double> valueExtractor; + private final AggregationFunction function; + private final boolean useValueExtractor; + private double value; + + AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function) { + this.valueExtractor = valueExtractor; + this.function = function; + this.useValueExtractor = true; + this.value = 0; + } + + AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function, + boolean useValueExtractor, double value) { + this.valueExtractor = valueExtractor; + this.function = function; + this.useValueExtractor = useValueExtractor; + this.value = value; + } + + public Function<AbstractWallet, Double> getValueExtractor() { + return valueExtractor; + } + + public AggregationFunction getFunction() { + return function; + } + + public boolean isUseValueExtractor() { + return useValueExtractor; + } + + public double getValue() { + return value; + } + +} diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java index ac6f08b55fab15cc08fdb89957700f6b2bbf9e95..5eef7a15d0ae333d16f0fa582469d69869e139e4 100644 --- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -12,13 +12,13 @@ public class MassWalletConfiguration extends AbstractConfiguration { public void run() { ActorRef supervisor = initSupervisor(); try { - spawnWallets(3, false); + spawnWallets(10, false); System.out.println("Wallet spawning done!"); } catch (Exception e) { System.out.println("Wallet spawning timed out!"); } - randomTransactions(3, 3); + randomTransactions(10, 10); } @Override diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControl.java b/src/main/java/fucoin/gui/SuperVisorGuiControl.java index b8b29e555f906ae8de36d8b1edd625fe7acc824d..a5d10b81cc790dd877d16c2ce88df80133da4e64 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControl.java @@ -9,4 +9,6 @@ public interface SuperVisorGuiControl extends TransactionLogger { public void updateTable(String address, String name, int amount); + public void updateAggregationValue(String name, double d); + } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 453b6c6a36338124ef1e9ea590a29ed8d46852b0..46d8b9a2f130c7af9777a1366309d65c7541f321 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -1,5 +1,6 @@ package fucoin.gui; +import fucoin.actions.aggregation.AggregationMethod; import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.SuperVisorImpl; @@ -40,6 +41,11 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { this.amountTableModel.updateTable(address, name, amount); } + @Override + public void updateAggregationValue(String name, double value) { + this.amountTableModel.updateAggregationValue(name, value); + } + private void log(LogMessage logMessage) { if (logActive) { threadGUI.log(logMessage); @@ -81,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { return logActive; } - public void startAggregation() { - superVisor.startAggregation(); + public void startAggregation(AggregationMethod method, int exchanges) { + superVisor.startAggregation(method, exchanges); } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index 63b8f8d16d048c52705cde1a849d5c4cf4a3a804..a722a3f49c305b2e0a2c745a20d4ca8143ea4e1b 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -8,13 +8,18 @@ import java.awt.event.WindowEvent; import javax.swing.JButton; import javax.swing.JCheckBox; +import javax.swing.JComboBox; import javax.swing.JFrame; import javax.swing.JList; import javax.swing.JPanel; import javax.swing.JScrollPane; +import javax.swing.JSpinner; import javax.swing.JTable; +import javax.swing.SpinnerNumberModel; import javax.swing.SwingUtilities; +import fucoin.actions.aggregation.AggregationMethod; + /** * */ @@ -30,6 +35,10 @@ public class SuperVisorThreadGUI { private JButton startAggregationButton; + private JComboBox<AggregationMethod> aggregationMethodsBox; + + private JSpinner aggregationExchangesSpinner; + public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { this.superVisorGuiControl = superVisorGuiControl; @@ -50,9 +59,15 @@ public class SuperVisorThreadGUI { txtLog.setCellRenderer(new LogCellRenderer()); + aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); + aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); + startAggregationButton = new JButton("Start Aggregation"); startAggregationButton.addActionListener(e -> { - superVisorGuiControl.startAggregation(); + AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); + int exchanges = (int) aggregationExchangesSpinner.getModel() + .getValue(); + superVisorGuiControl.startAggregation(method, exchanges); }); showDebug = new JCheckBox("Show debug messages in transaction log"); @@ -76,11 +91,18 @@ public class SuperVisorThreadGUI { } }); - JPanel configPanel = new JPanel(); + JPanel configPanel = new JPanel(new BorderLayout()); + JPanel logConfigPanel = new JPanel(); + JPanel aggregationPanel = new JPanel(); + + configPanel.add(logConfigPanel, BorderLayout.NORTH); + configPanel.add(aggregationPanel, BorderLayout.SOUTH); - configPanel.add(activateLogging); - configPanel.add(showDebug); - configPanel.add(startAggregationButton); + logConfigPanel.add(activateLogging); + logConfigPanel.add(showDebug); + aggregationPanel.add(aggregationExchangesSpinner); + aggregationPanel.add(aggregationMethodsBox); + aggregationPanel.add(startAggregationButton); //logPanel.add(activateLogging, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH); diff --git a/src/main/java/fucoin/supervisor/AmountTableModel.java b/src/main/java/fucoin/supervisor/AmountTableModel.java index 80378e916040dac5fc3fb816250206cefb424ff5..44e863956429f4d9ba8e36f273c325a967efe616 100644 --- a/src/main/java/fucoin/supervisor/AmountTableModel.java +++ b/src/main/java/fucoin/supervisor/AmountTableModel.java @@ -1,13 +1,13 @@ package fucoin.supervisor; -import javax.swing.*; -import javax.swing.table.DefaultTableModel; import java.util.Vector; +import javax.swing.table.DefaultTableModel; + public class AmountTableModel extends DefaultTableModel { public AmountTableModel() { - super(new Object[]{"Address", "Name", "Amount"}, 0); + super(new Object[] { "Address", "Name", "Amount", "Aggregation Value" }, 0); } public void clear() { @@ -29,6 +29,20 @@ public class AmountTableModel extends DefaultTableModel { } } - this.addRow(new Object[]{address, name, amount}); + this.addRow(new Object[] { address, name, amount, null }); } + + public void updateAggregationValue(String name, double value) { + Vector rows = this.getDataVector(); + for (int i = 0; i < rows.size(); i++) { + if (rows.get(i) instanceof Vector) { + Vector<Object> row = (Vector<Object>) rows.get(i); + if (row.get(1) + .equals(name)) { + setValueAt(value, i, 3); + return; + } + } + } + } } diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index c3b165d22b62ac855c8748c0216f99e404da8261..86ffa7a03e9c3ffe2f61d89f9a84dc903f1fcc1b 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -17,6 +17,7 @@ import fucoin.actions.aggregation.ActionAggregationInit; import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.aggregation.AggregationFunction; +import fucoin.actions.aggregation.AggregationMethod; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; @@ -37,6 +38,8 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { private int pendingBankCommits = 0; private ActorRef bankCommitObserver = null; + private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>(); + public SuperVisorImpl() { } @@ -175,14 +178,49 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.bankCommitObserver = bankCommitObserver; } - public void startAggregation() { - Function<AbstractWallet, Double> valueExtractor = x -> new Double(x.getAmount()); - AggregationFunction function = (x, y) -> (x + y) / 2; - AggregationContext context = new AggregationContextBuilder(function, valueExtractor, 0) - .setMaxExchanges(5) + public void startAggregation(AggregationMethod method, int exchanges) { + clearAggregationResults(); + Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor(); + AggregationFunction function = method.getFunction(); + double value = method.getValue(); + AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value) + .setMaxExchanges(exchanges) .build(); ActorRef randomNode = getRandomNeighbor(); - ActionAggregationInit init = new ActionAggregationInit(context); + ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); randomNode.tell(init, getSelf()); } + + /** + * Returns the results of an aggregation operation that have been sent back + * to the supervisor. + * + * @return aggregation results + */ + public Map<ActorRef, AggregationContext> getAggregationResults() { + return aggregationResults; + } + + /** + * Clears previous aggregation results, such that a new aggregation can be + * started. + */ + public void clearAggregationResults() { + getAggregationResults().clear(); + } + + /** + * Adds a new aggregation result, consisting of the ActorRef of the wallet + * and the corresponding AggregationContext. + * + * @param wallet + * @param context + * AggregationContext, that holds the aggregated value + */ + public void addAggregationResult(ActorRef wallet, AggregationContext context) { + System.out.println("add: " + context.toString()); + getAggregationResults().put(wallet, context); + gui.updateAggregationValue(wallet.path() + .name(), context.getValue()); + } } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 42caa83e8cfbb337172d5a04cd092aa44b3f4426..613f3952e00996d47d6626c3a1b5fe670f285850 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -32,7 +32,7 @@ public class WalletImpl extends AbstractWallet { private AggregationContext aggregationContext; private boolean hasPendingAggregationRequest = false; private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>(); - private final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10); + private transient final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10); public WalletImpl(String name) { super(name);