diff --git a/pom.xml b/pom.xml index 05d1ca3a064ed4aec8cf691471fb7f6185e84c04..ca8d113ddf2b061526fbfd2129f08a15246273c6 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,15 @@ <artifactId>gephi-toolkit</artifactId> <version>0.9.1</version> </dependency> - + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.19.1</version> + </dependency> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20160212</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index 723768db906a8a74e3ecb6ce8aa172bf7043d866..973e3dbfabe9d16a37e1abaacf50fc3dd3b48593 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -55,6 +55,7 @@ public abstract class ActionAggregation extends ClientAction { private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { ActorRef randomNeighbor = wallet.getRandomNeighbor(); + System.out.println(randomNeighbor); Thread sleepingRequest = new Thread() { @Override public void run() { diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java index 9a74e2bf3dddd672615d5f52dbc844d3aaac269a..63b7dde4dac1dd5ad92a235f56bbdd8e795cb153 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java @@ -25,6 +25,8 @@ public class ActionAggregationResult extends SuperVisorAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { superVisor.addAggregationResult(sender, getContext()); + // send statistics result packet... + superVisor.sendValuePacket(sender.path().name(), getContext().getResult(), getContext().getMaxExchanges(), getContext().getAggregation(), true); } public AggregationContext getContext() { diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index df216a15ec21d70c366cc2c3e620cb6db0035df3..0c6a0ff5991864ca457aa9cd641d1eebb42511e8 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -1,5 +1,5 @@ /** - * + * */ package fucoin.actions.aggregation; @@ -12,136 +12,166 @@ import fucoin.wallet.AbstractWallet; /** * @author Kim - * */ public class AggregationContext implements Serializable { - private static final long serialVersionUID = -8314269748209085088L; - private final AggregationFunction function; - private final double[] values; - private final Function<AbstractWallet, double[]> valueExtractor; - private final Function<AbstractWallet, double[]> valueAltExtractor; - private final int currentExchanges; - private final int maxExchanges; - private final UUID uuid; - private Function<double[], Double> resultExtractor; - - private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor, - double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor) { - this.function = function; - this.valueExtractor = valueExtractor; - this.valueAltExtractor = valueAltExtractor; - this.values = values; - this.currentExchanges = currentExchanges; - this.maxExchanges = maxExchanges; - this.uuid = uuid; - this.resultExtractor = resultExtractor; - } - - public AggregationContext aggregate(AggregationContext neighborContext) { - double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues()); - AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), getAltValueExtractor(), - aggregatedValues, resultExtractor) - .setCurrentExchanges(getCurrentExchanges() + 1) - .setMaxExchanges(getMaxExchanges()) - .setUuid(getUuid()) - .build(); - System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues); - return aggregatedContext; - } - - public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) { - double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getAltValueExtractor().apply(wallet); - AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), getAltValueExtractor(), initValues, resultExtractor) - .setMaxExchanges(getMaxExchanges()) - .setUuid(getUuid()) - .build(); - return initContext; - } - - public boolean isDone() { - boolean isDone = getCurrentExchanges() >= getMaxExchanges(); - return isDone; - } - - public double[] getValues() { - return values; - } - - public double getResult() { - return resultExtractor.apply(values); - } - - @Override - public String toString() { - return "The aggregated values are " + Arrays.toString(getValues()) + " after " + getCurrentExchanges() + " aggregations."; - - } - - private AggregationFunction getFunction() { - return function; - } - - private Function<AbstractWallet, double[]> getValueExtractor() { - return valueExtractor; - } - - private Function<AbstractWallet, double[]> getAltValueExtractor() { - return valueAltExtractor; - } - - private int getCurrentExchanges() { - return currentExchanges; - } - - private int getMaxExchanges() { - return maxExchanges; - } - - public UUID getUuid() { - return uuid; - } - - public static class AggregationContextBuilder { - private final AggregationFunction function; - private final double[] values; - private final Function<AbstractWallet, double[]> valueExtractor; - private final Function<AbstractWallet, double[]> valueAltExtractor; - private int currentExchanges; - private int maxExchanges; - private UUID uuid; - private Function<double[], Double> resultExtractor; - - public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor, - double[] values, Function<double[], Double> resultExtractor) { - this.function = function; - this.valueExtractor = valueExtractor; - this.valueAltExtractor = valueAltExtractor; - this.values = values; - this.currentExchanges = 0; - this.maxExchanges = 20; - this.uuid = UUID.randomUUID(); - this.resultExtractor = resultExtractor; - } - - public AggregationContextBuilder setCurrentExchanges(int currentExchanges) { - this.currentExchanges = currentExchanges; - return this; - } - - public AggregationContextBuilder setMaxExchanges(int maxExchanges) { - this.maxExchanges = maxExchanges; - return this; - } - - public AggregationContextBuilder setUuid(UUID uuid) { - this.uuid = uuid; - return this; - } - - public AggregationContext build() { - return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor); - } - - } - -} + private static final long serialVersionUID = -8314269748209085088L; + private final AggregationFunction function; + private final double[] values; + private final Function<AbstractWallet, double[]> valueExtractor; + private final Function<AbstractWallet, double[]> valueAltExtractor; + private final int currentExchanges; + private final int maxExchanges; + private final UUID uuid; + private Function<double[], Double> resultExtractor; + // the value of the current aggregation in succession + private final int aggregation; + + private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor, + double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor, int aggregation) { + this.function = function; + this.valueExtractor = valueExtractor; + this.valueAltExtractor = valueAltExtractor; + this.values = values; + this.currentExchanges = currentExchanges; + this.maxExchanges = maxExchanges; + this.uuid = uuid; + this.resultExtractor = resultExtractor; + this.aggregation = aggregation; + } + + public AggregationContext aggregate(AggregationContext neighborContext) { + double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues()); + AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(),aggregatedValues) + .setValueExtractor(getValueExtractor()) + .setValueAltExtractor(getAltValueExtractor()) + .setResultExtractor(resultExtractor) + .setCurrentExchanges(getCurrentExchanges() + 1) + .setMaxExchanges(getMaxExchanges()) + .setUuid(getUuid()) + .setAggregation(getAggregation()) + .build(); + System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues); + return aggregatedContext; + } + + public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) { + double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getAltValueExtractor().apply(wallet); + AggregationContext initContext = new AggregationContextBuilder(getFunction(), initValues) + .setValueExtractor(getValueExtractor()) + .setValueAltExtractor(getAltValueExtractor()) + .setResultExtractor(resultExtractor) + .setCurrentExchanges(getCurrentExchanges()) + .setMaxExchanges(getMaxExchanges()) + .setAggregation(getAggregation()) + .setUuid(getUuid()) + .build(); + return initContext; + } + + public boolean isDone() { + boolean isDone = getCurrentExchanges() >= getMaxExchanges(); + return isDone; + } + + public double[] getValues() { + return values; + } + + public double getResult() { + return resultExtractor.apply(values); + } + + @Override + public String toString() { + return "The aggregated values are " + Arrays.toString(getValues()) + " after " + getCurrentExchanges() + " aggregations."; + + } + + private AggregationFunction getFunction() { + return function; + } + + private Function<AbstractWallet, double[]> getValueExtractor() { + return valueExtractor; + } + + private Function<AbstractWallet, double[]> getAltValueExtractor() { + return valueAltExtractor; + } + + public int getCurrentExchanges() { + return currentExchanges; + } + + public int getMaxExchanges() { + return maxExchanges; + } + + public int getAggregation() { return this.aggregation; } + + public UUID getUuid() { + return uuid; + } + + public static class AggregationContextBuilder { + private final AggregationFunction function; + private final double[] values; + private Function<AbstractWallet, double[]> valueExtractor; + private Function<AbstractWallet, double[]> valueAltExtractor; + private int currentExchanges; + private int maxExchanges; + private int aggregation; + private UUID uuid; + private Function<double[], Double> resultExtractor; + + public AggregationContextBuilder(AggregationFunction function, double[] values) { + this.function = function; + this.values = values; + this.currentExchanges = 0; + this.maxExchanges = 20; + this.uuid = UUID.randomUUID(); + + } + + public AggregationContextBuilder setValueExtractor(Function<AbstractWallet, double[]> valueExtractor) { + this.valueExtractor = valueExtractor; + return this; + } + + public AggregationContextBuilder setValueAltExtractor(Function<AbstractWallet, double[]> valueAltExtractor) { + this.valueAltExtractor = valueAltExtractor; + return this; + } + + public AggregationContextBuilder setResultExtractor(Function<double[], Double> resultExtractor) { + this.resultExtractor = resultExtractor; + return this; + } + + public AggregationContextBuilder setAggregation(int aggregation) { + this.aggregation = aggregation; + return this; + } + + public AggregationContextBuilder setCurrentExchanges(int currentExchanges) { + this.currentExchanges = currentExchanges; + return this; + } + + public AggregationContextBuilder setMaxExchanges(int maxExchanges) { + this.maxExchanges = maxExchanges; + return this; + } + + public AggregationContextBuilder setUuid(UUID uuid) { + this.uuid = uuid; + return this; + } + + public AggregationContext build() { + return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor, aggregation); + } + + } + +} \ No newline at end of file diff --git a/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java b/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java new file mode 100644 index 0000000000000000000000000000000000000000..f70a6d28ce71fb0baa5e7f951a6ca21be1ec0bcb --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java @@ -0,0 +1,57 @@ +package fucoin.actions.aggregation.statistics; + +import org.json.JSONObject; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * Simple file logger for statistics + */ +public class FileLogger implements Statistics { + + private Path filePath; + + public FileLogger(Path filePath) { + this.filePath = filePath; + } + + @Override + public void initPacket(String type, double value, int maxIndex) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", type); + jsonObject.put("value", value); + jsonObject.put("max_index", maxIndex); + + save(PacketType.InitPacket, jsonObject); + } + + @Override + public void initPacket(String type, double value, int maxIndex, double secondValue) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", type); + jsonObject.put("value", value); + jsonObject.put("max_index", maxIndex); + jsonObject.put("second_value", secondValue); + + save(PacketType.InitPacket, jsonObject); + } + + @Override + public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + save(PacketType.ValuePacket, JsonHelper.createValuePacket(walletName, aggregation, exchange, value, result)); + + } + + private void save(Statistics.PacketType packet, JSONObject json) { + try (BufferedWriter writer = Files.newBufferedWriter(filePath, Charset.defaultCharset(), StandardOpenOption.APPEND)) { + writer.write(packet.toString() + " " + json.toString() + System.lineSeparator()); + } catch (IOException e) { + + } + } +} diff --git a/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java b/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..27cd1af10765ccfa446807af6edd46a23c0a4141 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java @@ -0,0 +1,42 @@ +package fucoin.actions.aggregation.statistics; + +import com.sun.jersey.api.client.*; +import org.json.JSONObject; + +import javax.ws.rs.core.MediaType; + +/** + * Connector for the graphService + */ +public class GraphicsConnector implements Statistics { + + private static final String BASE_URI = "http://localhost:8080/graphService/"; + private Client client; + + public GraphicsConnector() { + client = Client.create(); + } + + @Override + public void initPacket(String type, double value, int maxIndex) { + send(PacketType.InitPacket, JsonHelper.createInitPacket(type, value, maxIndex)); + } + + @Override + public void initPacket(String type, double value, int maxIndex, double secondValue) { + send(PacketType.InitPacket, JsonHelper.createInitPacket(type, value, maxIndex, secondValue)); + } + + @Override + public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + send(PacketType.ValuePacket, JsonHelper.createValuePacket(walletName, aggregation, exchange, value, result)); + + } + + private void send(PacketType packet, JSONObject json) { + AsyncWebResource webResource = client.asyncResource(BASE_URI); + webResource.path(packet.toString()).accept(MediaType.APPLICATION_JSON_TYPE). + type(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class, json.toString()); + } +} + diff --git a/src/main/java/fucoin/actions/aggregation/statistics/JsonHelper.java b/src/main/java/fucoin/actions/aggregation/statistics/JsonHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..db9c246633bf74d257f9c18cafc08f8ee2e4d1c0 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/JsonHelper.java @@ -0,0 +1,42 @@ +package fucoin.actions.aggregation.statistics; + +import org.json.JSONObject; + +/** + * Created by felix on 15.07.2016. + */ +public class JsonHelper { + private JsonHelper() { + } + + public static JSONObject createInitPacket(String type, double value, int maxIndex) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", type); + jsonObject.put("value", value); + jsonObject.put("max_index", maxIndex); + + return jsonObject; + } + public static JSONObject createInitPacket(String type, double value, int maxIndex, double secondValue) { + + JSONObject jsonObject = createInitPacket(type, value, maxIndex); + jsonObject.put("second_value" , secondValue); + + return jsonObject; + } + public static JSONObject createValuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("walletName", walletName); + jsonObject.put("aggregation", aggregation); + jsonObject.put("exchange", exchange); + // fix the Infinity bug + if(Double.isInfinite(value)) { + jsonObject.put("value", 0.0); + } else { + jsonObject.put("value", value); + } + jsonObject.put("result", result); + + return jsonObject; + } +} diff --git a/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java b/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java new file mode 100644 index 0000000000000000000000000000000000000000..612df0fb0a264bc6319a740909f880d7f42a1a7b --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java @@ -0,0 +1,40 @@ +package fucoin.actions.aggregation.statistics; + +/** + * Interface for statistics service + */ +public interface Statistics { + /** + * Sends an packet to initialize the graph. + * + * @param type the type of aggregation + * @param value the exact value of the aggregation + * @param maxIndex the number of exchanges that we want to display + */ + void initPacket(String type, double value, int maxIndex); + + /** + * Sends an packet to initialize the graph. + * + * @param type the type of aggregation + * @param value the exact value of the aggregation + * @param maxIndex the number of exchange that we want to display + * @param secondValue the second smallest/biggest value, needed for min/max aggregation + */ + void initPacket(String type, double value, int maxIndex, double secondValue); + + /** + * Sends an packet with the observed value. + * + * @param walletName the wallet that contains the value + * @param aggregation the number of aggregation we are currently on + * @param exchange the number of exchange the packet is on + * @param value the value of the wallet at that given exchange + * @param result true if this is the final result, false otherwise + */ + void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result); + + enum PacketType { + InitPacket, ValuePacket + } +} diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..57bd49a19d0d808959b39b5fbb601ff585b09291 --- /dev/null +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -0,0 +1,33 @@ +package fucoin.configurations; + +import akka.actor.ActorRef; +import fucoin.configurations.internal.ConfigurationName; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * This configuration spawns 50 wallets to demonstrate the spawning of many headless wallets + */ +@ConfigurationName("Lots of Wallets") +public class MassWalletConfiguration extends AbstractConfiguration { + @Override + public void run() { + ActorRef supervisor = initSupervisor(); + try { + spawnWallets(40, false); + System.out.println("Wallet spawning done!"); + } catch (Exception e) { + System.out.println("Wallet spawning timed out!"); + } + + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)); + + randomTransactions(10, 10); + } + + @Override + public void onReceive(Object message) { + super.onReceive(message); + } +} diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 46d8b9a2f130c7af9777a1366309d65c7541f321..48c9323b694ffd8eecd4f3ca723650d53dfd4cee 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -87,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { return logActive; } - public void startAggregation(AggregationMethod method, int exchanges) { - superVisor.startAggregation(method, exchanges); + public void startAggregation(AggregationMethod method, int exchanges, int aggregation, double exactValue, double secondValue) { + superVisor.startAggregation(method, exchanges, aggregation, exactValue, secondValue); } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index a722a3f49c305b2e0a2c745a20d4ca8143ea4e1b..fb8b7a7076170183506cc93d73e5f51c4d303fb0 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -5,6 +5,8 @@ import java.awt.GridLayout; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import java.util.*; +import java.util.stream.Collectors; import javax.swing.JButton; import javax.swing.JCheckBox; @@ -39,6 +41,10 @@ public class SuperVisorThreadGUI { private JSpinner aggregationExchangesSpinner; + private AggregationMethod currentMethod; + private int aggregation = 0; + + public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { this.superVisorGuiControl = superVisorGuiControl; @@ -59,15 +65,18 @@ public class SuperVisorThreadGUI { txtLog.setCellRenderer(new LogCellRenderer()); - aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); + aggregationMethodsBox = new JComboBox<>(AggregationMethod.values()); aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); - startAggregationButton = new JButton("Start Aggregation"); + + startAggregationButton = new JButton("Start Aggregation"); startAggregationButton.addActionListener(e -> { - AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); - int exchanges = (int) aggregationExchangesSpinner.getModel() - .getValue(); - superVisorGuiControl.startAggregation(method, exchanges); + AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); + int exchanges = (int) aggregationExchangesSpinner.getModel() + .getValue(); + + setAggregation(method); + startAggregation(method, exchanges); }); showDebug = new JCheckBox("Show debug messages in transaction log"); @@ -149,4 +158,58 @@ public class SuperVisorThreadGUI { txtLog.ensureIndexIsVisible(log.getSize() - 1); }); } + + private void setAggregation(AggregationMethod aggregationMethod) { + if (Objects.isNull(this.currentMethod)) { + this.currentMethod = aggregationMethod; + this.aggregation = 0; + } + if (this.currentMethod.equals(aggregationMethod)) { + this.aggregation++; + } else { + this.aggregation = 0; + } + this.currentMethod = aggregationMethod; + } + + private List<Integer> valueList() { + List<Integer> values = new ArrayList<>(); + for (int i = 0; i < superVisorGuiControl.getAmountTableModel().getRowCount(); i++) { + values.add((Integer) superVisorGuiControl.getAmountTableModel().getValueAt(i, 2)); + } + Collections.sort(values); + return values; + } + + private void startAggregation(AggregationMethod method, int exchanges) { + List<Integer> values = valueList(); + double exactValue = 0; + double secondValue = Integer.MIN_VALUE-1; + + // since values is a sorted list minimum and second smallest element are at the beginning + if (method.equals(AggregationMethod.Minimum)) { + exactValue = values.get(0); + secondValue = values.get(1); + } + + if (method.equals(AggregationMethod.Average)) { + exactValue = values.stream().collect(Collectors.averagingInt(Integer::intValue)); + } + + // since values is a sorted list maximum and second biggest element are at the end + if (method.equals(AggregationMethod.Maximum)) { + exactValue = values.get(values.size() - 1); + secondValue = values.get(values.size() - 2); + } + + if(method.equals(AggregationMethod.Count)) { + exactValue = values.size(); + } + + if(method.equals(AggregationMethod.Sum)) { + exactValue = values.stream().collect(Collectors.summingInt(Integer::intValue)); + } + + superVisorGuiControl.startAggregation(method, exchanges, this.aggregation, exactValue, secondValue); + } } diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 0458fe01f370d70b92245df775f2f49d9541173c..072c4c773eeec6399f01ec90f7cd20ba8b44c8ab 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -18,6 +18,8 @@ import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.aggregation.AggregationFunction; import fucoin.actions.aggregation.AggregationMethod; +import fucoin.actions.aggregation.statistics.GraphicsConnector; +import fucoin.actions.aggregation.statistics.Statistics; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; @@ -40,6 +42,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>(); + // refactor... + private Statistics statistics = new GraphicsConnector(); + public SuperVisorImpl() { } @@ -178,7 +183,7 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.bankCommitObserver = bankCommitObserver; } - public void startAggregation(AggregationMethod method, int exchanges) { + public void startAggregation(AggregationMethod method, int exchanges, int aggregation, double exactValue, double secondValue) { clearAggregationResults(); Function<AbstractWallet, double[]> valueExtractor = method.getValueExtractor(); Function<AbstractWallet, double[]> valueAltExtractor = method.getValueAltExtractor(); @@ -190,10 +195,17 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { .setValueAltExtractor(valueAltExtractor) .setResultExtractor(resultExtractor) .setMaxExchanges(exchanges) + .setAggregation(aggregation) .build(); ActorRef randomNode = getRandomNeighbor(); ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); randomNode.tell(init, getSelf()); + + // mask value + if(secondValue < Integer.MIN_VALUE) { + this.sendInitPacket(method.toString(), exchanges, exactValue); + } + this.sendInitPacket(method.toString(), exchanges, exactValue, secondValue); } /** @@ -228,4 +240,16 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { gui.updateAggregationValue(wallet.path() .name(), context.getResult()); } + + public void sendValuePacket(String name, double value, int exchange, int aggregation, boolean result) { + this.statistics.valuePacket(name, aggregation, exchange, value, result); + } + + public void sendInitPacket(String type, int exchanges, double value) { + this.statistics.initPacket(type, value, exchanges); + } + + public void sendInitPacket(String type, int exchanges, double value, double secondValue) { + this.statistics.initPacket(type, value, exchanges, secondValue); + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index a60f73445ca41e2e9ba85fe3ac2af26317d1d1a0..0e92f83dbeb7b085cf162111382bdf3ff6c2c359 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -182,7 +182,8 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl overlayNeighbours.add(wallet); } - @Override + //TODO: ????!??? + /*@Override public ActorRef getRandomNeighbor() { List<ActorRef> neighbors = new ArrayList<>(overlayNeighbours); if (getExcludedNeighbors().size() < neighbors.size()) { @@ -191,5 +192,5 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl Random rand = new Random(); int index = rand.nextInt(neighbors.size()); return neighbors.get(index); - } + }*/ } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 9ee74ac3e8eb1088ec9da37c81a755a51b3d9d52..a32daafdefb3f8fabd45a7c33c25f330e0d84593 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -10,6 +10,7 @@ import com.google.common.collect.EvictingQueue; import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; +import fucoin.actions.aggregation.ActionAggregationStatistics; import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.join.ActionJoin; @@ -269,6 +270,7 @@ public class WalletImpl extends AbstractWallet { public void setAggregationContext(AggregationContext context) { System.out.println("Intermediate " + getName() + ": " + context); this.aggregationContext = context; + this.getRemoteSuperVisorActor().tell(new ActionAggregationStatistics(getName(), context.getResult(), context.getCurrentExchanges(), context.getAggregation()), this.getSelf()); } @Override