public class SimpleRpcServer

Summary

Implements a simple RPC service, responding to requests received via a Subscription.

Remarks

This class interprets requests such as those sent by instances of SimpleRpcClient.

The basic pattern for implementing a service has three steps: subclass SimpleRpcServer, overriding HandleCall and HandleCast as appropriate; create a Subscription object for receiving requests from clients; and start an instance of the SimpleRpcServer subclass with that Subscription.

    string queueName = "ServiceRequestQueue"; // See also Subscription ctors
    using (IConnection conn = new ConnectionFactory().CreateConnection(serverAddress)) {
        using (IModel ch = conn.CreateModel()) {
            Subscription sub = new Subscription(ch, queueName);
            new MySimpleRpcServerSubclass(sub).MainLoop();
        }
    }

Note that this class itself does not declare any resources (exchanges, queues or bindings). The Subscription we use for receiving RPC requests should have already declared all the resources we need. See the Subscription constructors and the Subscription.Bind method.

If you are implementing a service that responds to "jms/stream-message"-formatted requests (as implemented by RabbitMQ.Client.Content.IStreamMessageReader), override HandleStreamMessageCall. Otherwise, override HandleSimpleCall or HandleCall as appropriate. Asynchronous, one-way requests are dealt with by HandleCast and the methods it delegates to (HandleSimpleCast, or HandleStreamMessageCall with a null replyWriter).
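As an illustration of the override choices just described, the following is a minimal sketch of a subclass that handles non-stream requests. The class name UppercaseRpcServer and the assumption that request bodies are UTF-8 text are hypothetical; the using directive for RabbitMQ.Client.MessagePatterns is where these classes are expected to live, although this page does not name the namespace.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.MessagePatterns; // assumed namespace of SimpleRpcServer and Subscription

// Hypothetical example service: replies with the request text in upper case.
public class UppercaseRpcServer : SimpleRpcServer
{
    public UppercaseRpcServer(Subscription subscription)
        : base(subscription) { }

    // Two-way requests whose ContentType is not "jms/stream-message" arrive here.
    public override byte[] HandleSimpleCall(bool isRedelivered,
                                            IBasicProperties requestProperties,
                                            byte[] body,
                                            out IBasicProperties replyProperties)
    {
        // The documentation allows replyProperties to be set to null.
        replyProperties = null;

        // Assumption: clients send UTF-8 encoded text.
        string text = Encoding.UTF8.GetString(body);
        return Encoding.UTF8.GetBytes(text.ToUpperInvariant());
    }

    // One-way requests whose ContentType is not "jms/stream-message" arrive here.
    public override void HandleSimpleCast(bool isRedelivered,
                                          IBasicProperties requestProperties,
                                          byte[] body)
    {
        Console.WriteLine("Received one-way message of {0} bytes", body.Length);
    }
}
```

An instance of such a subclass would be started exactly like MySimpleRpcServerSubclass in the example above, by passing it a Subscription and calling MainLoop().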

Every time a request is successfully received and processed within the server's MainLoop, the request message is acknowledged using Subscription.Ack before the next request is retrieved. The Subscription object thereby takes care of acknowledging both receipt and processing of the request message.

If transactional service is enabled, via SetTransactional(), then after every successful ProcessRequest, IModel.TxCommit is called. Making use of transactional service has effects on all parts of the application that share an IModel instance, completely changing the style of interaction with the AMQP server. For this reason, it is initially disabled, and must be explicitly enabled with a call to SetTransactional(). Please see the documentation for SetTransactional() for details.

To stop a running RPC server, call Close(). This will in turn Close() the Subscription, which will cause MainLoop() to return to its caller.
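Because MainLoop() does not return until the subscription ends, Close() has to be called from somewhere other than the thread running the loop. The following is a minimal sketch of one way to arrange that, assuming a console application; the helper class RpcServerRunner and its method name are hypothetical, and the namespace RabbitMQ.Client.MessagePatterns is an assumption not stated on this page.

```csharp
using System;
using System.Threading;
using RabbitMQ.Client.MessagePatterns; // assumed namespace of SimpleRpcServer

// Hypothetical helper: runs a server until the user presses Enter.
public static class RpcServerRunner
{
    public static void RunUntilEnter(SimpleRpcServer server)
    {
        // Run the blocking request loop on its own thread.
        Thread worker = new Thread(server.MainLoop);
        worker.Start();

        // Wait for the operator to request shutdown.
        Console.ReadLine();

        // Close() closes the Subscription, which makes MainLoop() return.
        server.Close();
        worker.Join();
    }
}
```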

Unless overridden, ProcessRequest examines properties in the request content header, and uses them to dispatch to one of the Handle[...]() methods. See the documentation for ProcessRequest and each Handle[...] method for details.

Property Summary

Flags Type Name Summary
public bool Transactional (r)

Returns true if we are in "transactional" mode, or false if we are not.

Constructor Summary

Flags Name Summary
public SimpleRpcServer(Subscription subscription)

Create, but do not start, an instance that will receive requests via the given Subscription.

Method Summary

Flags Name Summary
public void Close()

Shut down the server, causing MainLoop() to return to its caller.

public virtual byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)

Called by ProcessRequest(), this is the most general method that handles RPC-style requests.

public virtual void HandleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)

Called by ProcessRequest(), this is the most general method that handles asynchronous, one-way requests.

public virtual byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)

Called by the default HandleCall() implementation as a fallback.

public virtual void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)

Called by the default HandleCast() implementation as a fallback.

public virtual void HandleStreamMessageCall(IStreamMessageBuilder replyWriter, bool isRedelivered, IBasicProperties requestProperties, object[] args)

Called by HandleCall and HandleCast when a "jms/stream-message" request is received.

public void MainLoop()

Enters the main loop of the RPC service.

public virtual void ProcessRequest(BasicDeliverEventArgs evt)

Process a single request received from our subscription.

public void SetTransactional()

Enables transactional mode.

Property Detail

public bool Transactional (r)

Summary

Returns true if we are in "transactional" mode, or false if we are not.

Constructor Detail

SimpleRpcServer

public SimpleRpcServer(Subscription subscription)

Parameters
Name Type
subscription Subscription

Summary

Create, but do not start, an instance that will receive requests via the given Subscription.

Remarks

The instance is initially in non-transactional mode. See SetTransactional().

Call MainLoop() to start the request-processing loop.

Method Detail

Close

public void Close()

Flags public
Return type void

Summary

Shut down the server, causing MainLoop() to return to its caller.

Remarks

Acts by calling Close() on the server's Subscription object.

HandleCall

public virtual byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)

Flags public virtual
Return type byte[]
Parameters
Name Type
isRedelivered bool
requestProperties IBasicProperties
body byte[]
replyProperties out IBasicProperties

Summary

Called by ProcessRequest(), this is the most general method that handles RPC-style requests.

Remarks

This method should map requestProperties and body to replyProperties and the returned byte array.

The default implementation checks requestProperties.ContentType, and if it is "jms/stream-message" (i.e. the current value of StreamMessageBuilder.MimeType), parses it using StreamMessageReader and delegates to HandleStreamMessageCall before encoding and returning the reply. If the ContentType is any other value, the request is passed to HandleSimpleCall instead.

The isRedelivered flag is true when the server knows for sure that it has tried to send this request previously (although not necessarily to this application). It is not a reliable indicator of previous receipt, however: the only claim it makes is that a delivery attempt was made, not that the attempt succeeded. Be careful if you choose to use the isRedelivered flag.

HandleCast

public virtual void HandleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)

Flags public virtual
Return type void
Parameters
Name Type
isRedelivered bool
requestProperties IBasicProperties
body byte[]

Summary

Called by ProcessRequest(), this is the most general method that handles asynchronous, one-way requests.

Remarks

The default implementation checks requestProperties.ContentType, and if it is "jms/stream-message" (i.e. the current value of StreamMessageBuilder.MimeType), parses it using StreamMessageReader and delegates to HandleStreamMessageCall, passing in null as the replyWriter parameter to indicate that no reply is desired or possible. If the ContentType is any other value, the request is passed to HandleSimpleCast instead.

The isRedelivered flag is true when the server knows for sure that it has tried to send this request previously (although not necessarily to this application). It is not a reliable indicator of previous receipt, however: the only claim it makes is that a delivery attempt was made, not that the attempt succeeded. Be careful if you choose to use the isRedelivered flag.

HandleSimpleCall

public virtual byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)

Flags public virtual
Return type byte[]
Parameters
Name Type
isRedelivered bool
requestProperties IBasicProperties
body byte[]
replyProperties out IBasicProperties

Summary

Called by the default HandleCall() implementation as a fallback.

Remarks

If the MIME ContentType of the request did not match any of the types specially recognised (e.g. "jms/stream-message"), this method is called instead with the raw bytes of the request. It should fill in replyProperties (or set it to null) and return a byte array to send back to the remote caller as a reply message.

HandleSimpleCast

public virtual void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)

Flags public virtual
Return type void
Parameters
Name Type
isRedelivered bool
requestProperties IBasicProperties
body byte[]

Summary

Called by the default HandleCast() implementation as a fallback.

Remarks

If the MIME ContentType of the request did not match any of the types specially recognised (e.g. "jms/stream-message"), this method is called instead with the raw bytes of the request.

HandleStreamMessageCall

public virtual void HandleStreamMessageCall(IStreamMessageBuilder replyWriter, bool isRedelivered, IBasicProperties requestProperties, object[] args)

Flags public virtual
Return type void
Parameters
Name Type
replyWriter IStreamMessageBuilder
isRedelivered bool
requestProperties IBasicProperties
args object[]

Summary

Called by HandleCall and HandleCast when a "jms/stream-message" request is received.

Remarks

The args array contains the values decoded by HandleCall or HandleCast.

The replyWriter parameter will be null if we were called from HandleCast, in which case a reply is not expected or possible, or non-null if we were called from HandleCall. Use the methods of replyWriter in this case to assemble your reply, which will be sent back to the remote caller.

This default implementation does nothing, which effectively sends back an empty reply to any and all remote callers.
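The null check on replyWriter described above can be sketched as follows. The class name GreetingRpcServer and the shape of the request (a single string argument) are hypothetical; WriteString and WriteInt32 are taken to be members of IStreamMessageBuilder, and the MessagePatterns namespace is an assumption not stated on this page.

```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using RabbitMQ.Client.MessagePatterns; // assumed namespace of SimpleRpcServer

// Hypothetical example service for "jms/stream-message" requests.
public class GreetingRpcServer : SimpleRpcServer
{
    public GreetingRpcServer(Subscription subscription)
        : base(subscription) { }

    public override void HandleStreamMessageCall(IStreamMessageBuilder replyWriter,
                                                 bool isRedelivered,
                                                 IBasicProperties requestProperties,
                                                 object[] args)
    {
        if (replyWriter == null)
        {
            // Called from HandleCast: no reply is expected or possible.
            return;
        }

        // Assumption: the first decoded argument, if present, is a name.
        string name = args.Length > 0 ? args[0] as string : null;

        // Assemble the reply; it is sent back to the remote caller.
        replyWriter.WriteString("Hello, " + (name ?? "stranger"));
        replyWriter.WriteInt32(args.Length);
    }
}
```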

MainLoop

public void MainLoop()

Flags public
Return type void

Summary

Enters the main loop of the RPC service.

Remarks

Retrieves requests repeatedly from the service's subscription. Each request is passed to ProcessRequest. Once ProcessRequest returns, the request is acknowledged via Subscription.Ack(). If transactional mode is enabled, TxCommit is then called. Finally, the loop begins again.

Runs until the subscription ends, which happens either as a result of disconnection, or of a call to Close().

ProcessRequest

public virtual void ProcessRequest(BasicDeliverEventArgs evt)

Flags public virtual
Return type void
Parameters
Name Type
evt BasicDeliverEventArgs

Summary

Process a single request received from our subscription.

Remarks

If the request's properties contain a non-null, non-empty CorrelationId string (see IBasicProperties), it is assumed to be a two-way call, requiring a response. The ReplyTo header property is used as the reply address (via PublicationAddress.Parse, unless that fails, in which case it is treated as a simple queue name), and the request is passed to HandleCall().

If the CorrelationId is absent or empty, the request is treated as a one-way asynchronous event, and is passed to HandleCast().

Usually, overriding HandleCall(), HandleCast(), or one of their delegates is sufficient to implement a service, but in some cases overriding ProcessRequest() is required. Overriding ProcessRequest() gives the opportunity to implement schemes for detecting interaction patterns other than simple request/response or one-way communication.
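A minimal sketch of an override that inspects each request before handing it to the default dispatch is shown here. The class name LoggingRpcServer is hypothetical, the call-versus-cast test simply restates the CorrelationId rule described in these remarks, and the MessagePatterns namespace is an assumption not stated on this page.

```csharp
using System;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.MessagePatterns; // assumed namespace of SimpleRpcServer

// Hypothetical example: logs every request, then dispatches as usual.
public class LoggingRpcServer : SimpleRpcServer
{
    public LoggingRpcServer(Subscription subscription)
        : base(subscription) { }

    public override void ProcessRequest(BasicDeliverEventArgs evt)
    {
        // Mirrors the documented rule: a non-empty CorrelationId means a two-way call.
        bool isCall = !string.IsNullOrEmpty(evt.BasicProperties.CorrelationId);

        Console.WriteLine("{0} request, redelivered={1}",
                          isCall ? "Two-way" : "One-way",
                          evt.Redelivered);

        // Let the default implementation route to HandleCall or HandleCast.
        base.ProcessRequest(evt);
    }
}
```

A scheme for a different interaction pattern would replace the call to base.ProcessRequest with its own dispatch logic rather than delegating.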

SetTransactional

public void SetTransactional()

Flags public
Return type void

Summary

Enables transactional mode.

Remarks

Once enabled, transactional mode is not only enabled for all users of the underlying IModel instance, but cannot be disabled without shutting down the entire IModel (which involves shutting down all the services depending on it, and should not be undertaken lightly).

This method calls IModel.TxSelect every time it is called. (TxSelect is idempotent, so this is harmless.)
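For illustration, enabling transactional mode before entering the request loop might look like the sketch below. The helper class TransactionalStartup is hypothetical, and the check of the Transactional property is optional given that repeated calls are harmless; the MessagePatterns namespace is an assumption not stated on this page.

```csharp
using RabbitMQ.Client.MessagePatterns; // assumed namespace of SimpleRpcServer

// Hypothetical helper: starts a server in transactional mode.
public static class TransactionalStartup
{
    public static void Run(SimpleRpcServer server)
    {
        // Affects every user of the underlying IModel and cannot be undone
        // without shutting that IModel down.
        if (!server.Transactional)
        {
            server.SetTransactional();
        }

        // From here on, TxCommit is called after each processed request.
        server.MainLoop();
    }
}
```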