Skip to content
Prev Previous commit
Include column count into parsed message
  • Loading branch information
NinoFloris committed Mar 27, 2024
commit 191c0cfedde8144de98cc80362d284078c7959f8
6 changes: 4 additions & 2 deletions src/Npgsql/BackendMessages/DataRowMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ sealed class DataRowMessage : IBackendMessage
public BackendMessageCode Code => BackendMessageCode.DataRow;

internal int Length { get; private set; }
internal short ColumnCount { get; private set; }

internal DataRowMessage Load(int len)
internal DataRowMessage Load(int len, short columnCount)
{
Length = len;
ColumnCount = columnCount;
return this;
}
}
}
5 changes: 4 additions & 1 deletion src/Npgsql/Internal/NpgsqlConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,9 @@ internal ValueTask<IBackendMessage> ReadMessage(
await ReadBuffer.Skip(async, len).ConfigureAwait(false);
continue;
}

// Make sure that the column count is already buffered.
await ReadBuffer.Ensure(sizeof(short), async).ConfigureAwait(false);
}
else if (len > ReadBuffer.ReadBytesLeft)
{
Expand Down Expand Up @@ -1406,7 +1409,7 @@ internal ValueTask<IBackendMessage> ReadMessage(
case BackendMessageCode.RowDescription:
return _rowDescriptionMessage.Load(buf, SerializerOptions);
case BackendMessageCode.DataRow:
return _dataRowMessage.Load(len);
return _dataRowMessage.Load(len, buf.ReadInt16());
case BackendMessageCode.CommandComplete:
return _commandCompleteMessage.Load(buf, len);
case BackendMessageCode.ReadyForQuery:
Expand Down
6 changes: 3 additions & 3 deletions src/Npgsql/NpgsqlBinaryExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =

await _buf.Ensure(2, async).ConfigureAwait(false);

var numColumns = _buf.ReadInt16();
if (numColumns == -1)
var columnCount = _buf.ReadInt16();
if (columnCount == -1)
{
Expect<CopyDoneMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
Expect<CommandCompleteMessage>(await _connector.ReadMessage(async).ConfigureAwait(false), _connector);
Expand All @@ -180,7 +180,7 @@ async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken =
return -1;
}

Debug.Assert(numColumns == NumColumns);
Debug.Assert(columnCount == NumColumns);

_column = BeforeColumn;
_rowsExported++;
Expand Down
31 changes: 7 additions & 24 deletions src/Npgsql/NpgsqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ Task<bool> Read(bool async, CancellationToken cancellationToken = default)
return TrueTask;
case BackendMessageCode.CommandComplete or BackendMessageCode.EmptyQueryResponse when !_expectErrorBarrier && span.Length >= header.Length:
buffer.ReadPosition += MessageHeader.ByteCount;
ProcessMessage(Connector.ParseServerMessage(Buffer, BackendMessageCode.CommandComplete, header.Length)!);
ProcessMessage(Connector.ParseServerMessage(Buffer, header.Code, header.Length)!);
return FalseTask;
default:
return InResultSlow(async, cancellationToken);
Expand Down Expand Up @@ -248,14 +248,14 @@ async Task<bool> InResultSlow(bool async, CancellationToken cancellationToken)

await ConsumeRow(async).ConfigureAwait(false);

var msg = await ReadMessage(async).ConfigureAwait(false);
var msg = await Connector.ReadMessage(async,
_isSequential ? DataRowLoadingMode.Sequential : DataRowLoadingMode.NonSequential).ConfigureAwait(false);

switch (msg.Code)
{
case BackendMessageCode.DataRow:
ProcessMessage(msg);
return true;

case BackendMessageCode.CommandComplete:
case BackendMessageCode.EmptyQueryResponse:
ProcessMessage(msg);
Expand All @@ -276,23 +276,6 @@ async Task<bool> InResultSlow(bool async, CancellationToken cancellationToken)
}
}

ValueTask<IBackendMessage> ReadMessage(bool async)
{
return _isSequential ? ReadMessageSequential(async, Connector) : Connector.ReadMessage(async);

static async ValueTask<IBackendMessage> ReadMessageSequential(bool async, NpgsqlConnector connector)
{
var msg = await connector.ReadMessage(async, DataRowLoadingMode.Sequential).ConfigureAwait(false);
if (msg.Code is BackendMessageCode.DataRow)
{
// Make sure that the datarow's column count is already buffered
await connector.ReadBuffer.Ensure(sizeof(short), async).ConfigureAwait(false);
return msg;
}
return msg;
}
}

#endregion

#region NextResult
Expand Down Expand Up @@ -465,7 +448,8 @@ async Task<bool> NextResult(bool async, bool isConsuming = false, CancellationTo
}
else
{
msg = await ReadMessage(async).ConfigureAwait(false);
msg = await Connector.ReadMessage(async,
_isSequential ? DataRowLoadingMode.Sequential : DataRowLoadingMode.NonSequential).ConfigureAwait(false);
ProcessMessage(msg);
}

Expand Down Expand Up @@ -791,9 +775,8 @@ void ProcessMessage(IBackendMessage msg)
Buffer.PgReader.StreamCanSeek = !_isSequential;
}
// We assume that the row's number of columns is identical to the description's
var numColumns = Buffer.ReadInt16();
if (ColumnCount != numColumns)
ThrowHelper.ThrowArgumentException($"Row's number of columns ({numColumns}) differs from the row description's ({ColumnCount})");
if (ColumnCount != dataRow.ColumnCount)
ThrowHelper.ThrowArgumentException($"Row's number of columns ({dataRow.ColumnCount}) differs from the row description's ({ColumnCount})");

var readPosition = Buffer.ReadPosition;
var msgRemainder = dataRow.Length - sizeof(short);
Expand Down
4 changes: 0 additions & 4 deletions src/Npgsql/PostgresDatabaseInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ static string SanitizeForReplicationConnection(string str)
Expect<DataRowMessage>(await conn.ReadMessage(async).ConfigureAwait(false), conn);
// Note that here and below we don't assign ReadBuffer to a variable
// because we might allocate oversize buffer
conn.ReadBuffer.Skip(2); // Column count
LongVersion = ReadNonNullableString(conn.ReadBuffer);
Expect<CommandCompleteMessage>(await conn.ReadMessage(async).ConfigureAwait(false), conn);
if (isReplicationConnection)
Expand All @@ -325,7 +324,6 @@ static string SanitizeForReplicationConnection(string str)
if (msg is not DataRowMessage)
break;

conn.ReadBuffer.Skip(2); // Column count
var nspname = ReadNonNullableString(conn.ReadBuffer);
var oid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
Debug.Assert(oid != 0);
Expand Down Expand Up @@ -439,7 +437,6 @@ static string SanitizeForReplicationConnection(string str)
if (msg is not DataRowMessage)
break;

conn.ReadBuffer.Skip(2); // Column count
var oid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
var attname = ReadNonNullableString(conn.ReadBuffer);
var atttypid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
Expand Down Expand Up @@ -501,7 +498,6 @@ static string SanitizeForReplicationConnection(string str)
if (msg is not DataRowMessage)
break;

conn.ReadBuffer.Skip(2); // Column count
var oid = uint.Parse(ReadNonNullableString(conn.ReadBuffer), NumberFormatInfo.InvariantInfo);
var enumlabel = ReadNonNullableString(conn.ReadBuffer);
if (oid != currentOID)
Expand Down
5 changes: 2 additions & 3 deletions src/Npgsql/Replication/ReplicationConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,9 @@ async Task<object[]> ReadSingleRow(string command, CancellationToken cancellatio
await Connector.Flush(true, cancellationToken).ConfigureAwait(false);

var rowDescription = Expect<RowDescriptionMessage>(await Connector.ReadMessage(true).ConfigureAwait(false), Connector);
Expect<DataRowMessage>(await Connector.ReadMessage(true).ConfigureAwait(false), Connector);
var msg = Expect<DataRowMessage>(await Connector.ReadMessage(true).ConfigureAwait(false), Connector);
var buf = Connector.ReadBuffer;
await buf.EnsureAsync(2).ConfigureAwait(false);
var results = new object[buf.ReadInt16()];
var results = new object[msg.ColumnCount];
for (var i = 0; i < results.Length; i++)
{
await buf.EnsureAsync(4).ConfigureAwait(false);
Expand Down