diff --git a/src/main/java/fucoin/AbstractNode.java b/src/main/java/fucoin/AbstractNode.java index bdf4df051dd4d91428f4bdfbb78aa73311ee7cd2..1ab730ee48e17f04a0aa09e65b64d0c781cd45f1 100644 --- a/src/main/java/fucoin/AbstractNode.java +++ b/src/main/java/fucoin/AbstractNode.java @@ -17,6 +17,9 @@ import fucoin.wallet.AbstractWallet; public abstract class AbstractNode extends UntypedActor implements Serializable { private final Collection<ActorRef> excludedNeighbors = new HashSet<>(); + private boolean firstGetNextNeighbor = true; + private List<ActorRef> toVisitNeighbors = new ArrayList<>(); + private int indexNeighbor = 0; /** * Returns the akka-style address as String, @@ -90,6 +93,25 @@ public abstract class AbstractNode extends UntypedActor implements Serializable int index = rand.nextInt(neighbors.size()); return neighbors.get(index); } + + public ActorRef getNextNeighbor(int maxNeighbors) { + System.out.println("****************** known:"+ knownNeighbors.values()); + if (firstGetNextNeighbor) { + + firstGetNextNeighbor = false; + for (ActorRef neighbor : knownNeighbors.values()) + if (toVisitNeighbors.size() < maxNeighbors && + !toVisitNeighbors.contains(neighbor)) + toVisitNeighbors.add(neighbor); + } + if (indexNeighbor == maxNeighbors-1){ + indexNeighbor = 0; + } + for (ActorRef n : toVisitNeighbors){ + System.out.println("------------------------ Visit: "+n); + } + return toVisitNeighbors.get(indexNeighbor++); + } public void log(String string) { System.out.println(getSelf().path().name() + ": " + string); diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index 723768db906a8a74e3ecb6ce8aa172bf7043d866..fbeed2df8d022396ef87603aa8387cf6c40a2915 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -54,7 +54,7 @@ public abstract class ActionAggregation extends ClientAction { } private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { - ActorRef randomNeighbor = wallet.getRandomNeighbor(); + ActorRef nextNeighbor = wallet.getNextNeighbor(aggregationContext.getMaxNeighbors()); Thread sleepingRequest = new Thread() { @Override public void run() { @@ -65,7 +65,7 @@ public abstract class ActionAggregation extends ClientAction { if (!wallet.hasPendingAggregationRequest()) { wallet.setPendingAggregationRequest(true); ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); - randomNeighbor.tell(request, wallet.getSelf()); + nextNeighbor.tell(request, wallet.getSelf()); } } catch (InterruptedException ex) { Thread.currentThread() diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java index f0b831bc024ffe071d7b60000f3b5c48b3a60f0c..49e4478508b5ed7313d5ac019736c43401ce7d81 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java @@ -32,14 +32,10 @@ public class ActionAggregationCancel extends ActionAggregation { if (isDone()) { wallet.addExcludedNeighbor(sender); } - AggregationContext aggregationContext = wallet.getAggregationContext(); - if (!wallet.areAllNeighborsExcluded() && !aggregationContext - .isDone()) { + if (!wallet.areAllNeighborsExcluded()) { continueAggregation(wallet); - } else if (aggregationContext.isDone() && !wallet.areAllNeighborsExcluded()) { - sendEndMessage(wallet); } else if (wallet.hasAggregationContext()) { - sendAggregatedResult(wallet, aggregationContext); + sendAggregatedResult(wallet, wallet.getAggregationContext()); sendEndMessage(wallet); } } diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index 67e848b2c5a267636a9c187b036c85d2a05c6cb0..273c7bd705b350dfb6beebd922cf4f58213fb741 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -20,15 +20,20 @@ public class AggregationContext implements Serializable { private final Function<AbstractWallet, Double> valueExtractor; private final int currentExchanges; private final int maxExchanges; + private final int currentNeighbors; + private final int maxNeighbors; private final UUID uuid; private AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, - double value, int currentExchanges, int maxExchanges, UUID uuid) { + double value, int currentExchanges, int maxExchanges, int currentNeighbors, + int maxNeighbors, UUID uuid) { this.function = function; this.valueExtractor = valueExtractor; this.value = value; this.currentExchanges = currentExchanges; this.maxExchanges = maxExchanges; + this.currentNeighbors = currentNeighbors; + this.maxNeighbors = maxNeighbors; this.uuid = uuid; } @@ -38,6 +43,8 @@ public class AggregationContext implements Serializable { aggregatedValue) .setCurrentExchanges(getCurrentExchanges() + 1) .setMaxExchanges(getMaxExchanges()) + .setCurrentNeighbors(getCurrentNeighbors() + 1) + .setMaxNeighbors(getMaxNeighbors()) .setUuid(getUuid()) .build(); System.out.println("Aggregation: " + getValue() + " + " + neighborContext.getValue() + " = " + aggregatedValue); @@ -48,6 +55,7 @@ public class AggregationContext implements Serializable { double initValue = getValueExtractor().apply(wallet); AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), initValue) .setMaxExchanges(getMaxExchanges()) + .setMaxNeighbors(getMaxNeighbors()) .setUuid(getUuid()) .build(); return initContext; @@ -83,6 +91,14 @@ public class AggregationContext implements Serializable { private int getMaxExchanges() { return maxExchanges; } + + int getCurrentNeighbors() { + return currentNeighbors; + } + + int getMaxNeighbors() { + return maxNeighbors; + } public UUID getUuid() { return uuid; @@ -94,7 +110,10 @@ public class AggregationContext implements Serializable { private final Function<AbstractWallet, Double> valueExtractor; private int currentExchanges; private int maxExchanges; + private int currentNeighbors; + private int maxNeighbors; private UUID uuid; + public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, double value) { @@ -103,6 +122,8 @@ public class AggregationContext implements Serializable { this.value = value; this.currentExchanges = 0; this.maxExchanges = 20; + this.currentNeighbors = 0; + this.maxNeighbors = 0; this.uuid = UUID.randomUUID(); } @@ -115,6 +136,16 @@ public class AggregationContext implements Serializable { this.maxExchanges = maxExchanges; return this; } + + public AggregationContextBuilder setCurrentNeighbors(int currentNeighbors) { + this.currentNeighbors = currentNeighbors; + return this; + } + + public AggregationContextBuilder setMaxNeighbors(int maxNeighbors) { + this.maxNeighbors = maxNeighbors; + return this; + } public AggregationContextBuilder setUuid(UUID uuid) { this.uuid = uuid; @@ -122,7 +153,8 @@ public class AggregationContext implements Serializable { } public AggregationContext build() { - return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid); + return new AggregationContext(function, valueExtractor, value, currentExchanges, + maxExchanges, currentNeighbors, maxNeighbors, uuid); } } diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index 46d8b9a2f130c7af9777a1366309d65c7541f321..626df0233be03747c4e356ea94c4980c72d33b4a 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -87,7 +87,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { return logActive; } - public void startAggregation(AggregationMethod method, int exchanges) { - superVisor.startAggregation(method, exchanges); + public void startAggregation(AggregationMethod method, int exchanges, int neighbours) { + superVisor.startAggregation(method, exchanges, neighbours); } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index a722a3f49c305b2e0a2c745a20d4ca8143ea4e1b..8bb20b7478047061d267005edb48956f598f0fb7 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -38,6 +38,8 @@ public class SuperVisorThreadGUI { private JComboBox<AggregationMethod> aggregationMethodsBox; private JSpinner aggregationExchangesSpinner; + + private JSpinner aggregationNeighboursSpinner; public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { @@ -61,15 +63,18 @@ public class SuperVisorThreadGUI { aggregationMethodsBox = new JComboBox<AggregationMethod>(AggregationMethod.values()); aggregationExchangesSpinner = new JSpinner(new SpinnerNumberModel(25, 1, 10000, 1)); + aggregationNeighboursSpinner = new JSpinner(new SpinnerNumberModel(10, 1, 10000, 1)); startAggregationButton = new JButton("Start Aggregation"); startAggregationButton.addActionListener(e -> { AggregationMethod method = (AggregationMethod) aggregationMethodsBox.getSelectedItem(); int exchanges = (int) aggregationExchangesSpinner.getModel() .getValue(); - superVisorGuiControl.startAggregation(method, exchanges); + int neighbours = (int) aggregationNeighboursSpinner.getModel() + .getValue(); + superVisorGuiControl.startAggregation(method, exchanges, neighbours); }); - + showDebug = new JCheckBox("Show debug messages in transaction log"); showDebug.setSelected(true); showDebug.addItemListener(e -> { @@ -101,6 +106,7 @@ public class SuperVisorThreadGUI { logConfigPanel.add(activateLogging); logConfigPanel.add(showDebug); aggregationPanel.add(aggregationExchangesSpinner); + aggregationPanel.add(aggregationNeighboursSpinner); aggregationPanel.add(aggregationMethodsBox); aggregationPanel.add(startAggregationButton); diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 86ffa7a03e9c3ffe2f61d89f9a84dc903f1fcc1b..da7803575a530148745ab142003037803f8d3956 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -178,13 +178,14 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { this.bankCommitObserver = bankCommitObserver; } - public void startAggregation(AggregationMethod method, int exchanges) { + public void startAggregation(AggregationMethod method, int exchanges, int neighbors) { clearAggregationResults(); Function<AbstractWallet, Double> valueExtractor = method.getValueExtractor(); AggregationFunction function = method.getFunction(); double value = method.getValue(); AggregationContext context = new AggregationContextBuilder(function, valueExtractor, value) .setMaxExchanges(exchanges) + .setMaxNeighbors(neighbors) .build(); ActorRef randomNode = getRandomNeighbor(); ActionAggregationInit init = new ActionAggregationInit(context, method.isUseValueExtractor());