Writing a Persistent PHP Daemon for RabbitMQ Event Consumption

First, apologies for the incredibly long, and rude, hiatus since my last blog post.  It's been a busy year with lots of things happening both personally and professionally.

I want to show you how to write a persistent daemon using PHP to consume AMQP (RabbitMQ) messages.  PHP was never intended to be used to write applications that persist; PHP scripts are meant to be short-lived.  Using PHP to write a daemon application definitely goes against conventional wisdom because, generally speaking, PHP has problems with memory leaks.  In most established languages (thinking C/C++), leaks are usually the responsibility of the programmer since most leaks native to the language and APIs have been identified and fixed.

PHP, however, is still relatively new and leaks can appear anywhere in your application stack, including in your code, the PHP library/API, or within 3rd-party libraries.  Identifying the source of the leaks can be challenging and, regardless of how many destructors you explicitly invoke, chances are better-than-good that you'll encounter a leak.

In my recent experiences, my object-oriented framework experienced memory leaks ranging from non-existent (stable) to OMFG (silicon hemophiliac) levels.  Specifically speaking about AMQP and RabbitMQ, my static brokers (fire-n-forget) are usually pretty stable with respect to their memory footprint while my RPC brokers usually leak in varying degrees depending on the complexity of the event being processed.  (e.g.: how many class objects are being instantiated to process the request.)

This is a significant problem since broker applications have to persist and handle event requests 24x7.  The more memory a broker is leaking, the fewer requests it can successfully process before the broker implodes.  To mitigate memory leaks, I've tried several iterations of solutions and the one I finally landed on - my last deployment has been running for over a year with no downtime - uses a forking (parent-child) model.

In this example, taken from a production application, I am going to demonstrate how to set-up your PHP application to support parent-child processing, how to properly fork, how to intercept the SIGCHLD event within the parent, and how to spawn a replacement child process so that services are neither interrupted nor blocked.

This is an efficient, if not graceful, way to mitigate PHP memory leaks:  since all processing (class instantiation, API and function calls, etc.) happens in the child process, whatever memory the child has leaked dies with the child and is released back to the system.
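To make the idea concrete, here is a minimal sketch, assuming the PHP CLI with the pcntl extension loaded.  The runInChild() helper and the simulated leak are hypothetical and not part of the production broker; they only illustrate that memory allocated in a forked child never shows up in the parent.

```php
<?php
// Sketch: memory allocated in a forked child is released when the child exits.
// Assumes the pcntl extension; runInChild() is a hypothetical helper.

function runInChild(callable $work)
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        throw new RuntimeException('fork failed');
    }
    if ($pid === 0) {
        // child: do the (possibly leaky) work, then exit so the OS reclaims everything
        $work();
        exit(0);
    }
    // parent: block until this specific child exits, then report its exit code
    pcntl_waitpid($pid, $status);
    return pcntl_wexitstatus($status);
}

$before = memory_get_usage();
$exitCode = runInChild(function () {
    $leak = [];
    for ($i = 0; $i < 200000; $i++) {
        $leak[] = str_repeat('x', 32);   // simulate a leak inside the child
    }
});
$after = memory_get_usage();

echo 'child exit code: ' . $exitCode . PHP_EOL;
echo 'parent memory growth (bytes): ' . ($after - $before) . PHP_EOL;
```

The parent's footprint should barely move, because the large array only ever existed in the child's address space.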

Algorithm Overview

First, we're going to make a few assumptions here:

  • You have established a working RabbitMQ environment that may or may not be using TLS
  • You have successfully connected to RabbitMQ previously and are somewhat familiar with how to write an AMQP RPC consumer using PHP
  • This code is currently working under Ubuntu 14.04 and PHP (cli) 5.5.9, RabbitMQ v3.6.8 with Erlang v19.3.

Disclaimer

Basically, I will show you the logic behind the broker - the code, as a stand-alone, will not work.  Writing the code to actually connect to the various services is beyond the scope of this article.

Set-Up

When the broker code starts, it will call several off-page modules to do the following:

  • load the framework environment:  constants file, functions, auto-loader, and configuration
  • read the configuration for the number of child instances to spawn, and the number of requests per child instance.
  • set-up logging.  I use a combination of console logging, where STDERR is redirected to STDOUT and STDOUT is redirected to a flat file / console.log, and mongo logging.  In the event that mongo, as a resource, is not available, then log messages are redirected to the console instead.

Signal Handler

I am going to set-up a replacement signal-handler for the SIGCHLD event:

////////////////////////////
// set-up the replacement signal handler that will be called on a child's death 
////////////////////////////
declare( ticks = 1);
function sigHandler($_sig) {
    global $numberChildren;
    switch ($_sig) {
        case SIGCHLD :
            $numberChildren--;
            // reap every exited child; $status receives the exit status (kept separate from $_sig)
            while (($pid = pcntl_wait($status, WNOHANG)) > 0) {
                pcntl_wexitstatus($status);
            }
            break;
    }
}
pcntl_signal(SIGCHLD, 'sigHandler');
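If you are on a newer PHP than the one this broker runs on, the same reaping pattern can be sketched without declare(ticks).  The example below is an assumption-laden illustration, not the production handler:  it assumes PHP 7.1 or later (for pcntl_async_signals()) with the pcntl extension, and the $reaped array is a hypothetical bookkeeping structure.

```php
<?php
// Sketch: reap every exited child from a SIGCHLD handler.
// Assumes PHP 7.1+ (for pcntl_async_signals) with the pcntl extension.

pcntl_async_signals(true);      // deliver signals without declare(ticks = 1)

$reaped = [];                   // pid => exit code, filled in by the handler

pcntl_signal(SIGCHLD, function ($signo) use (&$reaped) {
    // several children may exit before the handler runs once, so loop
    while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
        $reaped[$pid] = pcntl_wexitstatus($status);
    }
});

$expected = 3;
for ($i = 0; $i < $expected; $i++) {
    $pid = pcntl_fork();
    if ($pid === -1) {
        die('fork failed' . PHP_EOL);
    }
    if ($pid === 0) {
        exit($i);               // child: exit at once with a distinct code
    }
}

// parent: idle until the handler has accounted for every child
while (count($reaped) < $expected) {
    usleep(10000);
}

echo 'reaped ' . count($reaped) . ' children' . PHP_EOL;
```

The loop inside the handler matters because SIGCHLD signals can coalesce:  one delivery may stand for more than one dead child.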

 

Next, we're going to set-up the forking function that will handle the child-forking, signal detection, and event-processing for the broker:

 

//////////////////////
// set-up the forking function so that it can be called initially or on a SIGCHLD event 
//////////////////////
function forkMe()
{
    global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $myRequestsPerInstance, $startingMemory;
    $myRequestsPerInstance = $requestsPerInstance + ((rand(2, 5)) * 10);
    $startingMemory = memory_get_usage(true);

    $thisPid = pcntl_fork();

    switch ($thisPid) {
        case -1 :   // error
            $cmsg = ERROR_FORK_FAILED . $thisWatcher;
            $parentLog->fatal($cmsg);
            die(getDateTime() . CON_ERROR . $res . $cmsg . $eos);
            break;
        case 0 :    // child (broker daemon)
            // replace the sigchld signal handler
            pcntl_signal(SIGCHLD, SIG_DFL);
            $thisPid = getmypid();

            // create the child logger object
            $childLog = new ErrorLogger();

            $queueTag = myConfig::$settings[CONFIG_ADMIN][CONFIG_BROKER_QUEUE_TAG];
//            $exchange = BROKER_EXCHANGE_A1;

            $queue = $queueTag . BROKER_QUEUE_AI;
            $brokerConnection = ResourceManager::fetchResource(RESOURCE_ADMIN);
            if (is_null($brokerConnection)) {
                $childLog->fatal(ERROR_RESOURCE_404 . RESOURCE_BROKER . COLON . BROKER_QUEUE_AI);
                echo getDateTime() . CON_ERROR .  $res . ERROR_RESOURCE_404 . RESOURCE_BROKER . COLON . BROKER_QUEUE_AI . $eos;
                exit(1);                                                    // shell-script exit value for fail
            }

            try {
                $brokerChannel = $brokerConnection->channel();
                $brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
            } catch (\PhpAmqpLib\Exception\AMQPRuntimeException $e) {
                $childLog->fatal($e->getMessage());
                echo getDateTime() . CON_ERROR . $res . ERROR_BROKER_QUEUE_DECLARE . $queue . $eos;
                exit(1);
            } catch (\PhpAmqpLib\Exception\AMQPProtocolChannelException $e) {
                $childLog->fatal($e->getMessage());
                echo getDateTime() . CON_ERROR . $res . ERROR_BROKER_QUEUE_DECLARE . $queue . $eos;
                exit(1);
            } catch (Exception $e) {
                $childLog->fatal($e->getMessage());
                echo getDateTime() . CON_ERROR . $res . ERROR_BROKER_QUEUE_DECLARE . $queue . $eos;
                exit(1);
            }

            register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res);
            
            $callback = function($_request) {
                $startTime = myStatic::doingTime();
                global $requestCounter, $res, $eos, $myRequestsPerInstance, $startingMemory;

                $qName = 'myRpcQueue';
                $requestCounter++;
                $returnData = null;
                $request = null;
                $hdrConE = CON_ERROR . $res;
                $hdrConG = CON_SUCCESS . $res;
                $eventSuccess = false;
                $conMsg = '';
                $eventGUID = guid();
                $thisPid = getmypid();

                // set-up the call-back logger
                $callBackLog = new ErrorLogger($eventGUID);

                if (!PayloadValidation($_request, $msg, $request, $eventGUID)) {
                    $conMsg = $hdrConE . $msg;
                    $msg = $res . $msg;
                    $callBackLog->info($msg);
                    $event = BROKER_QUEUE_AI . '(' . ERROR_DATA_VALIDATION_FIRST_PASS . ')';
                } else {
                    $event = BROKER_QUEUE_AI . '(' . $request[BROKER_REQUEST] . ')';
                    if (myConfig::$settings[CONFIG_DEBUG]) $callBackLog->debug(BROKER_REQUEST . COLON . $request[BROKER_REQUEST]);
                    switch ($request[BROKER_REQUEST]) {
                        case BROKER_REQUEST_SHUTDOWN :
                            // basic_cancel() expects the consumer tag, not the delivery tag
                            $_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info['consumer_tag']);
                            $conMsg = $hdrConG . SUCCESS_SHUTDOWN . $eos;
                            $eventSuccess = true;
                        break;

                        case BROKER_REQUEST_PING :
                            $conMsg = $hdrConG . SUCCESS_PING . BROKER_QUEUE_DU . $eos;
                            $eventSuccess = true;
                        break;
                        
                        default :
                            $conMsg = $hdrConE . ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST];
                            $callBackLog->warn(ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST]);
                            // todo - not a supported event so log something dire
                        break;
                    }
                }
                if (!$eventSuccess and empty($conMsg)) {
                    $conMsg = $hdrConE . ERROR_FINE_PICKLE;
                }
                if (!empty($conMsg)) {
                    echo getDateTime() . $conMsg . ' (' . $requestCounter . "/$myRequestsPerInstance)" . $eos;
                }
                $_request->delivery_info[BROKER_CHANNEL]->basic_ack($_request->delivery_info[BROKER_DELIVERY_TAG]);

                // exit the child if we've reached the request limit
                if ($requestCounter >= $myRequestsPerInstance) {
                    if (getmypid() == $thisPid) {
                        $data = [
                            MONGO_BROKER_METRICS_MEM_START => $startingMemory,
                            MONGO_BROKER_METRICS_MEM_PEAK => memory_get_peak_usage(true),
                            MONGO_BROKER_METRICS_MEM_END => memory_get_usage(true),
                            MONGO_BROKER_METRICS_NAME => CONFIG_BROKER_ADMIN_BROKER_IN,
                            MONGO_BROKER_METRICS_RPI => $myRequestsPerInstance,
                            MONGO_BROKER_METRICS_PID => $thisPid
                        ];
                        // do something with metric data here
                    }
                    
                    echo getDateTime() . CON_SYSTEM . $res . 'request count limit reached - exiting...' . $eos;
                    exit(0);
                }
            };
            echo getDateTime() . CON_SUCCESS . $res . $queue . ' queue established as pid: ' . $thisPid . ', for ' . $myRequestsPerInstance . ' requests' . $eos;
            $brokerChannel->basic_qos(null, 1, null);
            $brokerChannel->basic_consume($queue, '', false, false, false, false, $callback);
            while (count($brokerChannel->callbacks)) {
                $brokerChannel->wait();
            }
            break;

        default :   // parent (pcntl_fork() returned the child's pid)
            ;       // does nothing
            break;
    }
    return($thisPid);
}

 

Remember - this is a non-working stub that shows/demonstrates how to set-up a proper forking parent-child relationship.  The code shown has been heavily modified removing non-essential bits but the basic concepts are still valid.

At the top of the function, we're going to import some global variables and set-up some local variables.  We're also going to take a snap-shot of our starting memory usage for logging later on a SIGCHLD event.

We fork the parent into a child process with the pcntl_fork() function and then call a switch-case statement to evaluate the success of the fork request.  

Under case 0, the child code replaces the signal handler, gets the child's PID value, and then sets-up the local environment:  gets the local error logger, the local resource for RabbitMQ (and whatever other resource-type objects you will need), and then makes the connection to the RabbitMQ service.

We have to explicitly connect to our resources within the child code, as opposed to inheriting these resources from the parent, because when a child dies, it closes its resources.  If all children of the parent inherit the parent's resource handles, then the first child to die will close that resource on its death, along with every other connection to that resource held by the other children.  In other words, if the first child to die inherited the parent's AMQP resource handle, it will close that handle on its death... which closes not only all of the other children's AMQP resources, but the parent's resource too.

The callback function is declared next - this function is called when an event is received/consumed from the queue registered with the basic_consume() function from the php-amqplib library.  The callback function, whether it's yours or the demo function shown here, is where you put your event processing.  Here, since this is an RPC queue, I've provided two events, which I prefer to handle using switch-case constructs:  one for cleanly shutting-down the broker, and one for a ping() event.

Note that we're tracking the number of requests processed, with $requestCounter, and that this variable is evaluated after the switch-case construct to see if the number of requests has exceeded the request limit.

If so, we ensure we're in the current child (thisPid = currentPid) and we collect some memory metrics.  I use a discrete mongo collection to store the metrics which allows me to aggregate and report on the broker memory usage and efficiency.  A sample record is shown below:

{
    "_id" : ObjectId("5818dc42b647a602778b45c0"),
    "memStart_bem" : NumberLong(3670016),
    "memPeak_bem" : NumberLong(8126464),
    "memEnd_bem" : NumberLong(8126464),
    "brokerName_bem" : "adminBrokerIn",
    "brokerRequests_bem" : NumberLong(21),
    "brokerPID_bem" : NumberLong(30466),
    "history" : [
        {
            "sessionIP" : "127.0.0.1",
            "miscInfo" : "processing SIGCHLD in broker event",
            "eventGUID" : "AB6E5060-001E-FE34-44A5-5A9E7D0B1C5A",
            "sessionDate" : ISODate("2016-11-01T18:17:38.597Z"),
            "sessionEvent" : "create"
        }
    ],
    "id_bem" : NumberLong(12),
    "status_bem" : "ACTIVE",
    "token_bem" : "9E4BB5A4-A024-CE72-4442-ECE33F235E2B"
}
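As a rough sketch of how a record like this might be assembled just before a child exits, consider the following.  The collectMetrics() helper is hypothetical and the key names are simply copied from the sample record; writing the result to mongo is left out.

```php
<?php
// Sketch: gather the memory metrics a child would record just before exiting.
// collectMetrics() is a hypothetical helper; the keys mirror the sample record.

function collectMetrics($startingMemory, $requests, $brokerName)
{
    return [
        'memStart_bem'       => $startingMemory,
        'memPeak_bem'        => memory_get_peak_usage(true),
        'memEnd_bem'         => memory_get_usage(true),
        'brokerName_bem'     => $brokerName,
        'brokerRequests_bem' => $requests,
        'brokerPID_bem'      => getmypid(),
    ];
}

$start = memory_get_usage(true);
$buffer = str_repeat('x', 4 * 1024 * 1024);   // simulate work that allocates memory
$metrics = collectMetrics($start, 21, 'adminBrokerIn');

echo json_encode($metrics) . PHP_EOL;
```

Comparing the start, peak, and end values across many children is what lets you spot which brokers leak and how quickly.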

 

The consume loop will run indefinitely, invoking the callback function for each request, until shutdown or until the child has processed its quota of requests.
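The quota logic can be sketched in isolation, without RabbitMQ.  Everything named here (makeCountingCallback(), the request strings, the limit of 3) is a hypothetical stand-in for the real AMQP callback; in the broker, the point where this sketch returns true is where the child records its metrics and calls exit(0).

```php
<?php
// Sketch: a callback that counts requests and reports when its quota is used up.
// makeCountingCallback() and the request names are hypothetical stand-ins
// for the real AMQP callback and event processing.

function makeCountingCallback($limit, array &$log)
{
    $count = 0;
    return function ($request) use (&$count, $limit, &$log) {
        $count++;
        $log[] = "handled $request ($count/$limit)";
        // true tells the caller the quota is exhausted
        return $count >= $limit;
    };
}

$log = [];
$callback = makeCountingCallback(3, $log);

$processed = 0;
foreach (['ping', 'ping', 'report', 'ping', 'ping'] as $request) {
    $processed++;
    if ($callback($request)) {
        break;      // stand-in for the child exiting
    }
}

echo implode(PHP_EOL, $log) . PHP_EOL;
```

Only the first three requests are handled; the remaining two would be picked up by whichever child replaces this one.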

The final bit of code within the child, after the callback function, consists of standard AMQP calls which set-up the queue-consumption loop.

Parent Code

The parent code is fairly straight-forward.  For however many brokers of this type we're going to run concurrently, based on our configuration file, we're going to loop and invoke the forkMe() function to create a child registered to this parent.  This architecture/design provides horizontal scaling for the framework by determining, at start-time, how many brokers to instantiate.  (Determined by available system resources.)

$childrenPidList = [];
for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) {
    $childrenPidList[] = forkMe();
}
echo getDateTime() .  CON_SUCCESS . $res . count($childrenPidList) . ' instances of ' . BROKER_QUEUE_AI . ' started...' . $eos;

// the parent process continues to run, blocking in pcntl_waitpid() until one of its children exits...
// when a child dies, its death-rattle is caught and the child is replaced with a new process.
while (count($childrenPidList)) {
    $result = pcntl_waitpid(0, $status);        // wait for any child in the parent's process group
    if (in_array($result, $childrenPidList)) {
        $key = array_search($result, $childrenPidList);
        array_splice($childrenPidList, $key, 1);
        // process has already exited -- restart it
        $childrenPidList[] = forkMe();
    }
}

 

We put the parent in a loop that blocks in pcntl_waitpid() until there's a child exit to process.  The difference between the logic in this loop and other fork-control logic you may have seen is that the loop isn't going to block on any particular child, because of the parameters we've passed to the pcntl_waitpid() function:  a pid of 0 waits for any child in the parent's process group.  Once we've detected the death of a child (pcntl_waitpid() returned the child's PID), we remove that PID from the array containing the list of all running child PIDs and fork a new child, adding its PID to the array.
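Here is a self-contained sketch of the same respawn pattern that you can actually run, assuming the pcntl extension.  The spawnWorker() function, the short random sleep, and the respawn budget are all hypothetical; the budget exists only so the demo terminates, whereas the real parent respawns for as long as it runs.

```php
<?php
// Sketch: a parent that replaces each child as soon as it exits.
// Assumes the pcntl extension; spawnWorker() and the respawn budget are
// hypothetical and exist only so the example terminates.

function spawnWorker()
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        die('fork failed' . PHP_EOL);
    }
    if ($pid === 0) {
        usleep(mt_rand(10000, 50000));  // stand-in for consuming a quota of requests
        exit(0);
    }
    return $pid;                        // parent receives the child's pid
}

$workers = 3;       // children to keep alive
$respawnBudget = 4; // replacements allowed before the demo winds down
$children = [];
$respawned = 0;

for ($i = 0; $i < $workers; $i++) {
    $children[spawnWorker()] = true;
}

while (count($children) > 0) {
    // -1 waits for any child; the call blocks until one of them exits
    $pid = pcntl_waitpid(-1, $status);
    if ($pid === -1) {
        break;                          // no children left to wait for
    }
    if (!isset($children[$pid])) {
        continue;                       // not one of ours
    }
    unset($children[$pid]);
    if ($respawned < $respawnBudget) {
        $children[spawnWorker()] = true;
        $respawned++;
    }
}

echo 'replaced ' . $respawned . ' children' . PHP_EOL;
```

Keying the list by PID makes removal a simple unset() instead of a search-and-splice.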

Final Thoughts

At the top of the listing, when we've first forked, you see this line of code:

$myRequestsPerInstance = $requestsPerInstance + ((rand(2, 5)) * 10);

This limits the number of requests a specific child will process to a base amount ($requestsPerInstance, set in the configuration file) plus a random offset of 20, 30, 40, or 50 requests.  I offset the number of requests per child because of RabbitMQ's round-robin message delivery.  If all of your children finish processing and die at the same time, it can lead to a race-condition within your child consumers.  Offsetting the requests per child allows for the overhead of instantiating new child instances without interruption of service.  Forking, after all, is an expensive system call.
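A quick sketch of the offset calculation on its own, with an assumed base of 100 requests.  The requestLimit() helper is hypothetical; the arithmetic is the same as in the line quoted above.

```php
<?php
// Sketch: stagger per-child request limits so children do not all exit together.
// requestLimit() is a hypothetical helper; 100 is an assumed base value.

function requestLimit($base)
{
    return $base + rand(2, 5) * 10;     // adds 20, 30, 40, or 50
}

$limits = [];
for ($child = 0; $child < 4; $child++) {
    $limits[] = requestLimit(100);
}

echo implode(', ', $limits) . PHP_EOL;
```

With only four possible offsets, two children can still land on the same limit; widening the range or using a finer step would spread the exits further.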

 

Code constants are defined by their upper-case representation and are self-documenting.  I intentionally didn't define the values behind the constants because I just want to show you the general algorithm for setting up a persistent PHP consumer.

My last iteration of this code has been running in production for over a year as of this writing.  In that time, the code hasn't crashed - we lost a service which forced a restart, and we pulled the brokers down twice, intentionally, for scheduled updates, but the code itself has performed quite well.  Taking responsibility for mitigating memory leaks provided my customer with a stable service.

Next...

My next article will demonstrate a broker publisher client class and how you can use it in writing unit tests for your broker code.  Until then, I hope this helped you!