|
1 # vim: set et nosta sw=4 ts=4 ft=nim : |
|
2 # |
|
3 # Copyright (c) 2016-2018, Mahlon E. Smith <mahlon@martini.nu> |
|
4 # All rights reserved. |
|
5 # Redistribution and use in source and binary forms, with or without |
|
6 # modification, are permitted provided that the following conditions are met: |
|
7 # |
|
8 # * Redistributions of source code must retain the above copyright |
|
9 # notice, this list of conditions and the following disclaimer. |
|
10 # |
|
11 # * Redistributions in binary form must reproduce the above copyright |
|
12 # notice, this list of conditions and the following disclaimer in the |
|
13 # documentation and/or other materials provided with the distribution. |
|
14 # |
|
15 # * Neither the name of Mahlon E. Smith nor the names of his |
|
16 # contributors may be used to endorse or promote products derived |
|
17 # from this software without specific prior written permission. |
|
18 # |
|
19 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY |
|
20 # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
21 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
22 # DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY |
|
23 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
24 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
25 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
26 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
29 |
|
30 ## Overview |
|
31 ## ============ |
|
32 ## |
|
33 ## This is a pure-nim client library for interacting with Stomp 1.2 |
|
34 ## compliant messaging brokers. |
|
35 ## |
|
36 ## https://stomp.github.io/stomp-specification-1.2.html |
|
37 ## |
|
38 ## Stomp is a simple protocol for message passing between clients, using a central |
|
39 ## broker. It is a subset of other more elaborate protocols (like AMQP), supporting |
|
40 ## only the most used features of common brokers. |
|
41 ## |
|
42 ## Because this library is pure-nim, there are no external dependencies. If you |
|
43 ## can compile a nim binary, you can participate in advanced messaging between processes. |
|
44 ## |
|
45 ## A list of broker support for Stomp can be found here: |
|
46 ## https://stomp.github.io/implementations.html. |
|
47 ## |
|
48 ## This library has been tested with recent versions of RabbitMQ. If it |
|
49 ## works for you with another broker, please let the author know. |
|
50 ## |
|
51 |
|
52 import |
|
53 strutils, |
|
54 nativesockets, |
|
55 net, |
|
56 os, |
|
57 times, |
|
58 uri |
|
59 |
|
60 const |
|
61 VERSION = "0.1.1" ## The current program version. |
|
62 NULL = "\x00" ## The NULL character. |
|
63 CR = "\r" ## The carriage return character. |
|
64 CRLF = "\r\n" ## Carriage return + Line feed (EOL). |
|
65 |
|
66 |
|
67 # Exceptions |
|
68 # |
|
69 type |
|
70 StompError* = object of ValueError ## A generic Stomp error state. |
|
71 |
|
72 StompClient* = ref object of RootObj ## An object that represents a connection to a Stomp compatible server. |
|
73 socket: Socket ## The socket object attached to this client. |
|
74 connected*: bool ## Is the client currently connected? |
|
75 uri*: Uri ## The URI used to instantiate this client. |
|
76 username: string ## The Stomp server user, if any. |
|
77 password: string ## The Stomp server password, if any. |
|
78 host: string ## The host or IP address to connect to. |
|
79 port: Port ## Optional, if Stomp is on a non-default port. |
|
80 vhost: string ## Parsed from the URI path, a Stomp "virtual host". |
|
81 timeout*: int ## Global socket timeout. |
|
82 last_msgtime*: Time ## Timestamp of last seen server message. |
|
83 options: tuple[ heartbeat: int ] ## Any supported client options, derived from the URI query string. |
|
84 subscriptions*: seq[ string ] ## Registered client subscriptions. Array position is the ID. |
|
85 transactions*: seq[ string ] ## Any currently open transactions. |
|
86 serverinfo: seq[ tuple[name: string, value: string] ] ## Server metadata, populated upon a successful connection. |
|
87 |
|
88 connected_callback*: proc ( client: StompClient, response: StompResponse ): void |
|
89 error_callback*: proc ( client: StompClient, response: StompResponse ): void |
|
90 heartbeat_callback*: proc ( client: StompClient, response: StompResponse ): void |
|
91 message_callback*: proc ( client: StompClient, response: StompResponse ): void |
|
92 missed_heartbeat_callback*: proc ( client: StompClient ): void |
|
93 receipt_callback*: proc ( client: StompClient, response: StompResponse ): void |
|
94 |
|
95 StompResponse* = ref object of RootObj ## A parsed packet from a Stomp server. |
|
96 headers: seq[ tuple[name: string, value: string] ] ## Any headers in the response. Access with the `[]` proc. |
|
97 frame*: string ## The Stomp frame type. |
|
98 payload*: string ## The message body, if any. |
|
99 |
|
100 |
|
101 # convenience |
|
102 proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.} |
|
103 |
|
104 |
|
105 #------------------------------------------------------------------- |
|
106 # R E S P O N S E |
|
107 #------------------------------------------------------------------- |
|
108 |
|
109 proc is_eol( s: string ): bool = |
|
110 ## Convenience method, returns **true** if string is a Stomp EOF. |
|
111 return s == CR or s == CRLF |
|
112 |
|
113 |
|
114 proc parse_headers( response: StompResponse, c: StompClient ): int = |
|
115 ## Parse response headers from a stream. |
|
116 ## Returns the content length of the response body, or 0. |
|
117 result = 0 |
|
118 var line = "" |
|
119 |
|
120 c.socket.readline( line, c.timeout ) |
|
121 while not line.is_eol: |
|
122 if defined( debug ): printf " <-- %s\n", line |
|
123 var header = line.split( ":" ) |
|
124 if header.len < 2: break |
|
125 response.headers.add( (header[0], header[1]) ) |
|
126 if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int |
|
127 c.socket.readline( line, c.timeout ) |
|
128 |
|
129 |
|
130 proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void = |
|
131 ## Parse message payload from a stream. |
|
132 let bufsize = 8192 |
|
133 var |
|
134 buf = "" |
|
135 data = "" |
|
136 |
|
137 |
|
138 # If we already know the length of the body, just perform a buffered read. |
|
139 # |
|
140 if bodylength > 0: |
|
141 var |
|
142 readtotal = 0 |
|
143 readamt = 0 |
|
144 remaining = 0 |
|
145 |
|
146 while readtotal != bodylength: |
|
147 remaining = bodylength - readtotal |
|
148 |
|
149 if remaining < bufsize: |
|
150 readamt = remaining |
|
151 else: |
|
152 readamt = bufsize |
|
153 |
|
154 buf = newString( readamt ) |
|
155 readtotal = readtotal + c.socket.recv( buf, readamt, c.timeout ) |
|
156 data = data & buf |
|
157 |
|
158 # Eat the NULL terminator. |
|
159 discard c.socket.recv( buf, 1, c.timeout ) |
|
160 |
|
161 # Inefficient path. |
|
162 # |
|
163 else: |
|
164 while buf != NULL: |
|
165 discard c.socket.recv( buf, 1, c.timeout ) |
|
166 data = data & buf |
|
167 |
|
168 response.payload = data |
|
169 |
|
170 |
|
171 proc newStompResponse( c: StompClient ): StompResponse = |
|
172 ## Initialize a response object, which parses and contains |
|
173 ## the Stomp headers and any additional important data from |
|
174 ## the broker socket. |
|
175 new( result ) |
|
176 result.headers = @[] |
|
177 |
|
178 # Get the frame type, record last seen server activity time. |
|
179 # |
|
180 var line = "" |
|
181 c.socket.readline( line, c.timeout ) |
|
182 c.last_msgtime = get_time() |
|
183 |
|
184 # Heartbeat packets (empties.) |
|
185 # |
|
186 # This could -also- parse optional EOLs from the prior |
|
187 # message (after the NULL separator), but since it is a no-op, |
|
188 # this seems harmless. |
|
189 # |
|
190 if line.is_eol: |
|
191 result.frame = "HEARTBEAT" |
|
192 return result |
|
193 |
|
194 # All other types. |
|
195 # |
|
196 result.frame = line |
|
197 if defined( debug ): |
|
198 printf " <-- %s\n", line |
|
199 |
|
200 # Parse headers and body. |
|
201 # |
|
202 var length = result.parse_headers( c ) |
|
203 if result.frame == "MESSAGE" or result.frame == "ERROR": |
|
204 result.parse_payload( c, length ) |
|
205 |
|
206 # If the response -could- have a body, the NULL has already |
|
207 # been removed from the stream while we checked for one. |
|
208 # |
|
209 if result.payload.len > 0: |
|
210 if result.payload == NULL: # We checked for a body, but there was none. |
|
211 result.payload = "" |
|
212 if defined( debug ): printf " <--\n <-- ^@\n\n" |
|
213 else: |
|
214 if defined( debug ): printf " <--\n <-- (payload)^@\n\n" |
|
215 |
|
216 # Otherwise, pop off the NULL terminator now. |
|
217 # |
|
218 else: |
|
219 discard c.socket.recv( line, 1, c.timeout ) |
|
220 if defined( debug ): printf " <--\n <-- ^@\n\n" |
|
221 |
|
222 |
|
223 proc `$`*( r: StompResponse ): string = |
|
224 ## Represent a Stomp response as a string. |
|
225 result = r.frame & ": " & $r.headers |
|
226 |
|
227 |
|
228 proc `[]`*( response: StompResponse, key: string ): string = |
|
229 ## Get a specific header from a Stomp response. |
|
230 for header in response.headers: |
|
231 if cmpIgnoreCase( key, header.name ) == 0: |
|
232 return header.value |
|
233 return "" |
|
234 |
|
235 |
|
236 #------------------------------------------------------------------- |
|
237 # C A L L B A C K S |
|
238 #------------------------------------------------------------------- |
|
239 |
|
240 proc default_error_callback( c: StompClient, response: StompResponse ) = |
|
241 ## Something bad happened. Disconnect from the server, build an error message, |
|
242 ## and raise an exception. |
|
243 c.socket.close |
|
244 c.connected = false |
|
245 |
|
246 var detail = response.payload |
|
247 var msg = response[ "message" ] |
|
248 if $detail[ ^1 ] == "\n": detail = detail[ 0 .. ^2 ] # chomp |
|
249 |
|
250 if detail.len > 0: msg = msg & " (" & detail & ")" |
|
251 raise newException( StompError, "ERROR: " & msg ) |
|
252 |
|
253 |
|
254 proc default_missed_heartbeat_callback( c: StompClient ) = |
|
255 ## Timeout while connected to the broker. |
|
256 c.socket.close |
|
257 c.connected = false |
|
258 raise newException( StompError, "Heartbeat timeout. Last activity: " & $c.last_msgtime ) |
|
259 |
|
260 |
|
261 |
|
262 #------------------------------------------------------------------- |
|
263 # C L I E N T |
|
264 #------------------------------------------------------------------- |
|
265 |
|
266 proc newStompClient*( s: Socket, uri: string ): StompClient = |
|
267 ## Create a new Stomp client object from a preexisting **socket**, |
|
268 ## and a stomp **URI** string. |
|
269 ## |
|
270 ## .. code-block:: nim |
|
271 ## |
|
272 ## var socket = newSocket() |
|
273 ## var stomp = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" ) |
|
274 ## |
|
275 ## or if connecting with SSL, when compiled with -d:ssl: |
|
276 ## |
|
277 ## .. code-block:: nim |
|
278 ## |
|
279 ## var socket = newSocket() |
|
280 ## let sslContext = newContext( verifyMode = CVerifyNone ) |
|
281 ## sslContext.wrapSocket(socket) |
|
282 ## var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" ) |
|
283 ## |
|
284 new( result ) |
|
285 result.socket = s |
|
286 result.connected = false |
|
287 result.uri = parse_uri( uri ) |
|
288 result.username = result.uri.username |
|
289 result.password = result.uri.password |
|
290 result.host = result.uri.hostname |
|
291 result.vhost = result.uri.path |
|
292 result.timeout = 500 |
|
293 result.subscriptions = @[] |
|
294 result.transactions = @[] |
|
295 |
|
296 # Parse any supported options in the query string. |
|
297 # |
|
298 for pairs in result.uri.query.split( '&' ): |
|
299 let opt = pairs.split( '=' ) |
|
300 try: |
|
301 case opt[0]: |
|
302 of "heartbeat": |
|
303 result.options.heartbeat = opt[1].parse_int |
|
304 else: |
|
305 discard |
|
306 except IndexError, ValueError: |
|
307 discard |
|
308 |
|
309 # Set default STOMP port if otherwise unset. |
|
310 # |
|
311 if not result.uri.scheme.contains( "stomp" ): |
|
312 raise newException( StompError, "Unknown scheme: " & result.uri.scheme ) |
|
313 var port: int |
|
314 if result.uri.port == "": |
|
315 if result.uri.scheme.contains( "+ssl" ): |
|
316 port = 61614 |
|
317 else: |
|
318 port = 61613 |
|
319 else: |
|
320 port = result.uri.port.parse_int |
|
321 |
|
322 result.port = Port( port ) |
|
323 |
|
324 # Decode URI encoded slashes for vhosts. |
|
325 result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" ) |
|
326 |
|
327 |
|
328 proc socksend( c: StompClient, data: string ): void = |
|
329 ## Send data on the connected socket with optional debug output. |
|
330 c.socket.send( data ) |
|
331 if defined( debug ): printf " --> %s", data |
|
332 |
|
333 |
|
334 proc finmsg( c: StompClient ): void = |
|
335 ## Send data on the connected socket with optional debug output. |
|
336 c.socket.send( CRLF & NULL & CRLF ) |
|
337 if defined( debug ): printf " --> \n --> ^@\n\n" |
|
338 |
|
339 |
|
340 proc `[]`*( c: StompClient, key: string ): string = |
|
341 ## Get a specific value from the server metadata, set during the initial connection. |
|
342 if not c.connected: return "" |
|
343 for header in c.serverinfo: |
|
344 if cmpIgnoreCase( key, header.name ) == 0: |
|
345 return header.value |
|
346 return "" |
|
347 |
|
348 |
|
349 proc `$`*( c: StompClient ): string = |
|
350 ## Represent the stomp client as a string, after masking the password. |
|
351 let uri = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" ) |
|
352 result = "(NimStomp v" & VERSION & ( if c.connected: " connected" else: " not connected" ) & " to " & uri |
|
353 if not ( c[ "server" ] == "" ): result.add( " --> " & c["server"] ) |
|
354 result.add( ")" ) |
|
355 |
|
356 |
|
357 proc connect*( c: StompClient ): void = |
|
358 ## Establish a connection to the Stomp server. |
|
359 if c.connected: return |
|
360 |
|
361 var headers: seq[ tuple[name: string, value: string] ] = @[] |
|
362 headers.add( ("accept-version", "1.2") ) |
|
363 |
|
364 # Stomp 1.2 requires the Host: header. Use the path as a vhost if |
|
365 # supplied, otherwise use the hostname of the server. |
|
366 # |
|
367 if c.vhost != "": |
|
368 headers.add( ("host", c.vhost) ) |
|
369 else: |
|
370 headers.add( ("host", c.host) ) |
|
371 |
|
372 if c.username != "": headers.add( ("login", c.username) ) |
|
373 if c.password != "": headers.add( ("passcode", c.password) ) |
|
374 if c.options.heartbeat > 0: |
|
375 let heartbeat = c.options.heartbeat * 1000 |
|
376 headers.add( ("heart-beat", "0," & $heartbeat) ) |
|
377 |
|
378 # Connect the socket and send the headers off. |
|
379 # |
|
380 c.socket.connect( c.host, c.port ) |
|
381 c.socksend( "CONNECT" & CRLF ) |
|
382 for header in headers: |
|
383 c.socksend( header.name & ":" & header.value & CRLF ) |
|
384 c.finmsg |
|
385 |
|
386 # Retreive and copy server metadata to client object. |
|
387 # |
|
388 var response = newStompResponse( c ) |
|
389 c.serverinfo = response.headers |
|
390 |
|
391 if response.frame != "CONNECTED": |
|
392 if not isNil( c.error_callback ): |
|
393 c.error_callback( c, response ) |
|
394 else: |
|
395 c.default_error_callback( response ) |
|
396 else: |
|
397 c.connected = true |
|
398 if not isNil( c.connected_callback ): |
|
399 c.connected_callback( c, response ) |
|
400 |
|
401 |
|
402 proc disconnect*( c: StompClient ): void = |
|
403 ## Break down the connection to the Stomp server nicely. |
|
404 if not c.connected: return |
|
405 c.socksend( "DISCONNECT" & CRLF ) |
|
406 c.finmsg |
|
407 |
|
408 c.socket.close |
|
409 c.connected = false |
|
410 |
|
411 |
|
412 proc add_txn( c: StompClient ): void = |
|
413 ## Add a transaction header if there is only a single open txn. |
|
414 if c.transactions.len != 1: return |
|
415 c.socksend( "transaction:" & c.transactions[0] & CRLF ) |
|
416 |
|
417 |
|
418 proc send*( c: StompClient, |
|
419 destination: string, |
|
420 message: string = "", |
|
421 contenttype: string = "", |
|
422 headers: seq[ tuple[name: string, value: string] ] = @[] ): void = |
|
423 ## Send a **message** to **destination**. |
|
424 ## |
|
425 ## A Content-Length header is automatically and always included. |
|
426 ## A **contenttype** is optional, but strongly recommended. |
|
427 ## |
|
428 ## Additionally, a transaction ID is automatically added if there is only |
|
429 ## one transaction active. If you need to attach this message to a particular |
|
430 ## transaction ID, you'll need to add it yourself with the user defined |
|
431 ## **headers**. |
|
432 |
|
433 if not c.connected: raise newException( StompError, "Client is not connected." ) |
|
434 c.socksend( "SEND" & CRLF ) |
|
435 c.socksend( "destination:" & destination & CRLF ) |
|
436 c.socksend( "content-length:" & $message.len & CRLF ) |
|
437 if not ( contenttype == "" ): c.socksend( "content-type:" & contenttype & CRLF ) |
|
438 |
|
439 # Add custom headers. Add transaction header if one isn't manually |
|
440 # present (and a transaction is open.) |
|
441 # |
|
442 var txn_seen = false |
|
443 for header in headers: |
|
444 if header.name == "transaction": txn_seen = true |
|
445 c.socksend( header.name & ":" & header.value & CRLF ) |
|
446 if not txn_seen: c.add_txn |
|
447 |
|
448 if message == "": |
|
449 c.finmsg |
|
450 else: |
|
451 c.socket.send( CRLF & message & NULL ) |
|
452 if defined( debug ): printf " -->\n --> (payload)^@\n\n" |
|
453 |
|
454 |
|
455 proc subscribe*( c: StompClient, |
|
456 destination: string, |
|
457 ack = "auto", |
|
458 headers: seq[ tuple[name: string, value: string] ] = @[] ): void = |
|
459 ## Subscribe to messages at **destination**. |
|
460 ## |
|
461 ## Setting **ack** to "client" or "client-individual" enables client ACK/NACK mode. |
|
462 ## In this mode, incoming messages aren't considered processed by |
|
463 ## the server unless they receive ACK. By default, the server |
|
464 ## considers the message processed if a client simply accepts it. |
|
465 ## |
|
466 ## You may optionally add any additional **headers** the server may support. |
|
467 |
|
468 if not c.connected: raise newException( StompError, "Client is not connected." ) |
|
469 c.socksend( "SUBSCRIBE" & CRLF ) |
|
470 c.socksend( "destination:" & destination & CRLF ) |
|
471 c.socksend( "id:" & $c.subscriptions.len & CRLF ) |
|
472 if ack == "client" or ack == "client-individual": |
|
473 c.socksend( "ack:" & ack & CRLF ) |
|
474 else: |
|
475 if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack ) |
|
476 |
|
477 for header in headers: |
|
478 c.socksend( header.name & ":" & header.value & CRLF ) |
|
479 c.finmsg |
|
480 c.subscriptions.add( destination ) |
|
481 |
|
482 |
|
483 proc unsubscribe*( c: StompClient, |
|
484 destination: string, |
|
485 headers: seq[ tuple[name: string, value: string] ] = @[] ): void = |
|
486 ## Unsubscribe from messages at **destination**. |
|
487 ## You may optionally add any additional **headers** the server may support. |
|
488 |
|
489 if not c.connected: raise newException( StompError, "Client is not connected." ) |
|
490 var |
|
491 sub_id: int |
|
492 i = 0 |
|
493 |
|
494 # Find the ID of the subscription. |
|
495 # |
|
496 for sub in c.subscriptions: |
|
497 if sub == destination: |
|
498 sub_id = i |
|
499 break |
|
500 i = i + 1 |
|
501 |
|
502 c.socksend( "UNSUBSCRIBE" & CRLF ) |
|
503 c.socksend( "id:" & $sub_id & CRLF ) |
|
504 for header in headers: |
|
505 c.socksend( header.name & ":" & header.value & CRLF ) |
|
506 c.finmsg |
|
507 c.subscriptions[ sub_id ] = "" |
|
508 |
|
509 |
|
510 proc begin*( c: StompClient, txn: string ): void = |
|
511 ## Begin a new transaction on the broker, using **txn** as the identifier. |
|
512 c.socksend( "BEGIN" & CRLF ) |
|
513 c.socksend( "transaction:" & txn & CRLF ) |
|
514 c.finmsg |
|
515 c.transactions.add( txn ) |
|
516 |
|
517 |
|
518 proc commit*( c: StompClient, txn: string = "" ): void = |
|
519 ## Finish a specific transaction **txn**, or the most current if unspecified. |
|
520 var transaction = txn |
|
521 if transaction == "" and c.transactions.len > 0: transaction = c.transactions.pop |
|
522 if transaction == "": return |
|
523 |
|
524 c.socksend( "COMMIT" & CRLF ) |
|
525 c.socksend( "transaction:" & transaction & CRLF ) |
|
526 c.finmsg |
|
527 |
|
528 # Remove the transaction from the queue. |
|
529 # |
|
530 var new_transactions: seq[ string ] = @[] |
|
531 for txn in c.transactions: |
|
532 if txn != transaction: new_transactions.add( txn ) |
|
533 c.transactions = new_transactions |
|
534 |
|
535 |
|
536 proc abort*( c: StompClient, txn: string = "" ): void = |
|
537 ## Cancel a specific transaction **txn**, or the most current if unspecified. |
|
538 var transaction = txn |
|
539 if transaction == "" and c.transactions.len > 0: transaction = c.transactions.pop |
|
540 if transaction == "": return |
|
541 |
|
542 c.socksend( "ABORT" & CRLF ) |
|
543 c.socksend( "transaction:" & transaction & CRLF ) |
|
544 c.finmsg |
|
545 |
|
546 # Remove the transaction from the queue. |
|
547 # |
|
548 var new_transactions: seq[ string ] = @[] |
|
549 for txn in c.transactions: |
|
550 if txn != transaction: new_transactions.add( txn ) |
|
551 c.transactions = new_transactions |
|
552 |
|
553 |
|
554 proc ack*( c: StompClient, id: string, transaction: string = "" ): void = |
|
555 ## Acknowledge message **id**. Optionally, attach this acknowledgement |
|
556 ## to a specific **transaction** -- if there's only one active, it is |
|
557 ## added automatically. |
|
558 c.socksend( "ACK" & CRLF ) |
|
559 c.socksend( "id:" & id & CRLF ) |
|
560 if not ( transaction == "" ): |
|
561 c.socksend( "transaction:" & transaction & CRLF ) |
|
562 else: |
|
563 c.add_txn |
|
564 c.finmsg |
|
565 |
|
566 |
|
567 proc nack*( c: StompClient, id: string, transaction: string = "" ): void = |
|
568 ## Reject message **id**. Optionally, attach this rejection to a |
|
569 ## specific **transaction** -- if there's only one active, it is |
|
570 ## added automatically. |
|
571 ## |
|
572 ## Subscribe to a queue with ACK mode enabled, and reject the message |
|
573 ## on error: |
|
574 ## |
|
575 ## .. code-block:: nim |
|
576 ## |
|
577 ## stomp.subscribe( "/queue/test", "client-individual" ) |
|
578 ## FIXME: attach procs |
|
579 ## stomp.wait_for_messages |
|
580 ## |
|
581 c.socksend( "NACK" & CRLF ) |
|
582 c.socksend( "id:" & id & CRLF ) |
|
583 if not ( transaction == "" ): |
|
584 c.socksend( "transaction:" & transaction & CRLF ) |
|
585 else: |
|
586 c.add_txn |
|
587 c.finmsg |
|
588 |
|
589 |
|
590 proc wait_for_messages*( c: StompClient, loop=true ) = |
|
591 ## Enter a blocking select loop, dispatching to the appropriate proc |
|
592 ## for the received message type. Return after a single message |
|
593 ## is received if **loop** is set to **false**. |
|
594 |
|
595 if not c.connected: raise newException( StompError, "Client is not connected." ) |
|
596 |
|
597 while true: |
|
598 var |
|
599 timeout: int |
|
600 fds = @[ c.socket.get_fd ] |
|
601 |
|
602 # Check for missed heartbeats, with an additional second |
|
603 # of wiggle-room. |
|
604 # |
|
605 if c.options.heartbeat > 0: |
|
606 timeout = ( c.options.heartbeat + 1 ) * 1000 |
|
607 else: |
|
608 timeout = -1 |
|
609 |
|
610 if select_read( fds, timeout ) == 0: # timeout, only happens if heartbeating missed |
|
611 if not isNil( c.missed_heartbeat_callback ): |
|
612 c.missed_heartbeat_callback( c ) |
|
613 else: |
|
614 c.default_missed_heartbeat_callback |
|
615 if loop: continue else: break |
|
616 |
|
617 let response = newStompResponse( c ) |
|
618 case response.frame: |
|
619 |
|
620 of "HEARTBEAT": |
|
621 if not isNil( c.heartbeat_callback ): |
|
622 c.heartbeat_callback( c, response ) |
|
623 continue |
|
624 |
|
625 of "RECEIPT": |
|
626 if not isNil( c.receipt_callback ): |
|
627 c.receipt_callback( c, response ) |
|
628 |
|
629 of "MESSAGE": |
|
630 if not isNil( c.message_callback ): |
|
631 c.message_callback( c, response ) |
|
632 |
|
633 of "ERROR": |
|
634 if not isNil( c.error_callback ): |
|
635 c.error_callback( c, response ) |
|
636 else: |
|
637 c.default_error_callback( response ) |
|
638 |
|
639 else: |
|
640 if defined( debug ): |
|
641 echo "Strange broker frame: " & response.repr |
|
642 |
|
643 if not loop: break |
|
644 |
|
645 |
|
646 |
|
647 #------------------------------------------------------------------- |
|
648 # T E S T S |
|
649 #------------------------------------------------------------------- |
|
650 |
|
651 # Functional (rather than unit) tests. Requires a Stomp compatible broker. |
|
652 # This was tested against RabbitMQ 3.5.3 and 3.6.0. |
|
653 # 3.6.0 was -so- much faster. |
|
654 # |
|
655 # First start up a message receiver: |
|
656 # ./stomp receiver [stomp-uri] [subscription-destination] |
|
657 # |
|
658 # then run another process, to publish stuff: |
|
659 # ./stomp publisher [stomp-uri] [publish-destination] |
|
660 # |
|
661 # An example with an AMQP "direct" exchange, and an exclusive queue: |
|
662 # ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test |
|
663 # ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test |
|
664 # |
|
665 # Then just let 'er run. |
|
666 # |
|
667 # You can also run a nieve benchmark (deliveries/sec): |
|
668 # |
|
669 # ./stomp benchmark stomp://test:test@localhost/ /exchange/test |
|
670 # |
|
671 # It will set messages to require acknowledgement, and nack everything, causing |
|
672 # a delivery loop for 10 seconds. |
|
673 # |
|
674 when isMainModule: |
|
675 let expected = 8 |
|
676 var |
|
677 socket = newSocket() |
|
678 messages: seq[ StompResponse ] = @[] |
|
679 |
|
680 let usage = """ |
|
681 First start up a message receiver: |
|
682 ./stomp receiver [stomp-uri] [subscription-destination] |
|
683 |
|
684 then run another process, to publish stuff: |
|
685 ./stomp publisher [stomp-uri] [publish-destination] |
|
686 |
|
687 An example with an AMQP "direct" exchange, and an exclusive queue: |
|
688 ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test |
|
689 ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test |
|
690 |
|
691 Then just let 'er run. |
|
692 |
|
693 You can also run a nieve benchmark (deliveries/sec): |
|
694 |
|
695 ./stomp benchmark stomp://test:test@localhost/ /exchange/test |
|
696 |
|
697 It will set messages to require acknowledgement, and nack everything, causing |
|
698 a delivery loop for 10 seconds. |
|
699 """ |
|
700 |
|
701 |
|
702 if paramCount() != 3: quit usage |
|
703 |
|
704 var stomp = newStompClient( socket, paramStr(2) ) |
|
705 stomp.connect |
|
706 echo stomp |
|
707 |
|
708 case paramStr(1): |
|
709 |
|
710 of "benchmark": |
|
711 echo "* Running for 10 seconds. Compile with -d:debug to see the Stomp conversation." |
|
712 var count = 0 |
|
713 var start = get_time() |
|
714 |
|
715 proc incr( c: StompClient, r: StompResponse ) = |
|
716 let id = r["ack"] |
|
717 count = count + 1 |
|
718 c.nack( id ) |
|
719 |
|
720 stomp.message_callback = incr |
|
721 stomp.subscribe( paramStr(3), "client" ) |
|
722 stomp.send( paramStr(3), "hi." ) |
|
723 while get_time() < start + 10.seconds: |
|
724 stomp.wait_for_messages( false ) |
|
725 |
|
726 printf "* Processed %d messages in 10 seconds.\n", count |
|
727 stomp.disconnect |
|
728 |
|
729 |
|
730 # Store incoming messages, ensure their contents match our expected behavior. |
|
731 # |
|
732 of "receiver": |
|
733 var heartbeats = 0 |
|
734 echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation." |
|
735 |
|
736 proc receive_message( c: StompClient, r: StompResponse ) = |
|
737 messages.add( r ) |
|
738 case r.frame: |
|
739 of "RECEIPT": |
|
740 discard |
|
741 of "MESSAGE": |
|
742 let body = r.payload |
|
743 let id = r[ "ack" ] |
|
744 |
|
745 proc seen_heartbeat( c: StompClient, r: StompResponse ) = |
|
746 heartbeats = heartbeats + 1 |
|
747 |
|
748 stomp.message_callback = receive_message |
|
749 stomp.receipt_callback = receive_message |
|
750 stomp.heartbeat_callback = seen_heartbeat |
|
751 stomp.subscribe( paramStr(3) ) |
|
752 |
|
753 # Populate the messages sequence with the count of expected messages. |
|
754 for i in 1..expected: stomp.wait_for_messages( false ) |
|
755 |
|
756 # Assertions on the results! |
|
757 # |
|
758 doAssert( messages.len == expected ) |
|
759 doAssert( messages[0].payload == "" ) |
|
760 |
|
761 doAssert( messages[1].payload == "Hello world!" ) |
|
762 |
|
763 doAssert( messages[2].payload == "Dumb.\n\n" ) |
|
764 |
|
765 doAssert( messages[3].payload == "Hello again." ) |
|
766 doAssert( messages[3][ "content-type" ] == "text/plain" ) |
|
767 doAssert( messages[3][ "Content-Type" ] == "text/plain" ) |
|
768 |
|
769 doAssert( messages[4][ "x-custom" ] == "yum" ) |
|
770 |
|
771 doAssert( messages[5][ "receipt" ] == "42" ) |
|
772 |
|
773 doAssert( messages[6].payload == "transaction!" ) |
|
774 doAssert( messages[7].payload == "transaction 2" ) |
|
775 |
|
776 stomp.disconnect |
|
777 |
|
778 if heartbeats > 0: |
|
779 printf "* Tests passed! %d heartbeats seen.", heartbeats |
|
780 else: |
|
781 echo "* Tests passed!" |
|
782 |
|
783 |
|
784 # Publish a variety of messages with various options. |
|
785 # Pause momentarily between sends(), as brokers -might- impose |
|
786 # rate limits and/or message dropping. |
|
787 # |
|
788 of "publisher": |
|
789 echo "* Publishing to receiver. Compile with -d:debug to see the Stomp conversation." |
|
790 |
|
791 # Simple, no frills event. |
|
792 stomp.send( paramStr(3) ) |
|
793 sleep 500 |
|
794 |
|
795 # Event with a body. |
|
796 stomp.send( paramStr(3), "Hello world!" ) |
|
797 sleep 500 |
|
798 |
|
799 # Event that doesn't contain a content-length. |
|
800 # (Note, the broker may elect to add one on your behalf, which is a good thing... |
|
801 # but invalidates this test.) |
|
802 stomp.socksend( "SEND" & CRLF ) |
|
803 stomp.socksend( "destination:" & paramStr(3) & CRLF & CRLF ) |
|
804 stomp.socksend( "Dumb.\n\n" & NULL ) |
|
805 sleep 500 |
|
806 |
|
807 # Content-Type |
|
808 stomp.send( paramStr(3), "Hello again.", "text/plain" ) |
|
809 sleep 500 |
|
810 |
|
811 # Custom headers. |
|
812 var headers: seq[ tuple[ name: string, value: string ] ] = @[] |
|
813 headers.add( ("x-custom", "yum") ) |
|
814 stomp.send( paramStr(3), "Hello again.", "text/plain", headers ) |
|
815 sleep 500 |
|
816 |
|
817 # Receipt requests. |
|
818 proc receive_receipt( c: StompClient, r: StompResponse ) = |
|
819 messages.add( r ) |
|
820 headers = @[] |
|
821 headers.add( ("receipt", "42") ) |
|
822 stomp.send( paramStr(3), "Hello again.", "text/plain", headers ) |
|
823 stomp.receipt_callback = receive_receipt |
|
824 stomp.wait_for_messages( false ) |
|
825 doAssert( messages[0]["receipt-id"] == "42" ) |
|
826 |
|
827 # Aborted transaction. |
|
828 stomp.begin( "test-abort" ) |
|
829 for i in 1..3: |
|
830 stomp.send( paramStr(3), "Message: " & $i ) |
|
831 stomp.abort |
|
832 |
|
833 # Committed transaction. |
|
834 stomp.begin( "test-commit" ) |
|
835 stomp.send( paramStr(3), "transaction!" ) |
|
836 stomp.commit |
|
837 |
|
838 # Mixed transactions. |
|
839 for i in 1..3: |
|
840 headers = @[] |
|
841 headers.add( ("transaction", "test-" & $i ) ) |
|
842 stomp.begin( "test-" & $i ) |
|
843 stomp.send( paramStr(3), "transaction " & $i, "", headers ) |
|
844 sleep 500 |
|
845 stomp.abort( "test-1" ) |
|
846 sleep 500 |
|
847 stomp.commit( "test-2" ) |
|
848 sleep 500 |
|
849 stomp.abort( "test-3" ) |
|
850 sleep 500 |
|
851 |
|
852 stomp.disconnect |
|
853 echo "* Tests passed!" |
|
854 |
|
855 else: |
|
856 quit usage |
|
857 |