Thursday, September 3, 2009

First version bench

Transactions per second, measured at the receiving end (50 people sending to 50 peers, 2500 messages expected per batch). A new batch of messages is only sent once all 2500 have been received, so a single message lost anywhere in the system freezes the test and counts as a failure.
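
Each line below is a cumulative receive count sampled at a fixed interval, minus the previous sample, so the right-hand side is the throughput for that interval. Roughly, the receiving end does something like this (a minimal sketch only; the class name and the one-second interval are illustrative, not the actual harness):

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical receive-side sampler: every second it prints the cumulative
// count of received messages and the delta against the previous sample,
// which is the format of the list below.
public class ReceiveRateSampler {

    private final AtomicLong received = new AtomicLong();
    private long lastSample = 0;

    // Called from the test client's message handler for every incoming message.
    public void onMessageReceived() {
        received.incrementAndGet();
    }

    public void start() {
        new Timer(true).scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                long now = received.get();
                System.out.println(now + "-" + lastSample + "=" + (now - lastSample));
                lastSample = now;
            }
        }, 1000, 1000);
    }
}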

0-0=0
124-0=124
400-124=276
788-400=388
2125-788=1337
2431-2125=306
2890-2431=459
3348-2890=458
3909-3348=561
4164-3909=255
4521-4164=357
4776-4521=255
4980-4776=204
5235-4980=255
5591-5235=356
5846-5591=255
6254-5846=408
6713-6254=459
7019-6713=306
8039-7019=1020
8548-8039=509
9211-8548=663
9362-9211=151
9362-9362=0
10153-9362=791
11305-10153=1152
11823-11305=518
12742-11823=919
14412-12742=1670
16138-14412=1726
18115-16138=1977
19696-18115=1581
20162-19696=466
20526-20162=364
20886-20526=360
21167-20886=281
21167-21167=0
21167-21167=0
21167-21167=0

As you can see, we approached the desired speed for a while but then fell back down and froze. No further work was accomplished for the remainder of the test. The auditing profile looks like this:

Snapshot[period=10000ms]:
Threads=4 Tasks=10632/11837
AverageQueueSize=32.67 tasks

Snapshot[period=10000ms]:
Threads=4 Tasks=3281/4837
AverageQueueSize=137.71 tasks

Snapshot[period=10000ms]:
Threads=4 Tasks=1074/2369
AverageQueueSize=226.95 tasks

Snapshot[period=10000ms]:
Threads=4 Tasks=36/116
AverageQueueSize=0.87 tasks

Snapshot[period=10000ms]:
Threads=4 Tasks=35/118
AverageQueueSize=0.87 tasks

Snapshot[period=10000ms]:
Threads=4 Tasks=34/115
AverageQueueSize=0.88 tasks

What that profile suggests to me is that the baseline 35/35 maintenance tasks, which run every ten seconds on an idling Darkstar instance, are completing, while all the others are finding it impossible. They might always be taking more than 100ms, or they might all be grabbing for the same resources. While this does merit more examination, I'm excited to try the server revisions suggested by the PDS forum, so I'm implementing and benchmarking those now. Stand by...

Darkstar Data structures comparison

This is lifted from a live discussion at the Darkstar forums. I thought maybe it deserved a more structured housing: Original discussion

Application description:

It's like a collaborative PowerPoint presentation. There are many slides, and each slide has a large number of users contributing content to it. When a user is on the same slide as me, that user sees the content I contribute. This is implemented with a channel for each slide. There is also a global channel on which a ping is sent to you each time any content is contributed, even if you are on a different slide from the originator. When you move to a slide you need to obtain that slide's entire content history, and to receive all new content from that point until you leave. (A sketch of this channel setup follows the description.)

Our load testing involves hundreds of simulated users moving in tandem from slide to slide. This means that at every new join they are contending heavily for the archived content.

Description over.
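
As promised, here is a rough sketch of that channel wiring using the standard ChannelManager calls. The class and method names are illustrative rather than the real server code, the GLOBAL channel name and ping payload are placeholders, and the global channel is assumed to have been created at startup:

import java.nio.ByteBuffer;
import com.sun.sgs.app.AppContext;
import com.sun.sgs.app.Channel;
import com.sun.sgs.app.ClientSession;
import com.sun.sgs.app.Delivery;
import com.sun.sgs.app.NameNotBoundException;

public class SlideChannels {

    // Placeholder name; assumed to be created in AppListener.initialize().
    private static final String GLOBAL_CHANNEL_NAME = "GLOBAL";

    // One channel per slide: members of a slide's channel see its new content live.
    static Channel slideChannel(String slideName) {
        try {
            return AppContext.getChannelManager().getChannel(slideName);
        } catch (NameNotBoundException e) {
            // The first visitor to this slide creates its channel.
            return AppContext.getChannelManager()
                    .createChannel(slideName, null, Delivery.RELIABLE);
        }
    }

    // Joining a slide: subscribe so new strokes arrive as they are drawn.
    static void joinSlide(ClientSession session, String slideName) {
        slideChannel(slideName).join(session);
    }

    // Contributing content: deliver it to everyone on the slide, and ping the
    // global channel so users on other slides hear that something changed.
    static void contribute(ClientSession author, String slideName, ByteBuffer content) {
        slideChannel(slideName).send(author, content);
        AppContext.getChannelManager().getChannel(GLOBAL_CHANNEL_NAME)
                .send(author, ByteBuffer.wrap("/PING".getBytes()));
    }
}

The per-slide channel handles live delivery; the history replay on joining a slide is the part the rest of this post is about.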

We've been trying several ways of optimizing these structures, because it turns out that contention is basically what kills Darkstar performance: when two transactions go for the same object in the data store, one of them has to abort and try again later. If it was already the proud holder of locks on some other objects, that locking is wasted and has to happen all over again.
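
To make that concrete, here is a minimal sketch (not our actual code) of the kind of single hot object that produces those aborts: if every contributor writes into one shared ManagedObject, any two transactions that touch it in the same window conflict and one of them is retried.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.sun.sgs.app.AppContext;
import com.sun.sgs.app.ManagedObject;

// A single shared history object: every write locks the whole thing,
// so concurrent contributors conflict and one transaction must retry.
public class SlideHistory implements ManagedObject, Serializable {

    private static final long serialVersionUID = 1L;

    private final List<String> strokes = new ArrayList<String>();

    public void addStroke(String stroke) {
        // Declare the write up front so the data store knows this
        // transaction will modify the object rather than just read it.
        AppContext.getDataManager().markForUpdate(this);
        strokes.add(stroke);
    }
}

// In a task, this would be reached with something like:
//   ((SlideHistory) AppContext.getDataManager().getBinding("slide:" + name)).addStroke(s);

Spreading the same data across a ScalableHashMap of per-user ScalableLists, as in the code below, is meant to shrink that hot spot, since concurrent transactions then mostly lock different entries.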

Second attempt:

This one involves cloning the key set from the map of users for that slide. I'm cloning because I theorize that I can release the ManagedObject faster that way: as soon as I exit my transaction it is back in the store, whereas if I passed the actual keys (references into the store) they might stay locked until the subtask released them. I could be wrong...
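
To spell out the alternative I'm avoiding: the subtask could instead be handed a ManagedReference and re-read the list in its own transaction, which keeps the ManagedObject in play for longer. A rough sketch of the two hand-off styles, with a hypothetical DistributeTask standing in for the real subtask:

import java.io.Serializable;
import com.sun.sgs.app.AppContext;
import com.sun.sgs.app.ManagedReference;
import com.sun.sgs.app.Task;
import com.sun.sgs.app.util.ScalableList;

public class HandoffSketch {

    // Option 1: copy the data out now. Once this transaction commits, the
    // ScalableList is back in the store and the subtask never touches it.
    static void handOffByCopy(ScalableList<String> userData) {
        Object[] snapshot = userData.toArray();
        AppContext.getTaskManager().scheduleTask(new DistributeTask(snapshot));
    }

    // Option 2: pass a reference. The subtask re-reads the list in its own
    // transaction, so the ManagedObject is contended again later.
    static void handOffByReference(ScalableList<String> userData) {
        ManagedReference<ScalableList<String>> ref =
                AppContext.getDataManager().createReference(userData);
        AppContext.getTaskManager().scheduleTask(new DistributeByReferenceTask(ref));
    }

    private static class DistributeTask implements Task, Serializable {
        private final Object[] snapshot;
        DistributeTask(Object[] snapshot) { this.snapshot = snapshot; }
        public void run() { /* send snapshot contents to the client */ }
    }

    private static class DistributeByReferenceTask implements Task, Serializable {
        private final ManagedReference<ScalableList<String>> ref;
        DistributeByReferenceTask(ManagedReference<ScalableList<String>> ref) { this.ref = ref; }
        public void run() {
            ScalableList<String> data = ref.get(); // locks the list again here
            /* send data contents to the client */
        }
    }
}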

At each stage of these tests, moveSlide is the first server call after the slide-switch message.


private void moveSlide(String destination, ByteBuffer message) {
    Integer.parseInt(destination); // Just checking that the destination is a slide number.
    ClientSession session = session();
    String key = session.getName() + "@" + destination;
    ScalableHashMap slide;
    try {
        slide = (ScalableHashMap) AppContext.getDataManager().getBinding(destination);
    }
    catch (NameNotBoundException e) { // If this slide does not previously exist
        slide = new ScalableHashMap();
        AppContext.getDataManager().setBinding(destination, slide);
    }
    // For each user who has contributed to this slide, copy their stroke list
    // out of the ManagedObject and hand it to a subtask for delivery.
    for (Object userObject : slide.keySet()) {
        String user = userObject.toString();
        ScalableList userData = (ScalableList) slide.get(userObject);
        AppContext.getTaskManager().scheduleTask(
                new StartDistributingUserStrokes(userData.toArray(), destination, user));
    }

    // Tell everyone on the global channel that this user has moved.
    AppContext.getChannelManager().getChannel(GLOBAL_CHANNEL_NAME).send(session,
            toMessageBuffer("/MOVE_SLIDE " + session.getName() + " " + destination));
}


So, for each cloned key we spawn a subtask which does all the work of StartDistributingUserStrokes. As you can guess, that distributes the user's strokes:


private class StartDistributingUserStrokes implements Serializable, Task {

    private Object[] allWork;
    private String slide;
    private String user;
    private int packageSize;

    private StartDistributingUserStrokes(Object[] userData, String destination, String author) {
        slide = destination;
        this.allWork = userData;
        user = author;
        packageSize = allWork.length;
    }

    @Override
    public void run() throws Exception {
        ClientSession session = session();
        if (allWork.length != 0) {
            // Send one stroke per task, then reschedule ourselves with the
            // remainder so each transaction stays small.
            Object[] strokes = allWork;
            session.send(toMessageBuffer(strokes[0].toString()));
            Object[] remainingWork = new Object[strokes.length - 1];
            if (strokes.length > 1) {
                for (int i = 1; i < strokes.length; i++) {
                    remainingWork[i - 1] = strokes[i];
                }
            }
            AppContext.getTaskManager().scheduleTask(
                    new StartDistributingUserStrokes(remainingWork, slide, user));
        }
        else { // allWork.length == 0: everything has been delivered
            session.send(toMessageBuffer("/ALL_CONTENT_SENT " + slide));
            session.send(toMessageBuffer("/PING " + user + " " + slide + " " + packageSize));
        }
    }
}