From 5d9d9fa71888decc0624b29c9925c8606f5ed569 Mon Sep 17 00:00:00 2001
From: Kim Kern <kim.kern@fu-berlin.de>
Date: Thu, 14 Jul 2016 00:05:41 +0200
Subject: [PATCH] add sum and count as aggregation functions, allow for
 multiple aggregations at the same time

---
 .../aggregation/ActionAggregationInit.java    |  7 +--
 .../aggregation/ActionAggregationRequest.java |  2 +-
 .../aggregation/AggregationContext.java       | 61 ++++++++++++-------
 .../aggregation/AggregationFunction.java      |  4 +-
 .../aggregation/AggregationMethod.java        | 45 +++++++++-----
 .../fucoin/supervisor/SuperVisorImpl.java     | 10 +--
 src/main/java/fucoin/wallet/WalletImpl.java   |  2 +-
 7 files changed, 81 insertions(+), 50 deletions(-)

diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java
index 42b7736..aa985d4 100644
--- a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java
@@ -26,11 +26,8 @@ public class ActionAggregationInit extends ActionAggregation {
 	protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
 		AggregationContext initContext;
 
-		if (useValueExtractor) {
-			initContext = getContext().initContext(wallet);
-		} else {
-			initContext = getContext();
-		}
+		initContext = getContext().initContext(wallet, useValueExtractor);
+		
 		wallet.setAggregationContext(initContext);
 		continueAggregation(wallet);
 	}
diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java
index 5fde093..f52ee0d 100644
--- a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java
+++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java
@@ -77,7 +77,7 @@ public class ActionAggregationRequest extends ActionAggregation {
 	}
 
 	private void initContext(AbstractWallet wallet) {
-		AggregationContext aggregationContext = getContext().initContext(wallet);
+		AggregationContext aggregationContext = getContext().initContext(wallet, true);
 		wallet.setAggregationContext(aggregationContext);
 	}
 
diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java
index 67e848b..df216a1 100644
--- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java
+++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java
@@ -4,6 +4,7 @@
 package fucoin.actions.aggregation;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.function.Function;
 
@@ -16,37 +17,41 @@ import fucoin.wallet.AbstractWallet;
 public class AggregationContext implements Serializable {
 	private static final long serialVersionUID = -8314269748209085088L;
 	private final AggregationFunction function;
-	private final double value;
-	private final Function<AbstractWallet, Double> valueExtractor;
+	private final double[] values;
+	private final Function<AbstractWallet, double[]> valueExtractor;
+	private final Function<AbstractWallet, double[]> valueAltExtractor;
 	private final int currentExchanges;
 	private final int maxExchanges;
 	private final UUID uuid;
+	private Function<double[], Double> resultExtractor;
 
-	private AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
-			double value, int currentExchanges, int maxExchanges, UUID uuid) {
+	private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor,
+			double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor) {
 		this.function = function;
 		this.valueExtractor = valueExtractor;
-		this.value = value;
+		this.valueAltExtractor = valueAltExtractor;
+		this.values = values;
 		this.currentExchanges = currentExchanges;
 		this.maxExchanges = maxExchanges;
 		this.uuid = uuid;
+		this.resultExtractor = resultExtractor;
 	}
 
 	public AggregationContext aggregate(AggregationContext neighborContext) {
-		double aggregatedValue = getFunction().apply(neighborContext.getValue(), getValue());
-		AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(),
-				aggregatedValue)
+		double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues());
+		AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), getAltValueExtractor(),
+				aggregatedValues, resultExtractor)
 						.setCurrentExchanges(getCurrentExchanges() + 1)
 						.setMaxExchanges(getMaxExchanges())
 						.setUuid(getUuid())
 						.build();
-		System.out.println("Aggregation: " + getValue() + " + " + neighborContext.getValue() + " = " + aggregatedValue);
+		System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues);
 		return aggregatedContext;
 	}
 
-	public AggregationContext initContext(AbstractWallet wallet) {
-		double initValue = getValueExtractor().apply(wallet);
-		AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), initValue)
+	public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) {
+		double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getAltValueExtractor().apply(wallet);
+		AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), getAltValueExtractor(), initValues, resultExtractor)
 				.setMaxExchanges(getMaxExchanges())
 				.setUuid(getUuid())
 				.build();
@@ -58,13 +63,17 @@ public class AggregationContext implements Serializable {
 		return isDone;
 	}
 
