代码之家  ›  专栏  ›  技术社区  ›  Igor Soloydenko

如何实现无触发器。NET核心控制台应用程序作为连续Azure WebJob?

  •  0
  • Igor Soloydenko  · 技术社区  · 6 年前

    TimerTrigger QueueTrigger ). 我特别关注的是 WebJobs SDK 3.x 顺便说一句

    所以对于一个无触发的WebJob(类似于Windows服务),我是否需要使用 NoAutomaticTrigger 找到一种手动启动“主”代码的方法?

    IHostedService 界面 我甚至没有尝试部署此代码,只是在本地计算机上运行,因此我担心发布过程会确认我的代码不适合当前形式的Azure WebJobs。


    using Microsoft.Azure.ServiceBus;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    
    namespace AbcCorp.Jobs
    {
        public static class Program
        {
            static async Task Main(string[] args)
            {
                var config = new ConfigurationBuilder()
                    .SetBasePath(Directory.GetCurrentDirectory())
                    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
                    .AddJsonFile($"appsettings.{Environment.GetEnvironmentVariable("ASPNETCORE_ENVIRONMENT")}.json", false)
                    .Build();
    
                var hostBuilder = new HostBuilder()
                    .ConfigureWebJobs(builder => { builder.AddAzureStorageCoreServices(); })
                    .ConfigureServices(serviceCollection =>
                    {
                        ConfigureServices(serviceCollection, config);
                        serviceCollection.AddHostedService<ConsoleApplication>();
                    });
    
                using (var host = hostBuilder.Build())
                    await host.RunAsync();
            }
    
            private static IServiceCollection ConfigureServices(IServiceCollection services, IConfigurationRoot configuration)
            {
                services.AddTransient<ConsoleApplication>();
                // ... more DI registrations
                return services;
            }
        }
    }
    

    控制台应用程序。反恐精英

    问题是,我希望这段代码在进程启动时只运行一次。 它将使用常规端口开始侦听服务总线事件 Microsoft.Azure.ServiceBus

    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Hosting;
    using AbcCorp.Internal.Microsoft.Azure.ServiceBus;
    using AbcCorp.Api.Messaging;
    
    namespace AbcCorp.Jobs
    {
        public sealed class ConsoleApplication: IHostedService
        {
            private readonly IReceiver<SubmissionNotification> _messageReceiver;
            private readonly MessageHandler _messageHandler;
    
            public ConsoleApplication(IReceiver<SubmissionNotification> messageReceiver, MessageHandler messageHandler)
            {
                _messageReceiver = messageReceiver;
                _messageHandler = messageHandler;
            }
    
            public Task StartAsync(CancellationToken cancellationToken)
            {
                _messageReceiver.StartListening(_messageHandler.HandleMessage, _messageHandler.HandleException);
                return Task.Delay(Timeout.Infinite);
            }
    
            public Task StopAsync(CancellationToken cancellationToken)
            {
                _messageReceiver.Dispose();
                return Task.CompletedTask;
            }
        }
    }
    
    0 回复  |  直到 6 年前
        1
  •  1
  •   Nathan Cooper    6 年前

    我找到了 IHostedService 抽象很有帮助,但我不喜欢他们的SDK。我发现它臃肿,难以使用。我不想为了使用大量特殊的magic Azure东西而使用大量的依赖项,当时我只想在WebJob中运行一个控制台应用程序,以后可能会将它移到其他地方。

    因此,我最终只是删除了该依赖项,从SDK中窃取了关闭代码,并编写了自己的服务主机。结果是我的Github回购协议 azure-webjob-host .您可以随意使用它,也可以随意搜索它以获取想法。我不知道,也许如果我再做一次,我会再次尝试让SDK工作,但我认为这是SDK的一个替代方案。

    基本上,我写了一个IServiceHost,和你的没有太大区别(除了 StartAsync 当东西开始时退出,而不是挂起)。然后我编写了自己的服务主机,它基本上只是一个循环:

    await _service.StartAsync(cancellationToken);
    while (!token.IsCancellationRequested){await Task.Delay(1000);}
    await _service.StopAsync(default);
    

    然后我偷了那本书 WebJobsShutdownWatcher from their repo .

    然后我创建了一个 IServiceHost 这启动了我的消息处理程序。(我使用的是Rabbit,它与触发器或azure内容无关)

    public class MessagingService : IHostedService, IDisposable
    {
        public MessagingService(ConnectionSettings connectionSettings,
            AppSubscriberSettings subscriberSettings,
            MessageHandlerTypeMapping[] messageHandlerTypeMappings,
            ILogger<MessagingService> logger)
        {
            ....
        }
    
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.WhenAll(subscribers.Value.Select(s => s.StartSubscriptionAsync()));
        }
    
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            ...
        }
    
        public void Dispose()
        {
            ...
        }
    }
    

    然后我把这些放在一起,做成这样:

    IHostedService myService = new MyService();
    
    using (var host = new ServiceHostBuilder().HostService(myService))
    {
        await host.RunAsync(default);
    }
    
        2
  •  1
  •   Hugo Barona    6 年前

    我有一些工作人员附加到服务总线主题,我们所做的工作如下(ServiceBusClient是包含订阅客户端的自定义类):

    public override Task StartAsync(CancellationToken cancellationToken)
        {
            _serviceBusClient.RegisterOnMessageHandlerAndReceiveMessages(MessageReceivedAsync);
    
            _logger.LogDebug($"Started successfully the Import Client. Listening for messages...");
            return base.StartAsync(cancellationToken);
        }
    
    
            public void RegisterOnMessageHandlerAndReceiveMessages(Func<Message, CancellationToken, Task> ProcessMessagesAsync)
        {
            // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,
    
                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                AutoComplete = false
            };
    
            // Register the function that processes messages.
            SubscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }
    

    然后,您可以使用DI实例化您的服务总线客户机,并将其注入到Worker类的构造函数中。

    这里是我的定制类服务总线客户机的单例实例的初始化

    services.AddSingleton<IServiceBusClient, ServiceBusClient>((p) =>
                    {
    
                        var diagnostics = p.GetService<EventHandling>();
                        var sbc = new ServiceBusClient(
                            programOptions.Endpoint,
                            programOptions.TopicName,
                            programOptions.Subscriber,
                            programOptions.SubscriberKey);
    
                        sbc.Exception += exception => diagnostics.HandleException(exception);
    
                        return sbc;
                    });
    

    然后在这个自定义类上初始化订阅客户端

    public ServiceBusClient(
            string endpoint,
            string topicName,
            string subscriberName,
            string subscriberKey, ReceiveMode mode = ReceiveMode.PeekLock)
        {
            var connBuilder = new ServiceBusConnectionStringBuilder(endpoint, topicName, subscriberName, subscriberKey);
            var connectionString = connBuilder.GetNamespaceConnectionString();
    
            ConnectionString = connectionString;
            TopicName = topicName;
            SubscriptionName = topicName;
    
            SubscriptionClient = new SubscriptionClient(connectionString, topicName, subscriberName, mode);
        }
    
        3
  •  0
  •   kannangokul    6 年前

    你可以在这篇文章中查看@george chen的答案 How to create service bus trigger webjob?

    其中,您可以使用内置队列触发器并在其中写入消息处理程序逻辑,而不是创建接收方并注册消息处理程序。