代码之家  ›  专栏  ›  技术社区  ›  epitka

如何从azure功能向azure服务总线添加消息?

  •  0
  • epitka  · 技术社区  · 6 年前

    如果我已经有了json格式的消息,那么如何向服务总线添加消息呢?我可以使用azure函数输出绑定添加消息,但在servicebusexplorer或queueexplorer中看不到任何消息属性。

    我需要重新提交大约1k条消息,消息上有一个错误,所以我将它们导出到文件中,在记事本++中修复了它,现在我创建了一个azure函数来读取文件并将其放入队列。但当我查看消息时,ServiceBusExplorer中没有显示任何消息属性。

    CSX

    #r "Newtonsoft.Json"
    
    using System.Net;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Primitives;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using System;
    using System.Threading.Tasks;
    using System.Configuration;
    
    const string QueueName = "commands";
    static string FileName = "messages.json";
    
    public static async Task<string> Run(HttpRequest req, ILogger log,
                     ExecutionContext context, ICollector<string> outputSbQueue)
    {
        log.LogInformation("Starting processing messages.");
    
        var filePath = System.IO.Path.Combine(context.FunctionDirectory, FileName);
    
        log.LogInformation("Path: " + filePath);
    
        var text = File.ReadAllText(filePath);
    
        log.LogInformation("Message: " + text);
    
        JArray messages = JArray.Parse(text);
    
        log.LogInformation("Number of message: " + messages.Count);
    
        await SendMessagesAsync(messages,log,outputSbQueue);
        // return req.CreateResponse(HttpStatusCode.OK,
        //                             "Updated",
        //                             "text/plain");
        return "test";
    }
    
    static async Task SendMessagesAsync(JArray messages, ILogger log, 
    ICollector<string> outputSbQueue )
    {
        log.LogInformation("About to iterate messages");
    
        foreach (var message in messages)
        {
            log.LogInformation("Sending Message");
            outputSbQueue.Add(message.ToString());
            log.LogInformation("Sent message: " + message);
        }
    }
    

    消息.json

    [
      {
        "Body": {
          "PaymentPlanId": "2141110b-07da-46b7-a166-ffc7f9f6c5af",
          "InstallmentId": "3bd27b0d-3372-456c-856c-74e09de1413a",
          "Date": "2018-12-05T00:00:00",
          "Amount": 66.89,
          "Attempt": 0,
          "PaymentCorrelationId": "2ae7511e-706f-4d7f-b44b-9690d0fcbf38",
          "CommandId": "a2d5ae26-6289-4cca-bce0-7a1905b64378"
        },
        "ContentType": "text/plain",
        "CorrelationId": null,
        "DeadLetterSource": "commands",
        "DeliveryCount": 1,
        "EnqueuedSequenceNumber": 14684,
        "EnqueuedTimeUtc": "2018-12-06T13:22:37.131Z",
        "ExpiresAtUtc": "9999-12-31T23:59:59.9999999",
        "ForcePersistence": false,
        "IsBodyConsumed": false,
        "Label": "PayDueInstallmentCommand",
        "LockedUntilUtc": null,
        "LockToken": null,
        "MessageId": "a2d5ae26-6289-4cca-bce0-7a1905b64378",
        "PartitionKey": null,
        "Properties": {
          "BodyClrType": "SR.Domain.Commands.PayDueInstallmentCommand, SR.Domain, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
          "ParentId": "|Dz4Pxv65XMA=.3975a8a2_32.",
          "RootId": "Dz4Pxv65XMA=",
          "Diagnostic-Id": "|Dz4Pxv65XMA=.3975a8a2_32.1.",
          "DeadLetterReason": "NoCommandInMessage",
          "DeadLetterErrorDescription": "There was no command in the message.",
          "Test":"1"
        },
        "ReplyTo": null,
        "ReplyToSessionId": null,
        "ScheduledEnqueueTimeUtc": "2018-12-06T13:22:36.877Z",
        "SequenceNumber": 14684,
        "SessionId": null,
        "Size": 938,
        "State": 0,
        "TimeToLive": "10675199.02:48:05.4775807",
        "To": null,
        "ViaPartitionKey": null
      }
     ]
    

    函数.json

    {
      "bindings": [
        {
          "authLevel": "function",
          "name": "req",
          "type": "httpTrigger",
          "direction": "in",
          "methods": [
            "get",
            "post"
          ]
        },
        {
          "name": "$return",
          "type": "http",
          "direction": "out"
        },
        {
          "name": "outputSbQueue",
          "type": "serviceBus",
          "queueName": "deadletter",
          "connection": "ServiceBusConnectionString",
          "direction": "out"
        }
      ],
      "disabled": false
    }
    
    1 回复  |  直到 6 年前
        1
  •  0
  •   Jerry Liu Phantom    6 年前

    正如@roman提到的,组装问题很容易解决。由于您已经创建了一个v2函数(如果没有修改新函数应用程序的运行时版本,则为默认值),请在下面添加命令。

    #r "..\\bin\\Microsoft.Azure.ServiceBus.dll" 
    using Microsoft.Azure.ServiceBus;
    

    另一个问题是json模型的结构。实际上是基于 BrokeredMessage 在里面 Microsoft.ServiceBus.Messaging 而不是 Message 在里面 Microsoft.Azure.ServiceBus . 如果需要,您可能必须决定使用哪一个并重构json。 请注意,某些属性是由azure服务总线服务设置的,我们无法在新创建的消息中对其进行修改。

    举个例子 Message . 根据重构json 消息 类,包括所有可配置的属性。

    [
      {
        "Body": {
          "PaymentPlanId": "2141110b-07da-46b7-a166-ffc7f9f6c5af",
          ...
        },
        "ContentType": "text/plain",
        "Label": "MyLable",
        "MessageId": "a2d5ae26-6289-4cca-bce0-7a1905b64378",
        "ScheduledEnqueueTimeUtc": "2018-12-06T13:22:36.877Z",
        "TimeToLive": "10675199.02:48:05.4775807",
        "CorrelationId": null,
        "PartitionKey": null,
        "ReplyTo": null,
        "ReplyToSessionId": null,
        "SessionId": null,
        "To": null,
        "ViaPartitionKey": null
        "UserProperties": {
            "CustomProperty":"test",
            ...
        }
      }
     ]
    

    我们不能直接使用反序列化 JsonConvert.DeserializeObject 根据消息正文的要求 byte[] .

            foreach (var message in messages)
            {
                // Get Body first
                var body = System.Text.Encoding.UTF8.GetBytes(message["Body"].ToString());
                // Empty Body section to deserialize properties
                message["Body"] = "";
                var SBMessage = JsonConvert.DeserializeObject<Message>(message.ToString());
                SBMessage.Body = body;
                log.LogInformation("Sending Message");
                outputSbQueue.Add(SBMessage);
                log.LogInformation("Sent message: " + SBMessage.MessageId);
            }