0 Comments

In this post we will use NetMQ to build a solution where multiple clients can run commands on the server and the server dynamically created a worker for each command.

Background

ZeroMQ and its .NET port NetMQ are interesting technologies. They seem to have a rather smallish but very enthusiastic group of users and the “scene” gives similar vibes as the Redis community.

ZeroMQ is a technology for adding distributed messaging into your system. Pub & sub, reply & request and other types of communication patterns are available. ZeroMQ doesn’t require a server installation, so it’s a library instead of full blown server solution. For communication, you can use TCP, inproc and other techniques.

Dynamic workers

We needed to use NetMQ in a situation where there’s multiple clients running commands on the server. The idea was that the server would spin up a worker for each client request. Even though ZeroMQ’s documentation is good, finding an example with dynamic workers turned out cumbersome.

We ended up using the following topology:

Client: RequestSocket

Server: RouterSocket (TCP 5555) – DealerSocket (TCP 5556) & Poller

Worker: DealerSocket

Creating the server

The server has a frontend for the requests coming from the client. It also has the backend for communicating with the workers. Poller is used to handle the messages:

Console.WriteLine("Starting server");
using (var front = new RouterSocket())
using (var back = new DealerSocket())
{
    front.Bind("tcp://localhost:5555");
    back.Bind("tcp://localhost:5556");

    var poller = new NetMQPoller();
    poller.Add(front);
    poller.Add(back);

	front.ReceiveReady += (sender, eventArgs) => {...};
	back.ReceiveReady += (sender, eventArgs) => {...};

	poller.Run();
}

Creating the workers

When the server’s frontend receives a message, we want to spin up a new worker. In this example we use ThreadPool to create the worker:

front.ReceiveReady += (sender, eventArgs) =>
{
    var mqMessage = eventArgs.Socket.ReceiveMultipartMessage(3);

    var id = mqMessage.First;
    var content = mqMessage[2].ConvertToString();

    Console.WriteLine("Front received " + content);

    ThreadPool.QueueUserWorkItem(context =>
    {
		// The worker
		// Parameters are available from the context.
        var context = (Tuple<NetMQFrame, string>) context;

        var clientId = context.Item1;
        var message = context.Item2;

		// Run the command
        Thread.Sleep(TimeSpan.FromSeconds(3));

		// Send message to server's backend which then will return the reply to the client
        using (var workerConnection = new DealerSocket())
        {
            workerConnection.Connect("tcp://localhost:5556");

            var messageToClient = new NetMQMessage();
            messageToClient.Append(clientId);
            messageToClient.AppendEmptyFrame();
            messageToClient.Append("hello from worker");

            workerConnection.SendMultipartMessage(messageToClient);
        }

    }, Tuple.Create(id, content));
};

Returning the message to client

As we can see from the code above, the worker message sends the reply to the server’s backend. Only thing left is to route the reply back to the client:

                    back.ReceiveReady += (sender, eventArgs) =>
                    {
                        Console.WriteLine("Back received message, route to client");
                        var mqMessage = eventArgs.Socket.ReceiveMultipartMessage();
                        front.SendMultipartMessage(mqMessage);
                    };

The client

Our client uses RequestSocket to call the server’s frontend. We use a blocking call, so we wait for the server (the worker) to reply:

using (var client = new RequestSocket())
{
    client.Connect("tcp://localhost:5555");
    client.SendFrame("hello from client");
    var returned = client.ReceiveFrameString();
    Console.WriteLine(i1.ToString() + ": back at client " + returned);
}

Conclusion

This post shows one solution for spinning up worker tasks (threads) dynamically using NetMQ. Even though NetMQ only includes few basic concepts, the concepts are so flexible that it’s quite that there’s many other ways to handle this situation.

0 Comments

We’ve been creating a system where we need to download and use NuGet packages dynamically, runtime. To handle this, we use NuGet.Core.

Using NuGet.Core

NuGet.Core contains the basic functionality for installing, removing and updating packages. The main classes when dealing with NuGet.Core are PackageRepositoryFactoryand PackageManager. PackageRepositoryFactory creates the “connection” into your NuGet repository and PackageManager is used to install the packages. Here’s an example code which covers the following situation:

  • We have a console app
  • We use local NuGet repository (file system)
            var repo = PackageRepositoryFactory.Default.CreateRepository("file://C:/temp/packages");
            var packageManager = new PackageManager(repo, _installLocation);
			
            var package = repo.FindPackage("mypackage", SemanticVersion.Parse("1.0.0.0"));
			
			packageManager.PackageInstalled += (sender, eventArgs) =>
            {
                var fileRoot = System.IO.Path.GetDirectoryName(Path.Combine(eventArgs.InstallPath, eventArgs.Package.AssemblyReferences.First().Path));
                Console.WriteLine(fileRoot);
            };
			
			packageManager.InstallPackage(package, false, true, true);

PackageManager’s even PackageInstalled usually causes some grief because it is raised only when package is actually installed: If it’s already installed, PackageInstalled-event is skipped.

Using classes from NuGet runtime

We use MEF to actually execute the code inside the NuGet packages:

  1. Make sure that the packages contains classes which implement known interfaces.
  2. Use DirectoryCatalog to initialize the MEF container.
  3. Use GetExportedValues to get the implementing classes.

For example:

packageManager.PackageInstalled += (sender, eventArgs) =>
{
    var fileRoot = System.IO.Path.GetDirectoryName(Path.Combine(eventArgs.InstallPath, eventArgs.Package.AssemblyReferences.First().Path));

    if (fileRoot == null)
        return;

    var catalog = new AggregateCatalog(
        new DirectoryCatalog(fileRoot, "*.dll"));

    var container = new CompositionContainer(catalog);

    var activities = container.GetExportedValues<IActivity>();
    foreach (var activity in activities)
    {
		activity.Run();
    }
};