Skip to content
Snippets Groups Projects
Commit 1b8e451b authored by Kim Kern's avatar Kim Kern
Browse files

added gui aggregation trigger

parent c9870a61
Branches
No related tags found
No related merge requests found
Showing with 72 additions and 30 deletions
...@@ -31,7 +31,9 @@ public abstract class ActionAggregation extends ClientAction { ...@@ -31,7 +31,9 @@ public abstract class ActionAggregation extends ClientAction {
if (!aggregationContext.isDone()) { if (!aggregationContext.isDone()) {
sendSleepingRequest(wallet, aggregationContext); sendSleepingRequest(wallet, aggregationContext);
} else { } else {
System.out.println(aggregationContext);
sendAggregatedResult(wallet, aggregationContext); sendAggregatedResult(wallet, aggregationContext);
} }
} }
...@@ -39,6 +41,7 @@ public abstract class ActionAggregation extends ClientAction { ...@@ -39,6 +41,7 @@ public abstract class ActionAggregation extends ClientAction {
ActionAggregationResult result = new ActionAggregationResult(aggregationContext); ActionAggregationResult result = new ActionAggregationResult(aggregationContext);
wallet.getRemoteSuperVisorActor() wallet.getRemoteSuperVisorActor()
.tell(result, wallet.getSelf()); .tell(result, wallet.getSelf());
wallet.removeAggregationContext();
} }
private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) {
...@@ -46,7 +49,7 @@ public abstract class ActionAggregation extends ClientAction { ...@@ -46,7 +49,7 @@ public abstract class ActionAggregation extends ClientAction {
Thread sleepingRequest = new Thread() { Thread sleepingRequest = new Thread() {
@Override @Override
public void run() { public void run() {
int wait = new Random().nextInt(500); int wait = new Random().nextInt(500) + 10;
try { try {
Thread.sleep(wait); Thread.sleep(wait);
ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext);
......
...@@ -23,7 +23,6 @@ public class ActionAggregationReply extends ActionAggregation { ...@@ -23,7 +23,6 @@ public class ActionAggregationReply extends ActionAggregation {
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
wallet.setAggregationContext(getContext()); wallet.setAggregationContext(getContext());
wallet.setPendingAggregationRequest(false); wallet.setPendingAggregationRequest(false);
continueAggregation(wallet);
} }
} }
...@@ -22,21 +22,21 @@ public class ActionAggregationRequest extends ActionAggregation { ...@@ -22,21 +22,21 @@ public class ActionAggregationRequest extends ActionAggregation {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) {
if (wallet.hasPendingAggregationRequest()) { if (wallet.hasPendingAggregationRequest()) {
sendCancel(wallet); sendCancel(sender, self);
} else { } else {
replyAggregatedContext(sender, wallet); replyAggregatedContext(sender, wallet);
continueAggregation(wallet); continueAggregation(wallet);
} }
} }
private void sendCancel(AbstractWallet wallet) { private void sendCancel(ActorRef wallet, ActorRef self) {
ActionAggregationCancel cancel = new ActionAggregationCancel(); ActionAggregationCancel cancel = new ActionAggregationCancel();
wallet.getRemoteSuperVisorActor() wallet.tell(cancel, self);
.tell(cancel, wallet.getSelf());
} }
private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) { private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) {
if (!wallet.hasAggregationContext()) { boolean isFirstRequest = !wallet.hasAggregationContext();
if (isFirstRequest) {
initContext(wallet); initContext(wallet);
} }
AggregationContext aggregatedContext = wallet.getAggregationContext() AggregationContext aggregatedContext = wallet.getAggregationContext()
...@@ -44,6 +44,9 @@ public class ActionAggregationRequest extends ActionAggregation { ...@@ -44,6 +44,9 @@ public class ActionAggregationRequest extends ActionAggregation {
wallet.setAggregationContext(aggregatedContext); wallet.setAggregationContext(aggregatedContext);
ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext); ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext);
sender.tell(reply, wallet.getSelf()); sender.tell(reply, wallet.getSelf());
if (isFirstRequest) {
continueAggregation(wallet);
}
} }
private void initContext(AbstractWallet wallet) { private void initContext(AbstractWallet wallet) {
......
...@@ -41,7 +41,8 @@ public class AggregationContext implements Serializable { ...@@ -41,7 +41,8 @@ public class AggregationContext implements Serializable {
} }
public boolean isDone() { public boolean isDone() {
return getCurrentExchanges() >= getMaxExchanges(); boolean isDone = getCurrentExchanges() >= getMaxExchanges();
return isDone;
} }
public double getValue() { public double getValue() {
...@@ -50,7 +51,7 @@ public class AggregationContext implements Serializable { ...@@ -50,7 +51,7 @@ public class AggregationContext implements Serializable {
@Override @Override
public String toString() { public String toString() {
return "The aggregated value is " + getValue() + "after " + getMaxExchanges() + "aggregations."; return "The aggregated value is " + getValue() + "after " + getCurrentExchanges() + "aggregations.";
} }
......
package fucoin.configurations; package fucoin.configurations;
import akka.actor.ActorRef;
import fucoin.configurations.internal.ConfigurationName; import fucoin.configurations.internal.ConfigurationName;
/** /**
...@@ -9,15 +10,15 @@ import fucoin.configurations.internal.ConfigurationName; ...@@ -9,15 +10,15 @@ import fucoin.configurations.internal.ConfigurationName;
public class MassWalletConfiguration extends AbstractConfiguration { public class MassWalletConfiguration extends AbstractConfiguration {
@Override @Override
public void run() { public void run() {
initSupervisor(); ActorRef supervisor = initSupervisor();
try { try {
spawnWallets(200, false); spawnWallets(10, false);
System.out.println("Wallet spawning done!"); System.out.println("Wallet spawning done!");
} catch (Exception e) { } catch (Exception e) {
System.out.println("Wallet spawning timed out!"); System.out.println("Wallet spawning timed out!");
} }
randomTransactions(100, 10); randomTransactions(10, 4);
} }
@Override @Override
......
...@@ -3,14 +3,6 @@ package fucoin.gui; ...@@ -3,14 +3,6 @@ package fucoin.gui;
import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.AmountTableModel;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
import javax.swing.*;
import javax.swing.event.TableModelEvent;
import javax.swing.event.TableModelListener;
import java.awt.*;
import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent;
public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
private SuperVisorImpl superVisor; private SuperVisorImpl superVisor;
...@@ -88,4 +80,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -88,4 +80,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
public boolean isLogActive() { public boolean isLogActive() {
return logActive; return logActive;
} }
public void startAggregation() {
superVisor.startAggregation();
}
} }
package fucoin.gui; package fucoin.gui;
import javax.swing.*; import java.awt.BorderLayout;
import java.awt.*; import java.awt.GridLayout;
import java.awt.event.ItemEvent; import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter; import java.awt.event.WindowAdapter;
import java.awt.event.WindowEvent; import java.awt.event.WindowEvent;
import javax.swing.JButton;
import javax.swing.JCheckBox;
import javax.swing.JFrame;
import javax.swing.JList;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTable;
import javax.swing.SwingUtilities;
/** /**
* *
*/ */
...@@ -19,6 +28,8 @@ public class SuperVisorThreadGUI { ...@@ -19,6 +28,8 @@ public class SuperVisorThreadGUI {
private JCheckBox activateLogging; private JCheckBox activateLogging;
private SuperVisorGuiControlImpl superVisorGuiControl; private SuperVisorGuiControlImpl superVisorGuiControl;
private JButton startAggregationButton;
public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) {
this.superVisorGuiControl = superVisorGuiControl; this.superVisorGuiControl = superVisorGuiControl;
...@@ -38,6 +49,11 @@ public class SuperVisorThreadGUI { ...@@ -38,6 +49,11 @@ public class SuperVisorThreadGUI {
JPanel logPanel = new JPanel(new BorderLayout()); JPanel logPanel = new JPanel(new BorderLayout());
txtLog.setCellRenderer(new LogCellRenderer()); txtLog.setCellRenderer(new LogCellRenderer());
startAggregationButton = new JButton("Start Aggregation");
startAggregationButton.addActionListener(e -> {
superVisorGuiControl.startAggregation();
});
showDebug = new JCheckBox("Show debug messages in transaction log"); showDebug = new JCheckBox("Show debug messages in transaction log");
showDebug.setSelected(true); showDebug.setSelected(true);
...@@ -64,6 +80,7 @@ public class SuperVisorThreadGUI { ...@@ -64,6 +80,7 @@ public class SuperVisorThreadGUI {
configPanel.add(activateLogging); configPanel.add(activateLogging);
configPanel.add(showDebug); configPanel.add(showDebug);
configPanel.add(startAggregationButton);
//logPanel.add(activateLogging, BorderLayout.NORTH); //logPanel.add(activateLogging, BorderLayout.NORTH);
logPanel.add(configPanel, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH);
......
package fucoin.supervisor; package fucoin.supervisor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.swing.SwingUtilities;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import fucoin.AbstractNode;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.aggregation.ActionAggregationRequest;
import fucoin.actions.aggregation.AggregationContext;
import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.control.ActionWalletCreationDone;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.actions.transaction.SuperVisorAction; import fucoin.actions.transaction.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.SuperVisorGuiControl;
import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import javax.swing.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
public class SuperVisorImpl extends AbstractNode implements TransactionLogger { public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
//private AmountTableModel amountTableModel; //private AmountTableModel amountTableModel;
...@@ -167,4 +170,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { ...@@ -167,4 +170,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger {
public void setBankCommitObserver(ActorRef bankCommitObserver) { public void setBankCommitObserver(ActorRef bankCommitObserver) {
this.bankCommitObserver = bankCommitObserver; this.bankCommitObserver = bankCommitObserver;
} }
public void startAggregation() {
AggregationContext context = new AggregationContext((x, y) -> Math.max(x, y), x -> new Double(x.getAmount()), 0,
0, 20);
ActorRef randomNode = getRandomNeighbor();
ActionAggregationRequest request = new ActionAggregationRequest(context);
randomNode.tell(request, getSelf());
System.out.println("init");
}
} }
...@@ -152,4 +152,9 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -152,4 +152,9 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
* @param observer * @param observer
*/ */
public abstract void send(String address, int amount, ActorRef observer); public abstract void send(String address, int amount, ActorRef observer);
/**
* TODO Kommentar Kim
*/
public abstract void removeAggregationContext();
} }
...@@ -274,4 +274,9 @@ public class WalletImpl extends AbstractWallet { ...@@ -274,4 +274,9 @@ public class WalletImpl extends AbstractWallet {
this.hasPendingAggregationRequest = hasPendingRequest; this.hasPendingAggregationRequest = hasPendingRequest;
} }
@Override
public void removeAggregationContext() {
this.aggregationContext = null;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment