diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java index ac08f2029b54a47148e1a3901aa10952855e67e7..90ec575280956be10c84f8c19082e6dfbf8c5a5b 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregation.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregation.java @@ -31,7 +31,9 @@ public abstract class ActionAggregation extends ClientAction { if (!aggregationContext.isDone()) { sendSleepingRequest(wallet, aggregationContext); } else { + System.out.println(aggregationContext); sendAggregatedResult(wallet, aggregationContext); + } } @@ -39,6 +41,7 @@ public abstract class ActionAggregation extends ClientAction { ActionAggregationResult result = new ActionAggregationResult(aggregationContext); wallet.getRemoteSuperVisorActor() .tell(result, wallet.getSelf()); + wallet.removeAggregationContext(); } private void sendSleepingRequest(AbstractWallet wallet, AggregationContext aggregationContext) { @@ -46,7 +49,7 @@ public abstract class ActionAggregation extends ClientAction { Thread sleepingRequest = new Thread() { @Override public void run() { - int wait = new Random().nextInt(500); + int wait = new Random().nextInt(500) + 10; try { Thread.sleep(wait); ActionAggregationRequest request = new ActionAggregationRequest(aggregationContext); diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java index 8f24c63222f6d8217894023a50eb6616d73b3839..30bbebca38207802e0c228a3195475674ddbbd3a 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationReply.java @@ -23,7 +23,6 @@ public class ActionAggregationReply extends ActionAggregation { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { wallet.setAggregationContext(getContext()); wallet.setPendingAggregationRequest(false); - continueAggregation(wallet); } } diff --git a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java index a0f10190ed19ddeb7c744a946ced11ab82c81e7f..75f4d1626f733a6f6bb13ee4b56e8d2c7a6cc43d 100644 --- a/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java +++ b/src/main/java/fucoin/actions/aggregation/ActionAggregationRequest.java @@ -22,21 +22,21 @@ public class ActionAggregationRequest extends ActionAggregation { @Override protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet wallet) { if (wallet.hasPendingAggregationRequest()) { - sendCancel(wallet); + sendCancel(sender, self); } else { replyAggregatedContext(sender, wallet); continueAggregation(wallet); } } - private void sendCancel(AbstractWallet wallet) { + private void sendCancel(ActorRef wallet, ActorRef self) { ActionAggregationCancel cancel = new ActionAggregationCancel(); - wallet.getRemoteSuperVisorActor() - .tell(cancel, wallet.getSelf()); + wallet.tell(cancel, self); } private void replyAggregatedContext(ActorRef sender, AbstractWallet wallet) { - if (!wallet.hasAggregationContext()) { + boolean isFirstRequest = !wallet.hasAggregationContext(); + if (isFirstRequest) { initContext(wallet); } AggregationContext aggregatedContext = wallet.getAggregationContext() @@ -44,6 +44,9 @@ public class ActionAggregationRequest extends ActionAggregation { wallet.setAggregationContext(aggregatedContext); ActionAggregationReply reply = new ActionAggregationReply(aggregatedContext); sender.tell(reply, wallet.getSelf()); + if (isFirstRequest) { + continueAggregation(wallet); + } } private void initContext(AbstractWallet wallet) { diff --git a/src/main/java/fucoin/actions/aggregation/AggregationContext.java b/src/main/java/fucoin/actions/aggregation/AggregationContext.java index 3b02259b39b255a50f99165398151f3dd61de73d..86bde3ddddb933b3e42aee84cadd37b84b7b0743 100644 --- a/src/main/java/fucoin/actions/aggregation/AggregationContext.java +++ b/src/main/java/fucoin/actions/aggregation/AggregationContext.java @@ -41,7 +41,8 @@ public class AggregationContext implements Serializable { } public boolean isDone() { - return getCurrentExchanges() >= getMaxExchanges(); + boolean isDone = getCurrentExchanges() >= getMaxExchanges(); + return isDone; } public double getValue() { @@ -50,7 +51,7 @@ public class AggregationContext implements Serializable { @Override public String toString() { - return "The aggregated value is " + getValue() + "after " + getMaxExchanges() + "aggregations."; + return "The aggregated value is " + getValue() + "after " + getCurrentExchanges() + "aggregations."; } diff --git a/src/main/java/fucoin/configurations/MassWalletConfiguration.java b/src/main/java/fucoin/configurations/MassWalletConfiguration.java index fcaaadf3cea173e7b0cf3fa6fc038fbee5dcecae..aed13f029fda536d0255385a4b3c9660b564056b 100644 --- a/src/main/java/fucoin/configurations/MassWalletConfiguration.java +++ b/src/main/java/fucoin/configurations/MassWalletConfiguration.java @@ -1,5 +1,6 @@ package fucoin.configurations; +import akka.actor.ActorRef; import fucoin.configurations.internal.ConfigurationName; /** @@ -9,15 +10,15 @@ import fucoin.configurations.internal.ConfigurationName; public class MassWalletConfiguration extends AbstractConfiguration { @Override public void run() { - initSupervisor(); + ActorRef supervisor = initSupervisor(); try { - spawnWallets(200, false); + spawnWallets(10, false); System.out.println("Wallet spawning done!"); } catch (Exception e) { System.out.println("Wallet spawning timed out!"); } - randomTransactions(100, 10); + randomTransactions(10, 4); } @Override diff --git a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java index b0c9a125e68a2cadce66c418126fa563e9aec6bc..453b6c6a36338124ef1e9ea590a29ed8d46852b0 100644 --- a/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java +++ b/src/main/java/fucoin/gui/SuperVisorGuiControlImpl.java @@ -3,14 +3,6 @@ package fucoin.gui; import fucoin.supervisor.AmountTableModel; import fucoin.supervisor.SuperVisorImpl; -import javax.swing.*; -import javax.swing.event.TableModelEvent; -import javax.swing.event.TableModelListener; -import java.awt.*; -import java.awt.event.ItemEvent; -import java.awt.event.WindowAdapter; -import java.awt.event.WindowEvent; - public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { private SuperVisorImpl superVisor; @@ -88,4 +80,8 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { public boolean isLogActive() { return logActive; } + + public void startAggregation() { + superVisor.startAggregation(); + } } diff --git a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java index d5d7a1bced7a9b4047993970268255c0ce08f765..63b8f8d16d048c52705cde1a849d5c4cf4a3a804 100644 --- a/src/main/java/fucoin/gui/SuperVisorThreadGUI.java +++ b/src/main/java/fucoin/gui/SuperVisorThreadGUI.java @@ -1,11 +1,20 @@ package fucoin.gui; -import javax.swing.*; -import java.awt.*; +import java.awt.BorderLayout; +import java.awt.GridLayout; import java.awt.event.ItemEvent; import java.awt.event.WindowAdapter; import java.awt.event.WindowEvent; +import javax.swing.JButton; +import javax.swing.JCheckBox; +import javax.swing.JFrame; +import javax.swing.JList; +import javax.swing.JPanel; +import javax.swing.JScrollPane; +import javax.swing.JTable; +import javax.swing.SwingUtilities; + /** * */ @@ -19,6 +28,8 @@ public class SuperVisorThreadGUI { private JCheckBox activateLogging; private SuperVisorGuiControlImpl superVisorGuiControl; + private JButton startAggregationButton; + public SuperVisorThreadGUI(SuperVisorGuiControlImpl superVisorGuiControl) { this.superVisorGuiControl = superVisorGuiControl; @@ -38,6 +49,11 @@ public class SuperVisorThreadGUI { JPanel logPanel = new JPanel(new BorderLayout()); txtLog.setCellRenderer(new LogCellRenderer()); + + startAggregationButton = new JButton("Start Aggregation"); + startAggregationButton.addActionListener(e -> { + superVisorGuiControl.startAggregation(); + }); showDebug = new JCheckBox("Show debug messages in transaction log"); showDebug.setSelected(true); @@ -64,6 +80,7 @@ public class SuperVisorThreadGUI { configPanel.add(activateLogging); configPanel.add(showDebug); + configPanel.add(startAggregationButton); //logPanel.add(activateLogging, BorderLayout.NORTH); logPanel.add(configPanel, BorderLayout.NORTH); diff --git a/src/main/java/fucoin/supervisor/SuperVisorImpl.java b/src/main/java/fucoin/supervisor/SuperVisorImpl.java index 8b4ec173daab141c1172927ecaf34da83d5376c0..5f64d1104ca90f772680c854d2b9f3aeafac32e7 100644 --- a/src/main/java/fucoin/supervisor/SuperVisorImpl.java +++ b/src/main/java/fucoin/supervisor/SuperVisorImpl.java @@ -1,24 +1,27 @@ package fucoin.supervisor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.swing.SwingUtilities; + import akka.actor.ActorRef; import akka.actor.Props; +import fucoin.AbstractNode; import fucoin.actions.Action; +import fucoin.actions.aggregation.ActionAggregationRequest; +import fucoin.actions.aggregation.AggregationContext; import fucoin.actions.control.ActionWalletCreationDone; import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionNotifyObserver; import fucoin.actions.transaction.SuperVisorAction; import fucoin.gui.SuperVisorGuiControl; -import fucoin.AbstractNode; import fucoin.gui.TransactionLogger; -import javax.swing.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - public class SuperVisorImpl extends AbstractNode implements TransactionLogger { //private AmountTableModel amountTableModel; @@ -167,4 +170,13 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger { public void setBankCommitObserver(ActorRef bankCommitObserver) { this.bankCommitObserver = bankCommitObserver; } + + public void startAggregation() { + AggregationContext context = new AggregationContext((x, y) -> Math.max(x, y), x -> new Double(x.getAmount()), 0, + 0, 20); + ActorRef randomNode = getRandomNeighbor(); + ActionAggregationRequest request = new ActionAggregationRequest(context); + randomNode.tell(request, getSelf()); + System.out.println("init"); + } } diff --git a/src/main/java/fucoin/wallet/AbstractWallet.java b/src/main/java/fucoin/wallet/AbstractWallet.java index 4c366c5fd96176cad7ebc31462a4754992ec5cd5..d82b8a818ec15476bd70a2e8f10352911e2ec0e4 100644 --- a/src/main/java/fucoin/wallet/AbstractWallet.java +++ b/src/main/java/fucoin/wallet/AbstractWallet.java @@ -152,4 +152,9 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl * @param observer */ public abstract void send(String address, int amount, ActorRef observer); + + /** + * TODO Kommentar Kim + */ + public abstract void removeAggregationContext(); } diff --git a/src/main/java/fucoin/wallet/WalletImpl.java b/src/main/java/fucoin/wallet/WalletImpl.java index 2bf93d4afc9a83419255adf9588fe708935bb7b6..42900cff4ef2637bc6cfb5689a64836faca4cccd 100644 --- a/src/main/java/fucoin/wallet/WalletImpl.java +++ b/src/main/java/fucoin/wallet/WalletImpl.java @@ -274,4 +274,9 @@ public class WalletImpl extends AbstractWallet { this.hasPendingAggregationRequest = hasPendingRequest; } + @Override + public void removeAggregationContext() { + this.aggregationContext = null; + } + }