Skip to content

Commit 4a93c7a

Browse files
dpwdecVisualBeanUlrikSandberggokerakcDec Kolakowski
authored
feat(bindings): add SQS AWS Bindings (LEGO#113)
Co-authored-by: Alex Wichmann <VisualBean@users.noreply.github.com> Co-authored-by: UlrikSandberg <UJSandberg@gmail.com> Co-authored-by: Alex W. Carlsen <alekcarlsen@gmail.com> Co-authored-by: Goker Akce <gokerakce@outlook.com> Co-authored-by: Goker Akce <goker.akce@justeattakeaway.com> Co-authored-by: Dec Kolakowski <dec.kolakowski@justeattakeaway.com>
1 parent d44efb0 commit 4a93c7a

21 files changed

Lines changed: 1056 additions & 10 deletions

src/LEGO.AsyncAPI.Bindings/BindingsCollection.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// Copyright (c) The LEGO Group. All rights reserved.
22

3+
using LEGO.AsyncAPI.Bindings.Sqs;
4+
35
namespace LEGO.AsyncAPI.Bindings
46
{
57
using System;
@@ -67,5 +69,11 @@ public static TCollection Add<TCollection, TItem>(
6769
new PulsarServerBinding(),
6870
new PulsarChannelBinding(),
6971
};
72+
73+
public static IEnumerable<IBindingParser<IBinding>> Sqs => new List<IBindingParser<IBinding>>
74+
{
75+
new SqsChannelBinding(),
76+
new SqsOperationBinding(),
77+
};
7078
}
7179
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
namespace LEGO.AsyncAPI.Bindings.Sqs
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using LEGO.AsyncAPI.Models.Interfaces;
6+
using LEGO.AsyncAPI.Writers;
7+
8+
public class Identifier : IAsyncApiExtensible
9+
{
10+
public string Arn { get; set; }
11+
12+
public string Name { get; set; }
13+
14+
public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();
15+
16+
public void Serialize(IAsyncApiWriter writer)
17+
{
18+
if (writer is null)
19+
{
20+
throw new ArgumentNullException(nameof(writer));
21+
}
22+
23+
writer.WriteStartObject();
24+
writer.WriteOptionalProperty("arn", this.Arn);
25+
writer.WriteOptionalProperty("name", this.Name);
26+
writer.WriteExtensions(this.Extensions);
27+
writer.WriteEndObject();
28+
}
29+
}
30+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
namespace LEGO.AsyncAPI.Bindings.Sqs
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using LEGO.AsyncAPI.Models.Interfaces;
6+
using LEGO.AsyncAPI.Writers;
7+
8+
public class Policy : IAsyncApiExtensible
9+
{
10+
/// <summary>
11+
/// An array of statement objects, each of which controls a permission for this topic.
12+
/// </summary>
13+
public List<Statement> Statements { get; set; }
14+
15+
public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();
16+
17+
public void Serialize(IAsyncApiWriter writer)
18+
{
19+
if (writer is null)
20+
{
21+
throw new ArgumentNullException(nameof(writer));
22+
}
23+
24+
writer.WriteStartObject();
25+
writer.WriteOptionalCollection("statements", this.Statements, (w, t) => t.Serialize(w));
26+
writer.WriteExtensions(this.Extensions);
27+
writer.WriteEndObject();
28+
}
29+
}
30+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
namespace LEGO.AsyncAPI.Bindings.Sqs
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using LEGO.AsyncAPI.Models;
6+
using LEGO.AsyncAPI.Models.Interfaces;
7+
using LEGO.AsyncAPI.Writers;
8+
using Extensions;
9+
using LEGO.AsyncAPI.Readers;
10+
using LEGO.AsyncAPI.Readers.ParseNodes;
11+
12+
public class Queue : IAsyncApiExtensible
13+
{
14+
/// <summary>
15+
/// The name of the queue. When an SNS Operation Binding Object references an SQS queue by name, the identifier should be the one in this field.
16+
/// </summary>
17+
public string Name { get; set; }
18+
19+
/// <summary>
20+
/// Is this a FIFO queue?
21+
/// </summary>
22+
public bool FifoQueue { get; set; }
23+
24+
/// <summary>
25+
/// The number of seconds to delay before a message sent to the queue can be received. used to create a delay queue.
26+
/// </summary>
27+
public int? DeliveryDelay { get; set; }
28+
29+
/// <summary>
30+
/// The length of time, in seconds, that a consumer locks a message - hiding it from reads - before it is unlocked and can be read again.
31+
/// </summary>
32+
public int? VisibilityTimeout { get; set; }
33+
34+
/// <summary>
35+
/// Determines if the queue uses short polling or long polling. Set to zero the queue reads available messages and returns immediately. Set to a non-zero integer, long polling waits the specified number of seconds for messages to arrive before returning.
36+
/// </summary>
37+
public int? ReceiveMessageWaitTime { get; set; }
38+
39+
/// <summary>
40+
/// How long to retain a message on the queue in seconds, unless deleted.
41+
/// </summary>
42+
public int? MessageRetentionPeriod { get; set; }
43+
44+
/// <summary>
45+
/// Prevent poison pill messages by moving un-processable messages to an SQS dead letter queue.
46+
/// </summary>
47+
public RedrivePolicy RedrivePolicy { get; set; }
48+
49+
/// <summary>
50+
/// The security policy for the SQS Queue
51+
/// </summary>
52+
public Policy Policy { get; set; }
53+
54+
/// <summary>
55+
/// Key-value pairs that represent AWS tags on the topic.
56+
/// </summary>
57+
public Dictionary<string, string> Tags { get; set; }
58+
59+
public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();
60+
61+
public void Serialize(IAsyncApiWriter writer)
62+
{
63+
if (writer is null)
64+
{
65+
throw new ArgumentNullException(nameof(writer));
66+
}
67+
68+
writer.WriteStartObject();
69+
writer.WriteRequiredProperty("name", this.Name);
70+
writer.WriteOptionalProperty("fifoQueue", this.FifoQueue);
71+
writer.WriteOptionalProperty("deliveryDelay", this.DeliveryDelay);
72+
writer.WriteOptionalProperty("visibilityTimeout", this.VisibilityTimeout);
73+
writer.WriteOptionalProperty("receiveMessageWaitTime", this.ReceiveMessageWaitTime);
74+
writer.WriteOptionalProperty("messageRetentionPeriod", this.MessageRetentionPeriod);
75+
writer.WriteOptionalObject("redrivePolicy", this.RedrivePolicy, (w, p) => p.Serialize(w));
76+
writer.WriteOptionalObject("policy", this.Policy, (w, p) => p.Serialize(w));
77+
writer.WriteOptionalMap("tags", this.Tags, (w, t) => w.WriteValue(t));
78+
writer.WriteExtensions(this.Extensions);
79+
writer.WriteEndObject();
80+
}
81+
}
82+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
namespace LEGO.AsyncAPI.Bindings.Sqs
2+
{
3+
using System;
4+
using LEGO.AsyncAPI.Models.Interfaces;
5+
using LEGO.AsyncAPI.Writers;
6+
using System.Collections.Generic;
7+
8+
public class RedrivePolicy : IAsyncApiExtensible
9+
{
10+
/// <summary>
11+
/// Prevent poison pill messages by moving un-processable messages to an SQS dead letter queue.
12+
/// </summary>
13+
public Identifier DeadLetterQueue { get; set; }
14+
15+
/// <summary>
16+
/// The number of times a message is delivered to the source queue before being moved to the dead-letter queue.
17+
/// </summary>
18+
public int? MaxReceiveCount { get; set; }
19+
20+
public IDictionary<string, IAsyncApiExtension> Extensions { get; set; } = new Dictionary<string, IAsyncApiExtension>();
21+
22+
public void Serialize(IAsyncApiWriter writer)
23+
{
24+
if (writer is null)
25+
{
26+
throw new ArgumentNullException(nameof(writer));
27+
}
28+
29+
writer.WriteStartObject();
30+
writer.WriteRequiredObject("deadLetterQueue", this.DeadLetterQueue, (w, q) => q.Serialize(w));
31+
writer.WriteOptionalProperty("maxReceiveCount", this.MaxReceiveCount);
32+
writer.WriteExtensions(this.Extensions);
33+
writer.WriteEndObject();
34+
}
35+
}
36+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
namespace LEGO.AsyncAPI.Bindings.Sqs
2+
{
3+
using System;
4+
using LEGO.AsyncAPI.Readers.ParseNodes;
5+
using LEGO.AsyncAPI.Writers;
6+
7+
/// <summary>
8+
/// This object contains information about the channel representation in SQS.
9+
/// </summary>
10+
public class SqsChannelBinding : ChannelBinding<SqsChannelBinding>
11+
{
12+
/// <summary>
13+
/// A definition of the queue that will be used as the channel.
14+
/// </summary>
15+
public Queue Queue { get; set; }
16+
17+
/// <summary>
18+
/// A definition of the queue that will be used for un-processable messages.
19+
/// </summary>
20+
public Queue DeadLetterQueue { get; set; }
21+
22+
public override string BindingKey => "sqs";
23+
24+
protected override FixedFieldMap<SqsChannelBinding> FixedFieldMap => new()
25+
{
26+
{ "queue", (a, n) => { a.Queue = n.ParseMapWithExtensions(this.queueFixedFields); } },
27+
{ "deadLetterQueue", (a, n) => { a.DeadLetterQueue = n.ParseMapWithExtensions(this.queueFixedFields); } },
28+
};
29+
30+
private FixedFieldMap<Queue> queueFixedFields => new()
31+
{
32+
{ "name", (a, n) => { a.Name = n.GetScalarValue(); } },
33+
{ "fifoQueue", (a, n) => { a.FifoQueue = n.GetBooleanValue(); } },
34+
{ "deliveryDelay", (a, n) => { a.DeliveryDelay = n.GetIntegerValue(); } },
35+
{ "visibilityTimeout", (a, n) => { a.VisibilityTimeout = n.GetIntegerValue(); } },
36+
{ "receiveMessageWaitTime", (a, n) => { a.ReceiveMessageWaitTime = n.GetIntegerValue(); } },
37+
{ "messageRetentionPeriod", (a, n) => { a.MessageRetentionPeriod = n.GetIntegerValue(); } },
38+
{ "redrivePolicy", (a, n) => { a.RedrivePolicy = n.ParseMapWithExtensions(this.redrivePolicyFixedFields); } },
39+
{ "policy", (a, n) => { a.Policy = n.ParseMapWithExtensions(this.policyFixedFields); } },
40+
{ "tags", (a, n) => { a.Tags = n.CreateSimpleMap(s => s.GetScalarValue()); } },
41+
};
42+
43+
private FixedFieldMap<RedrivePolicy> redrivePolicyFixedFields => new()
44+
{
45+
{ "deadLetterQueue", (a, n) => { a.DeadLetterQueue = n.ParseMapWithExtensions(identifierFixFields); } },
46+
{ "maxReceiveCount", (a, n) => { a.MaxReceiveCount = n.GetIntegerValue(); } },
47+
};
48+
49+
private static FixedFieldMap<Identifier> identifierFixFields => new()
50+
{
51+
{ "arn", (a, n) => { a.Arn = n.GetScalarValue(); } },
52+
{ "name", (a, n) => { a.Name = n.GetScalarValue(); } },
53+
};
54+
55+
private FixedFieldMap<Policy> policyFixedFields = new()
56+
{
57+
{ "statements", (a, n) => { a.Statements = n.CreateList(s => s.ParseMapWithExtensions(statementFixedFields)); } },
58+
};
59+
60+
private static FixedFieldMap<Statement> statementFixedFields = new()
61+
{
62+
{ "effect", (a, n) => { a.Effect = n.GetScalarValue().GetEnumFromDisplayName<Effect>(); } },
63+
{ "principal", (a, n) => { a.Principal = StringOrStringList.Parse(n); } },
64+
{ "action", (a, n) => { a.Action = StringOrStringList.Parse(n); } },
65+
};
66+
67+
public override void SerializeProperties(IAsyncApiWriter writer)
68+
{
69+
if (writer is null)
70+
{
71+
throw new ArgumentNullException(nameof(writer));
72+
}
73+
74+
writer.WriteStartObject();
75+
writer.WriteRequiredObject("queue", this.Queue, (w, q) => q.Serialize(w));
76+
writer.WriteOptionalObject("deadLetterQueue", this.DeadLetterQueue, (w, q) => q.Serialize(w));
77+
writer.WriteExtensions(this.Extensions);
78+
writer.WriteEndObject();
79+
}
80+
}
81+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
namespace LEGO.AsyncAPI.Bindings.Sqs
2+
{
3+
using System;
4+
using System.Collections.Generic;
5+
using LEGO.AsyncAPI.Readers.ParseNodes;
6+
using LEGO.AsyncAPI.Writers;
7+
8+
public class SqsOperationBinding : OperationBinding<SqsOperationBinding>
9+
{
10+
/// <summary>
11+
/// Queue objects that are either the endpoint for an SNS Operation Binding Object, or the deadLetterQueue of the SQS Operation Binding Object
12+
/// </summary>
13+
public List<Queue> Queues { get; set; }
14+
15+
public override string BindingKey => "sqs";
16+
17+
protected override FixedFieldMap<SqsOperationBinding> FixedFieldMap => new()
18+
{
19+
{ "queues", (a, n) => { a.Queues = n.CreateList(s => s.ParseMapWithExtensions(this.queueFixedFields)); } },
20+
};
21+
22+
private FixedFieldMap<Queue> queueFixedFields => new()
23+
{
24+
{ "name", (a, n) => { a.Name = n.GetScalarValue(); } },
25+
{ "fifoQueue", (a, n) => { a.FifoQueue = n.GetBooleanValue(); } },
26+
{ "deliveryDelay", (a, n) => { a.DeliveryDelay = n.GetIntegerValue(); } },
27+
{ "visibilityTimeout", (a, n) => { a.VisibilityTimeout = n.GetIntegerValue(); } },
28+
{ "receiveMessageWaitTime", (a, n) => { a.ReceiveMessageWaitTime = n.GetIntegerValue(); } },
29+
{ "messageRetentionPeriod", (a, n) => { a.MessageRetentionPeriod = n.GetIntegerValue(); } },
30+
{ "redrivePolicy", (a, n) => { a.RedrivePolicy = n.ParseMapWithExtensions(this.redrivePolicyFixedFields); } },
31+
{ "policy", (a, n) => { a.Policy = n.ParseMapWithExtensions(this.policyFixedFields); } },
32+
{ "tags", (a, n) => { a.Tags = n.CreateSimpleMap(s => s.GetScalarValue()); } },
33+
};
34+
35+
private FixedFieldMap<RedrivePolicy> redrivePolicyFixedFields => new()
36+
{
37+
{ "deadLetterQueue", (a, n) => { a.DeadLetterQueue = n.ParseMapWithExtensions(identifierFixFields); } },
38+
{ "maxReceiveCount", (a, n) => { a.MaxReceiveCount = n.GetIntegerValue(); } },
39+
};
40+
41+
private static FixedFieldMap<Identifier> identifierFixFields => new()
42+
{
43+
{ "arn", (a, n) => { a.Arn = n.GetScalarValue(); } },
44+
{ "name", (a, n) => { a.Name = n.GetScalarValue(); } },
45+
};
46+
47+
private FixedFieldMap<Policy> policyFixedFields = new()
48+
{
49+
{ "statements", (a, n) => { a.Statements = n.CreateList(s => s.ParseMapWithExtensions(statementFixedFields)); } },
50+
};
51+
52+
private static FixedFieldMap<Statement> statementFixedFields = new()
53+
{
54+
{ "effect", (a, n) => { a.Effect = n.GetScalarValue().GetEnumFromDisplayName<Effect>(); } },
55+
{ "principal", (a, n) => { a.Principal = StringOrStringList.Parse(n); } },
56+
{ "action", (a, n) => { a.Action = StringOrStringList.Parse(n); } },
57+
};
58+
59+
public override void SerializeProperties(IAsyncApiWriter writer)
60+
{
61+
if (writer is null)
62+
{
63+
throw new ArgumentNullException(nameof(writer));
64+
}
65+
66+
writer.WriteStartObject();
67+
writer.WriteRequiredCollection("queues", this.Queues, (w, t) => t.Serialize(w));
68+
writer.WriteExtensions(this.Extensions);
69+
writer.WriteEndObject();
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)