BETA

vmware/transport-go

Transport is a full stack, simple, fast, expandable application event bus for your applications. It provides a standardized and simple API, implemented in multiple languages, to allow any individual component inside your applications to talk to one another. This is the Golang implementation of the Transport library.


52
2
11 months ago
v1.3.4
GitHubWebsite

Transport. An event bus for Go

Transport Post-merge pipeline codecov

Transport is a full stack, simple, fast, expandable application event bus for your applications. It can transport anything you want around your application, as well extend different channels to brokers and destinations. Transport makes it easy to move your bits around localized or distribiuted apps, without worrying about all the wiring.

What does that mean?

Transport is an event bus, that allows application developers to build components that can talk to one another, really easily. It provides a standardized and simple API, implemented in multiple languages, to allow any individual component inside your applications to talk to one another.

It really comes to life when you use it to send messages, requests, responses and events around your backend and front-end. Your backend can stream messages to your UI, or other services or agents, as if they were sitting right next to each other, You can build one to one, many to one and many to many topologies with ease.

Channels can be extended to major brokers like Kafka or RabbitMQ, so Transport becomes an 'on/off-ramp' for your main sources of truth.

Watch this quick video for an overview

Quick Transport Overview

Transport for Go also comes with plank

plank is a micro platform for building just about anything you can think of, it exposes RESTful and AsyncAPI & Pub/Sub endpoints for services that can do just about anything. Read more about plank


Getting Started

View All Transport (Golang) Documentation

Other versions of Transport

Transport Site

https://transport-bus.io


Quick Start

Install transport

go get -u github.com/vmware/transport-go

To create an instance of the bus

import 	"github.com/vmware/transport-go/bus"

var transport = bus.GetBus()

Transport is a singleton, there is (should) only ever a single instance of the bus in your application.


Managing / Creating Channels

The ChannelManager interface on the EventBus interface facilitates all Channel operations.

channelManager := transport.GetChannelManager()

Creating Channels

The CreateChannel method will create a new channel with the name "some-channel". It will return a pointer to a Channel object. You don't need to hold on to that pointer if you don't want to. The channel will still exist.

channel := channelManager.CreateChanel("some-channel")

Simple Example

A simple ping pong looks a little like this.

// listen for a single request on 'some-channel'
ts := bus.GetBus()
channel := "some-channel"
ts.GetChannelManager().CreateChannel(channel)

// listen for a single request on 'some-channel'
requestHandler, _ := ts.ListenRequestStream(channel)
requestHandler.Handle(
   func(msg *model.Message) {
       pingContent := msg.Payload.(string)
       fmt.Printf("\nPing: %s\n", pingContent)

       // send a response back.
       ts.SendResponseMessage(channel, pingContent, msg.DestinationId)
   },
   func(err error) {
       // something went wrong...
   })

// send a request to 'some-channel' and handle a single response
responseHandler, _ := ts.RequestOnce(channel, "Woo!")
responseHandler.Handle(
   func(msg *model.Message) {
       fmt.Printf("Pong: %s\n", msg.Payload.(string))
   },
   func(err error) {
       // something went wrong...
   })
// fire the request.
responseHandler.Fire()

This will output:

🌈 Transport booted with id [e495e5d5-2b72-46dd-8013-d49049bd4800]
Ping: Woo!
Pong: Woo!

Connecting to a message broker and using galactic channels

You can see this all working live in some of our interactive demos for Transport TypeScript. it shows Transport acting as both client and server, in which we use plank to run services, and the UI subscribes to those services and talks to them.

We have a live and running instance of plank operating at transport-bus.io. You can try the example code below to use the sample simple stream service and see how simple it is for yourself.

Simple Stream Example

// get a pointer to the bus.
b := bus.GetBus()

// get a pointer to the channel manager
cm := b.GetChannelManager()

