|
1 # vim: set et nosta sw=4 ts=4 ft=nim : |
|
2 # |
|
3 # Copyright (c) 2016, Mahlon E. Smith <mahlon@martini.nu> |
|
4 # All rights reserved. |
|
5 # Redistribution and use in source and binary forms, with or without |
|
6 # modification, are permitted provided that the following conditions are met: |
|
7 # |
|
8 # * Redistributions of source code must retain the above copyright |
|
9 # notice, this list of conditions and the following disclaimer. |
|
10 # |
|
11 # * Redistributions in binary form must reproduce the above copyright |
|
12 # notice, this list of conditions and the following disclaimer in the |
|
13 # documentation and/or other materials provided with the distribution. |
|
14 # |
|
15 # * Neither the name of Mahlon E. Smith nor the names of his |
|
16 # contributors may be used to endorse or promote products derived |
|
17 # from this software without specific prior written permission. |
|
18 # |
|
19 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY |
|
20 # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED |
|
21 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
22 # DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY |
|
23 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES |
|
24 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
|
25 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND |
|
26 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
|
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
29 |
|
30 ## Overview |
|
31 ## ============ |
|
32 ## |
|
33 ## This is a pure-nim client library for interacting with Stomp 1.2 |
|
34 ## compliant messaging brokers. |
|
35 ## |
|
36 ## https://stomp.github.io/stomp-specification-1.2.html |
|
37 ## |
|
38 ## Stomp is a simple protocol for message passing between clients, using a central |
|
39 ## broker. It is a subset of other more elaborate protocols (like AMQP), supporting |
|
40 ## only the most used features of common brokers. |
|
41 ## |
|
42 ## Because this library is pure-nim, there are no external dependencies. If you |
|
43 ## can compile a nim binary, you can participate in advanced messaging between processes. |
|
44 ## |
|
45 ## A list of broker support for Stomp can be found here: |
|
46 ## https://stomp.github.io/implementations.html. |
|
47 ## |
|
48 ## This library has been tested with recent versions of RabbitMQ. If it |
|
49 ## works for you with another broker, please let the author know. |
|
50 ## |
|
51 ## |
|
52 ## Protocol support |
|
53 ## ---------------- |
|
54 ## |
|
55 ## Examples |
|
56 ## ========= |
|
57 ## |
|
58 ## Connecting with SSL |
|
59 ## ------------------- |
|
60 ## |
|
61 |
|
62 import |
|
63 strutils, |
|
64 nativesockets, |
|
65 net, |
|
66 os, |
|
67 times, |
|
68 uri |
|
69 |
|
const
    VERSION = "0.1.0"
        ## Release identifier of this library.
    NULL = "\x00"
        ## The NUL octet used as the Stomp frame terminator.
    CR = "\r"
        ## A lone carriage return.
    CRLF = "\r\n"
        ## Carriage return followed by line feed; the EOL sequence
        ## this library writes after every command and header line.
|
75 |
|
76 |
|
77 # Exceptions |
|
78 # |
|
type
    StompError* = object of ValueError ## Raised for any Stomp level failure.

    StompClient* = ref object of RootObj ## State for one connection to a Stomp broker.
        socket: Socket          ## Socket all frames are written to and read from.
        connected*: bool        ## True once the broker has answered CONNECT with CONNECTED.
        uri*: Uri               ## Parsed form of the URI string given to the constructor.
        username: string        ## Login taken from the URI, if any.
        password: string        ## Passcode taken from the URI, if any.
        host: string            ## Hostname or address taken from the URI.
        port: Port              ## URI port, or the Stomp default when the URI has none.
        vhost: string           ## URI path, sent as the Stomp "host" header when non-empty.
        timeout*: int           ## Timeout handed to every socket read.
        last_msgtime*: Time     ## When the most recent line was read from the broker.
        options: tuple[ heartbeat: int ]
            ## Options parsed from the URI query string.  **heartbeat** is in seconds.
        subscriptions*: seq[ string ]
            ## Subscribed destinations.  The index of an entry is its subscription ID.
        transactions*: seq[ string ]
            ## Identifiers of transactions that have been begun and not yet finished.
        serverinfo: seq[ tuple[name: string, value: string] ]
            ## Headers of the broker's reply to CONNECT.

        # Optional user hooks.  Unset (nil) hooks are skipped, except for the
        # error and missed-heartbeat hooks, which fall back to defaults that raise.
        connected_callback*: proc ( client: StompClient, response: StompResponse )
        error_callback*: proc ( client: StompClient, response: StompResponse )
        heartbeat_callback*: proc ( client: StompClient, response: StompResponse )
        message_callback*: proc ( client: StompClient, response: StompResponse )
        missed_heartbeat_callback*: proc ( client: StompClient )
        receipt_callback*: proc ( client: StompClient, response: StompResponse )

    StompResponse* = ref object of RootObj ## One frame read from the broker.
        headers: seq[ tuple[name: string, value: string] ]
            ## Header name/value pairs, in arrival order.  Read them with `[]`.
        frame*: string          ## Frame command, or "HEARTBEAT" for a bare EOL.
        payload*: string        ## Frame body; nil when the frame has none.
|
109 |
|
110 |
|
# Direct binding to C's printf(), used for debug traces and test output.
# Nim strings passed as extra arguments are converted to C strings.
proc printf( formatstr: cstring ) {.importc: "printf", header: "<stdio.h>", varargs.}
|
113 |
|
114 |
|
115 #------------------------------------------------------------------- |
|
116 # R E S P O N S E |
|
117 #------------------------------------------------------------------- |
|
118 |
|
proc is_eol( s: string ): bool =
    ## Report whether **s** consists of nothing but a line terminator:
    ## either a CRLF pair or a lone carriage return.
    result = ( s == CRLF ) or ( s == CR )
|
122 |
|
123 |
|
proc parse_headers( response: StompResponse, c: StompClient ): int =
    ## Read header lines from the client socket and append each name/value
    ## pair to **response**.  Reading stops at the first empty line, or at
    ## the first line that has no colon in it.
    ##
    ## Returns the value of the ``content-length`` header, or 0 when the
    ## frame did not carry one.  A non-numeric content-length raises
    ## ValueError (from parse_int).
    result = 0
    var line = ""

    c.socket.readline( line, c.timeout )
    while not line.is_eol:
        if defined( debug ): printf " <-- %s\n", line

        # Split on the FIRST colon only.  Everything after it belongs to
        # the value, which may legitimately contain further colons (the
        # Stomp 1.2 spec exempts CONNECTED frames from header escaping).
        let sep = line.find( ':' )
        if sep < 0: break

        let
            name  = line.substr( 0, sep - 1 )
            value = line.substr( sep + 1 )

        response.headers.add( (name, value) )
        if cmpIgnoreCase( name, "content-length" ) == 0: result = value.parse_int
        c.socket.readline( line, c.timeout )
|
138 |
|
139 |
|
proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void =
    ## Read a frame body from the client socket into **response.payload**.
    ##
    ## With a positive **bodylength** (taken from ``content-length``),
    ## exactly that many bytes are read in chunks, and the NULL terminator
    ## that follows is consumed and dropped.
    ##
    ## Otherwise the stream is read one byte at a time up to and including
    ## the NULL terminator.  In that case the payload ends with the NULL;
    ## a frame without a body yields a payload of just NULL.
    ##
    ## Raises StompError if the connection closes before the body is complete.
    let bufsize = 8192
    var
        buf  = ""
        data = ""

    # If we already know the length of the body, just perform a buffered read.
    #
    if bodylength > 0:
        var readtotal = 0

        while readtotal < bodylength:
            let readamt = min( bufsize, bodylength - readtotal )

            buf = newString( readamt )
            let got = c.socket.recv( buf, readamt, c.timeout )

            # recv() reports a closed connection by returning 0.  Without
            # this check the loop could never reach bodylength and would
            # spin forever.
            if got == 0:
                c.connected = false
                raise newException( StompError, "Connection closed while reading the message body." )

            readtotal = readtotal + got
            data.add( buf )

        # Eat the NULL terminator.
        discard c.socket.recv( buf, 1, c.timeout )

    # Inefficient path: no usable content-length, so scan for the terminator.
    #
    else:
        while buf != NULL:
            # Same closed-connection guard as above: buf would stay empty
            # and never compare equal to NULL.
            if c.socket.recv( buf, 1, c.timeout ) == 0:
                c.connected = false
                raise newException( StompError, "Connection closed while reading the message body." )
            data.add( buf )

    response.payload = data
|
179 |
|
180 |
|
proc newStompResponse( c: StompClient ): StompResponse =
    ## Read one complete frame from the broker socket and return it as a
    ## response object: frame command, headers, and (for MESSAGE and ERROR
    ## frames) the body.
    ##
    ## A bare EOL is returned as a response whose frame is "HEARTBEAT".
    ## The payload is nil when the frame carries no body.
    ##
    ## Raises StompError if the broker has closed the connection.
    new( result )
    result.headers = @[]

    # Get the frame type, record last seen server activity time.
    #
    var line = ""
    c.socket.readline( line, c.timeout )

    # readline() leaves the line empty when the peer has disconnected (an
    # empty protocol line comes back as an EOL instead).  There is no frame
    # to parse in that case.
    if line.len == 0:
        c.connected = false
        raise newException( StompError, "Connection closed by the server." )

    c.last_msgtime = get_time()

    # Heartbeat packets (empties.)
    #
    # This also swallows optional EOLs that follow the NULL of the prior
    # frame.  Treating those as heartbeats is a harmless no-op.
    #
    if line.is_eol:
        result.frame = "HEARTBEAT"
        return result

    # All other types.
    #
    result.frame = line
    if defined( debug ):
        printf " <-- %s\n", line

    # Parse headers and body.
    #
    var length = result.parse_headers( c )
    if result.frame == "MESSAGE" or result.frame == "ERROR":
        result.parse_payload( c, length )

    # If the response -could- have a body, the NULL has already
    # been removed from the stream while we checked for one.
    #
    if result.payload.len > 0:

        # Without a content-length the body was read up to and including
        # the frame terminator, so strip that trailing NULL.  (With a
        # content-length a final NULL is real body data and is kept.)
        if length <= 0 and result.payload[ ^1 ] == '\0':
            result.payload = result.payload.substr( 0, result.payload.len - 2 )

        if result.payload.len == 0: # We checked for a body, but there was none.
            result.payload = nil
            if defined( debug ): printf " <--\n <-- ^@\n\n"
        else:
            if defined( debug ): printf " <--\n <-- (payload)^@\n\n"

    # Otherwise, pop off the NULL terminator now.
    #
    else:
        discard c.socket.recv( line, 1, c.timeout )
        if defined( debug ): printf " <--\n <-- ^@\n\n"
|
231 |
|
232 |
|
proc `$`*( r: StompResponse ): string =
    ## Render a response as its frame command, a colon, and the
    ## stringified header list.  The payload is not included.
    return r.frame & ": " & $r.headers
|
236 |
|
237 |
|
proc `[]`*( response: StompResponse, key: string ): string =
    ## Look up the header named **key**, ignoring case.  The first
    ## matching header wins; nil is returned when there is no match.
    result = nil
    for pair in response.headers:
        if cmpIgnoreCase( pair.name, key ) == 0:
            result = pair.value
            break
|
244 |
|
245 |
|
246 #------------------------------------------------------------------- |
|
247 # C A L L B A C K S |
|
248 #------------------------------------------------------------------- |
|
249 |
|
proc default_error_callback( c: StompClient, response: StompResponse ) =
    ## Fallback used when no **error_callback** is set: close the socket,
    ## mark the client as disconnected, and raise a StompError built from
    ## the response's ``message`` header and, if present, its body.
    c.socket.close
    c.connected = false

    var detail = response.payload
    var msg    = response[ "message" ]

    # Neither a body nor a "message" header is guaranteed to be present
    # (this is also called for unexpected non-ERROR replies to CONNECT),
    # so check before indexing or concatenating.
    if detail.len > 0 and detail[ ^1 ] == '\n': detail = detail[ 0 .. ^2 ] # chomp
    if msg.len == 0: msg = ""

    if detail.len > 0: msg = msg & " (" & detail & ")"
    raise newException( StompError, "ERROR: " & msg )
|
262 |
|
263 |
|
proc default_missed_heartbeat_callback( c: StompClient ) =
    ## Fallback used when no **missed_heartbeat_callback** is set: close
    ## the socket, mark the client as disconnected, and raise a StompError
    ## that reports when the broker was last heard from.
    c.socket.close
    c.connected = false

    let last_seen = $c.last_msgtime
    raise newException( StompError, "Heartbeat timeout. Last activity: " & last_seen )
