C# 不用lock写一个多线程程序

多线程并发

    当一段代码有可能被不止一个线程同时访问时,且存在共享资源(变量、文件句柄等),可能出现并发冲突。发生并发冲突时如果不加锁,程序的行为是不可预测的。而加锁本身又是一件麻烦事,弄不好会出现死锁,死锁时程序卡在那既没有异常也没有日志,找问题都无从下手。

    C#中通常会使用 lock 来加锁,这里应严格避免锁静态对象(如 lock(type)),避免 lock(this),这种锁的粒度都太大,容易出现死锁。

多线程程序不加锁的方法

第一,bool 类型是线程安全的,不需要锁

第二,整形, DateTime 类型可以用Interlocked

long lastTicks= DateTime.Now.Ticks;

private void multiThreadMethod()
{
    var l = Interlocked.Read(ref lastTicks);
    var lastTime = new DateTime(l);//读取上一时刻.
    //do work...

    Interlocked.Exchange(ref lastTicks, DateTime.Now.Ticks);//更新上一时刻.
}

 第三,多使用ConcurrentQuere, ConcurrentDictionary

 一个典型的数采驱动程序

    数采驱动通常需要做 接收-处理-转发 三步,为避免阻塞接收,接收后应该入队,由单独的线程完成处理、转发工作。可以设计两个类Driver, MessageHandler,Driver接受数据,MessageHandler入队处理后返回给Driver,再有Driver发送出去。

    Driver代码要关键要保持与数据源的链接,断开后能够自动重连,这部分大概率要有bool 变量在表示链接状态,有lastConnect来记录上次连接成功/数据更新的时间,以便在连接断开或数据不动多长时间后自动重连,收到到数据后入队到MessageHandler处理,并把MessageHandler返回的数据发送出去。

    下面的MessageHandler代码中有三个关键点,

  • 用ConcurrentQuere,出队/入队时避免使用lock;
  • Thread的标准写法,即有一个quitFlag 布尔变量来控制线程的退出,因为线程只有在无事可做时才能退出,不要试图从外部让线程退出;
  • Close 线程退出后,还要把队列中未处理的部分处理完。
public class MessageHandler
{
	ConcurrentQueue<string> queue;
	Thread thread;
	bool quitFlag = false;
	const int BATCH_SIZE = 100;//每次连续发送数据的数量.

	public MessageHandler()
	{
		queue = new ConcurrentQueue<string>();
	}

	public event Action<List<MyDataType>> OnDataArrive;        //有新数据.

	public void Enqueue(string msg)
	{
		queue.Enqueue(msg);
	}

	public void Open()
	{
		if (thread == null)
		{
			quitFlag = false;
			thread = new Thread(doWork);
			thread.Start();
		}
	}

	private void doWork()
	{
		while (!quitFlag)
		{
			int i = 0;
			List<MyDataType> datas = new List<MyDataType>();
			while (queue.Count > 0 && i < BATCH_SIZE)
			{
				if (queue.TryDequeue(out string msg))
				{
					doWorkBody(msg, datas);
					i++;
				}
			}
			if (datas.Count > 0)
			{
				OnDataArrive?.Invoke(datas);
			}
			Thread.Sleep(50);
		}
	}
	private void doWorkBody(string msg, List<MyDataType> datas)
	{
		try
		{
			var dataPackage = JsonConvert.DeserializeObject<DataPackage>(msg);
			foreach (var dataItem in dataPackage.Data)
			{
				datas.Add(new MyDataType()
				{
					//从MQTT payload中得到数据
				});
			}
		}
		catch(Exception ex)
		{
			Logger.Error(ex, $"解析 {msg} 出错", logSource);
		}
	}

	public void Close()
	{
		//退出线程
		quitFlag = true;
		if (thread != null)
		{
			thread.Join();
			thread = null;
		}
		List<MyDataType> datas = new List<MyDataType>();
		while (queue.Count > 0)
		{
			if (queue.TryDequeue(out string msg))
			{
				doWorkBody(msg, datas);
			}
		}//处理最后一批数据
		if (datas.Count > 0)
		{
			OnDataArrive?.Invoke(datas);
		}
	}
}

    

相关推荐

  1. C# 不用lock一个线程序

    2024-05-15 13:06:03       13 阅读
  2. c#线 使用lock

    2024-05-15 13:06:03       16 阅读
  3. 完成一个程序,谈谈Rust线并行算法的体会

    2024-05-15 13:06:03       17 阅读
  4. C++线:unique_lock源码分析与使用详解(六)

    2024-05-15 13:06:03       15 阅读
  5. linux c线分段读取一个文件

    2024-05-15 13:06:03       31 阅读
  6. C++ 线

    2024-05-15 13:06:03       26 阅读
  7. C++ 线

    2024-05-15 13:06:03       17 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-05-15 13:06:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-05-15 13:06:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-05-15 13:06:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-05-15 13:06:03       18 阅读

热门阅读

  1. 韵搜坊 -- 前端整合Axios(联调后端)

    2024-05-15 13:06:03       14 阅读
  2. uniapp vu3 scroll-view 滚动到指定位置

    2024-05-15 13:06:03       10 阅读
  3. 华为OD笔试题:API 集群负载统计

    2024-05-15 13:06:03       9 阅读
  4. 河南省市政给排水乙级资质申请费用大揭秘

    2024-05-15 13:06:03       15 阅读
  5. C++ QT设计模式:解释器模式

    2024-05-15 13:06:03       13 阅读
  6. 项目dev打包报错 Cannot find module ‘node:util‘

    2024-05-15 13:06:03       9 阅读
  7. vue和react的区别

    2024-05-15 13:06:03       13 阅读
  8. 动态IP的应用场景

    2024-05-15 13:06:03       12 阅读
  9. Rust语言内部运行原理介绍

    2024-05-15 13:06:03       10 阅读