Skip to content
Snippets Groups Projects
Commit 40b677aa authored by Michael Kmoch's avatar Michael Kmoch
Browse files

2PC is working fine

parent c55ebc08
No related branches found
No related tags found
1 merge request!1Kmoch lewash
Showing
with 531 additions and 0 deletions
package fucoin.actions.persist;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
import fucoin.actions.join.ActionJoin;
public class ActionInvokeRevive extends Persist{
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
wallet.setActive(true);
wallet.getPreknownNeighbour().tell(new ActionJoin(), self);
}
}
package fucoin.actions.persist;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
public class ActionInvokeUpdate extends Persist{
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
// TODO Auto-generated method stub
}
}
package fucoin.actions.persist;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractWallet;
import fucoin.Wallet;
// Used to search a Wallet by name, i.e. the own wallet if we just
// joined the network; If a receiving participant holds the stored Wallet,
// he returns it, otherwise, he might use gossiping methods to go on
// with the search;
// Note: You should also forward the sender (the participant who actually
// searches for this Wallet, so that it can be returnd the direct way)
public class ActionSearchMyWallet extends Persist{
public final String name;
public ActionSearchMyWallet(String name) {
this.name = name;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
wallet.addKnownNeighbor(name, sender);
AbstractWallet storedWallet =wallet.backedUpNeighbors.get(name);
if(storedWallet!=null){
sender.tell(new ActionSearchMyWalletAnswer(storedWallet), self);
}
}
}
package fucoin.actions.persist;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractWallet;
import fucoin.Wallet;
// Used to return a searched Wallet
public class ActionSearchMyWalletAnswer extends Persist {
public final AbstractWallet w;
public ActionSearchMyWalletAnswer(AbstractWallet w) {
this.w = w;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
wallet.setAmount(w.amount);
sender.tell(new ActionInvalidate(wallet.name), self);
}
}
package fucoin.actions.persist;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.AbstractWallet;
import fucoin.Wallet;
//Used to push the state of my/a wallet to another participant
public class ActionStoreOrUpdate extends Persist {
public final AbstractWallet w;
public ActionStoreOrUpdate(AbstractWallet w) {
this.w = w;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
wallet.backedUpNeighbors.put(w.name, w);
}
}
package fucoin.actions.persist;
import fucoin.actions.ClientAction;
public abstract class Persist extends ClientAction{
}
package fucoin.actions.search;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
//Used to return a Wallet reference (akka-style string which can
// be transformed to an ActorRef)
public class ActionSearchWalletReference extends Search{
public final String name;
public final List<ActorRef> ttl = new ArrayList<ActorRef>();
public ActionSearchWalletReference(String name) {
this.name = name;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
log(wallet.getKnownNeighbors()+"contains "+name+"?");
ttl.add(self);
ActionSearchWalletReferenceAnswer answer = null;
if(this.name.equals(wallet.getName())){
answer = new ActionSearchWalletReferenceAnswer(name,wallet.getAddress(),ttl);
}else if(wallet.backedUpNeighbors.containsKey(name)){
answer = new ActionSearchWalletReferenceAnswer(name,wallet.backedUpNeighbors.get(name).getAddress(),ttl);
} else if(wallet.getKnownNeighbors().containsKey(name)){
answer = new ActionSearchWalletReferenceAnswer(name,wallet.getAddress(wallet.getKnownNeighbors().get(name)),ttl);
} else if (ttl.size()<5){
for(ActorRef neighbor : wallet.getKnownNeighbors().values()){
if(!ttl.contains(neighbor)){
neighbor.tell(this, self);
}
}
}
//System.out.println("ttl:"+ttl.size());
//User unknown by this Wallet
if(answer!=null&&ttl.size()>0){
ttl.get(ttl.size()-1).tell(answer, self);
}
}
}
package fucoin.actions.search;
import java.util.List;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
public class ActionSearchWalletReferenceAnswer extends Search {
public final String address;
public final String name;
public final List<ActorRef> pathToSearchedWallet;
public ActionSearchWalletReferenceAnswer(String name,String address, List<ActorRef> pathToSearchedWallet) {
this.address = address;
this.name=name;
this.pathToSearchedWallet=pathToSearchedWallet;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
ActorRef target = context.actorSelection(address).anchor();
wallet.addKnownNeighbor(name,target);
int pos = pathToSearchedWallet.indexOf(self);
if(pos>0){
pathToSearchedWallet.get(pos-1).tell(this, self);
}
}
}
package fucoin.actions.search;
import fucoin.actions.ClientAction;
public abstract class Search extends ClientAction{
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
import fucoin.actions.ClientAction;
import fucoin.supervisor.DistributedCommitedTransferRequest;
public class ActionCommitDistributedCommitedTransfer extends ClientAction{
private ActorRef source;
private ActorRef target;
private int amount;
private boolean granted;
private long timestamp;
private long id;
public ActionCommitDistributedCommitedTransfer(ActorRef source,
ActorRef target, int amount, boolean granted, long timestamp, long id) {
this.source=source;
this.target=target;
this.amount=amount;
this.granted=granted;
this.timestamp=timestamp;
this.id=id;
}
public ActionCommitDistributedCommitedTransfer(
DistributedCommitedTransferRequest outdatedRequest) {
this.source=outdatedRequest.getSource();
this.target=outdatedRequest.getTarget();
this.amount=0;
this.granted=false;
this.timestamp=outdatedRequest.getTimeout();
this.id=outdatedRequest.getId();
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
if(granted){
Integer sourceAmount = wallet.amounts.getOrDefault(source,0);
Integer targetAmount = wallet.amounts.getOrDefault(target,0);
wallet.amounts.put(source,sourceAmount-amount);
wallet.amounts.put(target,targetAmount+amount);
if(source.compareTo(self)==0)wallet.amount-=amount;
else if(target.compareTo(self)==0)wallet.amount+=amount;
wallet.log("have now "+wallet.amounts.get(self)+" Fucoins");
}else{
log("abort transaction with id"+id);
}
log("wallet.amounts:"+wallet.amounts);
}
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
public class ActionGetAmount extends Transaction {
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
ActionGetAmountAnswer agaa = new ActionGetAmountAnswer(wallet.getAddress(),wallet.getName(),wallet.amount);
sender.tell(agaa, self);
}
}
package fucoin.actions;
package fucoin.actions.transaction;
public class ActionGetAmountAnswer {
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
public class ActionGetAmountAnswer extends Transaction {
public String address;
public String name;
......@@ -12,4 +16,10 @@ public class ActionGetAmountAnswer {
this.amount=amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
}
}
package fucoin.actions.transaction;
import java.util.HashMap;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
import fucoin.supervisor.DistributedCommitedTransferRequest;
import fucoin.supervisor.SuperVisor;
public class ActionInvokeDistributedCommitedTransfer extends CoordinatorTransaction{
private ActorRef source;
private ActorRef target;
private int amount;
public ActionInvokeDistributedCommitedTransfer(ActorRef source,
ActorRef target, int amount) {
this.source=source;
this.target=target;
this.amount=amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisor superVisor) {
log("invoke transaction "+source.path().name()+" "+amount+" "+target.path().name());
long timestamp = System.currentTimeMillis()+500;
DistributedCommitedTransferRequest ds = new DistributedCommitedTransferRequest(source,target,timestamp);
superVisor.addDistributedCommitedTransferRequest(ds);
for(ActorRef neighbor : superVisor.getKnownNeighbors().values()){
neighbor.tell(new ActionPrepareDistributedCommitedTransfer(source,target,amount,timestamp,ds.getId()), self);
}
}
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
import fucoin.actions.search.ActionSearchWalletReference;
public class ActionInvokeSentMoney extends Transaction{
public final String name;
public final int amount;
public ActionInvokeSentMoney(String name, int amount) {
this.name=name;
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
log(wallet.getKnownNeighbors()+"");
if(wallet.getKnownNeighbors().containsKey(name)){
wallet.getRemoteSuperVisorActor().tell(
new ActionInvokeDistributedCommitedTransfer(self,wallet.getKnownNeighbors().get(name),amount), sender);
}else{
for(ActorRef neighbor : wallet.getKnownNeighbors().values()){
neighbor.tell(new ActionSearchWalletReference(name), self);
}
try {
context.unwatch(self);
Thread.sleep(200);
context.watch(self);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//getContext().unwatch(getSelf());
self.tell(new ActionInvokeSentMoney(name, amount), self);
}
}
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
public class ActionInvokeSentMoney2 extends Transaction{
public final String name;
public final int amount;
public ActionInvokeSentMoney2(String name, int amount) {
this.name=name;
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
if(wallet.getKnownNeighbors().containsKey(name)){
wallet.addAmount(-amount);
wallet.getKnownNeighbors().get(name).tell(new ActionReceiveTransaction(amount), self);
}
}
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
public class ActionPrepareDistributedCommitedTransfer extends Transaction{
private ActorRef source;
private ActorRef target;
private int amount;
private long timestamp;
private long id;
public ActionPrepareDistributedCommitedTransfer(ActorRef source,
ActorRef target, int amount, long timestamp, long id) {
this.source=source;
this.target=target;
this.amount=amount;
this.timestamp=timestamp;
this.id=id;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
//log("wallet.amounts:"+wallet.amounts);
//log("amount:"+amount);
//log("source:"+source);
//log("check if "+source.path().name()+" has more than "+amount+" he has now "+wallet.amounts.getOrDefault(source,0));
//log("and sender.compareTo(source)==0?"+sender.compareTo(source));
boolean granted = sender.compareTo(source)==0 //sender is supervisor(bank) has allways money
||(wallet.amounts.containsKey(source) //sender is unknown, might be valid
&&wallet.amounts.getOrDefault(source,0)>=amount) ; //sender have enough money
//log("granted?:"+granted);
//log("contains?:"+wallet.amounts.containsKey(source) );
sender.tell(new ActionPrepareDistributedCommitedTransferAnswer(source, target, amount,timestamp,granted,id),self);
}
}
package fucoin.actions.transaction;
import fucoin.supervisor.DistributedCommitedTransferRequest;
import fucoin.supervisor.SuperVisor;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
public class ActionPrepareDistributedCommitedTransferAnswer extends CoordinatorTransaction {
private ActorRef source;
private ActorRef target;
private int amount;
private boolean granted;
private long timestamp;
private long id;
public ActionPrepareDistributedCommitedTransferAnswer(ActorRef source,
ActorRef target, int amount, long timestamp, boolean granted, long id) {
this.source=source;
this.target=target;
this.amount=amount;
this.granted=granted;
this.timestamp=timestamp;
this.id=id;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisor superVisor) {
log(""+superVisor.getKnownNeighbors());
log("granted?"+granted);
DistributedCommitedTransferRequest request = superVisor.getRequest(id);
if(granted){
if(request==null)//unknown DistributedCommitedTransferRequest ignore
return;
int newCount = request.addPositiveAnswer(sender);
if(newCount == superVisor.getKnownNeighbors().size()){
for(ActorRef neighbor : request.getAnswers()){
neighbor.tell(new ActionCommitDistributedCommitedTransfer(source,target,amount,true,timestamp,id), self);
}
superVisor.deleteRequest(request);
}
}else{
//A client wants to rollback
if(request!=null)
for(ActorRef neighbor : request.getAnswers()){
neighbor.tell(new ActionCommitDistributedCommitedTransfer(source,target,amount,false,timestamp,id), self);
}
}
}
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.Wallet;
//Used to send (positive amount) or retreive money (negative amount)
public class ActionReceiveTransaction extends Transaction {
final public int amount;
public ActionReceiveTransaction(int amount) {
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, Wallet wallet) {
wallet.addAmount(wallet.amount);
}
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.Wallet;
import fucoin.actions.ClientAction;
import fucoin.supervisor.SuperVisor;
public abstract class CoordinatorTransaction extends SuperVisorAction{
@Override
protected abstract void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisor abstractNode);
}
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.Action;
import fucoin.actions.ClientAction;
import fucoin.supervisor.SuperVisor;
public abstract class SuperVisorAction extends Action<SuperVisor>{
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment