Skip to content

Commit 3a7b96a

Browse files
authored
fix: change payload 'any' type to 'schema' to match specification. (LEGO#77)
1 parent cbf3253 commit 3a7b96a

40 files changed

+2215
-822
lines changed

src/LEGO.AsyncAPI.Readers/V2/AsyncApiReaderSettings.cs renamed to src/LEGO.AsyncAPI.Readers/AsyncApiReaderSettings.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
// Copyright (c) The LEGO Group. All rights reserved.
22

3-
namespace LEGO.AsyncAPI.Readers.V2
3+
namespace LEGO.AsyncAPI.Readers
44
{
55
using System;
66
using System.Collections.Generic;
77
using System.IO;
88
using LEGO.AsyncAPI.Models.Interfaces;
9-
using LEGO.AsyncAPI.Readers;
109
using LEGO.AsyncAPI.Validations;
1110

1211
public enum ReferenceResolutionSetting

src/LEGO.AsyncAPI.Readers/AsyncApiStreamReader.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Copyright (c) The LEGO Group. All rights reserved.
22

3-
using LEGO.AsyncAPI.Readers.V2;
4-
53
namespace LEGO.AsyncAPI.Readers
64
{
75
using System.IO;
@@ -35,8 +33,8 @@ public AsyncApiStreamReader(AsyncApiReaderSettings settings = null)
3533
public AsyncApiDocument Read(Stream input, out AsyncApiDiagnostic diagnostic)
3634
{
3735
var reader = new StreamReader(input);
38-
var result = new AsyncApiTextReaderReader(settings).Read(reader, out diagnostic);
39-
if (!settings.LeaveStreamOpen)
36+
var result = new AsyncApiTextReaderReader(this.settings).Read(reader, out diagnostic);
37+
if (!this.settings.LeaveStreamOpen)
4038
{
4139
reader.Dispose();
4240
}
@@ -67,7 +65,7 @@ public async Task<ReadResult> ReadAsync(Stream input)
6765

6866
var reader = new StreamReader(bufferedStream);
6967

70-
return await new AsyncApiTextReaderReader(settings).ReadAsync(reader);
68+
return await new AsyncApiTextReaderReader(this.settings).ReadAsync(reader);
7169
}
7270

7371
/// <summary>
@@ -82,7 +80,7 @@ public T ReadFragment<T>(Stream input, AsyncApiVersion version, out AsyncApiDiag
8280
{
8381
using (var reader = new StreamReader(input))
8482
{
85-
return new AsyncApiTextReaderReader(settings).ReadFragment<T>(reader, version, out diagnostic);
83+
return new AsyncApiTextReaderReader(this.settings).ReadFragment<T>(reader, version, out diagnostic);
8684
}
8785
}
8886
}

src/LEGO.AsyncAPI.Readers/AsyncApiStringReader.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Copyright (c) The LEGO Group. All rights reserved.
22

3-
using LEGO.AsyncAPI.Readers.V2;
4-
53
namespace LEGO.AsyncAPI.Readers
64
{
75
using System.IO;
@@ -32,7 +30,7 @@ public AsyncApiDocument Read(string input, out AsyncApiDiagnostic diagnostic)
3230
{
3331
using (var reader = new StringReader(input))
3432
{
35-
return new AsyncApiTextReaderReader(settings).Read(reader, out diagnostic);
33+
return new AsyncApiTextReaderReader(this.settings).Read(reader, out diagnostic);
3634
}
3735
}
3836

@@ -44,7 +42,7 @@ public T ReadFragment<T>(string input, AsyncApiVersion version, out AsyncApiDiag
4442
{
4543
using (var reader = new StringReader(input))
4644
{
47-
return new AsyncApiTextReaderReader(settings).ReadFragment<T>(reader, version, out diagnostic);
45+
return new AsyncApiTextReaderReader(this.settings).ReadFragment<T>(reader, version, out diagnostic);
4846
}
4947
}
5048
}

src/LEGO.AsyncAPI.Readers/AsyncApiTextReader.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Copyright (c) The LEGO Group. All rights reserved.
22

3-
using LEGO.AsyncAPI.Readers.V2;
4-
53
namespace LEGO.AsyncAPI.Readers
64
{
75
using System.IO;
@@ -51,7 +49,7 @@ public AsyncApiDocument Read(TextReader input, out AsyncApiDiagnostic diagnostic
5149
return new AsyncApiDocument();
5250
}
5351

54-
return new AsyncApiYamlDocumentReader(settings).Read(yamlDocument, out diagnostic);
52+
return new AsyncApiYamlDocumentReader(this.settings).Read(yamlDocument, out diagnostic);
5553
}
5654

5755
/// <summary>
@@ -79,7 +77,7 @@ public async Task<ReadResult> ReadAsync(TextReader input)
7977
};
8078
}
8179

82-
return await new AsyncApiYamlDocumentReader(settings).ReadAsync(yamlDocument);
80+
return await new AsyncApiYamlDocumentReader(this.settings).ReadAsync(yamlDocument);
8381
}
8482

8583
/// <summary>
@@ -106,7 +104,7 @@ public T ReadFragment<T>(TextReader input, AsyncApiVersion version, out AsyncApi
106104
return default;
107105
}
108106

109-
return new AsyncApiYamlDocumentReader(settings).ReadFragment<T>(yamlDocument, version,
107+
return new AsyncApiYamlDocumentReader(this.settings).ReadFragment<T>(yamlDocument, version,
110108
out diagnostic);
111109
}
112110

src/LEGO.AsyncAPI.Readers/AsyncApiYamlDocumentReader.cs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
// Copyright (c) The LEGO Group. All rights reserved.
22

3-
using LEGO.AsyncAPI.Readers.V2;
4-
53
namespace LEGO.AsyncAPI.Readers
64
{
75
using System.Collections.Generic;
@@ -42,24 +40,24 @@ public AsyncApiDocument Read(YamlDocument input, out AsyncApiDiagnostic diagnost
4240
diagnostic = new AsyncApiDiagnostic();
4341
var context = new ParsingContext(diagnostic)
4442
{
45-
ExtensionParsers = settings.ExtensionParsers,
43+
ExtensionParsers = this.settings.ExtensionParsers,
4644
};
4745

4846
AsyncApiDocument document = null;
4947
try
5048
{
5149
document = context.Parse(input);
5250

53-
ResolveReferences(diagnostic, document);
51+
this.ResolveReferences(diagnostic, document);
5452
}
5553
catch (AsyncApiException ex)
5654
{
5755
diagnostic.Errors.Add(new AsyncApiError(ex));
5856
}
5957

60-
if (settings.RuleSet != null && settings.RuleSet.Rules.Count > 0)
58+
if (this.settings.RuleSet != null && this.settings.RuleSet.Rules.Count > 0)
6159
{
62-
var asyncApiErrors = document.Validate(settings.RuleSet);
60+
var asyncApiErrors = document.Validate(this.settings.RuleSet);
6361
foreach (var item in asyncApiErrors.Where(e => e is AsyncApiValidatorError))
6462
{
6563
diagnostic.Errors.Add(item);
@@ -79,25 +77,25 @@ public async Task<ReadResult> ReadAsync(YamlDocument input)
7977
var diagnostic = new AsyncApiDiagnostic();
8078
var context = new ParsingContext(diagnostic)
8179
{
82-
ExtensionParsers = settings.ExtensionParsers,
80+
ExtensionParsers = this.settings.ExtensionParsers,
8381
};
8482

8583
AsyncApiDocument document = null;
8684
try
8785
{
8886
// Parse the AsyncApi Document
8987
document = context.Parse(input);
90-
ResolveReferences(diagnostic, document);
88+
this.ResolveReferences(diagnostic, document);
9189
}
9290
catch (AsyncApiException ex)
9391
{
9492
diagnostic.Errors.Add(new AsyncApiError(ex));
9593
}
9694

9795
// Validate the document
98-
if (settings.RuleSet != null && settings.RuleSet.Rules.Count > 0)
96+
if (this.settings.RuleSet != null && this.settings.RuleSet.Rules.Count > 0)
9997
{
100-
var errors = document.Validate(settings.RuleSet);
98+
var errors = document.Validate(this.settings.RuleSet);
10199
foreach (var item in errors)
102100
{
103101
diagnostic.Errors.Add(item);
@@ -116,7 +114,7 @@ private void ResolveReferences(AsyncApiDiagnostic diagnostic, AsyncApiDocument d
116114
var errors = new List<AsyncApiError>();
117115

118116
// Resolve References if requested
119-
switch (settings.ReferenceResolution)
117+
switch (this.settings.ReferenceResolution)
120118
{
121119
case ReferenceResolutionSetting.ResolveReferences:
122120
errors.AddRange(document.ResolveReferences());
@@ -145,7 +143,7 @@ public T ReadFragment<T>(YamlDocument input, AsyncApiVersion version, out AsyncA
145143
diagnostic = new AsyncApiDiagnostic();
146144
var context = new ParsingContext(diagnostic)
147145
{
148-
ExtensionParsers = settings.ExtensionParsers,
146+
ExtensionParsers = this.settings.ExtensionParsers,
149147
};
150148

151149
IAsyncApiElement element = null;
@@ -160,9 +158,9 @@ public T ReadFragment<T>(YamlDocument input, AsyncApiVersion version, out AsyncA
160158
}
161159

162160
// Validate the element
163-
if (settings.RuleSet != null && settings.RuleSet.Rules.Count > 0)
161+
if (this.settings.RuleSet != null && this.settings.RuleSet.Rules.Count > 0)
164162
{
165-
var errors = element.Validate(settings.RuleSet);
163+
var errors = element.Validate(this.settings.RuleSet);
166164
foreach (var item in errors)
167165
{
168166
diagnostic.Errors.Add(item);

src/LEGO.AsyncAPI.Readers/Bindings/AsyncApiKafkaBindingsDeserializer.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,19 @@ internal static partial class AsyncApiV2Deserializer
1919
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
2020
{ "topic", (a, n) => { a.Topic = n.GetScalarValue(); } },
2121
{ "partitions", (a, n) => { a.Partitions = n.GetIntegerValue(); } },
22+
{ "topicConfiguration", (a, n) => { a.TopicConfiguration = LoadTopicConfiguration(n); } },
2223
{ "replicas", (a, n) => { a.Replicas = n.GetIntegerValue(); } },
2324
};
2425

26+
private static FixedFieldMap<TopicConfigurationObject> kafkaChannelTopicConfigurationObjectFixedFields = new()
27+
{
28+
{ "cleanup.policy", (a, n) => { a.CleanupPolicy = n.CreateSimpleList(s => s.GetScalarValue()); } },
29+
{ "retention.ms", (a, n) => { a.RetentionMiliseconds = n.GetIntegerValue(); } },
30+
{ "retention.bytes", (a, n) => { a.RetentionBytes = n.GetIntegerValue(); } },
31+
{ "delete.retention.ms", (a, n) => { a.DeleteRetentionMiliseconds = n.GetIntegerValue(); } },
32+
{ "max.message.bytes", (a, n) => { a.MaxMessageBytes = n.GetIntegerValue(); } },
33+
};
34+
2535
private static FixedFieldMap<KafkaOperationBinding> kafkaOperationBindingFixedFields = new()
2636
{
2737
{ "bindingVersion", (a, n) => { a.BindingVersion = n.GetScalarValue(); } },
@@ -37,5 +47,13 @@ internal static partial class AsyncApiV2Deserializer
3747
{ "schemaIdPayloadEncoding", (a, n) => { a.SchemaIdPayloadEncoding = n.GetScalarValue(); } },
3848
{ "schemaLookupStrategy", (a, n) => { a.SchemaLookupStrategy = n.GetScalarValue(); } },
3949
};
50+
51+
private static TopicConfigurationObject LoadTopicConfiguration(ParseNode node)
52+
{
53+
var mapNode = node.CheckMapNode("topicConfiguration");
54+
var retention = new TopicConfigurationObject();
55+
ParseMap(mapNode, retention, kafkaChannelTopicConfigurationObjectFixedFields, null);
56+
return retention;
57+
}
4058
}
4159
}

src/LEGO.AsyncAPI.Readers/ParseNodes/ValueNode.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public override int GetIntegerValue()
3636
return value;
3737
}
3838

39-
throw new AsyncApiReaderException("Value could not parse to integer", node);
39+
throw new AsyncApiReaderException("Value could not parse to integer", this.node);
4040
}
4141

4242
public override long GetLongValue()
@@ -46,7 +46,7 @@ public override long GetLongValue()
4646
return value;
4747
}
4848

49-
throw new AsyncApiReaderException("Value could not parse to long", node);
49+
throw new AsyncApiReaderException("Value could not parse to long", this.node);
5050
}
5151

5252
public override bool GetBooleanValue()
@@ -56,7 +56,7 @@ public override bool GetBooleanValue()
5656
return value;
5757
}
5858

59-
throw new AsyncApiReaderException("Value could not parse to bool", node);
59+
throw new AsyncApiReaderException("Value could not parse to bool", this.node);
6060
}
6161

6262
public override IAsyncApiAny CreateAny()

src/LEGO.AsyncAPI.Readers/ParsingContext.cs

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ namespace LEGO.AsyncAPI.Readers
1616
public class ParsingContext
1717
{
1818
private readonly Stack<string> currentLocation = new ();
19-
private readonly Dictionary<string, object> tempStorage = new ();
20-
private readonly Dictionary<object, Dictionary<string, object>> scopedTempStorage = new ();
21-
private readonly Dictionary<string, Stack<string>> loopStacks = new ();
2219

2320
internal Dictionary<string, Func<IAsyncApiAny, IAsyncApiExtension>> ExtensionParsers
2421
{
@@ -104,79 +101,9 @@ public string GetLocation()
104101
this.currentLocation.Reverse().Select(s => s.Replace("~", "~0").Replace("/", "~1")).ToArray());
105102
}
106103

107-
public T GetFromTempStorage<T>(string key, object scope = null)
108-
{
109-
Dictionary<string, object> storage;
110-
111-
if (scope == null)
112-
{
113-
storage = this.tempStorage;
114-
}
115-
else if (!this.scopedTempStorage.TryGetValue(scope, out storage))
116-
{
117-
return default(T);
118-
}
119-
120-
return storage.TryGetValue(key, out var value) ? (T)value : default(T);
121-
}
122-
123-
public void SetTempStorage(string key, object value, object scope = null)
124-
{
125-
Dictionary<string, object> storage;
126-
127-
if (scope == null)
128-
{
129-
storage = this.tempStorage;
130-
}
131-
else if (!this.scopedTempStorage.TryGetValue(scope, out storage))
132-
{
133-
storage = this.scopedTempStorage[scope] = new Dictionary<string, object>();
134-
}
135-
136-
if (value == null)
137-
{
138-
storage.Remove(key);
139-
}
140-
else
141-
{
142-
storage[key] = value;
143-
}
144-
}
145-
146104
public void StartObject(string objectName)
147105
{
148106
this.currentLocation.Push(objectName);
149107
}
150-
151-
public bool PushLoop(string loopId, string key)
152-
{
153-
Stack<string> stack;
154-
if (!this.loopStacks.TryGetValue(loopId, out stack))
155-
{
156-
stack = new Stack<string>();
157-
this.loopStacks.Add(loopId, stack);
158-
}
159-
160-
if (!stack.Contains(key))
161-
{
162-
stack.Push(key);
163-
return true;
164-
}
165-
166-
return false; // Loop detected
167-
}
168-
169-
internal void ClearLoop(string loopid)
170-
{
171-
this.loopStacks[loopid].Clear();
172-
}
173-
174-
public void PopLoop(string loopid)
175-
{
176-
if (this.loopStacks[loopid].Count > 0)
177-
{
178-
this.loopStacks[loopid].Pop();
179-
}
180-
}
181108
}
182109
}

src/LEGO.AsyncAPI.Readers/V2/AsyncApiDocumentDeserializer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ internal static partial class AsyncApiV2Deserializer
1010
{
1111
private static FixedFieldMap<AsyncApiDocument> asyncApiFixedFields = new()
1212
{
13-
{ "asyncapi", (a, n) => { } },
13+
{ "asyncapi", (a, n) => { a.Asyncapi = "2.5.0"; } },
1414
{ "id", (a, n) => a.Id = n.GetScalarValue() },
1515
{ "info", (a, n) => a.Info = LoadInfo(n) },
1616
{ "servers", (a, n) => a.Servers = n.CreateMap(LoadServer) },

src/LEGO.AsyncAPI.Readers/V2/AsyncApiMessageDeserializer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ internal static partial class AsyncApiV2Deserializer
2020
"headers", (a, n) => { a.Headers = LoadSchema(n); }
2121
},
2222
{
23-
"payload", (a, n) => { a.Payload = n.CreateAny(); }
23+
"payload", (a, n) => { a.Payload = LoadSchema(n); }
2424
},
2525
{
2626
"correlationId", (a, n) => { a.CorrelationId = LoadCorrelationId(n); }

0 commit comments

Comments
 (0)