|
269 |
|
270 |
|
271 |
|
272 #------------------------------------------------------------------- |
|
273 # C L I E N T |
|
274 #------------------------------------------------------------------- |
|
275 |
|
proc newStompClient*( s: Socket, uri: string ): StompClient =
    ## Build a Stomp client around an existing **socket** and a stomp
    ## **URI** string.  Nothing is sent until `connect` is called.
    ##
    ## .. code-block:: nim
    ##
    ##    var socket = newSocket()
    ##    var stomp = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" )
    ##
    ## or if connecting with SSL, when compiled with -d:ssl:
    ##
    ## .. code-block:: nim
    ##
    ##    var socket = newSocket()
    ##    let sslContext = newContext( verifyMode = CVerifyNone )
    ##    sslContext.wrapSocket(socket)
    ##    var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" )
    ##
    ## The only query string option understood is ``heartbeat`` (seconds).
    ## Raises StompError when the URI scheme does not contain "stomp".
    new( result )
    let parsed = parse_uri( uri )

    result.socket        = s
    result.connected     = false
    result.uri           = parsed
    result.username      = parsed.username
    result.password      = parsed.password
    result.host          = parsed.hostname
    result.vhost         = parsed.path
    result.timeout       = 500
    result.subscriptions = @[]
    result.transactions  = @[]

    # Pick supported options out of the query string.  Malformed pairs
    # (no value, or a non-numeric value) are silently skipped.
    #
    for pair in parsed.query.split( '&' ):
        let opt = pair.split( '=' )
        try:
            if opt[0] == "heartbeat":
                result.options.heartbeat = opt[1].parse_int
        except IndexError, ValueError:
            discard

    if not parsed.scheme.contains( "stomp" ):
        raise newException( StompError, "Unknown scheme: " & parsed.scheme )

    # An explicit port wins; otherwise fall back to the Stomp default for
    # the scheme in use.
    #
    var port = 61613
    if parsed.port != "":
        port = parsed.port.parse_int
    elif parsed.scheme.contains( "+ssl" ):
        port = 61614
    result.port = Port( port )

    # Decode URI encoded slashes for vhosts, then collapse doubled slashes.
    result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" )
|
336 |
|
337 |
|
proc socksend( c: StompClient, data: string ): void =
    ## Write **data** to the broker socket unchanged.  Builds compiled
    ## with -d:debug also print what was sent.
    c.socket.send( data )
    when defined( debug ):
        printf " --> %s", data
|
342 |
|
343 |
|
proc finmsg( c: StompClient ): void =
    ## Finish a body-less frame: write the blank line that ends the
    ## headers, the NULL frame terminator, and a trailing EOL.
    let terminator = CRLF & NULL & CRLF
    c.socket.send( terminator )
    when defined( debug ):
        printf " --> \n --> ^@\n\n"
|
348 |
|
349 |
|
proc `[]`*( c: StompClient, key: string ): string =
    ## Look up **key** (ignoring case) in the headers the broker sent
    ## when the connection was established.  Returns nil when the client
    ## is not connected or the key is unknown.
    result = nil
    if c.connected:
        for entry in c.serverinfo:
            if cmpIgnoreCase( entry.name, key ) == 0:
                return entry.value
|
357 |
|
358 |
|
proc `$`*( c: StompClient ): string =
    ## Describe the client: library version, connection state, the URI
    ## with its password removed, and the broker's ``server`` header
    ## when one is known.
    let
        masked = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" )
        state  = if c.connected: " connected" else: " not connected"
        server = c[ "server" ]

    result = "(NimStomp v" & VERSION & state & " to " & masked
    if not server.is_nil:
        result.add( " --> " & server )
    result.add( ")" )
