Skip to content
Snippets Groups Projects
Commit b2653e3f authored by Simon Könnecke's avatar Simon Könnecke
Browse files

since the absented of fully connected network the coins transaction is a bit buggy, add DirtyBugFix

parent 9a20300e
Branches
No related tags found
No related merge requests found
package fucoin.actions.control;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.actions.ClientAction;
import fucoin.wallet.AbstractWallet;
import java.util.List;
public class ActionDirtyBugFix extends ClientAction {
private List<ActorRef> actorRefList;
public ActionDirtyBugFix(List<ActorRef> actorRefList) {
this.actorRefList = actorRefList;
}
@Override
protected void onAction(ActorRef sender, ActorRef self, UntypedActorContext context, AbstractWallet abstractNode) {
actorRefList.forEach(actor -> {
abstractNode.amounts.put(actor, 100);
});
}
}
...@@ -47,10 +47,10 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { ...@@ -47,10 +47,10 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
if (granted) { if (granted) {
if (source.compareTo(self) == 0) { if (source.compareTo(self) == 0) {
wallet.getqDigest().offer(amount);
wallet.setAmount(wallet.getAmount() - amount); wallet.setAmount(wallet.getAmount() - amount);
wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name()); wallet.addTransactionLogMessageSuccess("Sent " + amount + " FUC to " + target.path().name());
} else if (target.compareTo(self) == 0) { } else if (target.compareTo(self) == 0) {
wallet.getqDigest().offer(amount);
wallet.setAmount(wallet.getAmount() + amount); wallet.setAmount(wallet.getAmount() + amount);
wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name()); wallet.addTransactionLogMessageSuccess("Received " + amount + " FUC from " + source.path().name());
} }
...@@ -70,7 +70,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction { ...@@ -70,7 +70,7 @@ public class ActionCommitDistributedCommittedTransfer extends ClientAction {
} }
// recipient should notify a possible observer // recipient should notify a possible observer
if (observer != null && granted) { if (target.equals(self) && observer != null) {
observer.tell(new ActionNotifyObserver(source, target, amount, granted, timestamp, id), self); observer.tell(new ActionNotifyObserver(source, target, amount, granted, timestamp, id), self);
} }
//wallet.addLogMsg("wallet.amounts:" + wallet.amounts); //wallet.addLogMsg("wallet.amounts:" + wallet.amounts);
......
...@@ -24,13 +24,28 @@ public class ActionPrepareDistributedCommittedTransfer extends Transaction { ...@@ -24,13 +24,28 @@ public class ActionPrepareDistributedCommittedTransfer extends Transaction {
@Override @Override
protected void onAction(ActorRef sender, ActorRef self, protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, AbstractWallet wallet) { UntypedActorContext context, AbstractWallet wallet) {
boolean granted = amount > 0 && //check are the constrains
//sender is supervisor(bank) has always money boolean granted = false;
(sender.compareTo(source) == 0 if (amount > 0) {
//the sender is it self
if (self.equals(source) && wallet.getAmount() >= amount) {
granted = true;
} else {
//check if sender have enough money
if (wallet.amounts.containsKey(source)) {
granted = wallet.amounts.getOrDefault(source, 0) >= amount;
} else {
//sender is unknown, might be valid //sender is unknown, might be valid
|| (wallet.amounts.containsKey(source) granted = true;
//sender have enough money }
&& wallet.amounts.getOrDefault(source, 0) >= amount)); }
}
//SuperVisor right.
if (amount > 0 && sender.compareTo(source) == 0) {
granted = true;
}
// precautionly update own ledger to prevent double spending (respectively agreeing) // precautionly update own ledger to prevent double spending (respectively agreeing)
Integer sourceAmount = wallet.amounts.getOrDefault(source, 0); Integer sourceAmount = wallet.amounts.getOrDefault(source, 0);
...@@ -38,6 +53,7 @@ public class ActionPrepareDistributedCommittedTransfer extends Transaction { ...@@ -38,6 +53,7 @@ public class ActionPrepareDistributedCommittedTransfer extends Transaction {
wallet.amounts.put(source, sourceAmount - amount); wallet.amounts.put(source, sourceAmount - amount);
wallet.amounts.put(target, targetAmount + amount); wallet.amounts.put(target, targetAmount + amount);
sender.tell(new ActionPrepareDistributedCommittedTransferAnswer(source, target, amount, timestamp, granted, id), self); sender.tell(new ActionPrepareDistributedCommittedTransferAnswer(source, target, amount, timestamp, granted, id), self);
} }
......
...@@ -5,10 +5,7 @@ import akka.actor.Props; ...@@ -5,10 +5,7 @@ import akka.actor.Props;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import akka.util.Timeout; import akka.util.Timeout;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.actions.control.ActionAnnounceWalletCreation; import fucoin.actions.control.*;
import fucoin.actions.control.ActionWalletGetNeighbours;
import fucoin.actions.control.ActionWalletGetNeighboursAnswer;
import fucoin.actions.control.ActionWalletSendMoney;
import fucoin.actions.join.ActionTellSupervisor; import fucoin.actions.join.ActionTellSupervisor;
import fucoin.actions.transaction.ActionGetAmount; import fucoin.actions.transaction.ActionGetAmount;
import fucoin.actions.transaction.ActionGetAmountAnswer; import fucoin.actions.transaction.ActionGetAmountAnswer;
...@@ -38,34 +35,51 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -38,34 +35,51 @@ public abstract class AbstractConfiguration extends AbstractNode {
protected int remainingTransactions; protected int remainingTransactions;
protected int remainingWalletsToSpawn;
protected boolean walletGuiSupport = false;
public static Props props(Class configurationClass) { public static Props props(Class configurationClass) {
return Props.create(new ConfigurationCreator(configurationClass)); return Props.create(new ConfigurationCreator(configurationClass));
} }
/** /**
* Spawns a new wallet and blocks until it has received its initial money * Spawns a new wallet.
* *
* @throws Exception on timeout * @throws Exception on timeout
*/ */
ActorRef spawnWallet(String name, boolean createGUI) throws Exception { ActorRef spawnWallet(String name, boolean guiSupport) throws Exception {
superVisor.tell(new ActionAnnounceWalletCreation(1, self()), self());
Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout); Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout);
ActorRef wallet = createWallet(name, createGUI); ActorRef wallet = createWallet(name, guiSupport);
Await.result(future, timeout.duration()); Await.result(future, timeout.duration());
return wallet; return wallet;
} }
ActorRef spawnWallet() {
if (remainingWalletsToSpawn <= 0) {
return null;
}
String name = "Wallet" + remainingWalletsToSpawn--;
//superVisor.tell(new ActionAnnounceWalletCreation(), self());
//Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(1, self()), timeout);
ActorRef wallet = createWallet(name, walletGuiSupport);
//Await.result(future, timeout.duration());
return wallet;
}
/** /**
* Creates a wallet without blocking until the wallet was created * Creates a wallet without blocking until the wallet was created
*/ */
private ActorRef createWallet(String name, boolean createGUI) { private ActorRef createWallet(String name, boolean guiSupport) {
Props props; Props props;
int numOfWallets = activeActors.size(); int numOfWallets = activeActors.size();
if (numOfWallets == 0) { if (numOfWallets == 0) {
props = WalletImpl.props(null, name, createGUI); props = WalletImpl.props(null, name, guiSupport);
} else { } else {
ActorRef knownNeighbour = activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size())); ActorRef knownNeighbour = activeActors.get(ThreadLocalRandom.current().nextInt(activeActors.size()));
props = WalletImpl.props(knownNeighbour, name, createGUI); props = WalletImpl.props(knownNeighbour, name, guiSupport);
} }
ActorRef actorRef = context().actorOf(props, name); ActorRef actorRef = context().actorOf(props, name);
...@@ -80,19 +94,6 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -80,19 +94,6 @@ public abstract class AbstractConfiguration extends AbstractNode {
return actorRef; return actorRef;
} }
/**
* Spawn multiple wallets and wait until they all have their initial FUC
*
* @throws Exception on timeout
*/
public void spawnWallets(int n, boolean createGUI) throws Exception {
Future<Object> future = Patterns.ask(superVisor, new ActionAnnounceWalletCreation(n, self()), timeout);
for (int i = 0; i < n; i++) {
String nameOfTheWallet = "Wallet" + String.valueOf(activeActors.size());
createWallet(nameOfTheWallet, createGUI);
}
Await.result(future, timeout.duration());
}
/** /**
* Fetch a random wallet * Fetch a random wallet
...@@ -120,12 +121,15 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -120,12 +121,15 @@ public abstract class AbstractConfiguration extends AbstractNode {
return; return;
} }
System.out.println("Next Random Transaction, " + remainingTransactions + " left.");
remainingTransactions--; remainingTransactions--;
try { try {
randomTransaction(); randomTransaction();
} catch (Exception e) { } catch (Exception e) {
System.err.println("Error while trying to perform a random transaction: " + e.getMessage()); System.err.println("Error while trying to perform a random transaction: " + e.getMessage());
remainingTransactions = 0; //remainingTransactions = 0;
nextRandomTransaction();
} }
} }
...@@ -148,6 +152,8 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -148,6 +152,8 @@ public abstract class AbstractConfiguration extends AbstractNode {
//cancel here when no neighbour was found. //cancel here when no neighbour was found.
if (recipient == null) { if (recipient == null) {
remainingTransactions++;
nextRandomTransaction();
return; return;
} }
...@@ -189,8 +195,20 @@ public abstract class AbstractConfiguration extends AbstractNode { ...@@ -189,8 +195,20 @@ public abstract class AbstractConfiguration extends AbstractNode {
System.out.println("Observed a " + status + " transaction of " + notification.amount + " FUCs from " + System.out.println("Observed a " + status + " transaction of " + notification.amount + " FUCs from " +
notification.source.path().name() + " to " + notification.target.path().name()); notification.source.path().name() + " to " + notification.target.path().name());
if (remainingTransactions > 0) { if (remainingTransactions > 0 && remainingWalletsToSpawn <= 0) {
nextRandomTransaction(); nextRandomTransaction();
} else if (remainingWalletsToSpawn > 0) {
spawnWallet();
try {
// init amount table from the wallets
if (remainingWalletsToSpawn <= 0) {
activeActors.forEach(actorRef -> actorRef.tell(new ActionDirtyBugFix(activeActors), self()));
}
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
} }
} }
......
package fucoin.configurations; package fucoin.configurations;
import fucoin.actions.transaction.ActionNotifyObserver;
import fucoin.configurations.internal.ConfigurationName; import fucoin.configurations.internal.ConfigurationName;
/** /**
...@@ -8,28 +7,23 @@ import fucoin.configurations.internal.ConfigurationName; ...@@ -8,28 +7,23 @@ import fucoin.configurations.internal.ConfigurationName;
*/ */
@ConfigurationName("Lots of Wallets") @ConfigurationName("Lots of Wallets")
public class MassWalletConfiguration extends AbstractConfiguration { public class MassWalletConfiguration extends AbstractConfiguration {
private boolean runRandomTransactions = false;
@Override @Override
public void run() { public void run() {
initSupervisor(); //Config
try { remainingWalletsToSpawn = 5;
spawnWallets(20, false);
System.out.println("Wallet spawning done!");
Thread.sleep(4000);
} catch (Exception e) {
System.out.println("Wallet spawning timed out!");
}
remainingTransactions = 100; remainingTransactions = 100;
runRandomTransactions = true;
nextRandomTransaction(); //Start SuperVisor
//randomTransactions(100, 10); initSupervisor();
//Init spawning of wallets
System.out.println("Start Wallet spawning!");
spawnWallet();
} }
@Override @Override
public void onReceive(Object message) { public void onReceive(Object message) {
if (message instanceof ActionNotifyObserver && runRandomTransactions) {
nextRandomTransaction();
}
super.onReceive(message); super.onReceive(message);
} }
} }
...@@ -2,9 +2,9 @@ package fucoin.wallet; ...@@ -2,9 +2,9 @@ package fucoin.wallet;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import fucoin.AbstractNode; import fucoin.AbstractNode;
import fucoin.gui.TransactionLogger; import fucoin.gui.TransactionLogger;
import scala.concurrent.Future;
import java.io.Serializable; import java.io.Serializable;
import java.time.LocalDateTime; import java.time.LocalDateTime;
...@@ -14,7 +14,7 @@ import java.time.LocalDateTime; ...@@ -14,7 +14,7 @@ import java.time.LocalDateTime;
*/ */
public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger { public abstract class AbstractWallet extends AbstractNode implements Serializable, TransactionLogger {
private static final double sCompression = 0.7; private static final double sCompression = 0.1;
private QDigest qDigest = new QDigest(sCompression); private QDigest qDigest = new QDigest(sCompression);
/** /**
...@@ -131,7 +131,8 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl ...@@ -131,7 +131,8 @@ public abstract class AbstractWallet extends AbstractNode implements Serializabl
public boolean writeSnapshot(LocalDateTime time) { public boolean writeSnapshot(LocalDateTime time) {
return super.writeSnapshot(getName(), new Gson().toJson(new Snapshot(this)), time); return super.writeSnapshot(getName(),
(new GsonBuilder().setPrettyPrinting().create()).toJson(new Snapshot(this)), time);
} }
public Snapshot readSnapShot(LocalDateTime time) { public Snapshot readSnapShot(LocalDateTime time) {
......
package fucoin.wallet; package fucoin.wallet;
import akka.actor.ActorRef;
import java.io.Serializable; import java.io.Serializable;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Set;
public class Snapshot implements Serializable { public class Snapshot implements Serializable {
private final String name; private final String name;
private final int amount; private final int amount;
private final byte[] qDigest; private final Set<String> knownNeighbour;
private final HashMap<ActorRef, Integer> knownNeighbourAmounts;
private final List<long[]> qDigestVal; private final List<long[]> qDigestVal;
private final byte[] qDigest;
public Snapshot(AbstractWallet wallet) { public Snapshot(AbstractWallet wallet) {
this.name = wallet.getName(); this.name = wallet.getName();
this.amount = wallet.getAmount(); this.amount = wallet.getAmount();
this.qDigest = QDigest.serialize(wallet.getqDigest()); this.qDigest = QDigest.serialize(wallet.getqDigest());
this.qDigestVal = wallet.getqDigest().toAscRanges(); this.qDigestVal = wallet.getqDigest().toAscRanges();
this.knownNeighbour = wallet.getKnownNeighbors().keySet();
this.knownNeighbourAmounts = wallet.amounts;
} }
public String getName() { public String getName() {
......
  • Guest

    Maybe you want to take a look at what we are currently working on in the dev-group3-gephi branch. There we added an additional overlay network that can be configured by a gephi graph file and should provide a good base for all algorithms. That way you do not have to alter any existing behavior of the distributed commit.

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment