RabbitMQ and Drupal in action
In business, CMS systems such as Drupal rarely stand alone – CRM, shop systems, and other microsites and applications are just part of everyday life. But how do you get the various applications to work together smoothly?
In today's companies, work processes and the data involved are almost always spread across different applications. Practically everyone has to deal with different systems in day-to-day work: CMS, CRM, ERP, document management systems and many more are in use. Each has its own function and justified existence, but wouldn't it be nice if the individual applications knew about each other and could cooperate better? It would save a lot of work – manual synchronization of data and the processing of orders by hand in multiple, associated systems would then be a thing of the past. A message broker such as RabbitMQ can be employed very well for such application scenarios. It enables interaction between various IT systems, no matter whether it's a Java EE solution, a Ruby on Rails website or the Drupal CMS. In order to communicate with each other, messages are sent, forwarded by the message broker to neighboring systems, where they are received and processed. The distinctive feature here is the way in which the messages are delivered and distributed in a process called routing: messages are held in queues and, if necessary, delivered at a later time if an application is offline. Additionally, the sender doesn't determine the recipient – this is done by the broker based on predefined rules. This results in a reliable and flexible channel of communication for the applications. Furthermore, message brokers can be used in a targeted fashion to achieve highly scalable services or to cushion peaks in demand: messages are then distributed evenly across several consuming applications to achieve load balancing. At the same time, the queues allow the applications to define the processing speed themselves. An application can therefore continue to run stably while the message broker buffers the incoming storm of requests.
Messages in flow
Messages are the central element of a message broker – they consist of a header with key-value pairs and arbitrary content, usually in a technology-neutral format such as XML or JSON. The header contains metadata such as the content type, application/xml and other fields that affect routing. With RabbitMQ, or the underlying industry standard AMQP, this is both the exchange and routing key. The former can be thought of as a switchboard, while the latter is a specially formatted string of characters that can be used to filter messages. An example: An employee changes the availability of a product in a central ERP system – and all connected shop systems need to be informed of this. So an automated message with a routing key such as product.update.stockX.categoryY is sent to the ERP exchange. This exchange now forwards the message to the corresponding queues (Fig. 1) – Details can be found in the "Routing in detail" box.
Thus, a separate copy of the message is sent to each shop system, where the message is received and only removed from the broker's queue once it has been processed successfully. This architecture provides some positive features: if an additional shop system is added in the future, it is sufficient just to adapt the routing – the ERP system itself remains untouched. Additionally, a shop system can be shut down at any time for maintenance. The messages simply stay with the broker and are delivered at a later time. So the message broker infrastructure guarantees that messages are delivered – although not necessarily immediately.
Routing in detail: Messages are sent to an exchange but are received by queues. The connection between the two is constituted by rules called bindings, which bind a queue to a specific exchange. This occurs depending on the type of exchange: Fanout: Each message is forwarded to every bound queue (routing key is ignored) Direct: Message a filtered based on the routing key: A simple string comparison is carried out (multiple queues per routing key are possible) Topic: Messages are filtered as with Direct, but more complex pattern matching is possible: words are separated by periods, the pound sign stands for any word, and the asterisk for zero or more words. Example: product.*.storeX.# Headers: Filters the messages according to any key-value pairs from the message header. Detailed descriptions of the AMQP concepts can be found on the RabbitMQ website.
Designing asynchronous applications
It's helpful to imagine the message exchange as a kind of event-based programming – with the addition of it crossing computer boundaries. This architecture paradigm is actually becoming more and more popular, as it automatically leads to the decoupling of individual components. One of the best examples of this is the Symfony2 Framework, which is made up of lots of small parts that communicate with each other, e.g. with events. The actual programming with a message broker differs greatly from working with classic web services. Requests are not carried out synchronously, but always asynchronously. This can have consequences all the way up to the web interface, as one cannot wait for the response from an event. Perhaps the system being contacted is currently overloaded or being serviced? In the synchronous world, this usually means that the user is faced with an error message. With an increasing number of systems communicating with each other, the probability of an error also increases, as the blackout of one system can affect all others through the coupling of synchronous calls. In an asynchronous world of messaging, you are forced to consider these cases from the outset: the application is also prepared for these unpleasant situations and can continue to operate smoothly – and most importantly, a fault with one of the applications doesn't affect all the others. It is often useful to adapt the interface for the user, so that they don't always expect an immediate response. If in doubt, notifying the user via e-mail is the more reliable path than a notification in a web browser.
The broker's routing and the use of queues has an impact on the processing of messages in the receiving application: messages can overtake each other or even arrive several times. Additionally, you must also expect other applications to produce invalid messages that your own system cannot process. This is especially true if the broker isn't only used internally, but also for enterprise-wide integration. Depending on the type of application, some or all of the problems described here might occur. However, the effected systems can easily deal with these difficulties if they follow a few simple rules:
Process the message in an idempotent manner, i.e. if an event has already been processed, do not trigger an error but accept the message silently – analogous to an HTTP PUT or DELETE call.
Ignore messages if they are outdated.
Ignore messages if they are invalid.
„Let it crash“: cancel processing in cases of severe errors and restart processes instead of trying to continue.
Rule number one solves the problem of duplicate messages, or rather, ensures that the double handling of a message has no further effects. If, for example, a dataset has already been deleted as part of a synchronization, a second request to delete it should not result in an error. Duplicate messages can partly be prevented through the use of transactions: RabbitMQ then ensures that a new message is only produced if the one currently being received has been processed completely. Unfortunately, this ignores all other systems – such as a MySQL database – as they have their own transactions that are not coordinated with those of the broker. Although holistic solutions do exist (outside the world of PHP), they are far less performant as they draw on an elaborate transaction protocol and work with timeouts. The idempotent implementation offers a simpler and more elegant solution here. The second rule's task is to ensure that outdated messages do not overwrite any new data. To do this, it is necessary to find a separate criterion at application level that can determine the order of the messages. In the simplest case, this would be a timestamp of the server time, synchronized as well as possible. The consuming application constantly stores this criterion somewhere and compares it before processing a message: if a newer version exists, the message is ignored. A typical application is the alignment of user data across several systems – here the date of the last message or change for every user could be stored. Rule number three ensures that the queues are not swamped by invalid messages that are sent over and over again because processing is failing. This is where the developer comes in – they have to check the message content carefully before it is processed, as when an error has already occurred, it is difficult to then recognize whether this is due to the message or a temporary error in the application. In the latter case, the message processing should not be confirmed to ensure that the message is automatically sent again later. The application then has another chance to process the message. Finding the source of an error – message or application – is therefore very important, as handling the respective message differs fundamentally depending on the source of the error. The final rule – "let it crash" – is important as messages cannot be received as part of a normal HTTP request, as a long-running process is necessary for the timely processing of messages, which requires constant contact with RabbitMQ. This is no longer a problem in PHP, thanks to the addition of a functional garbage collection with PHP 5.3. Thus, memory usage does not grow immeasurably or push the memory limit over time. Nevertheless, you should still expect fatal errors that cannot be resolved and which knock the process out of sync. It is therefore often the best, and above all, simplest strategy to monitor the process and, if necessary, restart it automatically rather than putting too much effort into complex exception handling. A simple, yet rather unusual error situation for PHP web frameworks is the MySQL connection timing out due to extended inactivity. What seems impossible with second-long or even millisecond-long requests in Apache can now occur during the night, bringing the process to its knees.
Using RabbitMQ with Drupal
To make configuration and development with RabbitMQ simpler and more pleasant, SIRUP has developed the "Message Broker" module for Drupal: providing the programmer with a simple interface with which messages can be sent and received. This currently features an AMQP and a simulation implementation; however it would theoretically also be possible to bind other protocols such as STOMP as the API has a generic structure. Sending a message is shown as per an example in Listing 1.
One of the core ideas of the module is the declarative description of the routing in a central JSON file (Listing 2), which is accessible for all systems involved. This ensures that all queues, exchanges and bindings are set up in the broker in time, and that they also remain consistent. The latter is otherwise virtually impossible as systems cannot be simultaneously updated while in operation. This means that different consumers expect different routing topologies and also try to define them accordingly, which will inevitably lead to an exception.
Typically for Drupal, message reception is prepared by hook and relies on previously described cues in the JSON file. As shown in Listing 3, a callback is defined for each retrieved queue, which is then called once per message. It should be noted that the content of the transferred message first needs to be decoded, e.g. using json_decode. The second parameter is a function that is called to signal the complete processing of the message.
Furthermore, the module provides as much support as possible to simplify the implementation of the rules above. For the second rule, for example, there's a separate exception type called InvalidMessageException. An exception of this kind logs the invalid values, but confirms the processing of the message to RabbitMQ and then continues with the next. Additionally, a critical error can be shown via the special CriticalErrorException. Unlike all other exceptions, this not only aborts the handling of the current message, but shuts down the entire process. Together with an external monitoring process, this allows the entire process to be restarted strategically – very useful in the case of a MySQL timeout as mentioned above.
Local testing and development
The development of distributed systems is always a little more difficult than the implementation of simple local applications. This is not much different when using a message broker: RabbitMQ needs to be installed locally in order to even be able to run and test one's own software. SIRUP has therefore developed a dummy implementation of the API to allow the exchange of messages to be simulated, according to AMQP standards, even without your own message broker. Here, the entire message flow is performed synchronously in the current Drupal request, so that you can develop and test with the usual convenience: in this way, the interaction between the individual parts of the application can be tested and debugged much more easily. This approach has its limitations, of course: only the messages that are sent to the same Drupal system are processed. In addition, the execution is synchronous, which can affect the order in which the messages are processed – but that doesn't trouble robust applications anyway (see above). The first limitation is practically dealt with, with simple mocks (dummies) of the external systems – which are, in any case, essential for integration texts. The integration of this dummy system is structured as follows: a separate Drupal module is developed for each external system, which are implemented into the simulation environment via hooks (Listing 3). In this way, the complete system can be developed and tested locally. This avoids the time-consuming setup of complex development and test environments, while also helping to improve the performance of the staging environment.
Long live Drupal
To enable the prompt processing of messages, a long-running PHP process is required. In the context of Drupal, the command line tool Drush is the perfect solution as it takes care of the bootstrapping of the CMS and the integration of the console environment. The message broker module hooks in there and can be started using the drush consume-amqp-messages command. It then connects to RabbitMQ and processes messages for all eternity. It is also possible to launch single or multiple consumers (predefined by hook), specifying additional parameters that optimize the interaction with RabbitMQ. When executing the Drupal code, however, you will notice from time to time that the CMS was not necessarily designed for such a situation. For example, the watchdog function uses a one-time, initialized constant as the current time when logging. Therefore all of the Drush process's log entries later appear as if they had been made at the start time of the process. This is annoying in itself, but there's another reason why it's worthwhile to adapt Drupal's logging: the default settings are written to the database. If the MySQL connection is terminated unexpectedly, you won't hear anything about it ... Logging in a file is therefore a lot more reliable – e.g. via Drupal's syslog module. Another quirk of some methods, such as node_load, is the use of an internal cache in the form of statistical variables. It is designed to prevent the re-execution of MySQL queries – but can be dangerous when several concurrent Drush processes are being used. It can easily happen that your code continues to work with a now-obsolete node object and then writes/overwrites it in the database: It's better to call node_load and similar functions with the parameter reset=TRUE, so that the cache is deactivated.
Not every piece of software can be equipped directly with an interface to the message broker. Third party software or legacy applications need to be bound via a messaging gateway. This translates the world of the messages into the language of the application. The processing of messages is therefore a matter of delegation, preferably to a stable interface as can be found in many products.
Depending on the scope and (in)completeness of the interface, the gateway can then also be more complex. It is often necessary to query the product APIs regularly to respond to new events. A suitable cronjob must then ensure that new orders are queried at intervals of, say, 10 minutes and that the appropriate messages are sent. To implement the second rule, the date of the last change for all datasets must also be saved. If the interface does not allow for this, the gateway itself needs its own database.
The transition to asynchronous message delivery brings with it changes that require new approaches and solutions: a rethinking is required not only in terms of system design, but also in the implementation, so that the benefits of the message broker end up being reflected in the finished software. Quality requirements such reliability and scalability are now easy to realize thanks to the message broker. The underlying philosophy behind event-oriented programming also helps in the construction of a suitable architecture, as the separation of message producers and consumers forces the decoupling and splitting up of program parts into independent programs: the basis for long-term, maintainable software. In a business context, RabbitMQ provides a flexible exchange that connects all the IT systems in the company, thereby holding the value-added chain together. The centrally defined routing allows for the modification and addition of systems. Existing software does not need to be adapted for this as only the message broker controls the flow of messages. Interested readers will find further details and patterns on the subject in the book „Enterprise Integration Patterns“.
This article appeared as the cover story for PHP Magazin, issue 05/2013 (August/September).