Converting Wheatley (Slack bot) to Azure Functions!

I first created the basis of Wheatley (my Slack bot) a few years ago during a hackathon. Since then I’ve used it at multiple companies to help with various ops processes, as well as adding any little bits of functionality that help my co-workers. Normally I run it in a VM (both Windows and Linux; being Go based I can compile it for most things), but I’ve recently decided to add the option of running it as an Azure Function.

I was impressed by how trivially easy this was. Kudos to Microsoft for finally making it easy to build a Go based Azure Function, and also to Slack for providing alternate APIs to integrate against.

For details about making an Azure Function in Go, please see my previous post. What I’d like to highlight here are the Slack specifics I had to deal with to get Azure Functions working properly. The initial hurdle was that Wheatley only used the RTM (Real Time Messaging) protocol, which is basically a fancy way of saying websockets. Now, my main aim for using Azure Functions is that I only want compute running when I need it, not hosted compute that is always running and always holding connections to the various clients. Fortunately, Slack has an alternative called the Events API, which is basically just falling back to the good old REST protocol… and given how infrequent the messages really are in Slack (in the big scheme of things), REST works for me nicely.

Jumping across to my Slack library of choice, Slack-go also provides Events API functionality as well as the RTM functions I was already using. Cool… no switching libraries!

Basically, the way Wheatley is designed, very little of it is Slack specific. Sure, receiving and sending messages are obviously Slack specific, but those are just a few very small touch points. Most of the code is integrating with other systems, which has absolutely nothing to do with Slack. So, let’s look at the older RTM based code.

api := slack.New(slackKey)
rtm := api.NewRTM()
go rtm.ManageConnection()

for msg := range rtm.IncomingEvents {
  switch ev := msg.Data.(type) {
  case *slack.MessageEvent:
    originalMessage := ev.Text
    sender := ev.User
    _ = sender // the real bot cares who sent it; not needed for a plain echo

    // let's just echo back to the user.
    rtm.SendMessage(rtm.NewOutgoingMessage(originalMessage, ev.Channel))
  default:
  }
}

So, we have some pre-defined slackKey (no, I’m not going to tell you mine), we establish a new RTM connection, and we basically just sit in a loop getting the latest message and replying. Obviously Wheatley does a lot more; see github for the exact details.

So effectively we just need something similar without the websockets shenanigans.

There’s a bit more handshake ceremony going on, but really not much. Instead of 1 token (called slackKey above) there are 2: the one already mentioned, and another called the verification token. This token is used to confirm that the messages you’re receiving are actually intended for this particular instance of the bot.

Fortunately our HTTP handler func is the same type we’re all used to in Go. The highlights of the function are as follows:

var slackApi = slack.New(token)   // same token as RTM...
func slackHttp(w http.ResponseWriter, r *http.Request) {

  // read the request details.
  buf := new(bytes.Buffer)
  buf.ReadFrom(r.Body)
  body := buf.String()

  // Use the Slack-go API to parse the event we've received. Here we also confirm
  // that the verification token in the message matches the one we already have
  // from the Slack portal (variable verificationToken).
  eventsAPIEvent, e := slackevents.ParseEvent(json.RawMessage(body),
    slackevents.OptionVerifyToken(
      &slackevents.TokenComparator{VerificationToken: verificationToken}))
  if e != nil {

    // verification token not matching... bugger off.
    w.WriteHeader(http.StatusUnauthorized)
    return
  }

  // Check that we're the bot for this acct.
  // Taken directly from the slack-go event api example :)
  if eventsAPIEvent.Type == slackevents.URLVerification {
    var r *slackevents.ChallengeResponse
    err := json.Unmarshal([]byte(body), &r)
    if err != nil {
      w.WriteHeader(http.StatusInternalServerError)
    }
    w.Header().Set("Content-Type", "text")
    w.Write([]byte(r.Challenge))
  }

  // Dealing with the message itself.
  if eventsAPIEvent.Type == slackevents.CallbackEvent {
    innerEvent := eventsAPIEvent.InnerEvent
    switch ev := innerEvent.Data.(type) {
    case *slackevents.MessageEvent:

      // Return 200 immediately... according to https://api.slack.com/events-api#prepare
      // if we don't respond within 3 seconds the delivery is considered to have failed
      // and Slack will send the message again. So return 200 straight away and let the
      // code that processes the messages report its results later on.
      w.WriteHeader(http.StatusOK)

      originalMessage := ev.Text
      sender := ev.User
      _ = sender // again, the real bot uses this; not needed for a plain echo

      // again, we'll just echo it back.
      slackApi.PostMessage(ev.Channel, slack.MsgOptionText(
        originalMessage, false))
    }
  }
}

If you’re interested in the real Wheatley version (that’s wrapped in the Azure Function finery) then check it out on github.

The most awkward part is getting the bot permissions correct in the Slack portal. So far, for basic messaging, I’ve needed the scopes: users:read, app_mentions:read, channels:history, chat:write, im:history, mpim:history and mpim:read. These are set in both the Events API part of the portal and the OAuth section.

After a few more days of testing this out on my private Slack group I think Slack + Wheatley + Azure Functions are ready to be unleashed on my co-workers :)

Remote execution of Powershell on Azure VM

From an operations point of view, remotely executing commands on a machine is critical for anything beyond a few machines. In the Windows world the way I’ve usually done this is by allowing remote Powershell… but I’ve recently realised (I’m slow on the uptake) that I can do this with the Azure CLI. If I can do it with the Azure CLI (az) it means there is a REST API… and if there is a REST API it means I can tinker.

Proceed with the tinkering!!

First things first. The az command to achieve this is:

az vm run-command invoke --command-id RunPowerShellScript --name my-vm-name -g my-resourcegroup  --scripts 'echo \"hello there\" > c:\temp\ken'

Now the fun bit. To run an arbitrary bit of Powershell (which is scary enough), the REST endpoint is:

https://management.azure.com/subscriptions/xxxx/resourceGroups/xxxxx/providers/Microsoft.Compute/virtualMachines/xxxxxx/runCommand?api-version=2018-04-01

with the usual substitutions of subscription ID, resource group name and VM name.

You POST to the above URL with a body in the format of:

{"commandId":"RunPowerShellScript","script":['<powershell>']}

So the powershell could be the REAL commands…. format a drive, start the bitcoin miner etc etc…. OR… in my case I simply want to execute the powershell that has already been installed on the remote machine and has been verified as safe :)

I’m going to incorporate this remote execution into some tools I maintain for my own use (all in Go), so the entire program boils down to something pretty small. First, auth against a service principal in Azure, then with the generated token execute the POST. Auth with Azure is simplified by using my tiny AzureAuth project:

azureAuth := azureauth.NewAzureAuth(subscriptionID, tenantID, clientID, clientSecret)
azureAuth.RefreshToken()
template := "https://management.azure.com/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s/runCommand?api-version=2018-04-01"

url := fmt.Sprintf(template, subscriptionID, resourceGroup, vmName)

bodyTemplate := "{\"commandId\":\"RunPowerShellScript\",\"script\":['%s']}"
script := "c:/temp/myscript.ps1"
body := fmt.Sprintf(bodyTemplate, script)

req, _ := http.NewRequest("POST", url, strings.NewReader(body))
req.Header.Add("Authorization", "Bearer " + azureAuth.CurrentToken().AccessToken)
req.Header.Add("Content-type", "application/json")
client := http.Client{}
client.Do(req)

This is obviously with all the error checking removed etc (just to reduce the clutter here).

Now, one important thing to remember: if you want immediate (or even just vaguely quick-ish) execution of your scripts, executing Powershell via the Azure REST API is NOT the way to achieve it. Even a simple Powershell script that writes a hello world file might take 20 seconds or so.

The benefit of this (to me at least) is not having to enable general remoting to the Azure VM, no fiddling with firewall rules etc etc. It’s using the Azure REST API (which I already use for SOOO many other reasons), the same type of authentication, and the same way of integrating into my other tools. It mightn’t fit everyone’s needs, but I think this will definitely be my remoting process going forwards (for Azure VMs).

See here for a simple implementation. Don’t forget to create a service principal and assign it VM Contributor rights to the VM you’re trying to remote to!

Azure Functions and GO UPDATED!

FINALLY

Note: I forgot to mention that so far, due to a bug in the Go runtime, I can only get binaries created from Go 1.13 to work. Have filed a bug and will see how things go.

Azure has finally made it possible to write Azure Functions in virtually any language you like. Yes, it’s just a redirection layer to your pre-compiled executable/script, but hey, it works…. and it works with Go :)

Firstly, you’ll want to read about Azure Custom Handlers, and if you’re interested in Go, check out the samples. The samples include triggers for HTTP, Queues, Blobs etc. For now, I just want to focus on the HTTP triggers. They have seriously made this easy; in particular, switching between running locally and running in an Azure Function is literally a line or two of changes.

First up, the image on the Azure Custom Handlers page needs to be studied before we go anywhere.

Basically the Functions Host is just a redirection layer to our REAL function, which is basically a webserver. Yes, this is a hack… no question about it… BUT… it’s exactly what I’m after. We can use ANY language we want, as long as it handles HTTP requests. Yes, there are overheads compared to not having this indirection layer, but really, I’m WAY more than satisfied with this approach. I’m just glad they didn’t insist we all use Docker containers for everything.

So, as long as we can run a tiny webserver we’re good. Fortunately, Go (like most languages out there these days) comes with a half decent HTTP server built in.

For a simple Go example, I’m using:

package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
)

func doProcessing(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "testing testing 1,2,3")
}

func main() {
	port, exists := os.LookupEnv("FUNCTIONS_HTTPWORKER_PORT")
	if !exists {
		port = "8080"
	}
	http.HandleFunc("/testfunction", doProcessing)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}

It simply responds to a GET request to /testfunction with a string. Not exciting, but it will do for now. You can see that the only change between the local and Azure Function versions is the port. If the environment variable FUNCTIONS_HTTPWORKER_PORT exists, then it uses that as the port number, otherwise it defaults to 8080 for the local environment.

Next there are 2 required files. The first is host.json, which basically says how the Azure Function will run, ie what executable is going to be the webserver. Mine is:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  },
  "httpWorker": {
    "description": {
      "defaultExecutablePath": "azurefunctiongo.exe"
    }
  }
}

Where azurefunctiongo.exe is the executable generated from the above Go code.

Finally there is function.json, which describes the bindings for a particular function. In my case I was interested in an HTTP trigger, so my function.json looked like:

{
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": ["get", "post"]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

It can handle GET and POST (although my code currently only handles GET). The incoming trigger is HTTP and the output is also HTTP. You might have situations where the input (ie the trigger) is HTTP but the output is putting a message onto an Azure Queue, for example. The Azure Custom Handlers page linked above covers all of this.
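
As a rough (untested) sketch of that queue scenario, the bindings might look something like the following, where the queue name and connection setting are just placeholders of mine:

{
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": ["post"]
    },
    {
      "type": "queue",
      "direction": "out",
      "name": "msg",
      "queueName": "outqueue",
      "connection": "AzureWebJobsStorage"
    }
  ]
}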

Now, all of these just get uploaded to the usual wwwroot of an App Service Plan (I will automate that soon) and away you go! (Note: make sure the exe and host.json are in the wwwroot directory, and that function.json is in a subdirectory of wwwroot named after your endpoint, in my case testfunction.)
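
In other words, the layout on the App Service ends up looking something like:

wwwroot/
  azurefunctiongo.exe
  host.json
  testfunction/
    function.json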

Now that Azure has this going, I can see SOOOOO many use cases for my projects. Thankyou thankyou thankyou Azure!!!

Github Actions in Go

After a little tinkering about with Github Actions, I decided to write one to scratch an itch I’d been having at work. Github Actions react to various events, one of which is Github wiki updates (awesomely named Gollum events). Unfortunately the various actions I tried from the marketplace (which take events and then ping Slack with the details) simply sent a message saying a wiki page was updated and never actually said WHICH page was updated. So I decided it was probably worthwhile writing my own, and learning how Github Actions work under the covers along the way.

Now, most of what I learned I got from Jacob Tomlinson’s great post. I won’t reproduce what he already covers (far better than I could), but just wanted to list a few points that took me a little while to realise.

Firstly, it’s in the YML file (an action is configured by yaml… I won’t comment) that the event types are configured. So in the case of reading wiki/gollum events, the yaml file will have an "on: gollum" entry. This means that the rest of the action (which is compiled on each execution) won’t actually get run except when it’s really needed.
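
As a rough sketch (the names and the action reference here are just placeholders, not my actual workflow), that yaml ends up looking something like:

name: wiki-updated           # placeholder workflow name
on: gollum                   # only run for wiki (gollum) events

jobs:
  notify:
    runs-on: ubuntu-latest
    steps:
      - name: tell slack which wiki page changed
        uses: some-org/wiki-to-slack-action@v1   # placeholder action reference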

Secondly, if you just want to run something when a particular event is triggered, you don’t actually need to read the payload (which has a TONNE of info in it). This is what a lot of actions seem to do, but for my situation, where I needed to know which wiki page was updated, I needed the specifics. For the specifics you’ll need to read an environment variable (see the big list) called GITHUB_EVENT_PATH. This is the path to a file that you then read, and this file has all the specifics of the event that was triggered. Although some might scoff at the relatively low tech approach, I really like it. Read a filename from an environment variable, then read the file. Easy peasy.

The file is simply JSON (see the events link earlier) and you can deserialise it into a class/struct/whatever-suits-your-language easily enough. From there, grab the info you want and pass that to Slack. In the case of Go, the wonderfully useful JSON-to-Go site makes this trivial.
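
To give a flavour of it (this is a hand-rolled sketch rather than the exact code in my action, and the struct only covers the fields I care about), the Go side looks roughly like:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"log"
	"os"
)

// GollumEvent covers just the parts of the gollum payload we need.
type GollumEvent struct {
	Pages []struct {
		Title   string `json:"title"`
		Action  string `json:"action"` // "created" or "edited"
		HTMLURL string `json:"html_url"`
	} `json:"pages"`
}

func main() {
	// GITHUB_EVENT_PATH points at a JSON file describing the triggering event.
	data, err := ioutil.ReadFile(os.Getenv("GITHUB_EVENT_PATH"))
	if err != nil {
		log.Fatalf("unable to read event file: %v", err)
	}

	var ev GollumEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		log.Fatalf("unable to parse event payload: %v", err)
	}

	// Build the details we actually want to send to Slack (sending omitted here).
	for _, p := range ev.Pages {
		fmt.Printf("wiki page %q was %s: %s\n", p.Title, p.Action, p.HTMLURL)
	}
}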

Now that I’ve got to grips with writing actions in Go, I’m certain I’ll be writing more. There are many cases where I can picture them being VERY useful in my day-to-day work.

The particular action I wrote up is precious.

When “good enough” is really good enough

We all (sweeping generalisation) get analysis paralysis at times. Or at least I think we (devs) all do. I know I certainly get it way too often for my own comfort. I’ve always been around REALLY good developers…. people who can out-code me in a minute, and that is REALLY intimidating. They come up with solutions that appear (and possibly are) complicated and well thought out. They’re breaking systems up into nice controller, service and data layers etc with what seems like endless abstractions. They’re good… they’re the top devs in the place… so what am I not "getting", that makes all of this work absolutely needed? Yes, they might know that the design they’ve come up with will be perfect for when additions 1, 2 and 3 come along.

But what if the additions don’t come? Or they do, but (cue The Imperial March) "the business" have skewed the requirements enough that they appear very different to what was initially talked about.

I’ve ALWAYS preferred simple code over clever code, even if it makes me look foolish in front of other developers (probably not a great career move… but hey, 23 years in and I’m still going ok). In C# I’d prefer to start with basic loops and THEN figure out some uber-l33t LINQ to come up with the same results, although more often than not I’d just stick with the for/while loops. To me, they’re definitely more readable and understandable than the 3 line LINQ expression. But I certainly know other devs that instantly go to LINQ and know it inside and out (like I said, they’re far better than I am).

But… in saying all this, I came across a tweet recently by Alex Gurney (https://twitter.com/ajtgurney/status/1230580904944439301). I’m unsure how Alex’s tweet got onto my timeline, but the thought of a "2 hour hack" really started to speak to me. Yes, if we KNOW…. and I mean REALLY know…. that what we’re coding up is just the basis for something more complex in the next sprint/iteration, then obviously put more thought into the design. But if a set of requirements is "do XYZ, take this input… output looks like this… etc" then I’m starting to wonder, is the 2 hour hack really a bad thing?

Of course there are many situations where this would be absolutely irresponsible, but equally I can picture MANY classes/services that I’ve seen (or written) where banging something out would have done just as well.

Then we need to address the infamous 2 x four letter words… "tech" and "debt". If we just "hack" absolutely everything then we’d probably be sitting on a house of cards, and as soon as something major needs to get worked on we’d be stuck in a bad position. I think of "hacks" in 2 ways. The first is the less pleasant version…. do whatever it takes to get functional code (functional for the specs we have NOW). This usually falls under the Perl category of coding (as I call it)…. made sense when you wrote it… come back a week later and it’s all WTF?!?!

The second option is where the solution is the simplest possible one (within a small timeboxed period) that can be designed and coded up, and isn’t refactored into the smallest, tightest, most elegant code you can create. To me the key thing is SIMPLE. I’m not talking lines of code (since I personally find fewer lines of code != simple), I’m talking concepts… something my brain can handle.

Stick to simple functions and simple loops; don’t go making a bunch of abstractions/interfaces/wrappers that you think you might need in the future. The number of times I’ve seen DB or storage wrappers built to abstract away the platform specifics, only to end up with a SINGLE concrete implementation… The problem is, I sometimes feel I’m in the minority here. What I find simple and readable, others complain isn’t. They’d prefer to see a 1 liner calling map/fold/whatever, but I know I’ve certainly experienced issues debugging a map call where 1 entry in a collection has caused the mapping function to blow up. Personally I find a for-loop easier to debug, but maybe that’s just me.

Just as I’m about to post this, I see Nick Craver come up with a perfect summary of how I feel: "Don’t let perfection stand in the way of a net win." https://twitter.com/Nick_Craver/status/1231591808007798784 He mightn’t be talking about the 2 hour hack, but I still like the tweet :)

Maybe the readability thing is why I find Go such an attractive and productive language. Yes, you can argue the lack of map/fold etc and the constant err checking cause more boilerplate, but seriously, that boilerplate takes a few minutes to write out and is then just left there to run.

UDP VPN Load testing fun :)

Recently I was required to load test a new VPN system being installed. The VPN in question was being used to handle VOIP calls, so we needed crystal clear connections without any lag, stuttering or badness ™.

The initial set of requirements was that we’d need to be able to handle 65000 bps (UDP) per VPN connection with a minimum of 1000 concurrent VPN connections. There was no tool that easily fitted the bill for this (at least none that we could find), so here was an opportunity to roll our own. Overall this isn’t a lengthy process but definitely has a few gotchas.

Firstly, how to create 1000 VPN connections? Ideas ranged from spinning up 1000 VMs (which would auto load the VPN client), through Docker, and finally to seeing if we could really fire up that number of connections on a single machine. As long as the client machine could create that number of VPN tunnels, I was sure the actual pumping of data wouldn’t be an issue. Now, for testing a VPN device it shouldn’t matter which OS the 2 machines communicating via it are running… right? Although these days I’m more of a Windows person than a Linux one, I thought there was more chance of getting multiple VPN clients working on Linux.

Through endless research, experimentation and intellectual debate with others (ok, so I read StackOverflow…), it turns out setting up the VPN requires 2 steps. The first is to actually perform the VPN connection (in my case using OpenConnect). The key after that is setting up the routing properly. I’m no networking god, so this stumped me for ages. But after the OpenConnect stage, the key commands are:

sudo ip route add $destserverip dev $interface src $ipaddr table $table 

sudo ip route add default via $ipaddr table $table 

sudo ip rule add from $ipaddr table $table

Where $destserverip is the "server" listening IP address, $ipaddr is the IP for a specific VPN interface, and $table is a brand new table name (table1, table2, table3 etc) specific to $ipaddr.

So the above 3 lines are executed 1000 times (or however many VPN connections you make).

Once we have the VPN connections set up, we have to set up the "server" (listening service) on a box/VM and the client application that will send traffic across all the VPN connections.

The "load test application" (glorified UDP pump) is written in Go (of course) and covers both server and client functionality. It’s a very simply program, from a server point of view. In it’s most simplistic form it really is just a single goroutine that accepts a UDP packet, tallies how many it’s received…. and well, that’s it. Not really news worthy. In a slightly more complex form (since this was written to test VOIP, and usually conversations are bi-directional), it spins up a separate goroutine for each incoming IP address (denoted by the VPN tunnel) and responds to the originator by echoing back exactly the same packet as it just received.

Not particularly riveting stuff, but it does the job: it accepts all the incoming traffic (from 1000+ clients) and informs us of the incoming network volume.
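
Stripped right back, the echoing flavour of the server is conceptually not much more than this sketch (this isn’t the real udptool code; the port, buffer size and per-source goroutines are simplified away):

package main

import (
	"log"
	"net"
)

func main() {
	addr, err := net.ResolveUDPAddr("udp", ":5001")
	if err != nil {
		log.Fatal(err)
	}
	conn, err := net.ListenUDP("udp", addr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	buf := make([]byte, 2048)
	count := 0
	for {
		// Read a packet, keep a tally, and echo the exact same bytes back.
		n, remote, err := conn.ReadFromUDP(buf)
		if err != nil {
			continue
		}
		count++
		if count%10000 == 0 {
			log.Printf("received %d packets so far", count)
		}
		conn.WriteToUDP(buf[:n], remote)
	}
}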

The client side of the code is far more interesting. There are a number of factors we want to tweak while performing these tests: bits per second, number of bits/bytes/kb per UDP packet, number of "clients" we want to simulate, number of VPN connections we want to use while simulating those clients (most of the time #clients and #VPN connections will be the same), duration of the test, etc.

Basically the client spins up "num-clients" goroutines, each using (in most scenarios) a unique network (VPN) interface. From there, each goroutine simply loops based on the input parameters (duration of test, # packets etc). For the tests performed, the client machine (a basic 2 core VM) was able to generate enough voice quality VPN traffic to simulate 1100 users and still be mostly idle.
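
Each of those goroutines boils down to something like the following sketch (the real client also binds each goroutine to its own VPN source address and wires in all the flags described below; the values in main are just examples):

package main

import (
	"net"
	"time"
)

// runClient pushes roughly bps bits per second of UDP traffic at serverAddr
// for the given number of seconds, sleeping away the remainder of each second.
func runClient(serverAddr string, bps, packetSize, seconds int) error {
	conn, err := net.Dial("udp", serverAddr)
	if err != nil {
		return err
	}
	defer conn.Close()

	packet := make([]byte, packetSize)
	packetsPerSecond := bps / 8 / packetSize
	deadline := time.Now().Add(time.Duration(seconds) * time.Second)

	for time.Now().Before(deadline) {
		start := time.Now()
		for i := 0; i < packetsPerSecond; i++ {
			conn.Write(packet)
		}
		// Quota for this second sent... sleep for whatever is left of the second.
		// If there's barely anything left to sleep, the client box is overworked.
		if remaining := time.Second - time.Since(start); remaining > 0 {
			time.Sleep(remaining)
		}
	}
	return nil
}

func main() {
	// eg 65000 bits per second in 160 byte packets for 60 seconds.
	runClient("1.2.3.4:5001", 65000, 160, 60)
}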

Usage:

on server:

./udptool_linux_amd64 -server

on client:

./udptool_linux_amd64 -minclientaddr "192.168.0.0.1" -noclientaddr 1000 -host "1.2.3.4:5001" -noclients 1000 -seconds 6000 -verbose -bps 65000

Broadly, the explanation of the above:

-minclientaddr is the "lowest" IP address generated by OpenConnect.

-noclientaddr specifies how many IP addresses (VPN tunnels) the client app should attempt to connect to. When determining which IP each client should use, it basically starts at "minclientaddr" and increments from there.

-host is the server IP:Port. By default, when running udptool as the server, the port is 5001.

-noclients is the number of clients we should try and simulate. Most of the time this would be the same as -noclientaddr, but sometimes (if you want to double up clients on a single client IP) they could differ.

-seconds is how many seconds the test should run for.

-verbose gives updates on when requests are sent and, more importantly, how often the client sleeps between sending packets. Given udptool is used to send X bytes per second, once it’s sent the data it sleeps until it next needs to run. How long it sleeps is displayed on the console. Most of the time it will sleep for 980+ ms (ie it spends most of its time sleeping). If it is ever observed that it’s NOT sleeping most of the time, it means the machine is overworked/underspecced.

-bps 65000 says to send 65000 BITS per second per client.

If anyone is interested in trying it out, please grab it from github.

EventStore and Go

Eventstore is a great event sourcing system for those situations where you want realtime(ish) events to process, as opposed to querying a regular RDBMS.

Eventstore is written in C# and a lot of the integrations I’ve seen/used are mainly in the .NET space. My favourite hammer of choice is Go, so I wanted to see how easily I could create something useful out of Eventstore and the client libs available.

My Go client lib of choice is go-gesclient, which in turn is a port of the .NET client lib. I’ve found it REALLY easy to use and (so far) not buggy :)

The plan was to create a service/server that registers with EventStore and receives a stream of events. The service can have a number of different "EventProcessors" (EPs), which in turn receive the events and process them. Each EP is only interested in a subset of all events (identified by a field called the EventType, or ET). If an EP gets an event of the appropriate EventType then it lets it through and "processes" it. What processing actually IS, is up to the reader…. add it to a running total, calculate a hash from the input, perform DB queries based on the event details etc… ANYTHING.

The most important aspect of the service was that it would need to be very easily extendable. Being able to add another EP trivially would be key.

We have a base interface that any EP would need to satisfy.

type EventProcessor interface {
    ProcessEvent(event *client.ResolvedEvent) error
    GetRegisteredEventTypes() []string
    GetProcessorName() string
    CanSkipBadEvent() bool
    GetLastEventNumber() int
    SetLastEventNumber(pos int) error
}

Let’s go through what each of these interface methods is about.

ProcessEvent(event *client.ResolvedEvent) error

ProcessEvent is really the main definition in the interface. It takes an event (provided by EventStore) and produces a (hopefully nil) error. This is where we can do whatever we want with the incoming data/event. Ignore it, write it to a database, ROT13 it… whatever we want.

GetRegisteredEventTypes() []string

EventStore can have any number of EventTypes (hey, they’re just strings), so each EventProcessor needs to indicate which ETs it’s interested in. In this case GetRegisteredEventTypes simply returns a list of strings representing the ETs this EventProcessor will respond to. Any number of EPs can register for the same EventType. eg we might have an EP that generates a running total of all "CreditDollarAmount" events, whereas another EP might be working out the largest transaction for the day using the exact same events being passed in. Each EP gets its own copy of the event and they will not tread on each other’s toes.

GetProcessorName() string

Simply returns the EP name.

CanSkipBadEvent() bool

This is an interesting one. With event processing in general, the idea is that there is a constant stream of data that can be processed by some system. Events in this stream cannot (should not) be modified. It is also considered very important that each event is processed in the correct order. eg if it were a banking system, it’s important to have the credit event happen before you start processing the debits (at least my bank thinks so!). Although that is mostly true, some events might be considered "useful to know, but not critical". For example, if the events were about average CPU used in the last 5 minutes and for some reason we cannot process one of them (maybe something got corrupted in transit?), we shouldn’t just stop the CPU EP and refuse to do any more work. This is a case where skipping an event (unlike banking) would be considered ok. So each EP can declare whether it is allowed to skip events it cannot process.

GetLastEventNumber() int

Related to the above, we need to know which event we’re actually up to. Whenever a client connects to Eventstore it needs to inform the server which event number it is up to. In theory, every time a client starts it could start from event 1 and simply reprocess all the events. This may either a) not be possible (bank transactions anyone?) or b) be computationally expensive. Instead, it is up to the client to remember which event it last processed and store that somewhere on persistent storage. Then when the client starts up again, it can refer to the persistent storage and tell the Eventstore server where to continue the data feed from. This is a very hand wavy way of saying: this is what GetLastEventNumber() is for :)

SetLastEventNumber(pos int) error

Sets the above. Wow, that was shorter :)

So every EventProcessor needs to implement all of the above methods. Fortunately most of them are implemented by the base type, since they’re just elaborate getters/setters for some basic variables defined in the EP.
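
To make that contract a little more concrete, the loop the stream processor runs for each EP conceptually boils down to something like this (a simplified sketch, not the literal framework code):

// For each event coming off the Eventstore subscription, hand it to an EP if the
// EP registered for that EventType, honour its skip/no-skip preference, and track
// how far it has got so a restart can resume from the right spot.
func dispatch(ep eventprocessors.EventProcessor, event *client.ResolvedEvent, eventNumber int) error {
	registered := false
	for _, et := range ep.GetRegisteredEventTypes() {
		if et == event.Event().EventType() {
			registered = true
			break
		}
	}
	if !registered {
		return nil
	}

	if err := ep.ProcessEvent(event); err != nil && !ep.CanSkipBadEvent() {
		// Cannot skip a bad event... stop here and don't advance the position.
		return err
	}

	// Either processed or skipped... record the position for restarts.
	return ep.SetLastEventNumber(eventNumber)
}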

Currently, the EventProcessor framework is restricted to processing a single Eventstore stream, even though an Eventstore server can handle multiple streams at once (all relating to different events/data). Say, for example, we wanted to create a client executable that received an Eventstore stream called "Sales", and wanted an EventProcessor that simply displayed the incoming sales amount (from the event) as well as the running total.

Firstly, let’s create the SalesEventProcessor:

package ep

import (
	"encoding/json"
	"fmt"
	"github.com/jdextraze/go-gesclient/client"
	"github.com/kpfaulkner/eventprocessor/pkg/eventprocessors"
)

const (
	SalesEventId = "SalesEventId"
)

type IncomingAmount struct {
	Amount int `json:"amount"`
}

type SalesEventProcessor struct {
	eventprocessors.BaseEventProcessor
	totalSales int
}

func NewSalesEventProcessor() SalesEventProcessor{
    s := SalesEventProcessor{}
    s.EventTypes = []string{ SalesEventId}
    s.ProcessorName = "SalesEventProcessor"
    s.CanSkipEvent = true
    s.totalSales = 0
    return s
}

func (s *SalesEventProcessor) ProcessEvent(e *client.ResolvedEvent) error {
	amount := IncomingAmount{}
	json.Unmarshal(e.Event().Data(), &amount)
	s.totalSales += amount.Amount
	fmt.Printf("Incoming sales amount %d, Total sales amount %d\n", amount.Amount, s.totalSales)
	return nil
}

Let’s look through what we have. The package, imports and const are pretty self explanatory. IncomingAmount is just a type matching the format of the incoming Eventstore event: a single int called "Amount". SalesEventProcessor embeds the BaseEventProcessor, which is a requirement for all EventProcessors. The only custom part of SalesEventProcessor is the running total we keep.

Next, the usual Go convention for newing up structs. The three important parts are:

  • Set EventTypes. This is basically the registration of what events this EP is going to get called for.
  • ProcessorName… because having "foo" everywhere sucks.
  • CanSkipEvent … for now set to true… but YMMV

Now the important bit… ProcessEvent. Given this is an utterly simplistic EP, hopefully nothing here comes as a shock. We unmarshal the incoming event data into an IncomingAmount struct, add the amount to the running total, and print out both the incoming and total amounts. Nothing radical here.

In this particular scenario we have an EventProcessor working on a single event type. What if we have multiple? Just add the conditional logic in the ProcessEvent function and take it from there.

eg.

func (s *SalesEventProcessor) ProcessEvent(e *client.ResolvedEvent) error {
	switch e.Event().EventType() {
	case SalesEventId:
		return s.ProcessEventSalesEventId(e)
	case SalesEventId2:
		return s.ProcessEventSalesEventId2(e)
	}
	return nil
}

So now we have an EventProcessor, how do we use it? A minimal example can be:

package main

import (
	"fmt"
	"github.com/kpfaulkner/eventprocessor/cmd/ep"
	"github.com/kpfaulkner/eventprocessor/pkg/eventprocessors"
	"github.com/kpfaulkner/eventprocessor/pkg/streamprocessor"
)

func main() {
	// where the tracking DB will be stored.
	trackerPath := "c:/temp/mytracker"
	processor := ep.NewSalesEventProcessor()
	l := []eventprocessors.EventProcessor{&processor}
	err := streamprocessor.StartProcessingStream("Default", trackerPath, l)
	if err != nil {
		fmt.Printf("Couldn't start StartProcessingStream %s\n", err.Error())
		return
	}
}

Here we simply create an array of EventProcessors (in this case an array containing a single SalesEventProcessor). We then call StartProcessingStream, passing in the stream in question (in this case "Default") and the EventProcessor array that will process it. This function won’t return until all processors have exited, which should only happen in the case of serious errors since it is meant to run continuously.

You may have noticed that I haven’t mentioned CQRS (damn, just did it there!). That will be for a later day…. currently I’m enjoying not breaking my brain and purely having access to a reliable stream of events to process.

Currently this is all a work in progress, but so far I’m extremely impressed with Eventstore in combination with the go-gesclient library. Making a little framework to use them together was really quick and easy to do. I HIGHLY recommend it!

If you would like to try it out, please check it out on github.