Skip to content
Snippets Groups Projects
Commit 367694d4 authored by lkeidel's avatar lkeidel
Browse files

Merge branch 'dev-group3-supervisor-fix' into 'dev-group3'

Added tell supervisor action to make system init work

Fixes #16 by introducing a new tell supervisor action, that is sent additionally to the ActionJoinAnswer message.
If the answering node does not know the supervisor yet, it will postpone the "tell supervisor" action until it does know one.
That way, we are able to spawn large amounts of wallets

See merge request !4
parents 52740f26 2b7d5802
No related branches found
No related tags found
2 merge requests!5Configuration system,!4Added tell supervisor action to make system init work
...@@ -5,7 +5,7 @@ import akka.actor.ActorSystem; ...@@ -5,7 +5,7 @@ import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import com.typesafe.config.Config; import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigFactory;
import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ActionTellSupervisor;
import fucoin.setup.NetworkInterfaceReader; import fucoin.setup.NetworkInterfaceReader;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
import fucoin.wallet.WalletImpl; import fucoin.wallet.WalletImpl;
...@@ -14,13 +14,11 @@ import java.io.File; ...@@ -14,13 +14,11 @@ import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static akka.dispatch.Futures.future;
public class Main { public class Main {
private static int numberOfWallets = 2; private static int numberOfWallets = 2;
private static boolean createGUI = true; private static boolean createGUI = false;
private static ActorSystem cSystem; private static ActorSystem cSystem;
...@@ -69,7 +67,8 @@ public class Main { ...@@ -69,7 +67,8 @@ public class Main {
// first wallet does not have a neighbour, so it can't send a ActionJoin to anybody // first wallet does not have a neighbour, so it can't send a ActionJoin to anybody
// instead we send directly an ActionJoinAnswer with the supervisor reference // instead we send directly an ActionJoinAnswer with the supervisor reference
if (i == 0) { if (i == 0) {
actorRef.tell(new ActionJoinAnswer(future(() -> cSuperVisorActor, cSystem.dispatcher())), cSuperVisorActor); //actorRef.tell(new ActionJoinAnswer(cSuperVisorActor), cSuperVisorActor);
actorRef.tell(new ActionTellSupervisor(cSuperVisorActor), cSuperVisorActor);
} }
cActiveActors.add(actorRef); cActiveActors.add(actorRef);
......
...@@ -14,8 +14,14 @@ public class ActionJoin extends ClientAction { ...@@ -14,8 +14,14 @@ public class ActionJoin extends ClientAction {
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet node) { protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet node) {
// send the joined node all known neighbours from node and a reference to the supervisor // send the joined node all known neighbours from node and a reference to the supervisor
ActionJoinAnswer aja = new ActionJoinAnswer(node.resolveSuperVisorActor()); ActionJoinAnswer aja = new ActionJoinAnswer();
aja.someNeighbors.putAll(node.getKnownNeighbors()); aja.someNeighbors.putAll(node.getKnownNeighbors());
sender.tell(aja, self); sender.tell(aja, self);
if (node.getRemoteSuperVisorActor() == null) {
node.deferSendOfSuperVisorActor(sender);
} else {
sender.tell(new ActionTellSupervisor(node.getRemoteSuperVisorActor()), self);
}
} }
} }
...@@ -24,11 +24,6 @@ import java.util.Map.Entry; ...@@ -24,11 +24,6 @@ import java.util.Map.Entry;
*/ */
public class ActionJoinAnswer extends ClientAction { public class ActionJoinAnswer extends ClientAction {
public final HashMap<String, ActorRef> someNeighbors = new HashMap<>(); public final HashMap<String, ActorRef> someNeighbors = new HashMap<>();
public final Future<ActorRef> supervisor;
public ActionJoinAnswer(Future<ActorRef> supervisor) {
this.supervisor = supervisor;
}
protected void onAction(ActorRef sender, ActorRef self, protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) { UntypedActorContext context, AbstractWallet wallet) {
...@@ -42,18 +37,6 @@ public class ActionJoinAnswer extends ClientAction { ...@@ -42,18 +37,6 @@ public class ActionJoinAnswer extends ClientAction {
for (Entry<String, ActorRef> neighbor : someNeighbors.entrySet()) { for (Entry<String, ActorRef> neighbor : someNeighbors.entrySet()) {
neighbor.getValue().tell(new ActionSearchMyWallet(wallet.getName()), self); neighbor.getValue().tell(new ActionSearchMyWallet(wallet.getName()), self);
} }
// register at the supervisor if the wallet just learned about it
if (wallet.getRemoteSuperVisorActor() == null) {
supervisor.onSuccess(new OnSuccess<ActorRef>() {
@Override
public void onSuccess(ActorRef result) throws Throwable {
wallet.setRemoteSuperVisorActor(result);
}
}, context.system().dispatcher());
//wallet.setRemoteSuperVisorActor(supervisor);
}
} }
} }
package fucoin.actions.join;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
/**
* Tell the joining node the supervisor
*/
public class ActionTellSupervisor extends ClientAction {
public final ActorRef supervisor;
public ActionTellSupervisor(ActorRef supervisor) {
this.supervisor = supervisor;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.setRemoteSuperVisorActor(supervisor);
}
}
...@@ -21,7 +21,7 @@ public class ServerActionJoin extends SuperVisorAction { ...@@ -21,7 +21,7 @@ public class ServerActionJoin extends SuperVisorAction {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl node) { UntypedActorContext context, SuperVisorImpl node) {
ActionJoinAnswer aja = new ActionJoinAnswer(node.resolveSelf()); ActionJoinAnswer aja = new ActionJoinAnswer(); // TODO: Might need added TellSupervisor
aja.someNeighbors.putAll(node.getKnownNeighbors()); aja.someNeighbors.putAll(node.getKnownNeighbors());
sender.tell(aja, self); sender.tell(aja, self);
node.addKnownNeighbor(name, sender); node.addKnownNeighbor(name, sender);
......
...@@ -3,6 +3,8 @@ package fucoin.gui; ...@@ -3,6 +3,8 @@ package fucoin.gui;
import fucoin.supervisor.SuperVisorImpl; import fucoin.supervisor.SuperVisorImpl;
import javax.swing.*; import javax.swing.*;
import javax.swing.event.TableModelEvent;
import javax.swing.event.TableModelListener;
import java.awt.*; import java.awt.*;
import java.awt.event.ItemEvent; import java.awt.event.ItemEvent;
import java.awt.event.WindowAdapter; import java.awt.event.WindowAdapter;
...@@ -31,6 +33,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl { ...@@ -31,6 +33,7 @@ public class SuperVisorGuiControlImpl implements SuperVisorGuiControl {
//Init Amount Table and SuperVisorImpl //Init Amount Table and SuperVisorImpl
JTable amountListView = new JTable(superVisor.getAmountTableModel()); JTable amountListView = new JTable(superVisor.getAmountTableModel());
superVisor.getAmountTableModel().addTableModelListener(e -> SwingUtilities.invokeLater(() -> frame.setTitle("Server (" + superVisor.getAmountTableModel().getRowCount() + " Wallets)")));
contentPanel.add(new JScrollPane(amountListView)); contentPanel.add(new JScrollPane(amountListView));
JPanel logPanel = new JPanel(new BorderLayout()); JPanel logPanel = new JPanel(new BorderLayout());
......
package fucoin.supervisor; package fucoin.supervisor;
import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
import fucoin.actions.Action; import fucoin.actions.Action;
import fucoin.actions.persist.ActionInvokeUpdate; import fucoin.actions.persist.ActionInvokeUpdate;
...@@ -9,7 +8,6 @@ import fucoin.actions.transaction.SuperVisorAction; ...@@ -9,7 +8,6 @@ import fucoin.actions.transaction.SuperVisorAction;
import fucoin.gui.SuperVisorGuiControl; import fucoin.gui.SuperVisorGuiControl;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import scala.concurrent.Future;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
...@@ -84,10 +82,6 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger{ ...@@ -84,10 +82,6 @@ public class SuperVisorImpl extends AbstractNode implements TransactionLogger{
self().tell(new ActionUpdateQueue(), self()); self().tell(new ActionUpdateQueue(), self());
} }
public Future<ActorRef> resolveSelf() {
return future(() -> self(), context().system().dispatcher());
}
/** /**
* filters the request for outdated and removes them * filters the request for outdated and removes them
* *
......
...@@ -96,7 +96,7 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -96,7 +96,7 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
*/ */
public abstract void setRemoteSuperVisorActor(ActorRef remoteSuperVisorActor); public abstract void setRemoteSuperVisorActor(ActorRef remoteSuperVisorActor);
public abstract Future<ActorRef> resolveSuperVisorActor(); public abstract void deferSendOfSuperVisorActor(ActorRef destinationWallet);
/** /**
* Sends amount FUCs to the wallet with the address adress * Sends amount FUCs to the wallet with the address adress
......
...@@ -5,6 +5,7 @@ import akka.actor.Props; ...@@ -5,6 +5,7 @@ import akka.actor.Props;
import fucoin.actions.ClientAction; import fucoin.actions.ClientAction;
import fucoin.actions.join.ActionJoin; import fucoin.actions.join.ActionJoin;
import fucoin.actions.join.ActionJoinAnswer; import fucoin.actions.join.ActionJoinAnswer;
import fucoin.actions.join.ActionTellSupervisor;
import fucoin.actions.join.ServerActionJoin; import fucoin.actions.join.ServerActionJoin;
import fucoin.actions.persist.ActionInvokeLeave; import fucoin.actions.persist.ActionInvokeLeave;
import fucoin.actions.persist.ActionInvokeRevive; import fucoin.actions.persist.ActionInvokeRevive;
...@@ -15,6 +16,8 @@ import scala.concurrent.Future; ...@@ -15,6 +16,8 @@ import scala.concurrent.Future;
import static akka.dispatch.Futures.future; import static akka.dispatch.Futures.future;
import java.util.concurrent.ConcurrentLinkedQueue;
public class WalletImpl extends AbstractWallet { public class WalletImpl extends AbstractWallet {
private ActorRef preKnownNeighbour; private ActorRef preKnownNeighbour;
...@@ -22,6 +25,7 @@ public class WalletImpl extends AbstractWallet { ...@@ -22,6 +25,7 @@ public class WalletImpl extends AbstractWallet {
private transient WalletGuiControl gui; private transient WalletGuiControl gui;
private String preKnownNeighbourName; private String preKnownNeighbourName;
private boolean isActive; private boolean isActive;
private ConcurrentLinkedQueue<ActorRef> deferedSupervisorReceivers = new ConcurrentLinkedQueue<>();
public WalletImpl(String name) { public WalletImpl(String name) {
super(name); super(name);
...@@ -85,11 +89,15 @@ public class WalletImpl extends AbstractWallet { ...@@ -85,11 +89,15 @@ public class WalletImpl extends AbstractWallet {
if (preKnownNeighbour != null) { if (preKnownNeighbour != null) {
addKnownNeighbor(preKnownNeighbourName, preKnownNeighbour); addKnownNeighbor(preKnownNeighbourName, preKnownNeighbour);
preKnownNeighbour.tell(new ActionJoin(), getSelf()); preKnownNeighbour.tell(new ActionJoin(), getSelf());
ActionJoinAnswer aja = new ActionJoinAnswer(this.resolveSuperVisorActor()); ActionJoinAnswer aja = new ActionJoinAnswer();
aja.someNeighbors.putAll(getKnownNeighbors()); aja.someNeighbors.putAll(getKnownNeighbors());
aja.someNeighbors.put(name, getSelf()); aja.someNeighbors.put(name, getSelf());
preKnownNeighbour.tell(aja, getSelf()); preKnownNeighbour.tell(aja, getSelf());
if (this.getRemoteSuperVisorActor() != null) {
preKnownNeighbour.tell(new ActionTellSupervisor(this.getRemoteSuperVisorActor()), self());
}
} }
} }
...@@ -158,6 +166,21 @@ public class WalletImpl extends AbstractWallet { ...@@ -158,6 +166,21 @@ public class WalletImpl extends AbstractWallet {
if (this.remoteSuperVisorActor == null) { if (this.remoteSuperVisorActor == null) {
this.remoteSuperVisorActor = remoteSuperVisorActor; this.remoteSuperVisorActor = remoteSuperVisorActor;
this.remoteSuperVisorActor.tell(new ServerActionJoin(getName()), getSelf()); this.remoteSuperVisorActor.tell(new ServerActionJoin(getName()), getSelf());
this.tellDeferedSuperVisorReceivers(remoteSuperVisorActor);
}
}
@Override
public void deferSendOfSuperVisorActor(ActorRef destinationWallet) {
deferedSupervisorReceivers.add(destinationWallet);
}
protected void tellDeferedSuperVisorReceivers(ActorRef supervisor) {
while (deferedSupervisorReceivers.size() > 0) {
ActorRef receiver = deferedSupervisorReceivers.poll();
if (receiver != null) {
receiver.tell(new ActionTellSupervisor(supervisor), self());
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment