Skip to content
Snippets Groups Projects
Commit a817e090 authored by Kim Kern's avatar Kim Kern
Browse files

add AggregationActions

parent f8c61f42
No related branches found
No related tags found
No related merge requests found
Showing
with 388 additions and 10 deletions
package fucoin;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.UntypedActor;
import fucoin.actions.transaction.ActionGetAmount;
import fucoin.wallet.AbstractWallet;
import java.io.Serializable;
import java.util.HashMap;
public abstract class AbstractNode extends UntypedActor implements Serializable {
/**
......@@ -69,6 +72,18 @@ public abstract class AbstractNode extends UntypedActor implements Serializable
return knownNeighbors;
}
/**
* Returns a randomly chosen neighbor from the known neighbors.
*
* @return random neighbor
*/
public ActorRef getRandomNeighbor() {
List<ActorRef> neighbors = new ArrayList<>(knownNeighbors.values());
Random rand = new Random();
int index = rand.nextInt(neighbors.size());
return neighbors.get(index);
}
public void log(String string) {
System.out.println(getSelf().path().name() + ": " + string);
}
......
package fucoin.actions.aggregation;
import java.util.Random;
import akka.actor.ActorRef;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
public abstract class ActionAggregation extends ClientAction {
private static final long serialVersionUID = 4291894423240991230L;
private final AggregationContext context;
public ActionAggregation(AggregationContext context) {
this.context = context;
}
public AggregationContext getContext() {
return context;
}
/**
* Sends an AggregationRequest after a randomly chosen time to a random
* neighbor if the aggregation has finished or else sends the result to the
* supervisor.
*
* @param wallet
*/
public void continueAggregation(AbstractWallet wallet) {
AggregationContext aggregationContext = wallet.getAggregationContext();
if (!aggregationContext.isDone()) {
sendSleepingRequest(wallet, aggregationContext);
} else {
sendAggregatedResult(wallet, aggregationContext);
}
}
private void sendAggregatedResult(AbstractWallet wallet, AggregationContext aggregationContext) {
ActionAggregationResult result = new ActionAggregationResult(aggregationContext);
wallet.getRemoteSuperVisorActor()
.tell(result, wallet.getSelf());
}
private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) {
ActorRef randomNeighbor = wallet.getRandomNeighbor();
Thread sleepingRequest = new Thread() {
@Override
public void run() {
int wait = new Random().nextInt(500);
try {
Thread.sleep(wait);
ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext);
randomNeighbor.tell(request, wallet.getSelf());
wallet.setPendingAggregationRequest(true);
} catch (InterruptedException ex) {
Thread.currentThread()
.interrupt();
}
}
};
sleepingRequest.start();
}
}
/**
*
*/
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
/**
* @author Kim
*
*/
public class ActionAggregationCancel extends ActionAggregation {
private static final long serialVersionUID = -7416863664917157007L;
public ActionAggregationCancel() {
super(null);
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
wallet.setPendingAggregationRequest(false);
continueAggregation(wallet);
}
}
/**
*
*/
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
/**
* @author Kim
*
*/
public class ActionAggregationReply extends ActionAggregation {
private static final long serialVersionUID = 2903632395631186161L;
public ActionAggregationReply(AggregationContext context) {
super(context);
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
wallet.setAggregationContext(getContext());
wallet.setPendingAggregationRequest(false);
continueAggregation(wallet);
}
}
/**
*
*/
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
/**
* @author Kim
*
*/
public class ActionAggregationRequest extends ActionAggregation {
private static final long serialVersionUID = -533086993646203160L;
public ActionAggregationRequest(AggregationContext context) {
super(context);
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
if (wallet.hasPendingAggregationRequest()) {
sendCancel(wallet);
} else {
replyAggregatedContext(sender, wallet);
continueAggregation(wallet);
}
}
private void sendCancel(AbstractWallet wallet) {
ActionAggregationCancel cancel = new ActionAggregationCancel();
wallet.getRemoteSuperVisorActor()
.tell(cancel, wallet.getSelf());
}
private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) {
if (!wallet.hasAggregationContext()) {
initContext(wallet);
}
AggregationContext aggregatedContext = wallet.getAggregationContext()
.aggregate(getContext());
wallet.setAggregationContext(aggregatedContext);
ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext);
sender.tell(reply, wallet.getSelf());
}
private void initContext(AbstractWallet wallet) {
AggregationContext aggregationContext = getContext().initContext(wallet);
wallet.setAggregationContext(aggregationContext);
}
}
/**
*
*/
package fucoin.actions.aggregation;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.wallet.AbstractWallet;
/**
* @author Kim
*
*/
public class ActionAggregationResult extends ActionAggregation {
private static final long serialVersionUID = 2937612747233988421L;
public ActionAggregationResult(AggregationContext context) {
super(context);
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
// TODO Kim update GUI
System.out.println(context);
}
}
/**
*
*/
package fucoin.actions.aggregation;
import java.io.Serializable;
import java.util.function.Function;
import fucoin.wallet.AbstractWallet;
/**
* @author Kim
*
*/
public class AggregationContext implements Serializable {
private static final long serialVersionUID = -8314269748209085088L;
private final AggregationFunction function;
private final double value;
private final Function<AbstractWallet, Double> valueExtractor;
private final int currentExchanges;
private final int maxExchanges;
public AggregationContext(AggregationFunction function, Function<AbstractWallet, Double> valueExtractor,
double value, int currentExchanges, int maxExchanges) {
this.function = function;
this.valueExtractor = valueExtractor;
this.value = value;
this.currentExchanges = currentExchanges;
this.maxExchanges = maxExchanges;
}
public AggregationContext aggregate(AggregationContext context) {
double aggregatedValue = getFunction().apply(context.getValue(), getValue());
return new AggregationContext(getFunction(), getValueExtractor(), aggregatedValue, getCurrentExchanges() + 1,
getMaxExchanges());
}
public AggregationContext initContext(AbstractWallet wallet) {
double initValue = getValueExtractor().apply(wallet);
return new AggregationContext(getFunction(), getValueExtractor(), initValue, 0, getMaxExchanges());
}
public boolean isDone() {
return getCurrentExchanges() >= getMaxExchanges();
}
public double getValue() {
return value;
}
@Override
public String toString() {
return "The aggregated value is " + getValue() + "after " + getMaxExchanges() + "aggregations.";
}
private AggregationFunction getFunction() {
return function;
}
private Function<AbstractWallet, Double> getValueExtractor() {
return valueExtractor;
}
private int getCurrentExchanges() {
return currentExchanges;
}
private int getMaxExchanges() {
return maxExchanges;
}
}
/**
*
*/
package fucoin.actions.aggregation;
import java.util.function.BiFunction;
/**
* This is a functional interface, that aggregates two numeric values.
*
* @author Kim
*
*/
public interface AggregationFunction extends BiFunction<Double, Double, Double> {
@Override
public Double apply(Double t, Double u);
}
package fucoin.wallet;
import java.io.Serializable;
import akka.actor.ActorRef;
import fucoin.AbstractNode;
import java.io.Serializable;
import fucoin.actions.aggregation.AggregationContext;
/**
*
......@@ -109,10 +110,49 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
public abstract void logTransactionFail(String msg);
/**
* Sends amount FUCs to the wallet with the address adress
* Sends amount FUCs to the wallet with the address address
*
* @param address Recipients address
* @param amount Amount to send
* @param address
* Recipients address
* @param amount
* Amount to send
*/
public abstract void send(String address, int amount);
/**
* True, if wallet already has an AggregationContext.
*
* @return True, if context exists
*/
public abstract boolean hasAggregationContext();
/**
* Returns the currently stored AggregationContext.
*
* @return AggregationContext
*/
public abstract AggregationContext getAggregationContext();
/**
* Sets the currently stored AggregationContext
*
* @param context
*/
public abstract void setAggregationContext(AggregationContext context);
/**
* True, if wallet has sent a AggregationRequest and hence is waiting for an
* AggregationReply.
*
* @return True, if AggregationRequest is pending.
*/
public abstract boolean hasPendingAggregationRequest();
/**
* Sets the wallet's status of pending AggregationRequests.
*
* @param hasPendingRequest
*/
public abstract void setPendingAggregationRequest(boolean hasPendingRequest);
}
......@@ -3,6 +3,7 @@ package fucoin.wallet;
import akka.actor.ActorRef;
import akka.actor.Props;
import fucoin.actions.ClientAction;
import fucoin.actions.aggregation.AggregationContext;
import fucoin.actions.join.ActionJoin;
import fucoin.actions.join.ActionJoinAnswer;
import fucoin.actions.join.ServerActionJoin;
......@@ -19,6 +20,8 @@ public class WalletImpl extends AbstractWallet {
private transient WalletGuiControl gui;
private String preKnownNeighbourName;
private boolean isActive;
private AggregationContext aggregationContext;
private boolean hasPendingAggregationRequest = false;
public WalletImpl(String name) {
super(name);
......@@ -225,4 +228,29 @@ public class WalletImpl extends AbstractWallet {
getSelf().tell(new ActionInvokeSentMoney(address, amount), getSelf());
}
@Override
public boolean hasAggregationContext() {
return getAggregationContext() != null;
}
@Override
public AggregationContext getAggregationContext() {
return aggregationContext;
}
@Override
public void setAggregationContext(AggregationContext context) {
this.aggregationContext = context;
}
@Override
public boolean hasPendingAggregationRequest() {
return this.hasPendingAggregationRequest;
}
@Override
public void setPendingAggregationRequest(boolean hasPendingRequest) {
this.hasPendingAggregationRequest = hasPendingRequest;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment