Different ways to process an XLANGMessage within a .NET helper component invoked by an orchestration

Friday, August 12, 2011

Introduction

One of the most common scenarios in BizTalk applications is an orchestration that receives and processes an incoming XML document to produce a result message. Sometimes the result can be generated simply by transforming the inbound message with a map, but in other cases the orchestration has to invoke a method exposed by a helper component that contains the business logic needed to process the request document and produce a new XML response message. The signature of this method usually resembles the following code snippet:
public XmlDocument ProcessRequestReturnXmlDocument(XLANGMessage message)
As you can see, the method above does not return an XLANGMessage object as you might expect, and the reason is quite straightforward: the only constructor exposed by the XLANGMessage class contained in the Microsoft.XLANGs.BaseTypes assembly is protected and therefore inaccessible to user code. Developers commonly use the XmlDocument class to read the entire content of an XML message part with a single line of code (document.DocumentElement.OuterXml) and to access the value of one or more elements using an XPath expression and the SelectSingleNode/SelectNodes methods exposed by the class. Using an instance of the XmlDocument class to manipulate the content of an XLANGMessage is a flexible and handy technique, but it can easily lead to out-of-memory exceptions when dealing with large messages or with a significant number of medium-size messages within the same host process. In fact, an XmlDocument instance forces the entire message content to be loaded into memory in order to build the object graph for the DOM, and the total amount of memory used by a single instance of this class can grow up to 10 times the actual message size. See the following articles for more information on this topic:
Some months ago I decided to create a sample to compare the different techniques that can be used when invoking a helper component within an orchestration to process the request message and produce a response XML document. The 4 use cases described below have been fully tested on both BizTalk Server 2006 R2 and BizTalk Server 2009. Below you can find a pointer to the BizTalk Server 2009 version of the code. All the use cases are exposed by the same 2 Request-Response WCF Receive Locations:
  • HXM.Calculator.WCF-BasicHttp.ReceiveLocation: a WCF-BasicHttp Receive Location hosted by the BizTalkServerIsolatedHost.
  • HXM.Calculator.WCF-Custom.ReceiveLocation: a WCF-Custom Receive Location hosted by an in-process host which uses a CustomBinding composed of the BinaryMessageEncodingBindingElement + TcpTransportBindingElement.
The 4 test cases implement the same scenario using different approaches. They all process the operations contained within the CalculatorRequest message and return a CalculatorResponse document containing the results.
CalculatorRequest Message 
<CalculatorRequest xmlns="http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorRequest">
  <Method>XmlDocumentOrchestration</Method>
  <Operations>
    <Operation>
      <Operator>+</Operator>
      <Operand1>82</Operand1>
      <Operand2>18</Operand2>
    </Operation>
    <Operation>
      <Operator>-</Operator>
      <Operand1>30</Operand1>
      <Operand2>12</Operand2>
    </Operation>
    <Operation>
      <Operator>*</Operator>
      <Operand1>25</Operand1>
      <Operand2>8</Operand2>
    </Operation>
    <Operation>
      <Operator>/</Operator>
      <Operand1>100</Operand1>
      <Operand2>25</Operand2>
    </Operation>
    <Operation>
      <Operator>+</Operator>
      <Operand1>100</Operand1>
      <Operand2>32</Operand2>
    </Operation>
  </Operations>
</CalculatorRequest>

CalculatorResponse Message
<CalculatorResponse xmlns="http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorResponse">
      <Status>Ok</Status>
      <Results>
            <Result>
                  <Value>100</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>18</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>200</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>4</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>132</Value>
                  <Error>None</Error>
            </Result>
      </Results>
</CalculatorResponse>

I created a BizTalk project called Schemas to create the XML schemas that define and model the structure of the CalculatorRequest and CalculatorResponse messages. Then I created another project called Orchestrations and I implemented a different orchestration for each technique:
  • XmlDocumentOrchestration: the helper component invoked by the orchestration uses an XmlDocument object and in particular the SelectNodes method to read data of each operation within the incoming request message and another XmlDocument instance to create the outbound response document which contains results.
  • StreamOrchestration: the helper component invoked by the orchestration uses a VirtualStream and an XmlReader to read the content of the request message and uses a VirtualStream and an XmlWriter to generate the response message.
  • MessageClassesOrchestration: the helper component invoked by the orchestration deserializes the CalculatorRequest message to an instance of the CalculatorRequest .NET class, loops through the collection of operations and stores the results within a new instance of the CalculatorResponse class. The latter is then serialized to an XML stream within the orchestration.
  • CustomBtxMessageOrchestration: this is probably the trickiest and most interesting test case. The helper component uses an XmlReader to process the data contained in the CalculatorRequest message and uses a VirtualStream and an XmlWriter to generate the response message. However, instead of returning a Stream object like the method invoked by the StreamOrchestration, this method creates and returns a brand new XLANGMessage. See below for more details.
I created an asynchronous, one-way version for each of the above use cases. In this case requests are submitted to BizTalk via a one-way FILE Receive Location while the response documents generated by each orchestration are persisted to another folder using a one-way FILE Send Port.
  • AsyncXmlDocumentOrchestration: the helper component invoked by the orchestration uses an XmlDocument object and in particular the SelectNodes method to read data of each operation within the incoming request message and another XmlDocument instance to create the outbound response document which contains results.
  • AsyncStreamOrchestration: the helper component invoked by the orchestration uses a VirtualStream and an XmlReader to read the content of the request message and uses a VirtualStream and an XmlWriter to generate the response message.
  • AsyncMessageClassesOrchestration: the helper component invoked by the orchestration deserializes the CalculatorRequest message to an instance of the CalculatorRequest .NET class, loops through the collection of operations and stores the results within a new instance of the CalculatorResponse class. The latter is then serialized to an XML stream within the orchestration.
  • AsyncCustomBtxMessageOrchestration: this is probably the trickiest and most interesting test case. The helper component uses an XmlReader to process the data contained in the CalculatorRequest message and uses a VirtualStream and an XmlWriter to generate the response message. However, instead of returning a Stream object like the method invoked by the StreamOrchestration, this method creates and returns a brand new XLANGMessage. See below for more details.
Let’s review each test case in detail. Note that the synchronous and asynchronous versions of each use case run exactly the same helper code, so in the code sections I will refer only to the synchronous version.

XmlDocumentOrchestration

The following picture depicts the architecture of the XmlDocumentOrchestration test case.
[Figure: XmlDocumentOrchestration architecture]
Message Flow:
  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from the Test Agent/Client Application.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the XmlDocumentOrchestration, which uses a Direct Port to receive the CalculatorRequest messages with the Method promoted property = “XmlDocumentOrchestration”.
  4. The XmlDocumentOrchestration invokes the ProcessRequestReturnXmlDocument method exposed by the RequestManager helper component. This object processes all the operations contained in the inbound XLANGMessage within a loop. In particular, the ProcessRequestReturnXmlDocument method loads the entire message into an XmlDocument object and uses the XmlDocument.SelectNodes(XPathExpression) method to retrieve the operations contained in the inbound document.
  5. The ProcessRequestReturnXmlDocument method returns the response message as an XmlDocument.
  6. The XmlDocumentOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  8. The response message is returned to the Test Agent/Client Application.
Orchestration:
The following picture shows the structure of the test XmlDocumentOrchestration.
[Figure: XmlDocumentOrchestration structure]

AsyncXmlDocumentOrchestration

The following picture depicts the architecture of the AsyncXmlDocumentOrchestration test case.
[Figure: AsyncXmlDocumentOrchestration architecture]
Message Flow:
  1. A One-Way FILE Receive Location receives a new CalculatorRequest xml document from the Client Application or Loadgen.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the AsyncXmlDocumentOrchestration, which uses a Direct Port to receive the CalculatorRequest messages with the Method promoted property = “AsyncXmlDocumentOrchestration”.
  4. The AsyncXmlDocumentOrchestration invokes the ProcessRequestReturnXmlDocument method exposed by the RequestManager helper component. This object processes all the operations contained in the inbound XLANGMessage within a loop. In particular, the ProcessRequestReturnXmlDocument method loads the entire message into an XmlDocument object and uses the XmlDocument.SelectNodes(XPathExpression) method to retrieve the operations contained in the inbound document.
  5. The ProcessRequestReturnXmlDocument method returns the response message as an XmlDocument.
  6. The AsyncXmlDocumentOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by a One-Way FILE Send Port.
  8. The response message is written to an output folder by the One-Way FILE Send Port.
Orchestration:
The following picture shows the structure of the test AsyncXmlDocumentOrchestration.
[Figure: AsyncXmlDocumentOrchestration structure]
Code
The XmlDocumentOrchestration receives a request message and returns a response document through a Request-Response Direct Bound Logical Port. The Filter Expression defined on the Activate Receive Shape is configured to receive all the CalculatorRequest messages whose Method promoted property equals ‘XmlDocumentOrchestration’. The Expression Shape called BusinessLogic contains the following code:
logHelper.WriteLine("[XmlDocumentOrchestration] Request message received.");
xmlDocument = requestManager.ProcessRequestReturnXmlDocument(requestMessage);
logHelper.WriteLine("[XmlDocumentOrchestration] Request message successfully processed.");

while the Message Assignment Shape simply assigns the XmlDocument returned by the helper component to the response message:
responseMessage = xmlDocument;

This is probably the most common approach used by developers to create and assign an XML document to a new XLANGMessage. Let’s see the code of the method invoked by the XmlDocumentOrchestration:
ProcessRequestReturnXmlDocument Method
public XmlDocument ProcessRequestReturnXmlDocument(XLANGMessage message)
{
    XmlDocument responseDocument = new XmlDocument();
    List<Response> responseList = new List<Response>();
    string op = null;
    string status = Ok;
    string error = null;
    double operand1 = 0;
    double operand2 = 0;
    double value = 0;
    bool ok = true;
    bool succeeded = true;
    int i = 0;

    try
    {
        logHelper.WriteLine("[RequestManager][XmlDocumentOrchestration] Request message received.");
        XmlDocument xmlDocument = (XmlDocument)message[0].RetrieveAs(typeof(XmlDocument));
        if (xmlDocument != null)
        {
            XmlNodeList nodeList = xmlDocument.SelectNodes(OperationXPath);
            if (nodeList != null && nodeList.Count > 0)
            {
                bool parsed1 = true;
                bool parsed2 = true;
                for (i = 0; i < nodeList.Count; i++)
                {
                    if (nodeList[i].HasChildNodes)
                    {
                        succeeded = true;
                        error = None;
                        value = 0;
                        for (int j = 0; j < nodeList[i].ChildNodes.Count; j++)
                        {
                            switch (nodeList[i].ChildNodes[j].LocalName)
                            {
                                case "Operator":
                                    op = nodeList[i].ChildNodes[j].InnerText;
                                    break;
                                case "Operand1":
                                    parsed1 = double.TryParse(nodeList[i].ChildNodes[j].InnerText, out operand1);
                                    break;
                                case "Operand2":
                                    parsed2 = double.TryParse(nodeList[i].ChildNodes[j].InnerText, out operand2);
                                    break;
                            }
                        }
                        if (parsed1 && parsed2)
                        {
                            switch (op)
                            {
                                case "+":
                                    value = operand1 + operand2;
                                    break;
                                case "-":
                                    value = operand1 - operand2;
                                    break;
                                case "*":
                                    value = operand1 * operand2;
                                    break;
                                case "/":
                                    value = operand1 / operand2;
                                    break;
                                default:
                                    error = string.Format(OperationUnknownErrorMessageFormat, op);
                                    status = OperationsFailed;
                                    ok = false;
                                    succeeded = false;
                                    break;
                            }
                        }
                        else
                        {
                            succeeded = false;
                            ok = false;
                            status = OperationsFailed;
                            if (!parsed1)
                            {
                                error = string.Format(OperandIsNotANumberMessageFormat, 1, i + 1);
                            }
                            if (!parsed2)
                            {
                                if (!parsed1)
                                {
                                    // Both operands failed to parse: append the second error to the first one.
                                    error = string.Format("{0}\r\n{1}", error, string.Format(OperandIsNotANumberMessageFormat, 2, i + 1));
                                }
                                else
                                {
                                    error = string.Format(OperandIsNotANumberMessageFormat, 2, i + 1);
                                }
                            }
                        }
                        if (succeeded)
                        {
                            logHelper.WriteLine(string.Format(OperationFormat, "XmlDocumentOrchestration", operand1, op, operand2, value));
                        }
                        else
                        {
                            logHelper.WriteLine(error);
                        }
                        responseList.Add(new Response(error, value));
                    }
                }
            }
        }
        StringBuilder builder = new StringBuilder();
        using (XmlWriter writer = XmlWriter.Create(builder))
        {
            writer.WriteStartDocument();
            writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace);
            writer.WriteStartElement("Status", CalculatorResponseNamespace);
            writer.WriteString(status);
            writer.WriteEndElement();
            writer.WriteStartElement("Results", CalculatorResponseNamespace);
            for (i = 0; i < responseList.Count; i++)
            {
                writer.WriteStartElement("Result", CalculatorResponseNamespace);
                writer.WriteStartElement("Value", CalculatorResponseNamespace);
                writer.WriteString(responseList[i].Value.ToString());
                writer.WriteEndElement();
                writer.WriteStartElement("Error", CalculatorResponseNamespace);
                writer.WriteString(responseList[i].Error);
                writer.WriteEndElement();
                writer.WriteEndElement();
            }
            writer.WriteEndElement();
            writer.WriteEndElement();
        }
        string text = builder.ToString();
        responseDocument.LoadXml(text);
        if (ok)
        {
            logHelper.WriteLine("[RequestManager][XmlDocumentOrchestration] Response message successfully processed.");
        }
        else
        {
            logHelper.WriteLine("[RequestManager][XmlDocumentOrchestration] Request failed.");
        }
    }
    catch (Exception ex)
    {
        logHelper.WriteLine(ex.Message);
        responseDocument.LoadXml(string.Format(ErrorMessageFormat, ex.Message));
    }
    finally
    {
        message.Dispose();
    }
    return responseDocument;
}
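
The value of the OperationXPath constant is not shown in this post. Because the CalculatorRequest message declares a default namespace, an XPath expression with plain, unprefixed element names would match nothing, so the expression has to use either local-name() tests or prefixes bound through an XmlNamespaceManager. The method above calls the single-argument SelectNodes overload, so it presumably relies on the first option. The following is a minimal, self-contained sketch of the second option, which runs outside BizTalk. The OperationSelector class, the GetOperators method and the "r" prefix are hypothetical names introduced only for illustration; they are not part of the sample.

```csharp
using System;
using System.Collections.Generic;
using System.Xml;

public static class OperationSelector
{
    // Namespace copied from the CalculatorRequest sample message.
    public const string RequestNamespace =
        "http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorRequest";

    // Returns the text of the Operator element of each Operation, in document order.
    public static List<string> GetOperators(string requestXml)
    {
        // Loads the whole document into memory, exactly like the DOM approach above.
        XmlDocument document = new XmlDocument();
        document.LoadXml(requestXml);

        // Bind a prefix (hypothetical: "r") to the request namespace for XPath.
        XmlNamespaceManager manager = new XmlNamespaceManager(document.NameTable);
        manager.AddNamespace("r", RequestNamespace);

        List<string> operators = new List<string>();
        XmlNodeList nodes = document.SelectNodes(
            "/r:CalculatorRequest/r:Operations/r:Operation", manager);
        foreach (XmlNode node in nodes)
        {
            XmlNode op = node.SelectSingleNode("r:Operator", manager);
            operators.Add(op != null ? op.InnerText : null);
        }
        return operators;
    }
}
```

Selecting with a bound prefix keeps the expression readable and makes the namespace dependency explicit, at the price of one extra object per call.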

Comment

As already highlighted above, using an XmlDocument to process the content of an XLANGMessage is extremely handy and powerful, but it can easily lead to high memory usage, especially when dealing with large messages.

StreamOrchestration

The following picture depicts the architecture of the StreamOrchestration test case.
[Figure: StreamOrchestration architecture]
Message Flow:
  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from the Test Agent/Client Application.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the StreamOrchestration, which uses a Direct Port to receive the CalculatorRequest messages with the Method promoted property = “StreamOrchestration”.
  4. The StreamOrchestration invokes the ProcessRequestReturnStream method exposed by the RequestManager helper component. This object processes all the operations contained in the inbound XLANGMessage within a loop. In particular, the ProcessRequestReturnStream method uses an XmlReader object to read and process the operations, and an XmlWriter and a VirtualStream object to produce the response message.
  5. The ProcessRequestReturnStream method returns the response message as a Stream (VirtualStream) object.
  6. The StreamOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  8. The response message is returned to the Test Agent/Client Application.
Orchestration:
The following picture shows the structure of the test StreamOrchestration.
[Figure: StreamOrchestration structure]

AsyncStreamOrchestration

The following picture depicts the architecture of the AsyncStreamOrchestration test case.
[Figure: AsyncStreamOrchestration architecture]
Message Flow:
  1. A One-Way FILE Receive Location receives a new CalculatorRequest xml document from the Client Application or Loadgen.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the AsyncStreamOrchestration, which uses a Direct Port to receive the CalculatorRequest messages with the Method promoted property = “AsyncStreamOrchestration”.
  4. The AsyncStreamOrchestration invokes the ProcessRequestReturnStream method exposed by the RequestManager helper component. This object processes all the operations contained in the inbound XLANGMessage within a loop. In particular, the ProcessRequestReturnStream method uses an XmlReader object to read and process the operations, and an XmlWriter and a VirtualStream object to produce the response message.
  5. The ProcessRequestReturnStream method returns the response message as a Stream (VirtualStream) object.
  6. The AsyncStreamOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by a One-Way FILE Send Port.
  8. The response message is written to an output folder by the One-Way FILE Send Port.
Orchestration:
The following picture shows the structure of the test AsyncStreamOrchestration.
[Figure: AsyncStreamOrchestration structure]
Code
The StreamOrchestration receives a request message and returns a response document through a Request-Response Direct Bound Logical Port. The Filter Expression defined on the Activate Receive Shape is configured to receive all the CalculatorRequest messages whose Method promoted property equals ‘StreamOrchestration’. The Expression Shape called BusinessLogic contains the following code:
logHelper.WriteLine("[StreamOrchestration] Request message received.");
stream = requestManager.ProcessRequestReturnStream(requestMessage);
logHelper.WriteLine("[StreamOrchestration] Request message successfully processed.");

As the name suggests, the ProcessRequestReturnStream method exposed by the RequestManager component receives the CalculatorRequest message as XLANGMessage input parameter and returns a Stream object containing the response document. In particular, the method adopts a streaming approach to process the incoming message and generate the response message:
  • It uses a VirtualStream and an XmlReader to read the content of the request message.
  • It uses a VirtualStream and an XmlWriter to generate the response message.
ProcessRequestReturnStream Method
public Stream ProcessRequestReturnStream(XLANGMessage message)
{
    VirtualStream stream = null;
    List<Response> responseList = new List<Response>();
    string op = null;
    string status = Ok;
    string error = null;
    double operand1 = 0;
    double operand2 = 0;
    double value = 0;
    bool ok = true;
    bool succeeded = true;
    int i = 0;

    try
    {
        logHelper.WriteLine("[RequestManager][StreamOrchestration] Request message received.");
        using (VirtualStream virtualStream = new VirtualStream(bufferSize, thresholdSize))
        {
            using (Stream partStream = (Stream)message[0].RetrieveAs(typeof(Stream)))
            {
                using (XmlReader reader = XmlReader.Create(partStream))
                {
                    while (reader.Read() && ok)
                    {
                        if (reader.LocalName == "Operator" &&
                            reader.NodeType == XmlNodeType.Element)
                        {
                            succeeded = true;
                            error = None;
                            value = 0;
                            op = reader.ReadElementContentAsString();
                            reader.MoveToContent();
                            operand1 = reader.ReadElementContentAsDouble();
                            reader.MoveToContent();
                            operand2 = reader.ReadElementContentAsDouble();
                            i++;
                            switch (op)
                            {
                                case "+":
                                    value = operand1 + operand2;
                                    break;
                                case "-":
                                    value = operand1 - operand2;
                                    break;
                                case "*":
                                    value = operand1 * operand2;
                                    break;
                                case "/":
                                    value = operand1 / operand2;
                                    break;
                                default:
                                    error = string.Format(OperationUnknownErrorMessageFormat, op);
                                    status = OperationsFailed;
                                    ok = false;
                                    succeeded = false;
                                    break;
                            }
                            if (succeeded)
                            {
                                logHelper.WriteLine(string.Format(OperationFormat, "StreamOrchestration", operand1, op, operand2, value));
                            }
                            else
                            {
                                logHelper.WriteLine(error);
                            }
                            responseList.Add(new Response(error, value));
                        }
                    }
                }
            }
        }
        if (ok)
        {
            stream = new VirtualStream(bufferSize, thresholdSize);
            using (XmlWriter writer = XmlWriter.Create(stream))
            {
                writer.WriteStartDocument();
                writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace);
                writer.WriteStartElement("Status", CalculatorResponseNamespace);
                writer.WriteString(status);
                writer.WriteEndElement();
                writer.WriteStartElement("Results", CalculatorResponseNamespace);
                for (i = 0; i < responseList.Count; i++)
                {
                    writer.WriteStartElement("Result", CalculatorResponseNamespace);
                    writer.WriteStartElement("Value", CalculatorResponseNamespace);
                    writer.WriteString(responseList[i].Value.ToString());
                    writer.WriteEndElement();
                    writer.WriteStartElement("Error", CalculatorResponseNamespace);
                    writer.WriteString(responseList[i].Error);
                    writer.WriteEndElement();
                    writer.WriteEndElement();
                }
                writer.WriteEndElement();
                writer.WriteEndElement();
            }
            stream.Seek(0, SeekOrigin.Begin);
            logHelper.WriteLine("[RequestManager][StreamOrchestration] Response message successfully processed.");
        }
    }
    catch (Exception ex)
    {
        logHelper.WriteLine(string.Format("[RequestManager][StreamOrchestration] {0}", ex.Message));
        stream = new VirtualStream(bufferSize, thresholdSize);
        using (XmlWriter writer = XmlWriter.Create(stream))
        {
            writer.WriteStartDocument();
            writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace);
            writer.WriteStartElement("Status", CalculatorResponseNamespace);
            writer.WriteString(ex.Message);
            writer.WriteEndElement();
            writer.WriteEndElement();
        }
        stream.Seek(0, SeekOrigin.Begin);
    }
    finally
    {
        message.Dispose();
    }
    return stream;
}
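
To experiment with the forward-only reading pattern without a BizTalk installation, the loop at the heart of the method above can be isolated into a small class that depends only on System.Xml. This is a simplified sketch, not the sample's code: the StreamingCalculator class and its Compute method are hypothetical names, the input is any readable Stream rather than an XLANGMessage part, and an unknown operator throws instead of setting a status.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml;

public static class StreamingCalculator
{
    // Reads Operation elements one at a time and returns the computed values.
    // The request is never loaded into a DOM; only the results are kept in memory.
    public static List<double> Compute(Stream request)
    {
        List<double> results = new List<double>();
        using (XmlReader reader = XmlReader.Create(request))
        {
            while (reader.Read())
            {
                if (reader.NodeType == XmlNodeType.Element &&
                    reader.LocalName == "Operator")
                {
                    // Each ReadElementContentAs* call consumes the element and
                    // leaves the reader on the node after its end tag;
                    // MoveToContent then skips any whitespace in between.
                    string op = reader.ReadElementContentAsString();
                    reader.MoveToContent();
                    double operand1 = reader.ReadElementContentAsDouble();
                    reader.MoveToContent();
                    double operand2 = reader.ReadElementContentAsDouble();

                    switch (op)
                    {
                        case "+": results.Add(operand1 + operand2); break;
                        case "-": results.Add(operand1 - operand2); break;
                        case "*": results.Add(operand1 * operand2); break;
                        case "/": results.Add(operand1 / operand2); break;
                        default:
                            throw new InvalidOperationException("Unknown operator: " + op);
                    }
                }
            }
        }
        return results;
    }
}
```

Like the original method, this sketch assumes that Operator, Operand1 and Operand2 always appear in that order inside each Operation. Fed with the CalculatorRequest sample shown earlier (with / as the division operator), it should return 100, 18, 200, 4 and 132, which matches the CalculatorResponse sample.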

VirtualStream and ReadOnlySeekableStream
The ReadOnlySeekableStream can be used to wrap and read the content of a non-seekable stream in those cases where, after reading, it’s necessary to reposition the cursor at the beginning of the stream. While reading the content of the wrapped stream, the ReadOnlySeekableStream copies it to a temporary file created in the folder specified by the TMP and/or TEMP environment variables. However, the class exposes a public constructor that lets you specify a different persistence stream (e.g. VirtualStream, MemoryStream) as an alternative to the FileStream:
public ReadOnlySeekableStream (Stream source, Stream persist, int maxBuffer)

The VirtualStream, in turn, is very useful when dealing with large messages within a custom pipeline component or a helper component invoked by an orchestration. In fact, if the document size exceeds a certain threshold (1 MB by default), the message is persisted to a temporary file. This file is created inside the folder identified by the TEMP environment variable of the service account used to run the current host instance. So, when parsing, mapping and generally dealing with large messages, it’s a good practice to move the temporary folder of the BizTalk service account to a dedicated local disk, separate from the volume hosting the Windows OS. The VirtualStream class exposes a constructor (see below) that lets you specify the size of the internal buffer and the size of this threshold, so you can choose the best values for a given scenario. It’s a good practice to expose these 2 values, the buffer size and the threshold size, as properties of custom pipeline components or helper components.
public VirtualStream (int bufferSize, int thresholdSize)

The use of the VirtualStream and ReadOnlySeekableStream (both classes are contained in the Microsoft.BizTalk.Streaming.dll assembly) can be combined to provide both “seekability” and “overflow to file system” capabilities to custom pipeline components and helper classes. This accommodates the processing of large messages without loading the entire message into memory. The following code could be used in a pipeline component to implement this functionality.
int bufferSize = 0x280;
int thresholdSize = 0x100000;
Stream vStream = new VirtualStream(bufferSize, thresholdSize);
Stream seekStream = new ReadOnlySeekableStream(inboundStream, vStream, bufferSize);

As you can see, the VirtualStream is used as persistence storage for the ReadOnlySeekableStream. In this way, if the length of the original stream is lower than the threshold size, its content will be persisted to a MemoryStream, otherwise it will be written to a temporary file.
As a rule of thumb, if the original stream is seekable, there's no need to use the ReadOnlySeekableStream and you can use just the VirtualStream. However, some of the streams used by the Messaging Engine are forward-only, non-seekable streams, so when a custom pipeline component has to read the content of a message and then reposition the cursor at the beginning of the stream, it's a good practice to combine the VirtualStream and the ReadOnlySeekableStream as shown by the code snippet above. See Optimizing Pipeline Performance for more information on this topic.
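The "overflow to file system" idea can be illustrated with plain System.IO types. The sketch below is not how VirtualStream or ReadOnlySeekableStream are implemented (those classes buffer lazily while the consumer reads); it eagerly copies a forward-only stream into memory and switches to a temporary file once a threshold is exceeded, returning a seekable stream positioned at the start. SeekableCopy and ToSeekable are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.IO;

public static class SeekableCopy
{
    // Copies 'source' into a seekable stream. Data stays in a MemoryStream
    // until more than 'thresholdSize' bytes have been read; after that it is
    // moved to a temporary file that is deleted when the stream is closed.
    public static Stream ToSeekable(Stream source, int bufferSize, int thresholdSize)
    {
        Stream target = new MemoryStream();
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        while ((read = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            total += read;
            if (total > thresholdSize && target is MemoryStream)
            {
                // Threshold exceeded: spill what was buffered so far to disk.
                FileStream file = new FileStream(
                    Path.GetTempFileName(), FileMode.Create, FileAccess.ReadWrite,
                    FileShare.None, bufferSize, FileOptions.DeleteOnClose);
                target.Position = 0;
                target.CopyTo(file);
                target.Dispose();
                target = file;
            }
            target.Write(buffer, 0, read);
        }
        // Rewind so that the caller can read the content from the beginning.
        target.Position = 0;
        return target;
    }
}
```

As with the BizTalk classes, the temporary file lands in the folder returned for the current account's TEMP setting, which is why the location of that folder matters when large messages are processed. The caller is responsible for disposing the returned stream.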
Response Message Assignment
To assign the content of the stream returned by the ProcessRequestReturnStream method to the response message, the Message Assignment Shape (see the code below) invokes the SetResponse method exposed by another helper component called ResponseManager.
responseMessage = null;
responseManager.SetResponse(responseMessage, stream);

This SetResponse method uses the LoadFrom method exposed by the XLANGPart class to assign the content of the stream to the message part of the response document.
public void SetResponse(XLANGMessage message, Stream stream)
{
    try
    {
        if (stream != null &&
            message != null &&
            message.Count > 0)
        {
            if (stream.CanSeek)
            {
                stream.Seek(0, SeekOrigin.Begin);
            }
            message[0].LoadFrom(stream);
        }
    }
    catch (Exception ex)
    {
        Debug.WriteLine(string.Format("[ResponseManager] {0}", ex.Message));
    }
    finally
    {
        // Guard against a null message to avoid a NullReferenceException.
        if (message != null)
        {
            message.Dispose();
        }
    }
}


Comment

The streaming approach used by StreamOrchestration is particularly efficient when dealing with large messages, because the message content is never loaded entirely into memory.

MessageClassesOrchestration

The following picture depicts the architecture of the MessageClassesOrchestration test case.
MessageClassesOrchestration
Message Flow:
  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from the Test Agent/Client Application.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the MessageClassesOrchestration. The latter uses a Direct Port to receive the CalculatorRequest messages whose Method promoted property equals “MessageClassesOrchestration”.
  4. The MessageClassesOrchestration invokes the ProcessRequestReturnObject method exposed by the RequestManager helper component. This object processes within a loop all the operations contained in the inbound XLANGMessage. In particular, the ProcessRequestReturnObject method deserializes the CalculatorRequest message into a new instance of the CalculatorRequest .NET class, processes all the operations contained in this object and accumulates the results inside an instance of the CalculatorResponse .NET class.
  5. The ProcessRequestReturnObject method returns an instance of the CalculatorResponse .NET class.
  6. The MessageClassesOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  8. The response message is returned to the Test Agent/Client Application.
Orchestration:
The following picture shows the structure of the test MessageClassesOrchestration.
MessageClassesOrchestration2

AsyncMessageClassesOrchestration

The following picture depicts the architecture of the AsyncMessageClassesOrchestration test case.
AsyncMessageClassesOrchestration
Message Flow:
  1. A One-Way FILE Receive Location receives a new CalculatorRequest xml document from the Client Application or Loadgen.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the AsyncMessageClassesOrchestration. The latter uses a Direct Port to receive the CalculatorRequest messages whose Method promoted property equals “AsyncMessageClassesOrchestration”.
  4. The AsyncMessageClassesOrchestration invokes the ProcessRequestReturnObject method exposed by the RequestManager helper component. This object processes within a loop all the operations contained in the inbound XLANGMessage. In particular, the ProcessRequestReturnObject method deserializes the CalculatorRequest message into a new instance of the CalculatorRequest .NET class, processes all the operations contained in this object and accumulates the results inside an instance of the CalculatorResponse .NET class.
  5. The ProcessRequestReturnObject method returns an instance of the CalculatorResponse .NET class.
  6. The AsyncMessageClassesOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by a One-Way FILE Send Port.
  8. The response message is written to an output folder by the One-Way FILE Send Port.
Orchestration:
The following picture shows the structure of the test AsyncMessageClassesOrchestration.
AsyncMessageClassesOrchestration2
Code
The MessageClassesOrchestration receives a request message and returns a response document through a Request-Response Direct Bound Logical Port. The Filter Expression defined on the Activate Receive Shape is configured to receive all the CalculatorRequest messages whose Method promoted property equals ‘MessageClassesOrchestration’. The Expression Shape called BusinessLogic contains the following code:
logHelper.WriteLine("[MessageClassesOrchestration] Request message received.");
calculatorResponse = requestManager.ProcessRequestReturnObject(requestMessage);
logHelper.WriteLine("[MessageClassesOrchestration] Request message successfully processed.");

The ProcessRequestReturnObject method invoked by the MessageClassesOrchestration expects an XLANGMessage object as its inbound parameter. The method retrieves the content of the message part as an instance of the CalculatorRequest class. In this case the RetrieveAs method exposed by the XLANGPart class deserializes the content of the message part and creates an instance of the aforementioned class.
ProcessRequestReturnObject Method
public CalculatorResponse ProcessRequestReturnObject(XLANGMessage message)
{
    CalculatorResponse response = new CalculatorResponse();
    Operation operation = null;
    string error = null;
    double value = 0;
    bool ok = true;
    bool succeeded = true;

    try
    {
        logHelper.WriteLine("[RequestManager][MessageClassesOrchestration] Request message received.");
        CalculatorRequest request = (CalculatorRequest)message[0].RetrieveAs(typeof(CalculatorRequest));
        if (request != null &&
            request.Operations != null &&
            request.Operations.Count > 0)
        {
            response.Status = Ok;
            for (int i = 0; i < request.Operations.Count; i++)
            {
                operation = (Operation)request.Operations[i];
                error = None;
                value = 0;
                succeeded = true;
                switch (operation.Operator)
                {
                    case "+":
                        value = operation.Operand1 + operation.Operand2;
                        break;
                    case "-":
                        value = operation.Operand1 - operation.Operand2;
                        break;
                    case "*":
                        value = operation.Operand1 * operation.Operand2;
                        break;
                    case "/":
                        value = operation.Operand1 / operation.Operand2;
                        break;
                    default:
                        error = string.Format(OperationUnknownErrorMessageFormat, operation.Operator);
                        response.Status = OperationsFailed;
                        ok = false;
                        succeeded = false;
                        break;
                }
                if (succeeded)
                {
                    logHelper.WriteLine(string.Format(OperationFormat, "MessageClassesOrchestration", operation.Operand1, operation.Operator, operation.Operand2, value));
                }
                else
                {
                    logHelper.WriteLine(error);
                }
                response.Results.Add(new Result(value, error));
            }
        }
        else
        {
            response.Status = RequestDoesNotContainAnyOperationsMessage;
        }
        if (ok)
        {
            logHelper.WriteLine("[RequestManager][MessageClassesOrchestration] Response message successfully processed.");
        }
        else
        {
            logHelper.WriteLine("[RequestManager][MessageClassesOrchestration] Request failed.");
        }
    }
    catch (Exception ex)
    {
        logHelper.WriteLine(string.Format("[RequestManager][MessageClassesOrchestration] {0}", ex.Message));
        response.Status = string.Format("[RequestManager][MessageClassesOrchestration] {0}", ex.Message);
        response.Results = null;
    }
    finally
    {
        message.Dispose();
    }
    return response;
}

The code for both the CalculatorRequest and CalculatorResponse classes was obtained by running the xsd.exe tool to build an XML serializable class for each schema.
xsd /c /n:"Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.BusinessLogic" "C:\Projects\HandleXLANGMessages\Schemas\CalculatorRequest.xsd"

To be precise, I customized the code generated by the xsd.exe tool to replace arrays with Lists, but that is only a minor change.
CalculatorRequest Class
namespace Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.BusinessLogic
{
    [Serializable]
    [XmlType(AnonymousType = true, Namespace = "http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorRequest")]
    [XmlRoot(Namespace = "http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorRequest", IsNullable = false)]
    public partial class CalculatorRequest
    {
        #region Private Fields
        private string method;
        private List<Operation> operations = new List<Operation>();
        #endregion

        #region Public Properties
        public string Method
        {
            get
            {
                return this.method;
            }
            set
            {
                this.method = value;
            }
        }

        [XmlArrayItem("Operation", Type=typeof(Operation), IsNullable = false)]
        public List<Operation> Operations
        {
            get
            {
                return this.operations;
            }
            set
            {
                this.operations = value;
            }
        } 
        #endregion
    }

    [Serializable]
    [XmlType(AnonymousType = true, Namespace = "http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorRequest")]
    public partial class Operation
    {
        #region Private Fields
        private string op;
        private double operand1;
        private double operand2;
        #endregion

        #region Public Constructors
        public Operation()
        {
        }
        #endregion

        #region Public Properties
        public string Operator
        {
            get
            {
                return this.op;
            }
            set
            {
                this.op = value;
            }
        }

        public double Operand1
        {
            get
            {
                return this.operand1;
            }
            set
            {
                this.operand1 = value;
            }
        }

        public double Operand2
        {
            get
            {
                return this.operand2;
            }
            set
            {
                this.operand2 = value;
            }
        } 
        #endregion
    }
}
CalculatorResponse Class
namespace Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.BusinessLogic
{
    [Serializable]
    [XmlType(AnonymousType=true, Namespace="http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorResponse")]
    [XmlRoot(Namespace="http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorResponse", IsNullable=false)]
    public partial class CalculatorResponse
    {
        #region Private Fields
        private string status;
        private List<Result> results = new List<Result>();
        #endregion

        #region Public Properties
        public string Status 
        {
            get 
            {
                return this.status;
            }
            set 
            {
                this.status = value;
            }
        }
        
        [XmlArrayItem("Result", Type=typeof(Result), IsNullable=false)]
        public List<Result> Results 
        {
            get 
            {
                return this.results;
            }
            set 
            {
                this.results = value;
            }
        }
        #endregion
    }

    [Serializable]
    [XmlType(AnonymousType=true, Namespace="http://Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Schemas.CalculatorResponse")]
    public partial class Result
    {
        #region Private Fields
        private double value;
        private string error; 
        #endregion

        #region Public Constructors
        public Result()
        {
            this.value = default(double);
            this.error = default(string);
        }

        public Result(double value, string error)
        {
            this.value = value;
            this.error = error;
        }
        #endregion

        #region Public Properties        
        public double Value
        {
            get
            {
                return this.value;
            }
            set
            {
                this.value = value;
            }
        }
        
        public string Error
        {
            get
            {
                return this.error;
            }
            set
            {
                this.error = value;
            }
        } 
        #endregion
    }
}
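The following sketch shows, outside BizTalk, the kind of mapping that these serialization attributes describe. It assumes that deserializing with XmlSerializer is a reasonable stand-in for what RetrieveAs does with a serializable class; the DemoRequest, DemoOperation, DemoSchema and RequestReader names and the namespace URI are placeholders introduced for illustration, not part of the sample.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

public static class DemoSchema
{
    // Placeholder namespace, not the one used by the sample schemas.
    public const string RequestNs = "urn:example:calculator-request";
}

// Simplified stand-in for the xsd.exe-generated CalculatorRequest class.
[XmlType(Namespace = DemoSchema.RequestNs)]
[XmlRoot("CalculatorRequest", Namespace = DemoSchema.RequestNs, IsNullable = false)]
public class DemoRequest
{
    public DemoRequest()
    {
        Operations = new List<DemoOperation>();
    }

    public string Method { get; set; }

    [XmlArrayItem("Operation", IsNullable = false)]
    public List<DemoOperation> Operations { get; set; }
}

// Simplified stand-in for the Operation class.
[XmlType(Namespace = DemoSchema.RequestNs)]
public class DemoOperation
{
    public string Operator { get; set; }
    public double Operand1 { get; set; }
    public double Operand2 { get; set; }
}

public static class RequestReader
{
    // Turns an XML document into a typed object graph.
    public static DemoRequest Deserialize(string xml)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(DemoRequest));
        using (StringReader reader = new StringReader(xml))
        {
            return (DemoRequest)serializer.Deserialize(reader);
        }
    }
}
```

Once the request is an object, the helper can walk request.Operations with ordinary C# code instead of XPath expressions. The price is that the whole document is materialized in memory.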


Response Message Assignment
To assign the content of the CalculatorResponse object returned by the ProcessRequestReturnObject method to the response message, the Message Assignment Shape (see the code below) simply assigns the calculatorResponse object to the message. The XLANG Engine accepts and interprets this particular syntax as a call to the LoadFrom method exposed by the XLANGPart class, which in turn serializes the object instance into an XML stream.
responseMessage = calculatorResponse;
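
As a rough illustration of the serialization step hidden behind this assignment, the sketch below serializes a response object into a rewound stream with XmlSerializer. It assumes that this is comparable to what the engine does internally; the DemoResponse, DemoResult and ResponseWriter names and the namespace URI are placeholders, not part of the sample.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

// Simplified stand-in for the CalculatorResponse class; placeholder namespace.
[XmlType(Namespace = "urn:example:calculator-response")]
[XmlRoot("CalculatorResponse", Namespace = "urn:example:calculator-response", IsNullable = false)]
public class DemoResponse
{
    public DemoResponse()
    {
        Results = new List<DemoResult>();
    }

    public string Status { get; set; }

    [XmlArrayItem("Result", IsNullable = false)]
    public List<DemoResult> Results { get; set; }
}

// Simplified stand-in for the Result class.
[XmlType(Namespace = "urn:example:calculator-response")]
public class DemoResult
{
    public double Value { get; set; }
    public string Error { get; set; }
}

public static class ResponseWriter
{
    // Serializes the object graph to XML and rewinds the stream for the reader.
    public static Stream Serialize(DemoResponse response)
    {
        XmlSerializer serializer = new XmlSerializer(typeof(DemoResponse));
        MemoryStream stream = new MemoryStream();
        serializer.Serialize(stream, response);
        stream.Seek(0, SeekOrigin.Begin);
        return stream;
    }
}
```

The whole XML document is produced in one call, which is convenient for small messages but means the serialized form and the object graph coexist in memory.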

Comment

Using the xsd.exe tool you can create a .NET class for each XML schema used by the solution. This approach allows you to deserialize an inbound message into an instance of a custom entity class, or to serialize an object to create an XML instance. It lets developers use an object-oriented approach when dealing with messages, as they can exploit the properties and methods exposed by classes to access and manipulate data instead of using XPath expressions. This approach works well as long as the messages exchanged and processed by a BizTalk application are relatively small. As the message size increases, the CPU and memory cost of deserializing xml messages into objects and serializing objects into xml messages can grow significantly, and the technique loses effectiveness.

CustomBtxMessageOrchestration

The following picture depicts the architecture of the CustomBtxMessageOrchestration test case.
CustomBtxMessageOrchestration
Message Flow:
  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from the Test Agent/Client Application.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the CustomBtxMessageOrchestration. The latter uses a Direct Port to receive the CalculatorRequest messages whose Method promoted property equals “CustomBtxMessageOrchestration”.
  4. The CustomBtxMessageOrchestration invokes the ProcessRequestReturnXLANGMessage method exposed by the RequestManager helper component. This object processes within a loop all the operations contained in the inbound XLANGMessage. In particular, the ProcessRequestReturnXLANGMessage method uses an XmlReader object to read and process the operations, and an XmlWriter and a VirtualStream object to produce the response message.
  5. The ProcessRequestReturnXLANGMessage method returns a new XLANGMessage.
  6. The CustomBtxMessageOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  8. The response message is returned to the Test Agent/Client Application.
Orchestration:
The following picture shows the structure of the test CustomBtxMessageOrchestration.
CustomBtxMessageOrchestration2

AsyncCustomBtxMessageOrchestration

The following picture depicts the architecture of the AsyncCustomBtxMessageOrchestration test case.
AsyncCustomBTXMessageOrchestration
Message Flow:
  1. A One-Way FILE Receive Location receives a new CalculatorRequest xml document from the Client Application or Loadgen.
  2. The XML disassembler component within the XMLReceive pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the AsyncCustomBtxMessageOrchestration. The latter uses a Direct Port to receive the CalculatorRequest messages whose Method promoted property equals “AsyncCustomBtxMessageOrchestration”.
  4. The AsyncCustomBtxMessageOrchestration invokes the ProcessRequestReturnXLANGMessage method exposed by the RequestManager helper component. This object processes within a loop all the operations contained in the inbound XLANGMessage. In particular, the ProcessRequestReturnXLANGMessage method uses an XmlReader object to read and process the operations, and an XmlWriter and a VirtualStream object to produce the response message.
  5. The ProcessRequestReturnXLANGMessage method returns a new XLANGMessage.
  6. The AsyncCustomBtxMessageOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by a One-Way FILE Send Port.
  8. The response message is written to an output folder by the One-Way FILE Send Port.
Orchestration:
The following picture shows the structure of the test AsyncCustomBtxMessageOrchestration.
AsyncCustomBTXMessageOrchestration2
Code
The CustomBtxMessageOrchestration receives a request message and returns a response document through a Request-Response Direct Bound Logical Port. The Filter Expression defined on the Activate Receive Shape is configured to receive all the CalculatorRequest messages whose Method promoted property equals ‘CustomBtxMessageOrchestration’. The Expression Shape called BusinessLogic contains the following code:
logHelper.WriteLine("[CustomBtxMessageOrchestration] Request message received.");
responseMessage = requestManager.ProcessRequestReturnXLANGMessage(requestMessage);
logHelper.WriteLine("[CustomBtxMessageOrchestration] Request message successfully processed.");

As the name suggests, the ProcessRequestReturnXLANGMessage method exposed by the RequestManager component receives the CalculatorRequest message as an XLANGMessage input parameter and returns an XLANGMessage object containing the response document. But wait a minute: the XLANGMessage class does not expose any public constructor. So how can we create a brand new XLANGMessage within a helper component? Well, there's a trick underneath, and we'll see the details in a moment. The ProcessRequestReturnXLANGMessage method adopts the same streaming approach, and mostly the same code, used by the ProcessRequestReturnStream method to process the incoming message and generate the response message:
  • It uses a VirtualStream and an XmlReader to read the content of the request message.
  • It uses a VirtualStream and an XmlWriter to generate the response message.
However, instead of returning a Stream, it returns an XLANGMessage or, more precisely, an instance of the CustomBTXMessage class, which inherits from the BTXMessage class contained in the Microsoft.XLANGs.BizTalk.Engine assembly. Therefore, the XLANGMessage object returned by the ProcessRequestReturnXLANGMessage method can be assigned directly to the response message in the Expression Shape, without the need to introduce a separate Message Assignment Shape in the orchestration.
XLANGMessage is in fact an abstract class, so whenever an orchestration invokes a method exposed by a business component and passes an XLANGMessage as a parameter, the actual type of the object is MessageWrapperForUserCode. The trick to creating and returning an XLANGMessage object from a method call is to create a custom class that inherits from a BizTalk-provided class which, in turn, inherits from XLANGMessage. So I chose to create a custom class called CustomBTXMessage that inherits from the BTXMessage class. See the code below:
CustomBTXMessage Class
namespace Microsoft.BizTalk.CAT.Samples.HandleXLANGMessages.Utilities
{
    [Serializable]
    public sealed class CustomBTXMessage : BTXMessage
    {
        public CustomBTXMessage(string messageName, Context context)
            : base(messageName, context)
        {
            context.RefMessage(this);
        }
    }
}
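
Stripped of the BizTalk types, the inheritance trick can be sketched in plain C# as follows. All the type names below (FrameworkMessage, ConcreteFrameworkMessage, CustomMessage, MessageFactory) are hypothetical stand-ins introduced for illustration; they only mirror the roles that XLANGMessage, BTXMessage and CustomBTXMessage play in this article.

```csharp
// Hypothetical stand-in for an abstract framework type
// (the role XLANGMessage plays in the article).
public abstract class FrameworkMessage
{
    private readonly string name;

    // Only derived classes can call this constructor.
    protected FrameworkMessage(string name)
    {
        this.name = name;
    }

    public string Name
    {
        get { return name; }
    }

    // Abstract member that a concrete subclass must implement.
    public abstract int PartCount { get; }
}

// Hypothetical framework-provided concrete class
// (the role BTXMessage plays in the article).
public class ConcreteFrameworkMessage : FrameworkMessage
{
    public ConcreteFrameworkMessage(string name) : base(name)
    {
    }

    public override int PartCount
    {
        get { return 0; }
    }
}

// User code: inherits the framework implementation and adds a public constructor.
public sealed class CustomMessage : ConcreteFrameworkMessage
{
    public CustomMessage(string name) : base(name)
    {
    }
}

public static class MessageFactory
{
    // The caller only sees the abstract type, but receives the custom subclass.
    public static FrameworkMessage Create(string name)
    {
        return new CustomMessage(name);
    }
}
```

Deriving from the concrete class rather than from the abstract base means the custom class inherits a working implementation of every abstract member and only needs to supply a constructor.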

That said, the code of the ProcessRequestReturnXLANGMessage method looks as follows:
ProcessRequestReturnXLANGMessage Method
public XLANGMessage ProcessRequestReturnXLANGMessage(XLANGMessage requestMessage)
{
    CustomBTXMessage customBTXMessage = null;
    XLANGMessage responseMessage = null;
    VirtualStream stream = null;
    List<Response> responseList = new List<Response>();
    string op = null;
    string status = Ok;
    string error = null;
    double operand1 = 0;
    double operand2 = 0;
    double value = 0;
    bool ok = true;
    bool succeeded = true;
    int i = 0;

    try
    {
        logHelper.WriteLine("[RequestManager][CustomBtxMessageOrchestration] Request message received.");
        using (XmlReader reader = (XmlReader)requestMessage[0].RetrieveAs(typeof(XmlReader)))
        {
            while (reader.Read() && ok)
            {
                if (reader.LocalName == "Operator" &&
                    reader.NodeType == XmlNodeType.Element)
                {
                    error = None;
                    value = 0;
                    succeeded = true;
                    op = reader.ReadElementContentAsString();
                    reader.MoveToContent();
                    operand1 = reader.ReadElementContentAsDouble();
                    reader.MoveToContent();
                    operand2 = reader.ReadElementContentAsDouble();
                    i++;
                    switch (op)
                    {
                        case "+":
                            value = operand1 + operand2;
                            break;
                        case "-":
                            value = operand1 - operand2;
                            break;
                        case "*":
                            value = operand1 * operand2;
                            break;
                        case "/":
                            value = operand1 / operand2;
                            break;
                        default:
                            error = string.Format(OperationUnknownErrorMessageFormat, op);
                            status = OperationsFailed;
                            ok = false;
                            succeeded = false;
                            break;
                    }
                    if (succeeded)
                    {
                        logHelper.WriteLine(string.Format(OperationFormat, "CustomBtxMessageOrchestration", 
                                                          operand1, op, operand2, value));
                    }
                    else
                    {
                        logHelper.WriteLine(error);
                    }
                    responseList.Add(new Response(error, value));
                }
            }
        }
        stream = new VirtualStream(bufferSize, thresholdSize);
        using (XmlWriter writer = XmlWriter.Create(stream))
        {
            writer.WriteStartDocument();
            writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace);
            writer.WriteStartElement("Status", CalculatorResponseNamespace);
            writer.WriteString(status);
            writer.WriteEndElement();
            writer.WriteStartElement("Results", CalculatorResponseNamespace);
            for (i = 0; i < responseList.Count; i++)
            {
                writer.WriteStartElement("Result", CalculatorResponseNamespace);
                writer.WriteStartElement("Value", CalculatorResponseNamespace);
                writer.WriteString(responseList[i].Value.ToString());
                writer.WriteEndElement();
                writer.WriteStartElement("Error", CalculatorResponseNamespace);
                writer.WriteString(responseList[i].Error);
                writer.WriteEndElement();
                writer.WriteEndElement();
            }
            writer.WriteEndElement();
            writer.WriteEndElement();
        }
        stream.Seek(0, SeekOrigin.Begin);
        if (ok)
        {
            logHelper.WriteLine("[RequestManager][CustomBtxMessageOrchestration] Response message successfully processed.");
        }
        else
        {
            logHelper.WriteLine("[RequestManager][CustomBtxMessageOrchestration] Request failed.");
        }

    }
    catch (Exception ex)
    {
        logHelper.WriteLine(string.Format("[RequestManager][CustomBtxMessageOrchestration] {0}", ex.Message));
        stream = new VirtualStream(bufferSize, thresholdSize);
        using (XmlWriter writer = XmlWriter.Create(stream))
        {
            writer.WriteStartDocument();
            writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace);
            writer.WriteStartElement("Status", CalculatorResponseNamespace);
            writer.WriteString(ex.Message);
            writer.WriteEndElement();
            writer.WriteEndElement();
        }
        stream.Seek(0, SeekOrigin.Begin);
    }
    finally
    {
        customBTXMessage = new CustomBTXMessage("CalculatorResponse", 
                                                 Service.RootService.XlangStore.OwningContext);
        customBTXMessage.AddPart(string.Empty, "Body");
        customBTXMessage[0].LoadFrom(stream);
        responseMessage = customBTXMessage.GetMessageWrapperForUserCode();
        if (requestMessage != null)
        {
            requestMessage.Dispose();
        }
    }
    return responseMessage; 
}
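
The read and write loops at the heart of this method can be tried outside BizTalk with the minimal sketch below. It makes a few simplifying assumptions that are not in the sample: a MemoryStream replaces the VirtualStream, the namespace URI is a placeholder, the Status and Error elements are omitted, and each result is written as soon as it is computed instead of being accumulated in a list. The StreamingCalculator name is hypothetical.

```csharp
using System;
using System.IO;
using System.Xml;

public static class StreamingCalculator
{
    // Placeholder namespace, not the one used by the sample schemas.
    private const string ResponseNs = "urn:example:calculator-response";

    // Reads the operations with a forward-only XmlReader and writes
    // one Result element per operation with an XmlWriter.
    public static Stream Process(Stream request)
    {
        MemoryStream response = new MemoryStream();
        using (XmlReader reader = XmlReader.Create(request))
        using (XmlWriter writer = XmlWriter.Create(response))
        {
            writer.WriteStartDocument();
            writer.WriteStartElement("CalculatorResponse", ResponseNs);
            writer.WriteStartElement("Results", ResponseNs);
            while (reader.Read())
            {
                if (reader.NodeType == XmlNodeType.Element &&
                    reader.LocalName == "Operator")
                {
                    // Operator, Operand1 and Operand2 are expected in this order.
                    string op = reader.ReadElementContentAsString();
                    reader.MoveToContent();
                    double operand1 = reader.ReadElementContentAsDouble();
                    reader.MoveToContent();
                    double operand2 = reader.ReadElementContentAsDouble();

                    writer.WriteStartElement("Result", ResponseNs);
                    writer.WriteElementString("Value", ResponseNs,
                        XmlConvert.ToString(Compute(op, operand1, operand2)));
                    writer.WriteEndElement();
                }
            }
            writer.WriteEndElement();
            writer.WriteEndElement();
            writer.WriteEndDocument();
        }
        // The writer has been flushed; rewind so the caller can read the result.
        response.Seek(0, SeekOrigin.Begin);
        return response;
    }

    private static double Compute(string op, double operand1, double operand2)
    {
        switch (op)
        {
            case "+": return operand1 + operand2;
            case "-": return operand1 - operand2;
            case "*": return operand1 * operand2;
            case "/": return operand1 / operand2;
            default: throw new ArgumentException("Unknown operator: " + op);
        }
    }
}
```

Because neither the request nor the response is turned into an object graph, memory usage in this sketch depends on the size of a single operation rather than on the size of the whole request. The response still ends up in a MemoryStream here, which is exactly the part that the VirtualStream addresses in the real method.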

Comment

This technique is tricky, but it combines the performance of the streaming approach with the ability to directly return an XLANGMessage. In the second part of the article, I will present the results of the performance tests I conducted to measure and compare the latency and throughput of the design patterns discussed in this post.

Code

You can find the code for BizTalk Server 2009 here, while at the following link you can download the Visio document I used to create my diagrams! Please let me know your feedback! Enjoy! :-)
