IDisposableThis class interprets requests such as those sent by instances of SimpleRpcClient.
The basic pattern for implementing a service is to subclass SimpleRpcServer, overriding HandleCall and HandleCast as appropriate, and then to create a Subscription object for receiving requests from clients, and start an instance of the SimpleRpcServer subclass with the Subscription.
string queueName = "ServiceRequestQueue"; // See also Subscription ctors
using (IConnection conn = new ConnectionFactory()
.CreateConnection(serverAddress)) {
using (IModel ch = conn.CreateModel()) {
ushort ticket = ch.AccessRequest("/data");
Subscription sub = new Subscription(ch, ticket, queueName);
new MySimpleRpcServerSubclass(sub).MainLoop();
}
}
Note that this class itself does not declare any resources (exchanges, queues or bindings). The Subscription we use for receiving RPC requests should have already declared all the resources we need. See the Subscription constructors and the Subscription.Bind method.
If you are implementing a service that responds to "jms/stream-message"-formatted requests (as implemented by RabbitMQ.Client.Content.IStreamMessageReader), override HandleStreamMessageCall. Otherwise, override HandleSimpleCall or HandleCall as appropriate. Asynchronous, one-way requests are dealt with by HandleCast etc.
Every time a request is successfully received and processed within the server's MainLoop, the request message is Ack()ed using Subscription.Ack before the next request is retrieved. This causes the Subscription object to take care of acknowledging receipt and processing of the request message.
If transactional service is enabled, via SetTransactional(), then after every successful ProcessRequest, IModel.TxCommit is called. Making use of transactional service has effects on all parts of the application that share an IModel instance, completely changing the style of interaction with the AMQP server. For this reason, it is initially disabled, and must be explicitly enabled with a call to SetTransactional(). Please see the documentation for SetTransactional() for details.
To stop a running RPC server, call Close(). This will in turn Close() the Subscription, which will cause MainLoop() to return to its caller.
Unless overridden, ProcessRequest examines properties in the request content header, and uses them to dispatch to one of the Handle[...]() methods. See the documentation for ProcessRequest and each Handle[...] method for details.
| Flags | Type | Name | Summary |
|---|---|---|---|
| public |
bool
|
Transactional
(r)
|
Returns true if we are in "transactional" mode, or false if we are not. |
| Flags | Name | Summary |
|---|---|---|
| public |
SimpleRpcServer(Subscription subscription)
|
Create, but do not start, an instance that will receive requests via the given Subscription. |
| Flags | Name | Summary |
|---|---|---|
| public |
void Close()
|
Shut down the server, causing MainLoop() to return to its caller. |
| public virtual |
byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
|
Called by ProcessRequest(), this is the most general method that handles RPC-style requests. |
| public virtual |
void HandleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
|
Called by ProcessRequest(), this is the most general method that handles asynchronous, one-way requests. |
| public virtual |
byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
|
Called by the default HandleCall() implementation as a fallback. |
| public virtual |
void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
|
Called by the default HandleCast() implementation as a fallback. |
| public virtual |
void HandleStreamMessageCall(IStreamMessageBuilder replyWriter, bool isRedelivered, IBasicProperties requestProperties, object[] args)
|
Called by HandleCall and HandleCast when a "jms/stream-message" request is received. |
| public |
void MainLoop()
|
Enters the main loop of the RPC service. |
| public virtual |
void ProcessRequest(BasicDeliverEventArgs evt)
|
Process a single request received from our subscription. |
| public |
void SetTransactional()
|
Enables transactional mode. |
public
SimpleRpcServer(Subscription subscription)
| Parameters |
|
|---|
The instance is initially in non-transactional mode. See SetTransactional().
Call MainLoop() to start the request-processing loop.
public
void Close()
| Flags | public |
|---|---|
| Return type |
void
|
public virtual
byte[] HandleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
| Flags | public virtual | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
| Return type |
byte[]
|
||||||||||
| Parameters |
|
This method should map requestProperties and body to replyProperties and the returned byte array.
The default implementation checks requestProperties.ContentType, and if it is "jms/stream-message" (i.e. the current value of StreamMessageBuilder.MimeType), parses it using StreamMessageReader and delegates to HandleStreamMessageCall before encoding and returning the reply. If the ContentType is any other value, the request is passed to HandleSimpleCall instead.
The isRedelivered flag is true when the server knows for sure that it has tried to send this request previously (although not necessarily to this application). It is not a reliable indicator of previous receipt, however - the only claim it makes is that a delivery attempt was made, not that the attempt succeeded. Be careful if you choose to use the isRedelivered flag.
public virtual
void HandleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
| Flags | public virtual | ||||||||
|---|---|---|---|---|---|---|---|---|---|
| Return type |
void
|
||||||||
| Parameters |
|
The default implementation checks requestProperties.ContentType, and if it is "jms/stream-message" (i.e. the current value of StreamMessageBuilder.MimeType), parses it using StreamMessageReader and delegates to HandleStreamMessageCall, passing in null as the replyWriter parameter to indicate that no reply is desired or possible. If the ContentType is any other value, the request is passed to HandleSimpleCast instead.
The isRedelivered flag is true when the server knows for sure that it has tried to send this request previously (although not necessarily to this application). It is not a reliable indicator of previous receipt, however - the only claim it makes is that a delivery attempt was made, not that the attempt succeeded. Be careful if you choose to use the isRedelivered flag.
public virtual
byte[] HandleSimpleCall(bool isRedelivered, IBasicProperties requestProperties, byte[] body, out IBasicProperties replyProperties)
| Flags | public virtual | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
| Return type |
byte[]
|
||||||||||
| Parameters |
|
public virtual
void HandleSimpleCast(bool isRedelivered, IBasicProperties requestProperties, byte[] body)
| Flags | public virtual | ||||||||
|---|---|---|---|---|---|---|---|---|---|
| Return type |
void
|
||||||||
| Parameters |
|
public virtual
void HandleStreamMessageCall(IStreamMessageBuilder replyWriter, bool isRedelivered, IBasicProperties requestProperties, object[] args)
| Flags | public virtual | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|
| Return type |
void
|
||||||||||
| Parameters |
|
The args array contains the values decoded by HandleCall or HandleCast.
The replyWriter parameter will be null if we were called from HandleCast, in which case a reply is not expected or possible, or non-null if we were called from HandleCall. Use the methods of replyWriter in this case to assemble your reply, which will be sent back to the remote caller.
This default implementation does nothing, which effectively sends back an empty reply to any and all remote callers.
public
void MainLoop()
| Flags | public |
|---|---|
| Return type |
void
|
Retrieves requests repeatedly from the service's subscription. Each request is passed to ProcessRequest. Once ProcessRequest returns, the request is acknowledged via Subscription.Ack(). If transactional mode is enabled, TxCommit is then called. Finally, the loop begins again.
Runs until the subscription ends, which happens either as a result of disconnection, or of a call to Close().
public virtual
void ProcessRequest(BasicDeliverEventArgs evt)
| Flags | public virtual | ||||
|---|---|---|---|---|---|
| Return type |
void
|
||||
| Parameters |
|
If the request's properties contain a non-null, non-empty CorrelationId string (see IBasicProperties), it is assumed to be a two-way call, requiring a response. The ReplyTo header property is used as the reply address (via PublicationAddress.Parse, unless that fails, in which case it is treated as a simple queue name), and the request is passed to HandleCall().
If the CorrelationId is absent or empty, the request is treated as one-way asynchronous event, and is passed to HandleCast().
Usually, overriding HandleCall(), HandleCast(), or one of their delegates is sufficient to implement a service, but in some cases overriding ProcessRequest() is required. Overriding ProcessRequest() gives the opportunity to implement schemes for detecting interaction patterns other than simple request/response or one-way communication.
public
void SetTransactional()
| Flags | public |
|---|---|
| Return type |
void
|
Once enabled, transactional mode is not only enabled for all users of the underlying IModel instance, but cannot be disabled without shutting down the entire IModel (which involves shutting down all the services depending on it, and should not be undertaken lightly).
This method calls IModel.TxSelect, every time it is called. (TxSelect is idempotent, so this is harmless.)