Skip to content
Snippets Groups Projects
Commit 5d9d9fa7 authored by Kim Kern's avatar Kim Kern
Browse files

add sum and count as aggregation functions, allow for multiple aggregations at the same time

parent 6194941b
Branches
No related tags found
No related merge requests found
......@@ -26,11 +26,8 @@ public class ActionAggregationInit extends ActionAggregation {
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
AggregationContext initContext;
if (useValueExtractor) {
initContext = getContext().initContext(wallet);
} else {
initContext = getContext();
}
initContext = getContext().initContext(wallet, useValueExtractor);
wallet.setAggregationContext(initContext);
continueAggregation(wallet);
}
......
......@@ -77,7 +77,7 @@ public class ActionAggregationRequest extends ActionAggregation {
}
private void initContext(AbstractWallet wallet) {
AggregationContext aggregationContext = getContext().initContext(wallet);
AggregationContext aggregationContext = getContext().initContext(wallet, true);
wallet.setAggregationContext(aggregationContext);
}
......
......@@ -4,6 +4,7 @@
package fucoin.actions.aggregation;
import java.io.Serializable;
import java.util.Arrays;
import java.util.UUID;
import java.util.function.Function;
......@@ -16,37 +17,41 @@ import fucoin.wallet.AbstractWallet;
public class AggregationContext implements Serializable {
private static final long serialVersionUID = -8314269748209085088L;
private final AggregationFunction function;
private final double value;
private final Function<AbstractWallet, Double> valueExtractor;
private final double[] values;
private final Function<AbstractWallet, double[]> valueExtractor;
private final Function<AbstractWallet, double[]> valueAltExtractor;
private final int currentExchanges;
private final int maxExchanges;
private final UUID uuid;
private Function<double[], Double> resultExtractor;
private AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
double value, int currentExchanges, int maxExchanges, UUID uuid) {
private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor,
double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor) {
this.function = function;
this.valueExtractor = valueExtractor;
this.value = value;
this.valueAltExtractor = valueAltExtractor;
this.values = values;
this.currentExchanges = currentExchanges;
this.maxExchanges = maxExchanges;
this.uuid = uuid;
this.resultExtractor = resultExtractor;
}
public AggregationContext aggregate(AggregationContext neighborContext) {
double aggregatedValue = getFunction().apply(neighborContext.getValue(), getValue());
AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(),
aggregatedValue)
double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues());
AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), getAltValueExtractor(),
aggregatedValues, resultExtractor)
.setCurrentExchanges(getCurrentExchanges() + 1)
.setMaxExchanges(getMaxExchanges())
.setUuid(getUuid())
.build();
System.out.println("Aggregation: " + getValue() + " + " + neighborContext.getValue() + " = " + aggregatedValue);
System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues);
return aggregatedContext;
}
public AggregationContext initContext(AbstractWallet wallet) {
double initValue = getValueExtractor().apply(wallet);
AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), initValue)
public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) {
double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getAltValueExtractor().apply(wallet);
AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), getAltValueExtractor(), initValues, resultExtractor)
.setMaxExchanges(getMaxExchanges())
.setUuid(getUuid())
.build();
......@@ -58,13 +63,17 @@ public class AggregationContext implements Serializable {
return isDone;
}
public double getValue() {
return value;
public double[] getValues() {
return values;
}
public double getResult() {
return resultExtractor.apply(values);
}
@Override
public String toString() {
return "The aggregated value is " + getValue() + " after " + getCurrentExchanges() + " aggregations.";
return "The aggregated values are " + Arrays.toString(getValues()) + " after " + getCurrentExchanges() + " aggregations.";
}
......@@ -72,10 +81,14 @@ public class AggregationContext implements Serializable {
return function;
}
private Function<AbstractWallet, Double> getValueExtractor() {
private Function<AbstractWallet, double[]> getValueExtractor() {
return valueExtractor;
}
private Function<AbstractWallet, double[]> getAltValueExtractor() {
return valueAltExtractor;
}
private int getCurrentExchanges() {
return currentExchanges;
}
......@@ -90,20 +103,24 @@ public class AggregationContext implements Serializable {
public static class AggregationContextBuilder {
private final AggregationFunction function;
private final double value;
private final Function<AbstractWallet, Double> valueExtractor;
private final double[] values;
private final Function<AbstractWallet, double[]> valueExtractor;
private final Function<AbstractWallet, double[]> valueAltExtractor;
private int currentExchanges;
private int maxExchanges;
private UUID uuid;
private Function<double[], Double> resultExtractor;
public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
double value) {
public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor,
double[] values, Function<double[], Double> resultExtractor) {
this.function = function;
this.valueExtractor = valueExtractor;
this.value = value;
this.valueAltExtractor = valueAltExtractor;
this.values = values;
this.currentExchanges = 0;
this.maxExchanges = 20;
this.uuid = UUID.randomUUID();
this.resultExtractor = resultExtractor;
}
public AggregationContextBuilder setCurrentExchanges(int currentExchanges) {
......@@ -122,7 +139,7 @@ public class AggregationContext implements Serializable {
}
public AggregationContext build() {
return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid);
return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor);
}
}
......
......@@ -11,9 +11,9 @@ import java.util.function.BiFunction;
* @author Kim
*
*/
public interface AggregationFunction extends BiFunction<Double, Double, Double> {
public interface AggregationFunction extends BiFunction<double[], double[], double[]> {
@Override
public Double apply(Double t, Double u);
public double[] apply(double[] t, double[] u);
}
......@@ -5,34 +5,46 @@ import java.util.function.Function;
import fucoin.wallet.AbstractWallet;
public enum AggregationMethod {
Average(x -> new Double(x.getAmount()), (x, y) -> (x + y) / 2), Maximum(x -> new Double(x.getAmount()),
(x, y) -> Math.max(x, y)), Minimum(x -> new Double(x.getAmount()),
(x, y) -> Math.min(x, y)), Count(x -> 0.0, (x, y) -> (x + y) / 2, false, 1);
private final Function<AbstractWallet, Double> valueExtractor;
Average(x -> new double[]{x.getAmount()}, (x, y) -> new double[]{(x[0] + y[0]) / 2}),
Maximum(x -> new double[]{x.getAmount()}, (x, y) -> new double[]{Math.max(x[0], y[0])}),
Minimum(x -> new double[]{x.getAmount()}, (x, y) -> new double[]{Math.min(x[0], y[0])}),
Count(x -> new double[]{0}, (x, y) -> new double[]{(x[0] + y[0]) / 2}, x -> (1 / x[0]), false, x -> new double[]{1}),
Sum(x -> new double[]{x.getAmount(), 0}, (x, y) -> new double[]{(x[0] + y[0]) / 2, (x[1] + y[1]) / 2}, x -> x[0]*(1 / x[1]), false, x -> new double[]{x.getAmount(), 1});
private final Function<AbstractWallet, double[]> valueExtractor;
private final Function<AbstractWallet, double[]> valueAltExtractor;
private final AggregationFunction function;
private final boolean useValueExtractor;
private double value;
private double[] values;
private final Function<double[], Double> resultExtractor;
AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function) {
AggregationMethod(Function<AbstractWallet, double[]> valueExtractor, AggregationFunction function) {
this.valueExtractor = valueExtractor;
this.function = function;
this.useValueExtractor = true;
this.value = 0;
this.values = new double[]{ 0 };
this.resultExtractor = x -> x[0];
this.valueAltExtractor = null;
}
AggregationMethod(Function<AbstractWallet, Double> valueExtractor, AggregationFunction function,
boolean useValueExtractor, double value) {
AggregationMethod(Function<AbstractWallet, double[]> valueExtractor, AggregationFunction function, Function<double[], Double> resultExtractor,
boolean useValueExtractor, Function<AbstractWallet, double[]> valueAltExtractor) {
this.valueExtractor = valueExtractor;
this.function = function;
this.useValueExtractor = useValueExtractor;
this.value = value;
this.values = new double[]{0};
this.resultExtractor = resultExtractor;
this.valueAltExtractor = valueAltExtractor;
}
public Function<AbstractWallet, Double> getValueExtractor() {
public Function<AbstractWallet, double[]> getValueExtractor() {
return valueExtractor;
}
public Function<AbstractWallet, double[]> getValueAltExtractor() {
return valueAltExtractor;
}
public AggregationFunction getFunction() {
return function;
}
......@@ -41,8 +53,11 @@ public enum AggregationMethod {
return useValueExtractor;
}
public double getValue() {
return value;
public double[] getValues() {
return values;
}
public Function<double[], Double> getResultExtractor() {
return resultExtractor;
}
}
......@@ -180,10 +180,12 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
public void startAggregation(AggregationMethod method, int exchanges) {
clearAggregationResults();
Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor();
Function<AbstractWallet, double[]> valueExtractor = method.getValueExtractor();
Function<AbstractWallet, double[]> valueAltExtractor = method.getValueAltExtractor();
AggregationFunction function = method.getFunction();
double value = method.getValue();
AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value)
double[] values = method.getValues();
Function<double[], Double> resultExtractor = method.getResultExtractor();
AggregationContext context = new AggregationContextBuilder(function, valueExtractor, valueAltExtractor, values, resultExtractor)
.setMaxExchanges(exchanges)
.build();
ActorRef randomNode = getRandomNeighbor();
......@@ -221,6 +223,6 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
System.out.println("add: " + context.toString());
getAggregationResults().put(wallet, context);
gui.updateAggregationValue(wallet.path()
.name(), context.getValue());
.name(), context.getResult());
}
}
......@@ -30,7 +30,7 @@ public class WalletImpl extends AbstractWallet {
private transient WalletGuiControl gui;
private String preKnownNeighbourName;
private boolean isActive;
private AggregationContext aggregationContext = new AggregationContextBuilder(null, null, amount).build();
private AggregationContext aggregationContext = new AggregationContextBuilder(null, null, null, null, null).build();
private boolean hasPendingAggregationRequest = false;
private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
private transient final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment