Initial commit.
This commit is contained in:
commit
b9a8c5b53b
4 changed files with 887 additions and 0 deletions
3
.hgignore
Normal file
3
.hgignore
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
.cache
|
||||
stomp$
|
||||
stomp.html
|
||||
26
Makefile
Normal file
26
Makefile
Normal file
|
|
@ -0,0 +1,26 @@
|
|||
|
||||
FILES = stomp.nim
|
||||
CACHE = .cache
|
||||
|
||||
default: development
|
||||
|
||||
development: ${FILES}
|
||||
nim --debugInfo --assertions:on --linedir:on -d:ssl --define:debug --nimcache:${CACHE} c ${FILES}
|
||||
|
||||
autobuild:
|
||||
# find . -depth 1 -name \*.nim | entr -cp make
|
||||
echo ${FILES} | entr -c make
|
||||
|
||||
debugger: ${FILES}
|
||||
nim --debugger:on --nimcache:${CACHE} c ${FILES}
|
||||
|
||||
release: ${FILES}
|
||||
nim -d:release --opt:speed --nimcache:${CACHE} c ${FILES}
|
||||
|
||||
docs:
|
||||
nim doc ${FILES}
|
||||
#nim buildIndex ${FILES}
|
||||
|
||||
clean:
|
||||
cat .hgignore | xargs rm -rf
|
||||
|
||||
847
stomp.nim
Normal file
847
stomp.nim
Normal file
|
|
@ -0,0 +1,847 @@
|
|||
# vim: set et nosta sw=4 ts=4 ft=nim :
|
||||
#
|
||||
# Copyright (c) 2016, Mahlon E. Smith <mahlon@martini.nu>
|
||||
# All rights reserved.
|
||||
# Redistribution and use in source and binary forms, with or without
|
||||
# modification, are permitted provided that the following conditions are met:
|
||||
#
|
||||
# * Redistributions of source code must retain the above copyright
|
||||
# notice, this list of conditions and the following disclaimer.
|
||||
#
|
||||
# * Redistributions in binary form must reproduce the above copyright
|
||||
# notice, this list of conditions and the following disclaimer in the
|
||||
# documentation and/or other materials provided with the distribution.
|
||||
#
|
||||
# * Neither the name of Mahlon E. Smith nor the names of his
|
||||
# contributors may be used to endorse or promote products derived
|
||||
# from this software without specific prior written permission.
|
||||
#
|
||||
# THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND ANY
|
||||
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
# DISCLAIMED. IN NO EVENT SHALL THE REGENTS AND CONTRIBUTORS BE LIABLE FOR ANY
|
||||
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
## Overview
|
||||
## ============
|
||||
##
|
||||
## This is a pure-nim client library for interacting with Stomp 1.2
|
||||
## compliant messaging brokers.
|
||||
##
|
||||
## https://stomp.github.io/stomp-specification-1.2.html
|
||||
##
|
||||
## Stomp is a simple protocol for message passing between clients, using a central
|
||||
## broker. It is a subset of other more elaborate protocols (like AMQP), supporting
|
||||
## only the most used features of common brokers.
|
||||
##
|
||||
## Because this library is pure-nim, there are no external dependencies. If you
|
||||
## can compile a nim binary, you can participate in advanced messaging between processes.
|
||||
##
|
||||
## A list of broker support for Stomp can be found here:
|
||||
## https://stomp.github.io/implementations.html.
|
||||
##
|
||||
## This library has been tested with recent versions of RabbitMQ. If it
|
||||
## works for you with another broker, please let the author know.
|
||||
##
|
||||
##
|
||||
## Protocol support
|
||||
## ----------------
|
||||
##
|
||||
## Examples
|
||||
## =========
|
||||
##
|
||||
## Connecting with SSL
|
||||
## -------------------
|
||||
##
|
||||
|
||||
import
|
||||
strutils,
|
||||
nativesockets,
|
||||
net,
|
||||
os,
|
||||
times,
|
||||
uri
|
||||
|
||||
const
|
||||
VERSION = "0.1.0" ## The current program version.
|
||||
NULL = "\x00" ## The NULL character.
|
||||
CR = "\r" ## The carriage return character.
|
||||
CRLF = "\r\n" ## Carriage return + Line feed (EOL).
|
||||
|
||||
|
||||
# Exceptions
|
||||
#
|
||||
type
|
||||
StompError* = object of ValueError ## A generic Stomp error state.
|
||||
|
||||
StompClient* = ref object of RootObj ## An object that represents a connection to a Stomp compatible server.
|
||||
socket: Socket ## The socket object attached to this client.
|
||||
connected*: bool ## Is the client currently connected?
|
||||
uri*: Uri ## The URI used to instantiate this client.
|
||||
username: string ## The Stomp server user, if any.
|
||||
password: string ## The Stomp server password, if any.
|
||||
host: string ## The host or IP address to connect to.
|
||||
port: Port ## Optional, if Stomp is on a non-default port.
|
||||
vhost: string ## Parsed from the URI path, a Stomp "virtual host".
|
||||
timeout*: int ## Global socket timeout.
|
||||
last_msgtime*: Time ## Timestamp of last seen server message.
|
||||
options: tuple[ heartbeat: int ] ## Any supported client options, derived from the URI query string.
|
||||
subscriptions*: seq[ string ] ## Registered client subscriptions. Array position is the ID.
|
||||
transactions*: seq[ string ] ## Any currently open transactions.
|
||||
serverinfo: seq[ tuple[name: string, value: string] ] ## Server metadata, populated upon a successful connection.
|
||||
|
||||
connected_callback*: proc ( client: StompClient, response: StompResponse ): void
|
||||
error_callback*: proc ( client: StompClient, response: StompResponse ): void
|
||||
heartbeat_callback*: proc ( client: StompClient, response: StompResponse ): void
|
||||
message_callback*: proc ( client: StompClient, response: StompResponse ): void
|
||||
missed_heartbeat_callback*: proc ( client: StompClient ): void
|
||||
receipt_callback*: proc ( client: StompClient, response: StompResponse ): void
|
||||
|
||||
StompResponse* = ref object of RootObj ## A parsed packet from a Stomp server.
|
||||
headers: seq[ tuple[name: string, value: string] ] ## Any headers in the response. Access with the `[]` proc.
|
||||
frame*: string ## The Stomp frame type.
|
||||
payload*: string ## The message body, if any.
|
||||
|
||||
|
||||
# convenience
|
||||
proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.}
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# R E S P O N S E
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
proc is_eol( s: string ): bool =
|
||||
## Convenience method, returns **true** if string is a Stomp EOF.
|
||||
return s == CR or s == CRLF
|
||||
|
||||
|
||||
proc parse_headers( response: StompResponse, c: StompClient ): int =
|
||||
## Parse response headers from a stream.
|
||||
## Returns the content length of the response body, or 0.
|
||||
result = 0
|
||||
var line = ""
|
||||
|
||||
c.socket.readline( line, c.timeout )
|
||||
while not line.is_eol:
|
||||
if defined( debug ): printf " <-- %s\n", line
|
||||
var header = line.split( ":" )
|
||||
if header.len < 2: break
|
||||
response.headers.add( (header[0], header[1]) )
|
||||
if cmpIgnoreCase( header[0], "content-length" ) == 0: result = header[1].parse_int
|
||||
c.socket.readline( line, c.timeout )
|
||||
|
||||
|
||||
proc parse_payload( response: StompResponse, c: StompClient, bodylength = 0 ): void =
|
||||
## Parse message payload from a stream.
|
||||
let bufsize = 8192
|
||||
var
|
||||
buf = ""
|
||||
data = ""
|
||||
|
||||
|
||||
# If we already know the length of the body, just perform a buffered read.
|
||||
#
|
||||
if bodylength > 0:
|
||||
var
|
||||
readtotal = 0
|
||||
readamt = 0
|
||||
remaining = 0
|
||||
|
||||
while readtotal != bodylength:
|
||||
remaining = bodylength - readtotal
|
||||
|
||||
if remaining < bufsize:
|
||||
readamt = remaining
|
||||
else:
|
||||
readamt = bufsize
|
||||
|
||||
buf = newString( readamt )
|
||||
readtotal = readtotal + c.socket.recv( buf, readamt, c.timeout )
|
||||
data = data & buf
|
||||
|
||||
# Eat the NULL terminator.
|
||||
discard c.socket.recv( buf, 1, c.timeout )
|
||||
|
||||
# Inefficient path.
|
||||
#
|
||||
else:
|
||||
while buf != NULL:
|
||||
discard c.socket.recv( buf, 1, c.timeout )
|
||||
data = data & buf
|
||||
|
||||
response.payload = data
|
||||
|
||||
|
||||
proc newStompResponse( c: StompClient ): StompResponse =
|
||||
## Initialize a response object, which parses and contains
|
||||
## the Stomp headers and any additional important data from
|
||||
## the broker socket.
|
||||
new( result )
|
||||
result.headers = @[]
|
||||
|
||||
# Get the frame type, record last seen server activity time.
|
||||
#
|
||||
var line = ""
|
||||
c.socket.readline( line, c.timeout )
|
||||
c.last_msgtime = get_time()
|
||||
|
||||
# Heartbeat packets (empties.)
|
||||
#
|
||||
# This could -also- parse optional EOLs from the prior
|
||||
# message (after the NULL separator), but since it is a no-op,
|
||||
# this seems harmless.
|
||||
#
|
||||
if line.is_eol:
|
||||
result.frame = "HEARTBEAT"
|
||||
return result
|
||||
|
||||
# All other types.
|
||||
#
|
||||
result.frame = line
|
||||
if defined( debug ):
|
||||
printf " <-- %s\n", line
|
||||
|
||||
# Parse headers and body.
|
||||
#
|
||||
var length = result.parse_headers( c )
|
||||
if result.frame == "MESSAGE" or result.frame == "ERROR":
|
||||
result.parse_payload( c, length )
|
||||
|
||||
# If the response -could- have a body, the NULL has already
|
||||
# been removed from the stream while we checked for one.
|
||||
#
|
||||
if result.payload.len > 0:
|
||||
if result.payload == NULL: # We checked for a body, but there was none.
|
||||
result.payload = nil
|
||||
if defined( debug ): printf " <--\n <-- ^@\n\n"
|
||||
else:
|
||||
if defined( debug ): printf " <--\n <-- (payload)^@\n\n"
|
||||
|
||||
# Otherwise, pop off the NULL terminator now.
|
||||
#
|
||||
else:
|
||||
discard c.socket.recv( line, 1, c.timeout )
|
||||
if defined( debug ): printf " <--\n <-- ^@\n\n"
|
||||
|
||||
|
||||
proc `$`*( r: StompResponse ): string =
|
||||
## Represent a Stomp response as a string.
|
||||
result = r.frame & ": " & $r.headers
|
||||
|
||||
|
||||
proc `[]`*( response: StompResponse, key: string ): string =
|
||||
## Get a specific header from a Stomp response.
|
||||
for header in response.headers:
|
||||
if cmpIgnoreCase( key, header.name ) == 0:
|
||||
return header.value
|
||||
return nil
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# C A L L B A C K S
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
proc default_error_callback( c: StompClient, response: StompResponse ) =
|
||||
## Something bad happened. Disconnect from the server, build an error message,
|
||||
## and raise an exception.
|
||||
c.socket.close
|
||||
c.connected = false
|
||||
|
||||
var detail = response.payload
|
||||
var msg = response[ "message" ]
|
||||
if $detail[ ^1 ] == "\n": detail = detail[ 0 .. ^2 ] # chomp
|
||||
|
||||
if detail.len > 0: msg = msg & " (" & detail & ")"
|
||||
raise newException( StompError, "ERROR: " & msg )
|
||||
|
||||
|
||||
proc default_missed_heartbeat_callback( c: StompClient ) =
|
||||
## Timeout while connected to the broker.
|
||||
c.socket.close
|
||||
c.connected = false
|
||||
raise newException( StompError, "Heartbeat timeout. Last activity: " & $c.last_msgtime )
|
||||
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# C L I E N T
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
proc newStompClient*( s: Socket, uri: string ): StompClient =
|
||||
## Create a new Stomp client object from a preexisting **socket**,
|
||||
## and a stomp **URI** string.
|
||||
##
|
||||
## .. code-block:: nim
|
||||
##
|
||||
## var socket = newSocket()
|
||||
## var stomp = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" )
|
||||
##
|
||||
## or if connecting with SSL, when compiled with -d:ssl:
|
||||
##
|
||||
## .. code-block:: nim
|
||||
##
|
||||
## var socket = newSocket()
|
||||
## let sslContext = newContext( verifyMode = CVerifyNone )
|
||||
## sslContext.wrapSocket(socket)
|
||||
## var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" )
|
||||
##
|
||||
new( result )
|
||||
result.socket = s
|
||||
result.connected = false
|
||||
result.uri = parse_uri( uri )
|
||||
result.username = result.uri.username
|
||||
result.password = result.uri.password
|
||||
result.host = result.uri.hostname
|
||||
result.vhost = result.uri.path
|
||||
result.timeout = 500
|
||||
result.subscriptions = @[]
|
||||
result.transactions = @[]
|
||||
|
||||
# Parse any supported options in the query string.
|
||||
#
|
||||
for pairs in result.uri.query.split( '&' ):
|
||||
let opt = pairs.split( '=' )
|
||||
try:
|
||||
case opt[0]:
|
||||
of "heartbeat":
|
||||
result.options.heartbeat = opt[1].parse_int
|
||||
else:
|
||||
discard
|
||||
except IndexError, ValueError:
|
||||
discard
|
||||
|
||||
# Set default STOMP port if otherwise unset.
|
||||
#
|
||||
if not result.uri.scheme.contains( "stomp" ):
|
||||
raise newException( StompError, "Unknown scheme: " & result.uri.scheme )
|
||||
var port: int
|
||||
if result.uri.port == "":
|
||||
if result.uri.scheme.contains( "+ssl" ):
|
||||
port = 61614
|
||||
else:
|
||||
port = 61613
|
||||
else:
|
||||
port = result.uri.port.parse_int
|
||||
|
||||
result.port = Port( port )
|
||||
|
||||
# Decode URI encoded slashes for vhosts.
|
||||
result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" )
|
||||
|
||||
|
||||
proc socksend( c: StompClient, data: string ): void =
|
||||
## Send data on the connected socket with optional debug output.
|
||||
c.socket.send( data )
|
||||
if defined( debug ): printf " --> %s", data
|
||||
|
||||
|
||||
proc finmsg( c: StompClient ): void =
|
||||
## Send data on the connected socket with optional debug output.
|
||||
c.socket.send( CRLF & NULL & CRLF )
|
||||
if defined( debug ): printf " --> \n --> ^@\n\n"
|
||||
|
||||
|
||||
proc `[]`*( c: StompClient, key: string ): string =
|
||||
## Get a specific value from the server metadata, set during the initial connection.
|
||||
if not c.connected: return nil
|
||||
for header in c.serverinfo:
|
||||
if cmpIgnoreCase( key, header.name ) == 0:
|
||||
return header.value
|
||||
return nil
|
||||
|
||||
|
||||
proc `$`*( c: StompClient ): string =
|
||||
## Represent the stomp client as a string, after masking the password.
|
||||
let uri = ( $c.uri ).replace( ":" & c.uri.password & "@", "@" )
|
||||
result = "(NimStomp v" & VERSION & ( if c.connected: " connected" else: " not connected" ) & " to " & uri
|
||||
if not c[ "server" ].is_nil:
|
||||
result.add( " --> " & c["server"] )
|
||||
result.add( ")" )
|
||||
|
||||
|
||||
proc connect*( c: StompClient ): void =
|
||||
## Establish a connection to the Stomp server.
|
||||
if c.connected: return
|
||||
|
||||
var headers: seq[ tuple[name: string, value: string] ] = @[]
|
||||
headers.add( ("accept-version", "1.2") )
|
||||
|
||||
# Stomp 1.2 requires the Host: header. Use the path as a vhost if
|
||||
# supplied, otherwise use the hostname of the server.
|
||||
#
|
||||
if c.vhost != "":
|
||||
headers.add( ("host", c.vhost) )
|
||||
else:
|
||||
headers.add( ("host", c.host) )
|
||||
|
||||
if c.username != "": headers.add( ("login", c.username) )
|
||||
if c.password != "": headers.add( ("passcode", c.password) )
|
||||
if c.options.heartbeat > 0:
|
||||
let heartbeat = c.options.heartbeat * 1000
|
||||
headers.add( ("heart-beat", "0," & $heartbeat) )
|
||||
|
||||
# Connect the socket and send the headers off.
|
||||
#
|
||||
c.socket.connect( c.host, c.port )
|
||||
c.socksend( "CONNECT" & CRLF )
|
||||
for header in headers:
|
||||
c.socksend( header.name & ":" & header.value & CRLF )
|
||||
c.finmsg
|
||||
|
||||
# Retreive and copy server metadata to client object.
|
||||
#
|
||||
var response = newStompResponse( c )
|
||||
c.serverinfo = response.headers
|
||||
|
||||
if response.frame != "CONNECTED":
|
||||
if not isNil( c.error_callback ):
|
||||
c.error_callback( c, response )
|
||||
else:
|
||||
c.default_error_callback( response )
|
||||
else:
|
||||
c.connected = true
|
||||
if not isNil( c.connected_callback ):
|
||||
c.connected_callback( c, response )
|
||||
|
||||
|
||||
proc disconnect*( c: StompClient ): void =
|
||||
## Break down the connection to the Stomp server nicely.
|
||||
if not c.connected: return
|
||||
c.socksend( "DISCONNECT" & CRLF )
|
||||
c.finmsg
|
||||
|
||||
c.socket.close
|
||||
c.connected = false
|
||||
|
||||
|
||||
proc add_txn( c: StompClient ): void =
|
||||
## Add a transaction header if there is only a single open txn.
|
||||
if c.transactions.len != 1: return
|
||||
c.socksend( "transaction:" & c.transactions[0] & CRLF )
|
||||
|
||||
|
||||
proc send*( c: StompClient,
|
||||
destination: string,
|
||||
message: string = nil,
|
||||
contenttype: string = nil,
|
||||
headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
|
||||
## Send a **message** to **destination**.
|
||||
##
|
||||
## A Content-Length header is automatically and always included.
|
||||
## A **contenttype** is optional, but strongly recommended.
|
||||
##
|
||||
## Additionally, a transaction ID is automatically added if there is only
|
||||
## one transaction active. If you need to attach this message to a particular
|
||||
## transaction ID, you'll need to add it yourself with the user defined
|
||||
## **headers**.
|
||||
|
||||
if not c.connected: raise newException( StompError, "Client is not connected." )
|
||||
c.socksend( "SEND" & CRLF )
|
||||
c.socksend( "destination:" & destination & CRLF )
|
||||
c.socksend( "content-length:" & $message.len & CRLF )
|
||||
if not contenttype.is_nil: c.socksend( "content-type:" & contenttype & CRLF )
|
||||
|
||||
# Add custom headers. Add transaction header if one isn't manually
|
||||
# present (and a transaction is open.)
|
||||
#
|
||||
var txn_seen = false
|
||||
for header in headers:
|
||||
if header.name == "transaction": txn_seen = true
|
||||
c.socksend( header.name & ":" & header.value & CRLF )
|
||||
if not txn_seen: c.add_txn
|
||||
|
||||
if message.is_nil:
|
||||
c.finmsg
|
||||
else:
|
||||
c.socket.send( CRLF & message & NULL )
|
||||
if defined( debug ): printf " -->\n --> (payload)^@\n\n"
|
||||
|
||||
|
||||
proc subscribe*( c: StompClient,
|
||||
destination: string,
|
||||
ack = "auto",
|
||||
headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
|
||||
## Subscribe to messages at **destination**.
|
||||
##
|
||||
## Setting **ack** to "client" or "client-individual" enables client ACK/NACK mode.
|
||||
## In this mode, incoming messages aren't considered processed by
|
||||
## the server unless they receive ACK. By default, the server
|
||||
## considers the message processed if a client simply accepts it.
|
||||
##
|
||||
## You may optionally add any additional **headers** the server may support.
|
||||
|
||||
if not c.connected: raise newException( StompError, "Client is not connected." )
|
||||
c.socksend( "SUBSCRIBE" & CRLF )
|
||||
c.socksend( "destination:" & destination & CRLF )
|
||||
c.socksend( "id:" & $c.subscriptions.len & CRLF )
|
||||
if ack == "client" or ack == "client-individual":
|
||||
c.socksend( "ack:" & ack & CRLF )
|
||||
else:
|
||||
if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack )
|
||||
|
||||
for header in headers:
|
||||
c.socksend( header.name & ":" & header.value & CRLF )
|
||||
c.finmsg
|
||||
c.subscriptions.add( destination )
|
||||
|
||||
|
||||
proc unsubscribe*( c: StompClient,
|
||||
destination: string,
|
||||
headers: seq[ tuple[name: string, value: string] ] = @[] ): void =
|
||||
## Unsubscribe from messages at **destination**.
|
||||
## You may optionally add any additional **headers** the server may support.
|
||||
|
||||
if not c.connected: raise newException( StompError, "Client is not connected." )
|
||||
var
|
||||
sub_id: int
|
||||
i = 0
|
||||
|
||||
# Find the ID of the subscription.
|
||||
#
|
||||
for sub in c.subscriptions:
|
||||
if sub == destination:
|
||||
sub_id = i
|
||||
break
|
||||
i = i + 1
|
||||
|
||||
c.socksend( "UNSUBSCRIBE" & CRLF )
|
||||
c.socksend( "id:" & $sub_id & CRLF )
|
||||
for header in headers:
|
||||
c.socksend( header.name & ":" & header.value & CRLF )
|
||||
c.finmsg
|
||||
c.subscriptions[ sub_id ] = ""
|
||||
|
||||
|
||||
proc begin*( c: StompClient, txn: string ): void =
|
||||
## Begin a new transaction on the broker, using **txn** as the identifier.
|
||||
c.socksend( "BEGIN" & CRLF )
|
||||
c.socksend( "transaction:" & txn & CRLF )
|
||||
c.finmsg
|
||||
c.transactions.add( txn )
|
||||
|
||||
|
||||
proc commit*( c: StompClient, txn: string = nil ): void =
|
||||
## Finish a specific transaction **txn**, or the most current if unspecified.
|
||||
var transaction = txn
|
||||
if transaction.is_nil and c.transactions.len > 0: transaction = c.transactions.pop
|
||||
if transaction.is_nil: return
|
||||
|
||||
c.socksend( "COMMIT" & CRLF )
|
||||
c.socksend( "transaction:" & transaction & CRLF )
|
||||
c.finmsg
|
||||
|
||||
# Remove the transaction from the queue.
|
||||
#
|
||||
var new_transactions: seq[ string ] = @[]
|
||||
for txn in c.transactions:
|
||||
if txn != transaction: new_transactions.add( txn )
|
||||
c.transactions = new_transactions
|
||||
|
||||
|
||||
proc abort*( c: StompClient, txn: string = nil ): void =
|
||||
## Cancel a specific transaction **txn**, or the most current if unspecified.
|
||||
var transaction = txn
|
||||
if transaction.is_nil and c.transactions.len > 0: transaction = c.transactions.pop
|
||||
if transaction.is_nil: return
|
||||
|
||||
c.socksend( "ABORT" & CRLF )
|
||||
c.socksend( "transaction:" & transaction & CRLF )
|
||||
c.finmsg
|
||||
|
||||
# Remove the transaction from the queue.
|
||||
#
|
||||
var new_transactions: seq[ string ] = @[]
|
||||
for txn in c.transactions:
|
||||
if txn != transaction: new_transactions.add( txn )
|
||||
c.transactions = new_transactions
|
||||
|
||||
|
||||
proc ack*( c: StompClient, id: string, transaction: string = nil ): void =
|
||||
## Acknowledge message **id**. Optionally, attach this acknowledgement
|
||||
## to a specific **transaction** -- if there's only one active, it is
|
||||
## added automatically.
|
||||
c.socksend( "ACK" & CRLF )
|
||||
c.socksend( "id:" & id & CRLF )
|
||||
if not transaction.is_nil:
|
||||
c.socksend( "transaction:" & transaction & CRLF )
|
||||
else:
|
||||
c.add_txn
|
||||
c.finmsg
|
||||
|
||||
|
||||
proc nack*( c: StompClient, id: string, transaction: string = nil ): void =
|
||||
## Reject message **id**. Optionally, attach this rejection to a
|
||||
## specific **transaction** -- if there's only one active, it is
|
||||
## added automatically.
|
||||
##
|
||||
## Subscribe to a queue with ACK mode enabled, and reject the message
|
||||
## on error:
|
||||
##
|
||||
## .. code-block:: nim
|
||||
##
|
||||
## stomp.subscribe( "/queue/test", "client-individual" )
|
||||
## FIXME: attach procs
|
||||
## stomp.wait_for_messages
|
||||
##
|
||||
c.socksend( "NACK" & CRLF )
|
||||
c.socksend( "id:" & id & CRLF )
|
||||
if not transaction.is_nil:
|
||||
c.socksend( "transaction:" & transaction & CRLF )
|
||||
else:
|
||||
c.add_txn
|
||||
c.finmsg
|
||||
|
||||
|
||||
proc wait_for_messages*( c: StompClient, loop=true ) =
|
||||
## Enter a blocking select loop, dispatching to the appropriate proc
|
||||
## for the received message type. Return after a single message
|
||||
## is received if **loop** is set to **false**.
|
||||
|
||||
if not c.connected: raise newException( StompError, "Client is not connected." )
|
||||
|
||||
while true:
|
||||
var
|
||||
timeout: int
|
||||
fds = @[ c.socket.get_fd ]
|
||||
|
||||
# Check for missed heartbeats, with an additional second
|
||||
# of wiggle-room.
|
||||
#
|
||||
if c.options.heartbeat > 0:
|
||||
timeout = ( c.options.heartbeat + 1 ) * 1000
|
||||
else:
|
||||
timeout = -1
|
||||
|
||||
if select( fds, timeout ) == 0: # timeout, only happens if heartbeating missed
|
||||
if not isNil( c.missed_heartbeat_callback ):
|
||||
c.missed_heartbeat_callback( c )
|
||||
else:
|
||||
c.default_missed_heartbeat_callback
|
||||
if loop: continue else: break
|
||||
|
||||
let response = newStompResponse( c )
|
||||
case response.frame:
|
||||
|
||||
of "HEARTBEAT":
|
||||
if not isNil( c.heartbeat_callback ):
|
||||
c.heartbeat_callback( c, response )
|
||||
continue
|
||||
|
||||
of "RECEIPT":
|
||||
if not isNil( c.receipt_callback ):
|
||||
c.receipt_callback( c, response )
|
||||
|
||||
of "MESSAGE":
|
||||
if not isNil( c.message_callback ):
|
||||
c.message_callback( c, response )
|
||||
|
||||
of "ERROR":
|
||||
if not isNil( c.error_callback ):
|
||||
c.error_callback( c, response )
|
||||
else:
|
||||
c.default_error_callback( response )
|
||||
|
||||
else:
|
||||
if defined( debug ):
|
||||
echo "Strange broker frame: " & response.repr
|
||||
|
||||
if not loop: break
|
||||
|
||||
|
||||
|
||||
#-------------------------------------------------------------------
|
||||
# T E S T S
|
||||
#-------------------------------------------------------------------
|
||||
|
||||
# Functional (rather than unit) tests. Requires a Stomp compatible broker.
|
||||
# This was tested against RabbitMQ 3.5.3 and 3.6.0.
|
||||
# 3.6.0 was -so- much faster.
|
||||
#
|
||||
# First start up a message receiver:
|
||||
# ./stomp receiver [stomp-uri] [subscription-destination]
|
||||
#
|
||||
# then run another process, to publish stuff:
|
||||
# ./stomp publisher [stomp-uri] [publish-destination]
|
||||
#
|
||||
# An example with an AMQP "direct" exchange, and an exclusive queue:
|
||||
# ./stomp publisher stomp://test:test@localhost/?heartbeat=10 /exchange/test
|
||||
# ./stomp receiver stomp://test:test@localhost/?heartbeat=10 /exchange/test
|
||||
#
|
||||
# Then just let 'er run.
|
||||
#
|
||||
# You can also run a nieve benchmark (deliveries/sec):
|
||||
#
|
||||
# ./stomp benchmark stomp://test:test@localhost/ /exchange/test
|
||||
#
|
||||
# It will set messages to require acknowledgement, and nack everything, causing
|
||||
# a delivery loop for 10 seconds.
|
||||
#
|
||||
when isMainModule:
|
||||
let expected = 8
|
||||
var
|
||||
socket = newSocket()
|
||||
messages: seq[ StompResponse ] = @[]
|
||||
|
||||
if paramCount() != 3: quit "See source comments for how to run functional tests."
|
||||
|
||||
var stomp = newStompClient( socket, paramStr(2) )
|
||||
stomp.connect
|
||||
echo stomp
|
||||
|
||||
case paramStr(1):
|
||||
|
||||
of "benchmark":
|
||||
echo "* Running for 10 seconds. Compile with -d:debug to see the Stomp conversation."
|
||||
var count = 0
|
||||
var start = get_time()
|
||||
|
||||
proc incr( c: StompClient, r: StompResponse ) =
|
||||
let id = r["ack"]
|
||||
count = count + 1
|
||||
c.nack( id )
|
||||
|
||||
stomp.message_callback = incr
|
||||
stomp.subscribe( paramStr(3), "client" )
|
||||
stomp.send( paramStr(3), "hi." )
|
||||
while get_time() - start < 10:
|
||||
stomp.wait_for_messages( false )
|
||||
|
||||
printf "* Processed %d messages in 10 seconds.\n", count
|
||||
stomp.disconnect
|
||||
|
||||
|
||||
# Store incoming messages, ensure their contents match our expected behavior.
|
||||
#
|
||||
of "receiver":
|
||||
var heartbeats = 0
|
||||
echo "* Waiting on messages from publisher. Compile with -d:debug to see the Stomp conversation."
|
||||
|
||||
proc receive_message( c: StompClient, r: StompResponse ) =
|
||||
messages.add( r )
|
||||
case r.frame:
|
||||
of "RECEIPT":
|
||||
discard
|
||||
of "MESSAGE":
|
||||
let body = r.payload
|
||||
let id = r[ "ack" ]
|
||||
|
||||
proc seen_heartbeat( c: StompClient, r: StompResponse ) =
|
||||
heartbeats = heartbeats + 1
|
||||
|
||||
stomp.message_callback = receive_message
|
||||
stomp.receipt_callback = receive_message
|
||||
stomp.heartbeat_callback = seen_heartbeat
|
||||
stomp.subscribe( paramStr(3) )
|
||||
|
||||
# Populate the messages sequence with the count of expected messages.
|
||||
for i in 1..expected: stomp.wait_for_messages( false )
|
||||
|
||||
# Assertions on the results!
|
||||
#
|
||||
doAssert( messages.len == expected )
|
||||
doAssert( messages[0].payload == nil )
|
||||
|
||||
doAssert( messages[1].payload == "Hello world!" )
|
||||
|
||||
doAssert( messages[2].payload == "Dumb.\n\n" )
|
||||
|
||||
doAssert( messages[3].payload == "Hello again." )
|
||||
doAssert( messages[3][ "content-type" ] == "text/plain" )
|
||||
doAssert( messages[3][ "Content-Type" ] == "text/plain" )
|
||||
|
||||
doAssert( messages[4][ "x-custom" ] == "yum" )
|
||||
|
||||
doAssert( messages[5][ "receipt" ] == "42" )
|
||||
|
||||
doAssert( messages[6].payload == "transaction!" )
|
||||
doAssert( messages[7].payload == "transaction 2" )
|
||||
|
||||
stomp.disconnect
|
||||
|
||||
if heartbeats > 0:
|
||||
printf "* Tests passed! %d heartbeats seen.", heartbeats
|
||||
else:
|
||||
echo "* Tests passed!"
|
||||
|
||||
|
||||
# Publish a variety of messages with various options.
|
||||
# Pause momentarily between sends(), as brokers -might- impose
|
||||
# rate limits and/or message dropping.
|
||||
#
|
||||
of "publisher":
|
||||
echo "* Publishing to receiver. Compile with -d:debug to see the Stomp conversation."
|
||||
|
||||
# Simple, no frills event.
|
||||
stomp.send( paramStr(3) )
|
||||
sleep 500
|
||||
|
||||
# Event with a body.
|
||||
stomp.send( paramStr(3), "Hello world!" )
|
||||
sleep 500
|
||||
|
||||
# Event that doesn't contain a content-length.
|
||||
# (Note, the broker may elect to add one on your behalf, which is a good thing...
|
||||
# but invalidates this test.)
|
||||
stomp.socksend( "SEND" & CRLF )
|
||||
stomp.socksend( "destination:" & paramStr(3) & CRLF & CRLF )
|
||||
stomp.socksend( "Dumb.\n\n" & NULL )
|
||||
sleep 500
|
||||
|
||||
# Content-Type
|
||||
stomp.send( paramStr(3), "Hello again.", "text/plain" )
|
||||
sleep 500
|
||||
|
||||
# Custom headers.
|
||||
var headers: seq[ tuple[ name: string, value: string ] ] = @[]
|
||||
headers.add( ("x-custom", "yum") )
|
||||
stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
|
||||
sleep 500
|
||||
|
||||
# Receipt requests.
|
||||
proc receive_receipt( c: StompClient, r: StompResponse ) =
|
||||
messages.add( r )
|
||||
headers = @[]
|
||||
headers.add( ("receipt", "42") )
|
||||
stomp.send( paramStr(3), "Hello again.", "text/plain", headers )
|
||||
stomp.receipt_callback = receive_receipt
|
||||
stomp.wait_for_messages( false )
|
||||
doAssert( messages[0]["receipt-id"] == "42" )
|
||||
|
||||
# Aborted transaction.
|
||||
stomp.begin( "test-abort" )
|
||||
for i in 1..3:
|
||||
stomp.send( paramStr(3), "Message: " & $i )
|
||||
stomp.abort
|
||||
|
||||
# Committed transaction.
|
||||
stomp.begin( "test-commit" )
|
||||
stomp.send( paramStr(3), "transaction!" )
|
||||
stomp.commit
|
||||
|
||||
# Mixed transactions.
|
||||
for i in 1..3:
|
||||
headers = @[]
|
||||
headers.add( ("transaction", "test-" & $i ) )
|
||||
stomp.begin( "test-" & $i )
|
||||
stomp.send( paramStr(3), "transaction " & $i, nil, headers )
|
||||
sleep 500
|
||||
stomp.abort( "test-1" )
|
||||
sleep 500
|
||||
stomp.commit( "test-2" )
|
||||
sleep 500
|
||||
stomp.abort( "test-3" )
|
||||
sleep 500
|
||||
|
||||
stomp.disconnect
|
||||
echo "* Tests passed!"
|
||||
|
||||
else:
|
||||
quit "See source comments for how to run functional tests."
|
||||
|
||||
|
||||
11
stomp.nimble
Normal file
11
stomp.nimble
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
# Package
|
||||
|
||||
version = "0.1.0"
|
||||
author = "Mahlon E. Smith <mahlon@martini.nu>"
|
||||
description = "A pure-nim implementation of the STOMP protocol for machine messaging."
|
||||
license = "MIT"
|
||||
|
||||
# Dependencies
|
||||
|
||||
requires "nim >= 0.13.0"
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue