Expand usage of macros for protocol characters. master github/master
authorNathan Bossart <nathan@postgresql.org>
Wed, 6 Aug 2025 18:37:00 +0000 (13:37 -0500)
committerNathan Bossart <nathan@postgresql.org>
Wed, 6 Aug 2025 18:37:00 +0000 (13:37 -0500)
This commit makes use of the existing PqMsg_* macros in more places
and adds new PqReplMsg_* and PqBackupMsg_* macros for use in
special replication and backup messages, respectively.

Author: Dave Cramer <davecramer@gmail.com>
Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Jacob Champion <jacob.champion@enterprisedb.com>
Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>
Reviewed-by: Euler Taveira <euler@eulerto.com>
Discussion: https://postgr.es/m/aIECfYfevCUpenBT@nathan
Discussion: https://postgr.es/m/CAFcNs%2Br73NOUb7%2BqKrV4HHEki02CS96Z%2Bx19WaFgE087BWwEng%40mail.gmail.com

src/backend/backup/basebackup_copy.c
src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/worker.c
src/backend/replication/walreceiver.c
src/backend/replication/walsender.c
src/bin/pg_basebackup/pg_basebackup.c
src/bin/pg_basebackup/pg_recvlogical.c
src/bin/pg_basebackup/receivelog.c
src/include/libpq/protocol.h

index 18b0b5a52d3f88f258c7a25d1c211e112e53c603..eb45d3bcb663b2174c1c8f49f0858866b6604d03 100644 (file)
@@ -143,7 +143,7 @@ bbsink_copystream_begin_backup(bbsink *sink)
    buf = palloc(mysink->base.bbs_buffer_length + MAXIMUM_ALIGNOF);
    mysink->msgbuffer = buf + (MAXIMUM_ALIGNOF - 1);
    mysink->base.bbs_buffer = buf + MAXIMUM_ALIGNOF;
-   mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+   mysink->msgbuffer[0] = PqMsg_CopyData;  /* archive or manifest data */
 
    /* Tell client the backup start location. */
    SendXlogRecPtrResult(state->startptr, state->starttli);
@@ -170,7 +170,7 @@ bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
 
    ti = list_nth(state->tablespaces, state->tablespace_num);
    pq_beginmessage(&buf, PqMsg_CopyData);
-   pq_sendbyte(&buf, 'n');     /* New archive */
+   pq_sendbyte(&buf, PqBackupMsg_NewArchive);
    pq_sendstring(&buf, archive_name);
    pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
    pq_endmessage(&buf);
@@ -191,7 +191,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
    if (mysink->send_to_client)
    {
        /* Add one because we're also sending a leading type byte. */
-       pq_putmessage('d', mysink->msgbuffer, len + 1);
+       pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
    }
 
    /* Consider whether to send a progress report to the client. */
@@ -221,7 +221,7 @@ bbsink_copystream_archive_contents(bbsink *sink, size_t len)
            mysink->last_progress_report_time = now;
 
            pq_beginmessage(&buf, PqMsg_CopyData);
-           pq_sendbyte(&buf, 'p'); /* Progress report */
+           pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
            pq_sendint64(&buf, state->bytes_done);
            pq_endmessage(&buf);
            pq_flush_if_writable();
@@ -247,7 +247,7 @@ bbsink_copystream_end_archive(bbsink *sink)
    mysink->bytes_done_at_last_time_check = state->bytes_done;
    mysink->last_progress_report_time = GetCurrentTimestamp();
    pq_beginmessage(&buf, PqMsg_CopyData);
-   pq_sendbyte(&buf, 'p');     /* Progress report */
+   pq_sendbyte(&buf, PqBackupMsg_ProgressReport);
    pq_sendint64(&buf, state->bytes_done);
    pq_endmessage(&buf);
    pq_flush_if_writable();
@@ -262,7 +262,7 @@ bbsink_copystream_begin_manifest(bbsink *sink)
    StringInfoData buf;
 
    pq_beginmessage(&buf, PqMsg_CopyData);
-   pq_sendbyte(&buf, 'm');     /* Manifest */
+   pq_sendbyte(&buf, PqBackupMsg_Manifest);
    pq_endmessage(&buf);
 }
 
@@ -277,7 +277,7 @@ bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
    if (mysink->send_to_client)
    {
        /* Add one because we're also sending a leading type byte. */
-       pq_putmessage('d', mysink->msgbuffer, len + 1);
+       pq_putmessage(PqMsg_CopyData, mysink->msgbuffer, len + 1);
    }
 }
 
index 1fa931a74229dde33775ff97a842c210d3819828..cd0e19176fdb25ff430402f5fc2effbc2a09b651 100644 (file)
@@ -778,10 +778,10 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
 
            /*
             * The first byte of messages sent from leader apply worker to
-            * parallel apply workers can only be 'w'.
+            * parallel apply workers can only be PqReplMsg_WALData.
             */
            c = pq_getmsgbyte(&s);
-           if (c != 'w')
+           if (c != PqReplMsg_WALData)
                elog(ERROR, "unexpected message \"%c\"", c);
 
            /*
index 89e241c83928069835940b2b9e61e3be37e05c62..0fdc5de57ba895ba0d464857df6bcf137234b262 100644 (file)
@@ -3994,7 +3994,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                    c = pq_getmsgbyte(&s);
 
-                   if (c == 'w')
+                   if (c == PqReplMsg_WALData)
                    {
                        XLogRecPtr  start_lsn;
                        XLogRecPtr  end_lsn;
@@ -4016,7 +4016,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                        maybe_advance_nonremovable_xid(&rdt_data, false);
                    }
-                   else if (c == 'k')
+                   else if (c == PqReplMsg_Keepalive)
                    {
                        XLogRecPtr  end_lsn;
                        TimestampTz timestamp;
@@ -4035,7 +4035,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                        UpdateWorkerStats(last_received, timestamp, true);
                    }
-                   else if (c == 's')  /* Primary status update */
+                   else if (c == PqReplMsg_PrimaryStatusUpdate)
                    {
                        rdt_data.remote_lsn = pq_getmsgint64(&s);
                        rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
@@ -4267,7 +4267,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
    else
        resetStringInfo(reply_message);
 
-   pq_sendbyte(reply_message, 'r');
+   pq_sendbyte(reply_message, PqReplMsg_StandbyStatusUpdate);
    pq_sendint64(reply_message, recvpos);   /* write */
    pq_sendint64(reply_message, flushpos);  /* flush */
    pq_sendint64(reply_message, writepos);  /* apply */
@@ -4438,7 +4438,7 @@ request_publisher_status(RetainDeadTuplesData *rdt_data)
     * Send the current time to update the remote walsender's latest reply
     * message received time.
     */
-   pq_sendbyte(request_message, 'p');
+   pq_sendbyte(request_message, PqReplMsg_PrimaryStatusRequest);
    pq_sendint64(request_message, GetCurrentTimestamp());
 
    elog(DEBUG2, "sending publisher status request message");
index b62811017116fb23cfa623d9009e61863be43322..7361ffc9dcf5e1d95e2e19821150f5f968336871 100644 (file)
@@ -826,7 +826,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
 
    switch (type)
    {
-       case 'w':               /* WAL records */
+       case PqReplMsg_WALData:
            {
                StringInfoData incoming_message;
 
@@ -850,7 +850,7 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
                XLogWalRcvWrite(buf, len, dataStart, tli);
                break;
            }
-       case 'k':               /* Keepalive */
+       case PqReplMsg_Keepalive:
            {
                StringInfoData incoming_message;
 
@@ -1130,7 +1130,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
    applyPtr = GetXLogReplayRecPtr(NULL);
 
    resetStringInfo(&reply_message);
-   pq_sendbyte(&reply_message, 'r');
+   pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
    pq_sendint64(&reply_message, writePtr);
    pq_sendint64(&reply_message, flushPtr);
    pq_sendint64(&reply_message, applyPtr);
@@ -1234,7 +1234,7 @@ XLogWalRcvSendHSFeedback(bool immed)
 
    /* Construct the message and send it. */
    resetStringInfo(&reply_message);
-   pq_sendbyte(&reply_message, 'h');
+   pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
    pq_sendint64(&reply_message, GetCurrentTimestamp());
    pq_sendint32(&reply_message, xmin);
    pq_sendint32(&reply_message, xmin_epoch);
index ee911394a23c626d9e3ed723ceacc5dcd308fe5e..0855bae3535a67db6464da77a736759b320ad68e 100644 (file)
@@ -1534,7 +1534,7 @@ WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
    resetStringInfo(ctx->out);
 
-   pq_sendbyte(ctx->out, 'w');
+   pq_sendbyte(ctx->out, PqReplMsg_WALData);
    pq_sendint64(ctx->out, lsn);    /* dataStart */
    pq_sendint64(ctx->out, lsn);    /* walEnd */
 
@@ -2292,7 +2292,8 @@ ProcessRepliesIfAny(void)
        switch (firstchar)
        {
                /*
-                * 'd' means a standby reply wrapped in a CopyData packet.
+                * PqMsg_CopyData means a standby reply wrapped in a CopyData
+                * packet.
                 */
            case PqMsg_CopyData:
                ProcessStandbyMessage();
@@ -2300,8 +2301,9 @@ ProcessRepliesIfAny(void)
                break;
 
                /*
-                * CopyDone means the standby requested to finish streaming.
-                * Reply with CopyDone, if we had not sent that already.
+                * PqMsg_CopyDone means the standby requested to finish
+                * streaming.  Reply with CopyDone, if we had not sent that
+                * already.
                 */
            case PqMsg_CopyDone:
                if (!streamingDoneSending)
@@ -2315,7 +2317,8 @@ ProcessRepliesIfAny(void)
                break;
 
                /*
-                * 'X' means that the standby is closing down the socket.
+                * PqMsg_Terminate means that the standby is closing down the
+                * socket.
                 */
            case PqMsg_Terminate:
                proc_exit(0);
@@ -2350,15 +2353,15 @@ ProcessStandbyMessage(void)
 
    switch (msgtype)
    {
-       case 'r':
+       case PqReplMsg_StandbyStatusUpdate:
            ProcessStandbyReplyMessage();
            break;
 
-       case 'h':
+       case PqReplMsg_HotStandbyFeedback:
            ProcessStandbyHSFeedbackMessage();
            break;
 
-       case 'p':
+       case PqReplMsg_PrimaryStatusRequest:
            ProcessStandbyPSRequestMessage();
            break;
 
@@ -2752,7 +2755,7 @@ ProcessStandbyPSRequestMessage(void)
 
    /* construct the message... */
    resetStringInfo(&output_message);
-   pq_sendbyte(&output_message, 's');
+   pq_sendbyte(&output_message, PqReplMsg_PrimaryStatusUpdate);
    pq_sendint64(&output_message, lsn);
    pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
    pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
@@ -3364,7 +3367,7 @@ XLogSendPhysical(void)
     * OK to read and send the slice.
     */
    resetStringInfo(&output_message);
-   pq_sendbyte(&output_message, 'w');
+   pq_sendbyte(&output_message, PqReplMsg_WALData);
 
    pq_sendint64(&output_message, startptr);    /* dataStart */
    pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
@@ -4135,7 +4138,7 @@ WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
 
    /* construct the message... */
    resetStringInfo(&output_message);
-   pq_sendbyte(&output_message, 'k');
+   pq_sendbyte(&output_message, PqReplMsg_Keepalive);
    pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
    pq_sendint64(&output_message, GetCurrentTimestamp());
    pq_sendbyte(&output_message, requestReply ? 1 : 0);
index 55621f35fb6b73292c3894b1aed6ac15f1b9ddca..0a3ca4315de1ecfbb8f7e3ba8807325c2597665b 100644 (file)
@@ -35,6 +35,7 @@
 #include "fe_utils/option_utils.h"
 #include "fe_utils/recovery_gen.h"
 #include "getopt_long.h"
+#include "libpq/protocol.h"
 #include "receivelog.h"
 #include "streamutil.h"
 
@@ -1338,7 +1339,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
    /* Each CopyData message begins with a type byte. */
    switch (GetCopyDataByte(r, copybuf, &cursor))
    {
-       case 'n':
+       case PqBackupMsg_NewArchive:
            {
                /* New archive. */
                char       *archive_name;
@@ -1410,7 +1411,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
                break;
            }
 
-       case 'd':
+       case PqMsg_CopyData:
            {
                /* Archive or manifest data. */
                if (state->manifest_buffer != NULL)
@@ -1446,7 +1447,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
                break;
            }
 
-       case 'p':
+       case PqBackupMsg_ProgressReport:
            {
                /*
                 * Progress report.
@@ -1465,7 +1466,7 @@ ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
                break;
            }
 
-       case 'm':
+       case PqBackupMsg_Manifest:
            {
                /*
                 * Manifest data will be sent next. This message is not
index 0e9d2e23947313e0e2e0331f69b2ce074fcf274f..7a4d1a2d2ca66fd03581be1e2c79c658b6bdd55a 100644 (file)
@@ -24,6 +24,7 @@
 #include "getopt_long.h"
 #include "libpq-fe.h"
 #include "libpq/pqsignal.h"
+#include "libpq/protocol.h"
 #include "pqexpbuffer.h"
 #include "streamutil.h"
 
@@ -149,7 +150,7 @@ sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
                    LSN_FORMAT_ARGS(output_fsync_lsn),
                    replication_slot);
 
-   replybuf[len] = 'r';
+   replybuf[len] = PqReplMsg_StandbyStatusUpdate;
    len += 1;
    fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
    len += 8;
@@ -454,7 +455,7 @@ StreamLogicalLog(void)
        }
 
        /* Check the message type. */
-       if (copybuf[0] == 'k')
+       if (copybuf[0] == PqReplMsg_Keepalive)
        {
            int         pos;
            bool        replyRequested;
@@ -466,7 +467,7 @@ StreamLogicalLog(void)
             * We just check if the server requested a reply, and ignore the
             * rest.
             */
-           pos = 1;            /* skip msgtype 'k' */
+           pos = 1;            /* skip msgtype PqReplMsg_Keepalive */
            walEnd = fe_recvint64(&copybuf[pos]);
            output_written_lsn = Max(walEnd, output_written_lsn);
 
@@ -509,7 +510,7 @@ StreamLogicalLog(void)
 
            continue;
        }
-       else if (copybuf[0] != 'w')
+       else if (copybuf[0] != PqReplMsg_WALData)
        {
            pg_log_error("unrecognized streaming header: \"%c\"",
                         copybuf[0]);
@@ -521,7 +522,7 @@ StreamLogicalLog(void)
         * message. We only need the WAL location field (dataStart), the rest
         * of the header is ignored.
         */
-       hdr_len = 1;            /* msgtype 'w' */
+       hdr_len = 1;            /* msgtype PqReplMsg_WALData */
        hdr_len += 8;           /* dataStart */
        hdr_len += 8;           /* walEnd */
        hdr_len += 8;           /* sendTime */
index f2b54d3c50171a2a8e715601da956544a9991fe6..25b13c7f55cd1f08c369f4996ba65b2cbc12597d 100644 (file)
@@ -21,6 +21,7 @@
 #include "access/xlog_internal.h"
 #include "common/logging.h"
 #include "libpq-fe.h"
+#include "libpq/protocol.h"
 #include "receivelog.h"
 #include "streamutil.h"
 
@@ -338,7 +339,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, TimestampTz now, bool replyReque
    char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
    int         len = 0;
 
-   replybuf[len] = 'r';
+   replybuf[len] = PqReplMsg_StandbyStatusUpdate;
    len += 1;
    fe_sendint64(blockpos, &replybuf[len]); /* write */
    len += 8;
@@ -823,13 +824,13 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
            }
 
            /* Check the message type. */
-           if (copybuf[0] == 'k')
+           if (copybuf[0] == PqReplMsg_Keepalive)
            {
                if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                         &last_status))
                    goto error;
            }
-           else if (copybuf[0] == 'w')
+           else if (copybuf[0] == PqReplMsg_WALData)
            {
                if (!ProcessWALDataMsg(conn, stream, copybuf, r, &blockpos))
                    goto error;
@@ -1001,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     * Parse the keepalive message, enclosed in the CopyData message. We just
     * check if the server requested a reply, and ignore the rest.
     */
-   pos = 1;                    /* skip msgtype 'k' */
+   pos = 1;                    /* skip msgtype PqReplMsg_Keepalive */
    pos += 8;                   /* skip walEnd */
    pos += 8;                   /* skip sendTime */
 
@@ -1064,7 +1065,7 @@ ProcessWALDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
     * message. We only need the WAL location field (dataStart), the rest of
     * the header is ignored.
     */
-   hdr_len = 1;                /* msgtype 'w' */
+   hdr_len = 1;                /* msgtype PqReplMsg_WALData */
    hdr_len += 8;               /* dataStart */
    hdr_len += 8;               /* walEnd */
    hdr_len += 8;               /* sendTime */
index b0bcb3cdc26ebc5d1be76a3d8acf274c2077862e..c64e628628d4be5ce10e2e52d3bf04c40f14b134 100644 (file)
 #define PqMsg_Progress              'P'
 
 
+/* Replication codes sent by the primary (wrapped in CopyData messages). */
+
+#define PqReplMsg_Keepalive            'k'
+#define PqReplMsg_PrimaryStatusUpdate 's'
+#define PqReplMsg_WALData          'w'
+
+
+/* Replication codes sent by the standby (wrapped in CopyData messages). */
+
+#define PqReplMsg_HotStandbyFeedback 'h'
+#define PqReplMsg_PrimaryStatusRequest 'p'
+#define PqReplMsg_StandbyStatusUpdate 'r'
+
+
+/* Codes used for backups via COPY OUT (wrapped in CopyData messages). */
+
+#define PqBackupMsg_Manifest       'm'
+#define PqBackupMsg_NewArchive     'n'
+#define PqBackupMsg_ProgressReport 'p'
+
+
 /* These are the authentication request codes sent by the backend. */
 
 #define AUTH_REQ_OK            0   /* User is authenticated  */