diff --git a/pom.xml b/pom.xml index 1a37142e64dcde75bec6b23cb852f4fcb0089deb..579ee80d66916520480b6286094c15e0340257b0 100644 --- a/pom.xml +++ b/pom.xml @@ -56,5 +56,11 @@ <artifactId>reflections</artifactId> <version>0.9.10</version> </dependency> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.19.1</version> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java index f0b831bc024ffe071d7b60000f3b5c48b3a60f0c..49e4478508b5ed7313d5ac019736c43401ce7d81 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java @@ -32,14 +32,10 @@ public class ActionAggregationCancel extends ActionAggregation { if (isDone()) { wallet.addExcludedNeighbor(sender); } - AggregationContext aggregationContext = wallet.getAggregationContext(); - if (!wallet.areAllNeighborsExcluded() && !aggregationContext - .isDone()) { + if (!wallet.areAllNeighborsExcluded()) { continueAggregation(wallet); - } else if (aggregationContext.isDone() && !wallet.areAllNeighborsExcluded()) { - sendEndMessage(wallet); } else if (wallet.hasAggregationContext()) { - sendAggregatedResult(wallet, aggregationContext); + sendAggregatedResult(wallet, wallet.getAggregationContext()); sendEndMessage(wallet); } } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java index 9a74e2bf3dddd672615d5f52dbc844d3aaac269a..a52fa1b4bfde3d0c71837c4b213167a99c2f01e7 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java @@ -25,6 +25,7 @@ public class ActionAggregationResult extends SuperVisorAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { superVisor.addAggregationResult(sender, getContext()); + superVisor.sendValuePacket(sender.path().name(), getContext().getValue(), getContext().getAggregation(), true); } public AggregationContext getContext() { diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationStatistics.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationStatistics.java new file mode 100644 index 0000000000000000000000000000000000000000..90d07c8d2a0478329a0353192e2e07385026d0a6 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationStatistics.java @@ -0,0 +1,30 @@ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; + +/** + * Created by felix on 11.07.2016. + */ +public class ActionAggregationStatistics extends SuperVisorAction { + + private String name; + private double value; + private int exchange; + private int aggregation; + + + public ActionAggregationStatistics(String name, double value, int exchange, int aggregation) { + this.value = value; + this.name = name; + this.exchange = exchange; + this.aggregation = aggregation; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl abstractNode) { + abstractNode.sendValuePacket(name, value, exchange, aggregation, false); + } +} diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index 67e848b2c5a267636a9c187b036c85d2a05c6cb0..ae4059cf61a835083027ba13d0452aa78e0da3a5 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -20,16 +20,18 @@ public class AggregationContext implements Serializable { private final Function<AbstractWallet, Double> valueExtractor; private final int currentExchanges; private final int maxExchanges; + private final int aggregation; private final UUID uuid; private AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, - double value, int currentExchanges, int maxExchanges, UUID uuid) { + double value, int currentExchanges, int maxExchanges, UUID uuid, int aggregation) { this.function = function; this.valueExtractor = valueExtractor; this.value = value; this.currentExchanges = currentExchanges; this.maxExchanges = maxExchanges; this.uuid = uuid; + this.aggregation = aggregation; } public AggregationContext aggregate(AggregationContext neighborContext) { @@ -39,6 +41,7 @@ public class AggregationContext implements Serializable { .setCurrentExchanges(getCurrentExchanges() + 1) .setMaxExchanges(getMaxExchanges()) .setUuid(getUuid()) + .setAggregation(getAggregation()) .build(); System.out.println("Aggregation: " + getValue() + " + " + neighborContext.getValue() + " = " + aggregatedValue); return aggregatedContext; @@ -49,6 +52,7 @@ public class AggregationContext implements Serializable { AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), initValue) .setMaxExchanges(getMaxExchanges()) .setUuid(getUuid()) + .setAggregation(getAggregation()) .build(); return initContext; } @@ -76,7 +80,7 @@ public class AggregationContext implements Serializable { return valueExtractor; } - private int getCurrentExchanges() { + public int getCurrentExchanges() { return currentExchanges; } @@ -84,6 +88,8 @@ public class AggregationContext implements Serializable { return maxExchanges; } + public int getAggregation() { return aggregation; } + public UUID getUuid() { return uuid; } @@ -94,6 +100,7 @@ public class AggregationContext implements Serializable { private final Function<AbstractWallet, Double> valueExtractor; private int currentExchanges; private int maxExchanges; + private int aggregation; private UUID uuid; public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, @@ -121,8 +128,13 @@ public class AggregationContext implements Serializable { return this; } + public AggregationContextBuilder setAggregation(int aggregation) { + this.aggregation = aggregation; + return this; + } + public AggregationContext build() { - return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid); + return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid, aggregation); } } diff --git a/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java b/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java new file mode 100644 index 0000000000000000000000000000000000000000..9ffac73f15393d7b311fbb30f5116674aad015e1 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java @@ -0,0 +1,59 @@ +package fucoin.actions.aggregation.statistics; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * Created by felix on 11.07.2016. + */ +public class FileLogger implements Statistics { + + private Path filePath; + + + public FileLogger(Path filePath) { + this.filePath = filePath; + } + + @Override + public void initPacket(String type, double value, int maxIndex) { + String json = "{ \"type\": \"" + type + "\"," + + "\"value\": \"" + value + "\"," + + "\"max_index\": \"" + maxIndex + "\" }"; + + save("initPacket", json); + } + + @Override + public void initPacket(String type, double value, int maxIndex, double secondValue) { + String json = "{ \"type\": \"" + type + "\"," + + "\"value\": \"" + value + "\"," + + "\"max_index\": \"" + maxIndex + "\"," + + "\"second_value\": \"" + secondValue + "\" }"; + + save("initPacket", json); + } + + @Override + public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + String json = "{ \"walletName\": \"" + walletName + "\", " + + "\"aggregation\": \"" + aggregation + "\", " + + "\"exchange\": \"" + exchange + "\", " + + "\"value\": \"" + value + "\", " + + "\"result\": \"" + result + "\" }"; + + save("valuePacket", json); + } + + private void save(String packet, String json) { + try (BufferedWriter writer = Files.newBufferedWriter(filePath, Charset.defaultCharset(), StandardOpenOption.APPEND)) { + writer.write(json); + } catch (IOException e) { + + } + } +} diff --git a/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java b/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..8df6c276d54da73c026c94d1975abc9f57ca8bb7 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java @@ -0,0 +1,64 @@ +package fucoin.actions.aggregation.statistics; + +import com.sun.jersey.api.client.*; + +import javax.ws.rs.core.MediaType; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * Created by felix on 11.07.2016. + */ +public class GraphicsConnector implements Statistics { + + private static final String BASE_URI = "http://localhost:8080/graphService/"; + private Client client; + + public GraphicsConnector() { + client = Client.create(); + } + + @Override + public void initPacket(String type, double value, int maxIndex) { + + String json = "{ \"type\": \"" + type + "\"," + + "\"value\": \"" + value + "\"," + + "\"max_index\": \"" + maxIndex + "\" }"; + + send("initPacket", json); + + } + + @Override + public void initPacket(String type, double value, int maxIndex, double secondValue) { + String json = "{ \"type\": \"" + type + "\"," + + "\"value\": \"" + value + "\"," + + "\"max_index\": \"" + maxIndex + "\"," + + "\"second_value\": \"" + secondValue + "\" }"; + + + send("initPacket", json); + } + + @Override + public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + String json = "{ \"walletName\": \"" + walletName + "\", " + + "\"aggregation\": \"" + aggregation + "\", " + + "\"exchange\": \"" + exchange + "\", " + + "\"value\": \"" + value + "\", " + + "\"result\": \"" + result + "\" }"; + + send("valuePacket", json); + + } + + private void send(String packet, String json) { + AsyncWebResource webResource = client.asyncResource(BASE_URI); + webResource.path(packet).accept(MediaType.APPLICATION_JSON_TYPE). + type(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class, json); + } +} + diff --git a/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java b/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java new file mode 100644 index 0000000000000000000000000000000000000000..4419aed232204bf73faaa7367fb66c66e095151a --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java @@ -0,0 +1,10 @@ +package fucoin.actions.aggregation.statistics; + +/** + * Created by felix on 11.07.2016. + */ +public interface Statistics { + void initPacket(String type, double value, int maxIndex); + void initPacket(String type, double value, int maxIndex, double secondValue); + void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result); +} diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java index 5eef7a15d0ae333d16f0fa582469d69869e139e4..4c2985993d089eec2906ffc70db781fdbbcbadcc 100644 --- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -3,6 +3,9 @@ package fucoin.configurations; import akka.actor.ActorRef; import fucoin.configurations.internal.ConfigurationName; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + /** * This configuration spawns 200 wallets to demonstrate the spawning of many headless wallets */ @@ -12,12 +15,14 @@ public class MassWalletConfiguration extends AbstractConfiguration { public void run() { ActorRef supervisor = initSupervisor(); try { - spawnWallets(10, false); + spawnWallets(33, false); System.out.println("Wallet spawning done!"); } catch (Exception e) { System.out.println("Wallet spawning timed out!"); } + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)); + randomTransactions(10, 10); } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 46d8b9a2f130c7af9777a1366309d65c7541f321..9a0423ef2a8ddc82a261ba1dce1ae055749ffe4b 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -87,7 +87,10 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { return logActive; } - public void startAggregation(AggregationMethod method, int exchanges) { - superVisor.startAggregation(method, exchanges); + public void startAggregation(AggregationMethod method, int exchanges, double value, double secondValue, int aggregation) { + superVisor.startAggregation(method, exchanges, value, secondValue, aggregation); } + public void startAggregation(AggregationMethod method, int exchanges, double value, int aggregation) { + superVisor.startAggregation(method, exchanges, value, aggregation); + } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index a722a3f49c305b2e0a2c745a20d4ca8143ea4e1b..5cfde0cd8c3f14096f7c3f2dd4f189f3522130a8 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -5,6 +5,11 @@ import java.awt.GridLayout; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import java.awt.geom.Arc2D; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import javax.swing.JButton; import javax.swing.JCheckBox; @@ -33,11 +38,14 @@ public class SuperVisorThreadGUI { private JCheckBox activateLogging; private SuperVisorGuiControlImpl superVisorGuiControl; - private JButton startAggregationButton; + private JButton startAggregationButton; - private JComboBox<AggregationMethod> aggregationMethodsBox; + private JComboBox<AggregationMethod> aggregationMethodsBox; - private JSpinner aggregationExchangesSpinner; + private JSpinner aggregationExchangesSpinner; + private AggregationMethod currentMethod; + private AggregationMethod lastMethod; + private int aggregation = 0; public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { @@ -58,16 +66,65 @@ public class SuperVisorThreadGUI { JPanel logPanel = new JPanel(new BorderLayout()); txtLog.setCellRenderer(new LogCellRenderer()); - - aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); - aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); - startAggregationButton = new JButton("Start Aggregation"); + aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); + aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); + + startAggregationButton = new JButton("Start Aggregation"); startAggregationButton.addActionListener(e -> { - AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); - int exchanges = (int) aggregationExchangesSpinner.getModel() - .getValue(); - superVisorGuiControl.startAggregation(method, exchanges); + AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); + + currentMethod = method; + + if (lastMethod == null) { + lastMethod = currentMethod; + aggregation = 0; + } + if (currentMethod == lastMethod) { + aggregation++; + } else { + aggregation = 0; + } + lastMethod = currentMethod; + + int exchanges = (int) aggregationExchangesSpinner.getModel() + .getValue(); + + double value = 0; + double smallest = 0; + double second = Integer.MIN_VALUE; + + List<Integer> nums = new ArrayList<>(); + for (int i = 0; i < superVisorGuiControl.getAmountTableModel().getRowCount(); i++) { + nums.add((Integer) superVisorGuiControl.getAmountTableModel().getValueAt(i, 2)); + } + Collections.sort(nums); + + if (method == AggregationMethod.Minimum) { + smallest = nums.get(0); + second = nums.get(1); + } + + if (method == AggregationMethod.Average) { + for (int i = 0; i < superVisorGuiControl.getAmountTableModel().getRowCount(); i++) { + value += (Integer) superVisorGuiControl.getAmountTableModel().getValueAt(i, 2); + } + value = value / superVisorGuiControl.getAmountTableModel().getRowCount(); + } + + if (method == AggregationMethod.Maximum) { + smallest = nums.get(nums.size() - 1); + second = nums.get(nums.size() - 2); + } + + + if (second != Integer.MIN_VALUE) { + superVisorGuiControl.startAggregation(method, exchanges, smallest, second, aggregation); + } else { + superVisorGuiControl.startAggregation(method, exchanges, value, aggregation); + } + + }); showDebug = new JCheckBox("Show debug messages in transaction log"); @@ -91,18 +148,18 @@ public class SuperVisorThreadGUI { } }); - JPanel configPanel = new JPanel(new BorderLayout()); - JPanel logConfigPanel = new JPanel(); - JPanel aggregationPanel = new JPanel(); + JPanel configPanel = new JPanel(new BorderLayout()); + JPanel logConfigPanel = new JPanel(); + JPanel aggregationPanel = new JPanel(); - configPanel.add(logConfigPanel, BorderLayout.NORTH); - configPanel.add(aggregationPanel, BorderLayout.SOUTH); + configPanel.add(logConfigPanel, BorderLayout.NORTH); + configPanel.add(aggregationPanel, BorderLayout.SOUTH); - logConfigPanel.add(activateLogging); - logConfigPanel.add(showDebug); - aggregationPanel.add(aggregationExchangesSpinner); - aggregationPanel.add(aggregationMethodsBox); - aggregationPanel.add(startAggregationButton); + logConfigPanel.add(activateLogging); + logConfigPanel.add(showDebug); + aggregationPanel.add(aggregationExchangesSpinner); + aggregationPanel.add(aggregationMethodsBox); + aggregationPanel.add(startAggregationButton); //logPanel.add(activateLogging, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH); diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 86ffa7a03e9c3ffe2f61d89f9a84dc903f1fcc1b..168bdbee1c159a3deba238fd70f3181e05352319 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -1,5 +1,6 @@ package fucoin.supervisor; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -18,6 +19,9 @@ import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.aggregation.AggregationFunction; import fucoin.actions.aggregation.AggregationMethod; +import fucoin.actions.aggregation.statistics.FileLogger; +import fucoin.actions.aggregation.statistics.GraphicsConnector; +import fucoin.actions.aggregation.statistics.Statistics; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; @@ -38,10 +42,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { private int pendingBankCommits = 0; private ActorRef bankCommitObserver = null; - private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>(); + private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>(); - public SuperVisorImpl() { + private int maxExchanges = -1; + + private Statistics statistics = new GraphicsConnector(); + public SuperVisorImpl() { } public void setGuiControl(SuperVisorGuiControl gui) { @@ -178,49 +185,87 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.bankCommitObserver = bankCommitObserver; } - public void startAggregation(AggregationMethod method, int exchanges) { - clearAggregationResults(); - Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor(); - AggregationFunction function = method.getFunction(); - double value = method.getValue(); - AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value) - .setMaxExchanges(exchanges) - .build(); - ActorRef randomNode = getRandomNeighbor(); - ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); - randomNode.tell(init, getSelf()); - } - - /** - * Returns the results of an aggregation operation that have been sent back - * to the supervisor. - * - * @return aggregation results - */ - public Map<ActorRef, AggregationContext> getAggregationResults() { - return aggregationResults; - } - - /** - * Clears previous aggregation results, such that a new aggregation can be - * started. - */ - public void clearAggregationResults() { - getAggregationResults().clear(); - } - - /** - * Adds a new aggregation result, consisting of the ActorRef of the wallet - * and the corresponding AggregationContext. - * - * @param wallet - * @param context - * AggregationContext, that holds the aggregated value - */ - public void addAggregationResult(ActorRef wallet, AggregationContext context) { - System.out.println("add: " + context.toString()); - getAggregationResults().put(wallet, context); - gui.updateAggregationValue(wallet.path() - .name(), context.getValue()); - } + public void startAggregation(AggregationMethod method, int exchanges, double _value, double secondValue, int aggregation) { + clearAggregationResults(); + Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor(); + AggregationFunction function = method.getFunction(); + double value = method.getValue(); + AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value) + .setMaxExchanges(exchanges) + .setAggregation(aggregation) + .build(); + ActorRef randomNode = getRandomNeighbor(); + ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); + randomNode.tell(init, getSelf()); + + this.maxExchanges = exchanges; + this.sendInitPacket(method.toString(), exchanges, _value, secondValue); + } + + + public void startAggregation(AggregationMethod method, int exchanges, double _value, int aggregation) { + clearAggregationResults(); + Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor(); + AggregationFunction function = method.getFunction(); + double value = method.getValue(); + AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value) + .setMaxExchanges(exchanges) + .setAggregation(aggregation) + .build(); + ActorRef randomNode = getRandomNeighbor(); + ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); + randomNode.tell(init, getSelf()); + + this.maxExchanges = exchanges; + this.sendInitPacket(method.toString(), exchanges, _value); + } + + /** + * Returns the results of an aggregation operation that have been sent back + * to the supervisor. + * + * @return aggregation results + */ + public Map<ActorRef, AggregationContext> getAggregationResults() { + return aggregationResults; + } + + /** + * Clears previous aggregation results, such that a new aggregation can be + * started. + */ + public void clearAggregationResults() { + getAggregationResults().clear(); + } + + /** + * Adds a new aggregation result, consisting of the ActorRef of the wallet + * and the corresponding AggregationContext. + * + * @param wallet + * @param context AggregationContext, that holds the aggregated value + */ + public void addAggregationResult(ActorRef wallet, AggregationContext context) { + System.out.println("add: " + context.toString()); + getAggregationResults().put(wallet, context); + gui.updateAggregationValue(wallet.path() + .name(), context.getValue()); + } + + + public void sendValuePacket(String name, double value, int exchange, int aggregation, boolean result) { + this.statistics.valuePacket(name, aggregation, exchange, value, result); + } + + public void sendValuePacket(String name, double value, int aggregation, boolean result) { + this.statistics.valuePacket(name, aggregation, maxExchanges, value, result); + } + + public void sendInitPacket(String type, int exchanges, double value) { + this.statistics.initPacket(type, value, exchanges); + } + + public void sendInitPacket(String type, int exchanges, double value, double secondValue) { + this.statistics.initPacket(type, value, exchanges, secondValue); + } } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 9afc7d3533cb1fe9fb715f2fb9759c04a775cbf5..a5f7f7836dafb251f31436da6253735a11f1d9e3 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -10,8 +10,8 @@ import com.google.common.collect.EvictingQueue; import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; +import fucoin.actions.aggregation.ActionAggregationStatistics; import fucoin.actions.aggregation.AggregationContext; -import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ActionTellSupervisor; @@ -30,7 +30,7 @@ public class WalletImpl extends AbstractWallet { private transient WalletGuiControl gui; private String preKnownNeighbourName; private boolean isActive; - private AggregationContext aggregationContext = new AggregationContextBuilder(null, null, amount).build(); + private AggregationContext aggregationContext; private boolean hasPendingAggregationRequest = false; private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>(); private transient final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10); @@ -268,6 +268,7 @@ public class WalletImpl extends AbstractWallet { public void setAggregationContext(AggregationContext context) { System.out.println("Intermediate " + getName() + ": " + context); this.aggregationContext = context; + this.getRemoteSuperVisorActor().tell(new ActionAggregationStatistics(getName(), context.getValue(), context.getCurrentExchanges(), context.getAggregation()), this.getSelf()); } @Override