学懂C#编程:高级开发技术——深入理解发布-订阅模式(Publisher-Subscriber Pattern)的实现

一、理解 发布-订阅模式(Publish-Subscribe Pattern) 机制

      发布-订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它允许消息的发送者(发布者)和消息的接收者(订阅者)之间解耦。发布者不需要知道订阅者的存在,反之亦然。这种模式在许多场景中都非常有用,特别是在需要处理异步事件和消息传递的系统中。

      对于初学者来说,完全理解发布-订阅模式(Publish-Subscribe Pattern)有一点难度,但是通过实例学习和手动实践就能理解透彻,并能应用到实际项目开发中,首先我们通过一个简单的示例进行说明:

先定义发布者和订阅者:

public class Publisher
{
    public event EventHandler<EventArgs> EventHappened;
 
    public void RaiseEvent()
    {
        EventHappened?.Invoke(this, EventArgs.Empty);
    }
}
 
public class Subscriber
{
    public Subscriber(Publisher pub)
    {
        pub.EventHappened += OnEvent;
    }
 
    private void OnEvent(object sender, EventArgs e)
    {
        Console.WriteLine("Event handled.");
    }
}

代码解析

这段代码展示了C#中简单的发布-订阅模式(Publisher-Subscriber Pattern)的实现。下面是如何使用这个模型进行调用的步骤:

首先,我们有一个Publisher类,它声明了一个事件EventHappened,该事件是基于EventHandler<T>委托的,其中T是EventArgs,这是最基础的事件参数类型。Publisher类还包含一个方法RaiseEvent()用于触发事件。

接着,我们有一个Subscriber类,它作为事件的订阅者。在Subscriber的构造函数中,它订阅了从Publisher实例传入的EventHappened事件,并绑定了自己的处理方法OnEvent。当事件被触发时,OnEvent方法会被执行,打印出"Event handled."。

下面是具体如何调用这些类以触发事件并处理它的示例:

using System;
 
class Program
{
    static void Main(string[] args)
    {
        // 创建一个Publisher实例
        Publisher publisher = new Publisher();
 
        // 创建一个Subscriber实例,并将Publisher的事件与之绑定
        Subscriber subscriber = new Subscriber(publisher);
 
        // 触发Publisher的事件
        publisher.RaiseEvent();  // 这将导致"Event handled."被打印出来
 
        // 由于C#程序默认在控制台应用的主线程结束时关闭,无需显式调用Console.ReadLine()等待,但如果你在IDE中运行可能需要此行来保持窗口打开查看输出
        // Console.ReadLine();
    }
}

         当你运行上述Program类中的Main方法时,你会看到"Event handled."被输出到控制台。这是因为创建了Publisher和Subscriber的实例后,通过调用publisher.RaiseEvent()触发了事件,进而调用了Subscriber中注册的OnEvent方法。

二、发布-订阅模式的应用场景

发布-订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它允许消息的发送者(发布者)和消息的接收者(订阅者)之间解耦。发布者不需要知道订阅者的存在,反之亦然。这种模式在许多场景中都非常有用,特别是在需要处理异步事件和消息传递的系统中。

以下是一些实际应用发布-订阅模式的场景:

1. 事件驱动系统

在事件驱动的应用程序中,发布-订阅模式用于处理各种事件。例如,用户界面中的按钮点击事件、键盘输入事件、鼠标移动事件等。

public class EventBus
{
    private readonly Dictionary<string, List<Action<string>>> _subscriptions = new Dictionary<string, List<Action<string>>>();

    public void Publish(string eventName, string eventData)
    {
        if (_subscriptions.ContainsKey(eventName))
        {
            foreach (var handler in _subscriptions[eventName])
            {
                handler(eventData);
            }
        }
    }

    public void Subscribe(string eventName, Action<string> handler)
    {
        if (!_subscriptions.ContainsKey(eventName))
        {
            _subscriptions[eventName] = new List<Action<string>>();
        }
        _subscriptions[eventName].Add(handler);
    }
}

// 使用示例
var eventBus = new EventBus();
eventBus.Subscribe("ButtonClicked", data => Console.WriteLine($"Button clicked with data: {data}"));
eventBus.Publish("ButtonClicked", "Hello, World!");

 解释

  1. EventBus 类:

    • _subscriptions 是一个字典,键是事件名称,值是处理该事件的委托列表。
    • Publish 方法用于发布事件,它接受事件名称和事件数据(这里简化为字符串),并调用所有订阅该事件的处理程序。
    • Subscribe 方法用于订阅事件,它接受事件名称和处理程序,并将处理程序添加到相应的列表中。
  2. 使用示例:

    • 创建一个 EventBus 实例。
    • 使用 Subscribe 方法订阅 ButtonClicked 事件,并提供一个处理程序,该处理程序在事件发生时输出事件数据。
    • 使用 Publish 方法发布 ButtonClicked 事件,并传递事件数据 "Hello, World!"

2. 消息队列系统

在分布式系统中,消息队列(如 RabbitMQ、Azure Service Bus)使用发布-订阅模式来传递消息。生产者发布消息到队列,消费者订阅并处理这些消息。

// 假设使用 RabbitMQ
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);

    var message = "Hello, World!";
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
}

解释 

  1. var factory = new ConnectionFactory() { HostName = "localhost" };: 创建一个连接工厂(ConnectionFactory)实例,这个实例用于创建到RabbitMQ服务器的连接。这里,我们设置了HostNamelocalhost,表示RabbitMQ服务器运行在本地。

  2. using (var connection = factory.CreateConnection()):使用连接工厂创建到RabbitMQ服务器的连接。using关键字确保连接在使用完毕后会被自动关闭和释放。

  3. using (var channel = connection.CreateModel()):创建一个通道,它是发送和接收消息的通道。同样,using关键字确保通道在使用完毕后会被自动关闭和释放。

  4. channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);:在RabbitMQ中声明一个交换器(Exchange)。交换器用于接收生产者发送的消息并将它们路由到相应的队列。这里,我们创建了一个名为logs的交换器,类型设置为Fanout,表示这个交换器会把接收到的消息广播到所有绑定的队列。

  5. var message = "Hello, World!";:定义要发送的消息内容。

  6. var body = Encoding.UTF8.GetBytes(message);:将消息内容转换为字节数组,因为RabbitMQ的消息体必须是字节数组。

  7. channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);:通过通道发送消息。消息会被发送到logs交换器,routingKey是空字符串(在使用Fanout类型的交换器时,路由键会被忽略),basicProperties是消息的基本属性(这里没有设置),body是消息体。

总体来说,这个代码创建了一个到RabbitMQ服务器的连接,然后创建了一个通道,声明了一个交换器,并发送了一条消息到这个交换器。

3. 日志记录系统

在日志记录系统中,不同的组件可能需要记录不同级别或类型的日志。发布-订阅模式可以用于将日志消息发送到不同的处理程序。

public class Logger
{
    private readonly EventBus _eventBus;

    public Logger(EventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public void Log(string level, string message)
    {
        _eventBus.Publish(level, message);
    }
}

// 使用示例
var eventBus = new EventBus();
var logger = new Logger(eventBus);
eventBus.Subscribe("Error", message => Console.WriteLine($"Error: {message}"));
eventBus.Subscribe("Info", message => Console.WriteLine($"Info: {message}"));

logger.Log("Error", "Something went wrong!");
logger.Log("Info", "Operation completed successfully.");

解释 

这个代码示例展示了如何使用一个简单的日志记录器(Logger)和事件总线(EventBus)系统来实现日志的记录和处理。让我们逐步解释它的工作原理和组成部分:

  1)EventBus 类

首先,假设前面提到的 EventBus 类负责管理事件的发布和订阅。它允许你订阅特定类型的事件,并在该事件发布时调用对应的处理程序(或称“处理器”)。

2) Logger 类

接着,Logger 类是一个服务层的抽象,它使用 EventBus 来记录日志。此类包含:

  • 私有字段 _eventBus:存储传入的 EventBus 实例,以便能够发布事件。
  • 构造函数:接收一个 EventBus 实例并将其赋值给 _eventBus 字段。这是依赖注入的一种形式,允许 Logger 使用外部提供的 EventBus 实例。
  • 方法 Log:接受一个日志级别(level)和一个日志消息(message),然后使用 _eventBus 的 Publish 方法发布这个日志。这样,所有订阅了该级别日志的处理器都会被触发。

3)使用示例

在使用示例中,执行了以下步骤:

  1. 创建了一个 EventBus 实例。
  2. 创建了一个 Logger 实例,将之前创建的 EventBus 实例传递给它。
  3. 使用 EventBus.Subscribe 方法订阅了两种类型的日志:“Error”和“Info”。对于每种日志,指定了一个 lambda 表达式作为日志处理程序,当日志发生时,它会接收日志消息并打印到控制台。
  4. 使用 Logger.Log 方法记录了两种级别的日志:“Error”和“Info”。这些调用最终会导致 EventBus.Publish 方法的调用,从而触发相应的日志处理程序。

总结

这个架构允许 Logger 通过 EventBus 与其他组件进行解耦合的通信。Logger 不需要知道谁处理了日志,或者日志的处理方式。这使得系统各部分之间更加灵活且易于扩展。订阅者可以自由订阅或取消订阅日志,而不会影响 Logger 的实现。

4. 实时通知系统

在实时通知系统中,如聊天应用、实时更新应用(如股票市场),发布-订阅模式用于向用户发送实时更新。

public class NotificationService
{
    private readonly EventBus _eventBus;

    public NotificationService(EventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public void SendNotification(string type, string message)
    {
        _eventBus.Publish(type, message);
    }
}

// 使用示例
var eventBus = new EventBus();
var notificationService = new NotificationService(eventBus);
eventBus.Subscribe("ChatMessage", message => Console.WriteLine($"New chat message: {message}"));
eventBus.Subscribe("StockUpdate", message => Console.WriteLine($"Stock update: {message}"));

notificationService.SendNotification("ChatMessage", "Hello, everyone!");
notificationService.SendNotification("StockUpdate", "AAPL: \$150.00");

解释 

1. EventBus 类

首先,假设前面提到的 EventBus 类负责管理事件的发布和订阅。它允许你订阅特定类型的事件,并在该事件发布时触发对应的处理程序(或称“处理器”)。

2. NotificationService 类

接着,NotificationService 类是一个服务层的抽象,它使用 EventBus 来发送通知。此类包含:

  • 私有字段 _eventBus:存储传入的 EventBus 实例,以便能够发布事件。
  • 构造函数:接收一个 EventBus 实例并将其赋值给 _eventBus 字段。这是依赖注入的一种形式,允许 NotificationService 使用外部提供的 EventBus 实例。
  • 方法 SendNotification:接受一个事件类型(type)和消息(message),然后使用 _eventBus 的 Publish 方法发布这个消息。这样,所有订阅了该类型事件的处理器都会被触发。

3. 使用示例

在使用示例中,执行了以下步骤:

  1. 创建了一个 EventBus 实例。
  2. 创建了一个 NotificationService 实例,将之前创建的 EventBus 实例传递给它。
  3. 使用 EventBus.Subscribe 方法订阅了两种类型的事件:“ChatMessage”和“StockUpdate”。对于每种事件,指定了一个 lambda 表达式作为事件处理程序,当事件发生时,它会接收消息并打印到控制台。
  4. 通过 NotificationService 的 SendNotification 方法发送了两种事件:“ChatMessage”和“StockUpdate”。这些调用最终会导致 EventBus.Publish 方法的调用,从而触发相应的事件处理程序。

总结

这个架构允许NotificationService通过EventBus与其他组件进行解耦合的通信。NotificationService不需要知道谁处理了事件,或者事件的处理方式。这使得系统各部分之间更加灵活且易于扩展。订阅者可以自由订阅或取消订阅事件,而不会影响NotificationService的实现。

5. 插件系统

在插件系统中,发布-订阅模式用于允许插件订阅特定事件并在事件发生时执行自定义逻辑。

public class PluginManager
{
    private readonly EventBus _eventBus;

    public PluginManager(EventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public void RegisterPlugin(string eventName, Action<object> handler)
    {
        _eventBus.Subscribe(eventName, handler);
    }
}

// 使用示例
var eventBus = new EventBus();
var pluginManager = new PluginManager(eventBus);
pluginManager.RegisterPlugin("FileSaved", data => Console.WriteLine($"File saved: {data}"));
eventBus.Publish("FileSaved", "document.txt");

解释 

这个代码示例展示了如何使用一个简单的插件管理器(PluginManager)和事件总线(EventBus)系统来实现插件的注册和事件的处理。让我们逐步解释它的工作原理和组成部分:

1. EventBus 类

首先,假设前面提到的 EventBus 类负责管理事件的发布和订阅。它允许你订阅特定类型的事件,并在该事件发布时调用对应的处理程序(或称“处理器”)。

2. PluginManager 类

接着,PluginManager 类是一个服务层的抽象,它使用 EventBus 来注册插件和设置事件处理程序。此类包含:

  • 私有字段 _eventBus:存储传入的 EventBus 实例,以便能够注册插件和设置事件处理程序。
  • 构造函数:接收一个 EventBus 实例并将其赋值给 _eventBus 字段。这是依赖注入的一种形式,允许 PluginManager 使用外部提供的 EventBus 实例。
  • 方法 RegisterPlugin:接受一个事件名称(eventName)和一个处理程序(handler),并调用 _eventBus.Subscribe 方法来注册该处理程序以响应特定的事件。

3. 使用示例

在使用示例中,执行了以下步骤:

  1. 创建了一个 EventBus 实例。
  2. 创建了一个 PluginManager 实例,将之前创建的 EventBus 实例传递给它。
  3. 使用 PluginManager.RegisterPlugin 方法注册了一个插件,该插件对 "FileSaved" 事件感兴趣,并提供了一个 lambda 表达式作为事件处理程序。当 "FileSaved" 事件发生时,它会接收数据并在控制台上打印一条消息。
  4. 使用 EventBus.Publish 方法发布了一个 "FileSaved" 事件,并提供了 "document.txt" 作为事件数据。这将导致之前注册的事件处理程序被调用,打印一条消息到控制台。

总结

这个架构允许 PluginManager 通过 EventBus 与其他插件进行解耦合的通信。PluginManager 不需要知道谁处理了事件,或者事件的处理方式。这使得系统各部分之间更加灵活且易于扩展。插件可以自由地注册或取消注册事件,而不会影响 PluginManager 的实现。

总结

发布-订阅模式在许多场景中都非常有用,特别是在需要处理异步事件和消息传递的系统中。通过使用这种模式,可以实现组件之间的解耦,提高系统的灵活性和可扩展性。

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-12 14:56:03       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-12 14:56:03       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-12 14:56:03       58 阅读
  4. Python语言-面向对象

    2024-07-12 14:56:03       69 阅读

热门阅读

  1. C++:右值引用

    2024-07-12 14:56:03       22 阅读
  2. Xcode Playgrounds:探索Swift编程的交互式乐园

    2024-07-12 14:56:03       22 阅读
  3. Okhttp实现原理

    2024-07-12 14:56:03       15 阅读
  4. 2713. 矩阵中严格递增的单元格数

    2024-07-12 14:56:03       20 阅读
  5. global::System.Runtime.InteropServices.DllImport

    2024-07-12 14:56:03       19 阅读