// create a broker connector config and connect to 
// transport-bus.io over WebSocket using TLS.
config := &bridge.BrokerConnectorConfig{
    Username:   "guest",            // not required for demo, but our API requires it.
    Password:   "guest",            // ^^ same.
    ServerAddr: "transport-bus.io", // our live broker running plank and demo services.
    UseWS:      true,               // connect over websockets
    WebSocketConfig: &bridge.WebSocketConfig{ // configure websocket
        WSPath: "/ws", // websocket endpoint
        UseTLS: true,
        // use TLS/HTTPS. When using TLS, you can supply your own TLSConfig value, or we can
        // generate a basic one for you if you leave TLSConfig empty. In most cases, 
        // you won't need to supply one.
    }}

// connect to transport-bus.io demo broker
c, err := b.ConnectBroker(config)
if err != nil {
    utils.Log.Fatalf("unable to connect to transport-bus.io, error: %v", err.Error())
}

// create a local channel on the bus.
myLocalChan := "my-stream"
cm.CreateChannel(myLocalChan)

// listen to stream of messages coming in on channel, a handler is returned 
// that allows you to add in lambdas that handle your success messages, and your errors.
handler, _ := b.ListenStream(myLocalChan)

// mark our local 'my-stream' myLocalChan as galactic and map it to our connection and 
// the /topic/simple-stream service
err = cm.MarkChannelAsGalactic(myLocalChan, "/topic/simple-stream", c)
if err != nil {
    utils.Log.Fatalf("unable to map local channel to broker destination: %e", err)
}

// collect the streamed values in a slice
var streamedValues []string

// create a wait group that will wait 10 times before completing.
var wg sync.WaitGroup
wg.Add(10)

// keep listening
handler.Handle(
    func(msg *model.Message) {

        // unmarshal the message payload into a model.Response object
        // this is a wrapper transport uses when being used as a server, 
        // it encapsulates a rich set of data about the message, 
        // but you only really care about the payload (body)
        r := &model.Response{}
        d := msg.Payload.([]byte)
        err := json.Unmarshal(d, &r)
        if err != nil {
            utils.Log.Errorf("error unmarshalling request: %v", err.Error())
            return
        }
        // the value we want is in the payload of our model.Response
        value := r.Payload.(string)

        // log it and save it to our streamedValues
        utils.Log.Infof("stream ticked: %s", value)
        streamedValues = append(streamedValues, value)
        wg.Done()
    },
    func(err error) {
        utils.Log.Errorf("error received on channel: %e", err)
    })

// wait for 10 ticks of the stream, then we're done.
wg.Wait()

// close our handler, we're done.
handler.Close()

// mark channel as local (unsubscribe from all mappings)
err = cm.MarkChannelAsLocal(myLocalChan)
if err != nil {
    utils.Log.Fatalf("unable to unsubscribe, error: %e", err)
}

// disconnect
err = c.Disconnect()
if err != nil {
    utils.Log.Fatalf("unable to disconnect, error: %e", err)
}

// return what we got from the stream.
return streamedValues

You can see this simple example here

Read More Golang Documentation


What is plank?

plank is 'just enough' of a platform for building just about anything you want. Run RESTful and AsyncAPIs with the same code, build simple or complex services that can be exposed to any client in any manner. Talk over WebSockets and pub/sub and streaming, or call the same APIs via REST mappings. plank can do it all. It's tiny, super-fast and runs on any platform. Runs in just a few megabytes of memory, and can be compiled down to the same. It can be used for micro-services, daemons, agents, local UI helper applications... anything!

plank is only available in transport-go

Take a look at plank


Contributing

The transport-go project team welcomes contributions from the community. Before you start working with transport-go, please read our Developer Certificate of Origin. All contributions to this repository must be signed as described on that page. Your signature certifies that you wrote the patch or have the right to pass it on as an open-source patch. For more detailed information, refer to CONTRIBUTING.md.

License

BSD-2-Clause

Copyright (c) 2016-2021, VMware, Inc.

Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:

  1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
  2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.