Browsing the tag java
Clustering Wicket for fun and profit!
2 Comments | Filed under administration main open source programmingI hate expired sessions, death to all expired sessions. Traditionally a Java servlet container has a fixed session time, a flood of traffic can potentially cause JVM OOM errors if the session time is set too high. I wanted a smart session container that can hold onto sessions for as long as possible and expire sessions only when it is absolutely necessary; A Memcached store would be perfect for this.
There for I recently open sourced the jetty-session-store to solve this problem. With the jetty-session-store you can save your session state to Ehcache, Memcached or the database. State should not be bound to a single JVM, Viva Shared Session Stores!
So now that jetty-session-store is out in the wild you can technically cluster Wicket using just the HttpSessionStore. However, it isn’t very efficient with the way Memcached allocates data in fixed sized cache buckets.
1. Wicket sessions under the HttpSessionStore can get quite large, well over 1Mb in size. A Wicket session not only stores the session state but also the previous serialized pages the user has visited.
2. Serializing and de-serializing a large data structure can get expensive. The HttpSessionStore retains an AccessStackPageMap, which is a list data structure consisting of multiple page map revisions.
So instead of saving one large AccessStackPageMap, I wrote a SecondLevelCacheSessionStore that saves a page map revision per cache entry. This leads to much better cache utilization and a whole lot less serialization on the wire. Not to mention this avoids the whole 1Mb Memcached size limit.
Before you go willy nilly with clustering, read the Wicket render strategies page. Wicket requires session affinity for buffered responses with the default rendering strategy.
Clustering Wicket has never been easier.
Here is an example on how to offload page maps to a hybrid EhCache/Memcached cache. Memcached for long term shared storage while EhCache for short-lived fast cache look ups.
@Override
protected ISessionStore newSessionStore() {
// localhost:11211 — memcached server
// "fabpagestore" — unique appender to avoid key clashes.
// 300 — 5 minute TTL for local ehcache.
return new SecondLevelCacheSessionStore(this,
new CachePageStore(Arrays.asList("localhost:11211"),"fabpagestore",300));
}
}
Here is an example on how to offload page maps to the database.
@Override
protected ISessionStore newSessionStore() {
// "fabpagestore" — unique appender to avoid key clashes.
return new SecondLevelCacheSessionStore(this,new CachePageStore(
new DBCache("jdbc:mysql://foo/mydb", "myname", "mypass", "com.driver.Name", "fabpagestore")));
}
}
Here is my CachePageStore;
import com.base.cache.AsyncMemcache;
import com.base.cache.ICache;
import org.apache.wicket.Page;
import org.apache.wicket.protocol.http.SecondLevelCacheSessionStore.IClusteredPageStore;
import org.apache.wicket.protocol.http.pagestore.AbstractPageStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CachePageStore extends AbstractPageStore implements IClusteredPageStore {
private ICache cache;
private Logger logger = LoggerFactory.getLogger(CachePageStore.class);
public CachePageStore(final List<String> servers, final String poolName, final int ttl) {
this(servers, poolName, true, ttl);
}
public CachePageStore(final List<String> servers, final String poolName, boolean async, final int ttl) {
this(new AsyncMemcache(servers, poolName, async, ttl));
}
public CachePageStore(final ICache cache) {
this.cache = cache;
}
// If pageVersion -1 must return highest page version.
protected String getKey(final String sessId, final String pageMapName, final int pageId, final int pageVersion) {
int pageVer = (pageVersion == -1) ? 0 : pageVersion;
if(pageVersion == -1) {
String[] meta = getMeta(sessId, pageMapName, pageId);
pageVer = Integer.valueOf(meta[0]);
}
return sessId + ":" + pageMapName + ":" + pageId + ":" + pageVer;
}
// If pageVersion -1 must return highest page version.
// If ajaxVersion -1 must return highest version.
public String getKey(final String sessId, final String pageMapName, final int pageId, final int pageVersion, final int ajaxVersion) {
// Default it to 0 initially
int ajaxVer = (ajaxVersion == -1) ? 0 : ajaxVersion;
int pageVer = (pageVersion == -1) ? 0 : pageVersion;
if(pageVersion == -1 || ajaxVersion == -1) {
String[] meta = getMeta(sessId, pageMapName, pageId);
if(pageVersion == -1) {
pageVer = Integer.valueOf(meta[0]);
}
if(ajaxVersion == -1) {
ajaxVer = Integer.valueOf(meta[1]);
}
}
return sessId + ":" + pageMapName + ":" + pageId + ":" + pageVer + ":" + ajaxVer;
}
protected String storeKey(final String sessionId, final Page page) {
return sessionId + ":" + page.getPageMapName() + ":" + page.getId() + ":" + page.getCurrentVersionNumber() + ":" + page.getAjaxVersionNumber();
}
protected String getBaseKey(String sessionId, Page page) {
return sessionId + ":" + page.getPageMapName() + ":" + page.getId();
}
protected String getMetaKey(String sessionId, String pageMap, int id) {
return getBaseKey(sessionId,pageMap,id)+"_meta";
}
protected String getMetaKey(String sessionId, Page page) {
return getBaseKey(sessionId,page)+"_meta";
}
protected String getBaseKey(String sessionId, String pageMap, int id) {
if(id == -1) {
return sessionId + ":" + pageMap;
} else {
return sessionId + ":" + pageMap + ":" + id;
}
}
public boolean containsPage(final String sessionId, final String pageMapName, final int pageId, final int pageVersion) {
String key = getKey(sessionId, pageMapName, pageId, pageVersion, -1);
if (logger.isDebugEnabled()) {
logger.debug("CheckExists: " + key);
}
return cache.keyExists(key);
}
public void destroy() {
}
public <T> Page getPage(final String sessionId, final String pagemap, final int id, final int versionNumber, final int ajaxVersionNumber) {
String key = getKey(sessionId, pagemap, id, versionNumber, ajaxVersionNumber);
if (logger.isDebugEnabled()) {
logger.debug("GetPage: " + key);
}
return (Page) cache.get(key);
}
public void pageAccessed(final String sessionId, final Page page) {
}
// If ID == -1 remove the entire pagemap; getBaseKey() takes care of this.
public void removePage(final String sessionId, final String pagemap, final int id) {
String key = getBaseKey(sessionId, pagemap, id);
if (logger.isDebugEnabled()) {
logger.debug("RemovePage: " + key);
}
cache.remove(getMetaKey(sessionId, pagemap, id));
for (String k : cache.getKeys()) {
if (k.startsWith(key)) {
cache.remove(k);
}
}
}
protected String[] getMeta(final String sessionId, String pageMap, int pageId) {
String metaKey = getMetaKey(sessionId,pageMap,pageId);
Object ret = cache.get(metaKey);
if (logger.isDebugEnabled()) {
logger.debug("GetMeta: " + metaKey);
}
if(ret == null) {
return new String[] {"0","0"};
} else {
return String.valueOf(ret).split(":");
}
}
protected void storeMeta(final String sessionId, final Page page) {
String metaKey = getMetaKey(sessionId, page);
Object ret = cache.get(metaKey);
if (logger.isDebugEnabled()) {
logger.debug("StoreMeta: " + metaKey);
}
if(ret == null) {
cache.put(metaKey,page.getCurrentVersionNumber()+":"+page.getAjaxVersionNumber());
} else {
String[] vals = String.valueOf(ret).split(":");
int currPage = Integer.valueOf(vals[0]);
int currAjax = Integer.valueOf(vals[1]);
if(page.getCurrentVersionNumber() > currPage) {
currPage = page.getCurrentVersionNumber();
}
if(page.getAjaxVersionNumber() > currAjax) {
currAjax = page.getAjaxVersionNumber();
}
cache.put(metaKey,currPage+":"+currAjax);
}
}
public void storePage(final String sessionId, final Page page) {
String sKey = storeKey(sessionId, page);
if (logger.isDebugEnabled()) {
logger.debug("StorePage: " + sKey);
}
cache.put(sKey, page);
storeMeta(sessionId,page);
}
public void unbind(final String sessionId) {
if (logger.isDebugEnabled()) {
logger.debug("Unbind: " + sessionId);
}
for (String key : cache.getKeys()) {
if (key.startsWith(sessionId)) {
cache.remove(key);
}
}
}
}
Are you running JRuby in production? Do you want distributed file storage for your “enterprise” application? Look no further, MogileFS is here.
MogileFS-Client has compatibility issues with JRuby due to it’s use of the low level Socket class. JRuby 1.5-dev does not yet support all the Socket methods, so here is a monkey patch to get the ruby mogilefs client working on JRuby. Yes it blocks, but who cares JRuby has native threads.
This is exactly why I love Ruby; monkey patching.
def self.mogilefs_new(host,port,timeout=5.0)
TCPSocket.open(host,port,timeout)
end
end
class TCPSocket
attr_accessor :mogilefs_addr, :mogilefs_connected, :mogilefs_size, :mogilefs_tcp_cork
def self.open(host,port,timeout = 5.0)
super(host,port.to_i)
end
def readable?
true
end
def write_nonblock(data)
write(data)
end
def recv_nonblock(size,arg)
recv(size,arg)
end
def mogilefs_init(host = nil, port = nil)
true
end
end
Here is an example test case on how to get it all to work.
require ‘mogilefs’
# jmogilefs.rb is the monkey patch above
# load it after loading mogilefs client.
require ‘jmogilefs.rb’
mg = MogileFS::MogileFS.new(:domain=>‘testserv’,:hosts=>[‘xxx.xxx.xxx.xxx:6001′])
p mg.get_file_data ‘video:100:default.jpg’
p mg.get_paths ‘video:100:default.jpg’,true
mg.list_keys(‘video:100′)[0].each do |f|
p f
end
I finally got around to open sourcing our scala memcached implementation that we use at fabulously40 for session storage.
Since wicket sessions can vary greatly in size, using the standard memcached server implementation became impractical due to the slab allocator.
The current code on github lacks the ehcache store and an Actor IoHandler adapter. The internal SMemcached application at fabulously40 uses a private caching API so we can hook up various caching backend storage implementations such as mysql, postgresql, ehcache or even another memcached server. You can grab the TCache project on github that SMemcached uses to unify caching under a single API. This gives SMemcached a lot of flexibility when it comes to caching your data.
fyi. TCache stands for “Tanek” Cache, Tanek means cache in russian.
The project works quite well, but don’t use it in production just yet since there is no data expiration for cached data in the HashMap storage implementation. This is just a technical preview. Do use it in production, this is what we use at Fabulously40
I am about to run out the house for the Labor Day weekend, quickie post on my new coding url strategy.
MixedParamHybridUrlCodingStrategy lets you keep stateful multi-pagemap URLs clean while using mixed parameters.
Example…
This will mount “/questions/stupid-category” and convert it to…
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.wicket.IRequestTarget;
import org.apache.wicket.PageParameters;
import org.apache.wicket.protocol.http.request.WebRequestCodingStrategy;
import org.apache.wicket.request.target.coding.HybridUrlCodingStrategy;
import org.apache.wicket.util.string.AppendingStringBuffer;
import org.apache.wicket.util.value.ValueMap;
public class MixedParamHybridUrlCodingStrategy extends HybridUrlCodingStrategy {
private final String[] parameterNames;
public MixedParamHybridUrlCodingStrategy(final String mountPath, final Class pageClass) {
super(mountPath, pageClass);
this.parameterNames = new String[] {};
}
public MixedParamHybridUrlCodingStrategy(final String mountPath, final Class pageClass,final boolean refresh) {
super(mountPath,pageClass,refresh);
this.parameterNames = new String[] {};
}
public MixedParamHybridUrlCodingStrategy(final String mountPath, final Class pageClass,final String[] params) {
super(mountPath,pageClass,false);
this.parameterNames = params;
}
public MixedParamHybridUrlCodingStrategy(final String mountPath, final Class pageClass,final boolean refresh,final String[] params) {
super(mountPath,pageClass,refresh);
this.parameterNames = params;
}
@Override
protected void appendParameters(final AppendingStringBuffer url, final Map parameters)
{
Set parameterNamesToAdd = new HashSet(parameters.keySet());
// Find index of last specified parameter
boolean foundParameter = false;
int lastSpecifiedParameter = parameterNames.length;
while (lastSpecifiedParameter != 0 && !foundParameter)
{
foundParameter = parameters.containsKey(parameterNames[–lastSpecifiedParameter]);
}
if (foundParameter)
{
for (int i = 0; i <= lastSpecifiedParameter; i++)
{
String parameterName = parameterNames[i];
final Object param = parameters.get(parameterName);
String value = param instanceof String[] ? ((String[])param)[0] : (String)param;
if (value == null)
{
value = "";
}
if (!url.endsWith("/"))
{
url.append("/");
}
url.append(urlEncodePathComponent(value));
parameterNamesToAdd.remove(parameterName);
}
}
if (!parameterNamesToAdd.isEmpty())
{
boolean first = true;
final Iterator iterator = parameterNamesToAdd.iterator();
while (iterator.hasNext())
{
url.append(first ? ‘?’ : ‘&’);
String parameterName = (String)iterator.next();
final Object param = parameters.get(parameterName);
String value = param instanceof String[] ? ((String[])param)[0] : (String)param;
url.append(urlEncodeQueryComponent(parameterName)).append("=").append(
urlEncodeQueryComponent(value));
first = false;
}
}
String pageMap = (String)parameters.get(WebRequestCodingStrategy.PAGEMAP);
if (pageMap != null)
{
pageMap = WebRequestCodingStrategy.encodePageMapName(pageMap);
if (!url.endsWith("/"))
{
url.append("/");
}
url.append(WebRequestCodingStrategy.PAGEMAP).append("/").append(
urlEncodePathComponent(pageMap));
}
}
@Override
public CharSequence encode(final IRequestTarget requestTarget) {
String url = String.valueOf(super.encode(requestTarget));
if(url.contains(".wicket")) {
// Rewrite URL from.. /foo?bar=5.wicket-xxx to /foo.wicket-xxx?bar=5
return url.replaceAll("(.*)\\?(.*)\\.(wicket-[0-9]+)$", "$1.$3?$2");
}
return url;
}
@Override
protected ValueMap decodeParameters(final String urlFragment, final Map urlParameters)
{
PageParameters params = new PageParameters();
if(urlFragment == null) {
return params;
}
// Add all url parameters and normalize
for(Iterator<Map.Entry> itr = urlParameters.entrySet().iterator();itr.hasNext();) {
Map.Entry<String,String[]> e = itr.next();
if(e.getValue() != null) {
String val = (e.getValue())[0];
if(val != null) {
if(val.contains(".wicket")) {
val = val.substring(0,val.indexOf(".wicket"));
urlParameters.put(e.getKey(), val);
}
}
}
}
params.putAll(urlParameters);
String urlPath = urlFragment;
if (urlPath.startsWith("/"))
{
urlPath = urlPath.substring(1);
}
if (urlPath.length() > 0)
{
String[] pathParts = urlPath.split("/");
if (pathParts.length > parameterNames.length)
{
throw new IllegalArgumentException(
"Too many path parts, please provide sufficient number of path parameter names");
}
for (int i = 0; i < pathParts.length; i++)
{
if(pathParts[i].contains(".wicket")) {
pathParts[i].substring(0,pathParts[i].indexOf(".wicket"));
}
if (WebRequestCodingStrategy.PAGEMAP.equals(pathParts[i]))
{
params.put(WebRequestCodingStrategy.PAGEMAP,
WebRequestCodingStrategy.decodePageMapName(urlDecodePathComponent(pathParts[i])));
}
params.put(parameterNames[i], urlDecodePathComponent(pathParts[i]));
}
}
return params;
}
}
GrizzlyConnector patch for Jetty to work with QueuedThreadPool
1 Comment | Filed under administration programmingThis is a late night post, so I am just going to make it short. This patch lets you use QueuedThreadPool with the Grizzly Connector. This is a monkey patch, getMaxThreads() should be moved up into the Thread Interface.
--- GrizzlyConnection-old.java Sat May 2 01:08:02 2009 +++ GrizzlyConnector.java Sat May 2 00:56:37 2009 @@ -51,6 +51,8 @@ import org.mortbay.jetty.webapp.WebAppContext; import org.mortbay.log.Log; import org.mortbay.thread.BoundedThreadPool; +import org.mortbay.thread.QueuedThreadPool; +import org.mortbay.thread.ThreadPool; /* ------------------------------------------------------------------------------- */ /** @@ -178,8 +180,13 @@ controller.setProtocolChainInstanceHandler(instanceHandler); Pipeline pipeline = new DefaultPipeline(); - pipeline.setMaxThreads( - ((BoundedThreadPool)getServer().getThreadPool()).getMaxThreads()); + if(getServer().getThreadPool() instanceof BoundedThreadPool) { + pipeline.setMaxThreads( + ((BoundedThreadPool)getServer().getThreadPool()).getMaxThreads()); + } else { + pipeline.setMaxThreads( + ((QueuedThreadPool)getServer().getThreadPool()).getMaxThreads()); + } controller.setPipeline(pipeline); }
OpenSolaris uses a single-threaded malloc by default for all applications. The JDK that is compiled for Solaris fails to be linked against mtmalloc or the newer umem malloc implementation that is multithread optimized. In a multithreaded application using a single threaded malloc can degrade performance. As memory is being allocated concurrently in multiple threads, all the threads must wait in a queue while malloc() handles one request at a time, this is called heap contention. To get around this contention point you can force the JDK to use the umem malloc.
LD_PRELOAD=/usr/lib/libumem.so /opt/jdk1.7.0/bin/java start.jar or LD_PRELOAD=/usr/lib/libmtmalloc.so /opt/jdk1.7.0/bin/java start.jar
This simple fix has really improved performance on our web service fabulously40. The application went from serving 120req/sec uncached to 170req/sec. Not bad no?
This also works wonders for mysql and varnish, two applications that really put those threads to use. We have dropped 100ms in response time with varnish by just using umem for the malloc implementation.

(11 votes, average: 4.73 out of 5)
(4 votes, average: 3.50 out of 5)