From 1b8e451bf4eddeca63322806a8691a414df21e1c Mon Sep 17 00:00:00 2001 From: Kim Kern <kim.kern@fu-berlin.de> Date: Tue, 5 Jul 2016 14:43:13 +0200 Subject: [PATCH] added gui aggregation trigger --- .../aggregation/ActionAggregation.java | 5 +++- .../aggregation/ActionAggregationReply.java | 1 - .../aggregation/ActionAggregationRequest.java | 13 +++++---- .../aggregation/AggregationContext.java | 5 ++-- .../MassWalletConfiguration.java | 7 +++-- .../fucoin/gui/SuperVisorGuiControlImpl.java | 12 +++----- .../java/fucoin/gui/SuperVisorThreadGUI.java | 21 ++++++++++++-- .../fucoin/supervisor/SuperVisorImpl.java | 28 +++++++++++++------ .../java/fucoin/wallet/AbstractWallet.java | 5 ++++ src/main/java/fucoin/wallet/WalletImpl.java | 5 ++++ 10 files changed, 72 insertions(+), 30 deletions(-) diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index ac08f20..90ec575 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -31,7 +31,9 @@ public abstract class ActionAggregation extends ClientAction { if (!aggregationContext.isDone()) { sendSleepingRequest(wallet, aggregationContext); } else { + System.out.println(aggregationContext); sendAggregatedResult(wallet, aggregationContext); + } } @@ -39,6 +41,7 @@ public abstract class ActionAggregation extends ClientAction { ActionAggregationResult result = new ActionAggregationResult(aggregationContext); wallet.getRemoteSuperVisorActor() .tell(result, wallet.getSelf()); + wallet.removeAggregationContext(); } private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { @@ -46,7 +49,7 @@ public abstract class ActionAggregation extends ClientAction { Thread sleepingRequest = new Thread() { @Override public void run() { - int wait = new Random().nextInt(500); + int wait = new Random().nextInt(500) + 10; try { Thread.sleep(wait); ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java index 8f24c63..30bbebc 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java @@ -23,7 +23,6 @@ public class ActionAggregationReply extends ActionAggregation { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { wallet.setAggregationContext(getContext()); wallet.setPendingAggregationRequest(false); - continueAggregation(wallet); } } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java index a0f1019..75f4d16 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java @@ -22,21 +22,21 @@ public class ActionAggregationRequest extends ActionAggregation { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { if (wallet.hasPendingAggregationRequest()) { - sendCancel(wallet); + sendCancel(sender, self); } else { replyAggregatedContext(sender, wallet); continueAggregation(wallet); } } - private void sendCancel(AbstractWallet wallet) { + private void sendCancel(ActorRef wallet, ActorRef self) { ActionAggregationCancel cancel = new ActionAggregationCancel(); - wallet.getRemoteSuperVisorActor() - .tell(cancel, wallet.getSelf()); + wallet.tell(cancel, self); } private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) { - if (!wallet.hasAggregationContext()) { + boolean isFirstRequest = !wallet.hasAggregationContext(); + if (isFirstRequest) { initContext(wallet); } AggregationContext aggregatedContext = wallet.getAggregationContext() @@ -44,6 +44,9 @@ public class ActionAggregationRequest extends ActionAggregation { wallet.setAggregationContext(aggregatedContext); ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext); sender.tell(reply, wallet.getSelf()); + if (isFirstRequest) { + continueAggregation(wallet); + } } private void initContext(AbstractWallet wallet) { diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index 3b02259..86bde3d 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -41,7 +41,8 @@ public class AggregationContext implements Serializable { } public boolean isDone() { - return getCurrentExchanges() >= getMaxExchanges(); + boolean isDone = getCurrentExchanges() >= getMaxExchanges(); + return isDone; } public double getValue() { @@ -50,7 +51,7 @@ public class AggregationContext implements Serializable { @Override public String toString() { - return "The aggregated value is " + getValue() + "after " + getMaxExchanges() + "aggregations."; + return "The aggregated value is " + getValue() + "after " + getCurrentExchanges() + "aggregations."; } diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java index fcaaadf..aed13f0 100644 --- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -1,5 +1,6 @@ package fucoin.configurations; +import akka.actor.ActorRef; import fucoin.configurations.internal.ConfigurationName; /** @@ -9,15 +10,15 @@ import fucoin.configurations.internal.ConfigurationName; public class MassWalletConfiguration extends AbstractConfiguration { @Override public void run() { - initSupervisor(); + ActorRef supervisor = initSupervisor(); try { - spawnWallets(200, false); + spawnWallets(10, false); System.out.println("Wallet spawning done!"); } catch (Exception e) { System.out.println("Wallet spawning timed out!"); } - randomTransactions(100, 10); + randomTransactions(10, 4); } @Override diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index b0c9a12..453b6c6 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -3,14 +3,6 @@ package fucoin.gui; import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.SuperVisorImpl; -import javax.swing.*; -import javax.swing.event.TableModelEvent; -import javax.swing.event.TableModelListener; -import java.awt.*; -import java.awt.event.ItemEvent; -import java.awt.event.WindowAdapter; -import java.awt.event.WindowEvent; - public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private SuperVisorImpl superVisor; @@ -88,4 +80,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public boolean isLogActive() { return logActive; } + + public void startAggregation() { + superVisor.startAggregation(); + } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index d5d7a1b..63b8f8d 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -1,11 +1,20 @@ package fucoin.gui; -import javax.swing.*; -import java.awt.*; +import java.awt.BorderLayout; +import java.awt.GridLayout; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import javax.swing.JButton; +import javax.swing.JCheckBox; +import javax.swing.JFrame; +import javax.swing.JList; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JTable; +import javax.swing.SwingUtilities; + /** * */ @@ -19,6 +28,8 @@ public class SuperVisorThreadGUI { private JCheckBox activateLogging; private SuperVisorGuiControlImpl superVisorGuiControl; + private JButton startAggregationButton; + public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { this.superVisorGuiControl = superVisorGuiControl; @@ -38,6 +49,11 @@ public class SuperVisorThreadGUI { JPanel logPanel = new JPanel(new BorderLayout()); txtLog.setCellRenderer(new LogCellRenderer()); + + startAggregationButton = new JButton("Start Aggregation"); + startAggregationButton.addActionListener(e -> { + superVisorGuiControl.startAggregation(); + }); showDebug = new JCheckBox("Show debug messages in transaction log"); showDebug.setSelected(true); @@ -64,6 +80,7 @@ public class SuperVisorThreadGUI { configPanel.add(activateLogging); configPanel.add(showDebug); + configPanel.add(startAggregationButton); //logPanel.add(activateLogging, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH); diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 8b4ec17..5f64d11 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -1,24 +1,27 @@ package fucoin.supervisor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.swing.SwingUtilities; + import akka.actor.ActorRef; import akka.actor.Props; +import fucoin.AbstractNode; import fucoin.actions.Action; +import fucoin.actions.aggregation.ActionAggregationRequest; +import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; -import fucoin.AbstractNode; import fucoin.gui.TransactionLogger; -import javax.swing.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - public class SuperVisorImpl extends AbstractNode implements TransactionLogger { //private AmountTableModel amountTableModel; @@ -167,4 +170,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { public void setBankCommitObserver(ActorRef bankCommitObserver) { this.bankCommitObserver = bankCommitObserver; } + + public void startAggregation() { + AggregationContext context = new AggregationContext((x, y) -> Math.max(x, y), x -> new Double(x.getAmount()), 0, + 0, 20); + ActorRef randomNode = getRandomNeighbor(); + ActionAggregationRequest request = new ActionAggregationRequest(context); + randomNode.tell(request, getSelf()); + System.out.println("init"); + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index 4c366c5..d82b8a8 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -152,4 +152,9 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl * @param observer */ public abstract void send(String address, int amount, ActorRef observer); + + /** + * TODO Kommentar Kim + */ + public abstract void removeAggregationContext(); } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 2bf93d4..42900cf 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -274,4 +274,9 @@ public class WalletImpl extends AbstractWallet { this.hasPendingAggregationRequest = hasPendingRequest; } + @Override + public void removeAggregationContext() { + this.aggregationContext = null; + } + } -- GitLab