From d8ea09da44d3bdb7223484a28816af45fd015ab5 Mon Sep 17 00:00:00 2001
From: Kim Kern <kim.kern@fu-berlin.de>
Date: Tue, 5 Jul 2016 23:53:42 +0200
Subject: [PATCH] add gui support for several aggregation methods

---
 .../aggregation/ActionAggregation.java        |  9 +++-
 .../aggregation/ActionAggregationCancel.java  |  1 +
 .../aggregation/ActionAggregationEnd.java     | 26 ++++++++++
 .../aggregation/ActionAggregationInit.java    | 16 +++++-
 .../aggregation/ActionAggregationResult.java  |  2 +-
 .../aggregation/AggregationMethod.java        | 48 ++++++++++++++++++
 .../MassWalletConfiguration.java              |  4 +-
 .../java/fucoin/gui/SuperVisorGuiControl.java |  2 +
 .../fucoin/gui/SuperVisorGuiControlImpl.java  | 10 +++-
 .../java/fucoin/gui/SuperVisorThreadGUI.java  | 32 ++++++++++--
 .../fucoin/supervisor/AmountTableModel.java   | 22 ++++++--
 .../fucoin/supervisor/SuperVisorImpl.java     | 50 ++++++++++++++++---
 src/main/java/fucoin/wallet/WalletImpl.java   |  2 +-
 13 files changed, 200 insertions(+), 24 deletions(-)
 create mode 100644 src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java
 create mode 100644 src/main/java/fucoin/actions/aggregation/AggregationMethod.java

diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java
index 5624ff7..723768d 100644
--- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java
@@ -35,7 +35,7 @@ public abstract class ActionAggregation extends ClientAction {
 		} else {
 			wallet.clearExcludedNeighbors();
 			sendAggregatedResult(wallet, aggregationContext);
-
+			sendEndMessage(wallet);
 		}
 	}
 
@@ -46,6 +46,13 @@ public abstract class ActionAggregation extends ClientAction {
 				.tell(result, wallet.getSelf());
 	}
 
+	protected void sendEndMessage(AbstractWallet wallet) {
+		ActionAggregationEnd end = new ActionAggregationEnd();
+		wallet.getKnownNeighbors()
+				.values()
+				.forEach(w -> w.tell(end, wallet.getSelf()));
+	}
+
 	private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) {
 		ActorRef randomNeighbor = wallet.getRandomNeighbor();
 		Thread sleepingRequest = new Thread() {
diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java
index e990155..49e4478 100644
--- a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java
@@ -36,6 +36,7 @@ public class ActionAggregationCancel extends ActionAggregation {
 			continueAggregation(wallet);
 		} else if (wallet.hasAggregationContext()) {
 			sendAggregatedResult(wallet, wallet.getAggregationContext());
+			sendEndMessage(wallet);
 		}
 	}
 
diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java
new file mode 100644
index 0000000..ea8c720
--- /dev/null
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationEnd.java
@@ -0,0 +1,26 @@
+package fucoin.actions.aggregation;
+
+import akka.actor.ActorRef;
+import akka.actor.UntypedActorContext;
+import fucoin.actions.ClientAction;
+import fucoin.wallet.AbstractWallet;
+
+/**
+ * When a node gets an ActionAggregationEnd message, it immediately sends back
+ * its AggregationContext to the supervisor.
+ * 
+ * @author Kim
+ *
+ */
+public class ActionAggregationEnd extends ClientAction {
+
+	private static final long serialVersionUID = 6353251813735942634L;
+
+	@Override
+	protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
+		ActorRef superVisor = wallet.getRemoteSuperVisorActor();
+		ActionAggregationResult result = new ActionAggregationResult(wallet.getAggregationContext());
+		superVisor.tell(result, self);
+	}
+
+}
diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java
index e5d7014..42b7736 100644
--- a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java
@@ -13,19 +13,31 @@ import fucoin.wallet.AbstractWallet;
  */
 public class ActionAggregationInit extends ActionAggregation {
 
-	public ActionAggregationInit(AggregationContext context) {
+	private final boolean useValueExtractor;
+
+	public ActionAggregationInit(AggregationContext context, boolean useValueExtractor) {
 		super(context);
+		this.useValueExtractor = useValueExtractor;
 	}
 
 	private static final long serialVersionUID = -7416863664917157007L;
 
 	@Override
 	protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
-		AggregationContext initContext = getContext().initContext(wallet);
+		AggregationContext initContext;
+
+		if (useValueExtractor) {
+			initContext = getContext().initContext(wallet);
+		} else {
+			initContext = getContext();
+		}
 		wallet.setAggregationContext(initContext);
 		continueAggregation(wallet);
 	}
 
+	public boolean isUseValueExtractor() {
+		return useValueExtractor;
+	}
 
 
 }
diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java
index 8375a48..9a74e2b 100644
--- a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java
@@ -24,7 +24,7 @@ public class ActionAggregationResult extends SuperVisorAction {
 
 	@Override
 	protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) {
-		// TODO add to supervisor result list
+		superVisor.addAggregationResult(sender, getContext());
 	}
 
 	public AggregationContext getContext() {
diff --git a/src/main/java/fucoin/actions/aggregation/AggregationMethod.java b/src/main/java/fucoin/actions/aggregation/AggregationMethod.java
new file mode 100644
index 0000000..85fffe1
--- /dev/null
+++ b/src/main/java/fucoin/actions/aggregation/AggregationMethod.java
@@ -0,0 +1,48 @@
+package fucoin.actions.aggregation;
+
+import java.util.function.Function;
+
+import fucoin.wallet.AbstractWallet;
+
+public enum AggregationMethod {
+	Average(x -> new Double(x.getAmount()), (x, y) -> (x + y) / 2), Maximum(x -> new Double(x.getAmount()),
+			(x, y) -> Math.max(x, y)), Minimum(x -> new Double(x.getAmount()),
+					(x, y) -> Math.min(x, y)), Count(x -> 0.0, (x, y) -> (x + y) / 2, false, 1);
+
+	private final Function<AbstractWallet, Double> valueExtractor;
+	private final AggregationFunction function;
+	private final boolean useValueExtractor;
+	private double value;
+
+	AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function) {
+		this.valueExtractor = valueExtractor;
+		this.function = function;
+		this.useValueExtractor = true;
+		this.value = 0;
+	}
+
+	AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function,
+			boolean useValueExtractor, double value) {
+		this.valueExtractor = valueExtractor;
+		this.function = function;
+		this.useValueExtractor = useValueExtractor;
+		this.value = value;
+	}
+
+	public Function<AbstractWallet, Double> getValueExtractor() {
+		return valueExtractor;
+	}
+
+	public AggregationFunction getFunction() {
+		return function;
+	}
+
+	public boolean isUseValueExtractor() {
+		return useValueExtractor;
+	}
+
+	public double getValue() {
+		return value;
+	}
+
+}
diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java
index ac6f08b..5eef7a1 100644
--- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java
+++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java
@@ -12,13 +12,13 @@ public class MassWalletConfiguration extends AbstractConfiguration {
     public void run() {
 		ActorRef supervisor = initSupervisor();
         try {
-			spawnWallets(3, false);
+			spawnWallets(10, false);
             System.out.println("Wallet spawning done!");
         } catch (Exception e) {
             System.out.println("Wallet spawning timed out!");
         }
 
-		randomTransactions(3, 3);
+		randomTransactions(10, 10);
     }
 
     @Override
diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControl.java b/src/main/java/fucoin/gui/SuperVisorGuiControl.java
index b8b29e5..a5d10b8 100644
--- a/src/main/java/fucoin/gui/SuperVisorGuiControl.java
+++ b/src/main/java/fucoin/gui/SuperVisorGuiControl.java
@@ -9,4 +9,6 @@ public interface SuperVisorGuiControl extends TransactionLogger {
 
     public void updateTable(String address, String name, int amount);
 
+	public void updateAggregationValue(String name, double d);
+
 }
diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java
index 453b6c6..46d8b9a 100644
--- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java
+++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java
@@ -1,5 +1,6 @@
 package fucoin.gui;
 
+import fucoin.actions.aggregation.AggregationMethod;
 import fucoin.supervisor.AmountTableModel;
 import fucoin.supervisor.SuperVisorImpl;
 
@@ -40,6 +41,11 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
         this.amountTableModel.updateTable(address, name, amount);
     }
 
+	@Override
+	public void updateAggregationValue(String name, double value) {
+		this.amountTableModel.updateAggregationValue(name, value);
+	}
+
     private void log(LogMessage logMessage) {
         if (logActive) {
             threadGUI.log(logMessage);
@@ -81,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
         return logActive;
     }
 
-	public void startAggregation() {
-		superVisor.startAggregation();
+	public void startAggregation(AggregationMethod method, int exchanges) {
+		superVisor.startAggregation(method, exchanges);
 	}
 }
diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java
index 63b8f8d..a722a3f 100644
--- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java
+++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java
@@ -8,13 +8,18 @@ import java.awt.event.WindowEvent;
 
 import javax.swing.JButton;
 import javax.swing.JCheckBox;
+import javax.swing.JComboBox;
 import javax.swing.JFrame;
 import javax.swing.JList;
 import javax.swing.JPanel;
 import javax.swing.JScrollPane;
+import javax.swing.JSpinner;
 import javax.swing.JTable;
+import javax.swing.SpinnerNumberModel;
 import javax.swing.SwingUtilities;
 
+import fucoin.actions.aggregation.AggregationMethod;
+
 /**
  *
  */
@@ -30,6 +35,10 @@ public class SuperVisorThreadGUI {
 
 	private JButton startAggregationButton;
 
+	private JComboBox<AggregationMethod> aggregationMethodsBox;
+
+	private JSpinner aggregationExchangesSpinner;
+
     public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) {
 
         this.superVisorGuiControl = superVisorGuiControl;
@@ -50,9 +59,15 @@ public class SuperVisorThreadGUI {
 
             txtLog.setCellRenderer(new LogCellRenderer());
             
+			aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values());
+			aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1));
+
 			startAggregationButton = new JButton("Start Aggregation");
             startAggregationButton.addActionListener(e -> {
-				superVisorGuiControl.startAggregation();
+				AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem();
+				int exchanges = (int) aggregationExchangesSpinner.getModel()
+						.getValue();
+				superVisorGuiControl.startAggregation(method, exchanges);
             });
 
             showDebug = new JCheckBox("Show debug messages in transaction log");
@@ -76,11 +91,18 @@ public class SuperVisorThreadGUI {
                 }
             });
 
-            JPanel configPanel = new JPanel();
+			JPanel configPanel = new JPanel(new BorderLayout());
+			JPanel logConfigPanel = new JPanel();
+			JPanel aggregationPanel = new JPanel();
+
+			configPanel.add(logConfigPanel, BorderLayout.NORTH);
+			configPanel.add(aggregationPanel, BorderLayout.SOUTH);
 
-            configPanel.add(activateLogging);
-            configPanel.add(showDebug);
-			configPanel.add(startAggregationButton);
+			logConfigPanel.add(activateLogging);
+			logConfigPanel.add(showDebug);
+			aggregationPanel.add(aggregationExchangesSpinner);
+			aggregationPanel.add(aggregationMethodsBox);
+			aggregationPanel.add(startAggregationButton);
 
             //logPanel.add(activateLogging, BorderLayout.NORTH);
             logPanel.add(configPanel, BorderLayout.NORTH);
diff --git a/src/main/java/fucoin/supervisor/AmountTableModel.java b/src/main/java/fucoin/supervisor/AmountTableModel.java
index 80378e9..44e8639 100644
--- a/src/main/java/fucoin/supervisor/AmountTableModel.java
+++ b/src/main/java/fucoin/supervisor/AmountTableModel.java
@@ -1,13 +1,13 @@
 package fucoin.supervisor;
 
-import javax.swing.*;
-import javax.swing.table.DefaultTableModel;
 import java.util.Vector;
 
+import javax.swing.table.DefaultTableModel;
+
 public class AmountTableModel extends DefaultTableModel {
 
     public AmountTableModel() {
-        super(new Object[]{"Address", "Name", "Amount"}, 0);
+		super(new Object[] { "Address", "Name", "Amount", "Aggregation Value" }, 0);
     }
 
     public void clear() {
@@ -29,6 +29,20 @@ public class AmountTableModel extends DefaultTableModel {
             }
         }
 
-        this.addRow(new Object[]{address, name, amount});
+		this.addRow(new Object[] { address, name, amount, null });
     }
+
+	public void updateAggregationValue(String name, double value) {
+		Vector rows = this.getDataVector();
+		for (int i = 0; i < rows.size(); i++) {
+			if (rows.get(i) instanceof Vector) {
+				Vector<Object> row = (Vector<Object>) rows.get(i);
+				if (row.get(1)
+						.equals(name)) {
+					setValueAt(value, i, 3);
+					return;
+				}
+			}
+		}
+	}
 }
diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java
index c3b165d..86ffa7a 100644
--- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java
+++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java
@@ -17,6 +17,7 @@ import fucoin.actions.aggregation.ActionAggregationInit;
 import fucoin.actions.aggregation.AggregationContext;
 import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder;
 import fucoin.actions.aggregation.AggregationFunction;
+import fucoin.actions.aggregation.AggregationMethod;
 import fucoin.actions.control.ActionWalletCreationDone;
 import fucoin.actions.persist.ActionInvokeUpdate;
 import fucoin.actions.transaction.ActionGetAmountAnswer;
@@ -37,6 +38,8 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
     private int pendingBankCommits = 0;
     private ActorRef bankCommitObserver = null;
 
+	private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>();
+
     public SuperVisorImpl() {
 
     }
@@ -175,14 +178,49 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
         this.bankCommitObserver = bankCommitObserver;
     }
 
-	public void startAggregation() {
-		Function<AbstractWallet, Double> valueExtractor = x -> new Double(x.getAmount());
-		AggregationFunction function = (x, y) -> (x + y) / 2;
-		AggregationContext context = new AggregationContextBuilder(function, valueExtractor, 0)
-				.setMaxExchanges(5)
+	public void startAggregation(AggregationMethod method, int exchanges) {
+		clearAggregationResults();
+		Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor();
+		AggregationFunction function = method.getFunction();
+		double value = method.getValue();
+		AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value)
+				.setMaxExchanges(exchanges)
 				.build();
 		ActorRef randomNode = getRandomNeighbor();
-		ActionAggregationInit init = new ActionAggregationInit(context);
+		ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor());
 		randomNode.tell(init, getSelf());
 	}
+
+	/**
+	 * Returns the results of an aggregation operation that have been sent back
+	 * to the supervisor.
+	 * 
+	 * @return aggregation results
+	 */
+	public Map<ActorRef, AggregationContext> getAggregationResults() {
+		return aggregationResults;
+	}
+
+	/**
+	 * Clears previous aggregation results, such that a new aggregation can be
+	 * started.
+	 */
+	public void clearAggregationResults() {
+		getAggregationResults().clear();
+	}
+
+	/**
+	 * Adds a new aggregation result, consisting of the ActorRef of the wallet
+	 * and the corresponding AggregationContext.
+	 * 
+	 * @param wallet
+	 * @param context
+	 *            AggregationContext, that holds the aggregated value
+	 */
+	public void addAggregationResult(ActorRef wallet, AggregationContext context) {
+		System.out.println("add: " + context.toString());
+		getAggregationResults().put(wallet, context);
+		gui.updateAggregationValue(wallet.path()
+				.name(), context.getValue());
+	}
 }
diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java
index 42caa83..613f395 100644
--- a/src/main/java/fucoin/wallet/WalletImpl.java
+++ b/src/main/java/fucoin/wallet/WalletImpl.java
@@ -32,7 +32,7 @@ public class WalletImpl extends AbstractWallet {
 	private AggregationContext aggregationContext;
 	private boolean hasPendingAggregationRequest = false;
     private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
-	private final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
+	private transient final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
 
     public WalletImpl(String name) {
         super(name);
-- 
GitLab