-	public double getValue() {
-		return value;
+	public double[] getValues() {
+		return values;
+	}
+	
+	public double getResult() {
+		return resultExtractor.apply(values);
 	}
 
 	@Override
 	public String toString() {
-		return "The aggregated value is " + getValue() + " after " + getCurrentExchanges() + " aggregations.";
+		return "The aggregated values are " + Arrays.toString(getValues()) + " after " + getCurrentExchanges() + " aggregations.";
 
 	}
 
@@ -72,9 +81,13 @@ public class AggregationContext implements Serializable {
 		return function;
 	}
 
-	private Function<AbstractWallet, Double> getValueExtractor() {
+	private Function<AbstractWallet, double[]> getValueExtractor() {
 		return valueExtractor;
 	}
+	
+	private Function<AbstractWallet, double[]> getAltValueExtractor() {
+		return valueAltExtractor;
+	}
 
 	private int getCurrentExchanges() {
 		return currentExchanges;
@@ -90,20 +103,24 @@ public class AggregationContext implements Serializable {
 
 	public static class AggregationContextBuilder {
 		private final AggregationFunction function;
-		private final double value;
-		private final Function<AbstractWallet, Double> valueExtractor;
+		private final double[] values;
+		private final Function<AbstractWallet, double[]> valueExtractor;
+		private final Function<AbstractWallet, double[]> valueAltExtractor;
 		private int currentExchanges;
 		private int maxExchanges;
 		private UUID uuid;
+		private Function<double[], Double> resultExtractor;
 
-		public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
-				double value) {
+		public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor,
+				double[] values, Function<double[], Double> resultExtractor) {
 			this.function = function;
 			this.valueExtractor = valueExtractor;
-			this.value = value;
+			this.valueAltExtractor = valueAltExtractor;
+			this.values = values;
 			this.currentExchanges = 0;
 			this.maxExchanges = 20;
 			this.uuid = UUID.randomUUID();
+			this.resultExtractor = resultExtractor;
 		}
 
 		public AggregationContextBuilder setCurrentExchanges(int currentExchanges) {
@@ -122,7 +139,7 @@ public class AggregationContext implements Serializable {
 		}
 
 		public AggregationContext build() {
-			return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid);
+			return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor);
 		}
 
 	}
diff --git a/src/main/java/fucoin/actions/aggregation/AggregationFunction.java b/src/main/java/fucoin/actions/aggregation/AggregationFunction.java
index 2fda288..86134cd 100644
--- a/src/main/java/fucoin/actions/aggregation/AggregationFunction.java
+++ b/src/main/java/fucoin/actions/aggregation/AggregationFunction.java
@@ -11,9 +11,9 @@ import java.util.function.BiFunction;
  * @author Kim
  *
  */
-public interface AggregationFunction extends BiFunction<Double, Double, Double> {
+public interface AggregationFunction extends BiFunction<double[], double[], double[]> {
 
 	@Override
-	public Double apply(Double t, Double u);
+	public double[] apply(double[] t, double[] u);
 
 }
diff --git a/src/main/java/fucoin/actions/aggregation/AggregationMethod.java b/src/main/java/fucoin/actions/aggregation/AggregationMethod.java
index 85fffe1..460498e 100644
--- a/src/main/java/fucoin/actions/aggregation/AggregationMethod.java
+++ b/src/main/java/fucoin/actions/aggregation/AggregationMethod.java
@@ -5,33 +5,45 @@ import java.util.function.Function;
 import fucoin.wallet.AbstractWallet;
 
 public enum AggregationMethod {
-	Average(x -> new Double(x.getAmount()), (x, y) -> (x + y) / 2), Maximum(x -> new Double(x.getAmount()),
-			(x, y) -> Math.max(x, y)), Minimum(x -> new Double(x.getAmount()),
-					(x, y) -> Math.min(x, y)), Count(x -> 0.0, (x, y) -> (x + y) / 2, false, 1);
-
-	private final Function<AbstractWallet, Double> valueExtractor;
+	Average(x -> new double[]{x.getAmount()}, (x, y) -> new double[]{(x[0] + y[0]) / 2}),
+	Maximum(x -> new double[]{x.getAmount()}, (x, y) -> new double[]{Math.max(x[0], y[0])}),
+	Minimum(x -> new double[]{x.getAmount()}, (x, y) -> new double[]{Math.min(x[0], y[0])}),
+	Count(x -> new double[]{0}, (x, y) -> new double[]{(x[0] + y[0]) / 2}, x -> (1 / x[0]), false, x -> new double[]{1}),
+	Sum(x -> new double[]{x.getAmount(), 0}, (x, y) -> new double[]{(x[0] + y[0]) / 2, (x[1] + y[1]) / 2}, x -> x[0]*(1 / x[1]), false, x -> new double[]{x.getAmount(), 1});
+
+	private final Function<AbstractWallet, double[]> valueExtractor;
+	private final Function<AbstractWallet, double[]> valueAltExtractor;
 	private final AggregationFunction function;
 	private final boolean useValueExtractor;
-	private double value;
-
-	AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function) {
+	private double[] values;
+	private final Function<double[], Double> resultExtractor;
+	
+	AggregationMethod(Function<AbstractWallet, double[]> valueExtractor, AggregationFunction function) {
 		this.valueExtractor = valueExtractor;
 		this.function = function;
 		this.useValueExtractor = true;
-		this.value = 0;
+		this.values = new double[]{ 0 };
+		this.resultExtractor = x -> x[0];
+		this.valueAltExtractor = null;
 	}
 
-	AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function,
-			boolean useValueExtractor, double value) {
+	AggregationMethod(Function<AbstractWallet, double[]> valueExtractor, AggregationFunction function, Function<double[], Double> resultExtractor,
+			boolean useValueExtractor, Function<AbstractWallet, double[]> valueAltExtractor) {
 		this.valueExtractor = valueExtractor;
 		this.function = function;
 		this.useValueExtractor = useValueExtractor;
-		this.value = value;
+		this.values = new double[]{0};
+		this.resultExtractor = resultExtractor;
+		this.valueAltExtractor = valueAltExtractor;
 	}
 
-	public Function<AbstractWallet, Double> getValueExtractor() {
+	public Function<AbstractWallet, double[]> getValueExtractor() {
 		return valueExtractor;
 	}
+	
+	public Function<AbstractWallet, double[]> getValueAltExtractor() {
+		return valueAltExtractor;
+	}
 
 	public AggregationFunction getFunction() {
 		return function;
@@ -41,8 +53,11 @@ public enum AggregationMethod {
 		return useValueExtractor;
 	}
 
-	public double getValue() {
-		return value;
+	public double[] getValues() {
+		return values;
 	}
 
+	public Function<double[], Double> getResultExtractor() {
+		return resultExtractor;
+	}
 }
diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java
index 86ffa7a..409f99b 100644
--- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java
+++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java
@@ -180,10 +180,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
 
 	public void startAggregation(AggregationMethod method, int exchanges) {
 		clearAggregationResults();
-		Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor();
+		Function<AbstractWallet, double[]> valueExtractor = method.getValueExtractor();
+		Function<AbstractWallet, double[]> valueAltExtractor = method.getValueAltExtractor();
 		AggregationFunction function = method.getFunction();
-		double value = method.getValue();
-		AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value)
+		double[] values = method.getValues();
+		Function<double[], Double> resultExtractor = method.getResultExtractor();
+		AggregationContext context = new AggregationContextBuilder(function, valueExtractor, valueAltExtractor, values, resultExtractor)
 				.setMaxExchanges(exchanges)
 				.build();
 		ActorRef randomNode = getRandomNeighbor();
@@ -221,6 +223,6 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
 		System.out.println("add: " + context.toString());
 		getAggregationResults().put(wallet, context);
 		gui.updateAggregationValue(wallet.path()
-				.name(), context.getValue());
+				.name(), context.getResult());
 	}
 }
diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java
index 9afc7d3..fb8fb8c 100644
--- a/src/main/java/fucoin/wallet/WalletImpl.java
+++ b/src/main/java/fucoin/wallet/WalletImpl.java
@@ -30,7 +30,7 @@ public class WalletImpl extends AbstractWallet {
     private transient WalletGuiControl gui;
     private String preKnownNeighbourName;
     private boolean isActive;
-	private AggregationContext aggregationContext = new AggregationContextBuilder(null, null, amount).build();
+	private AggregationContext aggregationContext = new AggregationContextBuilder(null, null, null, null, null).build();
 	private boolean hasPendingAggregationRequest = false;
     private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
 	private transient final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
-- 
GitLab