Skip to content
Snippets Groups Projects
Commit 768b8c18 authored by Kim Kern's avatar Kim Kern
Browse files

removed concurrent aggregation requests

parent 1b8e451b
Branches
No related tags found
No related merge requests found
Showing
with 276 additions and 48 deletions
......@@ -2,7 +2,9 @@ package fucoin;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
......@@ -14,6 +16,8 @@ import fucoin.wallet.AbstractWallet;
public abstract class AbstractNode extends UntypedActor implements Serializable {
private final Collection<ActorRef> excludedNeighbors = new HashSet<>();
/**
* Returns the akka-style address as String,
* which could be converted to an ActorRef object later
......@@ -79,6 +83,9 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
*/
public ActorRef getRandomNeighbor() {
List<ActorRef> neighbors = new ArrayList<>(knownNeighbors.values());
if (excludedNeighbors.size() < neighbors.size()) {
neighbors.removeAll(excludedNeighbors);
}
Random rand = new Random();
int index = rand.nextInt(neighbors.size());
return neighbors.get(index);
......@@ -87,4 +94,32 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
public void log(String string) {
System.out.println(getSelf().path().name() + ": " + string);
}
/**
* If a neighbor should be excluded from the random pick, you can add it
* here.
*
* @param sender
* excluded neighbor
*/
public void addExcludedNeighbor(ActorRef sender) {
excludedNeighbors.add(sender);
}
/**
* After calling this method all known neighbors can possibly be returned by
* getRandomNeighbor().
*/
public void clearExcludedNeighbors() {
excludedNeighbors.clear();
}
/**
* Returns true, if all neighbors are excluded.
*
* @return
*/
public boolean areAllNeighborsExcluded() {
return excludedNeighbors.size() >= knownNeighbors.size();
}
}
\ No newline at end of file
......@@ -28,20 +28,22 @@ public abstract class ActionAggregation extends ClientAction {
*/
public void continueAggregation(AbstractWallet wallet) {
AggregationContext aggregationContext = wallet.getAggregationContext();
if (!aggregationContext.isDone()) {
if (!wallet.hasAggregationContext()) {
// do not continue
} else if (!aggregationContext.isDone()) {
sendSleepingRequest(wallet, aggregationContext);
} else {
System.out.println(aggregationContext);
wallet.clearExcludedNeighbors();
sendAggregatedResult(wallet, aggregationContext);
}
}
private void sendAggregatedResult(AbstractWallet wallet, AggregationContext aggregationContext) {
protected void sendAggregatedResult(AbstractWallet wallet, AggregationContext aggregationContext) {
wallet.addTransactionLogMessageSuccess(wallet.getName() + ": " + aggregationContext.toString());
ActionAggregationResult result = new ActionAggregationResult(aggregationContext);
wallet.getRemoteSuperVisorActor()
.tell(result, wallet.getSelf());
wallet.removeAggregationContext();
}
private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) {
......@@ -50,11 +52,14 @@ public abstract class ActionAggregation extends ClientAction {
@Override
public void run() {
int wait = new Random().nextInt(500) + 10;
wait = 0;
try {
Thread.sleep(wait);
ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext);
randomNeighbor.tell(request, wallet.getSelf());
wallet.setPendingAggregationRequest(true);
if (!wallet.hasPendingAggregationRequest()) {
wallet.setPendingAggregationRequest(true);
ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext);
randomNeighbor.tell(request, wallet.getSelf());
}
} catch (InterruptedException ex) {
Thread.currentThread()
.interrupt();
......
......@@ -14,15 +14,33 @@ import fucoin.wallet.AbstractWallet;
public class ActionAggregationCancel extends ActionAggregation {
private static final long serialVersionUID = -7416863664917157007L;
private final boolean isDone;
public ActionAggregationCancel() {
super(null);
this.isDone = false;
}
public ActionAggregationCancel(boolean isDone) {
super(null);
this.isDone = isDone;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
wallet.setPendingAggregationRequest(false);
continueAggregation(wallet);
if (isDone()) {
wallet.addExcludedNeighbor(sender);
}
if (!wallet.areAllNeighborsExcluded()) {
continueAggregation(wallet);
} else if (wallet.hasAggregationContext()) {
sendAggregatedResult(wallet, wallet.getAggregationContext());
}
}
public boolean isDone() {
return isDone;
}
}
/**
*
*/
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
/**
* @author Kim
*
*/
public class ActionAggregationInit extends ActionAggregation {
public ActionAggregationInit(AggregationContext context) {
super(context);
}
private static final long serialVersionUID = -7416863664917157007L;
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
AggregationContext initContext = getContext().initContext(wallet);
wallet.setAggregationContext(initContext);
continueAggregation(wallet);
}
}
......@@ -21,7 +21,13 @@ public class ActionAggregationReply extends ActionAggregation {
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
wallet.setAggregationContext(getContext());
AggregationContext neighborContext = getContext();
AggregationContext myContext = wallet.getAggregationContext();
if (myContext != null) {
AggregationContext aggregatedContext = myContext.aggregate(neighborContext);
wallet.setAggregationContext(aggregatedContext);
}
wallet.setPendingAggregationRequest(false);
}
......
......@@ -3,6 +3,8 @@
*/
package fucoin.actions.aggregation;
import java.util.UUID;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
......@@ -21,32 +23,57 @@ public class ActionAggregationRequest extends ActionAggregation {
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
if (wallet.hasPendingAggregationRequest()) {
sendCancel(sender, self);
if (wallet.hasPendingAggregationRequest() || isDone(wallet)) {
sendCancel(sender, self, isDone(wallet));
} else {
replyAggregatedContext(sender, wallet);
sendReply(sender, wallet);
continueAggregation(wallet);
}
}
private void sendCancel(ActorRef wallet, ActorRef self) {
ActionAggregationCancel cancel = new ActionAggregationCancel();
private void sendCancel(ActorRef wallet, ActorRef self, boolean isDone) {
ActionAggregationCancel cancel = new ActionAggregationCancel(isDone);
wallet.tell(cancel, self);
}
private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) {
boolean isFirstRequest = !wallet.hasAggregationContext();
private void sendReply(ActorRef sender, AbstractWallet wallet) {
boolean isFirstRequest = !wallet.hasAggregationContext() || !hasSameUuid(wallet);
if (isFirstRequest) {
initContext(wallet);
}
AggregationContext aggregatedContext = wallet.getAggregationContext()
.aggregate(getContext());
wallet.setAggregationContext(aggregatedContext);
ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext);
sender.tell(reply, wallet.getSelf());
sendAggregatedContext(sender, wallet);
if (isFirstRequest) {
continueAggregation(wallet);
// continueAggregation(wallet);
}
}
private void sendAggregatedContext(ActorRef sender, AbstractWallet wallet) {
AggregationContext myContext = wallet.getAggregationContext();
AggregationContext neighborContext = getContext();
AggregationContext aggregatedContext = myContext.aggregate(neighborContext);
ActionAggregationReply reply = new ActionAggregationReply(myContext);
sender.tell(reply, wallet.getSelf());
wallet.setAggregationContext(aggregatedContext);
}
/**
* Returns true, if the stored aggregationContext has the same UUID as the
* one from the request.
*
* @param wallet
* @return
*/
private boolean hasSameUuid(AbstractWallet wallet) {
boolean isSame;
if (wallet.hasAggregationContext()) {
UUID requestUuid = wallet.getAggregationContext()
.getUuid();
UUID myUuid = getContext().getUuid();
isSame = requestUuid.equals(myUuid);
} else {
return false;
}
return isSame;
}
private void initContext(AbstractWallet wallet) {
......@@ -54,4 +81,17 @@ public class ActionAggregationRequest extends ActionAggregation {
wallet.setAggregationContext(aggregationContext);
}
private boolean isDone(AbstractWallet wallet) {
boolean isDone;
AggregationContext aggregationContext = wallet.getAggregationContext();
if (wallet.hasAggregationContext() && aggregationContext.isDone() && hasSameUuid(wallet)) {
isDone = true;
} else {
isDone = false;
}
return isDone;
}
}
......@@ -5,24 +5,30 @@ package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
import fucoin.actions.transaction.SuperVisorAction;
import fucoin.supervisor.SuperVisorImpl;
/**
* @author Kim
*
*/
public class ActionAggregationResult extends ActionAggregation {
public class ActionAggregationResult extends SuperVisorAction {
private static final long serialVersionUID = 2937612747233988421L;
private final AggregationContext context;
public ActionAggregationResult(AggregationContext context) {
super(context);
this.context = context;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
// TODO Kim update GUI
System.out.println(context);
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, SuperVisorImpl superVisor) {
// TODO add to supervisor result list
}
public AggregationContext getContext() {
return context;
}
}
......@@ -4,6 +4,7 @@
package fucoin.actions.aggregation;
import java.io.Serializable;
import java.util.UUID;
import java.util.function.Function;
import fucoin.wallet.AbstractWallet;
......@@ -19,25 +20,37 @@ public class AggregationContext implements Serializable {
private final Function<AbstractWallet, Double> valueExtractor;
private final int currentExchanges;
private final int maxExchanges;
private final UUID uuid;
public AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
double value, int currentExchanges, int maxExchanges) {
private AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
double value, int currentExchanges, int maxExchanges, UUID uuid) {
this.function = function;
this.valueExtractor = valueExtractor;
this.value = value;
this.currentExchanges = currentExchanges;
this.maxExchanges = maxExchanges;
this.uuid = uuid;
}
public AggregationContext aggregate(AggregationContext context) {
double aggregatedValue = getFunction().apply(context.getValue(), getValue());
return new AggregationContext(getFunction(), getValueExtractor(), aggregatedValue, getCurrentExchanges() + 1,
getMaxExchanges());
public AggregationContext aggregate(AggregationContext neighborContext) {
double aggregatedValue = getFunction().apply(neighborContext.getValue(), getValue());
AggregationContext aggregatedContext = new AggregationContextBuilder(getFunction(), getValueExtractor(),
aggregatedValue)
.setCurrentExchanges(getCurrentExchanges() + 1)
.setMaxExchanges(getMaxExchanges())
.setUuid(getUuid())
.build();
System.out.println("Aggregation: " + getValue() + " + " + neighborContext.getValue() + " = " + aggregatedValue);
return aggregatedContext;
}
public AggregationContext initContext(AbstractWallet wallet) {
double initValue = getValueExtractor().apply(wallet);
return new AggregationContext(getFunction(), getValueExtractor(), initValue, 0, getMaxExchanges());
AggregationContext initContext = new AggregationContextBuilder(getFunction(), getValueExtractor(), initValue)
.setMaxExchanges(getMaxExchanges())
.setUuid(getUuid())
.build();
return initContext;
}
public boolean isDone() {
......@@ -51,7 +64,7 @@ public class AggregationContext implements Serializable {
@Override
public String toString() {
return "The aggregated value is " + getValue() + "after " + getCurrentExchanges() + "aggregations.";
return "The aggregated value is " + getValue() + " after " + getCurrentExchanges() + " aggregations.";
}
......@@ -71,5 +84,47 @@ public class AggregationContext implements Serializable {
return maxExchanges;
}
public UUID getUuid() {
return uuid;
}
public static class AggregationContextBuilder {
private final AggregationFunction function;
private final double value;
private final Function<AbstractWallet, Double> valueExtractor;
private int currentExchanges;
private int maxExchanges;
private UUID uuid;
public AggregationContextBuilder(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
double value) {
this.function = function;
this.valueExtractor = valueExtractor;
this.value = value;
this.currentExchanges = 0;
this.maxExchanges = 20;
this.uuid = UUID.randomUUID();
}
public AggregationContextBuilder setCurrentExchanges(int currentExchanges) {
this.currentExchanges = currentExchanges;
return this;
}
public AggregationContextBuilder setMaxExchanges(int maxExchanges) {
this.maxExchanges = maxExchanges;
return this;
}
public AggregationContextBuilder setUuid(UUID uuid) {
this.uuid = uuid;
return this;
}
public AggregationContext build() {
return new AggregationContext(function, valueExtractor, value, currentExchanges, maxExchanges, uuid);
}
}
}
......@@ -12,13 +12,13 @@ public class MassWalletConfiguration extends AbstractConfiguration {
public void run() {
ActorRef supervisor = initSupervisor();
try {
spawnWallets(10, false);
spawnWallets(3, false);
System.out.println("Wallet spawning done!");
} catch (Exception e) {
System.out.println("Wallet spawning timed out!");
}
randomTransactions(10, 4);
randomTransactions(3, 3);
}
@Override
......
......@@ -5,6 +5,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import javax.swing.SwingUtilities;
......@@ -12,8 +13,10 @@ import akka.actor.ActorRef;
import akka.actor.Props;
import fucoin.AbstractNode;
import fucoin.actions.Action;
import fucoin.actions.aggregation.ActionAggregationRequest;
import fucoin.actions.aggregation.ActionAggregationInit;
import fucoin.actions.aggregation.AggregationContext;
import fucoin.actions.aggregation.AggregationContext.AggregationContextBuilder;
import fucoin.actions.aggregation.AggregationFunction;
import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer;
......@@ -21,6 +24,7 @@ import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.actions.transaction.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl;
import fucoin.gui.TransactionLogger;
import fucoin.wallet.AbstractWallet;
public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
......@@ -172,11 +176,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
}
public void startAggregation() {
AggregationContext context = new AggregationContext((x, y) -> Math.max(x, y), x -> new Double(x.getAmount()), 0,
0, 20);
Function<AbstractWallet, Double> valueExtractor = x -> new Double(x.getAmount());
AggregationFunction function = (x, y) -> (x + y) / 2;
AggregationContext context = new AggregationContextBuilder(function, valueExtractor, 0)
.setMaxExchanges(5)
.build();
ActorRef randomNode = getRandomNeighbor();
ActionAggregationRequest request = new ActionAggregationRequest(context);
randomNode.tell(request, getSelf());
System.out.println("init");
ActionAggregationInit init = new ActionAggregationInit(context);
randomNode.tell(init, getSelf());
}
}
package fucoin.wallet;
import java.io.Serializable;
import java.util.UUID;
import akka.actor.ActorRef;
import fucoin.AbstractNode;
......@@ -154,7 +155,22 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
public abstract void send(String address, int amount, ActorRef observer);
/**
* TODO Kommentar Kim
* Adds the aggregation request with the id {@code uuid} to the list of
* handled requests.
*
* @param uuid
* request id
*/
public abstract void addAggregationRequest(UUID uuid);
/**
* Returns true, if the aggregation request with the id {@code uuid} has
* already been handled.
*
* @param uuid
* request id
* @return isHandldedRequest
*/
public abstract void removeAggregationContext();
public abstract boolean isHandledRequest(UUID uuid);
}
......@@ -2,8 +2,11 @@ package fucoin.wallet;
import static akka.dispatch.Futures.future;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.collect.EvictingQueue;
import akka.actor.ActorRef;
import akka.actor.Props;
import fucoin.actions.ClientAction;
......@@ -29,6 +32,7 @@ public class WalletImpl extends AbstractWallet {
private AggregationContext aggregationContext;
private boolean hasPendingAggregationRequest = false;
private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
private final EvictingQueue<UUID> handledAggregationRequests = EvictingQueue.create(10);
public WalletImpl(String name) {
super(name);
......@@ -261,6 +265,7 @@ public class WalletImpl extends AbstractWallet {
@Override
public void setAggregationContext(AggregationContext context) {
System.out.println("Intermediate " + getName() + ": " + context);
this.aggregationContext = context;
}
......@@ -275,8 +280,13 @@ public class WalletImpl extends AbstractWallet {
}
@Override
public void removeAggregationContext() {
this.aggregationContext = null;
public void addAggregationRequest(UUID uuid) {
this.handledAggregationRequests.add(uuid);
}
@Override
public boolean isHandledRequest(UUID uuid) {
return this.handledAggregationRequests.contains(uuid);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment