Skip to content
Snippets Groups Projects
Unverified Commit 7a9a92de authored by David Bohn's avatar David Bohn
Browse files

Merge branch 'dev-group3' of...

Merge branch 'dev-group3' of git.imp.fu-berlin.de:DistributedSystems4Students/FUCoin into dev-group3
parents 0d61ebd9 af738894
No related branches found
No related tags found
1 merge request!5Configuration system
Showing
with 118 additions and 18 deletions
......@@ -13,13 +13,20 @@ public class ActionWalletSendMoney extends ClientAction {
protected String address;
protected int amount;
public ActionWalletSendMoney(String address, int amount) {
protected ActorRef observer;
public ActionWalletSendMoney(String address, int amount, ActorRef observer) {
this.address = address;
this.amount = amount;
this.observer = observer;
}
public ActionWalletSendMoney(String address, int amount){
this(address, amount, null);
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
abstractNode.send(address, amount);
abstractNode.send(address, amount, observer);
}
}
......@@ -14,16 +14,19 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
private boolean granted;
private long timestamp;
private long id;
private ActorRef observer;
public ActionCommitDistributedCommittedTransfer(ActorRef source, ActorRef target,
int amount, boolean granted, long timestamp, long id) {
int amount, boolean granted, long timestamp, long id,
ActorRef observer) {
this.source = source;
this.target = target;
this.amount = amount;
this.granted = granted;
this.timestamp = timestamp;
this.id = id;
this.observer = observer;
}
public ActionCommitDistributedCommittedTransfer(
......@@ -34,6 +37,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
this.granted = false;
this.timestamp = outdatedRequest.getTimeout();
this.id = outdatedRequest.getId();
this.observer = outdatedRequest.getObserver();
}
@Override
......@@ -48,6 +52,11 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
} else if (target.compareTo(self) == 0) {
wallet.setAmount(wallet.getAmount() + amount);
wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name());
// recipient should notify a possible observer
if (observer != null){
observer.tell(new ActionNotifyObserver(source,target,amount,granted,timestamp, id), self);
}
}
} else {
......@@ -61,6 +70,9 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
if (source.compareTo(self) == 0) {
wallet.addTransactionLogMessageFail("Failed to send " + amount + " FUC to " + target.path().name() + " (Commit has not been granted)");
if (observer != null){
observer.tell(new ActionNotifyObserver(source,target,amount,granted,timestamp, id), self);
}
}
}
......
......@@ -9,12 +9,19 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac
private ActorRef source;
private ActorRef target;
private ActorRef observer;
private int amount;
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
ActorRef target, int amount) {
ActorRef target, int amount){
this(source, target, amount, null);
}
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
ActorRef target, int amount, ActorRef observer) {
this.source = source;
this.target = target;
this.observer = observer;
this.amount = amount;
}
......@@ -27,7 +34,7 @@ public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransac
long timeout = System.currentTimeMillis() + 500;
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout);
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, amount, timeout, observer);
superVisor.addDistributedCommitedTransferRequest(ds);
ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId());
for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) {
......
......@@ -9,10 +9,12 @@ import fucoin.wallet.AbstractWallet;
public class ActionInvokeSentMoney extends Transaction {
public final String name;
public final int amount;
public final ActorRef observer;
public ActionInvokeSentMoney(String name, int amount) {
public ActionInvokeSentMoney(String name, int amount, ActorRef observer) {
this.name = name;
this.amount = amount;
this.observer = observer;
}
@Override
......@@ -21,7 +23,7 @@ public class ActionInvokeSentMoney extends Transaction {
wallet.addLogMsg(wallet.getKnownNeighbors() + "");
if (wallet.getKnownNeighbors().containsKey(name)) {
wallet.getRemoteSuperVisorActor().tell(
new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount), sender);
new ActionInvokeDistributedCommittedTransfer(self, wallet.getKnownNeighbors().get(name), amount, observer), sender);
} else {
ActionSearchWalletReference aswr = new ActionSearchWalletReference(name);
for (ActorRef neighbor : wallet.getKnownNeighbors().values()) {
......
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.AbstractNode;
import fucoin.actions.Action;
/**
* Used to notify an observer of a transaction when the transaction is finished.
*/
public class ActionNotifyObserver extends Action<AbstractNode> {
public ActorRef source;
public ActorRef target;
public int amount;
public boolean granted;
public long timestamp;
public long id;
public ActionNotifyObserver(ActorRef source, ActorRef target,
int amount, boolean granted, long timestamp, long id){
this.source = source;
this.target = target;
this.amount = amount;
this.granted = granted;
this.timestamp = timestamp;
this.id = id;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractNode abstractNode) {
}
}
......@@ -27,16 +27,22 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl superVisor) {
//superVisor.addLogMsg("" + superVisor.getKnownNeighbors());
superVisor.addLogMsg("granted?" + granted);
DistributedCommittedTransferRequest request = superVisor.getRequest(id);
// ignore unknown requests
if (request == null){
return;
}
if (granted) {
if (request == null)//unknown DistributedCommittedTransferRequest ignore
return;
int newCount = request.addPositiveAnswer(sender);
if (newCount == superVisor.getKnownNeighbors().size()) {
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id);
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, true, timestamp, id, request.getObserver());
superVisor.addTransactionLogMessageSuccess("Transfer of " + amount + " FUC from" + source.path().name() + " to " + target.path().name());
for (ActorRef neighbor : request.getAnswers()) {
neighbor.tell(acdct, self);
......@@ -45,13 +51,11 @@ public class ActionPrepareDistributedCommittedTransferAnswer extends Coordinator
}
} else {
//A client wants to rollback
if (request != null) {
superVisor.addTransactionLogMessageFail("Client does not grant commit of " + amount + " FUC (" + source.path().name() + " -> " + target.path().name() + ")");
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id);
ActionCommitDistributedCommittedTransfer acdct = new ActionCommitDistributedCommittedTransfer(source, target, amount, false, timestamp, id, request.getObserver());
for (ActorRef neighbor : request.getAnswers()) {
neighbor.tell(acdct, self);
}
}
}
}
......
package fucoin.configurations;
import akka.actor.ActorRef;
import fucoin.actions.Action;
import fucoin.actions.control.ActionWalletSendMoney;
import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.configurations.internal.ConfigurationName;
/**
......@@ -24,11 +26,22 @@ public class DefaultConfiguration extends AbstractConfiguration {
e.printStackTrace();
}
wallet1.tell(new ActionWalletSendMoney("Wallet1", 50), wallet1);
wallet1.tell(new ActionWalletSendMoney("Wallet1", 50, getSelf()), wallet1);
}
@Override
public void onReceive(Object message) {
if (message instanceof ActionNotifyObserver){
ActionNotifyObserver notification = (ActionNotifyObserver) message;
String status = "successful";
if(! notification.granted){
status = "failed";
}
System.out.println("Observed a " + status + " transaction of " + notification.amount+" FUCs from " +
notification.source.path().name() + " to " + notification.target.path().name());
}
}
}
......@@ -13,6 +13,8 @@ public class DistributedCommittedTransferRequest extends Transaction {
private final static Random random = new Random(System.currentTimeMillis() + System.nanoTime());
private ActorRef source;
private ActorRef target;
private ActorRef observer;
private int amount;
private long timeout;
......@@ -21,13 +23,15 @@ public class DistributedCommittedTransferRequest extends Transaction {
private List<ActorRef> answers = new LinkedList<>();
public DistributedCommittedTransferRequest(ActorRef source, ActorRef target, int amount,
long timeout) {
long timeout, ActorRef observer) {
this.source = source;
this.target = target;
this.amount = amount;
this.timeout = timeout;
this.id = random.nextLong();
this.observer = observer;
}
@Override
......@@ -64,4 +68,8 @@ public class DistributedCommittedTransferRequest extends Transaction {
public int getAmount() {
return amount;
}
public ActorRef getObserver(){
return observer;
}
}
......@@ -99,10 +99,19 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
public abstract void deferSendOfSuperVisorActor(ActorRef destinationWallet);
/**
* Sends amount FUCs to the wallet with the address adress
* Sends amount FUCs to the wallet with the address address
*
* @param address Recipients address
* @param amount Amount to send
*/
public abstract void send(String address, int amount);
/**
* Sends amount FUCs to the wallet with the address address and the observer observer
*
* @param address
* @param amount
* @param observer
*/
public abstract void send(String address, int amount, ActorRef observer);
}
......@@ -211,7 +211,12 @@ public class WalletImpl extends AbstractWallet {
}
public void send(String address, int amount) {
getSelf().tell(new ActionInvokeSentMoney(address, amount), getSelf());
send(address, amount, null);
}
@Override
public void send(String address, int amount, ActorRef observer) {
getSelf().tell(new ActionInvokeSentMoney(address, amount, observer), getSelf());
}
@Override
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment