From 40b677aa4b857c7490277352bcc5c33cc57a9f34 Mon Sep 17 00:00:00 2001 From: Michael Kmoch <dudelmicha@googlemail.com> Date: Thu, 23 Jul 2015 10:07:44 +0200 Subject: [PATCH] 2PC is working fine --- JavaAkkaFuCoin/src/fucoin/AbstractNode.java | 58 ++++ JavaAkkaFuCoin/src/fucoin/AbstractWallet.java | 126 +-------- JavaAkkaFuCoin/src/fucoin/Main.java | 23 +- JavaAkkaFuCoin/src/fucoin/Wallet.java | 249 ++++++------------ JavaAkkaFuCoin/src/fucoin/WalletCreator.java | 4 +- JavaAkkaFuCoin/src/fucoin/actions/Action.java | 21 ++ .../src/fucoin/actions/ActionGetAmount.java | 5 - .../fucoin/actions/ActionGetAmountAnswer.java | 15 -- .../src/fucoin/actions/ActionInvokeLeave.java | 5 - .../fucoin/actions/ActionInvokeRevive.java | 5 - .../fucoin/actions/ActionInvokeSentMoney.java | 12 - .../actions/ActionInvokeSentMoney2.java | 12 - .../fucoin/actions/ActionInvokeUpdate.java | 5 - .../src/fucoin/actions/ClientAction.java | 12 + .../src/fucoin/actions/join/ActionJoin.java | 18 ++ .../fucoin/actions/join/ActionJoinAnswer.java | 27 ++ .../fucoin/actions/join/GeneralAction.java | 8 + .../src/fucoin/actions/join/Join.java | 10 + .../fucoin/actions/join/ServerActionJoin.java | 27 ++ .../actions/persist/ActionInvalidate.java | 19 ++ .../actions/persist/ActionInvokeLeave.java | 24 ++ .../actions/persist/ActionInvokeRevive.java | 17 ++ .../actions/persist/ActionInvokeUpdate.java | 16 ++ .../actions/persist/ActionSearchMyWallet.java | 28 ++ .../persist/ActionSearchMyWalletAnswer.java | 20 ++ .../actions/persist/ActionStoreOrUpdate.java | 20 ++ .../src/fucoin/actions/persist/Persist.java | 7 + .../search/ActionSearchWalletReference.java | 50 ++++ .../ActionSearchWalletReferenceAnswer.java | 30 +++ .../src/fucoin/actions/search/Search.java | 7 + ...tionCommitDistributedCommitedTransfer.java | 58 ++++ .../actions/transaction/ActionGetAmount.java | 16 ++ .../transaction/ActionGetAmountAnswer.java | 25 ++ ...tionInvokeDistributedCommitedTransfer.java | 36 +++ .../transaction/ActionInvokeSentMoney.java | 41 +++ .../transaction/ActionInvokeSentMoney2.java | 22 ++ ...ionPrepareDistributedCommitedTransfer.java | 41 +++ ...pareDistributedCommitedTransferAnswer.java | 52 ++++ .../transaction/ActionReceiveTransaction.java | 18 ++ .../transaction/CoordinatorTransaction.java | 16 ++ .../actions/transaction/SuperVisorAction.java | 11 + .../actions/transaction/Transaction.java | 7 + JavaAkkaFuCoin/src/fucoin/gui/WalletGui.java | 5 +- .../fucoin/supervisor/ActionUpdateQueue.java | 41 +++ .../DistributedCommitedTransferRequest.java | 58 ++++ .../src/fucoin/supervisor/SuperVisor.java | 124 ++++----- 46 files changed, 1017 insertions(+), 434 deletions(-) create mode 100644 JavaAkkaFuCoin/src/fucoin/AbstractNode.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/Action.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmount.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmountAnswer.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeLeave.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeRevive.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney2.java delete mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeUpdate.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/ClientAction.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoin.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoinAnswer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/join/GeneralAction.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/join/Join.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/join/ServerActionJoin.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvalidate.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeLeave.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeRevive.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeUpdate.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWallet.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWalletAnswer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/ActionStoreOrUpdate.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/persist/Persist.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReference.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReferenceAnswer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/search/Search.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionCommitDistributedCommitedTransfer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmount.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmountAnswer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeDistributedCommitedTransfer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney2.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransfer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransferAnswer.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionReceiveTransaction.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/CoordinatorTransaction.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/SuperVisorAction.java create mode 100644 JavaAkkaFuCoin/src/fucoin/actions/transaction/Transaction.java create mode 100644 JavaAkkaFuCoin/src/fucoin/supervisor/ActionUpdateQueue.java create mode 100644 JavaAkkaFuCoin/src/fucoin/supervisor/DistributedCommitedTransferRequest.java diff --git a/JavaAkkaFuCoin/src/fucoin/AbstractNode.java b/JavaAkkaFuCoin/src/fucoin/AbstractNode.java new file mode 100644 index 0000000..912b0a2 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/AbstractNode.java @@ -0,0 +1,58 @@ +package fucoin; + +import java.io.Serializable; +import java.util.HashMap; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; + +public abstract class AbstractNode extends UntypedActor implements Serializable { + + // Returns the akka-style address as String, which + // could be converted to an ActorRef object later + public String getAddress() { + return getAddress(getSelf()); + } + + public String getAddress(ActorRef self) { + return self.path().toSerializationFormatWithAddress(self.path().address()); + } + + // The which receives Action objects + public abstract void onReceive(Object message); + + // Holds references to neighbors that were in + // contact with this wallet during runtime; + // The key corresponds to the Wallet's name + private transient HashMap<String, ActorRef> knownNeighbors = new HashMap<String, ActorRef>(); + + // Holds references to neighbors this wallet + // synchronizes itself to (the Wallet object); + // The key corresponds to the Wallet's name + public transient HashMap<String, ActorRef> localNeighbors = new HashMap<String, ActorRef>(); + + // Holds all Wallets from network participants + // which synchronize their state (Wallet object) + // with us; + // The key corresponds to the Wallet's name + public transient HashMap<String, AbstractWallet> backedUpNeighbors = new HashMap<String, AbstractWallet>(); + + public transient HashMap<ActorRef, Integer> amounts = new HashMap<ActorRef, Integer>(); + + public boolean addKnownNeighbor(String key, ActorRef value) { + if(!knownNeighbors.containsKey(key)){ + knownNeighbors.put(key,value); + return true; + } + return false; + } + + public HashMap<String, ActorRef> getKnownNeighbors() { + return knownNeighbors; + } + + public void log(String string) { + System.out.println(getSelf().path().name()+": "+string); + } + +} \ No newline at end of file diff --git a/JavaAkkaFuCoin/src/fucoin/AbstractWallet.java b/JavaAkkaFuCoin/src/fucoin/AbstractWallet.java index c7bd8d8..e77b674 100644 --- a/JavaAkkaFuCoin/src/fucoin/AbstractWallet.java +++ b/JavaAkkaFuCoin/src/fucoin/AbstractWallet.java @@ -1,131 +1,27 @@ package fucoin; -import java.io.Serializable; -import java.util.HashMap; +public abstract class AbstractWallet extends AbstractNode{ -import akka.actor.ActorRef; -import akka.actor.UntypedActor; - -public abstract class AbstractWallet extends UntypedActor implements Serializable { - - // Used to join the network (a pre known participant/Wallet must be known) - public static class ActionJoin implements Serializable {} - - // Returns some neighbors that might be used as known - // and/or local neighbors - public class ActionJoinAnswer implements Serializable { - public final HashMap<String, ActorRef> someNeighbors = new HashMap<>(); - } - - // Used to push the state of my/a wallet to another participant - public static class ActionStoreOrUpdate implements Serializable { - public final AbstractWallet w; - public ActionStoreOrUpdate(AbstractWallet w) { - this.w = w; - } - } - - // May be used to delete a stored Wallet on another participant - public static class ActionInvalidate implements Serializable { - final String name; - public ActionInvalidate(String name) { - this.name = name; - } - } - - // Used to send (positive amount) or retreive money (negative amount) - public static class ActionReceiveTransaction implements Serializable { - final public int amount; - public ActionReceiveTransaction(int amount) { - this.amount = amount; - } - } - - // Used to search a Wallet by name, i.e. when we want to - // perform a transaction on it - public static class ActionSearchWalletReference implements Serializable { - final String name; - final long ttl; - public ActionSearchWalletReference(String name, long ttl) { - this.name = name; - this.ttl=ttl; - } - } - - // Used to return a Wallet reference (akka-style string which can - // be transformed to an ActorRef) - public static class ActionSearchWalletReferenceAnswer implements Serializable { - final String address; - final String name; - public ActionSearchWalletReferenceAnswer(String name,String address) { - this.address = address; - this.name=name; - } - } - - // Used to search a Wallet by name, i.e. the own wallet if we just - // joined the network; If a receiving participant holds the stored Wallet, - // he returns it, otherwise, he might use gossiping methods to go on - // with the search; - // Note: You should also forward the sender (the participant who actually - // searches for this Wallet, so that it can be returnd the direct way) - public static class ActionSearchMyWallet implements Serializable { - final String name; - public ActionSearchMyWallet(String name) { - this.name = name; - } - } - - // Used to return a searched Wallet - public static class ActionSearchMyWalletAnswer implements Serializable { - final AbstractWallet w; - public ActionSearchMyWalletAnswer(AbstractWallet w) { - this.w = w; - } - } - - // Constructor - public AbstractWallet(String name) { - this.name = name; - } + // Constructor + public AbstractWallet(String name) { + this.name = name; + } // Returns the name of this wallet, e.g. "Lieschen Müller" public String getName() { return this.name; } - - // Returns the akka-style address as String, which - // could be converted to an ActorRef object later - public abstract String getAddress(); - + // Performs housekeeping operations, e.g. pushes // backedUpNeighbor-entries to other neighbors public abstract void leave(); - - // The which receives Action objects - public abstract void onReceive(Object message); - - // Holds references to neighbors that were in - // contact with this wallet during runtime; - // The key corresponds to the Wallet's name - public transient HashMap<String, ActorRef> knownNeighbors = new HashMap<String, ActorRef>(); - - // Holds references to neighbors this wallet - // synchronizes itself to (the Wallet object); - // The key corresponds to the Wallet's name - public transient HashMap<String, ActorRef> localNeighbors = new HashMap<String, ActorRef>(); - - // Holds all Wallets from network participants - // which synchronize their state (Wallet object) - // with us; - // The key corresponds to the Wallet's name - public transient HashMap<String, AbstractWallet> backedUpNeighbors = new HashMap<String, AbstractWallet>(); - + + // The amount this wallet currently holds + public int amount; + // The name of this wallet (does never change, no // duplicates in network assumed) public final String name; - // The amount this wallet currently holds - public int amount; -} \ No newline at end of file +} diff --git a/JavaAkkaFuCoin/src/fucoin/Main.java b/JavaAkkaFuCoin/src/fucoin/Main.java index c131f6c..494bce4 100644 --- a/JavaAkkaFuCoin/src/fucoin/Main.java +++ b/JavaAkkaFuCoin/src/fucoin/Main.java @@ -9,10 +9,7 @@ import akka.actor.ActorSystem; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import fucoin.actions.ActionInvokeLeave; -import fucoin.actions.ActionInvokeRevive; -import fucoin.actions.ActionInvokeSentMoney; -import fucoin.actions.ActionInvokeUpdate; +import fucoin.actions.join.ServerActionJoin; import fucoin.supervisor.SuperVisor; @@ -27,11 +24,15 @@ public class Main { ActorRef superVisorActor = system.actorOf(SuperVisor.props(),"SuperVisor"); List<ActorRef> activeActors = new ArrayList<ActorRef>(); ActorRef a1 = system.actorOf(Wallet.props(null,"","Main",superVisorActor),"Main"); - //ActorRef a2 = system.actorOf(Wallet.props(a1,"Main","Main2",superVisorActor),"Main2"); + ActorRef a2 = system.actorOf(Wallet.props(a1,"Main","Main2",superVisorActor),"Main2"); + superVisorActor.tell(new ServerActionJoin("Main"), a1); + superVisorActor.tell(new ServerActionJoin("Main2"), a2); + //a2.tell(new ActionInvokeSentMoney("Main", 200), a2); //activeActors.add(a1); - int maxrounds = 1000; - int maxactors = 100; + /* + int maxrounds = 100; + int maxactors = 10; for(int actor=0; actor<maxactors;actor++){ activeActors.add(system.actorOf(Wallet.props(a1,"Main","Main"+actor,superVisorActor),"Main"+actor)); } @@ -46,13 +47,13 @@ public class Main { List<ActorRef> removedActors = new ArrayList<ActorRef>(); for(ActorRef actor:activeActors){ if(Math.random()<0.6){ - actor.tell(new ActionInvokeSentMoney("Main"+(int)Math.floor(Math.random()*10), (int) (Math.round(Math.random()*100)-50)), actor); + actor.tell(new ActionInvokeSentMoney("Main"+(int)Math.floor(Math.random()*maxactors), (int) (Math.round(Math.random()*100))), actor); } if(Math.random()<0.2){ removedActors.add(actor); int offtime = timestep+(int)(Math.random()*6)+2; - offline.get(Math.min(offtime, maxrounds)).add(actor); + offline.get(Math.min(offtime, maxrounds-1)).add(actor); } } activeActors.removeAll(removedActors); @@ -61,8 +62,7 @@ public class Main { activeActors.add(actorName); } for(ActorRef removedActor : removedActors){ - removedActor.tell(new ActionInvokeLeave(), removedActor); - + removedActor.tell(new ActionInvokeLeave(), removedActor); } Thread.sleep(1000); @@ -72,5 +72,6 @@ public class Main { } superVisorActor.tell(new ActionInvokeUpdate(), superVisorActor); + */ } } diff --git a/JavaAkkaFuCoin/src/fucoin/Wallet.java b/JavaAkkaFuCoin/src/fucoin/Wallet.java index 0d55876..70f26fd 100644 --- a/JavaAkkaFuCoin/src/fucoin/Wallet.java +++ b/JavaAkkaFuCoin/src/fucoin/Wallet.java @@ -1,16 +1,15 @@ package fucoin; -import java.util.Map.Entry; -import java.util.concurrent.Semaphore; - import akka.actor.ActorRef; import akka.actor.Props; -import fucoin.actions.ActionGetAmount; -import fucoin.actions.ActionGetAmountAnswer; -import fucoin.actions.ActionInvokeLeave; -import fucoin.actions.ActionInvokeRevive; -import fucoin.actions.ActionInvokeSentMoney; -import fucoin.actions.ActionInvokeSentMoney2; +import fucoin.actions.ClientAction; +import fucoin.actions.join.ActionJoin; +import fucoin.actions.join.ActionJoinAnswer; +import fucoin.actions.join.ServerActionJoin; +import fucoin.actions.persist.ActionInvokeLeave; +import fucoin.actions.persist.ActionInvokeRevive; +import fucoin.actions.transaction.ActionGetAmountAnswer; +import fucoin.actions.transaction.ActionInvokeSentMoney; import fucoin.gui.IWalletControle; import fucoin.gui.IWalletGuiControle; @@ -28,167 +27,32 @@ public class Wallet extends AbstractWallet implements IWalletControle{ this.preknownNeighbour=preknownNeighbour; this.remoteSuperVisorActor=remoteSuperVisorActor; } - - @Override - public String getAddress() { - return getAddress(getSelf()); - } - - private String getAddress(ActorRef self) { - return self.path().toSerializationFormatWithAddress(self.path().address()); - } - - public void send(String name, int amount){ - //System.out.println("search wallet"+name+" in "+knownNeighbors.keySet()); - if(knownNeighbors.containsKey(name)){ - addAmount(-amount); - knownNeighbors.get(name).tell(new ActionReceiveTransaction(amount), getSelf()); - }else{ - for(ActorRef neighbor : knownNeighbors.values()){ - neighbor.tell(new ActionSearchWalletReference(name,System.currentTimeMillis()+10), getSelf()); - } - - try { - getContext().unwatch(getSelf()); - Thread.sleep(200); - getContext().watch(getSelf()); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - //getContext().unwatch(getSelf()); - getSelf().tell(new ActionInvokeSentMoney(name, amount), getSelf()); - - } - - } - - private void addAmount(int amount) { + public void addAmount(int amount) { setAmount(this.amount+amount); log(" My amount is now "+this.amount); - if(gui!=null){ - gui.setAmount(this.amount); - } + } @Override public void leave() { - for(ActorRef neighbor : knownNeighbors.values()){ - if(getSelf().compareTo(neighbor)!=0){ - neighbor.tell(new ActionStoreOrUpdate(this), getSelf()); - } - } - remoteSuperVisorActor.tell(new ActionStoreOrUpdate(this), getSelf()); - isActive=false; - backedUpNeighbors.clear(); - knownNeighbors.clear(); - knownNeighbors.put(preknownNeighbourName,preknownNeighbour); + getSelf().tell(new ActionInvokeLeave(), getSelf()); } @Override public void onReceive(Object message) { - log(getSender().path().name()+" invokes "+getSelf().path().name()+" to do "+message.getClass().getSimpleName()); + log(message.getClass().getSimpleName()); + + //log(getSender().path().name()+" invokes "+getSelf().path().name()+" to do "+message.getClass().getSimpleName()); if(message instanceof ActionInvokeRevive){ - isActive=true; - - preknownNeighbour.tell(new ActionJoin(), getSelf()); - + ((ActionInvokeRevive) message).doAction(this); } - if(!isActive)return; + if(!isActive&&!(message instanceof ActionInvokeRevive))return; //System.out.println(message); - if(message instanceof ActionJoin){ - ActionJoinAnswer aja = new ActionJoinAnswer(); - aja.someNeighbors.putAll(knownNeighbors); - getSender().tell(aja, getSelf()); - }else if(message instanceof ActionJoinAnswer){ - ActionJoinAnswer aja = (ActionJoinAnswer) message; - for(Entry<String, ActorRef> neighbor : knownNeighbors.entrySet()){ - addKnownNeighbor(neighbor.getKey(),neighbor.getValue()); - neighbor.getValue().tell(new ActionSearchMyWallet(name), getSelf()); - } - - - }else if(message instanceof ActionSearchMyWallet){ - ActionSearchMyWallet asmw = (ActionSearchMyWallet) message; - //If I know that somebody is searching himself, - //I can store him under the searched wallet name - addKnownNeighbor(asmw.name, getSender()); - - AbstractWallet storedWallet = backedUpNeighbors.get(asmw.name); - log(" "+knownNeighbors); - if(storedWallet!=null){ - getSender().tell(new ActionSearchMyWalletAnswer(storedWallet), getSelf()); - } - }else if(message instanceof ActionSearchMyWalletAnswer){ - ActionSearchMyWalletAnswer asmwa = (ActionSearchMyWalletAnswer) message; - setAmount(asmwa.w.amount); - getSender().tell(new ActionInvalidate(name), getSelf()); - }else if(message instanceof ActionInvalidate){ - ActionInvalidate ai = (ActionInvalidate) message; - backedUpNeighbors.remove(ai.name); - }else if(message instanceof ActionSearchWalletReference){ - ActionSearchWalletReference aswr = (ActionSearchWalletReference) message; - System.out.println("I search for you"+aswr.name); - if(this.name.equals(aswr.name)){ - getSender().tell(new ActionSearchWalletReferenceAnswer(aswr.name,getAddress()),getSelf()); - }else if(backedUpNeighbors.containsKey(aswr.name)){ - getSender().tell(new ActionSearchWalletReferenceAnswer(aswr.name,backedUpNeighbors.get(aswr.name).getAddress()),getSelf()); - } else if(knownNeighbors.containsKey(aswr.name)){ - getSender().tell(new ActionSearchWalletReferenceAnswer(aswr.name,getAddress(knownNeighbors.get(aswr.name))),getSelf()); - } else if (System.currentTimeMillis()<aswr.ttl){ - //for(ActorRef actor : knownNeighbors.values()){ - // actor.tell(aswr,getSelf()); - //} - //Because Sender is maybe unknown - //getSender().tell(aswr, getSelf()); - } - }else if(message instanceof ActionSearchWalletReferenceAnswer){ - ActionSearchWalletReferenceAnswer aswra = (ActionSearchWalletReferenceAnswer) message; - ActorRef target = getContext().actorSelection(aswra.address).anchor(); - addKnownNeighbor(aswra.name,target); - }else if(message instanceof ActionInvokeSentMoney){ - ActionInvokeSentMoney aism = (ActionInvokeSentMoney) message; - - send(aism.name, aism.amount); - }else if(message instanceof ActionInvokeSentMoney2){ - if(knownNeighbors.containsKey(name)){ - addAmount(-amount); - knownNeighbors.get(name).tell(new ActionReceiveTransaction(amount), getSelf()); - } - }else if(message instanceof ActionReceiveTransaction){ - ActionReceiveTransaction art = (ActionReceiveTransaction) message; - System.out.println(message.getClass().getSimpleName()+" "+art.amount); - addAmount(art.amount); - }else if(message instanceof ActionStoreOrUpdate){ - ActionStoreOrUpdate asou = (ActionStoreOrUpdate) message; - backedUpNeighbors.put(asou.w.name, asou.w); - }else if(message instanceof ActionGetAmount){ - ActionGetAmountAnswer agaa = new ActionGetAmountAnswer(getAddress(),getName(),amount); - getSender().tell(agaa, getSelf()); - }else if(message instanceof ActionInvokeLeave){ - leave(); - }else if(message instanceof ActionInvokeRevive){ - - }else{ - unhandled(message); - System.err.println("Unexpected Error: "+message+" not handeld"); - } - } - - private void addKnownNeighbor(String key, ActorRef value) { - if(!knownNeighbors.containsKey(key)){ - System.out.println(knownNeighbors.keySet()+" does not contain "+key); - knownNeighbors.put(key,value); - if(gui!=null){ - gui.addKnownAddress(key); - } - System.out.println(key+"-->"+value); + if(message instanceof ClientAction){ + ((ClientAction) message).doAction(this); } - } - - private void log(String string) { - System.out.println(getSelf()+": "+string); + } @Override @@ -198,22 +62,23 @@ public class Wallet extends AbstractWallet implements IWalletControle{ gui.setAddress(getAddress()); } String path = "akka.tcp://Core@127.0.0.1:1234/user/Main"; - System.out.println(getContext().provider().getExternalAddressFor(getSelf().path().address())); + //System.out.println(getContext().provider().getExternalAddressFor(getSelf().path().address())); //log("my address should be "+getAddress()); //log(""+preknownNeighbour); //knownNeighbors.put(getName(),getSelf()); - System.out.println(knownNeighbors); + //System.out.println(knownNeighbors); if(preknownNeighbour!=null){ - knownNeighbors.put(preknownNeighbourName,preknownNeighbour); + addKnownNeighbor(preknownNeighbourName,preknownNeighbour); preknownNeighbour.tell(new ActionJoin(), getSelf()); ActionJoinAnswer aja = new ActionJoinAnswer(); - aja.someNeighbors.putAll(knownNeighbors); + aja.someNeighbors.putAll(getKnownNeighbors()); + aja.someNeighbors.put(name, getSelf()); preknownNeighbour.tell(aja, getSelf()); } - setAmount(100); - remoteSuperVisorActor.tell(new ActionJoin(), getSelf()); + //setAmount(100); + //remoteSuperVisorActor.tell(new ServerActionJoin(name), getSelf()); } @Override @@ -241,14 +106,7 @@ public class Wallet extends AbstractWallet implements IWalletControle{ this.gui=gui; } - Semaphore mutex = new Semaphore(1); public void setAmount(int amount){ - try { - mutex.acquire(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } this.amount = amount; if(remoteSuperVisorActor != null){ remoteSuperVisorActor.tell(new ActionGetAmountAnswer(getAddress(), getName(), amount), getSelf()); @@ -256,7 +114,60 @@ public class Wallet extends AbstractWallet implements IWalletControle{ if(gui!=null){ gui.setAmount(this.amount); } - mutex.release(); + } + + public ActorRef getPreknownNeighbour() { + return preknownNeighbour; + } + + public ActorRef getRemoteSuperVisorActor() { + return remoteSuperVisorActor; + } + + public IWalletGuiControle getGui() { + return gui; + } + + public String getPreknownNeighbourName() { + return preknownNeighbourName; + } + + public boolean isActive() { + return isActive; + } + + @Override + public boolean addKnownNeighbor(String key, ActorRef value) { + log(key+" is newNeighbor?"+!getKnownNeighbors().containsKey(key)); + if(getKnownNeighbors().containsKey(key)||key.equals(name)){ + return false; + } + boolean newNeighbor = super.addKnownNeighbor(key, value); + if(gui!=null&&newNeighbor){ + gui.addKnownAddress(key); + } + return newNeighbor; + } + + public void setPreknownNeighbour(ActorRef preknownNeighbour) { + this.preknownNeighbour = preknownNeighbour; + } + + public void setRemoteSuperVisorActor(ActorRef remoteSuperVisorActor) { + this.remoteSuperVisorActor = remoteSuperVisorActor; + } + + public void setPreknownNeighbourName(String preknownNeighbourName) { + this.preknownNeighbourName = preknownNeighbourName; + } + + public void setActive(boolean isActive) { + this.isActive = isActive; + } + + @Override + public void send(String address, int amount) { + getSelf().tell(new ActionInvokeSentMoney(address, amount), getSelf()); } } diff --git a/JavaAkkaFuCoin/src/fucoin/WalletCreator.java b/JavaAkkaFuCoin/src/fucoin/WalletCreator.java index 0193d09..3918b11 100644 --- a/JavaAkkaFuCoin/src/fucoin/WalletCreator.java +++ b/JavaAkkaFuCoin/src/fucoin/WalletCreator.java @@ -24,8 +24,8 @@ public class WalletCreator implements Creator<Wallet> { public Wallet create() throws Exception { Wallet wallet = new Wallet(preknownNeighbour,preknownNeighbourName, walletName,remoteSuperVisorActor); -// IWalletGuiControle gui = new WalletGui(wallet); -// wallet.setGui(gui); + IWalletGuiControle gui = new WalletGui(wallet); + wallet.setGui(gui); return wallet; } diff --git a/JavaAkkaFuCoin/src/fucoin/actions/Action.java b/JavaAkkaFuCoin/src/fucoin/actions/Action.java new file mode 100644 index 0000000..926c6b2 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/Action.java @@ -0,0 +1,21 @@ +package fucoin.actions; + +import fucoin.AbstractNode; +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; + +public abstract class Action<T extends AbstractNode> { + private ActorRef self; + + public final void doAction(T abstractNode){ + this.self=abstractNode.getSelf(); + onAction(abstractNode.getSender(),abstractNode.getSelf(),abstractNode.getContext(),abstractNode); + } + + protected abstract void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, T abstractNode); + + public void log(String string) { + System.out.println(self.path().name()+": "+string); + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmount.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmount.java deleted file mode 100644 index 9c486c8..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmount.java +++ /dev/null @@ -1,5 +0,0 @@ -package fucoin.actions; - -public class ActionGetAmount { - -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmountAnswer.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmountAnswer.java deleted file mode 100644 index a7c53bd..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionGetAmountAnswer.java +++ /dev/null @@ -1,15 +0,0 @@ -package fucoin.actions; - -public class ActionGetAmountAnswer { - - public String address; - public String name; - public int amount; - - public ActionGetAmountAnswer(String address, String name, int amount) { - this.address=address; - this.name=name; - this.amount=amount; - } - -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeLeave.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeLeave.java deleted file mode 100644 index f30bb23..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeLeave.java +++ /dev/null @@ -1,5 +0,0 @@ -package fucoin.actions; - -public class ActionInvokeLeave { - -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeRevive.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeRevive.java deleted file mode 100644 index 182c5e9..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeRevive.java +++ /dev/null @@ -1,5 +0,0 @@ -package fucoin.actions; - -public class ActionInvokeRevive { - -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney.java deleted file mode 100644 index 6b9cd1a..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney.java +++ /dev/null @@ -1,12 +0,0 @@ -package fucoin.actions; - -import java.io.Serializable; - -public class ActionInvokeSentMoney implements Serializable{ - public final String name; - public final int amount; - public ActionInvokeSentMoney(String name, int amount) { - this.name=name; - this.amount = amount; - } -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney2.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney2.java deleted file mode 100644 index d915a3a..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeSentMoney2.java +++ /dev/null @@ -1,12 +0,0 @@ -package fucoin.actions; - -import java.io.Serializable; - -public class ActionInvokeSentMoney2 implements Serializable{ - public final String name; - public final int amount; - public ActionInvokeSentMoney2(String name, int amount) { - this.name=name; - this.amount = amount; - } -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeUpdate.java b/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeUpdate.java deleted file mode 100644 index 14780e0..0000000 --- a/JavaAkkaFuCoin/src/fucoin/actions/ActionInvokeUpdate.java +++ /dev/null @@ -1,5 +0,0 @@ -package fucoin.actions; - -public class ActionInvokeUpdate { - -} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/ClientAction.java b/JavaAkkaFuCoin/src/fucoin/actions/ClientAction.java new file mode 100644 index 0000000..e4f5119 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/ClientAction.java @@ -0,0 +1,12 @@ +package fucoin.actions; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public abstract class ClientAction extends Action<Wallet>{ + @Override + protected abstract void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet abstractNode); + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoin.java b/JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoin.java new file mode 100644 index 0000000..93b690d --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoin.java @@ -0,0 +1,18 @@ +package fucoin.actions.join; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; + +//Used to join the network (a pre known participant/Wallet must be known) +public class ActionJoin extends GeneralAction{ + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, AbstractNode node) { + ActionJoinAnswer aja = new ActionJoinAnswer(); + aja.someNeighbors.putAll(node.getKnownNeighbors()); + sender.tell(aja, self); + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoinAnswer.java b/JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoinAnswer.java new file mode 100644 index 0000000..b230b1a --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/join/ActionJoinAnswer.java @@ -0,0 +1,27 @@ +package fucoin.actions.join; + +import java.util.HashMap; +import java.util.Map.Entry; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; +import fucoin.actions.ClientAction; +import fucoin.actions.persist.ActionSearchMyWallet; + +// Returns some neighbors that might be used as known +// and/or local neighbors +public class ActionJoinAnswer extends ClientAction{ + public final HashMap<String, ActorRef> someNeighbors = new HashMap<>(); + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + log("someNeighbors:"+someNeighbors); + for(Entry<String, ActorRef> neighbor : someNeighbors.entrySet()){ + wallet.addKnownNeighbor(neighbor.getKey(),neighbor.getValue()); + } + for(Entry<String, ActorRef> neighbor : someNeighbors.entrySet()){ + neighbor.getValue().tell(new ActionSearchMyWallet(wallet.getName()), self); + } + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/join/GeneralAction.java b/JavaAkkaFuCoin/src/fucoin/actions/join/GeneralAction.java new file mode 100644 index 0000000..8ddb517 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/join/GeneralAction.java @@ -0,0 +1,8 @@ +package fucoin.actions.join; + +import fucoin.AbstractNode; +import fucoin.actions.Action; + +public abstract class GeneralAction extends Action<AbstractNode> { + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/join/Join.java b/JavaAkkaFuCoin/src/fucoin/actions/join/Join.java new file mode 100644 index 0000000..eccba13 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/join/Join.java @@ -0,0 +1,10 @@ +package fucoin.actions.join; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; +import fucoin.Wallet; +import fucoin.actions.ClientAction; + +public abstract class Join extends ClientAction{ +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/join/ServerActionJoin.java b/JavaAkkaFuCoin/src/fucoin/actions/join/ServerActionJoin.java new file mode 100644 index 0000000..9dfc08a --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/join/ServerActionJoin.java @@ -0,0 +1,27 @@ +package fucoin.actions.join; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.transaction.ActionInvokeDistributedCommitedTransfer; +import fucoin.actions.transaction.SuperVisorAction; +import fucoin.supervisor.SuperVisor; + +public class ServerActionJoin extends SuperVisorAction { + private String name; + + public ServerActionJoin(String name) { + this.name = name; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, SuperVisor node) { + ActionJoinAnswer aja = new ActionJoinAnswer(); + aja.someNeighbors.putAll(node.getKnownNeighbors()); + sender.tell(aja, self); + node.addKnownNeighbor(name, sender); + self.tell( + new ActionInvokeDistributedCommitedTransfer(self, sender, 100), + sender); + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvalidate.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvalidate.java new file mode 100644 index 0000000..f5068d3 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvalidate.java @@ -0,0 +1,19 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; +import fucoin.Wallet; + +// May be used to delete a stored Wallet on another participant +public class ActionInvalidate extends Persist{ + public final String name; + public ActionInvalidate(String name) { + this.name = name; + } + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + wallet.backedUpNeighbors.remove(name); + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeLeave.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeLeave.java new file mode 100644 index 0000000..e8d0102 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeLeave.java @@ -0,0 +1,24 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionInvokeLeave extends Persist{ + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + for(ActorRef neighbor : wallet.getKnownNeighbors().values()){ + if(self.compareTo(neighbor)!=0){ + neighbor.tell(new ActionStoreOrUpdate(wallet), self); + } + } + + + wallet.setActive(false); + wallet.backedUpNeighbors.clear(); + wallet.getKnownNeighbors().clear(); + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeRevive.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeRevive.java new file mode 100644 index 0000000..d1bf101 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeRevive.java @@ -0,0 +1,17 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; +import fucoin.actions.join.ActionJoin; + +public class ActionInvokeRevive extends Persist{ + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + wallet.setActive(true); + wallet.getPreknownNeighbour().tell(new ActionJoin(), self); + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeUpdate.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeUpdate.java new file mode 100644 index 0000000..e71f052 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionInvokeUpdate.java @@ -0,0 +1,16 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionInvokeUpdate extends Persist{ + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + // TODO Auto-generated method stub + + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWallet.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWallet.java new file mode 100644 index 0000000..885cbd6 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWallet.java @@ -0,0 +1,28 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractWallet; +import fucoin.Wallet; + +// Used to search a Wallet by name, i.e. the own wallet if we just +// joined the network; If a receiving participant holds the stored Wallet, +// he returns it, otherwise, he might use gossiping methods to go on +// with the search; +// Note: You should also forward the sender (the participant who actually +// searches for this Wallet, so that it can be returnd the direct way) +public class ActionSearchMyWallet extends Persist{ + public final String name; + public ActionSearchMyWallet(String name) { + this.name = name; + } + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + wallet.addKnownNeighbor(name, sender); + AbstractWallet storedWallet =wallet.backedUpNeighbors.get(name); + if(storedWallet!=null){ + sender.tell(new ActionSearchMyWalletAnswer(storedWallet), self); + } + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWalletAnswer.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWalletAnswer.java new file mode 100644 index 0000000..9336928 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionSearchMyWalletAnswer.java @@ -0,0 +1,20 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractWallet; +import fucoin.Wallet; + +// Used to return a searched Wallet +public class ActionSearchMyWalletAnswer extends Persist { + public final AbstractWallet w; + public ActionSearchMyWalletAnswer(AbstractWallet w) { + this.w = w; + } + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + wallet.setAmount(w.amount); + sender.tell(new ActionInvalidate(wallet.name), self); + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionStoreOrUpdate.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionStoreOrUpdate.java new file mode 100644 index 0000000..977c199 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/ActionStoreOrUpdate.java @@ -0,0 +1,20 @@ +package fucoin.actions.persist; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; +import fucoin.AbstractWallet; +import fucoin.Wallet; + +//Used to push the state of my/a wallet to another participant +public class ActionStoreOrUpdate extends Persist { + public final AbstractWallet w; + public ActionStoreOrUpdate(AbstractWallet w) { + this.w = w; + } + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + wallet.backedUpNeighbors.put(w.name, w); + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/persist/Persist.java b/JavaAkkaFuCoin/src/fucoin/actions/persist/Persist.java new file mode 100644 index 0000000..1a42e14 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/persist/Persist.java @@ -0,0 +1,7 @@ +package fucoin.actions.persist; + +import fucoin.actions.ClientAction; + +public abstract class Persist extends ClientAction{ + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReference.java b/JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReference.java new file mode 100644 index 0000000..4de7d80 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReference.java @@ -0,0 +1,50 @@ +package fucoin.actions.search; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +//Used to return a Wallet reference (akka-style string which can +// be transformed to an ActorRef) +public class ActionSearchWalletReference extends Search{ + + public final String name; + public final List<ActorRef> ttl = new ArrayList<ActorRef>(); + public ActionSearchWalletReference(String name) { + this.name = name; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + log(wallet.getKnownNeighbors()+"contains "+name+"?"); + ttl.add(self); + ActionSearchWalletReferenceAnswer answer = null; + if(this.name.equals(wallet.getName())){ + answer = new ActionSearchWalletReferenceAnswer(name,wallet.getAddress(),ttl); + }else if(wallet.backedUpNeighbors.containsKey(name)){ + answer = new ActionSearchWalletReferenceAnswer(name,wallet.backedUpNeighbors.get(name).getAddress(),ttl); + } else if(wallet.getKnownNeighbors().containsKey(name)){ + answer = new ActionSearchWalletReferenceAnswer(name,wallet.getAddress(wallet.getKnownNeighbors().get(name)),ttl); + } else if (ttl.size()<5){ + for(ActorRef neighbor : wallet.getKnownNeighbors().values()){ + if(!ttl.contains(neighbor)){ + + + + neighbor.tell(this, self); + } + } + } + //System.out.println("ttl:"+ttl.size()); + //User unknown by this Wallet + if(answer!=null&&ttl.size()>0){ + ttl.get(ttl.size()-1).tell(answer, self); + } + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReferenceAnswer.java b/JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReferenceAnswer.java new file mode 100644 index 0000000..6cc2f40 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/search/ActionSearchWalletReferenceAnswer.java @@ -0,0 +1,30 @@ +package fucoin.actions.search; + +import java.util.List; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionSearchWalletReferenceAnswer extends Search { + + public final String address; + public final String name; + public final List<ActorRef> pathToSearchedWallet; + public ActionSearchWalletReferenceAnswer(String name,String address, List<ActorRef> pathToSearchedWallet) { + this.address = address; + this.name=name; + this.pathToSearchedWallet=pathToSearchedWallet; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + ActorRef target = context.actorSelection(address).anchor(); + wallet.addKnownNeighbor(name,target); + int pos = pathToSearchedWallet.indexOf(self); + if(pos>0){ + pathToSearchedWallet.get(pos-1).tell(this, self); + } + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/search/Search.java b/JavaAkkaFuCoin/src/fucoin/actions/search/Search.java new file mode 100644 index 0000000..505f915 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/search/Search.java @@ -0,0 +1,7 @@ +package fucoin.actions.search; + +import fucoin.actions.ClientAction; + +public abstract class Search extends ClientAction{ + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionCommitDistributedCommitedTransfer.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionCommitDistributedCommitedTransfer.java new file mode 100644 index 0000000..d9edb73 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionCommitDistributedCommitedTransfer.java @@ -0,0 +1,58 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; +import fucoin.actions.ClientAction; +import fucoin.supervisor.DistributedCommitedTransferRequest; + +public class ActionCommitDistributedCommitedTransfer extends ClientAction{ + + private ActorRef source; + private ActorRef target; + private int amount; + private boolean granted; + private long timestamp; + private long id; + + + public ActionCommitDistributedCommitedTransfer(ActorRef source, + ActorRef target, int amount, boolean granted, long timestamp, long id) { + this.source=source; + this.target=target; + this.amount=amount; + this.granted=granted; + this.timestamp=timestamp; + this.id=id; + } + + public ActionCommitDistributedCommitedTransfer( + DistributedCommitedTransferRequest outdatedRequest) { + this.source=outdatedRequest.getSource(); + this.target=outdatedRequest.getTarget(); + this.amount=0; + this.granted=false; + this.timestamp=outdatedRequest.getTimeout(); + this.id=outdatedRequest.getId(); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + + if(granted){ + Integer sourceAmount = wallet.amounts.getOrDefault(source,0); + Integer targetAmount = wallet.amounts.getOrDefault(target,0); + wallet.amounts.put(source,sourceAmount-amount); + wallet.amounts.put(target,targetAmount+amount); + if(source.compareTo(self)==0)wallet.amount-=amount; + else if(target.compareTo(self)==0)wallet.amount+=amount; + wallet.log("have now "+wallet.amounts.get(self)+" Fucoins"); + }else{ + log("abort transaction with id"+id); + } + log("wallet.amounts:"+wallet.amounts); + + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmount.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmount.java new file mode 100644 index 0000000..ad9737b --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmount.java @@ -0,0 +1,16 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionGetAmount extends Transaction { + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + ActionGetAmountAnswer agaa = new ActionGetAmountAnswer(wallet.getAddress(),wallet.getName(),wallet.amount); + sender.tell(agaa, self); + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmountAnswer.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmountAnswer.java new file mode 100644 index 0000000..4a85da1 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionGetAmountAnswer.java @@ -0,0 +1,25 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionGetAmountAnswer extends Transaction { + + public String address; + public String name; + public int amount; + + public ActionGetAmountAnswer(String address, String name, int amount) { + this.address=address; + this.name=name; + this.amount=amount; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeDistributedCommitedTransfer.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeDistributedCommitedTransfer.java new file mode 100644 index 0000000..532fe5d --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeDistributedCommitedTransfer.java @@ -0,0 +1,36 @@ +package fucoin.actions.transaction; + +import java.util.HashMap; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; +import fucoin.supervisor.DistributedCommitedTransferRequest; +import fucoin.supervisor.SuperVisor; + +public class ActionInvokeDistributedCommitedTransfer extends CoordinatorTransaction{ + + private ActorRef source; + private ActorRef target; + private int amount; + + public ActionInvokeDistributedCommitedTransfer(ActorRef source, + ActorRef target, int amount) { + this.source=source; + this.target=target; + this.amount=amount; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, SuperVisor superVisor) { + log("invoke transaction "+source.path().name()+" "+amount+" "+target.path().name()); + long timestamp = System.currentTimeMillis()+500; + DistributedCommitedTransferRequest ds = new DistributedCommitedTransferRequest(source,target,timestamp); + superVisor.addDistributedCommitedTransferRequest(ds); + for(ActorRef neighbor : superVisor.getKnownNeighbors().values()){ + neighbor.tell(new ActionPrepareDistributedCommitedTransfer(source,target,amount,timestamp,ds.getId()), self); + } + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney.java new file mode 100644 index 0000000..3ce0506 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney.java @@ -0,0 +1,41 @@ +package fucoin.actions.transaction; + + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; +import fucoin.actions.search.ActionSearchWalletReference; + +public class ActionInvokeSentMoney extends Transaction{ + public final String name; + public final int amount; + public ActionInvokeSentMoney(String name, int amount) { + this.name=name; + this.amount = amount; + } + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + log(wallet.getKnownNeighbors()+""); + if(wallet.getKnownNeighbors().containsKey(name)){ + wallet.getRemoteSuperVisorActor().tell( + new ActionInvokeDistributedCommitedTransfer(self,wallet.getKnownNeighbors().get(name),amount), sender); + }else{ + for(ActorRef neighbor : wallet.getKnownNeighbors().values()){ + neighbor.tell(new ActionSearchWalletReference(name), self); + } + + try { + context.unwatch(self); + Thread.sleep(200); + context.watch(self); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + //getContext().unwatch(getSelf()); + self.tell(new ActionInvokeSentMoney(name, amount), self); + + } + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney2.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney2.java new file mode 100644 index 0000000..743e0b8 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionInvokeSentMoney2.java @@ -0,0 +1,22 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionInvokeSentMoney2 extends Transaction{ + public final String name; + public final int amount; + public ActionInvokeSentMoney2(String name, int amount) { + this.name=name; + this.amount = amount; + } + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + if(wallet.getKnownNeighbors().containsKey(name)){ + wallet.addAmount(-amount); + wallet.getKnownNeighbors().get(name).tell(new ActionReceiveTransaction(amount), self); + } + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransfer.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransfer.java new file mode 100644 index 0000000..cbe32fb --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransfer.java @@ -0,0 +1,41 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; + +public class ActionPrepareDistributedCommitedTransfer extends Transaction{ + + private ActorRef source; + private ActorRef target; + private int amount; + private long timestamp; + private long id; + + public ActionPrepareDistributedCommitedTransfer(ActorRef source, + ActorRef target, int amount, long timestamp, long id) { + this.source=source; + this.target=target; + this.amount=amount; + this.timestamp=timestamp; + this.id=id; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + //log("wallet.amounts:"+wallet.amounts); + //log("amount:"+amount); + //log("source:"+source); + + //log("check if "+source.path().name()+" has more than "+amount+" he has now "+wallet.amounts.getOrDefault(source,0)); + //log("and sender.compareTo(source)==0?"+sender.compareTo(source)); + boolean granted = sender.compareTo(source)==0 //sender is supervisor(bank) has allways money + ||(wallet.amounts.containsKey(source) //sender is unknown, might be valid + &&wallet.amounts.getOrDefault(source,0)>=amount) ; //sender have enough money + //log("granted?:"+granted); + //log("contains?:"+wallet.amounts.containsKey(source) ); + sender.tell(new ActionPrepareDistributedCommitedTransferAnswer(source, target, amount,timestamp,granted,id),self); + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransferAnswer.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransferAnswer.java new file mode 100644 index 0000000..376884a --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionPrepareDistributedCommitedTransferAnswer.java @@ -0,0 +1,52 @@ +package fucoin.actions.transaction; + +import fucoin.supervisor.DistributedCommitedTransferRequest; +import fucoin.supervisor.SuperVisor; +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; + +public class ActionPrepareDistributedCommitedTransferAnswer extends CoordinatorTransaction { + + private ActorRef source; + private ActorRef target; + private int amount; + private boolean granted; + private long timestamp; + private long id; + + public ActionPrepareDistributedCommitedTransferAnswer(ActorRef source, + ActorRef target, int amount, long timestamp, boolean granted, long id) { + this.source=source; + this.target=target; + this.amount=amount; + this.granted=granted; + this.timestamp=timestamp; + this.id=id; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, SuperVisor superVisor) { + log(""+superVisor.getKnownNeighbors()); + log("granted?"+granted); + DistributedCommitedTransferRequest request = superVisor.getRequest(id); + if(granted){ + if(request==null)//unknown DistributedCommitedTransferRequest ignore + return; + int newCount = request.addPositiveAnswer(sender); + if(newCount == superVisor.getKnownNeighbors().size()){ + for(ActorRef neighbor : request.getAnswers()){ + neighbor.tell(new ActionCommitDistributedCommitedTransfer(source,target,amount,true,timestamp,id), self); + } + superVisor.deleteRequest(request); + } + }else{ + //A client wants to rollback + if(request!=null) + for(ActorRef neighbor : request.getAnswers()){ + neighbor.tell(new ActionCommitDistributedCommitedTransfer(source,target,amount,false,timestamp,id), self); + } + } + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionReceiveTransaction.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionReceiveTransaction.java new file mode 100644 index 0000000..6abdc2a --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/ActionReceiveTransaction.java @@ -0,0 +1,18 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.Wallet; +//Used to send (positive amount) or retreive money (negative amount) +public class ActionReceiveTransaction extends Transaction { + final public int amount; + public ActionReceiveTransaction(int amount) { + this.amount = amount; + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + wallet.addAmount(wallet.amount); + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/CoordinatorTransaction.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/CoordinatorTransaction.java new file mode 100644 index 0000000..7c95795 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/CoordinatorTransaction.java @@ -0,0 +1,16 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.AbstractNode; +import fucoin.Wallet; +import fucoin.actions.ClientAction; +import fucoin.supervisor.SuperVisor; + +public abstract class CoordinatorTransaction extends SuperVisorAction{ + +@Override +protected abstract void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, SuperVisor abstractNode); + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/SuperVisorAction.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/SuperVisorAction.java new file mode 100644 index 0000000..a6bce1e --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/SuperVisorAction.java @@ -0,0 +1,11 @@ +package fucoin.actions.transaction; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.Action; +import fucoin.actions.ClientAction; +import fucoin.supervisor.SuperVisor; + +public abstract class SuperVisorAction extends Action<SuperVisor>{ + +} diff --git a/JavaAkkaFuCoin/src/fucoin/actions/transaction/Transaction.java b/JavaAkkaFuCoin/src/fucoin/actions/transaction/Transaction.java new file mode 100644 index 0000000..d764afc --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/actions/transaction/Transaction.java @@ -0,0 +1,7 @@ +package fucoin.actions.transaction; + +import fucoin.actions.ClientAction; + +public abstract class Transaction extends ClientAction{ + +} diff --git a/JavaAkkaFuCoin/src/fucoin/gui/WalletGui.java b/JavaAkkaFuCoin/src/fucoin/gui/WalletGui.java index 4c6e6b6..3ba36b7 100644 --- a/JavaAkkaFuCoin/src/fucoin/gui/WalletGui.java +++ b/JavaAkkaFuCoin/src/fucoin/gui/WalletGui.java @@ -124,10 +124,13 @@ public WalletGui(IWalletControle walletControle) { }); window.addWindowListener(new WindowAdapter() { + @Override - public void windowClosed(WindowEvent e) { + public void windowClosing(WindowEvent e) { System.out.println("window closing"); walletControle.leave(); + super.windowClosing(e); + } }); } diff --git a/JavaAkkaFuCoin/src/fucoin/supervisor/ActionUpdateQueue.java b/JavaAkkaFuCoin/src/fucoin/supervisor/ActionUpdateQueue.java new file mode 100644 index 0000000..cfafa3d --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/supervisor/ActionUpdateQueue.java @@ -0,0 +1,41 @@ +package fucoin.supervisor; + +import java.util.ArrayList; +import java.util.List; + +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; +import fucoin.actions.transaction.ActionCommitDistributedCommitedTransfer; +import fucoin.actions.transaction.SuperVisorAction; + +public class ActionUpdateQueue extends SuperVisorAction{ + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, SuperVisor superVisor) { + + List<DistributedCommitedTransferRequest> deletes = superVisor.updateList(); + + for(DistributedCommitedTransferRequest outdatedRequest : deletes){ + ActionCommitDistributedCommitedTransfer acdct = new ActionCommitDistributedCommitedTransfer(outdatedRequest); + for(ActorRef neighbor : superVisor.getKnownNeighbors().values()){ + neighbor.tell(acdct, self); + } + } + + context.unwatch(self); + sleep(1000); + context.watch(self); + self.tell(this, self); + } + + private void sleep(long time) { + try { + Thread.sleep(time); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + +} diff --git a/JavaAkkaFuCoin/src/fucoin/supervisor/DistributedCommitedTransferRequest.java b/JavaAkkaFuCoin/src/fucoin/supervisor/DistributedCommitedTransferRequest.java new file mode 100644 index 0000000..a27f873 --- /dev/null +++ b/JavaAkkaFuCoin/src/fucoin/supervisor/DistributedCommitedTransferRequest.java @@ -0,0 +1,58 @@ +package fucoin.supervisor; + +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +import fucoin.Wallet; +import fucoin.actions.transaction.Transaction; +import akka.actor.ActorRef; +import akka.actor.UntypedActorContext; + +public class DistributedCommitedTransferRequest extends Transaction { + private final static Random random = new Random(System.currentTimeMillis()+System.nanoTime()); + private ActorRef source; + private ActorRef target; + private long timeout; + private long id; + private List<ActorRef> answers = new LinkedList<ActorRef>(); + + public DistributedCommitedTransferRequest(ActorRef source, ActorRef target, + long timeout) { + this.source=source; + this.target=target; + this.timeout=timeout; + this.id=random.nextLong(); + } + + @Override + protected void onAction(ActorRef sender, ActorRef self, + UntypedActorContext context, Wallet wallet) { + + } + + public ActorRef getSource() { + return source; + } + + public ActorRef getTarget() { + return target; + } + + public long getTimeout() { + return timeout; + } + + public int addPositiveAnswer(ActorRef sender) { + answers.add(sender); + return answers.size(); + } + + public List<ActorRef> getAnswers() { + return answers; + } + + public long getId() { + return id; + } +} diff --git a/JavaAkkaFuCoin/src/fucoin/supervisor/SuperVisor.java b/JavaAkkaFuCoin/src/fucoin/supervisor/SuperVisor.java index 5f37677..3107787 100644 --- a/JavaAkkaFuCoin/src/fucoin/supervisor/SuperVisor.java +++ b/JavaAkkaFuCoin/src/fucoin/supervisor/SuperVisor.java @@ -3,24 +3,22 @@ package fucoin.supervisor; import java.awt.Label; import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import java.util.concurrent.Semaphore; import akka.actor.ActorRef; import akka.actor.Props; -import akka.actor.UntypedActor; -import fucoin.AbstractWallet.ActionJoin; -import fucoin.AbstractWallet.ActionStoreOrUpdate; -import fucoin.actions.ActionGetAmount; -import fucoin.actions.ActionGetAmountAnswer; -import fucoin.actions.ActionInvokeUpdate; +import fucoin.AbstractNode; +import fucoin.actions.Action; +import fucoin.actions.persist.ActionInvokeUpdate; -public class SuperVisor extends UntypedActor { +public class SuperVisor extends AbstractNode { - private List<ActorRef> knownClients = new ArrayList<ActorRef>(); - private Map<String, Map<String, Integer>> amounts = new HashMap<String, Map<String, Integer>>(); + private AmountTableModel amountTableModel; private Label averageamountLbl; @@ -30,66 +28,14 @@ public class SuperVisor extends UntypedActor { } @Override - public void onReceive(Object msg) throws Exception { - if (msg instanceof ActionJoin) { - if (!knownClients.contains(getSender())) { - knownClients.add(getSender()); - } - } else if (msg instanceof ActionInvokeUpdate) { - log("" + knownClients); - for (ActorRef neighbor : knownClients) { - neighbor.tell(new ActionGetAmount(), getSelf()); - } - } else if (msg instanceof ActionGetAmountAnswer) { - ActionGetAmountAnswer agaa = (ActionGetAmountAnswer) msg; - try { - update(agaa.address, agaa.name, agaa.amount); - } catch (Exception ignoreException) { - } - - } else if (msg instanceof ActionStoreOrUpdate) { - ActionStoreOrUpdate asou = (ActionStoreOrUpdate) msg; - try { - update(asou.w.getAddress(), asou.w.name, asou.w.amount); - } catch (Exception ignoreException) { - } - - knownClients.remove(asou.w.getAddress()); - } - } - - private void log(String msg) { - System.out.println(getSelf() + ": " + msg); + public void onReceive(Object msg) { + log(msg.getClass().getSimpleName()); + + ((Action) msg).doAction(this); } Semaphore mutex = new Semaphore(1); - - private void update(String address, String name, int amount) - throws InterruptedException { - - //log(address + ", " + name + ", " + amount); - if (!amounts.containsKey(address)) { - amounts.put(address, new HashMap<String, Integer>()); - } - amounts.get(address).put(name, amount); - amountTableModel.clear(); - int user = 0; - double avgAmount = 0; - for (Entry<String, Map<String, Integer>> process : amounts.entrySet()) { - for (Entry<String, Integer> account : process.getValue().entrySet()) { - // amountTableModel.addRow(new Object[] { process.getKey(), - // account.getKey(), account.getValue() }); - user++; - avgAmount += account.getValue(); - } - } - if (user > 0) { - avgAmount /= user; - } - avgAmount = ((int) (avgAmount * 100) / 100.0); - this.averageamountLbl.setText("" + avgAmount); - - } + private Map<Long,DistributedCommitedTransferRequest> requestQueue; public static Props props() { return Props.create(SuperVisor.class, new SuperVisorCreator()); @@ -105,19 +51,43 @@ public class SuperVisor extends UntypedActor { @Override public void postStop() throws Exception { - int user = 0; - double avgAmount = 0; - //System.out.println(amounts); - for (Entry<String, Map<String, Integer>> process : amounts.entrySet()) { - for (Entry<String, Integer> account : process.getValue().entrySet()) { - amountTableModel.addRow(new Object[] { process.getKey(), - account.getKey(), account.getValue() }); - user++; - avgAmount += account.getValue(); + super.postStop(); + } + + public void addDistributedCommitedTransferRequest( + DistributedCommitedTransferRequest request) { + requestQueue.put(request.getId(),request); + } + + @Override + public void preStart() throws Exception { + super.preStart(); + requestQueue = new HashMap<Long,DistributedCommitedTransferRequest>(); + self().tell(new ActionUpdateQueue(), self()); + } + + public List<DistributedCommitedTransferRequest> updateList(){ + List<Long> deletesIds = new ArrayList<Long>(); + List<DistributedCommitedTransferRequest> deletes = new ArrayList<DistributedCommitedTransferRequest>(); + for(Entry<Long, DistributedCommitedTransferRequest> outdatedRequest : requestQueue.entrySet()){ + if(outdatedRequest.getValue().getTimeout()<System.currentTimeMillis()){ + deletesIds.add(outdatedRequest.getKey()); + deletes.add(outdatedRequest.getValue()); } } - if (user > 0) { - avgAmount /= user; + for(Long delete : deletesIds){ + requestQueue.remove(delete); } + + return deletes; + } + + public DistributedCommitedTransferRequest getRequest(Long id) { + DistributedCommitedTransferRequest searchedrequest = requestQueue.get(id); + return searchedrequest ; + } + + public void deleteRequest(DistributedCommitedTransferRequest request) { + requestQueue.remove(request.getId()); } } -- GitLab