Using CCR with ASP.NET

Blogger : MSDN Blogs
Category : Biztalk Adapters
Blogged date : 2008 Jun 10

Disclaimer: While I should be somewhat knowledgeable about CCR, I can claim no great familiarity with ASP.NET, so if my approach is a little off, please let me know.

How to best use CCR within ASP.NET is something that has come up a number of times. Certainly the powerful concurrency and coordination tools that CCR provides make this an obvious question.

ASP.NET provides a mechanism to run asynchronous tasks, complete with timeout support. When all the registered asynchronous tasks have signaled completion, the page finishes the prerender stage. To make this work with CCR I created an adapter that presents itself to ASP.NET as a classic .NET async task, using the Asynchronous Programming Model (APM), but presents itself to the CCR programmer as a SuccessFailurePort to post a message to when async processing is complete.
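
As background for the timeout support mentioned above: a page is normally expected to opt in to asynchronous processing with Async="true" in its @ Page directive, and the timeout interval applied to registered tasks can be set from code through the page's AsyncTimeout property. The following is a minimal sketch of the code-behind side only. The class name TimeoutDemoPage and the 20 second value are placeholders for illustration, not part of the adapter described in this post.

```csharp
using System;
using System.Web.UI;

// Hypothetical page. The matching .aspx is assumed to declare
// <%@ Page Async="true" ... %> so that registered tasks are processed asynchronously.
public partial class TimeoutDemoPage : Page
{
    protected override void OnInit(EventArgs e)
    {
        // Time-out interval ASP.NET uses when processing asynchronous tasks.
        // 20 seconds is an arbitrary example value.
        AsyncTimeout = TimeSpan.FromSeconds(20);
        base.OnInit(e);
    }
}
```

When this interval elapses, ASP.NET invokes the timeout handler supplied with the registered task rather than waiting for it to complete.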

A Simple Example

So, to plunge right in, here is a somewhat unrealistic but simple example...

void Page_Load(object source, EventArgs e)
{
    TaskQueue = Global.TaskQueue;
    
    var asyncPort = base.StartAsyncTask();
    
    var resultPort = new SuccessFailurePort();
    int count = 10;
    for (int i = 0; i < count; i++)
    {
        SpawnIterator("http://www.microsoft.com", resultPort, DownloadUrl);
    }
    
    Activate(
        Arbiter.MultipleItemReceive(resultPort,
            count,
            (successes, exceptions) => asyncPort.Post(SuccessResult.Instance)
        )
    );
}

IEnumerator<ITask> DownloadUrl(string url, SuccessFailurePort resultPort)
{
    // Async processing to download from url.

    resultPort.Post(SuccessResult.Instance);
    yield break;
}

Breaking this down, several things in the preceding code snippet aren't available in a typical ASP.NET page. What I haven't shown you (yet!) is that the page derives from a class, CcrPage, which gives the page the functionality of the CCR class CcrServiceBase plus a couple of extras that are specific to using CCR with ASP.NET.

  • I start by setting TaskQueue = Global.TaskQueue. It is essential to set the TaskQueue field any time you want to use CCR primitives in the page. I usually set it in Page_Load(), so that I always know it is set.
  • Where do I get Global.TaskQueue from? It is most efficient to associate the CCR Dispatcher and DispatcherQueues with the application, rather than the page. So I have created a class, CcrHttpApplication, that inherits from HttpApplication. More on this in a bit.
  • I call base.StartAsyncTask(). This is implemented by the base class CcrPage and internally calls the RegisterAsyncTask method, which lets ASP.NET know that this page is doing asynchronous processing. It returns a SuccessFailurePort which I post to when all the asynchronous processing is complete. This in turn lets the ASP.NET infrastructure know that the async processing is finished.
  • I use CCR primitives like SpawnIterator() and Activate(). These again are implemented in CcrPage.

The iterator method DownloadUrl asynchronously downloads a page from the supplied URL and, when it has completed, posts a result to the supplied port. Note that in Page_Load() I activate a MultipleItemReceive task that will execute once every instance of DownloadUrl has posted a result, whether success or failure. This is the kind of scatter-gather pattern that CCR makes easy to implement.

When all the DownloadUrl calls have posted a result, that task then posts to the SuccessFailurePort initially created by the call to StartAsyncTask().
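
The gather handler in the example reports success once all results have arrived, even if some of them were exceptions. One possible refinement, sketched here under the assumption that the CcrPage class shown later in this post is available, is to forward the first failure to the async port instead. The class name GatherPage and the method name CompleteWhenAll are made up for this illustration.

```csharp
using System;
using Microsoft.Ccr.Core;

// Hypothetical helper page built on the CcrPage class from this post.
// TaskQueue must already be set before CompleteWhenAll is called.
public abstract class GatherPage : CcrPage
{
    protected void CompleteWhenAll(
        SuccessFailurePort resultPort, int count, SuccessFailurePort asyncPort)
    {
        Activate(
            Arbiter.MultipleItemReceive(resultPort, count,
                (successes, exceptions) =>
                {
                    // Forward the first failure, if there is one.
                    foreach (Exception exception in exceptions)
                    {
                        asyncPort.Post(exception);
                        return;
                    }
                    // No failures: every item was a SuccessResult.
                    asyncPort.Post(SuccessResult.Instance);
                }
            )
        );
    }
}
```

With this variant, a failed download would reach OnEndTask as an exception and surface through the normal ASP.NET error handling instead of being silently counted as a completed item.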

A Simple Asynchronous Task

As a small diversion, and for completeness' sake, here is the actual code for DownloadUrl(). Feel free to skip ahead to the CcrPage implementation if this is uninteresting...

IEnumerator<ITask> DownloadUrl(string url, SuccessFailurePort resultPort)
{
    IAsyncResult result = null;
    var completion = new Port<IAsyncResult>();

    var request = WebRequest.Create(new Uri(url));
    request.BeginGetResponse(completion.Post, null);

    yield return Arbiter.Choice(
        Arbiter.Receive(false, completion, ar => result = ar),
        OnAsyncTimout() // parameterless helper, spelled as it is declared in CcrPage
    );

This demonstrates a useful CCR trick when working with APM. The callback delegate passed to the Begin method is the Post method of a port of type Port<IAsyncResult>, which allows me to yield until the operation has completed. In this example I'm using a Choice whose second branch waits for a timeout. The timeout helper in CcrPage fires if the page timeout elapses, which allows me to prevent runaway async processing.

So far the code has initiated a request for a web page and is waiting either for it to complete, in which case result will be set to the IAsyncResult, or for the default page timeout.

    if (result == null)
    {
        // page timeout occurred...
        resultPort.Post(new TimeoutException());
        request.Abort();
        yield break;
    }

This checks for a timeout. If a timeout occurred, it signals an exception back to the result port, aborts the web request and terminates the iterator. All being well, however, we continue below...

    WebResponse response = null;

    try
    {
        response = request.EndGetResponse(result);
    }
    catch (Exception exception)
    {
        resultPort.Post(exception);
        yield break;
    }

This now uses the IAsyncResult that was posted to the completion port and ends the web request. Assuming that this doesn't throw an exception in its own right, we can now continue below in much the same fashion and, using the APM for reading from a stream, download the entire page before posting a response.

Note: at any point in this processing, we can be interrupted by a timeout, or by a failure in the APM, and still cleanly recover.

    var stream = response.GetResponseStream();

    using (var memory = new MemoryStream())
    {
        var buffer = new byte[4096];
        var read = 0;
        do
        {
            stream.BeginRead(buffer, 0, buffer.Length, completion.Post, null);

            result = null;
            yield return Arbiter.Choice(
                Arbiter.Receive(false, completion, ar => result = ar),
                OnAsyncTimout()
            );

            if (result == null)
            {
                resultPort.Post(new TimeoutException());
                yield break;
            }

            read = stream.EndRead(result);
            memory.Write(buffer, 0, read);

        } while (read > 0); // EndRead returns 0 only at end of stream; a short read does not mean the end
    }

    resultPort.Post(SuccessResult.Instance);
}

CcrPage Implementation

The page used above, instead of being derived from System.Web.UI.Page, was derived from CcrPage (coming soon to a CCR adapter near you), which looks somewhat like the following. I've left a lot out to keep the sample smaller, but the core functionality is all here.

public class CcrPage : Page
{
    public CcrPage() { }

    public CcrPage(DispatcherQueue taskQueue)
    {
        TaskQueue = taskQueue;
    }

    protected DispatcherQueue TaskQueue;

Basic class constructors and the infamous TaskQueue field, which you'll notice being used a lot below. Using CCR without a ServiceBase class is, of course, a totally reasonable thing to do, but I find that the helper methods reduce code and increase readability.

The next section just implements the helper functionality from CcrServiceBase. If you are familiar with CCR programming this is all pretty self-explanatory. For simplicity, I've not included all the overloads here, but all the basics are shown...

    public void Activate<T>(params T[] tasks)
        where T : ITask
    {
        foreach (T task in tasks)
        {
            TaskQueue.Enqueue(task);
        }
    }

    protected void Spawn(Handler handler)
    {
        TaskQueue.Enqueue(new Task(handler));
    }

    protected void Spawn<T0>(T0 t0, Handler<T0> handler)
    {
        TaskQueue.Enqueue(new Task<T0>(t0, handler));
    }

    protected void SpawnIterator<T0>(T0 t0, IteratorHandler<T0> handler)
    {
        TaskQueue.Enqueue(new IterativeTask<T0>(t0, handler));
    }

    protected Port<DateTime> TimeoutPort(int milliseconds)
    {
        return TimeoutPort(new TimeSpan(0, 0, 0, 0, milliseconds));
    }

    protected Port<DateTime> TimeoutPort(TimeSpan timespan)
    {
        Port<DateTime> timeoutPort = new Port<DateTime>();
        TaskQueue.EnqueueTimer(timespan, timeoutPort);
        return timeoutPort;
    }
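
The Page_Load() example earlier passes two arguments (the URL and the result port) to SpawnIterator(), which needs one of the overloads not listed above. A plausible sketch of that overload follows, assuming it simply mirrors the single-argument version using the two-argument IterativeTask from CCR. In practice it would sit inside CcrPage itself; it is placed in a derived class, with the made-up name CcrPageWithOverloads, only so that the fragment stands on its own.

```csharp
using Microsoft.Ccr.Core;

// Hypothetical derived class used purely to host the extra overload.
public class CcrPageWithOverloads : CcrPage
{
    // Two-argument counterpart of SpawnIterator<T0>: schedules an iterator
    // that receives both arguments on the page's TaskQueue.
    protected void SpawnIterator<T0, T1>(T0 t0, T1 t1, IteratorHandler<T0, T1> handler)
    {
        TaskQueue.Enqueue(new IterativeTask<T0, T1>(t0, t1, handler));
    }
}
```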

This is where it starts to get a little more interesting. These are the methods that allow us to play well in the ASP.NET world.

    protected SuccessFailurePort StartAsyncTask()
    {
        SuccessFailurePort resultPort = new SuccessFailurePort();
        Dispatcher.AddCausality(new Causality("StartAsyncTask", resultPort));

        RegisterAsyncTask(
            new PageAsyncTask(
                OnBeginTask,
                OnEndTask,
                OnTimeoutTask,
                resultPort
            )
        );

        return resultPort;
    }

StartAsyncTask wraps the ASP.NET RegisterAsyncTask to allow us to use CCR primitives to control the lifecycle of the async processing that we want to do. The code below should demonstrate and explain how I accomplish this.

Adding a Causality to the dispatcher at this point allows me to catch any unhandled exceptions in the async processing in a clean fashion and report them back using the normal ASP.NET error mechanisms.

    
    protected Port<EmptyValue> AsyncTaskTimeoutPort = new Port<EmptyValue>();

    protected ReceiverTask OnAsyncTimeout(Handler handler)
    {
        return Arbiter.Receive(
            false,
            AsyncTaskTimeoutPort,
            delegate(EmptyValue token)
            {
                AsyncTaskTimeoutPort.Post(token);
                handler();
            }
        );
    }

    protected ReceiverTask OnAsyncTimout()
    {
        return Arbiter.Receive(
            false,
            AsyncTaskTimeoutPort,
            delegate(EmptyValue token)
            {
                AsyncTaskTimeoutPort.Post(token);
            }
        );
    }

These two helper functions make responding to page level timeouts easier in your CCR code. The simpler helper was used in the implementation of DownloadUrl() above.
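
The overload that takes a handler is not used elsewhere in this post. As a rough sketch of how it might be used, the iterator below aborts its web request from inside the timeout branch. The class name AbortOnTimeoutPage and the method name CheckUrl are invented for this example, and it assumes the CcrPage members shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using Microsoft.Ccr.Core;

// Hypothetical page built on the CcrPage class from this post.
public class AbortOnTimeoutPage : CcrPage
{
    protected IEnumerator<ITask> CheckUrl(string url, SuccessFailurePort resultPort)
    {
        IAsyncResult result = null;
        var completion = new Port<IAsyncResult>();

        var request = WebRequest.Create(new Uri(url));
        request.BeginGetResponse(completion.Post, null);

        yield return Arbiter.Choice(
            Arbiter.Receive(false, completion, ar => result = ar),
            // The handler runs only if the page timeout wins the Choice.
            OnAsyncTimeout(() => request.Abort())
        );

        if (result == null)
        {
            // Timeout branch was taken; the request has already been aborted.
            resultPort.Post(new TimeoutException());
            yield break;
        }

        try
        {
            // Only the fact that a response arrived matters in this sketch.
            request.EndGetResponse(result).Close();
        }
        catch (Exception exception)
        {
            resultPort.Post(exception);
            yield break;
        }

        resultPort.Post(SuccessResult.Instance);
    }
}
```

Putting the cleanup in the handler keeps it next to the wait it belongs to, at the cost of splitting the timeout handling between the handler and the null check that follows.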

    class SimpleAsyncResult : IAsyncResult
    {
        ManualResetEvent _event = new ManualResetEvent(false);
        AsyncCallback _cb;
        object _state;
        Exception _exception;

        internal SimpleAsyncResult(AsyncCallback cb, object state)
        {
            _cb = cb;
            _state = state;
        }

        public object AsyncState { get { return _state; } }

        public WaitHandle AsyncWaitHandle { get { return _event; } }

        public bool CompletedSynchronously { get { return false; } }

        public bool IsCompleted
        {
            get { return _event.WaitOne(0, false); }
        }

        internal void Complete(SuccessResult success)
        {
            InternalComplete();
        }

        internal Exception Exception { get { return _exception; } }

        internal void Complete(Exception exception)
        {
            _exception = exception;
            InternalComplete();
        }

        private void InternalComplete()
        {
            // Signal any waiters first, then notify the caller through its callback.
            _event.Set();
            if (_cb != null)
            {
                _cb(this);
            }
        }
    }

SimpleAsyncResult is an internal class that implements the IAsyncResult part of the APM that RegisterAsyncTask expects.

    IAsyncResult OnBeginTask(object sender, EventArgs e, AsyncCallback cb, object state)
    {
        if (state == null)
        {
            throw new ArgumentNullException("state");
        }
        SuccessFailurePort resultPort = state as SuccessFailurePort;
        if (resultPort == null)
        {
            throw new ArgumentException("Expected a SuccessFailurePort.", "state");
        }

        SimpleAsyncResult ar = new SimpleAsyncResult(cb, state);

        Activate(Arbiter.Choice(resultPort, ar.Complete, ar.Complete));

        return ar;
    }

This is called at the start of the async processing and sets up the SimpleAsyncResult instance that is used to manage the lifetime of the async processing.

This ultimately activates a Choice that waits for either a SuccessResult or Exception to be posted to the SuccessFailurePort that was created in StartAsyncTask(). The branches of the choice each call different overloads of the Complete() method on the SimpleAsyncResult object.

    void OnEndTask(IAsyncResult ar)
    {
        if (ar == null)
        {
            throw new ArgumentNullException("ar");
        }
        var sar = ar as SimpleAsyncResult;
        if (sar == null)
        {
            throw new ArgumentException("Expected a SimpleAsyncResult.", "ar");
        }
        else if (sar.IsCompleted)
        {
            if (sar.Exception != null)
            {
                throw new Exception("Failure during asynchronous processing", sar.Exception);
            }
        }
        else
        {
            throw new InvalidOperationException("OnEndTask called before the asynchronous processing has completed");
        }
    }

This handles all the various termination conditions. In the most common success case this does nothing!

    bool _asyncTimedOut;

    protected bool AsyncOperationTimedOut
    {
        get { return _asyncTimedOut; }
    }

    void OnTimeoutTask(IAsyncResult ar)
    {
        _asyncTimedOut = true;
        AsyncTaskTimeoutPort.Post(EmptyValue.SharedInstance);
    }
}

And finally, the timeout handling. This sets a flag on the class to indicate that there was a timeout and posts to an internal port (used by OnAsyncTimeout above) to inform waiting tasks that there was a timeout.
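
A derived page could consult the AsyncOperationTimedOut flag once the async stage is over, for example to show a message instead of partial results. The sketch below assumes the timeout handler has already run by the time PreRenderComplete is raised. The class name StatusPage and the StatusLabel control are hypothetical.

```csharp
using System;
using System.Web.UI.WebControls;

// Hypothetical page built on the CcrPage class from this post.
public class StatusPage : CcrPage
{
    // Assumed to be declared in the page markup as
    // <asp:Label ID="StatusLabel" runat="server" />.
    protected Label StatusLabel;

    protected override void OnPreRenderComplete(EventArgs e)
    {
        if (AsyncOperationTimedOut)
        {
            StatusLabel.Text = "The download did not finish in time.";
        }
        base.OnPreRenderComplete(e);
    }
}
```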

CcrHttpApplication Implementation

As I mentioned above, it is more useful to associate the Dispatcher and DispatcherQueue with the application than with a page. So I add a Global.asax to my project and, instead of deriving it from System.Web.HttpApplication, I derive it from CcrHttpApplication, shown below.

public class CcrHttpApplication : HttpApplication
{
    static Dispatcher _dispatcher;
    static DispatcherQueue _taskQueue;
    static object _dispatcherLock = new object();

    static public DispatcherQueue TaskQueue
    {
        get { return _taskQueue; }
    }

With all the basic initialization and property accessors out of the way, we can move on to the functionality of this class...

    protected void Initialize()
    {
        Initialize(0, GetType().Name);
    }

    protected void Initialize(int threadCount)
    {
        Initialize(threadCount, GetType().Name);
    }

    protected void Initialize(int threadCount, string threadPoolName)
    {
        if (_dispatcher == null)
        {
            lock (_dispatcherLock)
            {
                if (_dispatcher == null)
                {
                    _dispatcher = new Dispatcher(threadCount, threadPoolName);
                    _taskQueue = new DispatcherQueue("default", _dispatcher);
                }
            }
        }
    }

This creates a new Dispatcher and default DispatcherQueue. One of the Initialize() overloads should be called from Application_Start().

    protected void Shutdown()
    {
        if (_dispatcher != null)
        {
            lock (_dispatcherLock)
            {
                if (_dispatcher != null)
                {
                    _taskQueue.Dispose();
                    _dispatcher.Dispose();

                    _taskQueue = null;
                    _dispatcher = null;
                }
            }
        }
    }
}

This cleanly disposes of any created Dispatcher and DispatcherQueue, and should be called from Application_End().
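
Putting the two calls together, the code-behind for Global.asax might look like the following minimal sketch. The class name Global is taken from the Global.TaskQueue reference in Page_Load() earlier. The choice of the parameterless Initialize() overload is an assumption made for the example.

```csharp
using System;

// Code-behind for Global.asax, deriving from CcrHttpApplication instead of
// System.Web.HttpApplication.
public class Global : CcrHttpApplication
{
    protected void Application_Start(object sender, EventArgs e)
    {
        // Passes a thread count of 0 and the class name as the thread pool name.
        Initialize();
    }

    protected void Application_End(object sender, EventArgs e)
    {
        // Disposes the shared DispatcherQueue and Dispatcher.
        Shutdown();
    }
}
```

Because TaskQueue is a static property on the base class, pages can reach it as Global.TaskQueue without holding a reference to the application instance.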

