# Borrower's Risk Factor Analytics

Arkis provides a WebSocket API that delivers real-time maintenance updates on borrowed positions, including position details and the calculated risk factor. It uses the WebSocket protocol provided by the [Centrifuge](https://github.com/centrifugal/centrifuge) library. To receive real-time maintenance updates, use an [official Centrifuge client.](https://github.com/centrifugal/centrifuge?tab=readme-ov-file#real-time-sdk) You will also need to generate an API key in the Arkis App settings and present it as a credential when connecting with the Centrifuge client.&#x20;

Step-by-step instructions on how to create and revoke an **API key** are described [on that page](https://docs.arkis.xyz/home/for-developers/arkis-api/api-keys).&#x20;

The sections below walk through each step and provide relevant development documentation.

## Integrating the API with the Centrifuge Client

Now that your token has been generated, you can use one of the [official Centrifuge clients](https://github.com/centrifugal/centrifuge?tab=readme-ov-file#real-time-sdk) to subscribe to Arkis real-time maintenance updates.

### Prerequisites

#### Connection info

<mark style="background-color:purple;">URL</mark>  `wss://api.arkis.xyz/e/v1/ws`&#x20;

<mark style="background-color:purple;">Channel</mark> `maintenance:wallet:{owner_wallet}`

{% hint style="warning" %}
The wallet address must be checksummed: [more info](https://coincodex.com/article/2078/ethereum-address-checksum-explained/).
{% endhint %}

#### Tested client versions

| Language   | Client version                                                               | Runtime version |
| ---------- | ---------------------------------------------------------------------------- | --------------- |
| JavaScript | [v5.5.2](https://www.npmjs.com/package/centrifuge/v/5.5.2)                   | NodeJS v22.18.0 |
| Go         | [v0.10.11](https://pkg.go.dev/github.com/centrifugal/centrifuge-go@v0.10.11) | go1.23.12       |
| Python     | [v0.4.1](https://pypi.org/project/centrifuge-python/0.4.1/)                  | Python 3.13.7   |

{% hint style="warning" %}
If you're using NodeJS version earlier than **v22.4.0,** it does not include a built-in WebSocket implementation, so one must be provided manually: [more info](https://github.com/centrifugal/centrifuge-js?tab=readme-ov-file#using-with-nodejs).
{% endhint %}

### Examples

Below are examples in several popular programming languages. Replace `<API TOKEN>` with your generated token. Never hardcode or commit the token to your codebase. Replace `<OWNER WALLET>` with the **checksummed** address of wallet that was used to open the Margin Account.

{% tabs %}
{% tab title="JavaScript" %}
{% code overflow="wrap" fullWidth="false" %}

```javascript
import { Centrifuge } from 'centrifuge';

let centrifuge = new Centrifuge('wss://api.arkis.xyz/e/v1/ws', {
    timeout: 5000, // Timeout for operations in milliseconds
    debug: true, // Debug mode for observability while development is ongoing
    token: '<API TOKEN>',
})

// Add event handlers for connection errors and disconnect
centrifuge.on('error', ctx => console.error("Connection error:", ctx))
centrifuge.on('disconnected', ctx => console.log("Disconnected: ", ctx))

// // Paste the address of the wallet used to open the Margin Account
const wallet = '<OWNER WALLET>'

// Create a new subcription for maintenance information
const subscription = centrifuge.newSubscription(`maintenance:wallet:${wallet}`)

// Assign handlers for the most important subscription events
// After the subscription is successfully established, you can access the publication history
subscription.on('subscribed', _ => {
    // The max history limit is 100. For demonstrational purposes,
    // the limit is set to 1 and "reverse" parameter is true
    // so the most recent history item will be returned
    // Read more: https://centrifugal.dev/docs/server/history_and_recovery#history-iteration-api
    subscription.history({ limit: 1, reverse: true }).then(
        // Print the most recent item from the history
        ctx => console.log('History item:', ctx.publications[0]),
        err => console.error('History error: ', err)
    )
})

// The 'publication' event is triggered whenever Arkis Risk Management publishes new maintenance information
subscription.on('publication', ctx => console.dir(ctx.data, { depth: null }))

// Other useful handlers
subscription.on('error', ctx => console.error(ctx))
subscription.on('unsubscribed', ctx => console.log(ctx))
subscription.on('state', ctx => console.log(ctx))
subscription.subscribe()

centrifuge.connect()
```

{% endcode %}
{% endtab %}

{% tab title="Go" %}

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/centrifugal/centrifuge-go"
)

const (
	wsURL       = "wss://api.arkis.xyz/e/v1/ws"
	ownerWallet = "<OWNER WALLET>"
	apiToken    = "<API TOKEN>"
)

func main() {
	c := centrifuge.NewJsonClient(wsURL, centrifuge.Config{
		Token: apiToken,
	})
	defer c.Close()

	c.OnConnected(func(_ centrifuge.ConnectedEvent) {
		log.Println("Connected")
	})
	c.OnConnecting(func(_ centrifuge.ConnectingEvent) {
		log.Println("Connecting")
	})
	c.OnDisconnected(func(e centrifuge.DisconnectedEvent) {
		log.Println("Disconnected", e.Reason)
	})
	c.OnError(func(e centrifuge.ErrorEvent) {
		log.Println("Error", e.Error.Error())
	})

	err := c.Connect()
	if err != nil {
		log.Fatalln(err)
	}

	sub, err := c.NewSubscription(
		"maintenance:wallet:"+ownerWallet,
		centrifuge.SubscriptionConfig{},
	)
	if err != nil {
		log.Fatalln(err)
	}

	ctx := context.Background()

	sub.OnSubscribed(func(e centrifuge.SubscribedEvent) {
		log.Println(
			fmt.Sprintf("Successfully subscribed to private channel %s",
				sub.Channel))

		// After the subscription is successfully established, you can access publication history
		go printMostRecentHistoryPoint(ctx, sub)
	})
	sub.OnError(func(e centrifuge.SubscriptionErrorEvent) {
		log.Println(
			fmt.Sprintf("Error subscribing to private channel %s: %v",
				sub.Channel, e.Error))
	})
	sub.OnUnsubscribed(func(e centrifuge.UnsubscribedEvent) {
		log.Println(
			fmt.Sprintf("Unsubscribed from private channel %s: %s",
				sub.Channel, e.Reason))
	})
	sub.OnPublication(func(e centrifuge.PublicationEvent) {
		log.Println(
			fmt.Sprintf("New message received from channel %s: %s",
				sub.Channel, string(e.Data)))
	})

	// Subscribe on private channel
	_ = sub.Subscribe()

	// Run until CTRL+C
	select {}
}

func printMostRecentHistoryPoint(ctx context.Context, sub *centrifuge.Subscription) {
	hCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	history, err := sub.History(hCtx,
		// The max history limit is 100. For demonstrational purposes,
		// the limit is set to 1 and "reverse" parameter is true
		// so the most recent history item will be returned.
		// Read more: https://centrifugal.dev/docs/server/history_and_recovery#history-iteration-api
		centrifuge.WithHistoryLimit(1),
		centrifuge.WithHistoryReverse(true),
	)
	if err != nil {
		log.Fatalln(err)
	}

	if len(history.Publications) > 0 {
		log.Println(
			fmt.Sprintf("The most recent history point from private channel %s: %s",
				sub.Channel,
				string(history.Publications[len(history.Publications)-1].Data)))
	}
}

```

{% endtab %}

{% tab title="Python" %}
{% code overflow="wrap" %}

```python
import logging
import asyncio
import signal

from centrifuge import (
    Client, ClientEventHandler, ConnectingContext, ConnectedContext,
    DisconnectedContext, ErrorContext, ServerSubscribedContext,
    ServerSubscribingContext, ServerUnsubscribedContext,
    ServerPublicationContext, ServerJoinContext, ServerLeaveContext,
    SubscriptionEventHandler, SubscribingContext, SubscribedContext,
    UnsubscribedContext, PublicationContext, JoinContext, LeaveContext,
    SubscriptionErrorContext, CentrifugeError
)

ARKIS_API_TOKEN = "<API TOKEN>"
OWNER_WALLET = "<OWNER WALLET>"

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# Configure centrifuge-python logger
cf_logger = logging.getLogger("centrifuge")
cf_logger.setLevel(logging.INFO)  # Change to logging.DEBUG for debug output


class ClientEventLoggerHandler(ClientEventHandler):
    """Check out comments of ClientEventHandler methods to see when they are called."""

    async def on_connecting(self, ctx: ConnectingContext) -> None:
        logging.info("connecting: %s", ctx)

    async def on_connected(self, ctx: ConnectedContext) -> None:
        logging.info("connected: %s", ctx)

    async def on_disconnected(self, ctx: DisconnectedContext) -> None:
        logging.info("disconnected: %s", ctx)

    async def on_error(self, ctx: ErrorContext) -> None:
        logging.error("client error: %s", ctx)

    async def on_subscribed(self, ctx: ServerSubscribedContext) -> None:
        logging.info("subscribed server-side sub: %s", ctx)

    async def on_subscribing(self, ctx: ServerSubscribingContext) -> None:
        logging.info("subscribing server-side sub: %s", ctx)

    async def on_unsubscribed(self, ctx: ServerUnsubscribedContext) -> None:
        logging.info("unsubscribed from server-side sub: %s", ctx)

    async def on_publication(self, ctx: ServerPublicationContext) -> None:
        logging.info("publication from server-side sub: %s", ctx.pub.data)

    async def on_join(self, ctx: ServerJoinContext) -> None:
        logging.info("join in server-side sub: %s", ctx)

    async def on_leave(self, ctx: ServerLeaveContext) -> None:
        logging.info("leave in server-side sub: %s", ctx)


class SubscriptionEventLoggerHandler(SubscriptionEventHandler):
    """Check out comments of SubscriptionEventHandler methods to see when they are called."""

    async def on_subscribing(self, ctx: SubscribingContext) -> None:
        logging.info("subscribing: %s", ctx)

    async def on_subscribed(self, ctx: SubscribedContext) -> None:
        logging.info("subscribed: %s", ctx)

    async def on_unsubscribed(self, ctx: UnsubscribedContext) -> None:
        logging.info("unsubscribed: %s", ctx)

    async def on_publication(self, ctx: PublicationContext) -> None:
        logging.info("publication: %s", ctx.pub.data)

    async def on_join(self, ctx: JoinContext) -> None:
        logging.info("join: %s", ctx)

    async def on_leave(self, ctx: LeaveContext) -> None:
        logging.info("leave: %s", ctx)

    async def on_error(self, ctx: SubscriptionErrorContext) -> None:
        logging.error("subscription error: %s", ctx)


def run_example():
    client = Client(
        "wss://api.arkis.xyz/e/v1/ws",
        events=ClientEventLoggerHandler(),
        token=ARKIS_API_TOKEN
    )

    sub = client.new_subscription(
        f"maintenance:wallet:{OWNER_WALLET}",
        events=SubscriptionEventLoggerHandler(),
    )

    async def run():
        await client.connect()
        await sub.subscribe()

        try:
            result = await sub.history(limit=1, reverse=True)
            logging.info(result)
        except CentrifugeError as e:
            logging.error("error history: %s", e)

        logging.info(
            "all done, client connection is still alive, press Ctrl+C to exit")

    asyncio.ensure_future(run())
    loop = asyncio.get_event_loop()

    async def shutdown(received_signal):
        logging.info("received exit signal %s...", received_signal.name)
        await client.disconnect()

        tasks = [t for t in asyncio.all_tasks(
        ) if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()

        logging.info("Cancelling outstanding tasks")
        await asyncio.gather(*tasks, return_exceptions=True)
        loop.stop()

    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        loop.add_signal_handler(
            s, lambda received_signal=s: asyncio.create_task(
                shutdown(received_signal))
        )

    try:
        loop.run_forever()
    finally:
        loop.close()
        logging.info("successfully completed service shutdown")


if __name__ == "__main__":
    run_example()

```

{% endcode %}
{% endtab %}
{% endtabs %}
