Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package fucoin.actions.transaction;
import akka.actor.ActorRef;
import akka.actor.UntypedActorContext;
import fucoin.supervisor.DistributedCommittedTransferRequest;
import fucoin.supervisor.SuperVisorImpl;
public class ActionInvokeDistributedCommittedTransfer extends CoordinatorTransaction {
private ActorRef source;
private ActorRef target;
private int amount;
public ActionInvokeDistributedCommittedTransfer(ActorRef source,
ActorRef target, int amount) {
this.source = source;
this.target = target;
this.amount = amount;
}
@Override
protected void onAction(ActorRef sender, ActorRef self,
UntypedActorContext context, SuperVisorImpl superVisor) {
log("invoke transaction " + source.path().name() +
" sends " + amount +
" to " + target.path().name());
long timeout = System.currentTimeMillis() + 500;
DistributedCommittedTransferRequest ds = new DistributedCommittedTransferRequest(source, target, timeout);
superVisor.addDistributedCommitedTransferRequest(ds);
ActionPrepareDistributedCommittedTransfer apdct = new ActionPrepareDistributedCommittedTransfer(source, target, amount, timeout, ds.getId());
for (ActorRef neighbor : superVisor.getKnownNeighbors().values()) {
neighbor.tell(apdct, self);
}
}
}