Skip to content

Commit 191c0cf

Browse files
committed
Include column count into parsed message
1 parent fe448b4 commit 191c0cf

File tree

6 files changed

+20
-37
lines changed

6 files changed

+20
-37
lines changed

src/Npgsql/BackendMessages/DataRowMessage.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@ sealed class DataRowMessage : IBackendMessage
1010
public BackendMessageCode Code => BackendMessageCode.DataRow;
1111

1212
internal int Length { get; private set; }
13+
internal short ColumnCount { get; private set; }
1314

14-
internal DataRowMessage Load(int len)
15+
internal DataRowMessage Load(int len, short columnCount)
1516
{
1617
Length = len;
18+
ColumnCount = columnCount;
1719
return this;
1820
}
19-
}
21+
}

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1297,6 +1297,9 @@ internal ValueTask<IBackendMessage> ReadMessage(
12971297
await ReadBuffer.Skip(async, len).ConfigureAwait(false);
12981298
continue;
12991299
}
1300+
1301+
// Make sure that the column count is already buffered.
1302+
await ReadBuffer.Ensure(sizeof(short), async).ConfigureAwait(false);
13001303
}
13011304
else if (len > ReadBuffer.ReadBytesLeft)
13021305
{
@@ -1406,7 +1409,7 @@ internal ValueTask<IBackendMessage> ReadMessage(
14061409
case BackendMessageCode.RowDescription:
14071410
return _rowDescriptionMessage.Load(buf, SerializerOptions);
14081411
case BackendMessageCode.DataRow:
1409-
return _dataRowMessage.Load(len);
1412+
return _dataRowMessage.Load(len, buf.ReadInt16());
14101413
case BackendMessageCode.CommandComplete:
14111414
return _commandCompleteMessage.Load(buf, len);
14121415
case BackendMessageCode.ReadyForQuery:

src/Npgsql/NpgsqlBinaryExporter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,8 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =
169169

170170
await _buf.Ensure(2, async).ConfigureAwait(false);
171171

172-
var numColumns = _buf.ReadInt16();
173-
if (numColumns == -1)
172+
var columnCount = _buf.ReadInt16();
173+
if (columnCount == -1)
174174
{
175175
Expect<CopyDoneMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
176176
Expect<CommandCompleteMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
@@ -180,7 +180,7 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =
180180
return -1;
181181
}
182182

183-
Debug.Assert(numColumns == NumColumns);
183+
Debug.Assert(columnCount == NumColumns);
184184

185185
_column = BeforeColumn;
186186
_rowsExported++;

src/Npgsql/NpgsqlDataReader.cs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ Task<bool> Read(bool async, CancellationToken cancellationToken = default)
218218
return TrueTask;
219219
case BackendMessageCode.CommandComplete or BackendMessageCode.EmptyQueryResponse when !_expectErrorBarrier && span.Length >= header.Length:
220220
buffer.ReadPosition += MessageHeader.ByteCount;
221-
ProcessMessage(Connector.ParseServerMessage(Buffer, BackendMessageCode.CommandComplete, header.Length)!);
221+
ProcessMessage(Connector.ParseServerMessage(Buffer, header.Code, header.Length)!);
222222
return FalseTask;
223223
default:
224224
return InResultSlow(async, cancellationToken);
@@ -248,14 +248,14 @@ async Task<bool> InResultSlow(bool async, CancellationToken cancellationToken)
248248

249249
await ConsumeRow(async).ConfigureAwait(false);
250250

251-
var msg = await ReadMessage(async).ConfigureAwait(false);
251+
var msg = await Connector.ReadMessage(async,
252+
_isSequential ? DataRowLoadingMode.Sequential : DataRowLoadingMode.NonSequential).ConfigureAwait(false);
252253

253254
switch (msg.Code)
254255
{
255256
case BackendMessageCode.DataRow:
256257
ProcessMessage(msg);
257258
return true;
258-
259259
case BackendMessageCode.CommandComplete:
260260
case BackendMessageCode.EmptyQueryResponse:
261261
ProcessMessage(msg);
@@ -276,23 +276,6 @@ async Task<bool> InResultSlow(bool async, CancellationToken cancellationToken)
276276
}
277277
}
278278

279-
ValueTask<IBackendMessage> ReadMessage(bool async)
280-
{
281-
return _isSequential ? ReadMessageSequential(async, Connector) : Connector.ReadMessage(async);
282-
283-
static async ValueTask<IBackendMessage> ReadMessageSequential(bool async, NpgsqlConnector connector)
284-
{
285-
var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false);
286-
if (msg.Code is BackendMessageCode.DataRow)
287-
{
288-
// Make sure that the datarow's column count is already buffered
289-
await connector.ReadBuffer.Ensure(sizeof(short), async).ConfigureAwait(false);
290-
return msg;
291-
}
292-
return msg;
293-
}
294-
}
295-
296279
#endregion
297280

298281
#region NextResult
@@ -465,7 +448,8 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
465448
}
466449
else
467450
{
468-
msg = await ReadMessage(async).ConfigureAwait(false);
451+
msg = await Connector.ReadMessage(async,
452+
_isSequential ? DataRowLoadingMode.Sequential : DataRowLoadingMode.NonSequential).ConfigureAwait(false);
469453
ProcessMessage(msg);
470454
}
471455

@@ -791,9 +775,8 @@ void ProcessMessage(IBackendMessage msg)
791775
Buffer.PgReader.StreamCanSeek = !_isSequential;
792776
}
793777
// We assume that the row's number of columns is identical to the description's
794-
var numColumns = Buffer.ReadInt16();
795-
if (ColumnCount != numColumns)
796-
ThrowHelper.ThrowArgumentException($"Row's number of columns ({numColumns}) differs from the row description's ({ColumnCount})");
778+
if (ColumnCount != dataRow.ColumnCount)
779+
ThrowHelper.ThrowArgumentException($"Row's number of columns ({dataRow.ColumnCount}) differs from the row description's ({ColumnCount})");
797780

798781
var readPosition = Buffer.ReadPosition;
799782
var msgRemainder = dataRow.Length - sizeof(short);

src/Npgsql/PostgresDatabaseInfo.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,6 @@ static string SanitizeForReplicationConnection(string str)
310310
Expect<DataRowMessage>(await conn.ReadMessage(async).ConfigureAwait(false), conn);
311311
// Note that here and below we don't assign ReadBuffer to a variable
312312
// because we might allocate oversize buffer
313-
conn.ReadBuffer.Skip(2); // Column count
314313
LongVersion = ReadNonNullableString(conn.ReadBuffer);
315314
Expect<CommandCompleteMessage>(await conn.ReadMessage(async).ConfigureAwait(false), conn);
316315
if (isReplicationConnection)
@@ -325,7 +324,6 @@ static string SanitizeForReplicationConnection(string str)
325324
if (msg is not DataRowMessage)
326325
break;
327326

328-
conn.ReadBuffer.Skip(2); // Column count
329327
var nspname = ReadNonNullableString(conn.ReadBuffer);
330328
var oid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
331329
Debug.Assert(oid != 0);
@@ -439,7 +437,6 @@ static string SanitizeForReplicationConnection(string str)
439437
if (msg is not DataRowMessage)
440438
break;
441439

442-
conn.ReadBuffer.Skip(2); // Column count
443440
var oid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
444441
var attname = ReadNonNullableString(conn.ReadBuffer);
445442
var atttypid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
@@ -501,7 +498,6 @@ static string SanitizeForReplicationConnection(string str)
501498
if (msg is not DataRowMessage)
502499
break;
503500

504-
conn.ReadBuffer.Skip(2); // Column count
505501
var oid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
506502
var enumlabel = ReadNonNullableString(conn.ReadBuffer);
507503
if (oid != currentOID)

src/Npgsql/Replication/ReplicationConnection.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -757,10 +757,9 @@ async Task<object[]> ReadSingleRow(string command, CancellationToken cancellatio
757757
await Connector.Flush(true, cancellationToken).ConfigureAwait(false);
758758

759759
var rowDescription = Expect<RowDescriptionMessage>(await Connector.ReadMessage(true).ConfigureAwait(false), Connector);
760-
Expect<DataRowMessage>(await Connector.ReadMessage(true).ConfigureAwait(false), Connector);
760+
var msg = Expect<DataRowMessage>(await Connector.ReadMessage(true).ConfigureAwait(false), Connector);
761761
var buf = Connector.ReadBuffer;
762-
await buf.EnsureAsync(2).ConfigureAwait(false);
763-
var results = new object[buf.ReadInt16()];
762+
var results = new object[msg.ColumnCount];
764763
for (var i = 0; i < results.Length; i++)
765764
{
766765
await buf.EnsureAsync(4).ConfigureAwait(false);

0 commit comments

Comments
 (0)