|
366 |
|
367 |
|
proc connect*( c: StompClient ): void =
    ## Open the socket, send a CONNECT frame, and process the broker's
    ## reply.  Does nothing when the client is already connected.
    ##
    ## On CONNECTED the client is marked connected and the
    ## **connected_callback**, if set, is invoked.  Any other reply is
    ## handed to the **error_callback**, or to the default error handler
    ## (which raises StompError) when none is set.
    if c.connected: return

    var headers: seq[ tuple[name: string, value: string] ] = @[]
    headers.add( ("accept-version", "1.2") )

    # Stomp 1.2 requires the host header.  A vhost from the URI path
    # takes precedence over the plain server hostname.
    #
    let virtual_host = if c.vhost != "": c.vhost else: c.host
    headers.add( ("host", virtual_host) )

    if c.username != "": headers.add( ("login", c.username) )
    if c.password != "": headers.add( ("passcode", c.password) )

    # The option is in seconds, the header is in milliseconds.
    if c.options.heartbeat > 0:
        headers.add( ("heart-beat", "0," & $( c.options.heartbeat * 1000 )) )

    # Connect the socket and send the frame off.
    #
    c.socket.connect( c.host, c.port )
    c.socksend( "CONNECT" & CRLF )
    for header in headers:
        c.socksend( header.name & ":" & header.value & CRLF )
    c.finmsg

    # Retrieve the reply and keep its headers as server metadata.
    #
    let response = newStompResponse( c )
    c.serverinfo = response.headers

    if response.frame == "CONNECTED":
        c.connected = true
        if not isNil( c.connected_callback ):
            c.connected_callback( c, response )
    elif not isNil( c.error_callback ):
        c.error_callback( c, response )
    else:
        c.default_error_callback( response )
|
411 |
|
412 |
|
proc disconnect*( c: StompClient ): void =
    ## Send a DISCONNECT frame, close the socket, and mark the client as
    ## disconnected.  Does nothing when the client is not connected.
    if c.connected:
        c.socksend( "DISCONNECT" & CRLF )
        c.finmsg

        c.socket.close
        c.connected = false
|
421 |
|
422 |
|
proc add_txn( c: StompClient ): void =
    ## Write a ``transaction`` header for the open transaction, but only
    ## when exactly one is open.  With none or several, nothing is sent.
    if c.transactions.len == 1:
        c.socksend( "transaction:" & c.transactions[0] & CRLF )
|
427 |
|
428 |
|
proc send*( c: StompClient,
            destination: string,
            message: string = nil,
            contenttype: string = nil,
            headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
    ## Publish **message** to **destination**.
    ##
    ## A content-length header is always written.  **contenttype** is
    ## optional, but strongly recommended.
    ##
    ## When exactly one transaction is open and the caller supplied no
    ## ``transaction`` header of their own, the message is attached to
    ## that transaction automatically.  To target a particular
    ## transaction, pass it in **headers**.
    ##
    ## Raises StompError when the client is not connected.

    if not c.connected: raise newException( StompError, "Client is not connected." )
    c.socksend( "SEND" & CRLF )
    c.socksend( "destination:" & destination & CRLF )
    c.socksend( "content-length:" & $message.len & CRLF )
    if not contenttype.is_nil: c.socksend( "content-type:" & contenttype & CRLF )

    # Write the caller's headers, noting whether one of them already
    # names a transaction.
    #
    var wants_auto_txn = true
    for header in headers:
        if header.name == "transaction": wants_auto_txn = false
        c.socksend( header.name & ":" & header.value & CRLF )
    if wants_auto_txn: c.add_txn

    if not message.is_nil:
        c.socket.send( CRLF & message & NULL )
        when defined( debug ):
            printf " -->\n --> (payload)^@\n\n"
    else:
        c.finmsg
|
464 |
|
465 |
|
proc subscribe*( c: StompClient,
            destination: string,
            ack = "auto",
            headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
    ## Start receiving messages published to **destination**.
    ##
    ## With **ack** set to "client" or "client-individual" the broker
    ## waits for an explicit ACK/NACK before it considers a message
    ## handled.  With the default, "auto", delivery alone is enough.
    ##
    ## Extra **headers** supported by the broker may be supplied.
    ##
    ## The subscription ID sent to the broker is the position the
    ## destination takes in **subscriptions**.  Raises StompError when
    ## the client is not connected or **ack** is not a known mode.

    if not c.connected: raise newException( StompError, "Client is not connected." )
    c.socksend( "SUBSCRIBE" & CRLF )
    c.socksend( "destination:" & destination & CRLF )
    c.socksend( "id:" & $c.subscriptions.len & CRLF )

    case ack
    of "client", "client-individual":
        c.socksend( "ack:" & ack & CRLF )
    of "auto":
        discard # the broker's default, no header needed
    else:
        raise newException( StompError, "Unknown ack type: " & ack )

    for header in headers:
        c.socksend( header.name & ":" & header.value & CRLF )
    c.finmsg
    c.subscriptions.add( destination )
|
492 |
|
493 |
|
proc unsubscribe*( c: StompClient,
            destination: string,
            headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
    ## Stop receiving messages from **destination**.
    ## Extra **headers** supported by the broker may be supplied.
    ##
    ## Raises StompError when the client is not connected, or when there
    ## is no subscription for **destination**.

    if not c.connected: raise newException( StompError, "Client is not connected." )

    # The subscription ID is the destination's position in the list.
    # An unknown destination must not fall through as ID 0: that would
    # cancel whichever subscription was registered first.
    #
    let sub_id = c.subscriptions.find( destination )
    if sub_id < 0:
        raise newException( StompError, "Not subscribed to: " & destination )

    c.socksend( "UNSUBSCRIBE" & CRLF )
    c.socksend( "id:" & $sub_id & CRLF )
    for header in headers:
        c.socksend( header.name & ":" & header.value & CRLF )
    c.finmsg

    # Blank the slot rather than deleting it, so the IDs of the
    # remaining subscriptions stay valid.
    c.subscriptions[ sub_id ] = ""
|
519 |
|
520 |
|
proc begin*( c: StompClient, txn: string ): void =
    ## Open a transaction on the broker identified by **txn**, and
    ## remember it as the most recently opened one.
    c.socksend( "BEGIN" & CRLF )
    c.socksend( "transaction:" & txn & CRLF )
    c.finmsg

    c.transactions.add( txn )
|
527 |
|
528 |
|
proc commit*( c: StompClient, txn: string = nil ): void =
    ## Commit transaction **txn**.  When **txn** is not given, the most
    ## recently opened transaction is committed; with none open, nothing
    ## is sent.
    var target = txn
    if target.is_nil:
        if c.transactions.len > 0: target = c.transactions.pop
        if target.is_nil: return

    c.socksend( "COMMIT" & CRLF )
    c.socksend( "transaction:" & target & CRLF )
    c.finmsg

    # Forget the transaction: keep only the entries that differ from it.
    #
    var still_open: seq[ string ] = @[]
    for open_txn in c.transactions:
        if open_txn != target: still_open.add( open_txn )
    c.transactions = still_open
|
545 |
|
546 |
|
proc abort*( c: StompClient, txn: string = nil ): void =
    ## Roll back transaction **txn**.  When **txn** is not given, the
    ## most recently opened transaction is rolled back; with none open,
    ## nothing is sent.
    var target = txn
    if target.is_nil:
        if c.transactions.len > 0: target = c.transactions.pop
        if target.is_nil: return

    c.socksend( "ABORT" & CRLF )
    c.socksend( "transaction:" & target & CRLF )
    c.finmsg

    # Forget the transaction: keep only the entries that differ from it.
    #
    var still_open: seq[ string ] = @[]
    for open_txn in c.transactions:
        if open_txn != target: still_open.add( open_txn )
    c.transactions = still_open
|
563 |
|
564 |
|
proc ack*( c: StompClient, id: string, transaction: string = nil ): void =
    ## Acknowledge message **id**.  An explicit **transaction** is sent
    ## as given; without one, the acknowledgement is attached to the
    ## open transaction when exactly one is open.
    c.socksend( "ACK" & CRLF )
    c.socksend( "id:" & id & CRLF )
    if transaction.is_nil:
        c.add_txn
    else:
        c.socksend( "transaction:" & transaction & CRLF )
    c.finmsg
|
576 |
|
577 |
|
proc nack*( c: StompClient, id: string, transaction: string = nil ): void =
    ## Reject message **id**.  An explicit **transaction** is sent as
    ## given; without one, the rejection is attached to the open
    ## transaction when exactly one is open.
    ##
    ## Subscribe to a queue with ACK mode enabled, and reject the message
    ## on error:
    ##
    ## .. code-block:: nim
    ##
    ##    stomp.subscribe( "/queue/test", "client-individual" )
    ##    FIXME: attach procs
    ##    stomp.wait_for_messages
    ##
    c.socksend( "NACK" & CRLF )
    c.socksend( "id:" & id & CRLF )
    if transaction.is_nil:
        c.add_txn
    else:
        c.socksend( "transaction:" & transaction & CRLF )
    c.finmsg
|
599 |
|
600 |
|
proc wait_for_messages*( c: StompClient, loop=true ) =
    ## Block until the broker sends something, then dispatch the frame to
    ## the callback registered for its type.  With **loop** set to
    ## **false**, return after a single non-heartbeat frame has been
    ## handled (or after a missed heartbeat has been reported).
    ##
    ## When heartbeating is enabled and nothing arrives within the
    ## heartbeat interval plus one second, the
    ## **missed_heartbeat_callback** is invoked, or the default handler
    ## (which raises StompError) when none is set.
    ##
    ## Raises StompError when the client is not connected.

    if not c.connected: raise newException( StompError, "Client is not connected." )

    # How long to wait for socket activity: the heartbeat interval with an
    # additional second of wiggle-room, or forever without heartbeating.
    #
    let timeout =
        if c.options.heartbeat > 0: ( c.options.heartbeat + 1 ) * 1000
        else: -1

    while true:
        var fds = @[ c.socket.get_fd ]

        # A previous read may have pulled more than one frame into the
        # socket's own buffer.  select() only looks at the file descriptor
        # and would not see those, so only wait on it when the buffer is
        # empty.  Otherwise buffered frames would be delivered late, or be
        # mistaken for a missed heartbeat.
        #
        if not c.socket.hasDataBuffered and select( fds, timeout ) == 0:
            # timeout, only happens if heartbeating missed
            if not isNil( c.missed_heartbeat_callback ):
                c.missed_heartbeat_callback( c )
            else:
                c.default_missed_heartbeat_callback
            if loop: continue else: break

        let response = newStompResponse( c )
        case response.frame:

            of "HEARTBEAT":
                if not isNil( c.heartbeat_callback ):
                    c.heartbeat_callback( c, response )
                # Heartbeats never count as the "single message" for
                # loop=false, so keep waiting.
                continue

            of "RECEIPT":
                if not isNil( c.receipt_callback ):
                    c.receipt_callback( c, response )

            of "MESSAGE":
                if not isNil( c.message_callback ):
                    c.message_callback( c, response )

            of "ERROR":
                if not isNil( c.error_callback ):
                    c.error_callback( c, response )
                else:
                    c.default_error_callback( response )

            else:
                if defined( debug ):
                    echo "Strange broker frame: " & response.repr

        if not loop: break
|
655 |
|
656 |
|
657 |
|
658 #------------------------------------------------------------------- |
|
659 # T E S T S |
|
660 #------------------------------------------------------------------- |
|
661 |
|
# Functional (rather than unit) tests.  Requires a Stomp compatible broker.
# This was tested against RabbitMQ 3.5.3 and 3.6.0.
# 3.6.0 was -so- much faster.
#
# First start up a message receiver:
#   ./stomp receiver [stomp-uri] [subscription-destination]
#
# then run another process, to publish stuff:
#   ./stomp publisher [stomp-uri] [publish-destination]
#
# An example with an AMQP "direct" exchange, and an exclusive queue:
#   ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
#   ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test
#
# Then just let 'er run.
#
# You can also run a naive benchmark (deliveries/sec):
#
#   ./stomp benchmark stomp://test:test@localhost/ /exchange/test
#
# It will set messages to require acknowledgement, and nack everything, causing
# a delivery loop for 10 seconds.
#
|
when isMainModule:
    # Functional test driver.  Invoked as:
    #   ./stomp <benchmark|receiver|publisher> <stomp-uri> <destination>
    const usage = "See source comments for how to run functional tests."

    let expected = 8
    var
        socket = newSocket()
        messages: seq[ StompResponse ] = @[]

    if paramCount() != 3: quit usage

    let
        mode = paramStr(1)
        dest = paramStr(3)

    var stomp = newStompClient( socket, paramStr(2) )
    stomp.connect
    echo stomp

    case mode

    # Reject every delivery so the broker keeps redelivering it, and
    # count how many deliveries arrive within ten seconds.
    #
    of "benchmark":
        echo "* Running for 10 seconds. Compile with -d:debug to see the Stomp conversation."
        var count = 0
        var start = get_time()

        proc reject_and_count( c: StompClient, r: StompResponse ) =
            let id = r["ack"]
            count = count + 1
            c.nack( id )

        stomp.message_callback = reject_and_count
        stomp.subscribe( dest, "client" )
        stomp.send( dest, "hi." )
        while get_time() - start < 10:
            stomp.wait_for_messages( false )

        printf "* Processed %d messages in 10 seconds.\n", count
        stomp.disconnect


    # Store incoming messages, ensure their contents match our expected behavior.
    #
    of "receiver":
        var heartbeats = 0
        echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation."

        # Receipts and messages alike are simply collected, in arrival order.
        proc receive_message( c: StompClient, r: StompResponse ) =
            messages.add( r )

        proc seen_heartbeat( c: StompClient, r: StompResponse ) =
            heartbeats = heartbeats + 1

        stomp.message_callback   = receive_message
        stomp.receipt_callback   = receive_message
        stomp.heartbeat_callback = seen_heartbeat
        stomp.subscribe( dest )

        # Populate the messages sequence with the count of expected messages.
        for i in 1..expected: stomp.wait_for_messages( false )

        # Assertions on the results!
        #
        doAssert( messages.len == expected )
        doAssert( messages[0].payload == nil )

        doAssert( messages[1].payload == "Hello world!" )

        doAssert( messages[2].payload == "Dumb.\n\n" )

        doAssert( messages[3].payload == "Hello again." )
        doAssert( messages[3][ "content-type" ] == "text/plain" )
        doAssert( messages[3][ "Content-Type" ] == "text/plain" )

        doAssert( messages[4][ "x-custom" ] == "yum" )

        doAssert( messages[5][ "receipt" ] == "42" )

        doAssert( messages[6].payload == "transaction!" )
        doAssert( messages[7].payload == "transaction 2" )

        stomp.disconnect

        if heartbeats > 0:
            printf "* Tests passed! %d heartbeats seen.", heartbeats
        else:
            echo "* Tests passed!"


    # Publish a variety of messages with various options.
    # Pause momentarily between sends(), as brokers -might- impose
    # rate limits and/or message dropping.
    #
    of "publisher":
        echo "* Publishing to receiver. Compile with -d:debug to see the Stomp conversation."

        # Simple, no frills event.
        stomp.send( dest )
        sleep 500

        # Event with a body.
        stomp.send( dest, "Hello world!" )
        sleep 500

        # Event that doesn't contain a content-length.
        # (Note, the broker may elect to add one on your behalf, which is a good thing...
        # but invalidates this test.)
        stomp.socksend( "SEND" & CRLF )
        stomp.socksend( "destination:" & dest & CRLF & CRLF )
        stomp.socksend( "Dumb.\n\n" & NULL )
        sleep 500

        # Content-Type
        stomp.send( dest, "Hello again.", "text/plain" )
        sleep 500

        # Custom headers.
        var headers: seq[ tuple[ name: string, value: string ] ] = @[]
        headers.add( ("x-custom", "yum") )
        stomp.send( dest, "Hello again.", "text/plain", headers )
        sleep 500

        # Receipt requests.
        proc receive_receipt( c: StompClient, r: StompResponse ) =
            messages.add( r )
        headers = @[]
        headers.add( ("receipt", "42") )
        stomp.send( dest, "Hello again.", "text/plain", headers )
        stomp.receipt_callback = receive_receipt
        stomp.wait_for_messages( false )
        doAssert( messages[0]["receipt-id"] == "42" )

        # Aborted transaction.
        stomp.begin( "test-abort" )
        for i in 1..3:
            stomp.send( dest, "Message: " & $i )
        stomp.abort

        # Committed transaction.
        stomp.begin( "test-commit" )
        stomp.send( dest, "transaction!" )
        stomp.commit

        # Mixed transactions: three open at once, only the second is committed.
        for i in 1..3:
            let txn_name = "test-" & $i
            headers = @[]
            headers.add( ("transaction", txn_name) )
            stomp.begin( txn_name )
            stomp.send( dest, "transaction " & $i, nil, headers )
            sleep 500
        stomp.abort( "test-1" )
        sleep 500
        stomp.commit( "test-2" )
        sleep 500
        stomp.abort( "test-3" )
        sleep 500

        stomp.disconnect
        echo "* Tests passed!"

    else:
        quit usage
|
846 |
|
847 |