# vim: set et nosta sw=4 ts=4 ft=nim : # # Copyright (c) 2016, Mahlon E. Smith # All rights reserved. # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # # * Neither the name of Mahlon E. Smith nor the names of his # contributors may be used to endorse or promote products derived # from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY # EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE # DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. ## Overview ## ============ ## ## This is a pure-nim client library for interacting with Stomp 1.2 ## compliant messaging brokers. ## ## https://stomp.github.io/stomp-specification-1.2.html ## ## Stomp is a simple protocol for message passing between clients, using a central ## broker. It is a subset of other more elaborate protocols (like AMQP), supporting ## only the most used features of common brokers. ## ## Because this library is pure-nim, there are no external dependencies. If you ## can compile a nim binary, you can participate in advanced messaging between processes. ## ## A list of broker support for Stomp can be found here: ## https://stomp.github.io/implementations.html. ## ## This library has been tested with recent versions of RabbitMQ. If it ## works for you with another broker, please let the author know. ## ## ## Protocol support ## ---------------- ## ## Examples ## ========= ## ## Connecting with SSL ## ------------------- ## import strutils, nativesockets, net, os, times, uri const VERSION = "0.1.0" ## The current program version. NULL = "\x00" ## The NULL character. CR = "\r" ## The carriage return character. CRLF = "\r\n" ## Carriage return + Line feed (EOL). # Exceptions # type StompError* = object of ValueError ## A generic Stomp error state. StompClient* = ref object of RootObj ## An object that represents a connection to a Stomp compatible server. socket: Socket ## The socket object attached to this client. connected*: bool ## Is the client currently connected? uri*: Uri ## The URI used to instantiate this client. username: string ## The Stomp server user, if any. password: string ## The Stomp server password, if any. host: string ## The host or IP address to connect to. port: Port ## Optional, if Stomp is on a non-default port. vhost: string ## Parsed from the URI path, a Stomp "virtual host". timeout*: int ## Global socket timeout. last_msgtime*: Time ## Timestamp of last seen server message. options: tuple[ heartbeat: int ] ## Any supported client options, derived from the URI query string. subscriptions*: seq[ string ] ## Registered client subscriptions. Array position is the ID. transactions*: seq[ string ] ## Any currently open transactions. serverinfo: seq[ tuple[name: string, value: string] ] ## Server metadata, populated upon a successful connection. connected_callback*: proc ( client: StompClient, response: StompResponse ): void error_callback*: proc ( client: StompClient, response: StompResponse ): void heartbeat_callback*: proc ( client: StompClient, response: StompResponse ): void message_callback*: proc ( client: StompClient, response: StompResponse ): void missed_heartbeat_callback*: proc ( client: StompClient ): void receipt_callback*: proc ( client: StompClient, response: StompResponse ): void StompResponse* = ref object of RootObj ## A parsed packet from a Stomp server. headers: seq[ tuple[name: string, value: string] ] ## Any headers in the response. Access with the `[]` proc. frame*: string ## The Stomp frame type. payload*: string ## The message body, if any. # convenience proc printf( formatstr: cstring ) {.header: "", varargs.} #------------------------------------------------------------------- # R E S P O N S E #------------------------------------------------------------------- proc is_eol( s: string ): bool = ## Convenience method, returns **true** if string is a Stomp EOF. return s == CR or s == CRLF proc parse_headers( response: StompResponse, c: StompClient ): int = ## Parse response headers from a stream. ## Returns the content length of the response body, or 0. result = 0 var line = "" c.socket.readline( line, c.timeout ) while not line.is_eol: if defined( debug ): printf " <-- %s\n", line var header = line.split( ":" ) if header.len < 2: break response.headers.add( (header[0], header[1]) ) if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int c.socket.readline( line, c.timeout ) proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void = ## Parse message payload from a stream. let bufsize = 8192 var buf = "" data = "" # If we already know the length of the body, just perform a buffered read. # if bodylength > 0: var readtotal = 0 readamt = 0 remaining = 0 while readtotal != bodylength: remaining = bodylength - readtotal if remaining < bufsize: readamt = remaining else: readamt = bufsize buf = newString( readamt ) readtotal = readtotal + c.socket.recv( buf, readamt, c.timeout ) data = data & buf # Eat the NULL terminator. discard c.socket.recv( buf, 1, c.timeout ) # Inefficient path. # else: while buf != NULL: discard c.socket.recv( buf, 1, c.timeout ) data = data & buf response.payload = data proc newStompResponse( c: StompClient ): StompResponse = ## Initialize a response object, which parses and contains ## the Stomp headers and any additional important data from ## the broker socket. new( result ) result.headers = @[] # Get the frame type, record last seen server activity time. # var line = "" c.socket.readline( line, c.timeout ) c.last_msgtime = get_time() # Heartbeat packets (empties.) # # This could -also- parse optional EOLs from the prior # message (after the NULL separator), but since it is a no-op, # this seems harmless. # if line.is_eol: result.frame = "HEARTBEAT" return result # All other types. # result.frame = line if defined( debug ): printf " <-- %s\n", line # Parse headers and body. # var length = result.parse_headers( c ) if result.frame == "MESSAGE" or result.frame == "ERROR": result.parse_payload( c, length ) # If the response -could- have a body, the NULL has already # been removed from the stream while we checked for one. # if result.payload.len > 0: if result.payload == NULL: # We checked for a body, but there was none. result.payload = nil if defined( debug ): printf " <--\n <-- ^@\n\n" else: if defined( debug ): printf " <--\n <-- (payload)^@\n\n" # Otherwise, pop off the NULL terminator now. # else: discard c.socket.recv( line, 1, c.timeout ) if defined( debug ): printf " <--\n <-- ^@\n\n" proc `$`*( r: StompResponse ): string = ## Represent a Stomp response as a string. result = r.frame & ": " & $r.headers proc `[]`*( response: StompResponse, key: string ): string = ## Get a specific header from a Stomp response. for header in response.headers: if cmpIgnoreCase( key, header.name ) == 0: return header.value return nil #------------------------------------------------------------------- # C A L L B A C K S #------------------------------------------------------------------- proc default_error_callback( c: StompClient, response: StompResponse ) = ## Something bad happened. Disconnect from the server, build an error message, ## and raise an exception. c.socket.close c.connected = false var detail = response.payload var msg = response[ "message" ] if $detail[ ^1 ] == "\n": detail = detail[ 0 .. ^2 ] # chomp if detail.len > 0: msg = msg & " (" & detail & ")" raise newException( StompError, "ERROR: " & msg ) proc default_missed_heartbeat_callback( c: StompClient ) = ## Timeout while connected to the broker. c.socket.close c.connected = false raise newException( StompError, "Heartbeat timeout. Last activity: " & $c.last_msgtime ) #------------------------------------------------------------------- # C L I E N T #------------------------------------------------------------------- proc newStompClient*( s: Socket, uri: string ): StompClient = ## Create a new Stomp client object from a preexisting **socket**, ## and a stomp **URI** string. ## ## .. code-block:: nim ## ## var socket = newSocket() ## var stomp = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" ) ## ## or if connecting with SSL, when compiled with -d:ssl: ## ## .. code-block:: nim ## ## var socket = newSocket() ## let sslContext = newContext( verifyMode = CVerifyNone ) ## sslContext.wrapSocket(socket) ## var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" ) ## new( result ) result.socket = s result.connected = false result.uri = parse_uri( uri ) result.username = result.uri.username result.password = result.uri.password result.host = result.uri.hostname result.vhost = result.uri.path result.timeout = 500 result.subscriptions = @[] result.transactions = @[] # Parse any supported options in the query string. # for pairs in result.uri.query.split( '&' ): let opt = pairs.split( '=' ) try: case opt[0]: of "heartbeat": result.options.heartbeat = opt[1].parse_int else: discard except IndexError, ValueError: discard # Set default STOMP port if otherwise unset. # if not result.uri.scheme.contains( "stomp" ): raise newException( StompError, "Unknown scheme: " & result.uri.scheme ) var port: int if result.uri.port == "": if result.uri.scheme.contains( "+ssl" ): port = 61614 else: port = 61613 else: port = result.uri.port.parse_int result.port = Port( port ) # Decode URI encoded slashes for vhosts. result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" ) proc socksend( c: StompClient, data: string ): void = ## Send data on the connected socket with optional debug output. c.socket.send( data ) if defined( debug ): printf " --> %s", data proc finmsg( c: StompClient ): void = ## Send data on the connected socket with optional debug output. c.socket.send( CRLF & NULL & CRLF ) if defined( debug ): printf " --> \n --> ^@\n\n" proc `[]`*( c: StompClient, key: string ): string = ## Get a specific value from the server metadata, set during the initial connection. if not c.connected: return nil for header in c.serverinfo: if cmpIgnoreCase( key, header.name ) == 0: return header.value return nil proc `$`*( c: StompClient ): string = ## Represent the stomp client as a string, after masking the password. let uri = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" ) result = "(NimStomp v" & VERSION & ( if c.connected: " connected" else: " not connected" ) & " to " & uri if not c[ "server" ].is_nil: result.add( " --> " & c["server"] ) result.add( ")" ) proc connect*( c: StompClient ): void = ## Establish a connection to the Stomp server. if c.connected: return var headers: seq[ tuple[name: string, value: string] ] = @[] headers.add( ("accept-version", "1.2") ) # Stomp 1.2 requires the Host: header. Use the path as a vhost if # supplied, otherwise use the hostname of the server. # if c.vhost != "": headers.add( ("host", c.vhost) ) else: headers.add( ("host", c.host) ) if c.username != "": headers.add( ("login", c.username) ) if c.password != "": headers.add( ("passcode", c.password) ) if c.options.heartbeat > 0: let heartbeat = c.options.heartbeat * 1000 headers.add( ("heart-beat", "0," & $heartbeat) ) # Connect the socket and send the headers off. # c.socket.connect( c.host, c.port ) c.socksend( "CONNECT" & CRLF ) for header in headers: c.socksend( header.name & ":" & header.value & CRLF ) c.finmsg # Retreive and copy server metadata to client object. # var response = newStompResponse( c ) c.serverinfo = response.headers if response.frame != "CONNECTED": if not isNil( c.error_callback ): c.error_callback( c, response ) else: c.default_error_callback( response ) else: c.connected = true if not isNil( c.connected_callback ): c.connected_callback( c, response ) proc disconnect*( c: StompClient ): void = ## Break down the connection to the Stomp server nicely. if not c.connected: return c.socksend( "DISCONNECT" & CRLF ) c.finmsg c.socket.close c.connected = false proc add_txn( c: StompClient ): void = ## Add a transaction header if there is only a single open txn. if c.transactions.len != 1: return c.socksend( "transaction:" & c.transactions[0] & CRLF ) proc send*( c: StompClient, destination: string, message: string = nil, contenttype: string = nil, headers: seq[ tuple[name: string, value: string] ] = @[] ): void = ## Send a **message** to **destination**. ## ## A Content-Length header is automatically and always included. ## A **contenttype** is optional, but strongly recommended. ## ## Additionally, a transaction ID is automatically added if there is only ## one transaction active. If you need to attach this message to a particular ## transaction ID, you'll need to add it yourself with the user defined ## **headers**. if not c.connected: raise newException( StompError, "Client is not connected." ) c.socksend( "SEND" & CRLF ) c.socksend( "destination:" & destination & CRLF ) c.socksend( "content-length:" & $message.len & CRLF ) if not contenttype.is_nil: c.socksend( "content-type:" & contenttype & CRLF ) # Add custom headers. Add transaction header if one isn't manually # present (and a transaction is open.) # var txn_seen = false for header in headers: if header.name == "transaction": txn_seen = true c.socksend( header.name & ":" & header.value & CRLF ) if not txn_seen: c.add_txn if message.is_nil: c.finmsg else: c.socket.send( CRLF & message & NULL ) if defined( debug ): printf " -->\n --> (payload)^@\n\n" proc subscribe*( c: StompClient, destination: string, ack = "auto", headers: seq[ tuple[name: string, value: string] ] = @[] ): void = ## Subscribe to messages at **destination**. ## ## Setting **ack** to "client" or "client-individual" enables client ACK/NACK mode. ## In this mode, incoming messages aren't considered processed by ## the server unless they receive ACK. By default, the server ## considers the message processed if a client simply accepts it. ## ## You may optionally add any additional **headers** the server may support. if not c.connected: raise newException( StompError, "Client is not connected." ) c.socksend( "SUBSCRIBE" & CRLF ) c.socksend( "destination:" & destination & CRLF ) c.socksend( "id:" & $c.subscriptions.len & CRLF ) if ack == "client" or ack == "client-individual": c.socksend( "ack:" & ack & CRLF ) else: if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack ) for header in headers: c.socksend( header.name & ":" & header.value & CRLF ) c.finmsg c.subscriptions.add( destination ) proc unsubscribe*( c: StompClient, destination: string, headers: seq[ tuple[name: string, value: string] ] = @[] ): void = ## Unsubscribe from messages at **destination**. ## You may optionally add any additional **headers** the server may support. if not c.connected: raise newException( StompError, "Client is not connected." ) var sub_id: int i = 0 # Find the ID of the subscription. # for sub in c.subscriptions: if sub == destination: sub_id = i break i = i + 1 c.socksend( "UNSUBSCRIBE" & CRLF ) c.socksend( "id:" & $sub_id & CRLF ) for header in headers: c.socksend( header.name & ":" & header.value & CRLF ) c.finmsg c.subscriptions[ sub_id ] = "" proc begin*( c: StompClient, txn: string ): void = ## Begin a new transaction on the broker, using **txn** as the identifier. c.socksend( "BEGIN" & CRLF ) c.socksend( "transaction:" & txn & CRLF ) c.finmsg c.transactions.add( txn ) proc commit*( c: StompClient, txn: string = nil ): void = ## Finish a specific transaction **txn**, or the most current if unspecified. var transaction = txn if transaction.is_nil and c.transactions.len > 0: transaction = c.transactions.pop if transaction.is_nil: return c.socksend( "COMMIT" & CRLF ) c.socksend( "transaction:" & transaction & CRLF ) c.finmsg # Remove the transaction from the queue. # var new_transactions: seq[ string ] = @[] for txn in c.transactions: if txn != transaction: new_transactions.add( txn ) c.transactions = new_transactions proc abort*( c: StompClient, txn: string = nil ): void = ## Cancel a specific transaction **txn**, or the most current if unspecified. var transaction = txn if transaction.is_nil and c.transactions.len > 0: transaction = c.transactions.pop if transaction.is_nil: return c.socksend( "ABORT" & CRLF ) c.socksend( "transaction:" & transaction & CRLF ) c.finmsg # Remove the transaction from the queue. # var new_transactions: seq[ string ] = @[] for txn in c.transactions: if txn != transaction: new_transactions.add( txn ) c.transactions = new_transactions proc ack*( c: StompClient, id: string, transaction: string = nil ): void = ## Acknowledge message **id**. Optionally, attach this acknowledgement ## to a specific **transaction** -- if there's only one active, it is ## added automatically. c.socksend( "ACK" & CRLF ) c.socksend( "id:" & id & CRLF ) if not transaction.is_nil: c.socksend( "transaction:" & transaction & CRLF ) else: c.add_txn c.finmsg proc nack*( c: StompClient, id: string, transaction: string = nil ): void = ## Reject message **id**. Optionally, attach this rejection to a ## specific **transaction** -- if there's only one active, it is ## added automatically. ## ## Subscribe to a queue with ACK mode enabled, and reject the message ## on error: ## ## .. code-block:: nim ## ## stomp.subscribe( "/queue/test", "client-individual" ) ## FIXME: attach procs ## stomp.wait_for_messages ## c.socksend( "NACK" & CRLF ) c.socksend( "id:" & id & CRLF ) if not transaction.is_nil: c.socksend( "transaction:" & transaction & CRLF ) else: c.add_txn c.finmsg proc wait_for_messages*( c: StompClient, loop=true ) = ## Enter a blocking select loop, dispatching to the appropriate proc ## for the received message type. Return after a single message ## is received if **loop** is set to **false**. if not c.connected: raise newException( StompError, "Client is not connected." ) while true: var timeout: int fds = @[ c.socket.get_fd ] # Check for missed heartbeats, with an additional second # of wiggle-room. # if c.options.heartbeat > 0: timeout = ( c.options.heartbeat + 1 ) * 1000 else: timeout = -1 if select( fds, timeout ) == 0: # timeout, only happens if heartbeating missed if not isNil( c.missed_heartbeat_callback ): c.missed_heartbeat_callback( c ) else: c.default_missed_heartbeat_callback if loop: continue else: break let response = newStompResponse( c ) case response.frame: of "HEARTBEAT": if not isNil( c.heartbeat_callback ): c.heartbeat_callback( c, response ) continue of "RECEIPT": if not isNil( c.receipt_callback ): c.receipt_callback( c, response ) of "MESSAGE": if not isNil( c.message_callback ): c.message_callback( c, response ) of "ERROR": if not isNil( c.error_callback ): c.error_callback( c, response ) else: c.default_error_callback( response ) else: if defined( debug ): echo "Strange broker frame: " & response.repr if not loop: break #------------------------------------------------------------------- # T E S T S #------------------------------------------------------------------- # Functional (rather than unit) tests. Requires a Stomp compatible broker. # This was tested against RabbitMQ 3.5.3 and 3.6.0. # 3.6.0 was -so- much faster. # # First start up a message receiver: # ./stomp receiver [stomp-uri] [subscription-destination] # # then run another process, to publish stuff: # ./stomp publisher [stomp-uri] [publish-destination] # # An example with an AMQP "direct" exchange, and an exclusive queue: # ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test # ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test # # Then just let 'er run. # # You can also run a nieve benchmark (deliveries/sec): # # ./stomp benchmark stomp://test:test@localhost/ /exchange/test # # It will set messages to require acknowledgement, and nack everything, causing # a delivery loop for 10 seconds. # when isMainModule: let expected = 8 var socket = newSocket() messages: seq[ StompResponse ] = @[] if paramCount() != 3: quit "See source comments for how to run functional tests." var stomp = newStompClient( socket, paramStr(2) ) stomp.connect echo stomp case paramStr(1): of "benchmark": echo "* Running for 10 seconds. Compile with -d:debug to see the Stomp conversation." var count = 0 var start = get_time() proc incr( c: StompClient, r: StompResponse ) = let id = r["ack"] count = count + 1 c.nack( id ) stomp.message_callback = incr stomp.subscribe( paramStr(3), "client" ) stomp.send( paramStr(3), "hi." ) while get_time() - start < 10: stomp.wait_for_messages( false ) printf "* Processed %d messages in 10 seconds.\n", count stomp.disconnect # Store incoming messages, ensure their contents match our expected behavior. # of "receiver": var heartbeats = 0 echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation." proc receive_message( c: StompClient, r: StompResponse ) = messages.add( r ) case r.frame: of "RECEIPT": discard of "MESSAGE": let body = r.payload let id = r[ "ack" ] proc seen_heartbeat( c: StompClient, r: StompResponse ) = heartbeats = heartbeats + 1 stomp.message_callback = receive_message stomp.receipt_callback = receive_message stomp.heartbeat_callback = seen_heartbeat stomp.subscribe( paramStr(3) ) # Populate the messages sequence with the count of expected messages. for i in 1..expected: stomp.wait_for_messages( false ) # Assertions on the results! # doAssert( messages.len == expected ) doAssert( messages[0].payload == nil ) doAssert( messages[1].payload == "Hello world!" ) doAssert( messages[2].payload == "Dumb.\n\n" ) doAssert( messages[3].payload == "Hello again." ) doAssert( messages[3][ "content-type" ] == "text/plain" ) doAssert( messages[3][ "Content-Type" ] == "text/plain" ) doAssert( messages[4][ "x-custom" ] == "yum" ) doAssert( messages[5][ "receipt" ] == "42" ) doAssert( messages[6].payload == "transaction!" ) doAssert( messages[7].payload == "transaction 2" ) stomp.disconnect if heartbeats > 0: printf "* Tests passed! %d heartbeats seen.", heartbeats else: echo "* Tests passed!" # Publish a variety of messages with various options. # Pause momentarily between sends(), as brokers -might- impose # rate limits and/or message dropping. # of "publisher": echo "* Publishing to receiver. Compile with -d:debug to see the Stomp conversation." # Simple, no frills event. stomp.send( paramStr(3) ) sleep 500 # Event with a body. stomp.send( paramStr(3), "Hello world!" ) sleep 500 # Event that doesn't contain a content-length. # (Note, the broker may elect to add one on your behalf, which is a good thing... # but invalidates this test.) stomp.socksend( "SEND" & CRLF ) stomp.socksend( "destination:" & paramStr(3) & CRLF & CRLF ) stomp.socksend( "Dumb.\n\n" & NULL ) sleep 500 # Content-Type stomp.send( paramStr(3), "Hello again.", "text/plain" ) sleep 500 # Custom headers. var headers: seq[ tuple[ name: string, value: string ] ] = @[] headers.add( ("x-custom", "yum") ) stomp.send( paramStr(3), "Hello again.", "text/plain", headers ) sleep 500 # Receipt requests. proc receive_receipt( c: StompClient, r: StompResponse ) = messages.add( r ) headers = @[] headers.add( ("receipt", "42") ) stomp.send( paramStr(3), "Hello again.", "text/plain", headers ) stomp.receipt_callback = receive_receipt stomp.wait_for_messages( false ) doAssert( messages[0]["receipt-id"] == "42" ) # Aborted transaction. stomp.begin( "test-abort" ) for i in 1..3: stomp.send( paramStr(3), "Message: " & $i ) stomp.abort # Committed transaction. stomp.begin( "test-commit" ) stomp.send( paramStr(3), "transaction!" ) stomp.commit # Mixed transactions. for i in 1..3: headers = @[] headers.add( ("transaction", "test-" & $i ) ) stomp.begin( "test-" & $i ) stomp.send( paramStr(3), "transaction " & $i, nil, headers ) sleep 500 stomp.abort( "test-1" ) sleep 500 stomp.commit( "test-2" ) sleep 500 stomp.abort( "test-3" ) sleep 500 stomp.disconnect echo "* Tests passed!" else: quit "See source comments for how to run functional tests."