From 768b8c185e2ee51419386ad03f72bb7a0832c2f2 Mon Sep 17 00:00:00 2001 From: Kim Kern <kim.kern@fu-berlin.de> Date: Tue, 5 Jul 2016 21:41:52 +0200 Subject: [PATCH] removed concurrent aggregation requests --- src/main/java/fucoin/AbstractNode.java | 35 +++++++++ .../aggregation/ActionAggregation.java | 19 +++-- .../aggregation/ActionAggregationCancel.java | 20 +++++- .../aggregation/ActionAggregationInit.java | 31 ++++++++ .../aggregation/ActionAggregationReply.java | 8 ++- .../aggregation/ActionAggregationRequest.java | 66 +++++++++++++---- .../aggregation/ActionAggregationResult.java | 18 +++-- .../aggregation/AggregationContext.java | 71 ++++++++++++++++--- .../MassWalletConfiguration.java | 4 +- .../fucoin/supervisor/SuperVisorImpl.java | 18 +++-- .../java/fucoin/wallet/AbstractWallet.java | 20 +++++- src/main/java/fucoin/wallet/WalletImpl.java | 14 +++- 12 files changed, 276 insertions(+), 48 deletions(-) create mode 100644 src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java diff --git a/src/main/java/fucoin/AbstractNode.java b/src/main/java/fucoin/AbstractNode.java index 7f2b75d..bdf4df0 100644 --- a/src/main/java/fucoin/AbstractNode.java +++ b/src/main/java/fucoin/AbstractNode.java @@ -2,7 +2,9 @@ package fucoin; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Random; @@ -14,6 +16,8 @@ import fucoin.wallet.AbstractWallet; public abstract class AbstractNode extends UntypedActor implements Serializable { + private final Collection<ActorRef> excludedNeighbors = new HashSet<>(); + /** * Returns the akka-style address as String, * which could be converted to an ActorRef object later @@ -79,6 +83,9 @@ public abstract class AbstractNode extends UntypedActor implements Serializable */ public ActorRef getRandomNeighbor() { List<ActorRef> neighbors = new ArrayList<>(knownNeighbors.values()); + if (excludedNeighbors.size() < neighbors.size()) { + neighbors.removeAll(excludedNeighbors); + } Random rand = new Random(); int index = rand.nextInt(neighbors.size()); return neighbors.get(index); @@ -87,4 +94,32 @@ public abstract class AbstractNode extends UntypedActor implements Serializable public void log(String string) { System.out.println(getSelf().path().name() + ": " + string); } + + /** + * If a neighbor should be excluded from the random pick, you can add it + * here. + * + * @param sender + * excluded neighbor + */ + public void addExcludedNeighbor(ActorRef sender) { + excludedNeighbors.add(sender); + } + + /** + * After calling this method all known neighbors can possibly be returned by + * getRandomNeighbor(). + */ + public void clearExcludedNeighbors() { + excludedNeighbors.clear(); + } + + /** + * Returns true, if all neighbors are excluded. + * + * @return + */ + public boolean areAllNeighborsExcluded() { + return excludedNeighbors.size() >= knownNeighbors.size(); + } } \ No newline at end of file diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index 90ec575..5624ff7 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -28,20 +28,22 @@ public abstract class ActionAggregation extends ClientAction { */ public void continueAggregation(AbstractWallet wallet) { AggregationContext aggregationContext = wallet.getAggregationContext(); - if (!aggregationContext.isDone()) { + if (!wallet.hasAggregationContext()) { + // do not continue + } else if (!aggregationContext.isDone()) { sendSleepingRequest(wallet, aggregationContext); } else { - System.out.println(aggregationContext); + wallet.clearExcludedNeighbors(); sendAggregatedResult(wallet, aggregationContext); } } - private void sendAggregatedResult(AbstractWallet wallet, AggregationContext aggregationContext) { + protected void sendAggregatedResult(AbstractWallet wallet, AggregationContext aggregationContext) { + wallet.addTransactionLogMessageSuccess(wallet.getName() + ": " + aggregationContext.toString()); ActionAggregationResult result = new ActionAggregationResult(aggregationContext); wallet.getRemoteSuperVisorActor() .tell(result, wallet.getSelf()); - wallet.removeAggregationContext(); } private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { @@ -50,11 +52,14 @@ public abstract class ActionAggregation extends ClientAction { @Override public void run() { int wait = new Random().nextInt(500) + 10; + wait = 0; try { Thread.sleep(wait); - ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); - randomNeighbor.tell(request, wallet.getSelf()); - wallet.setPendingAggregationRequest(true); + if (!wallet.hasPendingAggregationRequest()) { + wallet.setPendingAggregationRequest(true); + ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); + randomNeighbor.tell(request, wallet.getSelf()); + } } catch (InterruptedException ex) { Thread.currentThread() .interrupt(); diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java index 2930ef3..e990155 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java @@ -14,15 +14,33 @@ import fucoin.wallet.AbstractWallet; public class ActionAggregationCancel extends ActionAggregation { private static final long serialVersionUID = -7416863664917157007L; + private final boolean isDone; public ActionAggregationCancel() { super(null); + this.isDone = false; + } + + public ActionAggregationCancel(boolean isDone) { + super(null); + this.isDone = isDone; } @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { wallet.setPendingAggregationRequest(false); - continueAggregation(wallet); + if (isDone()) { + wallet.addExcludedNeighbor(sender); + } + if (!wallet.areAllNeighborsExcluded()) { + continueAggregation(wallet); + } else if (wallet.hasAggregationContext()) { + sendAggregatedResult(wallet, wallet.getAggregationContext()); + } + } + + public boolean isDone() { + return isDone; } } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java new file mode 100644 index 0000000..e5d7014 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationInit.java @@ -0,0 +1,31 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.wallet.AbstractWallet; + +/** + * @author Kim + * + */ +public class ActionAggregationInit extends ActionAggregation { + + public ActionAggregationInit(AggregationContext context) { + super(context); + } + + private static final long serialVersionUID = -7416863664917157007L; + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { + AggregationContext initContext = getContext().initContext(wallet); + wallet.setAggregationContext(initContext); + continueAggregation(wallet); + } + + + +} diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java index 30bbebc..01e892d 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java @@ -21,7 +21,13 @@ public class ActionAggregationReply extends ActionAggregation { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - wallet.setAggregationContext(getContext()); + AggregationContext neighborContext = getContext(); + AggregationContext myContext = wallet.getAggregationContext(); + + if (myContext != null) { + AggregationContext aggregatedContext = myContext.aggregate(neighborContext); + wallet.setAggregationContext(aggregatedContext); + } wallet.setPendingAggregationRequest(false); } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java index 75f4d16..5fde093 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java @@ -3,6 +3,8 @@ */ package fucoin.actions.aggregation; +import java.util.UUID; + import akka.actor.ActorRef; import akka.actor.UntypedActorContext; import fucoin.wallet.AbstractWallet; @@ -21,32 +23,57 @@ public class ActionAggregationRequest extends ActionAggregation { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - if (wallet.hasPendingAggregationRequest()) { - sendCancel(sender, self); + if (wallet.hasPendingAggregationRequest() || isDone(wallet)) { + sendCancel(sender, self, isDone(wallet)); } else { - replyAggregatedContext(sender, wallet); + sendReply(sender, wallet); continueAggregation(wallet); } } - private void sendCancel(ActorRef wallet, ActorRef self) { - ActionAggregationCancel cancel = new ActionAggregationCancel(); + private void sendCancel(ActorRef wallet, ActorRef self, boolean isDone) { + ActionAggregationCancel cancel = new ActionAggregationCancel(isDone); wallet.tell(cancel, self); } - private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) { - boolean isFirstRequest = !wallet.hasAggregationContext(); + private void sendReply(ActorRef sender, AbstractWallet wallet) { + boolean isFirstRequest = !wallet.hasAggregationContext() || !hasSameUuid(wallet); if (isFirstRequest) { initContext(wallet); } - AggregationContext aggregatedContext = wallet.getAggregationContext() - .aggregate(getContext()); - wallet.setAggregationContext(aggregatedContext); - ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext); - sender.tell(reply, wallet.getSelf()); + sendAggregatedContext(sender, wallet); if (isFirstRequest) { - continueAggregation(wallet); + // continueAggregation(wallet); + } + } + + private void sendAggregatedContext(ActorRef sender, AbstractWallet wallet) { + AggregationContext myContext = wallet.getAggregationContext(); + AggregationContext neighborContext = getContext(); + AggregationContext aggregatedContext = myContext.aggregate(neighborContext); + ActionAggregationReply reply = new ActionAggregationReply(myContext); + sender.tell(reply, wallet.getSelf()); + wallet.setAggregationContext(aggregatedContext); + } + + /** + * Returns true, if the stored aggregationContext has the same UUID as the + * one from the request. + * + * @param wallet + * @return + */ + private boolean hasSameUuid(AbstractWallet wallet) { + boolean isSame; + if (wallet.hasAggregationContext()) { + UUID requestUuid = wallet.getAggregationContext() + .getUuid(); + UUID myUuid = getContext().getUuid(); + isSame = requestUuid.equals(myUuid); + } else { + return false; } + return isSame; } private void initContext(AbstractWallet wallet) { @@ -54,4 +81,17 @@ public class ActionAggregationRequest extends ActionAggregation { wallet.setAggregationContext(aggregationContext); } + private boolean isDone(AbstractWallet wallet) { + boolean isDone; + AggregationContext aggregationContext = wallet.getAggregationContext(); + + if (wallet.hasAggregationContext() && aggregationContext.isDone() && hasSameUuid(wallet)) { + isDone = true; + } else { + isDone = false; + } + + return isDone; + } + } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java index cd389e0..8375a48 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java @@ -5,24 +5,30 @@ package fucoin.actions.aggregation; import akka.actor.ActorRef; import akka.actor.UntypedActorContext; -import fucoin.wallet.AbstractWallet; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisorImpl; /** * @author Kim * */ -public class ActionAggregationResult extends ActionAggregation { +public class ActionAggregationResult extends SuperVisorAction { private static final long serialVersionUID = 2937612747233988421L; + private final AggregationContext context; public ActionAggregationResult(AggregationContext context) { - super(context); + this.context = context; + } @Override - protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { - // TODO Kim update GUI - System.out.println(context); + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) { + // TODO add to supervisor result list + } + + public AggregationContext getContext() { + return context; } } diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index 86bde3d..67e848b 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -4,6 +4,7 @@ package fucoin.actions.aggregation; import java.io.Serializable; +import java.util.UUID; import java.util.function.Function; import fucoin.wallet.AbstractWallet; @@ -19,25 +20,37 @@ public class AggregationContext implements Serializable { private final Function<AbstractWallet, Double> valueExtractor; private final int currentExchanges; private final int maxExchanges; + private final UUID uuid; - public AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, - double value, int currentExchanges, int maxExchanges) { + private AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, + double value, int currentExchanges, int maxExchanges, UUID uuid) { this.function = function; this.valueExtractor = valueExtractor; this.value = value; this.currentExchanges = currentExchanges; this.maxExchanges = maxExchanges; + this.uuid = uuid; } - public AggregationContext aggregate(AggregationContext context) { - double aggregatedValue = getFunction().apply(context.getValue(), getValue()); - return new AggregationContext(getFunction(), getValueExtractor(), aggregatedValue, getCurrentExchanges() + 1, - getMaxExchanges()); + public AggregationContext aggregate(AggregationContext neighborContext) { + double aggregatedValue = getFunction().apply(neighborContext.getValue(), getValue()); + AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), + aggregatedValue) + .setCurrentExchanges(getCurrentExchanges() + 1) + .setMaxExchanges(getMaxExchanges()) + .setUuid(getUuid()) + .build(); + System.out.println("Aggregation: " + getValue() + " + " + neighborContext.getValue() + " = " + aggregatedValue); + return aggregatedContext; } public AggregationContext initContext(AbstractWallet wallet) { double initValue = getValueExtractor().apply(wallet); - return new AggregationContext(getFunction(), getValueExtractor(), initValue, 0, getMaxExchanges()); + AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), initValue) + .setMaxExchanges(getMaxExchanges()) + .setUuid(getUuid()) + .build(); + return initContext; } public boolean isDone() { @@ -51,7 +64,7 @@ public class AggregationContext implements Serializable { @Override public String toString() { - return "The aggregated value is " + getValue() + "after " + getCurrentExchanges() + "aggregations."; + return "The aggregated value is " + getValue() + " after " + getCurrentExchanges() + " aggregations."; } @@ -71,5 +84,47 @@ public class AggregationContext implements Serializable { return maxExchanges; } + public UUID getUuid() { + return uuid; + } + + public static class AggregationContextBuilder { + private final AggregationFunction function; + private final double value; + private final Function<AbstractWallet, Double> valueExtractor; + private int currentExchanges; + private int maxExchanges; + private UUID uuid; + + public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, + double value) { + this.function = function; + this.valueExtractor = valueExtractor; + this.value = value; + this.currentExchanges = 0; + this.maxExchanges = 20; + this.uuid = UUID.randomUUID(); + } + + public AggregationContextBuilder setCurrentExchanges(int currentExchanges) { + this.currentExchanges = currentExchanges; + return this; + } + + public AggregationContextBuilder setMaxExchanges(int maxExchanges) { + this.maxExchanges = maxExchanges; + return this; + } + + public AggregationContextBuilder setUuid(UUID uuid) { + this.uuid = uuid; + return this; + } + + public AggregationContext build() { + return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid); + } + + } } diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java index aed13f0..ac6f08b 100644 --- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -12,13 +12,13 @@ public class MassWalletConfiguration extends AbstractConfiguration { public void run() { ActorRef supervisor = initSupervisor(); try { - spawnWallets(10, false); + spawnWallets(3, false); System.out.println("Wallet spawning done!"); } catch (Exception e) { System.out.println("Wallet spawning timed out!"); } - randomTransactions(10, 4); + randomTransactions(3, 3); } @Override diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 5f64d11..c3b165d 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.function.Function; import javax.swing.SwingUtilities; @@ -12,8 +13,10 @@ import akka.actor.ActorRef; import akka.actor.Props; import fucoin.AbstractNode; import fucoin.actions.Action; -import fucoin.actions.aggregation.ActionAggregationRequest; +import fucoin.actions.aggregation.ActionAggregationInit; import fucoin.actions.aggregation.AggregationContext; +import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder; +import fucoin.actions.aggregation.AggregationFunction; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; @@ -21,6 +24,7 @@ import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.TransactionLogger; +import fucoin.wallet.AbstractWallet; public class SuperVisorImpl extends AbstractNode implements TransactionLogger { @@ -172,11 +176,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { } public void startAggregation() { - AggregationContext context = new AggregationContext((x, y) -> Math.max(x, y), x -> new Double(x.getAmount()), 0, - 0, 20); + Function<AbstractWallet, Double> valueExtractor = x -> new Double(x.getAmount()); + AggregationFunction function = (x, y) -> (x + y) / 2; + AggregationContext context = new AggregationContextBuilder(function, valueExtractor, 0) + .setMaxExchanges(5) + .build(); ActorRef randomNode = getRandomNeighbor(); - ActionAggregationRequest request = new ActionAggregationRequest(context); - randomNode.tell(request, getSelf()); - System.out.println("init"); + ActionAggregationInit init = new ActionAggregationInit(context); + randomNode.tell(init, getSelf()); } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index d82b8a8..762d0e7 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -1,6 +1,7 @@ package fucoin.wallet; import java.io.Serializable; +import java.util.UUID; import akka.actor.ActorRef; import fucoin.AbstractNode; @@ -154,7 +155,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl public abstract void send(String address, int amount, ActorRef observer); /** - * TODO Kommentar Kim + * Adds the aggregation request with the id {@code uuid} to the list of + * handled requests. + * + * @param uuid + * request id + */ + public abstract void addAggregationRequest(UUID uuid); + + /** + * Returns true, if the aggregation request with the id {@code uuid} has + * already been handled. + * + * @param uuid + * request id + * @return isHandldedRequest */ - public abstract void removeAggregationContext(); + public abstract boolean isHandledRequest(UUID uuid); + } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 42900cf..42caa83 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -2,8 +2,11 @@ package fucoin.wallet; import static akka.dispatch.Futures.future; +import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; +import com.google.common.collect.EvictingQueue; + import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; @@ -29,6 +32,7 @@ public class WalletImpl extends AbstractWallet { private AggregationContext aggregationContext; private boolean hasPendingAggregationRequest = false; private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>(); + private final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10); public WalletImpl(String name) { super(name); @@ -261,6 +265,7 @@ public class WalletImpl extends AbstractWallet { @Override public void setAggregationContext(AggregationContext context) { + System.out.println("Intermediate " + getName() + ": " + context); this.aggregationContext = context; } @@ -275,8 +280,13 @@ public class WalletImpl extends AbstractWallet { } @Override - public void removeAggregationContext() { - this.aggregationContext = null; + public void addAggregationRequest(UUID uuid) { + this.handledAggregationRequests.add(uuid); + } + + @Override + public boolean isHandledRequest(UUID uuid) { + return this.handledAggregationRequests.contains(uuid); } } -- GitLab