In this recipe, we will be retrieving data from the database in a threaded environment, making sure that it is the most current data possible.
We will be using the NuGet
Package Manager to install the Entity Framework 4.1 assemblies.
The package installer can be found at http://nuget.org.
We will also be using a database for connecting to the data and updating it.
Open the Improving Transaction Scope solution in the included source code examples.
using System.Data.Entity; using System.Linq; using System.Threading; using BusinessLogic; using DataAccess; using DataAccess.Database; using Microsoft.VisualStudio.TestTools.UnitTesting; using Test.Properties; namespace Test { [TestClass] public class MultiThreadedOjectsQueriesTests { private IBlogRepository repo1 = null; private IBlogRepository repo2 = null; private string _changedInThread; [TestMethod] public void ShouldGetMostRecentDataFromDatabaseAfterAnUpdateFromAnotherThread() { //Arrange Database.SetInitializer(new Initializer()); repo1 = RepositoryFactory.Get(Settings.Default.Connection); //Act Thread thread = new Thread(UpdateFirstBlog); var item = repo1.Set<Blog>().First(); Assert.IsNotNull(item); thread.Start(); while (thread.ThreadState == ThreadState.Running) { Thread.Sleep(1); } item = repo1.CurrentSet<Blog>().First(); //Assert Assert.AreEqual(_changedInThread, item.Title); } private void UpdateFirstBlog() { repo2 = RepositoryFactory.Get(Settings.Default.Connection); var item = repo2.Set<Blog>().First(); _changedInThread = "Changed in Thread 2"; item.Title = _changedInThread; repo2.SaveChanges(); } } }
Blog
to the BusinessLogic
project with the following code:using System; namespace BusinessLogic { public class Blog { public int Id { get; set; } public DateTime Creationdate { get; set; } public string ShortDescription { get; set; } public string Title { get; set; } public double Rating { get; set; } } }
BlogMapping
to the DataAccess
project with the following code:using System.ComponentModel.DataAnnotations; using System.Data.Entity.ModelConfiguration; using BusinessLogic; namespace DataAccess.Mappings { public class BlogMapping : EntityTypeConfiguration<Blog> { public BlogMapping() { this.ToTable("Blogs"); this.HasKey(x => x.Id); this.Property(x => x.Id).HasDatabaseGeneratedOption(DatabaseGeneratedOption.Identity).HasColumnName("BlogId"); this.Property(x => x.Title).IsRequired().HasMaxLength(250); this.Property(x => x.Creationdate).HasColumnName("CreationDate").IsRequired(); this.Property(x => x.ShortDescription).HasColumnType("Text").IsMaxLength().IsOptional().HasColumnName("Description"); } } }
BlogContext
to the DataAccess
project with the following code:using System; using System.Data.Entity; using System.Data.Entity.Infrastructure; using System.Data.Objects; using System.Linq; using BusinessLogic; using DataAccess.Mappings; namespace DataAccess { public class BlogContext : DbContext, IUnitOfWork { public BlogContext(string connectionString) : base(connectionString) { } protected override void OnModelCreating(DbModelBuilder modelBuilder) { modelBuilder.Configurations.Add(new BlogMapping()); base.OnModelCreating(modelBuilder); } public IQueryable<T> Find<T>() where T : class { return this.Set<T>(); } public IQueryable<T> CurrentFind<T>() where T : class { var context = (IObjectContextAdapter) this; var query = context.ObjectContext.CreateObjectSet<T>(); query.MergeOption = MergeOption.OverwriteChanges; return query; } public void Refresh() { this.ChangeTracker.Entries().ToList().ForEach(x=>x.Reload()); } public void Commit() { this.SaveChanges(); } } }
using System; using System.Data.Entity; using System.Linq; using Logic; namespace DataAccess { public class BlogRepository : IBlogRepository { private readonly IUnitOfWork _context; public BlogRepository(IUnitOfWork context) { _context = context; } public IQueryable<T> Set<T>() where T : class { return _context.Find<T>(); } public IQueryable<T> CurrentSet<T>() where T : class { return _context.CurrentFind<T>(); } public void RollbackChanges() { _context.Refresh(); } public void SaveChanges() { try { _context.Commit(); } catch (Exception) { RollbackChanges(); throw; } } } }
DataAccess
project with the following code:using System.Collections.Generic; using System.Configuration; using System.Threading; namespace DataAccess { public class RepositoryFactory { private static Dictionary<string, IBlogRepository> repositories = new Dictionary<string, IBlogRepository>(); public static IBlogRepository Get(string connectionString) { var id = new RepositoryIdentifier(Thread.CurrentThread, connectionString); if(!repositories.ContainsKey(id)) { //This would more than likely not new up the blog //repository but supply it from an IoC implementation. repositories.Add(id, new BlogRepository(new BlogContext(connectionString))); } return repositories[id]; } public static void Dispose(string connectionString) { var id = new RepositoryIdentifier(Thread.CurrentThread, connectionString); if (!repositories.ContainsKey(id)) { repositories.Remove(id); } } private class RepositoryIdentifier { private readonly Thread _currentThread; private readonly string _connectionString; public RepositoryIdentifier(Thread currentThread, string connectionString) { _currentThread = currentThread; _connectionString = connectionString; } public override string ToString() { return _currentThread.ManagedThreadId + _connectionString; } public static implicit operator string(RepositoryIdentifier r) { return r.ToString(); } } } }
Our test defines the problem that we are trying to solve as we retrieve an object from the database into the context. We then update it from another context, and retrieve it again. We want the most current values possible on the last retrieve.
We add our Blog
object that we will be interacting with, the database-specific mappings, and the context, so we can retrieve and update data from the database.
We then add a current Set
and find
method to the context and the unit of work interface. These methods will allow us to explicitly bypass the entity cache and get the most current data from the database. This is essential for us to allow for the concurrent processing, without the data becoming stale.
The validity of the data in the system is the key to making any system function correctly. There are many ways to handle concurrency issues, and the following sections give details on those solutions:
The Client wins concurrency conflict resolution says that the data changed by the client application is the best version of the truth and overwrites the changes in the data store to match the data in the client. This can be useful for an administrative tool that needs to fix data problems, but is dangerous if set for all applications.
The Store wins concurrency conflict resolution overwrites the data on the client entity with the data on the store. This not only ensures that the client entity is working on the most current data, but also requires the changes to the entity be made again. This may be a good solution for an application with a UI, where the user can redo changes or approve a programmatic redo, but doesn't work in automated service situations.
If neither of the previous two options fit your needs, then you can create your own custom solution. This would be hooked in at the point of the concurrency violation exception in Entity Framework, but you could then handle that in any way that you would like. This takes much more management, and more logic to be coded into the DataAccess
layer, but is far less likely to have some of the fit issues of the previous two examples, because it can be tailored to your exact business needs.