diff --git a/pom.xml b/pom.xml index 05d1ca3a064ed4aec8cf691471fb7f6185e84c04..ca8d113ddf2b061526fbfd2129f08a15246273c6 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,15 @@ <artifactId>gephi-toolkit</artifactId> <version>0.9.1</version> </dependency> - + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.19.1</version> + </dependency> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20160212</version> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index 723768db906a8a74e3ecb6ce8aa172bf7043d866..973e3dbfabe9d16a37e1abaacf50fc3dd3b48593 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -55,6 +55,7 @@ public abstract class ActionAggregation extends ClientAction { private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { ActorRef randomNeighbor = wallet.getRandomNeighbor(); + System.out.println(randomNeighbor); Thread sleepingRequest = new Thread() { @Override public void run() { diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java index 9a74e2bf3dddd672615d5f52dbc844d3aaac269a..63b7dde4dac1dd5ad92a235f56bbdd8e795cb153 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java @@ -25,6 +25,8 @@ public class ActionAggregationResult extends SuperVisorAction { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { superVisor.addAggregationResult(sender, getContext()); + // send statistics result packet... + superVisor.sendValuePacket(sender.path().name(), getContext().getResult(), getContext().getMaxExchanges(), getContext().getAggregation(), true); } public AggregationContext getContext() { diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationStatistics.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationStatistics.java new file mode 100644 index 0000000000000000000000000000000000000000..1d68d5b921339c48b00581c1c58e74eac2940a24 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationStatistics.java @@ -0,0 +1,29 @@ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; + +/** + * This class represents a statistic packet that is send each exchange to track the aggregation. + */ +public class ActionAggregationStatistics extends SuperVisorAction { + private String name; + private double value; + private int exchange; + private int aggregation; + + + public ActionAggregationStatistics(String name, double value, int exchange, int aggregation) { + this.value = value; + this.name = name; + this.exchange = exchange; + this.aggregation = aggregation; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { + superVisor.sendValuePacket(name, value, exchange, aggregation, false); + } +} diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index 43326be35442f9f66cd7e8b51290d0d8a07aaf5a..fa996f5d774662b9877005fc8ba1df774231f7d0 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -1,5 +1,5 @@ /** - * + * */ package fucoin.actions.aggregation; @@ -12,7 +12,6 @@ import fucoin.wallet.AbstractWallet; /** * @author Kim - * */ public class AggregationContext implements Serializable { private static final long serialVersionUID = -8314269748209085088L; @@ -24,9 +23,11 @@ public class AggregationContext implements Serializable { private final int maxExchanges; private final UUID uuid; private Function<double[], Double> resultExtractor; + // the value of the current aggregation in succession + private final int aggregation; private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor, - double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor) { + double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor, int aggregation) { this.function = function; this.valueExtractor = valueExtractor; this.valueAltExtractor = valueAltExtractor; @@ -35,30 +36,34 @@ public class AggregationContext implements Serializable { this.maxExchanges = maxExchanges; this.uuid = uuid; this.resultExtractor = resultExtractor; + this.aggregation = aggregation; } public AggregationContext aggregate(AggregationContext neighborContext) { double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues()); AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), aggregatedValues) + .setValueExtractor(getValueExtractor()) + .setValueAltExtractor(getAltValueExtractor()) + .setResultExtractor(resultExtractor) .setCurrentExchanges(getCurrentExchanges() + 1) - .setValueExtractor(getValueExtractor()) - .setValueAltExtractor(getValueAltExtractor()) .setMaxExchanges(getMaxExchanges()) .setUuid(getUuid()) - .setResultExtractor(resultExtractor) + .setAggregation(getAggregation()) .build(); System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues); return aggregatedContext; } public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) { - double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getValueAltExtractor().apply(wallet); + double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getAltValueExtractor().apply(wallet); AggregationContext initContext = new AggregationContextBuilder(getFunction(), initValues) + .setValueExtractor(getValueExtractor()) + .setValueAltExtractor(getAltValueExtractor()) + .setResultExtractor(resultExtractor) + .setCurrentExchanges(getCurrentExchanges()) .setMaxExchanges(getMaxExchanges()) - .setValueExtractor(getValueExtractor()) - .setValueAltExtractor(getValueAltExtractor()) + .setAggregation(getAggregation()) .setUuid(getUuid()) - .setResultExtractor(resultExtractor) .build(); return initContext; } @@ -90,18 +95,20 @@ public class AggregationContext implements Serializable { return valueExtractor; } - private Function<AbstractWallet, double[]> getValueAltExtractor() { + private Function<AbstractWallet, double[]> getAltValueExtractor() { return valueAltExtractor; } - private int getCurrentExchanges() { + public int getCurrentExchanges() { return currentExchanges; } - private int getMaxExchanges() { + public int getMaxExchanges() { return maxExchanges; } + public int getAggregation() { return this.aggregation; } + public UUID getUuid() { return uuid; } @@ -113,6 +120,7 @@ public class AggregationContext implements Serializable { private Function<AbstractWallet, double[]> valueAltExtractor; private int currentExchanges; private int maxExchanges; + private int aggregation; private UUID uuid; private Function<double[], Double> resultExtractor; @@ -121,45 +129,49 @@ public class AggregationContext implements Serializable { this.valueExtractor = wallet -> new double[] { wallet.getAmount() }; this.valueAltExtractor = this.valueExtractor; this.values = values; - this.currentExchanges = 0; this.maxExchanges = 20; this.uuid = UUID.randomUUID(); this.resultExtractor = valuesArray -> valuesArray[0]; } + + public AggregationContextBuilder setValueExtractor(Function<AbstractWallet, double[]> valueExtractor) { + this.valueExtractor = valueExtractor; + return this; + } - public AggregationContextBuilder setCurrentExchanges(int currentExchanges) { - this.currentExchanges = currentExchanges; + public AggregationContextBuilder setValueAltExtractor(Function<AbstractWallet, double[]> valueAltExtractor) { + this.valueAltExtractor = valueAltExtractor; return this; } - public AggregationContextBuilder setMaxExchanges(int maxExchanges) { - this.maxExchanges = maxExchanges; + public AggregationContextBuilder setResultExtractor(Function<double[], Double> resultExtractor) { + this.resultExtractor = resultExtractor; return this; } - public AggregationContextBuilder setUuid(UUID uuid) { - this.uuid = uuid; + public AggregationContextBuilder setAggregation(int aggregation) { + this.aggregation = aggregation; return this; } - public AggregationContextBuilder setResultExtractor(Function<double[], Double> resultExtractor) { - this.resultExtractor = resultExtractor; + public AggregationContextBuilder setCurrentExchanges(int currentExchanges) { + this.currentExchanges = currentExchanges; return this; } - public AggregationContextBuilder setValueExtractor(Function<AbstractWallet, double[]> valueExtractor) { - this.valueExtractor = valueExtractor; + public AggregationContextBuilder setMaxExchanges(int maxExchanges) { + this.maxExchanges = maxExchanges; return this; } - public AggregationContextBuilder setValueAltExtractor(Function<AbstractWallet, double[]> valueAltExtractor) { - this.valueAltExtractor = valueAltExtractor; + public AggregationContextBuilder setUuid(UUID uuid) { + this.uuid = uuid; return this; } public AggregationContext build() { - return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor); + return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor, aggregation); } } diff --git a/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java b/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java new file mode 100644 index 0000000000000000000000000000000000000000..f70a6d28ce71fb0baa5e7f951a6ca21be1ec0bcb --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/FileLogger.java @@ -0,0 +1,57 @@ +package fucoin.actions.aggregation.statistics; + +import org.json.JSONObject; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +/** + * Simple file logger for statistics + */ +public class FileLogger implements Statistics { + + private Path filePath; + + public FileLogger(Path filePath) { + this.filePath = filePath; + } + + @Override + public void initPacket(String type, double value, int maxIndex) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", type); + jsonObject.put("value", value); + jsonObject.put("max_index", maxIndex); + + save(PacketType.InitPacket, jsonObject); + } + + @Override + public void initPacket(String type, double value, int maxIndex, double secondValue) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", type); + jsonObject.put("value", value); + jsonObject.put("max_index", maxIndex); + jsonObject.put("second_value", secondValue); + + save(PacketType.InitPacket, jsonObject); + } + + @Override + public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + save(PacketType.ValuePacket, JsonHelper.createValuePacket(walletName, aggregation, exchange, value, result)); + + } + + private void save(Statistics.PacketType packet, JSONObject json) { + try (BufferedWriter writer = Files.newBufferedWriter(filePath, Charset.defaultCharset(), StandardOpenOption.APPEND)) { + writer.write(packet.toString() + " " + json.toString() + System.lineSeparator()); + } catch (IOException e) { + + } + } +} diff --git a/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java b/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..27cd1af10765ccfa446807af6edd46a23c0a4141 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/GraphicsConnector.java @@ -0,0 +1,42 @@ +package fucoin.actions.aggregation.statistics; + +import com.sun.jersey.api.client.*; +import org.json.JSONObject; + +import javax.ws.rs.core.MediaType; + +/** + * Connector for the graphService + */ +public class GraphicsConnector implements Statistics { + + private static final String BASE_URI = "http://localhost:8080/graphService/"; + private Client client; + + public GraphicsConnector() { + client = Client.create(); + } + + @Override + public void initPacket(String type, double value, int maxIndex) { + send(PacketType.InitPacket, JsonHelper.createInitPacket(type, value, maxIndex)); + } + + @Override + public void initPacket(String type, double value, int maxIndex, double secondValue) { + send(PacketType.InitPacket, JsonHelper.createInitPacket(type, value, maxIndex, secondValue)); + } + + @Override + public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + send(PacketType.ValuePacket, JsonHelper.createValuePacket(walletName, aggregation, exchange, value, result)); + + } + + private void send(PacketType packet, JSONObject json) { + AsyncWebResource webResource = client.asyncResource(BASE_URI); + webResource.path(packet.toString()).accept(MediaType.APPLICATION_JSON_TYPE). + type(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class, json.toString()); + } +} + diff --git a/src/main/java/fucoin/actions/aggregation/statistics/JsonHelper.java b/src/main/java/fucoin/actions/aggregation/statistics/JsonHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..db9c246633bf74d257f9c18cafc08f8ee2e4d1c0 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/JsonHelper.java @@ -0,0 +1,42 @@ +package fucoin.actions.aggregation.statistics; + +import org.json.JSONObject; + +/** + * Created by felix on 15.07.2016. + */ +public class JsonHelper { + private JsonHelper() { + } + + public static JSONObject createInitPacket(String type, double value, int maxIndex) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("type", type); + jsonObject.put("value", value); + jsonObject.put("max_index", maxIndex); + + return jsonObject; + } + public static JSONObject createInitPacket(String type, double value, int maxIndex, double secondValue) { + + JSONObject jsonObject = createInitPacket(type, value, maxIndex); + jsonObject.put("second_value" , secondValue); + + return jsonObject; + } + public static JSONObject createValuePacket(String walletName, int aggregation, int exchange, double value, boolean result) { + JSONObject jsonObject = new JSONObject(); + jsonObject.put("walletName", walletName); + jsonObject.put("aggregation", aggregation); + jsonObject.put("exchange", exchange); + // fix the Infinity bug + if(Double.isInfinite(value)) { + jsonObject.put("value", 0.0); + } else { + jsonObject.put("value", value); + } + jsonObject.put("result", result); + + return jsonObject; + } +} diff --git a/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java b/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java new file mode 100644 index 0000000000000000000000000000000000000000..612df0fb0a264bc6319a740909f880d7f42a1a7b --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/statistics/Statistics.java @@ -0,0 +1,40 @@ +package fucoin.actions.aggregation.statistics; + +/** + * Interface for statistics service + */ +public interface Statistics { + /** + * Sends an packet to initialize the graph. + * + * @param type the type of aggregation + * @param value the exact value of the aggregation + * @param maxIndex the number of exchanges that we want to display + */ + void initPacket(String type, double value, int maxIndex); + + /** + * Sends an packet to initialize the graph. + * + * @param type the type of aggregation + * @param value the exact value of the aggregation + * @param maxIndex the number of exchange that we want to display + * @param secondValue the second smallest/biggest value, needed for min/max aggregation + */ + void initPacket(String type, double value, int maxIndex, double secondValue); + + /** + * Sends an packet with the observed value. + * + * @param walletName the wallet that contains the value + * @param aggregation the number of aggregation we are currently on + * @param exchange the number of exchange the packet is on + * @param value the value of the wallet at that given exchange + * @param result true if this is the final result, false otherwise + */ + void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result); + + enum PacketType { + InitPacket, ValuePacket + } +} diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java new file mode 100644 index 0000000000000000000000000000000000000000..57bd49a19d0d808959b39b5fbb601ff585b09291 --- /dev/null +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -0,0 +1,33 @@ +package fucoin.configurations; + +import akka.actor.ActorRef; +import fucoin.configurations.internal.ConfigurationName; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; + +/** + * This configuration spawns 50 wallets to demonstrate the spawning of many headless wallets + */ +@ConfigurationName("Lots of Wallets") +public class MassWalletConfiguration extends AbstractConfiguration { + @Override + public void run() { + ActorRef supervisor = initSupervisor(); + try { + spawnWallets(40, false); + System.out.println("Wallet spawning done!"); + } catch (Exception e) { + System.out.println("Wallet spawning timed out!"); + } + + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2)); + + randomTransactions(10, 10); + } + + @Override + public void onReceive(Object message) { + super.onReceive(message); + } +} diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 46d8b9a2f130c7af9777a1366309d65c7541f321..48c9323b694ffd8eecd4f3ca723650d53dfd4cee 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -87,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { return logActive; } - public void startAggregation(AggregationMethod method, int exchanges) { - superVisor.startAggregation(method, exchanges); + public void startAggregation(AggregationMethod method, int exchanges, int aggregation, double exactValue, double secondValue) { + superVisor.startAggregation(method, exchanges, aggregation, exactValue, secondValue); } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index a722a3f49c305b2e0a2c745a20d4ca8143ea4e1b..2bc4a0ef450369cb79b8b4b7db0a86cd9ff6e49e 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -5,6 +5,11 @@ import java.awt.GridLayout; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import javax.swing.JButton; import javax.swing.JCheckBox; @@ -39,6 +44,10 @@ public class SuperVisorThreadGUI { private JSpinner aggregationExchangesSpinner; + private AggregationMethod currentMethod; + private int aggregation = 0; + + public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { this.superVisorGuiControl = superVisorGuiControl; @@ -59,15 +68,18 @@ public class SuperVisorThreadGUI { txtLog.setCellRenderer(new LogCellRenderer()); - aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); + aggregationMethodsBox = new JComboBox<>(AggregationMethod.values()); aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); - startAggregationButton = new JButton("Start Aggregation"); + + startAggregationButton = new JButton("Start Aggregation"); startAggregationButton.addActionListener(e -> { - AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); - int exchanges = (int) aggregationExchangesSpinner.getModel() - .getValue(); - superVisorGuiControl.startAggregation(method, exchanges); + AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); + int exchanges = (int) aggregationExchangesSpinner.getModel() + .getValue(); + + setAggregation(method); + startAggregation(method, exchanges); }); showDebug = new JCheckBox("Show debug messages in transaction log"); @@ -149,4 +161,59 @@ public class SuperVisorThreadGUI { txtLog.ensureIndexIsVisible(log.getSize() - 1); }); } + + private void setAggregation(AggregationMethod aggregationMethod) { + if (Objects.isNull(this.currentMethod)) { + this.currentMethod = aggregationMethod; + this.aggregation = 0; + } + if (this.currentMethod.equals(aggregationMethod)) { + this.aggregation++; + } else { + this.aggregation = 0; + } + this.currentMethod = aggregationMethod; + } + + private List<Integer> valueList() { + List<Integer> values = new ArrayList<>(); + for (int i = 0; i < superVisorGuiControl.getAmountTableModel().getRowCount(); i++) { + values.add((Integer) superVisorGuiControl.getAmountTableModel().getValueAt(i, 2)); + } + Collections.sort(values); + return values; + } + + private void startAggregation(AggregationMethod method, int exchanges) { + List<Integer> values = valueList(); + double exactValue = 0; + double secondValue = Integer.MIN_VALUE-1; + + // since values is a sorted list minimum and second smallest element are at the beginning + if (method.equals(AggregationMethod.Minimum)) { + exactValue = values.get(0); + secondValue = values.get(1); + } + + if (method.equals(AggregationMethod.Average)) { + exactValue = values.stream().collect(Collectors.averagingInt(Integer::intValue)); + } + + // since values is a sorted list maximum and second biggest element are at the end + if (method.equals(AggregationMethod.Maximum)) { + exactValue = values.get(values.size() - 1); + secondValue = values.get(values.size() - 2); + } + + if(method.equals(AggregationMethod.Count)) { + exactValue = values.size(); + } + + if(method.equals(AggregationMethod.Sum)) { + exactValue = values.stream() + .collect(Collectors.summingDouble(Integer::intValue)); + } + + superVisorGuiControl.startAggregation(method, exchanges, this.aggregation, exactValue, secondValue); + } } diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 0458fe01f370d70b92245df775f2f49d9541173c..072c4c773eeec6399f01ec90f7cd20ba8b44c8ab 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -18,6 +18,8 @@ import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.aggregation.AggregationFunction; import fucoin.actions.aggregation.AggregationMethod; +import fucoin.actions.aggregation.statistics.GraphicsConnector; +import fucoin.actions.aggregation.statistics.Statistics; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; @@ -40,6 +42,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>(); + // refactor... + private Statistics statistics = new GraphicsConnector(); + public SuperVisorImpl() { } @@ -178,7 +183,7 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.bankCommitObserver = bankCommitObserver; } - public void startAggregation(AggregationMethod method, int exchanges) { + public void startAggregation(AggregationMethod method, int exchanges, int aggregation, double exactValue, double secondValue) { clearAggregationResults(); Function<AbstractWallet, double[]> valueExtractor = method.getValueExtractor(); Function<AbstractWallet, double[]> valueAltExtractor = method.getValueAltExtractor(); @@ -190,10 +195,17 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { .setValueAltExtractor(valueAltExtractor) .setResultExtractor(resultExtractor) .setMaxExchanges(exchanges) + .setAggregation(aggregation) .build(); ActorRef randomNode = getRandomNeighbor(); ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); randomNode.tell(init, getSelf()); + + // mask value + if(secondValue < Integer.MIN_VALUE) { + this.sendInitPacket(method.toString(), exchanges, exactValue); + } + this.sendInitPacket(method.toString(), exchanges, exactValue, secondValue); } /** @@ -228,4 +240,16 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { gui.updateAggregationValue(wallet.path() .name(), context.getResult()); } + + public void sendValuePacket(String name, double value, int exchange, int aggregation, boolean result) { + this.statistics.valuePacket(name, aggregation, exchange, value, result); + } + + public void sendInitPacket(String type, int exchanges, double value) { + this.statistics.initPacket(type, value, exchanges); + } + + public void sendInitPacket(String type, int exchanges, double value, double secondValue) { + this.statistics.initPacket(type, value, exchanges, secondValue); + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index a60f73445ca41e2e9ba85fe3ac2af26317d1d1a0..0e92f83dbeb7b085cf162111382bdf3ff6c2c359 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -182,7 +182,8 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl overlayNeighbours.add(wallet); } - @Override + //TODO: ????!??? + /*@Override public ActorRef getRandomNeighbor() { List<ActorRef> neighbors = new ArrayList<>(overlayNeighbours); if (getExcludedNeighbors().size() < neighbors.size()) { @@ -191,5 +192,5 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl Random rand = new Random(); int index = rand.nextInt(neighbors.size()); return neighbors.get(index); - } + }*/ } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 9ee74ac3e8eb1088ec9da37c81a755a51b3d9d52..a32daafdefb3f8fabd45a7c33c25f330e0d84593 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -10,6 +10,7 @@ import com.google.common.collect.EvictingQueue; import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; +import fucoin.actions.aggregation.ActionAggregationStatistics; import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.join.ActionJoin; @@ -269,6 +270,7 @@ public class WalletImpl extends AbstractWallet { public void setAggregationContext(AggregationContext context) { System.out.println("Intermediate " + getName() + ": " + context); this.aggregationContext = context; + this.getRemoteSuperVisorActor().tell(new ActionAggregationStatistics(getName(), context.getResult(), context.getCurrentExchanges(), context.getAggregation()), this.getSelf()); } @Override