Skip to content
Snippets Groups Projects
Commit 82f306ee authored by Kim Kern's avatar Kim Kern
Browse files

merge

parents 62e143ca 54d59033
No related branches found
No related tags found
No related merge requests found
Showing
with 400 additions and 39 deletions
...@@ -62,6 +62,15 @@ ...@@ -62,6 +62,15 @@
<artifactId>gephi-toolkit</artifactId> <artifactId>gephi-toolkit</artifactId>
<version>0.9.1</version> <version>0.9.1</version>
</dependency> </dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>1.19.1</version>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160212</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -55,6 +55,7 @@ public abstract class ActionAggregation extends ClientAction { ...@@ -55,6 +55,7 @@ public abstract class ActionAggregation extends ClientAction {
private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) {
ActorRef randomNeighbor = wallet.getRandomNeighbor(); ActorRef randomNeighbor = wallet.getRandomNeighbor();
System.out.println(randomNeighbor);
Thread sleepingRequest = new Thread() { Thread sleepingRequest = new Thread() {
@Override @Override
public void run() { public void run() {
......
...@@ -25,6 +25,8 @@ public class ActionAggregationResult extends SuperVisorAction { ...@@ -25,6 +25,8 @@ public class ActionAggregationResult extends SuperVisorAction {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) {
superVisor.addAggregationResult(sender, getContext()); superVisor.addAggregationResult(sender, getContext());
// send statistics result packet...
superVisor.sendValuePacket(sender.path().name(), getContext().getResult(), getContext().getMaxExchanges(), getContext().getAggregation(), true);
} }
public AggregationContext getContext() { public AggregationContext getContext() {
......
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.transaction.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl;
/**
* This class represents a statistic packet that is send each exchange to track the aggregation.
*/
public class ActionAggregationStatistics extends SuperVisorAction {
private String name;
private double value;
private int exchange;
private int aggregation;
public ActionAggregationStatistics(String name, double value, int exchange, int aggregation) {
this.value = value;
this.name = name;
this.exchange = exchange;
this.aggregation = aggregation;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) {
superVisor.sendValuePacket(name, value, exchange, aggregation, false);
}
}
...@@ -12,7 +12,6 @@ import fucoin.wallet.AbstractWallet; ...@@ -12,7 +12,6 @@ import fucoin.wallet.AbstractWallet;
/** /**
* @author Kim * @author Kim
*
*/ */
public class AggregationContext implements Serializable { public class AggregationContext implements Serializable {
private static final long serialVersionUID = -8314269748209085088L; private static final long serialVersionUID = -8314269748209085088L;
...@@ -24,9 +23,11 @@ public class AggregationContext implements Serializable { ...@@ -24,9 +23,11 @@ public class AggregationContext implements Serializable {
private final int maxExchanges; private final int maxExchanges;
private final UUID uuid; private final UUID uuid;
private Function<double[], Double> resultExtractor; private Function<double[], Double> resultExtractor;
// the value of the current aggregation in succession
private final int aggregation;
private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor, private AggregationContext(AggregationFunction function, Function<AbstractWallet, double[]> valueExtractor, Function<AbstractWallet, double[]> valueAltExtractor,
double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor) { double[] values, int currentExchanges, int maxExchanges, UUID uuid, Function<double[], Double> resultExtractor, int aggregation) {
this.function = function; this.function = function;
this.valueExtractor = valueExtractor; this.valueExtractor = valueExtractor;
this.valueAltExtractor = valueAltExtractor; this.valueAltExtractor = valueAltExtractor;
...@@ -35,30 +36,34 @@ public class AggregationContext implements Serializable { ...@@ -35,30 +36,34 @@ public class AggregationContext implements Serializable {
this.maxExchanges = maxExchanges; this.maxExchanges = maxExchanges;
this.uuid = uuid; this.uuid = uuid;
this.resultExtractor = resultExtractor; this.resultExtractor = resultExtractor;
this.aggregation = aggregation;
} }
public AggregationContext aggregate(AggregationContext neighborContext) { public AggregationContext aggregate(AggregationContext neighborContext) {
double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues()); double[] aggregatedValues = getFunction().apply(neighborContext.getValues(), getValues());
AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), aggregatedValues) AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), aggregatedValues)
.setCurrentExchanges(getCurrentExchanges() + 1)
.setValueExtractor(getValueExtractor()) .setValueExtractor(getValueExtractor())
.setValueAltExtractor(getValueAltExtractor()) .setValueAltExtractor(getAltValueExtractor())
.setResultExtractor(resultExtractor)
.setCurrentExchanges(getCurrentExchanges() + 1)
.setMaxExchanges(getMaxExchanges()) .setMaxExchanges(getMaxExchanges())
.setUuid(getUuid()) .setUuid(getUuid())
.setResultExtractor(resultExtractor) .setAggregation(getAggregation())
.build(); .build();
System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues); System.out.println("Aggregation: " + Arrays.toString(getValues()) + " + " + Arrays.toString(neighborContext.getValues()) + " = " + aggregatedValues);
return aggregatedContext; return aggregatedContext;
} }
public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) { public AggregationContext initContext(AbstractWallet wallet, boolean useValueExtractor) {
double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getValueAltExtractor().apply(wallet); double[] initValues = useValueExtractor ? getValueExtractor().apply(wallet) : getAltValueExtractor().apply(wallet);
AggregationContext initContext = new AggregationContextBuilder(getFunction(), initValues) AggregationContext initContext = new AggregationContextBuilder(getFunction(), initValues)
.setMaxExchanges(getMaxExchanges())
.setValueExtractor(getValueExtractor()) .setValueExtractor(getValueExtractor())
.setValueAltExtractor(getValueAltExtractor()) .setValueAltExtractor(getAltValueExtractor())
.setUuid(getUuid())
.setResultExtractor(resultExtractor) .setResultExtractor(resultExtractor)
.setCurrentExchanges(getCurrentExchanges())
.setMaxExchanges(getMaxExchanges())
.setAggregation(getAggregation())
.setUuid(getUuid())
.build(); .build();
return initContext; return initContext;
} }
...@@ -90,18 +95,20 @@ public class AggregationContext implements Serializable { ...@@ -90,18 +95,20 @@ public class AggregationContext implements Serializable {
return valueExtractor; return valueExtractor;
} }
private Function<AbstractWallet, double[]> getValueAltExtractor() { private Function<AbstractWallet, double[]> getAltValueExtractor() {
return valueAltExtractor; return valueAltExtractor;
} }
private int getCurrentExchanges() { public int getCurrentExchanges() {
return currentExchanges; return currentExchanges;
} }
private int getMaxExchanges() { public int getMaxExchanges() {
return maxExchanges; return maxExchanges;
} }
public int getAggregation() { return this.aggregation; }
public UUID getUuid() { public UUID getUuid() {
return uuid; return uuid;
} }
...@@ -113,6 +120,7 @@ public class AggregationContext implements Serializable { ...@@ -113,6 +120,7 @@ public class AggregationContext implements Serializable {
private Function<AbstractWallet, double[]> valueAltExtractor; private Function<AbstractWallet, double[]> valueAltExtractor;
private int currentExchanges; private int currentExchanges;
private int maxExchanges; private int maxExchanges;
private int aggregation;
private UUID uuid; private UUID uuid;
private Function<double[], Double> resultExtractor; private Function<double[], Double> resultExtractor;
...@@ -121,45 +129,49 @@ public class AggregationContext implements Serializable { ...@@ -121,45 +129,49 @@ public class AggregationContext implements Serializable {
this.valueExtractor = wallet -> new double[] { wallet.getAmount() }; this.valueExtractor = wallet -> new double[] { wallet.getAmount() };
this.valueAltExtractor = this.valueExtractor; this.valueAltExtractor = this.valueExtractor;
this.values = values; this.values = values;
this.currentExchanges = 0; this.currentExchanges = 0;
this.maxExchanges = 20; this.maxExchanges = 20;
this.uuid = UUID.randomUUID(); this.uuid = UUID.randomUUID();
this.resultExtractor = valuesArray -> valuesArray[0]; this.resultExtractor = valuesArray -> valuesArray[0];
} }
public AggregationContextBuilder setCurrentExchanges(int currentExchanges) { public AggregationContextBuilder setValueExtractor(Function<AbstractWallet, double[]> valueExtractor) {
this.currentExchanges = currentExchanges; this.valueExtractor = valueExtractor;
return this; return this;
} }
public AggregationContextBuilder setMaxExchanges(int maxExchanges) { public AggregationContextBuilder setValueAltExtractor(Function<AbstractWallet, double[]> valueAltExtractor) {
this.maxExchanges = maxExchanges; this.valueAltExtractor = valueAltExtractor;
return this; return this;
} }
public AggregationContextBuilder setUuid(UUID uuid) { public AggregationContextBuilder setResultExtractor(Function<double[], Double> resultExtractor) {
this.uuid = uuid; this.resultExtractor = resultExtractor;
return this; return this;
} }
public AggregationContextBuilder setResultExtractor(Function<double[], Double> resultExtractor) { public AggregationContextBuilder setAggregation(int aggregation) {
this.resultExtractor = resultExtractor; this.aggregation = aggregation;
return this; return this;
} }
public AggregationContextBuilder setValueExtractor(Function<AbstractWallet, double[]> valueExtractor) { public AggregationContextBuilder setCurrentExchanges(int currentExchanges) {
this.valueExtractor = valueExtractor; this.currentExchanges = currentExchanges;
return this; return this;
} }
public AggregationContextBuilder setValueAltExtractor(Function<AbstractWallet, double[]> valueAltExtractor) { public AggregationContextBuilder setMaxExchanges(int maxExchanges) {
this.valueAltExtractor = valueAltExtractor; this.maxExchanges = maxExchanges;
return this;
}
public AggregationContextBuilder setUuid(UUID uuid) {
this.uuid = uuid;
return this; return this;
} }
public AggregationContext build() { public AggregationContext build() {
return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor); return new AggregationContext(function, valueExtractor, valueAltExtractor, values, currentExchanges, maxExchanges, uuid, resultExtractor, aggregation);
} }
} }
......
package fucoin.actions.aggregation.statistics;
import org.json.JSONObject;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
/**
* Simple file logger for statistics
*/
public class FileLogger implements Statistics {
private Path filePath;
public FileLogger(Path filePath) {
this.filePath = filePath;
}
@Override
public void initPacket(String type, double value, int maxIndex) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", type);
jsonObject.put("value", value);
jsonObject.put("max_index", maxIndex);
save(PacketType.InitPacket, jsonObject);
}
@Override
public void initPacket(String type, double value, int maxIndex, double secondValue) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", type);
jsonObject.put("value", value);
jsonObject.put("max_index", maxIndex);
jsonObject.put("second_value", secondValue);
save(PacketType.InitPacket, jsonObject);
}
@Override
public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) {
save(PacketType.ValuePacket, JsonHelper.createValuePacket(walletName, aggregation, exchange, value, result));
}
private void save(Statistics.PacketType packet, JSONObject json) {
try (BufferedWriter writer = Files.newBufferedWriter(filePath, Charset.defaultCharset(), StandardOpenOption.APPEND)) {
writer.write(packet.toString() + " " + json.toString() + System.lineSeparator());
} catch (IOException e) {
}
}
}
package fucoin.actions.aggregation.statistics;
import com.sun.jersey.api.client.*;
import org.json.JSONObject;
import javax.ws.rs.core.MediaType;
/**
* Connector for the graphService
*/
public class GraphicsConnector implements Statistics {
private static final String BASE_URI = "http://localhost:8080/graphService/";
private Client client;
public GraphicsConnector() {
client = Client.create();
}
@Override
public void initPacket(String type, double value, int maxIndex) {
send(PacketType.InitPacket, JsonHelper.createInitPacket(type, value, maxIndex));
}
@Override
public void initPacket(String type, double value, int maxIndex, double secondValue) {
send(PacketType.InitPacket, JsonHelper.createInitPacket(type, value, maxIndex, secondValue));
}
@Override
public void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result) {
send(PacketType.ValuePacket, JsonHelper.createValuePacket(walletName, aggregation, exchange, value, result));
}
private void send(PacketType packet, JSONObject json) {
AsyncWebResource webResource = client.asyncResource(BASE_URI);
webResource.path(packet.toString()).accept(MediaType.APPLICATION_JSON_TYPE).
type(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class, json.toString());
}
}
package fucoin.actions.aggregation.statistics;
import org.json.JSONObject;
/**
* Created by felix on 15.07.2016.
*/
public class JsonHelper {
private JsonHelper() {
}
public static JSONObject createInitPacket(String type, double value, int maxIndex) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", type);
jsonObject.put("value", value);
jsonObject.put("max_index", maxIndex);
return jsonObject;
}
public static JSONObject createInitPacket(String type, double value, int maxIndex, double secondValue) {
JSONObject jsonObject = createInitPacket(type, value, maxIndex);
jsonObject.put("second_value" , secondValue);
return jsonObject;
}
public static JSONObject createValuePacket(String walletName, int aggregation, int exchange, double value, boolean result) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("walletName", walletName);
jsonObject.put("aggregation", aggregation);
jsonObject.put("exchange", exchange);
// fix the Infinity bug
if(Double.isInfinite(value)) {
jsonObject.put("value", 0.0);
} else {
jsonObject.put("value", value);
}
jsonObject.put("result", result);
return jsonObject;
}
}
package fucoin.actions.aggregation.statistics;
/**
* Interface for statistics service
*/
public interface Statistics {
/**
* Sends an packet to initialize the graph.
*
* @param type the type of aggregation
* @param value the exact value of the aggregation
* @param maxIndex the number of exchanges that we want to display
*/
void initPacket(String type, double value, int maxIndex);
/**
* Sends an packet to initialize the graph.
*
* @param type the type of aggregation
* @param value the exact value of the aggregation
* @param maxIndex the number of exchange that we want to display
* @param secondValue the second smallest/biggest value, needed for min/max aggregation
*/
void initPacket(String type, double value, int maxIndex, double secondValue);
/**
* Sends an packet with the observed value.
*
* @param walletName the wallet that contains the value
* @param aggregation the number of aggregation we are currently on
* @param exchange the number of exchange the packet is on
* @param value the value of the wallet at that given exchange
* @param result true if this is the final result, false otherwise
*/
void valuePacket(String walletName, int aggregation, int exchange, double value, boolean result);
enum PacketType {
InitPacket, ValuePacket
}
}
package fucoin.configurations;
import akka.actor.ActorRef;
import fucoin.configurations.internal.ConfigurationName;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
/**
* This configuration spawns 50 wallets to demonstrate the spawning of many headless wallets
*/
@ConfigurationName("Lots of Wallets")
public class MassWalletConfiguration extends AbstractConfiguration {
@Override
public void run() {
ActorRef supervisor = initSupervisor();
try {
spawnWallets(40, false);
System.out.println("Wallet spawning done!");
} catch (Exception e) {
System.out.println("Wallet spawning timed out!");
}
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
randomTransactions(10, 10);
}
@Override
public void onReceive(Object message) {
super.onReceive(message);
}
}
...@@ -87,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -87,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
return logActive; return logActive;
} }
public void startAggregation(AggregationMethod method, int exchanges) { public void startAggregation(AggregationMethod method, int exchanges, int aggregation, double exactValue, double secondValue) {
superVisor.startAggregation(method, exchanges); superVisor.startAggregation(method, exchanges, aggregation, exactValue, secondValue);
} }
} }
...@@ -5,6 +5,11 @@ import java.awt.GridLayout; ...@@ -5,6 +5,11 @@ import java.awt.GridLayout;
import java.awt.event.ItemEvent; import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter; import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent; import java.awt.event.WindowEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.swing.JButton; import javax.swing.JButton;
import javax.swing.JCheckBox; import javax.swing.JCheckBox;
...@@ -39,6 +44,10 @@ public class SuperVisorThreadGUI { ...@@ -39,6 +44,10 @@ public class SuperVisorThreadGUI {
private JSpinner aggregationExchangesSpinner; private JSpinner aggregationExchangesSpinner;
private AggregationMethod currentMethod;
private int aggregation = 0;
public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) {
this.superVisorGuiControl = superVisorGuiControl; this.superVisorGuiControl = superVisorGuiControl;
...@@ -59,15 +68,18 @@ public class SuperVisorThreadGUI { ...@@ -59,15 +68,18 @@ public class SuperVisorThreadGUI {
txtLog.setCellRenderer(new LogCellRenderer()); txtLog.setCellRenderer(new LogCellRenderer());
aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); aggregationMethodsBox = new JComboBox<>(AggregationMethod.values());
aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1));
startAggregationButton = new JButton("Start Aggregation"); startAggregationButton = new JButton("Start Aggregation");
startAggregationButton.addActionListener(e -> { startAggregationButton.addActionListener(e -> {
AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem();
int exchanges = (int) aggregationExchangesSpinner.getModel() int exchanges = (int) aggregationExchangesSpinner.getModel()
.getValue(); .getValue();
superVisorGuiControl.startAggregation(method, exchanges);
setAggregation(method);
startAggregation(method, exchanges);
}); });
showDebug = new JCheckBox("Show debug messages in transaction log"); showDebug = new JCheckBox("Show debug messages in transaction log");
...@@ -149,4 +161,59 @@ public class SuperVisorThreadGUI { ...@@ -149,4 +161,59 @@ public class SuperVisorThreadGUI {
txtLog.ensureIndexIsVisible(log.getSize() - 1); txtLog.ensureIndexIsVisible(log.getSize() - 1);
}); });
} }
private void setAggregation(AggregationMethod aggregationMethod) {
if (Objects.isNull(this.currentMethod)) {
this.currentMethod = aggregationMethod;
this.aggregation = 0;
}
if (this.currentMethod.equals(aggregationMethod)) {
this.aggregation++;
} else {
this.aggregation = 0;
}
this.currentMethod = aggregationMethod;
}
private List<Integer> valueList() {
List<Integer> values = new ArrayList<>();
for (int i = 0; i < superVisorGuiControl.getAmountTableModel().getRowCount(); i++) {
values.add((Integer) superVisorGuiControl.getAmountTableModel().getValueAt(i, 2));
}
Collections.sort(values);
return values;
}
private void startAggregation(AggregationMethod method, int exchanges) {
List<Integer> values = valueList();
double exactValue = 0;
double secondValue = Integer.MIN_VALUE-1;
// since values is a sorted list minimum and second smallest element are at the beginning
if (method.equals(AggregationMethod.Minimum)) {
exactValue = values.get(0);
secondValue = values.get(1);
}
if (method.equals(AggregationMethod.Average)) {
exactValue = values.stream().collect(Collectors.averagingInt(Integer::intValue));
}
// since values is a sorted list maximum and second biggest element are at the end
if (method.equals(AggregationMethod.Maximum)) {
exactValue = values.get(values.size() - 1);
secondValue = values.get(values.size() - 2);
}
if(method.equals(AggregationMethod.Count)) {
exactValue = values.size();
}
if(method.equals(AggregationMethod.Sum)) {
exactValue = values.stream()
.collect(Collectors.summingDouble(Integer::intValue));
}
superVisorGuiControl.startAggregation(method, exchanges, this.aggregation, exactValue, secondValue);
}
} }
...@@ -18,6 +18,8 @@ import fucoin.actions.aggregation.AggregationContext; ...@@ -18,6 +18,8 @@ import fucoin.actions.aggregation.AggregationContext;
import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder;
import fucoin.actions.aggregation.AggregationFunction; import fucoin.actions.aggregation.AggregationFunction;
import fucoin.actions.aggregation.AggregationMethod; import fucoin.actions.aggregation.AggregationMethod;
import fucoin.actions.aggregation.statistics.GraphicsConnector;
import fucoin.actions.aggregation.statistics.Statistics;
import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
...@@ -40,6 +42,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -40,6 +42,9 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>(); private final Map<ActorRef, AggregationContext> aggregationResults = new HashMap<>();
// refactor...
private Statistics statistics = new GraphicsConnector();
public SuperVisorImpl() { public SuperVisorImpl() {
} }
...@@ -178,7 +183,7 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -178,7 +183,7 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
this.bankCommitObserver = bankCommitObserver; this.bankCommitObserver = bankCommitObserver;
} }
public void startAggregation(AggregationMethod method, int exchanges) { public void startAggregation(AggregationMethod method, int exchanges, int aggregation, double exactValue, double secondValue) {
clearAggregationResults(); clearAggregationResults();
Function<AbstractWallet, double[]> valueExtractor = method.getValueExtractor(); Function<AbstractWallet, double[]> valueExtractor = method.getValueExtractor();
Function<AbstractWallet, double[]> valueAltExtractor = method.getValueAltExtractor(); Function<AbstractWallet, double[]> valueAltExtractor = method.getValueAltExtractor();
...@@ -190,10 +195,17 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -190,10 +195,17 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
.setValueAltExtractor(valueAltExtractor) .setValueAltExtractor(valueAltExtractor)
.setResultExtractor(resultExtractor) .setResultExtractor(resultExtractor)
.setMaxExchanges(exchanges) .setMaxExchanges(exchanges)
.setAggregation(aggregation)
.build(); .build();
ActorRef randomNode = getRandomNeighbor(); ActorRef randomNode = getRandomNeighbor();
ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor()); ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor());
randomNode.tell(init, getSelf()); randomNode.tell(init, getSelf());
// mask value
if(secondValue < Integer.MIN_VALUE) {
this.sendInitPacket(method.toString(), exchanges, exactValue);
}
this.sendInitPacket(method.toString(), exchanges, exactValue, secondValue);
} }
/** /**
...@@ -228,4 +240,16 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -228,4 +240,16 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
gui.updateAggregationValue(wallet.path() gui.updateAggregationValue(wallet.path()
.name(), context.getResult()); .name(), context.getResult());
} }
public void sendValuePacket(String name, double value, int exchange, int aggregation, boolean result) {
this.statistics.valuePacket(name, aggregation, exchange, value, result);
}
public void sendInitPacket(String type, int exchanges, double value) {
this.statistics.initPacket(type, value, exchanges);
}
public void sendInitPacket(String type, int exchanges, double value, double secondValue) {
this.statistics.initPacket(type, value, exchanges, secondValue);
}
} }
...@@ -182,7 +182,8 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -182,7 +182,8 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
overlayNeighbours.add(wallet); overlayNeighbours.add(wallet);
} }
@Override //TODO: ????!???
/*@Override
public ActorRef getRandomNeighbor() { public ActorRef getRandomNeighbor() {
List<ActorRef> neighbors = new ArrayList<>(overlayNeighbours); List<ActorRef> neighbors = new ArrayList<>(overlayNeighbours);
if (getExcludedNeighbors().size() < neighbors.size()) { if (getExcludedNeighbors().size() < neighbors.size()) {
...@@ -191,5 +192,5 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -191,5 +192,5 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
Random rand = new Random(); Random rand = new Random();
int index = rand.nextInt(neighbors.size()); int index = rand.nextInt(neighbors.size());
return neighbors.get(index); return neighbors.get(index);
} }*/
} }
...@@ -10,6 +10,7 @@ import com.google.common.collect.EvictingQueue; ...@@ -10,6 +10,7 @@ import com.google.common.collect.EvictingQueue;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.actions.aggregation.ActionAggregationStatistics;
import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.aggregation.AggregationContext;
import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder;
import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoin;
...@@ -269,6 +270,7 @@ public class WalletImpl extends AbstractWallet { ...@@ -269,6 +270,7 @@ public class WalletImpl extends AbstractWallet {
public void setAggregationContext(AggregationContext context) { public void setAggregationContext(AggregationContext context) {
System.out.println("Intermediate " + getName() + ": " + context); System.out.println("Intermediate " + getName() + ": " + context);
this.aggregationContext = context; this.aggregationContext = context;
this.getRemoteSuperVisorActor().tell(new ActionAggregationStatistics(getName(), context.getResult(), context.getCurrentExchanges(), context.getAggregation()), this.getSelf());
} }
@Override @Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment