The Fedora Repository makes it possible to design custom event-driven application workflows. For instance, a common task involves sending content to an external search engine or triplestore. Other repositories may wish to generate derivative content, such as a set of smaller images created from a high-resolution master.
Because Fedora publishes modification events on a JMS topic using a local ActiveMQ broker, one can write custom listener applications to handle these various workflows. By default, the repository's JMS broker supports both the OpenWire and STOMP protocols, which means that it is possible to write client listeners (consumers) in a wide variety of languages, including PHP, Python, Ruby and Java, among others.
For simple message-consuming applications, writing special-purpose applications may be an excellent choice. In contrast, once a repository begins making use of more complex message-based workflows or when there are multiple listener applications to manage, many repositories use systems such as Apache Camel to simplify the handling of these messages.
Camel makes use of "components" to integrate various services using a terse, domain-specific language that can be expressed in Java, XML, Scala or Groovy. There exists one such component designed to work specifically with a Fedora 4 repository. This makes it possible to model Solr indexing in only a few lines of code like so:
// Match only resources typed as "indexable"
XPathBuilder xpath = new XPathBuilder("/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/rest-api#indexable']");
xpath.namespace("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#");

from("activemq:topic:fedora")
    // fetch the resource that the event refers to
    .to("fcrepo:localhost:8080/fedora/rest")
    .filter(xpath)
    // fetch a JSON representation using the default transform
    .to("fcrepo:localhost:8080/fedora/rest?accept=application/json&transform=default")
    // send the JSON document to Solr
    .to("http4:localhost:8080/solr/core/update");
The XPath filtering predicate shown here is only one option; you can, of course, use many different Predicate languages, including XQuery, SQL or various Scripting Languages.
This same logic can also be expressed using the Spring XML extensions:
<route>
  <from uri="activemq:topic:fedora"/>
  <to uri="fcrepo:localhost:8080/fedora/rest"/>
  <filter>
    <xpath>/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/rest-api#indexable']</xpath>
    <!-- the ampersand must be escaped inside an XML attribute -->
    <to uri="fcrepo:localhost:8080/fedora/rest?accept=application/json&amp;transform=default"/>
    <to uri="http4:localhost:8080/solr/core/update"/>
  </filter>
</route>
Or, in Scala:
val xpath = new XPathBuilder("/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/rest-api#indexable']")
xpath.namespace("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")

"activemq:topic:fedora" ==> {
  to("fcrepo:localhost:8080/fedora/rest")
  filter(xpath) {
    to("fcrepo:localhost:8080/fedora/rest?accept=application/json&transform=default")
    to("http4:localhost:8080/solr/core/update")
  }
}
Please note that the hostnames used for Fedora and Solr here are entirely arbitrary. It is quite likely that these systems will be deployed on separate hosts and that the Camel routes will be deployed on yet another host. Camel makes it easy to distribute applications and replicate data asynchronously across multiple hosts.
Message Headers
By default, Fedora publishes events to a topic on a local broker. This topic is named "fedora". Each message will contain an empty body and up to five different header values. Those header values are namespaced so they look like this:
- org.fcrepo.jms.identifier
- org.fcrepo.jms.eventType
- org.fcrepo.jms.properties
- org.fcrepo.jms.timestamp
- org.fcrepo.jms.baseURL
Both eventType and properties are comma-delimited lists of events or properties. The eventTypes follow the JCR 2.0 specification and include:
- http://fedora.info/definitions/v4/repository#NODE_ADDED
- http://fedora.info/definitions/v4/repository#NODE_REMOVED
- http://fedora.info/definitions/v4/repository#PROPERTY_ADDED
- http://fedora.info/definitions/v4/repository#PROPERTY_CHANGED
- http://fedora.info/definitions/v4/repository#PROPERTY_REMOVED
The properties field will list the RDF properties that changed with that event. NODE_REMOVED events contain no properties. The fcrepo component for Camel is configured to recognize these headers and act appropriately.
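For listeners written without Camel, the comma-delimited headers described above have to be split by hand. The following is a minimal sketch in plain Java, assuming the header value has already been read from the JMS message as a String; the class and method names (FedoraEventTypes, split, isRemoval) are hypothetical and not part of Fedora or the fcrepo component.

```java
import java.util.ArrayList;
import java.util.List;

final class FedoraEventTypes {
    // Namespace shared by the event types listed above.
    static final String NS = "http://fedora.info/definitions/v4/repository#";

    // Split a comma-delimited header value (eventType or properties)
    // into trimmed, non-empty entries. A null header yields an empty list.
    static List<String> split(String headerValue) {
        List<String> values = new ArrayList<>();
        if (headerValue == null) {
            return values;
        }
        for (String part : headerValue.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                values.add(trimmed);
            }
        }
        return values;
    }

    // True when any of the listed event types is NODE_REMOVED.
    static boolean isRemoval(String eventTypeHeader) {
        return split(eventTypeHeader).contains(NS + "NODE_REMOVED");
    }
}
```

A listener could call FedoraEventTypes.isRemoval(...) on the org.fcrepo.jms.eventType value to decide whether to delete or re-index the resource named in org.fcrepo.jms.identifier.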
Examples
LDPath Transformations
If an fcr:transform program has been installed as mytransform, you can generate a JSON representation of an object and send it to a low-latency, highly available document store, such as Riak. The following route determines whether an object has been removed or simply added/updated. It then routes the message appropriately to a load-balancer sitting in front of the Riak HTTP endpoint.
// Build the Riak key path from the URL-encoded resource identifier
val riakKeyProcessor = (exchange: Exchange) => {
  exchange.getIn.setHeader(
    Exchange.HTTP_PATH,
    "/buckets/fcrepo/keys/" + URLEncoder.encode(exchange.getIn.getHeader("org.fcrepo.jms.identifier", classOf[String]), "UTF-8")
  )
}

// xpathFilter is assumed to be an XPathBuilder like the one defined in the Solr example
"activemq:topic:fedora" ==> {
  choice() {
    when(_.in("org.fcrepo.jms.eventType") == "http://fedora.info/definitions/v4/repository#NODE_REMOVED") {
      setHeader(Exchange.HTTP_METHOD, constant("DELETE"))
      process(riakKeyProcessor)
      to("http4:localhost:8098")
    }
    otherwise() {
      to("fcrepo:localhost:8080/fedora/rest")
      filter(xpathFilter) {
        to("fcrepo:localhost:8080/fedora/rest?accept=application/json&transform=mytransform")
        setHeader(Exchange.HTTP_METHOD, constant("PUT"))
        process(riakKeyProcessor)
        to("http4:localhost:8098")
      }
    }
  }
}
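To make the key construction in the route above easier to follow, here is the same string manipulation in isolation, as a small plain-Java sketch. The class and method names (RiakPaths, keyPath) are hypothetical; the bucket name fcrepo is taken from the route, and UTF-8 is an assumed encoding.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

final class RiakPaths {
    // Build the HTTP path for a Riak key from a Fedora resource identifier.
    // The identifier is URL-encoded so that its slashes do not become
    // additional path segments.
    static String keyPath(String identifier) {
        try {
            return "/buckets/fcrepo/keys/" + URLEncoder.encode(identifier, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a required charset on every JVM, so this should not happen.
            throw new IllegalStateException(e);
        }
    }
}
```

For example, RiakPaths.keyPath("/path/to/object") returns "/buckets/fcrepo/keys/%2Fpath%2Fto%2Fobject".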
External Triplestore
Some additional processing must be done to transform an application/n-triples response into a valid application/sparql-update payload before sending to Fuseki or Sesame. The fcrepo component contains some processors in org.fcrepo.camel.processor to handle this case.
from("activemq:topic:fedora")
    .to("fcrepo:localhost:8080/fedora/rest?accept=application/n-triples")
    .process(new SparqlUpdateProcessor())
    .setHeader(Exchange.CONTENT_TYPE).constant("application/sparql-update")
    .to("http4:localhost:3030/fcrepo/update");
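The kind of transformation involved can be illustrated in plain Java. This is only a sketch of the general idea and not the code of SparqlUpdateProcessor, whose actual output may differ; the class and method names are hypothetical, and pairing DELETE WHERE with INSERT DATA is an assumed, common pattern for replacing all triples about one subject.

```java
final class SparqlUpdateSketch {
    // Wrap an N-Triples document in a SPARQL Update request that first
    // removes every existing triple about the subject and then inserts
    // the new triples.
    static String toSparqlUpdate(String subjectUri, String nTriples) {
        return "DELETE WHERE { <" + subjectUri + "> ?p ?o };\n"
             + "INSERT DATA {\n"
             + nTriples.trim() + "\n"
             + "}";
    }
}
```

The resulting string would be sent with the Content-Type header set to application/sparql-update, as in the route above.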
Event-based Routing
It is often helpful to route messages to different queues based on the eventType value. This example splits messages on eventType values and routes the messages to appropriate queues. Following this example, it would be prudent to aggregate the messages based on the org.fcrepo.jms.identifier value after retrieving the messages from the downstream queues.
<route id="fcrepo-event-splitter">
  <description>
    Retrieve messages from the fedora topic. Event types are comma-delimited,
    so split them into separate messages before routing them.
  </description>
  <from uri="activemq:topic:fedora"/>
  <setBody>
    <simple>${header.org.fcrepo.jms.eventType}</simple>
  </setBody>
  <split>
    <tokenize token=","/>
    <setHeader headerName="org.fcrepo.jms.eventType">
      <simple>${body}</simple>
    </setHeader>
    <setBody>
      <simple>null</simple>
    </setBody>
    <to uri="seda:fcrepo-event-router"/>
  </split>
</route>

<route id="fcrepo-event-router">
  <description>
    Route messages based on the eventType.
  </description>
  <from uri="seda:fcrepo-event-router"/>
  <choice>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#NODE_REMOVED"</simple>
      <to uri="activemq:queue:fcrepo.delete"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#NODE_ADDED"</simple>
      <to uri="activemq:queue:fcrepo.add"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_ADDED"</simple>
      <to uri="activemq:queue:fcrepo.update"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_CHANGED"</simple>
      <to uri="activemq:queue:fcrepo.update"/>
    </when>
    <when>
      <simple>${header.org.fcrepo.jms.eventType} == "http://fedora.info/definitions/v4/repository#PROPERTY_REMOVED"</simple>
      <to uri="activemq:queue:fcrepo.update"/>
    </when>
    <otherwise>
      <log message="No router for ${header.org.fcrepo.jms.eventType}"/>
    </otherwise>
  </choice>
</route>
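The split-then-route decision made by the two routes above can be restated in plain Java, which may help when checking the mapping outside of Camel. This is a sketch only: the class and method names are hypothetical, the queue names are copied from the routes, and unrecognized event types are simply skipped rather than logged.

```java
import java.util.ArrayList;
import java.util.List;

final class EventRouterSketch {
    static final String NS = "http://fedora.info/definitions/v4/repository#";

    // Map a single event type to the queue used in the routes above,
    // or return null when no queue is defined for it.
    static String queueFor(String eventType) {
        if (eventType == null || !eventType.startsWith(NS)) {
            return null;
        }
        switch (eventType.substring(NS.length())) {
            case "NODE_REMOVED":
                return "fcrepo.delete";
            case "NODE_ADDED":
                return "fcrepo.add";
            case "PROPERTY_ADDED":
            case "PROPERTY_CHANGED":
            case "PROPERTY_REMOVED":
                return "fcrepo.update";
            default:
                return null;
        }
    }

    // Split the comma-delimited eventType header and collect the target
    // queue for each recognized event type, in order.
    static List<String> queuesFor(String eventTypeHeader) {
        List<String> queues = new ArrayList<>();
        for (String type : eventTypeHeader.split(",")) {
            String queue = queueFor(type.trim());
            if (queue != null) {
                queues.add(queue);
            }
        }
        return queues;
    }
}
```

Note that a single Fedora event carrying both NODE_ADDED and PROPERTY_ADDED ends up on two queues, which is why aggregating on org.fcrepo.jms.identifier downstream is worth considering.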
Supporting Queues
The default configuration, in which events are published to a topic, is fine for locally-deployed listeners, but it can be problematic in a distributed context. For instance, if the listener is restarted while a message is sent to the topic, that message may be missed. Furthermore, if there is a networking hiccup between Fedora's local broker and the remote listener, that too can result in lost messages. In such cases, a queue may be better suited.
ActiveMQ brokers support a wide variety of protocols. If Fedora's internal broker is bridged to an external broker, please remember to enable the proper protocols on the remote broker. This can be done like so:
<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
</transportConnectors>
Each transportConnector supports many additional options that can be added to this configuration.
Deployment
Camel routes can be deployed in any JVM container. In order to deploy to Jetty or Tomcat, the route must be built as a WAR file. This command will get you started:
$> mvn archetype:generate \
     -DarchetypeGroupId=org.apache.camel.archetypes \
     -DarchetypeArtifactId=camel-archetype-war \
     -DarchetypeVersion=2.14.0 \
     -DgroupId=org.example.camel \
     -DartifactId=my-camel-route \
     -Dversion=1.0.0-SNAPSHOT \
     -Dpackage=org.example.camel
After the project has been built (mvn install), you will find the WAR file in ./target. That file can simply be copied to the webapps directory of your Jetty/Tomcat server.
Another popular deployment option is Karaf, which is a light-weight OSGi-based JVM container. Karaf has the advantage of supporting hot code swapping, which allows you to make sure that your routes are always running. It also allows you to deploy XML-based routes (Spring or Blueprint) by simply copying the files into a deploy directory.
Karaf can be set up by:
- downloading Karaf from an apache.org mirror
- running ./bin/karaf to enter the shell
- installing required services:
    $> feature:repo-add camel 2.14.0
    $> feature:repo-add activemq 5.10.0
    $> feature:install camel
    $> feature:install activemq-camel
    # display available camel features
    $> feature:list | grep camel
    # install camel features, as needed
    $> feature:install camel-http4
- setting up a service wrapper (so that Karaf is always running):
    $> feature:install wrapper
    $> wrapper:install
- following the directions printed by the wrapper:install command
Now, routes can be deployed (and re-deployed) by simply copying JAR files or XML documents to $KARAF_HOME/deploy.