src/stomp.nim
changeset 4 2f4e88604125
parent 0 52e9f64937bf
child 6 7d977f308c75
equal deleted inserted replaced
3:d0bc42746346 4:2f4e88604125
       
     1 # vim: set et nosta sw=4 ts=4 ft=nim : 
       
     2 #
       
     3 # Copyright (c) 2016-2018, Mahlon E. Smith <mahlon@martini.nu>
       
     4 # All rights reserved.
       
     5 # Redistribution and use in source and binary forms, with or without
       
     6 # modification, are permitted provided that the following conditions are met:
       
     7 #
       
     8 #     * Redistributions of source code must retain the above copyright
       
     9 #       notice, this list of conditions and the following disclaimer.
       
    10 #
       
    11 #     * Redistributions in binary form must reproduce the above copyright
       
    12 #       notice, this list of conditions and the following disclaimer in the
       
    13 #       documentation and/or other materials provided with the distribution.
       
    14 #
       
    15 #     * Neither the name of Mahlon E. Smith nor the names of his
       
    16 #       contributors may be used to endorse or promote products derived
       
    17 #       from this software without specific prior written permission.
       
    18 #
       
    19 # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
       
    20 # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
       
    21 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
       
    22 # DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
       
    23 # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
       
    24 # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
       
    25 # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
       
    26 # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
       
    27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
       
    28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
       
    29 
       
    30 ## Overview
       
    31 ## ============
       
    32 ##
       
    33 ## This is a pure-nim client library for interacting with Stomp 1.2
       
    34 ## compliant messaging brokers.
       
    35 ##
       
    36 ## https://stomp.github.io/stomp-specification-1.2.html
       
    37 ##
       
    38 ## Stomp is a simple protocol for message passing between clients, using a central
       
    39 ## broker.  It is a subset of other more elaborate protocols (like AMQP), supporting
       
    40 ## only the most used features of common brokers.
       
    41 ##
       
    42 ## Because this library is pure-nim, there are no external dependencies.  If you
       
    43 ## can compile a nim binary, you can participate in advanced messaging between processes.
       
    44 ##
       
    45 ## A list of broker support for Stomp can be found here:
       
    46 ## https://stomp.github.io/implementations.html.
       
    47 ##
       
    48 ## This library has been tested with recent versions of RabbitMQ.  If it
       
    49 ## works for you with another broker, please let the author know.
       
    50 ##
       
    51 
       
    52 import
       
    53     strutils,
       
    54     nativesockets,
       
    55     net,
       
    56     os,
       
    57     times,
       
    58     uri
       
    59 
       
# Module-wide constants: the library version, and the control characters
# used when reading and writing frames.
const
    VERSION = "0.1.1" ## The current program version, included in the client's string form.
    NULL    = "\x00"  ## The NULL character, written after each frame and read as the end of a body.
    CR      = "\r"    ## The carriage return character.
    CRLF    = "\r\n"  ## Carriage return + Line feed (EOL), appended to every line sent.
       
    65 
       
    66 
       
    67 # Exceptions
       
    68 #
       
type
    StompError* = object of ValueError ## A generic Stomp error state.

    StompClient* = ref object of RootObj ## An object that represents a connection to a Stomp compatible server.
        socket:         Socket ## The socket object attached to this client.
        connected*:     bool ## Is the client currently connected?
        uri*:           Uri ## The URI used to instantiate this client.
        username:       string ## The Stomp server user, if any.
        password:       string ## The Stomp server password, if any.
        host:           string ## The host or IP address to connect to.
        port:           Port ## The broker port: taken from the URI, or the scheme default when the URI has none.
        vhost:          string ## Parsed from the URI path, a Stomp "virtual host".
        timeout*:       int ## Timeout handed to every socket read (milliseconds, per Nim's net module).
        last_msgtime*:  Time ## Timestamp of last seen server message.
        options:        tuple[ heartbeat: int ] ## Any supported client options, derived from the URI query string.  The heartbeat is in seconds.
        subscriptions*: seq[ string ] ## Registered client subscriptions. Array position is the ID.
        transactions*:  seq[ string ] ## Any currently open transactions.
        serverinfo:     seq[ tuple[name: string, value: string] ] ## Server metadata: the headers of the broker's reply to CONNECT.

        connected_callback*:        proc ( client: StompClient, response: StompResponse ): void ## Called after the broker answers CONNECT with a CONNECTED frame.
        error_callback*:            proc ( client: StompClient, response: StompResponse ): void ## Called for ERROR frames and failed connects.  If unset, a default closes the socket and raises StompError.
        heartbeat_callback*:        proc ( client: StompClient, response: StompResponse ): void ## Called for each heartbeat (empty line) seen while waiting for messages.
        message_callback*:          proc ( client: StompClient, response: StompResponse ): void ## Called for each MESSAGE frame.
        missed_heartbeat_callback*: proc ( client: StompClient ): void ## Called when no data arrives within the heartbeat window.  If unset, a default closes the socket and raises StompError.
        receipt_callback*:          proc ( client: StompClient, response: StompResponse ): void ## Called for each RECEIPT frame.

    StompResponse* = ref object of RootObj ## A parsed packet from a Stomp server.
        headers:  seq[ tuple[name: string, value: string] ] ## Any headers in the response.  Access with the `[]` proc.
        frame*:   string ## The Stomp frame type ("HEARTBEAT" for an empty keep-alive line).
        payload*: string ## The message body, if any.
       
    99 
       
   100 
       
# Convenience binding to the C library's printf() from <stdio.h>.
# Every use in this file is guarded by `defined( debug )`, to trace protocol traffic.
proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.}
       
   103 
       
   104 
       
   105 #-------------------------------------------------------------------
       
   106 # R E S P O N S E
       
   107 #-------------------------------------------------------------------
       
   108 
       
   109 proc is_eol( s: string ): bool =
       
   110     ## Convenience method, returns **true** if string is a Stomp EOF.
       
   111     return s == CR or s == CRLF
       
   112 
       
   113 
       
   114 proc parse_headers( response: StompResponse, c: StompClient ): int =
       
   115     ## Parse response headers from a stream.
       
   116     ## Returns the content length of the response body, or 0.
       
   117     result = 0
       
   118     var line = ""
       
   119 
       
   120     c.socket.readline( line, c.timeout )
       
   121     while not line.is_eol:
       
   122         if defined( debug ): printf " <-- %s\n", line
       
   123         var header = line.split( ":" )
       
   124         if header.len < 2: break
       
   125         response.headers.add( (header[0], header[1]) )
       
   126         if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int
       
   127         c.socket.readline( line, c.timeout )
       
   128 
       
   129 
       
   130 proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void =
       
   131     ## Parse message payload from a stream.
       
   132     let bufsize = 8192
       
   133     var
       
   134         buf  = ""
       
   135         data = ""
       
   136 
       
   137 
       
   138     # If we already know the length of the body, just perform a buffered read.
       
   139     #
       
   140     if bodylength > 0:
       
   141         var
       
   142             readtotal = 0
       
   143             readamt   = 0
       
   144             remaining = 0
       
   145 
       
   146         while readtotal != bodylength:
       
   147             remaining = bodylength - readtotal
       
   148 
       
   149             if remaining < bufsize:
       
   150                 readamt = remaining
       
   151             else:
       
   152                 readamt = bufsize
       
   153 
       
   154             buf = newString( readamt )
       
   155             readtotal = readtotal + c.socket.recv( buf, readamt, c.timeout )
       
   156             data = data & buf
       
   157 
       
   158         # Eat the NULL terminator.
       
   159         discard c.socket.recv( buf, 1, c.timeout )
       
   160 
       
   161     # Inefficient path.
       
   162     #
       
   163     else:
       
   164         while buf != NULL:
       
   165             discard c.socket.recv( buf, 1, c.timeout )
       
   166             data = data & buf
       
   167 
       
   168     response.payload = data
       
   169 
       
   170 
       
   171 proc newStompResponse( c: StompClient ): StompResponse =
       
   172     ## Initialize a response object, which parses and contains
       
   173     ## the Stomp headers and any additional important data from
       
   174     ## the broker socket.
       
   175     new( result )
       
   176     result.headers = @[]
       
   177 
       
   178     # Get the frame type, record last seen server activity time.
       
   179     #
       
   180     var line = ""
       
   181     c.socket.readline( line, c.timeout )
       
   182     c.last_msgtime = get_time()
       
   183 
       
   184     # Heartbeat packets (empties.)
       
   185     #
       
   186     # This could -also- parse optional EOLs from the prior
       
   187     # message (after the NULL separator), but since it is a no-op,
       
   188     # this seems harmless.
       
   189     #
       
   190     if line.is_eol:
       
   191         result.frame = "HEARTBEAT"
       
   192         return result
       
   193 
       
   194     # All other types.
       
   195     #
       
   196     result.frame = line
       
   197     if defined( debug ):
       
   198         printf " <-- %s\n", line
       
   199 
       
   200     # Parse headers and body.
       
   201     #
       
   202     var length = result.parse_headers( c )
       
   203     if result.frame == "MESSAGE" or result.frame == "ERROR":
       
   204         result.parse_payload( c, length )
       
   205 
       
   206     # If the response -could- have a body, the NULL has already
       
   207     # been removed from the stream while we checked for one.
       
   208     #
       
   209     if result.payload.len > 0:
       
   210         if result.payload == NULL: # We checked for a body, but there was none.
       
   211             result.payload = ""
       
   212             if defined( debug ): printf " <--\n <-- ^@\n\n"
       
   213         else:
       
   214             if defined( debug ): printf " <--\n <-- (payload)^@\n\n"
       
   215 
       
   216     # Otherwise, pop off the NULL terminator now.
       
   217     #
       
   218     else:
       
   219         discard c.socket.recv( line, 1, c.timeout )
       
   220         if defined( debug ): printf " <--\n <-- ^@\n\n"
       
   221 
       
   222 
       
   223 proc `$`*( r: StompResponse ): string =
       
   224     ## Represent a Stomp response as a string.
       
   225     result = r.frame & ": " & $r.headers
       
   226 
       
   227 
       
   228 proc `[]`*( response: StompResponse, key: string ): string =
       
   229     ## Get a specific header from a Stomp response.
       
   230     for header in response.headers:
       
   231         if cmpIgnoreCase( key, header.name ) == 0:
       
   232             return header.value
       
   233     return ""
       
   234 
       
   235 
       
   236 #-------------------------------------------------------------------
       
   237 # C A L L B A C K S
       
   238 #-------------------------------------------------------------------
       
   239 
       
   240 proc default_error_callback( c: StompClient, response: StompResponse ) =
       
   241     ## Something bad happened.  Disconnect from the server, build an error message,
       
   242     ## and raise an exception.
       
   243     c.socket.close
       
   244     c.connected = false
       
   245 
       
   246     var detail = response.payload
       
   247     var msg    = response[ "message" ]
       
   248     if $detail[ ^1 ] == "\n": detail = detail[ 0 .. ^2 ] # chomp
       
   249 
       
   250     if detail.len > 0: msg = msg & " (" & detail & ")"
       
   251     raise newException( StompError, "ERROR: " & msg )
       
   252 
       
   253 
       
   254 proc default_missed_heartbeat_callback( c: StompClient ) =
       
   255     ## Timeout while connected to the broker.
       
   256     c.socket.close
       
   257     c.connected = false
       
   258     raise newException( StompError, "Heartbeat timeout.  Last activity: " & $c.last_msgtime )
       
   259 
       
   260 
       
   261 
       
   262 #-------------------------------------------------------------------
       
   263 # C L I E N T
       
   264 #-------------------------------------------------------------------
       
   265 
       
   266 proc newStompClient*( s: Socket, uri: string ): StompClient =
       
   267     ## Create a new Stomp client object from a preexisting **socket**,
       
   268     ## and a stomp **URI** string.
       
   269     ##
       
   270     ## .. code-block:: nim
       
   271     ##
       
   272     ##    var socket = newSocket()
       
   273     ##    var stomp  = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" )
       
   274     ##
       
   275     ## or if connecting with SSL, when compiled with -d:ssl:
       
   276     ##
       
   277     ## .. code-block:: nim
       
   278     ##
       
   279     ##    var socket = newSocket()
       
   280     ##    let sslContext = newContext( verifyMode = CVerifyNone )
       
   281     ##    sslContext.wrapSocket(socket)
       
   282     ##    var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" )
       
   283     ##
       
   284     new( result )
       
   285     result.socket        = s
       
   286     result.connected     = false
       
   287     result.uri           = parse_uri( uri )
       
   288     result.username      = result.uri.username
       
   289     result.password      = result.uri.password
       
   290     result.host          = result.uri.hostname
       
   291     result.vhost         = result.uri.path
       
   292     result.timeout       = 500
       
   293     result.subscriptions = @[]
       
   294     result.transactions  = @[]
       
   295 
       
   296     # Parse any supported options in the query string.
       
   297     #
       
   298     for pairs in result.uri.query.split( '&' ):
       
   299         let opt = pairs.split( '=' )
       
   300         try:
       
   301             case opt[0]:
       
   302                 of "heartbeat":
       
   303                     result.options.heartbeat = opt[1].parse_int
       
   304                 else:
       
   305                     discard
       
   306         except IndexError, ValueError:
       
   307             discard
       
   308 
       
   309     # Set default STOMP port if otherwise unset.
       
   310     #
       
   311     if not result.uri.scheme.contains( "stomp" ):
       
   312         raise newException( StompError, "Unknown scheme: " & result.uri.scheme  )
       
   313     var port: int
       
   314     if result.uri.port == "":
       
   315         if result.uri.scheme.contains( "+ssl" ):
       
   316             port = 61614
       
   317         else:
       
   318             port = 61613
       
   319     else:
       
   320         port = result.uri.port.parse_int
       
   321 
       
   322     result.port = Port( port )
       
   323 
       
   324     # Decode URI encoded slashes for vhosts.
       
   325     result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" )
       
   326 
       
   327 
       
   328 proc socksend( c: StompClient, data: string ): void =
       
   329     ## Send data on the connected socket with optional debug output.
       
   330     c.socket.send( data )
       
   331     if defined( debug ): printf " --> %s", data
       
   332 
       
   333 
       
   334 proc finmsg( c: StompClient ): void =
       
   335     ## Send data on the connected socket with optional debug output.
       
   336     c.socket.send( CRLF & NULL & CRLF )
       
   337     if defined( debug ): printf " --> \n --> ^@\n\n"
       
   338 
       
   339 
       
   340 proc `[]`*( c: StompClient, key: string ): string =
       
   341     ## Get a specific value from the server metadata, set during the initial connection.
       
   342     if not c.connected: return ""
       
   343     for header in c.serverinfo:
       
   344         if cmpIgnoreCase( key, header.name ) == 0:
       
   345             return header.value
       
   346     return ""
       
   347 
       
   348 
       
   349 proc `$`*( c: StompClient ): string =
       
   350     ## Represent the stomp client as a string, after masking the password.
       
   351     let uri = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" )
       
   352     result = "(NimStomp v" & VERSION & ( if c.connected: " connected" else: " not connected" ) & " to " & uri
       
   353     if not ( c[ "server" ] == "" ): result.add( " --> " & c["server"] )
       
   354     result.add( ")" )
       
   355 
       
   356 
       
   357 proc connect*( c: StompClient ): void =
       
   358     ## Establish a connection to the Stomp server.
       
   359     if c.connected: return
       
   360 
       
   361     var headers: seq[ tuple[name: string, value: string] ] = @[]
       
   362     headers.add( ("accept-version", "1.2") )
       
   363 
       
   364     # Stomp 1.2 requires the Host: header.  Use the path as a vhost if
       
   365     # supplied, otherwise use the hostname of the server.
       
   366     #
       
   367     if c.vhost != "":
       
   368         headers.add( ("host", c.vhost) )
       
   369     else:
       
   370         headers.add( ("host", c.host) )
       
   371 
       
   372     if c.username != "": headers.add( ("login", c.username) )
       
   373     if c.password != "": headers.add( ("passcode", c.password) )
       
   374     if c.options.heartbeat > 0:
       
   375         let heartbeat = c.options.heartbeat * 1000
       
   376         headers.add( ("heart-beat", "0," & $heartbeat) )
       
   377 
       
   378     # Connect the socket and send the headers off.
       
   379     #
       
   380     c.socket.connect( c.host, c.port )
       
   381     c.socksend( "CONNECT" & CRLF )
       
   382     for header in headers:
       
   383         c.socksend( header.name & ":" & header.value & CRLF )
       
   384     c.finmsg
       
   385 
       
   386     # Retreive and copy server metadata to client object.
       
   387     #
       
   388     var response = newStompResponse( c )
       
   389     c.serverinfo = response.headers
       
   390 
       
   391     if response.frame != "CONNECTED":
       
   392         if not isNil( c.error_callback ):
       
   393             c.error_callback( c, response )
       
   394         else:
       
   395             c.default_error_callback( response )
       
   396     else:
       
   397         c.connected = true
       
   398         if not isNil( c.connected_callback ):
       
   399             c.connected_callback( c, response )
       
   400 
       
   401 
       
   402 proc disconnect*( c: StompClient ): void =
       
   403     ## Break down the connection to the Stomp server nicely.
       
   404     if not c.connected: return
       
   405     c.socksend( "DISCONNECT" & CRLF )
       
   406     c.finmsg
       
   407 
       
   408     c.socket.close
       
   409     c.connected = false
       
   410 
       
   411 
       
   412 proc add_txn( c: StompClient ): void =
       
   413     ## Add a transaction header if there is only a single open txn.
       
   414     if c.transactions.len != 1: return
       
   415     c.socksend( "transaction:" & c.transactions[0] & CRLF )
       
   416 
       
   417 
       
   418 proc send*( c: StompClient,
       
   419             destination: string,
       
   420             message:     string = "",
       
   421             contenttype: string = "",
       
   422             headers:     seq[ tuple[name: string, value: string] ] = @[] ): void =
       
   423     ## Send a **message** to **destination**.
       
   424     ##
       
   425     ## A Content-Length header is automatically and always included.
       
   426     ## A **contenttype** is optional, but strongly recommended.
       
   427     ##
       
   428     ## Additionally, a transaction ID is automatically added if there is only
       
   429     ## one transaction active.  If you need to attach this message to a particular
       
   430     ## transaction ID, you'll need to add it yourself with the user defined
       
   431     ## **headers**.
       
   432 
       
   433     if not c.connected: raise newException( StompError, "Client is not connected." )
       
   434     c.socksend( "SEND" & CRLF )
       
   435     c.socksend( "destination:" & destination & CRLF )
       
   436     c.socksend( "content-length:" & $message.len & CRLF )
       
   437     if not ( contenttype == "" ): c.socksend( "content-type:" & contenttype & CRLF )
       
   438 
       
   439     # Add custom headers.  Add transaction header if one isn't manually
       
   440     # present (and a transaction is open.)
       
   441     #
       
   442     var txn_seen = false
       
   443     for header in headers:
       
   444         if header.name == "transaction": txn_seen = true
       
   445         c.socksend( header.name & ":" & header.value & CRLF )
       
   446     if not txn_seen: c.add_txn
       
   447 
       
   448     if message == "":
       
   449         c.finmsg
       
   450     else:
       
   451         c.socket.send( CRLF & message & NULL )
       
   452         if defined( debug ): printf " -->\n --> (payload)^@\n\n"
       
   453 
       
   454 
       
   455 proc subscribe*( c: StompClient,
       
   456             destination: string,
       
   457             ack        = "auto",
       
   458             headers:     seq[ tuple[name: string, value: string] ] = @[] ): void =
       
   459     ## Subscribe to messages at **destination**.
       
   460     ##
       
   461     ## Setting **ack** to "client" or "client-individual" enables client ACK/NACK mode.
       
   462     ## In this mode, incoming messages aren't considered processed by
       
   463     ## the server unless they receive ACK.  By default, the server
       
   464     ## considers the message processed if a client simply accepts it.
       
   465     ##
       
   466     ## You may optionally add any additional **headers** the server may support.
       
   467 
       
   468     if not c.connected: raise newException( StompError, "Client is not connected." )
       
   469     c.socksend( "SUBSCRIBE" & CRLF )
       
   470     c.socksend( "destination:" & destination & CRLF )
       
   471     c.socksend( "id:" & $c.subscriptions.len & CRLF )
       
   472     if ack == "client" or ack == "client-individual":
       
   473         c.socksend( "ack:" & ack & CRLF )
       
   474     else:
       
   475         if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack )
       
   476 
       
   477     for header in headers:
       
   478         c.socksend( header.name & ":" & header.value & CRLF )
       
   479     c.finmsg
       
   480     c.subscriptions.add( destination )
       
   481 
       
   482 
       
   483 proc unsubscribe*( c: StompClient,
       
   484             destination: string,
       
   485             headers:     seq[ tuple[name: string, value: string] ] = @[] ): void =
       
   486     ## Unsubscribe from messages at **destination**.
       
   487     ## You may optionally add any additional **headers** the server may support.
       
   488 
       
   489     if not c.connected: raise newException( StompError, "Client is not connected." )
       
   490     var
       
   491         sub_id: int
       
   492         i = 0
       
   493 
       
   494     # Find the ID of the subscription.
       
   495     #
       
   496     for sub in c.subscriptions:
       
   497         if sub == destination:
       
   498             sub_id = i
       
   499             break
       
   500         i = i + 1
       
   501 
       
   502     c.socksend( "UNSUBSCRIBE" & CRLF )
       
   503     c.socksend( "id:" & $sub_id & CRLF )
       
   504     for header in headers:
       
   505         c.socksend( header.name & ":" & header.value & CRLF )
       
   506     c.finmsg
       
   507     c.subscriptions[ sub_id ] = ""
       
   508 
       
   509 
       
   510 proc begin*( c: StompClient, txn: string ): void =
       
   511     ## Begin a new transaction on the broker, using **txn** as the identifier.
       
   512     c.socksend( "BEGIN" & CRLF )
       
   513     c.socksend( "transaction:" & txn & CRLF )
       
   514     c.finmsg
       
   515     c.transactions.add( txn )
       
   516 
       
   517 
       
   518 proc commit*( c: StompClient, txn: string = "" ): void =
       
   519     ## Finish a specific transaction **txn**, or the most current if unspecified.
       
   520     var transaction = txn
       
   521     if transaction == "" and c.transactions.len > 0: transaction = c.transactions.pop
       
   522     if transaction == "": return
       
   523 
       
   524     c.socksend( "COMMIT" & CRLF )
       
   525     c.socksend( "transaction:" & transaction & CRLF )
       
   526     c.finmsg
       
   527 
       
   528     # Remove the transaction from the queue.
       
   529     #
       
   530     var new_transactions: seq[ string ] = @[]
       
   531     for txn in c.transactions:
       
   532         if txn != transaction: new_transactions.add( txn )
       
   533     c.transactions = new_transactions
       
   534 
       
   535 
       
   536 proc abort*( c: StompClient, txn: string = "" ): void =
       
   537     ## Cancel a specific transaction **txn**, or the most current if unspecified.
       
   538     var transaction = txn
       
   539     if transaction == "" and c.transactions.len > 0: transaction = c.transactions.pop
       
   540     if transaction == "": return
       
   541 
       
   542     c.socksend( "ABORT" & CRLF )
       
   543     c.socksend( "transaction:" & transaction & CRLF )
       
   544     c.finmsg
       
   545 
       
   546     # Remove the transaction from the queue.
       
   547     #
       
   548     var new_transactions: seq[ string ] = @[]
       
   549     for txn in c.transactions:
       
   550         if txn != transaction: new_transactions.add( txn )
       
   551     c.transactions = new_transactions
       
   552 
       
   553 
       
   554 proc ack*( c: StompClient, id: string, transaction: string = "" ): void =
       
   555     ## Acknowledge message **id**.  Optionally, attach this acknowledgement
       
   556     ## to a specific **transaction** -- if there's only one active, it is
       
   557     ## added automatically.
       
   558     c.socksend( "ACK" & CRLF )
       
   559     c.socksend( "id:" & id & CRLF )
       
   560     if not ( transaction == "" ):
       
   561         c.socksend( "transaction:" & transaction & CRLF )
       
   562     else:
       
   563         c.add_txn
       
   564     c.finmsg
       
   565 
       
   566 
       
   567 proc nack*( c: StompClient, id: string, transaction: string = "" ): void =
       
   568     ## Reject message **id**.  Optionally, attach this rejection to a
       
   569     ## specific **transaction** -- if there's only one active, it is
       
   570     ## added automatically.
       
   571     ##
       
   572     ## Subscribe to a queue with ACK mode enabled, and reject the message
       
   573     ## on error:
       
   574     ##
       
   575     ## .. code-block:: nim
       
   576     ##    
       
   577     ##  stomp.subscribe( "/queue/test", "client-individual" )
       
   578     ##  FIXME: attach procs
       
   579     ##  stomp.wait_for_messages
       
   580     ##
       
   581     c.socksend( "NACK" & CRLF )
       
   582     c.socksend( "id:" & id & CRLF )
       
   583     if not ( transaction == "" ):
       
   584         c.socksend( "transaction:" & transaction & CRLF )
       
   585     else:
       
   586         c.add_txn
       
   587     c.finmsg
       
   588 
       
   589 
       
   590 proc wait_for_messages*( c: StompClient, loop=true ) =
       
   591     ## Enter a blocking select loop, dispatching to the appropriate proc
       
   592     ## for the received message type.   Return after a single message
       
   593     ## is received if **loop** is set to **false**.
       
   594 
       
   595     if not c.connected: raise newException( StompError, "Client is not connected." )
       
   596 
       
   597     while true:
       
   598         var
       
   599             timeout: int
       
   600             fds = @[ c.socket.get_fd ]
       
   601 
       
   602         # Check for missed heartbeats, with an additional second
       
   603         # of wiggle-room.
       
   604         #
       
   605         if c.options.heartbeat > 0:
       
   606              timeout = ( c.options.heartbeat + 1 ) * 1000
       
   607         else:
       
   608             timeout = -1
       
   609 
       
   610         if select_read( fds, timeout ) == 0: # timeout, only happens if heartbeating missed
       
   611             if not isNil( c.missed_heartbeat_callback ):
       
   612                 c.missed_heartbeat_callback( c )
       
   613             else:
       
   614                 c.default_missed_heartbeat_callback
       
   615             if loop: continue else: break
       
   616 
       
   617         let response = newStompResponse( c )
       
   618         case response.frame:
       
   619 
       
   620             of "HEARTBEAT":
       
   621                 if not isNil( c.heartbeat_callback ):
       
   622                     c.heartbeat_callback( c, response )
       
   623                 continue
       
   624 
       
   625             of "RECEIPT":
       
   626                 if not isNil( c.receipt_callback ):
       
   627                     c.receipt_callback( c, response )
       
   628 
       
   629             of "MESSAGE":
       
   630                 if not isNil( c.message_callback ):
       
   631                     c.message_callback( c, response )
       
   632 
       
   633             of "ERROR":
       
   634                 if not isNil( c.error_callback ):
       
   635                     c.error_callback( c, response )
       
   636                 else:
       
   637                     c.default_error_callback( response )
       
   638 
       
   639             else:
       
   640                 if defined( debug ):
       
   641                     echo "Strange broker frame: " & response.repr
       
   642 
       
   643         if not loop: break
       
   644 
       
   645 
       
   646 
       
   647 #-------------------------------------------------------------------
       
   648 # T E S T S
       
   649 #-------------------------------------------------------------------
       
   650 
       
   651 # Functional (rather than unit) tests.  Requires a Stomp compatible broker.
       
   652 # This was tested against RabbitMQ 3.5.3 and 3.6.0.
       
   653 # 3.6.0 was -so- much faster.
       
   654 #
       
   655 # First start up a message receiver:
       
   656 #   ./stomp receiver [stomp-uri] [subscription-destination]
       
   657 #
       
   658 # then run another process, to publish stuff:
       
   659 #   ./stomp publisher [stomp-uri] [publish-destination]
       
   660 #
       
   661 # An example with an AMQP "direct" exchange, and an exclusive queue:
       
   662 #   ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
       
   663 #   ./stomp receiver  stomp://test:test@localhost/?heartbeat=10 /exchange/test
       
   664 #
       
   665 # Then just let 'er run.
       
   666 #
       
   667 # You can also run a naive benchmark (deliveries/sec):
       
   668 #
       
   669 #   ./stomp benchmark stomp://test:test@localhost/ /exchange/test
       
   670 #
       
   671 # It will set messages to require acknowledgement, and nack everything, causing
       
   672 # a delivery loop for 10 seconds.
       
   673 #
       
   674 when isMainModule:
       
   675     let expected = 8
       
   676     var
       
   677         socket   = newSocket()
       
   678         messages: seq[ StompResponse ] = @[]
       
   679 
       
   680     let usage = """
       
   681 First start up a message receiver:
       
   682   ./stomp receiver [stomp-uri] [subscription-destination]
       
   683 
       
   684 then run another process, to publish stuff:
       
   685   ./stomp publisher [stomp-uri] [publish-destination]
       
   686 
       
   687 An example with an AMQP "direct" exchange, and an exclusive queue:
       
   688   ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
       
   689   ./stomp receiver  stomp://test:test@localhost/?heartbeat=10 /exchange/test
       
   690 
       
   691 Then just let 'er run.
       
   692 
       
   693 You can also run a nieve benchmark (deliveries/sec):
       
   694 
       
   695   ./stomp benchmark stomp://test:test@localhost/ /exchange/test
       
   696 
       
   697 It will set messages to require acknowledgement, and nack everything, causing
       
   698 a delivery loop for 10 seconds.
       
   699 """
       
   700 
       
   701 
       
   702     if paramCount() != 3: quit usage
       
   703 
       
   704     var stomp = newStompClient( socket, paramStr(2) )
       
   705     stomp.connect
       
   706     echo stomp
       
   707 
       
   708     case paramStr(1):
       
   709 
       
   710         of "benchmark":
       
   711             echo "* Running for 10 seconds.  Compile with -d:debug to see the Stomp conversation."
       
   712             var count = 0
       
   713             var start = get_time()
       
   714 
       
   715             proc incr( c: StompClient, r: StompResponse ) =
       
   716                 let id = r["ack"]
       
   717                 count = count + 1
       
   718                 c.nack( id )
       
   719 
       
   720             stomp.message_callback = incr
       
   721             stomp.subscribe( paramStr(3), "client" )
       
   722             stomp.send( paramStr(3), "hi." )
       
   723             while get_time() < start + 10.seconds:
       
   724                 stomp.wait_for_messages( false )
       
   725 
       
   726             printf "* Processed %d messages in 10 seconds.\n", count
       
   727             stomp.disconnect
       
   728 
       
   729 
       
   730         # Store incoming messages, ensure their contents match our expected behavior.
       
   731         #
       
   732         of "receiver":
       
   733             var heartbeats = 0
       
   734             echo "* Waiting on messages from publisher.  Compile with -d:debug to see the Stomp conversation."
       
   735 
       
   736             proc receive_message( c: StompClient, r: StompResponse ) =
       
   737                 messages.add( r )
       
   738                 case r.frame:
       
   739                     of "RECEIPT":
       
   740                         discard
       
   741                     of "MESSAGE":
       
   742                         let body = r.payload
       
   743                         let id   = r[ "ack" ]
       
   744 
       
   745             proc seen_heartbeat( c: StompClient, r: StompResponse ) =
       
   746                 heartbeats = heartbeats + 1
       
   747 
       
   748             stomp.message_callback   = receive_message
       
   749             stomp.receipt_callback   = receive_message
       
   750             stomp.heartbeat_callback = seen_heartbeat
       
   751             stomp.subscribe( paramStr(3) )
       
   752 
       
   753             # Populate the messages sequence with the count of expected messages.
       
   754             for i in 1..expected: stomp.wait_for_messages( false )
       
   755 
       
   756             # Assertions on the results!
       
   757             #
       
   758             doAssert( messages.len == expected )
       
   759             doAssert( messages[0].payload == "" )
       
   760 
       
   761             doAssert( messages[1].payload == "Hello world!" )
       
   762 
       
   763             doAssert( messages[2].payload == "Dumb.\n\n" )
       
   764 
       
   765             doAssert( messages[3].payload == "Hello again." )
       
   766             doAssert( messages[3][ "content-type" ] == "text/plain" )
       
   767             doAssert( messages[3][ "Content-Type" ] == "text/plain" )
       
   768 
       
   769             doAssert( messages[4][ "x-custom" ] == "yum" )
       
   770 
       
   771             doAssert( messages[5][ "receipt" ] == "42" )
       
   772 
       
   773             doAssert( messages[6].payload == "transaction!" )
       
   774             doAssert( messages[7].payload == "transaction 2" )
       
   775 
       
   776             stomp.disconnect
       
   777 
       
   778             if heartbeats > 0:
       
   779                 printf "* Tests passed! %d heartbeats seen.", heartbeats
       
   780             else:
       
   781                 echo "* Tests passed!"
       
   782 
       
   783 
       
   784         # Publish a variety of messages with various options.
       
   785         # Pause momentarily between sends(), as brokers -might- impose
       
   786         # rate limits and/or message dropping.
       
   787         #
       
   788         of "publisher":
       
   789             echo "* Publishing to receiver.  Compile with -d:debug to see the Stomp conversation."
       
   790 
       
   791             # Simple, no frills event.
       
   792             stomp.send( paramStr(3) )
       
   793             sleep 500
       
   794 
       
   795             # Event with a body.
       
   796             stomp.send( paramStr(3), "Hello world!" )
       
   797             sleep 500
       
   798 
       
   799             # Event that doesn't contain a content-length.
       
   800             # (Note, the broker may elect to add one on your behalf, which is a good thing...
       
   801             # but invalidates this test.)
       
   802             stomp.socksend( "SEND" & CRLF )
       
   803             stomp.socksend( "destination:" & paramStr(3) & CRLF & CRLF )
       
   804             stomp.socksend( "Dumb.\n\n" & NULL )
       
   805             sleep 500
       
   806 
       
   807             # Content-Type
       
   808             stomp.send( paramStr(3), "Hello again.", "text/plain" )
       
   809             sleep 500
       
   810 
       
   811             # Custom headers.
       
   812             var headers: seq[ tuple[ name: string, value: string ] ] = @[]
       
   813             headers.add( ("x-custom", "yum") )
       
   814             stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
       
   815             sleep 500
       
   816 
       
   817             # Receipt requests.
       
   818             proc receive_receipt( c: StompClient, r: StompResponse ) =
       
   819                 messages.add( r )
       
   820             headers = @[]
       
   821             headers.add( ("receipt", "42") )
       
   822             stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
       
   823             stomp.receipt_callback = receive_receipt
       
   824             stomp.wait_for_messages( false )
       
   825             doAssert( messages[0]["receipt-id"] == "42" )
       
   826 
       
   827             # Aborted transaction.
       
   828             stomp.begin( "test-abort" )
       
   829             for i in 1..3:
       
   830                 stomp.send( paramStr(3), "Message: " & $i )
       
   831             stomp.abort
       
   832 
       
   833             # Committed transaction.
       
   834             stomp.begin( "test-commit" )
       
   835             stomp.send( paramStr(3), "transaction!" )
       
   836             stomp.commit
       
   837 
       
   838             # Mixed transactions.
       
   839             for i in 1..3:
       
   840                 headers = @[]
       
   841                 headers.add( ("transaction", "test-" & $i ) )
       
   842                 stomp.begin( "test-" & $i )
       
   843                 stomp.send( paramStr(3), "transaction " & $i, "", headers )
       
   844                 sleep 500
       
   845             stomp.abort( "test-1" )
       
   846             sleep 500
       
   847             stomp.commit( "test-2" )
       
   848             sleep 500
       
   849             stomp.abort( "test-3" )
       
   850             sleep 500
       
   851 
       
   852             stomp.disconnect
       
   853             echo "* Tests passed!"
       
   854 
       
   855         else:
       
   856             quit usage
       
   857