src/stomp.nim
changeset 8 363f275588ea
parent 7 9c5ce539b081
child 9 ad53c6500712
equal deleted inserted replaced
7:9c5ce539b081 8:363f275588ea
     1 # vim: set et nosta sw=4 ts=4 ft=nim : 
     1 # vim: set et nosta sw=4 ts=4 ft=nim : 
     2 #
     2 #
     3 # Copyright (c) 2016-2019, Mahlon E. Smith <mahlon@martini.nu>
     3 # Copyright (c) 2016-2021, Mahlon E. Smith <mahlon@martini.nu>
     4 # All rights reserved.
     4 # All rights reserved.
     5 # Redistribution and use in source and binary forms, with or without
     5 # Redistribution and use in source and binary forms, with or without
     6 # modification, are permitted provided that the following conditions are met:
     6 # modification, are permitted provided that the following conditions are met:
     7 #
     7 #
     8 #     * Redistributions of source code must retain the above copyright
     8 #     * Redistributions of source code must retain the above copyright
   100 
   100 
   101 # convenience
   101 # convenience
   102 proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.}
   102 proc printf( formatstr: cstring ) {.header: "<stdio.h>", varargs.}
   103 
   103 
   104 
   104 
       
   105 proc encode( str: string ): string =
       
   106     ## Encode value value strings per the "Value Encoding" section
       
   107     ## of the Stomp 1.2 spec.
       
   108     result = str
       
   109     result = result.
       
   110         replace( "\r", "\\r" ).
       
   111         replace( "\n", "\\n" ).
       
   112         replace( "\\", "\\\\" ).
       
   113         replace( ":", "\\c" )
       
   114 
       
   115 
   105 #-------------------------------------------------------------------
   116 #-------------------------------------------------------------------
   106 # R E S P O N S E
   117 # R E S P O N S E
   107 #-------------------------------------------------------------------
   118 #-------------------------------------------------------------------
   108 
   119 
   109 proc is_eol( s: string ): bool =
   120 proc is_eol( s: string ): bool =
   268     ## and a stomp **URI** string.
   279     ## and a stomp **URI** string.
   269     ##
   280     ##
   270     ## .. code-block:: nim
   281     ## .. code-block:: nim
   271     ##
   282     ##
   272     ##    var socket = newSocket()
   283     ##    var socket = newSocket()
   273     ##    var stomp  = newStompClient( socket, "stomp://test:test@example.com/%2Fvhost" )
   284     ##    var stomp  = newStompClient( socket, "stomp://test:test@example.com/vhost" )
   274     ##
   285     ##
   275     ## or if connecting with SSL, when compiled with -d:ssl:
   286     ## or if connecting with SSL, when compiled with -d:ssl:
   276     ##
   287     ##
   277     ## .. code-block:: nim
   288     ## .. code-block:: nim
   278     ##
   289     ##
   279     ##    var socket = newSocket()
   290     ##    var socket = newSocket()
   280     ##    let sslContext = newContext( verifyMode = CVerifyNone )
   291     ##    let sslContext = newContext( verifyMode = CVerifyNone )
   281     ##    sslContext.wrapSocket(socket)
   292     ##    sslContext.wrapSocket(socket)
   282     ##    var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/%2Fvhost" )
   293     ##    var stomp = newStompClient( socket, "stomp+ssl://test:test@example.com/vhost" )
   283     ##
   294     ##
   284 
   295 
   285     let
   296     let
   286         uri   = parse_uri( uri )
   297         uri   = parse_uri( uri )
   287         vhost = if uri.path.len > 1: uri.path.strip( chars = {'/'}, trailing = false ) else: uri.path
   298         vhost = if uri.path.len > 1: uri.path.strip( chars = {'/'}, trailing = false ) else: uri.path
   325         port = result.uri.port.parse_int
   336         port = result.uri.port.parse_int
   326 
   337 
   327     result.port = Port( port )
   338     result.port = Port( port )
   328 
   339 
   329     # Decode URI encoded slashes for vhosts.
   340     # Decode URI encoded slashes for vhosts.
   330     result.vhost = result.vhost.replace( "%2f", "/" ).replace( "%2F", "/" ).replace( "//", "/" )
   341     result.vhost = result.vhost.
       
   342         replace( "%2f", "/" ).
       
   343         replace( "%2F", "/" ).
       
   344         replace( "//", "/" )
   331 
   345 
   332 
   346 
   333 proc socksend( c: StompClient, data: string ): void =
   347 proc socksend( c: StompClient, data: string ): void =
   334     ## Send data on the connected socket with optional debug output.
   348     ## Send data on the connected socket with optional debug output.
   335     c.socket.send( data )
   349     c.socket.send( data )
   435     ## transaction ID, you'll need to add it yourself with the user defined
   449     ## transaction ID, you'll need to add it yourself with the user defined
   436     ## **headers**.
   450     ## **headers**.
   437 
   451 
   438     if not c.connected: raise newException( StompError, "Client is not connected." )
   452     if not c.connected: raise newException( StompError, "Client is not connected." )
   439     c.socksend( "SEND" & CRLF )
   453     c.socksend( "SEND" & CRLF )
   440     c.socksend( "destination:" & destination & CRLF )
   454     c.socksend( "destination:" & destination.encode & CRLF )
   441     c.socksend( "content-length:" & $message.len & CRLF )
   455     c.socksend( "content-length:" & $message.len & CRLF )
   442     if not ( contenttype == "" ): c.socksend( "content-type:" & contenttype & CRLF )
   456     if not ( contenttype == "" ): c.socksend( "content-type:" & contenttype & CRLF )
   443 
   457 
   444     # Add custom headers.  Add transaction header if one isn't manually
   458     # Add custom headers.  Add transaction header if one isn't manually
   445     # present (and a transaction is open.)
   459     # present (and a transaction is open.)
   446     #
   460     #
   447     var txn_seen = false
   461     var txn_seen = false
   448     for header in headers:
   462     for header in headers:
   449         if header.name == "transaction": txn_seen = true
   463         if header.name == "transaction": txn_seen = true
   450         c.socksend( header.name & ":" & header.value & CRLF )
   464         c.socksend( header.name & ":" & header.value.encode & CRLF )
   451     if not txn_seen: c.add_txn
   465     if not txn_seen: c.add_txn
   452 
   466 
   453     if message == "":
   467     if message == "":
   454         c.finmsg
   468         c.finmsg
   455     else:
   469     else:
   471     ##
   485     ##
   472     ## You may optionally add any additional **headers** the server may support.
   486     ## You may optionally add any additional **headers** the server may support.
   473 
   487 
   474     if not c.connected: raise newException( StompError, "Client is not connected." )
   488     if not c.connected: raise newException( StompError, "Client is not connected." )
   475     c.socksend( "SUBSCRIBE" & CRLF )
   489     c.socksend( "SUBSCRIBE" & CRLF )
   476     c.socksend( "destination:" & destination & CRLF )
   490     c.socksend( "destination:" & destination.encode & CRLF )
   477 
   491 
   478     if id == "":
   492     if id == "":
   479         c.socksend( "id:" & $c.subscriptions.len & CRLF )
   493         c.socksend( "id:" & $c.subscriptions.len & CRLF )
   480     else:
   494     else:
   481         c.socksend( "id:" & id & CRLF )
   495         c.socksend( "id:" & id & CRLF )
   484         c.socksend( "ack:" & ack & CRLF )
   498         c.socksend( "ack:" & ack & CRLF )
   485     else:
   499     else:
   486         if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack )
   500         if ack != "auto": raise newException( StompError, "Unknown ack type: " & ack )
   487 
   501 
   488     for header in headers:
   502     for header in headers:
   489         c.socksend( header.name & ":" & header.value & CRLF )
   503         c.socksend( header.name & ":" & header.value.encode & CRLF )
   490     c.finmsg
   504     c.finmsg
   491     c.subscriptions.add( destination )
   505     c.subscriptions.add( destination )
   492 
   506 
   493 
   507 
   494 proc unsubscribe*( c: StompClient,
   508 proc unsubscribe*( c: StompClient,
   511         i = i + 1
   525         i = i + 1
   512 
   526 
   513     c.socksend( "UNSUBSCRIBE" & CRLF )
   527     c.socksend( "UNSUBSCRIBE" & CRLF )
   514     c.socksend( "id:" & $sub_id & CRLF )
   528     c.socksend( "id:" & $sub_id & CRLF )
   515     for header in headers:
   529     for header in headers:
   516         c.socksend( header.name & ":" & header.value & CRLF )
   530         c.socksend( header.name & ":" & header.value.encode & CRLF )
   517     c.finmsg
   531     c.finmsg
   518     c.subscriptions[ sub_id ] = ""
   532     c.subscriptions[ sub_id ] = ""
   519 
   533 
   520 
   534 
   521 proc begin*( c: StompClient, txn: string ): void =
   535 proc begin*( c: StompClient, txn: string ): void =
   705 
   719 
   706   ./stomp benchmark stomp://test:test@localhost/%2F /exchange/test
   720   ./stomp benchmark stomp://test:test@localhost/%2F /exchange/test
   707 
   721 
   708 It will set messages to require acknowledgement, and nack everything, causing
   722 It will set messages to require acknowledgement, and nack everything, causing
   709 a delivery loop for 10 seconds.
   723 a delivery loop for 10 seconds.
   710 If your vhost requires slashes, use URI escaping: /%2Ftest
   724 
       
   725 With older version of RabbitMQ, If your vhost requires slashes, you'll
       
   726 need to URI escape: /%2Ftest
   711 """
   727 """
   712 
   728 
   713 
   729 
   714     if paramCount() != 3: quit usage
   730     if paramCount() != 3: quit usage
   715 
   731 
   749                 messages.add( r )
   765                 messages.add( r )
   750                 case r.frame:
   766                 case r.frame:
   751                     of "RECEIPT":
   767                     of "RECEIPT":
   752                         discard
   768                         discard
   753                     of "MESSAGE":
   769                     of "MESSAGE":
   754                         let body = r.payload
   770                         discard r.payload
   755                         let id   = r[ "ack" ]
   771                         discard r[ "ack" ]
   756 
   772 
   757             proc seen_heartbeat( c: StompClient, r: StompResponse ) =
   773             proc seen_heartbeat( c: StompClient, r: StompResponse ) =
   758                 heartbeats = heartbeats + 1
   774                 heartbeats = heartbeats + 1
   759 
   775 
   760             stomp.message_callback   = receive_message
   776             stomp.message_callback   = receive_message