Re-arrange for nimble, update to Nim 0.19.
--- a/.hgignore Fri Mar 25 15:59:20 2016 -0700
+++ b/.hgignore Mon Oct 08 12:11:54 2018 -0700
@@ -1,3 +1,5 @@
+syntax: glob
.cache
-stomp$
-stomp.html
+stomp
+src/stomp
+*.html
--- a/Makefile Fri Mar 25 15:59:20 2016 -0700
+++ b/Makefile Mon Oct 08 12:11:54 2018 -0700
@@ -1,11 +1,12 @@
-FILES = stomp.nim
+FILES = src/stomp.nim
CACHE = .cache
default: development
development: ${FILES}
nim --debugInfo --assertions:on --linedir:on -d:ssl --define:debug --nimcache:${CACHE} c ${FILES}
+ @mv src/stomp .
autobuild:
# find . -depth 1 -name \*.nim | entr -cp make
@@ -16,6 +17,7 @@
release: ${FILES}
nim -d:release --opt:speed --nimcache:${CACHE} c ${FILES}
+ @mv src/stomp .
docs:
nim doc ${FILES}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/stomp.nim Mon Oct 08 12:11:54 2018 -0700
@@ -0,0 +1,857 @@
+# vim: set et nosta sw=4 ts=4 ft=nim :
+#
+# Copyright (c) 2016-2018, Mahlon E. Smith <mahlon@martini.nu>
+# All rights reserved.
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of Mahlon E. Smith nor the names of his
+# contributors may be used to endorse or promote products derived
+# from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
+# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+## Overview
+## ============
+##
+## This is a pure-nim client library for interacting with Stomp 1.2
+## compliant messaging brokers.
+##
+## https://stomp.github.io/stomp-specification-1.2.html
+##
+## Stomp is a simple protocol for message passing between clients, using a central
+## broker. It is a subset of other more elaborate protocols (like AMQP), supporting
+## only the most used features of common brokers.
+##
+## Because this library is pure-nim, there are no external dependencies. If you
+## can compile a nim binary, you can participate in advanced messaging between processes.
+##
+## A list of broker support for Stomp can be found here:
+## https://stomp.github.io/implementations.html.
+##
+## This library has been tested with recent versions of RabbitMQ. If it
+## works for you with another broker, please let the author know.
+##
+
+import
+ strutils,
+ nativesockets,
+ net,
+ os,
+ times,
+ uri
+
+const
+ VERSION = "0.1.1" ## The current program version.
+ NULL = "\x00" ## The NULL character.
+ CR = "\r" ## The carriage return character.
+ CRLF = "\r\n" ## Carriage return + Line feed (EOL).
+
+
+# Exceptions
+#
+type
+ StompError* = object of ValueError ## A generic Stomp error state.
+
+ StompClient* = ref object of RootObj ## An object that represents a connection to a Stomp compatible server.
+ socket: Socket ## The socket object attached to this client.
+ connected*: bool ## Is the client currently connected?
+ uri*: Uri ## The URI used to instantiate this client.
+ username: string ## The Stomp server user, if any.
+ password: string ## The Stomp server password, if any.
+ host: string ## The host or IP address to connect to.
+ port: Port ## Optional, if Stomp is on a non-default port.
+ vhost: string ## Parsed from the URI path, a Stomp "virtual host".
+ timeout*: int ## Global socket timeout.
+ last_msgtime*: Time ## Timestamp of last seen server message.
+ options: tuple[ heartbeat: int ] ## Any supported client options, derived from the URI query string.
+ subscriptions*: seq[ string ] ## Registered client subscriptions. Array position is the ID.
+ transactions*: seq[ string ] ## Any currently open transactions.
+ serverinfo: seq[ tuple[name: string, value: string] ] ## Server metadata, populated upon a successful connection.
+
+ connected_callback*: proc ( client: StompClient, response: StompResponse ): void
+ error_callback*: proc ( client: StompClient, response: StompResponse ): void
+ heartbeat_callback*: proc ( client: StompClient, response: StompResponse ): void
+ message_callback*: proc ( client: StompClient, response: StompResponse ): void
+ missed_heartbeat_callback*: proc ( client: StompClient ): void
+ receipt_callback*: proc ( client: StompClient, response: StompResponse ): void
+
+ StompResponse* = ref object of RootObj ## A parsed packet from a Stomp server.
+ headers: seq[ tuple[name: string, value: string] ] ## Any headers in the response. Access with the `[]` proc.
+ frame*: string ## The Stomp frame type.
+ payload*: string ## The message body, if any.
+
+
+# convenience
+proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.}
+
+
+#-------------------------------------------------------------------
+# R E S P O N S E
+#-------------------------------------------------------------------
+
+proc is_eol( s: string ): bool =
+ ## Convenience method, returns **true** if string is a Stomp EOF.
+ return s == CR or s == CRLF
+
+
+proc parse_headers( response: StompResponse, c: StompClient ): int =
+ ## Parse response headers from a stream.
+ ## Returns the content length of the response body, or 0.
+ result = 0
+ var line = ""
+
+ c.socket.readline( line, c.timeout )
+ while not line.is_eol:
+ if defined( debug ): printf " <-- %s\n", line
+ var header = line.split( ":" )
+ if header.len < 2: break
+ response.headers.add( (header[0], header[1]) )
+ if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int
+ c.socket.readline( line, c.timeout )
+
+
+proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void =
+ ## Parse message payload from a stream.
+ let bufsize = 8192
+ var
+ buf = ""
+ data = ""
+
+
+ # If we already know the length of the body, just perform a buffered read.
+ #
+ if bodylength > 0:
+ var
+ readtotal = 0
+ readamt = 0
+ remaining = 0
+
+ while readtotal != bodylength:
+ remaining = bodylength - readtotal
+
+ if remaining < bufsize:
+ readamt = remaining
+ else:
+ readamt = bufsize
+
+ buf = newString( readamt )
+ readtotal = readtotal + c.socket.recv( buf, readamt, c.timeout )
+ data = data & buf
+
+ # Eat the NULL terminator.
+ discard c.socket.recv( buf, 1, c.timeout )
+
+ # Inefficient path.
+ #
+ else:
+ while buf != NULL:
+ discard c.socket.recv( buf, 1, c.timeout )
+ data = data & buf
+
+ response.payload = data
+
+
+proc newStompResponse( c: StompClient ): StompResponse =
+ ## Initialize a response object, which parses and contains
+ ## the Stomp headers and any additional important data from
+ ## the broker socket.
+ new( result )
+ result.headers = @[]
+
+ # Get the frame type, record last seen server activity time.
+ #
+ var line = ""
+ c.socket.readline( line, c.timeout )
+ c.last_msgtime = get_time()
+
+ # Heartbeat packets (empties.)
+ #
+ # This could -also- parse optional EOLs from the prior
+ # message (after the NULL separator), but since it is a no-op,
+ # this seems harmless.
+ #
+ if line.is_eol:
+ result.frame = "HEARTBEAT"
+ return result
+
+ # All other types.
+ #
+ result.frame = line
+ if defined( debug ):
+ printf " <-- %s\n", line
+
+ # Parse headers and body.
+ #
+ var length = result.parse_headers( c )
+ if result.frame == "MESSAGE" or result.frame == "ERROR":
+ result.parse_payload( c, length )
+
+ # If the response -could- have a body, the NULL has already
+ # been removed from the stream while we checked for one.
+ #
+ if result.payload.len > 0:
+ if result.payload == NULL: # We checked for a body, but there was none.
+ result.payload = ""
+ if defined( debug ): printf " <--\n <-- ^@\n\n"
+ else:
+ if defined( debug ): printf " <--\n <-- (payload)^@\n\n"
+
+ # Otherwise, pop off the NULL terminator now.
+ #
+ else:
+ discard c.socket.recv( line, 1, c.timeout )
+ if defined( debug ): printf " <--\n <-- ^@\n\n"
+
+
+proc `$`*( r: StompResponse ): string =
+ ## Represent a Stomp response as a string.
+ result = r.frame & ": " & $r.headers
+
+
+proc `[]`*( response: StompResponse, key: string ): string =
+ ## Get a specific header from a Stomp response.
+ for header in response.headers:
+ if cmpIgnoreCase( key, header.name ) == 0:
+ return header.value
+ return ""
+
+
+#-------------------------------------------------------------------
+# C A L L B A C K S
+#-------------------------------------------------------------------
+
+proc default_error_callback( c: StompClient, response: StompResponse ) =
+ ## Something bad happened. Disconnect from the server, build an error message,
+ ## and raise an exception.
+ c.socket.close
+ c.connected = false
+
+ var detail = response.payload
+ var msg = response[ "message" ]
+ if $detail[ ^1 ] == "\n": detail = detail[ 0 .. ^2 ] # chomp
+
+ if detail.len > 0: msg = msg & " (" & detail & ")"
+ raise newException( StompError, "ERROR: " & msg )
+
+
+proc default_missed_heartbeat_callback( c: StompClient ) =
+ ## Timeout while connected to the broker.
+ c.socket.close
+ c.connected = false
+ raise newException( StompError, "Heartbeat timeout. Last activity: " & $c.last_msgtime )
+
+
+
+#-------------------------------------------------------------------
+# C L I E N T
+#-------------------------------------------------------------------
+
+proc newStompClient*( s: Socket, uri: string ): StompClient =
+ ## Create a new Stomp client object from a preexisting **socket**,
+ ## and a stomp **URI** string.
+ ##
+ ## .. code-block:: nim
+ ##
+ ## var socket = newSocket()
+ ## var stomp = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" )
+ ##
+ ## or if connecting with SSL, when compiled with -d:ssl:
+ ##
+ ## .. code-block:: nim
+ ##
+ ## var socket = newSocket()
+ ## let sslContext = newContext( verifyMode = CVerifyNone )
+ ## sslContext.wrapSocket(socket)
+ ## var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" )
+ ##
+ new( result )
+ result.socket = s
+ result.connected = false
+ result.uri = parse_uri( uri )
+ result.username = result.uri.username
+ result.password = result.uri.password
+ result.host = result.uri.hostname
+ result.vhost = result.uri.path
+ result.timeout = 500
+ result.subscriptions = @[]
+ result.transactions = @[]
+
+ # Parse any supported options in the query string.
+ #
+ for pairs in result.uri.query.split( '&' ):
+ let opt = pairs.split( '=' )
+ try:
+ case opt[0]:
+ of "heartbeat":
+ result.options.heartbeat = opt[1].parse_int
+ else:
+ discard
+ except IndexError, ValueError:
+ discard
+
+ # Set default STOMP port if otherwise unset.
+ #
+ if not result.uri.scheme.contains( "stomp" ):
+ raise newException( StompError, "Unknown scheme: " & result.uri.scheme )
+ var port: int
+ if result.uri.port == "":
+ if result.uri.scheme.contains( "+ssl" ):
+ port = 61614
+ else:
+ port = 61613
+ else:
+ port = result.uri.port.parse_int
+
+ result.port = Port( port )
+
+ # Decode URI encoded slashes for vhosts.
+ result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" )
+
+
+proc socksend( c: StompClient, data: string ): void =
+ ## Send data on the connected socket with optional debug output.
+ c.socket.send( data )
+ if defined( debug ): printf " --> %s", data
+
+
+proc finmsg( c: StompClient ): void =
+ ## Send data on the connected socket with optional debug output.
+ c.socket.send( CRLF & NULL & CRLF )
+ if defined( debug ): printf " --> \n --> ^@\n\n"
+
+
+proc `[]`*( c: StompClient, key: string ): string =
+ ## Get a specific value from the server metadata, set during the initial connection.
+ if not c.connected: return ""
+ for header in c.serverinfo:
+ if cmpIgnoreCase( key, header.name ) == 0:
+ return header.value
+ return ""
+
+
+proc `$`*( c: StompClient ): string =
+ ## Represent the stomp client as a string, after masking the password.
+ let uri = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" )
+ result = "(NimStomp v" & VERSION & ( if c.connected: " connected" else: " not connected" ) & " to " & uri
+ if not ( c[ "server" ] == "" ): result.add( " --> " & c["server"] )
+ result.add( ")" )
+
+
+proc connect*( c: StompClient ): void =
+ ## Establish a connection to the Stomp server.
+ if c.connected: return
+
+ var headers: seq[ tuple[name: string, value: string] ] = @[]
+ headers.add( ("accept-version", "1.2") )
+
+ # Stomp 1.2 requires the Host: header. Use the path as a vhost if
+ # supplied, otherwise use the hostname of the server.
+ #
+ if c.vhost != "":
+ headers.add( ("host", c.vhost) )
+ else:
+ headers.add( ("host", c.host) )
+
+ if c.username != "": headers.add( ("login", c.username) )
+ if c.password != "": headers.add( ("passcode", c.password) )
+ if c.options.heartbeat > 0:
+ let heartbeat = c.options.heartbeat * 1000
+ headers.add( ("heart-beat", "0," & $heartbeat) )
+
+ # Connect the socket and send the headers off.
+ #
+ c.socket.connect( c.host, c.port )
+ c.socksend( "CONNECT" & CRLF )
+ for header in headers:
+ c.socksend( header.name & ":" & header.value & CRLF )
+ c.finmsg
+
+ # Retreive and copy server metadata to client object.
+ #
+ var response = newStompResponse( c )
+ c.serverinfo = response.headers
+
+ if response.frame != "CONNECTED":
+ if not isNil( c.error_callback ):
+ c.error_callback( c, response )
+ else:
+ c.default_error_callback( response )
+ else:
+ c.connected = true
+ if not isNil( c.connected_callback ):
+ c.connected_callback( c, response )
+
+
+proc disconnect*( c: StompClient ): void =
+ ## Break down the connection to the Stomp server nicely.
+ if not c.connected: return
+ c.socksend( "DISCONNECT" & CRLF )
+ c.finmsg
+
+ c.socket.close
+ c.connected = false
+
+
+proc add_txn( c: StompClient ): void =
+ ## Add a transaction header if there is only a single open txn.
+ if c.transactions.len != 1: return
+ c.socksend( "transaction:" & c.transactions[0] & CRLF )
+
+
+proc send*( c: StompClient,
+ destination: string,
+ message: string = "",
+ contenttype: string = "",
+ headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
+ ## Send a **message** to **destination**.
+ ##
+ ## A Content-Length header is automatically and always included.
+ ## A **contenttype** is optional, but strongly recommended.
+ ##
+ ## Additionally, a transaction ID is automatically added if there is only
+ ## one transaction active. If you need to attach this message to a particular
+ ## transaction ID, you'll need to add it yourself with the user defined
+ ## **headers**.
+
+ if not c.connected: raise newException( StompError, "Client is not connected." )
+ c.socksend( "SEND" & CRLF )
+ c.socksend( "destination:" & destination & CRLF )
+ c.socksend( "content-length:" & $message.len & CRLF )
+ if not ( contenttype == "" ): c.socksend( "content-type:" & contenttype & CRLF )
+
+ # Add custom headers. Add transaction header if one isn't manually
+ # present (and a transaction is open.)
+ #
+ var txn_seen = false
+ for header in headers:
+ if header.name == "transaction": txn_seen = true
+ c.socksend( header.name & ":" & header.value & CRLF )
+ if not txn_seen: c.add_txn
+
+ if message == "":
+ c.finmsg
+ else:
+ c.socket.send( CRLF & message & NULL )
+ if defined( debug ): printf " -->\n --> (payload)^@\n\n"
+
+
+proc subscribe*( c: StompClient,
+ destination: string,
+ ack = "auto",
+ headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
+ ## Subscribe to messages at **destination**.
+ ##
+ ## Setting **ack** to "client" or "client-individual" enables client ACK/NACK mode.
+ ## In this mode, incoming messages aren't considered processed by
+ ## the server unless they receive ACK. By default, the server
+ ## considers the message processed if a client simply accepts it.
+ ##
+ ## You may optionally add any additional **headers** the server may support.
+
+ if not c.connected: raise newException( StompError, "Client is not connected." )
+ c.socksend( "SUBSCRIBE" & CRLF )
+ c.socksend( "destination:" & destination & CRLF )
+ c.socksend( "id:" & $c.subscriptions.len & CRLF )
+ if ack == "client" or ack == "client-individual":
+ c.socksend( "ack:" & ack & CRLF )
+ else:
+ if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack )
+
+ for header in headers:
+ c.socksend( header.name & ":" & header.value & CRLF )
+ c.finmsg
+ c.subscriptions.add( destination )
+
+
+proc unsubscribe*( c: StompClient,
+ destination: string,
+ headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
+ ## Unsubscribe from messages at **destination**.
+ ## You may optionally add any additional **headers** the server may support.
+
+ if not c.connected: raise newException( StompError, "Client is not connected." )
+ var
+ sub_id: int
+ i = 0
+
+ # Find the ID of the subscription.
+ #
+ for sub in c.subscriptions:
+ if sub == destination:
+ sub_id = i
+ break
+ i = i + 1
+
+ c.socksend( "UNSUBSCRIBE" & CRLF )
+ c.socksend( "id:" & $sub_id & CRLF )
+ for header in headers:
+ c.socksend( header.name & ":" & header.value & CRLF )
+ c.finmsg
+ c.subscriptions[ sub_id ] = ""
+
+
+proc begin*( c: StompClient, txn: string ): void =
+ ## Begin a new transaction on the broker, using **txn** as the identifier.
+ c.socksend( "BEGIN" & CRLF )
+ c.socksend( "transaction:" & txn & CRLF )
+ c.finmsg
+ c.transactions.add( txn )
+
+
+proc commit*( c: StompClient, txn: string = "" ): void =
+ ## Finish a specific transaction **txn**, or the most current if unspecified.
+ var transaction = txn
+ if transaction == "" and c.transactions.len > 0: transaction = c.transactions.pop
+ if transaction == "": return
+
+ c.socksend( "COMMIT" & CRLF )
+ c.socksend( "transaction:" & transaction & CRLF )
+ c.finmsg
+
+ # Remove the transaction from the queue.
+ #
+ var new_transactions: seq[ string ] = @[]
+ for txn in c.transactions:
+ if txn != transaction: new_transactions.add( txn )
+ c.transactions = new_transactions
+
+
+proc abort*( c: StompClient, txn: string = "" ): void =
+ ## Cancel a specific transaction **txn**, or the most current if unspecified.
+ var transaction = txn
+ if transaction == "" and c.transactions.len > 0: transaction = c.transactions.pop
+ if transaction == "": return
+
+ c.socksend( "ABORT" & CRLF )
+ c.socksend( "transaction:" & transaction & CRLF )
+ c.finmsg
+
+ # Remove the transaction from the queue.
+ #
+ var new_transactions: seq[ string ] = @[]
+ for txn in c.transactions:
+ if txn != transaction: new_transactions.add( txn )
+ c.transactions = new_transactions
+
+
+proc ack*( c: StompClient, id: string, transaction: string = "" ): void =
+ ## Acknowledge message **id**. Optionally, attach this acknowledgement
+ ## to a specific **transaction** -- if there's only one active, it is
+ ## added automatically.
+ c.socksend( "ACK" & CRLF )
+ c.socksend( "id:" & id & CRLF )
+ if not ( transaction == "" ):
+ c.socksend( "transaction:" & transaction & CRLF )
+ else:
+ c.add_txn
+ c.finmsg
+
+
+proc nack*( c: StompClient, id: string, transaction: string = "" ): void =
+ ## Reject message **id**. Optionally, attach this rejection to a
+ ## specific **transaction** -- if there's only one active, it is
+ ## added automatically.
+ ##
+ ## Subscribe to a queue with ACK mode enabled, and reject the message
+ ## on error:
+ ##
+ ## .. code-block:: nim
+ ##
+ ## stomp.subscribe( "/queue/test", "client-individual" )
+ ## FIXME: attach procs
+ ## stomp.wait_for_messages
+ ##
+ c.socksend( "NACK" & CRLF )
+ c.socksend( "id:" & id & CRLF )
+ if not ( transaction == "" ):
+ c.socksend( "transaction:" & transaction & CRLF )
+ else:
+ c.add_txn
+ c.finmsg
+
+
+proc wait_for_messages*( c: StompClient, loop=true ) =
+ ## Enter a blocking select loop, dispatching to the appropriate proc
+ ## for the received message type. Return after a single message
+ ## is received if **loop** is set to **false**.
+
+ if not c.connected: raise newException( StompError, "Client is not connected." )
+
+ while true:
+ var
+ timeout: int
+ fds = @[ c.socket.get_fd ]
+
+ # Check for missed heartbeats, with an additional second
+ # of wiggle-room.
+ #
+ if c.options.heartbeat > 0:
+ timeout = ( c.options.heartbeat + 1 ) * 1000
+ else:
+ timeout = -1
+
+ if select_read( fds, timeout ) == 0: # timeout, only happens if heartbeating missed
+ if not isNil( c.missed_heartbeat_callback ):
+ c.missed_heartbeat_callback( c )
+ else:
+ c.default_missed_heartbeat_callback
+ if loop: continue else: break
+
+ let response = newStompResponse( c )
+ case response.frame:
+
+ of "HEARTBEAT":
+ if not isNil( c.heartbeat_callback ):
+ c.heartbeat_callback( c, response )
+ continue
+
+ of "RECEIPT":
+ if not isNil( c.receipt_callback ):
+ c.receipt_callback( c, response )
+
+ of "MESSAGE":
+ if not isNil( c.message_callback ):
+ c.message_callback( c, response )
+
+ of "ERROR":
+ if not isNil( c.error_callback ):
+ c.error_callback( c, response )
+ else:
+ c.default_error_callback( response )
+
+ else:
+ if defined( debug ):
+ echo "Strange broker frame: " & response.repr
+
+ if not loop: break
+
+
+
+#-------------------------------------------------------------------
+# T E S T S
+#-------------------------------------------------------------------
+
+# Functional (rather than unit) tests. Requires a Stomp compatible broker.
+# This was tested against RabbitMQ 3.5.3 and 3.6.0.
+# 3.6.0 was -so- much faster.
+#
+# First start up a message receiver:
+# ./stomp receiver [stomp-uri] [subscription-destination]
+#
+# then run another process, to publish stuff:
+# ./stomp publisher [stomp-uri] [publish-destination]
+#
+# An example with an AMQP "direct" exchange, and an exclusive queue:
+# ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
+# ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test
+#
+# Then just let 'er run.
+#
+# You can also run a nieve benchmark (deliveries/sec):
+#
+# ./stomp benchmark stomp://test:test@localhost/ /exchange/test
+#
+# It will set messages to require acknowledgement, and nack everything, causing
+# a delivery loop for 10 seconds.
+#
+when isMainModule:
+ let expected = 8
+ var
+ socket = newSocket()
+ messages: seq[ StompResponse ] = @[]
+
+ let usage = """
+First start up a message receiver:
+ ./stomp receiver [stomp-uri] [subscription-destination]
+
+then run another process, to publish stuff:
+ ./stomp publisher [stomp-uri] [publish-destination]
+
+An example with an AMQP "direct" exchange, and an exclusive queue:
+ ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
+ ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test
+
+Then just let 'er run.
+
+You can also run a nieve benchmark (deliveries/sec):
+
+ ./stomp benchmark stomp://test:test@localhost/ /exchange/test
+
+It will set messages to require acknowledgement, and nack everything, causing
+a delivery loop for 10 seconds.
+"""
+
+
+ if paramCount() != 3: quit usage
+
+ var stomp = newStompClient( socket, paramStr(2) )
+ stomp.connect
+ echo stomp
+
+ case paramStr(1):
+
+ of "benchmark":
+ echo "* Running for 10 seconds. Compile with -d:debug to see the Stomp conversation."
+ var count = 0
+ var start = get_time()
+
+ proc incr( c: StompClient, r: StompResponse ) =
+ let id = r["ack"]
+ count = count + 1
+ c.nack( id )
+
+ stomp.message_callback = incr
+ stomp.subscribe( paramStr(3), "client" )
+ stomp.send( paramStr(3), "hi." )
+ while get_time() < start + 10.seconds:
+ stomp.wait_for_messages( false )
+
+ printf "* Processed %d messages in 10 seconds.\n", count
+ stomp.disconnect
+
+
+ # Store incoming messages, ensure their contents match our expected behavior.
+ #
+ of "receiver":
+ var heartbeats = 0
+ echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation."
+
+ proc receive_message( c: StompClient, r: StompResponse ) =
+ messages.add( r )
+ case r.frame:
+ of "RECEIPT":
+ discard
+ of "MESSAGE":
+ let body = r.payload
+ let id = r[ "ack" ]
+
+ proc seen_heartbeat( c: StompClient, r: StompResponse ) =
+ heartbeats = heartbeats + 1
+
+ stomp.message_callback = receive_message
+ stomp.receipt_callback = receive_message
+ stomp.heartbeat_callback = seen_heartbeat
+ stomp.subscribe( paramStr(3) )
+
+ # Populate the messages sequence with the count of expected messages.
+ for i in 1..expected: stomp.wait_for_messages( false )
+
+ # Assertions on the results!
+ #
+ doAssert( messages.len == expected )
+ doAssert( messages[0].payload == "" )
+
+ doAssert( messages[1].payload == "Hello world!" )
+
+ doAssert( messages[2].payload == "Dumb.\n\n" )
+
+ doAssert( messages[3].payload == "Hello again." )
+ doAssert( messages[3][ "content-type" ] == "text/plain" )
+ doAssert( messages[3][ "Content-Type" ] == "text/plain" )
+
+ doAssert( messages[4][ "x-custom" ] == "yum" )
+
+ doAssert( messages[5][ "receipt" ] == "42" )
+
+ doAssert( messages[6].payload == "transaction!" )
+ doAssert( messages[7].payload == "transaction 2" )
+
+ stomp.disconnect
+
+ if heartbeats > 0:
+ printf "* Tests passed! %d heartbeats seen.", heartbeats
+ else:
+ echo "* Tests passed!"
+
+
+ # Publish a variety of messages with various options.
+ # Pause momentarily between sends(), as brokers -might- impose
+ # rate limits and/or message dropping.
+ #
+ of "publisher":
+ echo "* Publishing to receiver. Compile with -d:debug to see the Stomp conversation."
+
+ # Simple, no frills event.
+ stomp.send( paramStr(3) )
+ sleep 500
+
+ # Event with a body.
+ stomp.send( paramStr(3), "Hello world!" )
+ sleep 500
+
+ # Event that doesn't contain a content-length.
+ # (Note, the broker may elect to add one on your behalf, which is a good thing...
+ # but invalidates this test.)
+ stomp.socksend( "SEND" & CRLF )
+ stomp.socksend( "destination:" & paramStr(3) & CRLF & CRLF )
+ stomp.socksend( "Dumb.\n\n" & NULL )
+ sleep 500
+
+ # Content-Type
+ stomp.send( paramStr(3), "Hello again.", "text/plain" )
+ sleep 500
+
+ # Custom headers.
+ var headers: seq[ tuple[ name: string, value: string ] ] = @[]
+ headers.add( ("x-custom", "yum") )
+ stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
+ sleep 500
+
+ # Receipt requests.
+ proc receive_receipt( c: StompClient, r: StompResponse ) =
+ messages.add( r )
+ headers = @[]
+ headers.add( ("receipt", "42") )
+ stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
+ stomp.receipt_callback = receive_receipt
+ stomp.wait_for_messages( false )
+ doAssert( messages[0]["receipt-id"] == "42" )
+
+ # Aborted transaction.
+ stomp.begin( "test-abort" )
+ for i in 1..3:
+ stomp.send( paramStr(3), "Message: " & $i )
+ stomp.abort
+
+ # Committed transaction.
+ stomp.begin( "test-commit" )
+ stomp.send( paramStr(3), "transaction!" )
+ stomp.commit
+
+ # Mixed transactions.
+ for i in 1..3:
+ headers = @[]
+ headers.add( ("transaction", "test-" & $i ) )
+ stomp.begin( "test-" & $i )
+ stomp.send( paramStr(3), "transaction " & $i, "", headers )
+ sleep 500
+ stomp.abort( "test-1" )
+ sleep 500
+ stomp.commit( "test-2" )
+ sleep 500
+ stomp.abort( "test-3" )
+ sleep 500
+
+ stomp.disconnect
+ echo "* Tests passed!"
+
+ else:
+ quit usage
+
--- a/stomp.nim Fri Mar 25 15:59:20 2016 -0700
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,847 +0,0 @@
-# vim: set et nosta sw=4 ts=4 ft=nim :
-#
-# Copyright (c) 2016, Mahlon E. Smith <mahlon@martini.nu>
-# All rights reserved.
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#
-# * Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-#
-# * Neither the name of Mahlon E. Smith nor the names of his
-# contributors may be used to endorse or promote products derived
-# from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
-# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
-# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
-# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-## Overview
-## ============
-##
-## This is a pure-nim client library for interacting with Stomp 1.2
-## compliant messaging brokers.
-##
-## https://stomp.github.io/stomp-specification-1.2.html
-##
-## Stomp is a simple protocol for message passing between clients, using a central
-## broker. It is a subset of other more elaborate protocols (like AMQP), supporting
-## only the most used features of common brokers.
-##
-## Because this library is pure-nim, there are no external dependencies. If you
-## can compile a nim binary, you can participate in advanced messaging between processes.
-##
-## A list of broker support for Stomp can be found here:
-## https://stomp.github.io/implementations.html.
-##
-## This library has been tested with recent versions of RabbitMQ. If it
-## works for you with another broker, please let the author know.
-##
-##
-## Protocol support
-## ----------------
-##
-## Examples
-## =========
-##
-## Connecting with SSL
-## -------------------
-##
-
-import
- strutils,
- nativesockets,
- net,
- os,
- times,
- uri
-
-const
- VERSION = "0.1.0" ## The current program version.
- NULL = "\x00" ## The NULL character.
- CR = "\r" ## The carriage return character.
- CRLF = "\r\n" ## Carriage return + Line feed (EOL).
-
-
-# Exceptions
-#
-type
- StompError* = object of ValueError ## A generic Stomp error state.
-
- StompClient* = ref object of RootObj ## An object that represents a connection to a Stomp compatible server.
- socket: Socket ## The socket object attached to this client.
- connected*: bool ## Is the client currently connected?
- uri*: Uri ## The URI used to instantiate this client.
- username: string ## The Stomp server user, if any.
- password: string ## The Stomp server password, if any.
- host: string ## The host or IP address to connect to.
- port: Port ## Optional, if Stomp is on a non-default port.
- vhost: string ## Parsed from the URI path, a Stomp "virtual host".
- timeout*: int ## Global socket timeout.
- last_msgtime*: Time ## Timestamp of last seen server message.
- options: tuple[ heartbeat: int ] ## Any supported client options, derived from the URI query string.
- subscriptions*: seq[ string ] ## Registered client subscriptions. Array position is the ID.
- transactions*: seq[ string ] ## Any currently open transactions.
- serverinfo: seq[ tuple[name: string, value: string] ] ## Server metadata, populated upon a successful connection.
-
- connected_callback*: proc ( client: StompClient, response: StompResponse ): void
- error_callback*: proc ( client: StompClient, response: StompResponse ): void
- heartbeat_callback*: proc ( client: StompClient, response: StompResponse ): void
- message_callback*: proc ( client: StompClient, response: StompResponse ): void
- missed_heartbeat_callback*: proc ( client: StompClient ): void
- receipt_callback*: proc ( client: StompClient, response: StompResponse ): void
-
- StompResponse* = ref object of RootObj ## A parsed packet from a Stomp server.
- headers: seq[ tuple[name: string, value: string] ] ## Any headers in the response. Access with the `[]` proc.
- frame*: string ## The Stomp frame type.
- payload*: string ## The message body, if any.
-
-
-# convenience
-proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.}
-
-
-#-------------------------------------------------------------------
-# R E S P O N S E
-#-------------------------------------------------------------------
-
-proc is_eol( s: string ): bool =
- ## Convenience method, returns **true** if string is a Stomp EOF.
- return s == CR or s == CRLF
-
-
-proc parse_headers( response: StompResponse, c: StompClient ): int =
- ## Parse response headers from a stream.
- ## Returns the content length of the response body, or 0.
- result = 0
- var line = ""
-
- c.socket.readline( line, c.timeout )
- while not line.is_eol:
- if defined( debug ): printf " <-- %s\n", line
- var header = line.split( ":" )
- if header.len < 2: break
- response.headers.add( (header[0], header[1]) )
- if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int
- c.socket.readline( line, c.timeout )
-
-
-proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void =
- ## Parse message payload from a stream.
- let bufsize = 8192
- var
- buf = ""
- data = ""
-
-
- # If we already know the length of the body, just perform a buffered read.
- #
- if bodylength > 0:
- var
- readtotal = 0
- readamt = 0
- remaining = 0
-
- while readtotal != bodylength:
- remaining = bodylength - readtotal
-
- if remaining < bufsize:
- readamt = remaining
- else:
- readamt = bufsize
-
- buf = newString( readamt )
- readtotal = readtotal + c.socket.recv( buf, readamt, c.timeout )
- data = data & buf
-
- # Eat the NULL terminator.
- discard c.socket.recv( buf, 1, c.timeout )
-
- # Inefficient path.
- #
- else:
- while buf != NULL:
- discard c.socket.recv( buf, 1, c.timeout )
- data = data & buf
-
- response.payload = data
-
-
-proc newStompResponse( c: StompClient ): StompResponse =
- ## Initialize a response object, which parses and contains
- ## the Stomp headers and any additional important data from
- ## the broker socket.
- new( result )
- result.headers = @[]
-
- # Get the frame type, record last seen server activity time.
- #
- var line = ""
- c.socket.readline( line, c.timeout )
- c.last_msgtime = get_time()
-
- # Heartbeat packets (empties.)
- #
- # This could -also- parse optional EOLs from the prior
- # message (after the NULL separator), but since it is a no-op,
- # this seems harmless.
- #
- if line.is_eol:
- result.frame = "HEARTBEAT"
- return result
-
- # All other types.
- #
- result.frame = line
- if defined( debug ):
- printf " <-- %s\n", line
-
- # Parse headers and body.
- #
- var length = result.parse_headers( c )
- if result.frame == "MESSAGE" or result.frame == "ERROR":
- result.parse_payload( c, length )
-
- # If the response -could- have a body, the NULL has already
- # been removed from the stream while we checked for one.
- #
- if result.payload.len > 0:
- if result.payload == NULL: # We checked for a body, but there was none.
- result.payload = nil
- if defined( debug ): printf " <--\n <-- ^@\n\n"
- else:
- if defined( debug ): printf " <--\n <-- (payload)^@\n\n"
-
- # Otherwise, pop off the NULL terminator now.
- #
- else:
- discard c.socket.recv( line, 1, c.timeout )
- if defined( debug ): printf " <--\n <-- ^@\n\n"
-
-
-proc `$`*( r: StompResponse ): string =
- ## Represent a Stomp response as a string.
- result = r.frame & ": " & $r.headers
-
-
-proc `[]`*( response: StompResponse, key: string ): string =
- ## Get a specific header from a Stomp response.
- for header in response.headers:
- if cmpIgnoreCase( key, header.name ) == 0:
- return header.value
- return nil
-
-
-#-------------------------------------------------------------------
-# C A L L B A C K S
-#-------------------------------------------------------------------
-
-proc default_error_callback( c: StompClient, response: StompResponse ) =
- ## Something bad happened. Disconnect from the server, build an error message,
- ## and raise an exception.
- c.socket.close
- c.connected = false
-
- var detail = response.payload
- var msg = response[ "message" ]
- if $detail[ ^1 ] == "\n": detail = detail[ 0 .. ^2 ] # chomp
-
- if detail.len > 0: msg = msg & " (" & detail & ")"
- raise newException( StompError, "ERROR: " & msg )
-
-
-proc default_missed_heartbeat_callback( c: StompClient ) =
- ## Timeout while connected to the broker.
- c.socket.close
- c.connected = false
- raise newException( StompError, "Heartbeat timeout. Last activity: " & $c.last_msgtime )
-
-
-
-#-------------------------------------------------------------------
-# C L I E N T
-#-------------------------------------------------------------------
-
-proc newStompClient*( s: Socket, uri: string ): StompClient =
- ## Create a new Stomp client object from a preexisting **socket**,
- ## and a stomp **URI** string.
- ##
- ## .. code-block:: nim
- ##
- ## var socket = newSocket()
- ## var stomp = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" )
- ##
- ## or if connecting with SSL, when compiled with -d:ssl:
- ##
- ## .. code-block:: nim
- ##
- ## var socket = newSocket()
- ## let sslContext = newContext( verifyMode = CVerifyNone )
- ## sslContext.wrapSocket(socket)
- ## var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" )
- ##
- new( result )
- result.socket = s
- result.connected = false
- result.uri = parse_uri( uri )
- result.username = result.uri.username
- result.password = result.uri.password
- result.host = result.uri.hostname
- result.vhost = result.uri.path
- result.timeout = 500
- result.subscriptions = @[]
- result.transactions = @[]
-
- # Parse any supported options in the query string.
- #
- for pairs in result.uri.query.split( '&' ):
- let opt = pairs.split( '=' )
- try:
- case opt[0]:
- of "heartbeat":
- result.options.heartbeat = opt[1].parse_int
- else:
- discard
- except IndexError, ValueError:
- discard
-
- # Set default STOMP port if otherwise unset.
- #
- if not result.uri.scheme.contains( "stomp" ):
- raise newException( StompError, "Unknown scheme: " & result.uri.scheme )
- var port: int
- if result.uri.port == "":
- if result.uri.scheme.contains( "+ssl" ):
- port = 61614
- else:
- port = 61613
- else:
- port = result.uri.port.parse_int
-
- result.port = Port( port )
-
- # Decode URI encoded slashes for vhosts.
- result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" )
-
-
-proc socksend( c: StompClient, data: string ): void =
- ## Send data on the connected socket with optional debug output.
- c.socket.send( data )
- if defined( debug ): printf " --> %s", data
-
-
-proc finmsg( c: StompClient ): void =
- ## Send data on the connected socket with optional debug output.
- c.socket.send( CRLF & NULL & CRLF )
- if defined( debug ): printf " --> \n --> ^@\n\n"
-
-
-proc `[]`*( c: StompClient, key: string ): string =
- ## Get a specific value from the server metadata, set during the initial connection.
- if not c.connected: return nil
- for header in c.serverinfo:
- if cmpIgnoreCase( key, header.name ) == 0:
- return header.value
- return nil
-
-
-proc `$`*( c: StompClient ): string =
- ## Represent the stomp client as a string, after masking the password.
- let uri = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" )
- result = "(NimStomp v" & VERSION & ( if c.connected: " connected" else: " not connected" ) & " to " & uri
- if not c[ "server" ].is_nil:
- result.add( " --> " & c["server"] )
- result.add( ")" )
-
-
-proc connect*( c: StompClient ): void =
- ## Establish a connection to the Stomp server.
- if c.connected: return
-
- var headers: seq[ tuple[name: string, value: string] ] = @[]
- headers.add( ("accept-version", "1.2") )
-
- # Stomp 1.2 requires the Host: header. Use the path as a vhost if
- # supplied, otherwise use the hostname of the server.
- #
- if c.vhost != "":
- headers.add( ("host", c.vhost) )
- else:
- headers.add( ("host", c.host) )
-
- if c.username != "": headers.add( ("login", c.username) )
- if c.password != "": headers.add( ("passcode", c.password) )
- if c.options.heartbeat > 0:
- let heartbeat = c.options.heartbeat * 1000
- headers.add( ("heart-beat", "0," & $heartbeat) )
-
- # Connect the socket and send the headers off.
- #
- c.socket.connect( c.host, c.port )
- c.socksend( "CONNECT" & CRLF )
- for header in headers:
- c.socksend( header.name & ":" & header.value & CRLF )
- c.finmsg
-
- # Retreive and copy server metadata to client object.
- #
- var response = newStompResponse( c )
- c.serverinfo = response.headers
-
- if response.frame != "CONNECTED":
- if not isNil( c.error_callback ):
- c.error_callback( c, response )
- else:
- c.default_error_callback( response )
- else:
- c.connected = true
- if not isNil( c.connected_callback ):
- c.connected_callback( c, response )
-
-
-proc disconnect*( c: StompClient ): void =
- ## Break down the connection to the Stomp server nicely.
- if not c.connected: return
- c.socksend( "DISCONNECT" & CRLF )
- c.finmsg
-
- c.socket.close
- c.connected = false
-
-
-proc add_txn( c: StompClient ): void =
- ## Add a transaction header if there is only a single open txn.
- if c.transactions.len != 1: return
- c.socksend( "transaction:" & c.transactions[0] & CRLF )
-
-
-proc send*( c: StompClient,
- destination: string,
- message: string = nil,
- contenttype: string = nil,
- headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
- ## Send a **message** to **destination**.
- ##
- ## A Content-Length header is automatically and always included.
- ## A **contenttype** is optional, but strongly recommended.
- ##
- ## Additionally, a transaction ID is automatically added if there is only
- ## one transaction active. If you need to attach this message to a particular
- ## transaction ID, you'll need to add it yourself with the user defined
- ## **headers**.
-
- if not c.connected: raise newException( StompError, "Client is not connected." )
- c.socksend( "SEND" & CRLF )
- c.socksend( "destination:" & destination & CRLF )
- c.socksend( "content-length:" & $message.len & CRLF )
- if not contenttype.is_nil: c.socksend( "content-type:" & contenttype & CRLF )
-
- # Add custom headers. Add transaction header if one isn't manually
- # present (and a transaction is open.)
- #
- var txn_seen = false
- for header in headers:
- if header.name == "transaction": txn_seen = true
- c.socksend( header.name & ":" & header.value & CRLF )
- if not txn_seen: c.add_txn
-
- if message.is_nil:
- c.finmsg
- else:
- c.socket.send( CRLF & message & NULL )
- if defined( debug ): printf " -->\n --> (payload)^@\n\n"
-
-
-proc subscribe*( c: StompClient,
- destination: string,
- ack = "auto",
- headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
- ## Subscribe to messages at **destination**.
- ##
- ## Setting **ack** to "client" or "client-individual" enables client ACK/NACK mode.
- ## In this mode, incoming messages aren't considered processed by
- ## the server unless they receive ACK. By default, the server
- ## considers the message processed if a client simply accepts it.
- ##
- ## You may optionally add any additional **headers** the server may support.
-
- if not c.connected: raise newException( StompError, "Client is not connected." )
- c.socksend( "SUBSCRIBE" & CRLF )
- c.socksend( "destination:" & destination & CRLF )
- c.socksend( "id:" & $c.subscriptions.len & CRLF )
- if ack == "client" or ack == "client-individual":
- c.socksend( "ack:" & ack & CRLF )
- else:
- if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack )
-
- for header in headers:
- c.socksend( header.name & ":" & header.value & CRLF )
- c.finmsg
- c.subscriptions.add( destination )
-
-
-proc unsubscribe*( c: StompClient,
- destination: string,
- headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
- ## Unsubscribe from messages at **destination**.
- ## You may optionally add any additional **headers** the server may support.
-
- if not c.connected: raise newException( StompError, "Client is not connected." )
- var
- sub_id: int
- i = 0
-
- # Find the ID of the subscription.
- #
- for sub in c.subscriptions:
- if sub == destination:
- sub_id = i
- break
- i = i + 1
-
- c.socksend( "UNSUBSCRIBE" & CRLF )
- c.socksend( "id:" & $sub_id & CRLF )
- for header in headers:
- c.socksend( header.name & ":" & header.value & CRLF )
- c.finmsg
- c.subscriptions[ sub_id ] = ""
-
-
-proc begin*( c: StompClient, txn: string ): void =
- ## Begin a new transaction on the broker, using **txn** as the identifier.
- c.socksend( "BEGIN" & CRLF )
- c.socksend( "transaction:" & txn & CRLF )
- c.finmsg
- c.transactions.add( txn )
-
-
-proc commit*( c: StompClient, txn: string = nil ): void =
- ## Finish a specific transaction **txn**, or the most current if unspecified.
- var transaction = txn
- if transaction.is_nil and c.transactions.len > 0: transaction = c.transactions.pop
- if transaction.is_nil: return
-
- c.socksend( "COMMIT" & CRLF )
- c.socksend( "transaction:" & transaction & CRLF )
- c.finmsg
-
- # Remove the transaction from the queue.
- #
- var new_transactions: seq[ string ] = @[]
- for txn in c.transactions:
- if txn != transaction: new_transactions.add( txn )
- c.transactions = new_transactions
-
-
-proc abort*( c: StompClient, txn: string = nil ): void =
- ## Cancel a specific transaction **txn**, or the most current if unspecified.
- var transaction = txn
- if transaction.is_nil and c.transactions.len > 0: transaction = c.transactions.pop
- if transaction.is_nil: return
-
- c.socksend( "ABORT" & CRLF )
- c.socksend( "transaction:" & transaction & CRLF )
- c.finmsg
-
- # Remove the transaction from the queue.
- #
- var new_transactions: seq[ string ] = @[]
- for txn in c.transactions:
- if txn != transaction: new_transactions.add( txn )
- c.transactions = new_transactions
-
-
-proc ack*( c: StompClient, id: string, transaction: string = nil ): void =
- ## Acknowledge message **id**. Optionally, attach this acknowledgement
- ## to a specific **transaction** -- if there's only one active, it is
- ## added automatically.
- c.socksend( "ACK" & CRLF )
- c.socksend( "id:" & id & CRLF )
- if not transaction.is_nil:
- c.socksend( "transaction:" & transaction & CRLF )
- else:
- c.add_txn
- c.finmsg
-
-
-proc nack*( c: StompClient, id: string, transaction: string = nil ): void =
- ## Reject message **id**. Optionally, attach this rejection to a
- ## specific **transaction** -- if there's only one active, it is
- ## added automatically.
- ##
- ## Subscribe to a queue with ACK mode enabled, and reject the message
- ## on error:
- ##
- ## .. code-block:: nim
- ##
- ## stomp.subscribe( "/queue/test", "client-individual" )
- ## FIXME: attach procs
- ## stomp.wait_for_messages
- ##
- c.socksend( "NACK" & CRLF )
- c.socksend( "id:" & id & CRLF )
- if not transaction.is_nil:
- c.socksend( "transaction:" & transaction & CRLF )
- else:
- c.add_txn
- c.finmsg
-
-
-proc wait_for_messages*( c: StompClient, loop=true ) =
- ## Enter a blocking select loop, dispatching to the appropriate proc
- ## for the received message type. Return after a single message
- ## is received if **loop** is set to **false**.
-
- if not c.connected: raise newException( StompError, "Client is not connected." )
-
- while true:
- var
- timeout: int
- fds = @[ c.socket.get_fd ]
-
- # Check for missed heartbeats, with an additional second
- # of wiggle-room.
- #
- if c.options.heartbeat > 0:
- timeout = ( c.options.heartbeat + 1 ) * 1000
- else:
- timeout = -1
-
- if select( fds, timeout ) == 0: # timeout, only happens if heartbeating missed
- if not isNil( c.missed_heartbeat_callback ):
- c.missed_heartbeat_callback( c )
- else:
- c.default_missed_heartbeat_callback
- if loop: continue else: break
-
- let response = newStompResponse( c )
- case response.frame:
-
- of "HEARTBEAT":
- if not isNil( c.heartbeat_callback ):
- c.heartbeat_callback( c, response )
- continue
-
- of "RECEIPT":
- if not isNil( c.receipt_callback ):
- c.receipt_callback( c, response )
-
- of "MESSAGE":
- if not isNil( c.message_callback ):
- c.message_callback( c, response )
-
- of "ERROR":
- if not isNil( c.error_callback ):
- c.error_callback( c, response )
- else:
- c.default_error_callback( response )
-
- else:
- if defined( debug ):
- echo "Strange broker frame: " & response.repr
-
- if not loop: break
-
-
-
-#-------------------------------------------------------------------
-# T E S T S
-#-------------------------------------------------------------------
-
-# Functional (rather than unit) tests. Requires a Stomp compatible broker.
-# This was tested against RabbitMQ 3.5.3 and 3.6.0.
-# 3.6.0 was -so- much faster.
-#
-# First start up a message receiver:
-# ./stomp receiver [stomp-uri] [subscription-destination]
-#
-# then run another process, to publish stuff:
-# ./stomp publisher [stomp-uri] [publish-destination]
-#
-# An example with an AMQP "direct" exchange, and an exclusive queue:
-# ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
-# ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test
-#
-# Then just let 'er run.
-#
-# You can also run a nieve benchmark (deliveries/sec):
-#
-# ./stomp benchmark stomp://test:test@localhost/ /exchange/test
-#
-# It will set messages to require acknowledgement, and nack everything, causing
-# a delivery loop for 10 seconds.
-#
-when isMainModule:
- let expected = 8
- var
- socket = newSocket()
- messages: seq[ StompResponse ] = @[]
-
- if paramCount() != 3: quit "See source comments for how to run functional tests."
-
- var stomp = newStompClient( socket, paramStr(2) )
- stomp.connect
- echo stomp
-
- case paramStr(1):
-
- of "benchmark":
- echo "* Running for 10 seconds. Compile with -d:debug to see the Stomp conversation."
- var count = 0
- var start = get_time()
-
- proc incr( c: StompClient, r: StompResponse ) =
- let id = r["ack"]
- count = count + 1
- c.nack( id )
-
- stomp.message_callback = incr
- stomp.subscribe( paramStr(3), "client" )
- stomp.send( paramStr(3), "hi." )
- while get_time() - start < 10:
- stomp.wait_for_messages( false )
-
- printf "* Processed %d messages in 10 seconds.\n", count
- stomp.disconnect
-
-
- # Store incoming messages, ensure their contents match our expected behavior.
- #
- of "receiver":
- var heartbeats = 0
- echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation."
-
- proc receive_message( c: StompClient, r: StompResponse ) =
- messages.add( r )
- case r.frame:
- of "RECEIPT":
- discard
- of "MESSAGE":
- let body = r.payload
- let id = r[ "ack" ]
-
- proc seen_heartbeat( c: StompClient, r: StompResponse ) =
- heartbeats = heartbeats + 1
-
- stomp.message_callback = receive_message
- stomp.receipt_callback = receive_message
- stomp.heartbeat_callback = seen_heartbeat
- stomp.subscribe( paramStr(3) )
-
- # Populate the messages sequence with the count of expected messages.
- for i in 1..expected: stomp.wait_for_messages( false )
-
- # Assertions on the results!
- #
- doAssert( messages.len == expected )
- doAssert( messages[0].payload == nil )
-
- doAssert( messages[1].payload == "Hello world!" )
-
- doAssert( messages[2].payload == "Dumb.\n\n" )
-
- doAssert( messages[3].payload == "Hello again." )
- doAssert( messages[3][ "content-type" ] == "text/plain" )
- doAssert( messages[3][ "Content-Type" ] == "text/plain" )
-
- doAssert( messages[4][ "x-custom" ] == "yum" )
-
- doAssert( messages[5][ "receipt" ] == "42" )
-
- doAssert( messages[6].payload == "transaction!" )
- doAssert( messages[7].payload == "transaction 2" )
-
- stomp.disconnect
-
- if heartbeats > 0:
- printf "* Tests passed! %d heartbeats seen.", heartbeats
- else:
- echo "* Tests passed!"
-
-
- # Publish a variety of messages with various options.
- # Pause momentarily between sends(), as brokers -might- impose
- # rate limits and/or message dropping.
- #
- of "publisher":
- echo "* Publishing to receiver. Compile with -d:debug to see the Stomp conversation."
-
- # Simple, no frills event.
- stomp.send( paramStr(3) )
- sleep 500
-
- # Event with a body.
- stomp.send( paramStr(3), "Hello world!" )
- sleep 500
-
- # Event that doesn't contain a content-length.
- # (Note, the broker may elect to add one on your behalf, which is a good thing...
- # but invalidates this test.)
- stomp.socksend( "SEND" & CRLF )
- stomp.socksend( "destination:" & paramStr(3) & CRLF & CRLF )
- stomp.socksend( "Dumb.\n\n" & NULL )
- sleep 500
-
- # Content-Type
- stomp.send( paramStr(3), "Hello again.", "text/plain" )
- sleep 500
-
- # Custom headers.
- var headers: seq[ tuple[ name: string, value: string ] ] = @[]
- headers.add( ("x-custom", "yum") )
- stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
- sleep 500
-
- # Receipt requests.
- proc receive_receipt( c: StompClient, r: StompResponse ) =
- messages.add( r )
- headers = @[]
- headers.add( ("receipt", "42") )
- stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
- stomp.receipt_callback = receive_receipt
- stomp.wait_for_messages( false )
- doAssert( messages[0]["receipt-id"] == "42" )
-
- # Aborted transaction.
- stomp.begin( "test-abort" )
- for i in 1..3:
- stomp.send( paramStr(3), "Message: " & $i )
- stomp.abort
-
- # Committed transaction.
- stomp.begin( "test-commit" )
- stomp.send( paramStr(3), "transaction!" )
- stomp.commit
-
- # Mixed transactions.
- for i in 1..3:
- headers = @[]
- headers.add( ("transaction", "test-" & $i ) )
- stomp.begin( "test-" & $i )
- stomp.send( paramStr(3), "transaction " & $i, nil, headers )
- sleep 500
- stomp.abort( "test-1" )
- sleep 500
- stomp.commit( "test-2" )
- sleep 500
- stomp.abort( "test-3" )
- sleep 500
-
- stomp.disconnect
- echo "* Tests passed!"
-
- else:
- quit "See source comments for how to run functional tests."
-
-
--- a/stomp.nimble Fri Mar 25 15:59:20 2016 -0700
+++ b/stomp.nimble Mon Oct 08 12:11:54 2018 -0700
@@ -1,11 +1,16 @@
+
# Package
-version = "0.1.0"
+version = "0.1.1"
author = "Mahlon E. Smith <mahlon@martini.nu>"
description = "A pure-nim implementation of the STOMP protocol for machine messaging."
license = "MIT"
+installExt = @["stomp"]
+bin = @["stomp"]
+srcDir = "src"
+
# Dependencies
-requires "nim >= 0.13.0"
+requires "nim >= 0.19.0"