forked from MeetMe/VorpalBunny
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathvorpalbunny.php
360 lines (307 loc) · 12.1 KB
/
vorpalbunny.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
<?php
/**
* Vorpal Bunny is a publishing client for RabbitMQ's JSON-RPC Channel Plugin
*
* The goal is to be a light-weight tool for higher throughput with smaller
* protocol overhead for calling Basic.Publish from PHP applications.
*
* PHP Version 5
*
* @package VorpalBunny
* @author Gavin M. Roy <gmr@myyearbook.com>
* @copyright 2011 Insider Guides, Inc.
* @license http://opensource.org/licenses/bsd-license.php BSD License
* @link http://github.com/myYearbook/VorpalBunny
* @since 2011-02-24
* @version 0.4
*
* Usage:
*
* $vb = new VorpalBunny('localhost');
* $vb->publish( $exchange, $routing_key, $message, $mimeType, $deliveryType);
*/
class VorpalBunny
{
protected static $apcPrefix = 'VorpalBunny:';
protected static $apcIDKey = 'VorpalBunny:id';
protected static $jsonRPCVersion = 1.1;
protected static $jsonRPCTimeout = 3;
protected static $validMethods = array( 'call', 'cast', 'open', 'poll' );
protected static $maxRetries = 3;
protected static $version = 0.4;
private $id = 0;
private $sessionToken = null;
/**
* Initialize the VorpalBunny Class
*
* @param string $host RabbitMQ server to use
* @param int $port RabbitMQ Server HTTP port to use
* @param string $user Username to pass to RabbitMQ when starting a session
* @param string $pass Password to send to RabbitMQ when starting a session
* @param string $vhost RabbitMQ VHost to use
* @param int $timeout Timeout to set on the RabbitMQ JSONRPC Channel side
* @throws Exception when missing APC
*/
function __construct( $host, $port = 55672, $user = 'guest', $pass = 'guest', $vhost = '/', $timeout = 300 )
{
// Do we have APC support for caching session token?
if ( ! is_callable( 'apc_fetch' ) )
{
throw new Exception( "APC is not available, please install APC" );
}
// Construct the APC cache key we'll use in init and elsewhere
$this->cacheKey = self::$apcPrefix . $user . ':' . $pass . '@' . $host . ':' . $port . $vhost;
// Create our Base URL
$this->baseURL = 'http://' . $host . ':' . $port . '/rpc/';
// Hold on to these configuration variables for later use
$this->user = $user;
$this->pass = $pass;
$this->vhost = $vhost;
$this->timeout = $timeout;
$this->curl_init( );
}
/**
* Initialize a new curl connection, do this on each new session
*
* @return void
*/
private function curl_init( )
{
// Delete the previous CURL instance
unset( $this->curl );
// Create our CURL instance
$this->curl = curl_init( );
// Set our CURL options
curl_setopt( $this->curl, CURLOPT_POST, true );
curl_setopt( $this->curl, CURLOPT_RETURNTRANSFER, true );
curl_setopt( $this->curl, CURLOPT_FORBID_REUSE, false );
curl_setopt( $this->curl, CURLOPT_FRESH_CONNECT, false );
curl_setopt( $this->curl, CURLOPT_TIMEOUT, self::$jsonRPCTimeout );
curl_setopt( $this->curl, CURLOPT_USERAGENT, 'VorpalBunny/' . self::$version );
curl_setopt( $this->curl, CURLOPT_HTTPHEADER, array( 'Content-type: application/json',
'x-json-rpc-timeout: ' . self::$jsonRPCTimeout,
'Connection: keep-alive' ) );
}
/**
* Construct the JSON data payload for the POST
*
* @param string $method The RPC call to make, one of open, call, cast, poll
* @param array $params Array of parameters to append to the payload
* @return string JSON encoded array
* @throws Exception
*/
private function getPayload( $method, $params = array( ) )
{
// Make sure we're passing in a valid method
if ( in_array( $method, self::$validMethods ) )
{
// Build our main JSON key/value object
$output = array();
$output['version'] = self::$jsonRPCVersion;
$output['method'] = $method;
$output['id'] = $this->getNextId( );
$output['params'] = $params;
// JSON Encode and return the data
return json_encode( $output );
}
// Better to be strict since invalid data can cause RabbitMQ to kill off the connection with no response
throw new Exception( "Invalid RPC method passed: " . $method );
}
/**
* Return the next communication ID sequence number
*
* @return int
*/
private function getNextId( )
{
// Assume this is set to 0 since we called for the session
$this->id = apc_fetch( self::$apcIDKey ) + 1;
apc_store( self::$apcIDKey, $this->id );
return $this->id;
}
/**
* Retrieves a Session token from the RabbitMQ JSON-RPC Channel Plugin
*
* @param int $recursive request retry counter
* @return void
* @throws Exception
*/
private function getSession( $recursive = 0 )
{
// Reset the session request counter
apc_store( self::$apcIDKey, 0 );
// Defind our parameters array
$parameters = array( $this->user, $this->pass, $this->timeout, $this->vhost );
// Set our post data
$payload = $this->getPayload( 'open', $parameters );
curl_setopt( $this->curl, CURLOPT_POSTFIELDS, $payload );
// Set our URL
$url = $this->baseURL . "rabbitmq";
curl_setopt( $this->curl, CURLOPT_URL, $url );
// Make the Call and get the response
$response = curl_exec( $this->curl );
// Make sure the call succeeded
if ( ! $response )
{
throw new Exception( "Could not connect to the RabbitMQ HTTP Server" );
}
// Parse out the header and body
$header = curl_getinfo( $this->curl );
// Evaluate the return response to make sure we got a good result before continuing
if ( $header['http_code'] != 200 )
{
throw new Exception( "Received a HTTP Error #" . $header['http_code'] . " while getting session. URL: " . $url .
" Payload:" . $payload. " Response: " . $response );
}
// Decode the body into the object representation
$response = json_decode( $response );
// See if we got a RPC error
if ( isset($response->error ) )
{
if ( $recursive < self::$maxRetries )
{
// Rebuild our Curl Object
$this->curl_init( );
// Make a second attept
return $this->getSession( $recursive + 1 );
}
else
{
throw new Exception( "Received " . $recursive . " RPC Errors, while obtaining session. Last URL: " . $url .
" Payload:" . $payload. " Response: " . $response );
}
}
// Make sure we have a body
// Expected response example: {"version":"1.1","id":1,"result":{"service":"F01F0D5ADDF995CAA9B1DCD38AB8E239"}}
if ( ! isset( $response->result ) )
{
throw new Exception( "Missing Required 'response' attribute in JSON response:" . json_encode( $response ) );
}
// Assign our session token
$token = $response->result->service;
// Store the value of the token, timing out before Rabbit does as to reduce number of requests when timed out
apc_store( $this->cacheKey, $token, intval( $this->timeout - ( $this->timeout / 8 ) ) );
// Set our ID counter to 0
apc_store( self::$apcIDKey, 0 );
return $token;
}
/**
* Returns a URL for the specific session id to make calls with
*
* @return string Session URL
*/
private function getSessionURL( )
{
$token = trim( apc_fetch( $this->cacheKey ) );
// If we don't have a valid session token, go get one
if ( ! $token )
{
$token = $this->getSession( );
}
// Return our JSON-RPC-Channel Session URL
return $this->baseURL . $token;
}
/**
* Send a message to RabbitMQ using Basic.Deliver over RPC
*
* For more information on the parameters, see http://www.rabbitmq.com/amqp-0-9-1-quickref.html#basic.deliver
*
* @param string $exchange to publish the message to, can be empty
* @param string $routing_key to publish the message to
* @param string $message to be published, should already be escaped/encoded
* @param array $basic_properties for the message
* @param bool $mandatory set the mandatory bit
* @param bool $immediate set the immediate bit
* @param int $recursive retry counter when trying to recreate a new session
* @return bool Success/Failure
* @throws Exception
*/
function publish( $exchange,
$routing_key,
$message,
$basic_properties = array(),
$mandatory = false,
$immediate = false,
$recursive = 0 )
{
// Make sure they passed in a message
if ( ! strlen( $message ) )
{
throw new Exception( "You must pass in a message to deliver." );
}
// Make sure they passed in a routing_key and exchange
if ( ! strlen( $exchange ) && ! strlen( $routing_key ) )
{
throw new Exception( "You must pass either an exchange or routing key to publish to." );
}
// Set our properties array
$default_properties = array ('content_type' => null,
'encoding' => null,
'headers' => null,
'priority' => null,
'delivery_mode' => null,
'priority' => null,
'correlation_id' => null,
'reply_to' => null,
'expiration' => null,
'message_id' => null,
'timestamp' => time(),
'type' => null,
'user_id' => null,
'app_id' => null,
'reserved' => null );
// Build our message properties, adding a default for things not specified
$properties = array();
foreach ( $default_properties as $key => $value )
{
$properties[] = ( isset( $basic_properties[$key] ) ) ? $basic_properties[$key] : $value;
}
// Second parameter array is: ticket, exchange, routing_key, mandatory, immediate
$parameters = array ( "basic.publish", array(0, $exchange, $routing_key, $mandatory, $immediate ), $message, $properties );
// Set our URL
curl_setopt( $this->curl, CURLOPT_URL, $this->getSessionURL( ) );
// Set our post data
$payload = $this->getPayload( 'cast', $parameters );
curl_setopt( $this->curl, CURLOPT_POSTFIELDS, $payload );
// Make the Call and get the response
$response = curl_exec( $this->curl );
// Parse out the header and body
$header = curl_getinfo( $this->curl );
// Evaluate the return response to make sure we got a good result before continuing
if ( $header['http_code'] != 200 )
{
throw new Exception( "Received a HTTP Error #" . $header['http_code'] . " while sending basic.publish. Response: " . $response);
}
// Decode our JSON response so we can check for success/failure
$response = json_decode( $response );
// See if we got a RPC error
if ( isset($response->error ) )
{
// Try and recurse once to fix the issue of a stale session
if ( in_array( $response->error->code, array( 404, 500 ) ) )
{
if ( $recursive < self::$maxRetries )
{
// Rebuild our Curl Object
$this->curl_init( );
// Remove the existing session key
apc_delete( $this->cacheKey );
// Pubish
return $this->publish( $exchange, $routing_key, $message, $properties, $mandatory, $immediate, $recursive + 1 );
}
}
// Remove the cache key
apc_delete( $this->cacheKey );
// Was an unexpected error
throw new Exception( "Received " . $recursive . " RPC Errors while sending basic.publish. Last URL: " . $url .
" Payload: " . $payload . "Response: " . json_encode( $response ) );
}
// Make sure we have a body
// Expected response example: {"version":"1.1","id":2,"result":[]}
if ( ! isset( $response->result ) )
{
throw new Exception( "Missing Required 'response' attribute in JSON response: " . json_encode( $response ) );
}
return True;
}
}