The Fedora Repository makes it possible to design custom event-driven application workflows. For instance, a common use case is sending content to an external search engine or triplestore. Other repositories may wish to generate derivative content, such as a set of smaller images created from a high-resolution master.
Because Fedora publishes modification events on a JMS topic using a local ActiveMQ broker, one can write custom listener applications to handle these various workflows. By default, the repository's JMS broker supports both the OpenWire and STOMP protocols, which means that it is possible to write client listeners or consumers in a wide variety of languages, including PHP, Python, Ruby and Java, among others.
Message Headers
By default, Fedora publishes events to a topic on a local broker. This topic is named "fedora".
The contents of the message header and body are documented here. For information about how to inspect messages coming from Fedora, go here.
Supporting Queues
The default configuration is fine for locally-deployed listeners, but it can be problematic in a distributed context. For instance, if the listener is restarted while a message is sent to the topic, that message may be missed. A networking hiccup between Fedora's local broker and the remote listener can likewise result in lost messages. In these cases a queue may be better suited, because a queue holds each message until a consumer receives it.
ActiveMQ supports "virtual destinations", allowing your broker to automatically forward messages from one location to another. If Fedora is deployed in Tomcat, the ActiveMQ configuration will be located in WEB-INF/classes/config/activemq.xml. That file can be edited to include the following block:
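A minimal sketch of such a block, assuming the goal is to forward every message published on the "fedora" topic to a queue. The queue name "fedora" is an assumption chosen for illustration; any queue name could be used.

```xml
<!-- Goes inside the <broker> element of activemq.xml.
     Assumption: the target queue is also named "fedora". -->
<destinationInterceptors>
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <!-- Messages sent to the topic "fedora" are forwarded to the queue "fedora" -->
      <compositeTopic name="fedora">
        <forwardTo>
          <queue physicalName="fedora"/>
        </forwardTo>
      </compositeTopic>
    </virtualDestinations>
  </virtualDestinationInterceptor>
</destinationInterceptors>
```

With a configuration along these lines, remote listeners would consume from the queue (for example activemq:queue:fedora in a Camel route) rather than subscribing to the topic.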
ActiveMQ brokers support a wide variety of protocols. If Fedora's internal broker is bridged to an external broker, please remember to enable the proper protocols on the remote broker. This can be done like so:
<transportConnectors>
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
</transportConnectors>
Each transportConnector supports many additional options that can be added to this configuration.
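As an illustration of such options, the sketch below caps the number of concurrent connections per connector and enables one further protocol. The limit of 1000 and the choice of AMQP are assumptions for the example, not recommendations; check the ActiveMQ transport documentation for the options your version supports.

```xml
<transportConnectors>
  <!-- maximumConnections limits concurrent client connections on a connector -->
  <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000"/>
  <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000"/>
  <!-- An additional protocol: AMQP on its conventional port 5672 -->
  <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/>
</transportConnectors>
```

When several options are combined in one URI, the separating ampersand must be written as &amp;amp; because the URI sits inside an XML attribute.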
Deployment
Camel routes can be deployed in any JVM container. In order to deploy to Jetty or Tomcat, the route must be built as a WAR file. This command will get you started:
$> mvn archetype:generate \
     -DarchetypeGroupId=org.apache.camel.archetypes \
     -DarchetypeArtifactId=camel-archetype-war \
     -DarchetypeVersion=2.14.0 \
     -DgroupId=org.example.camel \
     -DartifactId=my-camel-route \
     -Dversion=1.0.0-SNAPSHOT \
     -Dpackage=org.example.camel
After the project has been built (mvn install), you will find the WAR file in ./target. That file can simply be copied to the webapps directory of your Jetty/Tomcat server.
Camel Messaging
For simple message-consuming applications, writing special-purpose applications may be an excellent choice. In contrast, once a repository begins making use of more complex message-based workflows or when there are multiple listener applications to manage, many repositories use systems such as Apache Camel to simplify the handling of these messages.
Camel makes use of "components" to integrate various services using a terse, domain-specific language (DSL) that can be expressed in Java, XML, Scala or Groovy. There is an fcrepo-camel component designed to work specifically with a Fedora repository. This makes it possible to model Solr indexing in only a few lines of code like so:
// Camel 2.x import; the statements below belong inside RouteBuilder.configure()
import org.apache.camel.builder.xml.XPathBuilder;

// Predicate that matches only resources typed as indexing:Indexable
XPathBuilder xpath = new XPathBuilder("/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/indexing#Indexable']");
xpath.namespace("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#");

from("activemq:topic:fedora")
    .to("fcrepo:localhost:8080/fedora/rest")
    .filter(xpath)
    .to("fcrepo:localhost:8080/fedora/rest?transform=default")
    .to("http4:localhost:8080/solr/core/update");
In this specific case, the XPath filtering predicate is just an example; you can, of course, use many different Predicate languages, including XQuery, SQL or various Scripting Languages.
This same logic can also be expressed using the Spring XML extensions:
<route>
  <from uri="activemq:topic:fedora"/>
  <to uri="fcrepo:localhost:8080/fedora/rest"/>
  <filter>
    <xpath>/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/indexing#Indexable']</xpath>
    <to uri="fcrepo:localhost:8080/fedora/rest?transform=default"/>
    <to uri="http4:localhost:8080/solr/core/update"/>
  </filter>
</route>
Or, in Scala:
val xpath = new XPathBuilder("/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/indexing#Indexable']")
xpath.namespace("rdf", "http://www.w3.org/1999/02/22-rdf-syntax-ns#")

"activemq:topic:fedora" ==> {
  to("fcrepo:localhost:8080/fedora/rest")
  filter(xpath) {
    to("fcrepo:localhost:8080/fedora/rest?transform=default")
    to("http4:localhost:8080/solr/core/update")
  }
}
Please note that the hostnames used for Fedora and Solr in the snippets above are arbitrary. It is quite likely that these systems will be deployed on separate hosts and that the Camel routes will be deployed on yet another host. Camel makes it easy to distribute applications and replicate data asynchronously across an arbitrarily large number of independent systems.
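Because the hostnames differ per deployment, it can help to keep them out of the route definition. The following is a minimal sketch using Camel 2.x property placeholders in the Spring XML form. The file name application.properties and the property names fcrepo.baseUrl and solr.baseUrl are hypothetical and would need to be defined for your environment (for example fcrepo.baseUrl=localhost:8080/fedora/rest).

```xml
<camelContext xmlns="http://camel.apache.org/schema/spring"
              xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#">

  <!-- Hypothetical properties file on the classpath that defines
       fcrepo.baseUrl and solr.baseUrl -->
  <propertyPlaceholder id="properties" location="classpath:application.properties"/>

  <route>
    <from uri="activemq:topic:fedora"/>
    <to uri="fcrepo:{{fcrepo.baseUrl}}"/>
    <filter>
      <xpath>/rdf:RDF/rdf:Description/rdf:type[@rdf:resource='http://fedora.info/definitions/v4/indexing#Indexable']</xpath>
      <to uri="fcrepo:{{fcrepo.baseUrl}}?transform=default"/>
      <to uri="http4:{{solr.baseUrl}}/update"/>
    </filter>
  </route>
</camelContext>
```

The route logic is unchanged from the earlier Spring XML snippet; only the endpoint locations are externalized, so the same WAR file can be deployed against different hosts.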
Monitoring Your Camel Routes
It is often useful to keep runtime statistics for your Camel routes. Hawtio is a web console for monitoring your messaging infrastructure, and it can be deployed in any JVM container, including Karaf, Tomcat or Jetty.
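Once deployed in Karaf, hawtio is available at http://localhost:8181/hawtio/ (8181 is Karaf's default HTTP port; other containers will typically use a different port).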
With Tomcat or Jetty, deploying hawtio is simply a matter of installing a WAR file. Please see the hawtio website for more information.
The Fedora Camel Toolbox
You can also check out a suite of Camel-based services in the Fedora Camel Toolbox.
Setup Fedora to Talk to External ActiveMQ
You might wish to use an external ActiveMQ instead of Fedora's built-in one. This offers easier access to the ActiveMQ web console, the ability to upgrade ActiveMQ independently of Fedora, and the ability to distribute messaging across servers (among other possible benefits). The instructions below offer one possible approach to achieving this setup.
- Download the latest ActiveMQ 5 Linux .tar.gz from https://activemq.apache.org/components/classic/download
- Unpack the file and move it under /opt
- Create a symlink from the unpacked directory to /opt/activemq
- Create an activemq user: sudo adduser --system --no-create-home activemq
- Change ownership: sudo chown -R activemq:activemq /opt/activemq/
- Set up the unit file in /usr/lib/systemd/system/activemq.service:
[Unit]
Description=Apache ActiveMQ
After=network-online.target

[Service]
Type=forking
WorkingDirectory=/opt/activemq/bin
ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop
Restart=on-abort
User=activemq
Group=activemq

[Install]
WantedBy=multi-user.target
- Edit /opt/activemq/conf/activemq.xml and change the port numbers in the transportConnectors section to avoid conflicts with Fedora's built-in ActiveMQ. The ports below are only a suggestion; if you choose different ones, substitute them in the later steps as well:
- 61616 -> 41616
- 61613 -> 41613
- Start and enable the service: sudo systemctl enable activemq ; sudo systemctl start activemq
- Edit /opt/fedora/fcrepo.properties and add some properties to customize the configuration and file location for the internal Fedora ActiveMQ (feel free to choose different file paths as appropriate to your Fedora setup):
- fcrepo.activemq.configuration=file:///opt/fedora/config/activemq.xml
- fcrepo.activemq.directory=/opt/fedora/queue
- Create /opt/fedora/config/activemq.xml (this is the default Fedora ActiveMQ configuration with a fedora_bridge network connector added to forward messages to port 41616):
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <context:property-placeholder/>

  <bean id="activeMqDirectory" class="java.lang.String">
    <constructor-arg value="#{fedoraPropsConfig.activeMqDirectory}"/>
  </bean>

  <!-- The <broker> element is used to configure the ActiveMQ broker. -->
  <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost">

    <networkConnectors>
      <networkConnector name="fedora_bridge" dynamicOnly="true" uri="static:(tcp://localhost:41616)">
        <dynamicallyIncludedDestinations>
          <topic physicalName="fedora"/>
          <queue physicalName="fedora"/>
        </dynamicallyIncludedDestinations>
      </networkConnector>
    </networkConnectors>

    <!--
      For better performances use VM cursor and small memory limit.
      For more information, see: http://activemq.apache.org/message-cursors.html

      Also, if your producer is "hanging", it's probably due to producer flow control.
      For more information, see: http://activemq.apache.org/producer-flow-control.html
    -->
    <destinationPolicy>
      <policyMap>
        <policyEntries>
          <policyEntry topic=">" producerFlowControl="true">
            <!--
              The constantPendingMessageLimitStrategy is used to prevent
              slow topic consumers to block producers and affect other consumers
              by limiting the number of messages that are retained
              For more information, see: http://activemq.apache.org/slow-consumer-handling.html
            -->
            <pendingMessageLimitStrategy>
              <constantPendingMessageLimitStrategy limit="1000"/>
            </pendingMessageLimitStrategy>
          </policyEntry>
          <policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
            <!--
              Use VM cursor for better latency
              For more information, see: http://activemq.apache.org/message-cursors.html

              <pendingQueuePolicy>
                <vmQueueCursor/>
              </pendingQueuePolicy>
            -->
          </policyEntry>
        </policyEntries>
      </policyMap>
    </destinationPolicy>

    <!--
      The managementContext is used to configure how ActiveMQ is exposed in JMX.
      By default, ActiveMQ uses the MBean server that is started by the JVM.
      For more information, see: http://activemq.apache.org/jmx.html
    -->
    <managementContext>
      <managementContext createConnector="false"/>
    </managementContext>

    <!--
      Configure message persistence for the broker. The default persistence
      mechanism is the KahaDB store (identified by the kahaDB tag).
      For more information, see: http://activemq.apache.org/persistence.html
    -->
    <persistenceAdapter>
      <kahaDB directory="#activeMqDirectory"/>
    </persistenceAdapter>

    <!--
      The systemUsage controls the maximum amount of space the broker will
      use before slowing down producers.
      For more information, see: http://activemq.apache.org/producer-flow-control.html
      If using ActiveMQ embedded - the following limits could safely be used:
    -->
    <systemUsage>
      <systemUsage>
        <memoryUsage>
          <memoryUsage limit="20 mb"/>
        </memoryUsage>
        <storeUsage>
          <storeUsage limit="1 gb"/>
        </storeUsage>
        <tempUsage>
          <tempUsage limit="100 mb"/>
        </tempUsage>
      </systemUsage>
    </systemUsage>

    <!--
      The transport connectors expose ActiveMQ over a given protocol to
      clients and other brokers.
      For more information, see: http://activemq.apache.org/configuring-transports.html
    -->
    <transportConnectors>
      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
      <transportConnector name="openwire" uri="tcp://0.0.0.0:#{fedoraPropsConfig.jmsPort}?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600"/>
      <transportConnector name="stomp" uri="stomp://0.0.0.0:#{fedoraPropsConfig.stompPort}"/>
    </transportConnectors>

    <!-- destroy the spring context on shutdown to stop jetty -->
    <shutdownHooks>
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
    </shutdownHooks>

  </broker>

  <!--
    Enable web consoles, REST and Ajax APIs and demos
    Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details

    <import resource="jetty.xml"/>
  -->

</beans>
- Reconfigure Camel Toolbox by editing /opt/fedora/config/fcrepo-camel-toolbox.properties and adding:
- jms.brokerUrl=tcp://localhost:41616
- Restart Camel Toolbox (e.g., if you have set it up as a service: sudo systemctl restart cameltoolbox)
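For the port-change step above, the edited transportConnectors section of /opt/activemq/conf/activemq.xml might look like the following sketch. It assumes the stock connector definitions from the ActiveMQ distribution with only the port numbers changed; connectors for other protocols that ship in the default file are omitted here and would keep their own ports.

```xml
<transportConnectors>
  <!-- OpenWire moved from 61616 to 41616; this is the port targeted by
       Fedora's fedora_bridge network connector and by jms.brokerUrl -->
  <transportConnector name="openwire" uri="tcp://0.0.0.0:41616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <!-- STOMP moved from 61613 to 41613 -->
  <transportConnector name="stomp" uri="stomp://0.0.0.0:41613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
```

If you pick different ports, the same OpenWire port has to appear in three places: this connector, the uri of the fedora_bridge network connector, and jms.brokerUrl in fcrepo-camel-toolbox.properties.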