diff --git a/src/main/java/fucoin/AbstractNode.java b/src/main/java/fucoin/AbstractNode.java index 437262c5c1f4f7472674e58e445abb1104136d56..0ca9a02b4368ba4c6cdabd9c5278fb7e53de1a3f 100644 --- a/src/main/java/fucoin/AbstractNode.java +++ b/src/main/java/fucoin/AbstractNode.java @@ -1,14 +1,17 @@ package fucoin; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Random; + import akka.actor.ActorRef; import akka.actor.Address; import akka.actor.UntypedActor; import fucoin.actions.transaction.ActionGetAmount; import fucoin.wallet.AbstractWallet; -import java.io.Serializable; -import java.util.HashMap; - public abstract class AbstractNode extends UntypedActor implements Serializable { /** @@ -69,6 +72,18 @@ public abstract class AbstractNode extends UntypedActor implements Serializable return knownNeighbors; } + /** + * Returns a randomly chosen neighbor from the known neighbors. + * + * @return random neighbor + */ + public ActorRef getRandomNeighbor() { + List<ActorRef> neighbors = new ArrayList<>(knownNeighbors.values()); + Random rand = new Random(); + int index = rand.nextInt(neighbors.size()); + return neighbors.get(index); + } + public void log(String string) { System.out.println(getSelf().path().name() + ": " + string); } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java new file mode 100644 index 0000000000000000000000000000000000000000..ac08f2029b54a47148e1a3901aa10952855e67e7 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -0,0 +1,63 @@ +package fucoin.actions.aggregation; + +import java.util.Random; + +import akka.actor.ActorRef; +import fucoin.actions.ClientAction; +import fucoin.wallet.AbstractWallet; + +public abstract class ActionAggregation extends ClientAction { + + private static final long serialVersionUID = 4291894423240991230L; + private final AggregationContext context; + + public ActionAggregation(AggregationContext context) { + this.context = context; + } + + public AggregationContext getContext() { + return context; + } + + /** + * Sends an AggregationRequest after a randomly chosen time to a random + * neighbor if the aggregation has finished or else sends the result to the + * supervisor. + * + * @param wallet + */ + public void continueAggregation(AbstractWallet wallet) { + AggregationContext aggregationContext = wallet.getAggregationContext(); + if (!aggregationContext.isDone()) { + sendSleepingRequest(wallet, aggregationContext); + } else { + sendAggregatedResult(wallet, aggregationContext); + } + } + + private void sendAggregatedResult(AbstractWallet wallet, AggregationContext aggregationContext) { + ActionAggregationResult result = new ActionAggregationResult(aggregationContext); + wallet.getRemoteSuperVisorActor() + .tell(result, wallet.getSelf()); + } + + private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { + ActorRef randomNeighbor = wallet.getRandomNeighbor(); + Thread sleepingRequest = new Thread() { + @Override + public void run() { + int wait = new Random().nextInt(500); + try { + Thread.sleep(wait); + ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); + randomNeighbor.tell(request, wallet.getSelf()); + wallet.setPendingAggregationRequest(true); + } catch (InterruptedException ex) { + Thread.currentThread() + .interrupt(); + } + } + }; + sleepingRequest.start(); + } +} diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java new file mode 100644 index 0000000000000000000000000000000000000000..2930ef31ef1055dcd7450653d74df1f44e12404b --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationCancel.java @@ -0,0 +1,28 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.wallet.AbstractWallet; + +/** + * @author Kim + * + */ +public class ActionAggregationCancel extends ActionAggregation { + + private static final long serialVersionUID = -7416863664917157007L; + + public ActionAggregationCancel() { + super(null); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { + wallet.setPendingAggregationRequest(false); + continueAggregation(wallet); + } + +} diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java new file mode 100644 index 0000000000000000000000000000000000000000..8f24c63222f6d8217894023a50eb6616d73b3839 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java @@ -0,0 +1,29 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.wallet.AbstractWallet; + +/** + * @author Kim + * + */ +public class ActionAggregationReply extends ActionAggregation { + + private static final long serialVersionUID = 2903632395631186161L; + + public ActionAggregationReply(AggregationContext context) { + super(context); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { + wallet.setAggregationContext(getContext()); + wallet.setPendingAggregationRequest(false); + continueAggregation(wallet); + } + +} diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..a0f10190ed19ddeb7c744a946ced11ab82c81e7f --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java @@ -0,0 +1,54 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.wallet.AbstractWallet; + +/** + * @author Kim + * + */ +public class ActionAggregationRequest extends ActionAggregation { + + private static final long serialVersionUID = -533086993646203160L; + + public ActionAggregationRequest(AggregationContext context) { + super(context); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { + if (wallet.hasPendingAggregationRequest()) { + sendCancel(wallet); + } else { + replyAggregatedContext(sender, wallet); + continueAggregation(wallet); + } + } + + private void sendCancel(AbstractWallet wallet) { + ActionAggregationCancel cancel = new ActionAggregationCancel(); + wallet.getRemoteSuperVisorActor() + .tell(cancel, wallet.getSelf()); + } + + private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) { + if (!wallet.hasAggregationContext()) { + initContext(wallet); + } + AggregationContext aggregatedContext = wallet.getAggregationContext() + .aggregate(getContext()); + wallet.setAggregationContext(aggregatedContext); + ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext); + sender.tell(reply, wallet.getSelf()); + } + + private void initContext(AbstractWallet wallet) { + AggregationContext aggregationContext = getContext().initContext(wallet); + wallet.setAggregationContext(aggregationContext); + } + +} diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java new file mode 100644 index 0000000000000000000000000000000000000000..cd389e0a66fd596a381f65cca2ea64cb94eccfe9 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationResult.java @@ -0,0 +1,28 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.wallet.AbstractWallet; + +/** + * @author Kim + * + */ +public class ActionAggregationResult extends ActionAggregation { + + private static final long serialVersionUID = 2937612747233988421L; + + public ActionAggregationResult(AggregationContext context) { + super(context); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { + // TODO Kim update GUI + System.out.println(context); + } + +} diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java new file mode 100644 index 0000000000000000000000000000000000000000..3b02259b39b255a50f99165398151f3dd61de73d --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -0,0 +1,74 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import java.io.Serializable; +import java.util.function.Function; + +import fucoin.wallet.AbstractWallet; + +/** + * @author Kim + * + */ +public class AggregationContext implements Serializable { + private static final long serialVersionUID = -8314269748209085088L; + private final AggregationFunction function; + private final double value; + private final Function<AbstractWallet, Double> valueExtractor; + private final int currentExchanges; + private final int maxExchanges; + + public AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor, + double value, int currentExchanges, int maxExchanges) { + this.function = function; + this.valueExtractor = valueExtractor; + this.value = value; + this.currentExchanges = currentExchanges; + this.maxExchanges = maxExchanges; + } + + public AggregationContext aggregate(AggregationContext context) { + double aggregatedValue = getFunction().apply(context.getValue(), getValue()); + return new AggregationContext(getFunction(), getValueExtractor(), aggregatedValue, getCurrentExchanges() + 1, + getMaxExchanges()); + } + + public AggregationContext initContext(AbstractWallet wallet) { + double initValue = getValueExtractor().apply(wallet); + return new AggregationContext(getFunction(), getValueExtractor(), initValue, 0, getMaxExchanges()); + } + + public boolean isDone() { + return getCurrentExchanges() >= getMaxExchanges(); + } + + public double getValue() { + return value; + } + + @Override + public String toString() { + return "The aggregated value is " + getValue() + "after " + getMaxExchanges() + "aggregations."; + + } + + private AggregationFunction getFunction() { + return function; + } + + private Function<AbstractWallet, Double> getValueExtractor() { + return valueExtractor; + } + + private int getCurrentExchanges() { + return currentExchanges; + } + + private int getMaxExchanges() { + return maxExchanges; + } + + +} diff --git a/src/main/java/fucoin/actions/aggregation/AggregationFunction.java b/src/main/java/fucoin/actions/aggregation/AggregationFunction.java new file mode 100644 index 0000000000000000000000000000000000000000..2fda288cdb42e24b05236c5230ff454892911a13 --- /dev/null +++ b/src/main/java/fucoin/actions/aggregation/AggregationFunction.java @@ -0,0 +1,19 @@ +/** + * + */ +package fucoin.actions.aggregation; + +import java.util.function.BiFunction; + +/** + * This is a functional interface, that aggregates two numeric values. + * + * @author Kim + * + */ +public interface AggregationFunction extends BiFunction<Double, Double, Double> { + + @Override + public Double apply(Double t, Double u); + +} diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index bc44d051ec250a68db11f9e677b683f985a4c5b8..e6b41cfbe8636a07ca562c4d2018ece1a1d926ce 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -1,9 +1,10 @@ package fucoin.wallet; +import java.io.Serializable; + import akka.actor.ActorRef; import fucoin.AbstractNode; - -import java.io.Serializable; +import fucoin.actions.aggregation.AggregationContext; /** * @@ -109,10 +110,49 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl public abstract void logTransactionFail(String msg); /** - * Sends amount FUCs to the wallet with the address adress - * - * @param address Recipients address - * @param amount Amount to send - */ + * Sends amount FUCs to the wallet with the address address + * + * @param address + * Recipients address + * @param amount + * Amount to send + */ public abstract void send(String address, int amount); + + /** + * True, if wallet already has an AggregationContext. + * + * @return True, if context exists + */ + public abstract boolean hasAggregationContext(); + + /** + * Returns the currently stored AggregationContext. + * + * @return AggregationContext + */ + public abstract AggregationContext getAggregationContext(); + + /** + * Sets the currently stored AggregationContext + * + * @param context + */ + public abstract void setAggregationContext(AggregationContext context); + + /** + * True, if wallet has sent a AggregationRequest and hence is waiting for an + * AggregationReply. + * + * @return True, if AggregationRequest is pending. + */ + public abstract boolean hasPendingAggregationRequest(); + + /** + * Sets the wallet's status of pending AggregationRequests. + * + * @param hasPendingRequest + */ + public abstract void setPendingAggregationRequest(boolean hasPendingRequest); + } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index f35d620fb87a8629bb4238682c119e208b9158b2..fa39c805a1af17ec3dde81e9226a3f787362d3d5 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -3,6 +3,7 @@ package fucoin.wallet; import akka.actor.ActorRef; import akka.actor.Props; import fucoin.actions.ClientAction; +import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ServerActionJoin; @@ -19,6 +20,8 @@ public class WalletImpl extends AbstractWallet { private transient WalletGuiControl gui; private String preKnownNeighbourName; private boolean isActive; + private AggregationContext aggregationContext; + private boolean hasPendingAggregationRequest = false; public WalletImpl(String name) { super(name); @@ -225,4 +228,29 @@ public class WalletImpl extends AbstractWallet { getSelf().tell(new ActionInvokeSentMoney(address, amount), getSelf()); } + @Override + public boolean hasAggregationContext() { + return getAggregationContext() != null; + } + + @Override + public AggregationContext getAggregationContext() { + return aggregationContext; + } + + @Override + public void setAggregationContext(AggregationContext context) { + this.aggregationContext = context; + } + + @Override + public boolean hasPendingAggregationRequest() { + return this.hasPendingAggregationRequest; + } + + @Override + public void setPendingAggregationRequest(boolean hasPendingRequest) { + this.hasPendingAggregationRequest = hasPendingRequest; + } + }