Arkis provides a WebSocket API that delivers real-time maintenance updates on borrowed positions, including position details and the calculated risk factor. It uses the WebSocket protocol provided by the Centrifuge library. To receive real-time maintenance updates, use an official Centrifuge client. You will also need to generate an API key in the Arkis App settings and present it as a credential when connecting with the Centrifuge client.
Step-by-step instructions on how to create and revoke an API key are described on that page.
The sections below walk through each step and provide relevant development documentation.
Integrating the API with the Centrifuge Client
Now that your token has been generated, you can use one of the official Centrifuge clients to subscribe to Arkis real-time maintenance updates.
Prerequisites
Connection info
URL: wss://api.arkis.xyz/e/v1/ws
Channel: maintenance:wallet:{owner_wallet}
The wallet address must be checksummed: more info.
If you're using a Node.js version earlier than v22.4.0, note that it does not include a built-in WebSocket implementation, so one must be provided manually: more info.
Examples
Below are examples in several popular programming languages. Replace <API TOKEN> with your generated token. Never hardcode or commit the token to your codebase. Replace <OWNER WALLET> with the checksummed address of the wallet that was used to open the Margin Account.
import { Centrifuge } from 'centrifuge';
// Create the client for the Arkis WebSocket endpoint, authenticating with the API token
const centrifuge = new Centrifuge('wss://api.arkis.xyz/e/v1/ws', {
  timeout: 5000, // Timeout for operations in milliseconds
  debug: true, // Debug mode for observability while development is ongoing
  token: '<API TOKEN>',
})

// Add event handlers for connection errors and disconnect
centrifuge.on('error', ctx => console.error("Connection error:", ctx))
centrifuge.on('disconnected', ctx => console.log("Disconnected: ", ctx))

// Paste the address of the wallet used to open the Margin Account
const wallet = '<OWNER WALLET>'

// Create a new subscription for maintenance information
const subscription = centrifuge.newSubscription(`maintenance:wallet:${wallet}`)

// Assign handlers for the most important subscription events

// After the subscription is successfully established, you can access the publication history
subscription.on('subscribed', () => {
  // The max history limit is 100. For demonstrational purposes,
  // the limit is set to 1 and "reverse" parameter is true
  // so the most recent history item will be returned
  // Read more: https://centrifugal.dev/docs/server/history_and_recovery#history-iteration-api
  subscription.history({ limit: 1, reverse: true }).then(
    ctx => {
      const [latest] = ctx.publications
      // The history may be empty; avoid printing "undefined" in that case
      if (latest === undefined) {
        console.log('History is empty')
        return
      }
      // Print the most recent item from the history
      console.log('History item:', latest)
    },
    err => console.error('History error: ', err)
  )
})

// The 'publication' event is triggered whenever Arkis Risk Management publishes new maintenance information
subscription.on('publication', ctx => console.dir(ctx.data, { depth: null }))

// Other useful handlers
subscription.on('error', ctx => console.error(ctx))
subscription.on('unsubscribed', ctx => console.log(ctx))
subscription.on('state', ctx => console.log(ctx))

// Register the subscription, then open the connection
subscription.subscribe()
centrifuge.connect()
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/centrifugal/centrifuge-go"
)
// wsURL is the Arkis WebSocket endpoint the client connects to.
const wsURL = "wss://api.arkis.xyz/e/v1/ws"

// ownerWallet is the placeholder for the wallet address; it is appended to
// the "maintenance:wallet:" prefix to form the channel name.
const ownerWallet = "<OWNER WALLET>"

// apiToken is the placeholder for the API token passed to the client config.
const apiToken = "<API TOKEN>"
// main connects to the Arkis WebSocket endpoint, subscribes to the maintenance
// channel of ownerWallet, and logs connection and subscription events until
// the process is terminated. Any failure while connecting, creating the
// subscription, or subscribing is fatal.
func main() {
	c := centrifuge.NewJsonClient(wsURL, centrifuge.Config{
		Token: apiToken,
	})
	defer c.Close()

	// Log connection lifecycle events.
	c.OnConnected(func(_ centrifuge.ConnectedEvent) {
		log.Println("Connected")
	})
	c.OnConnecting(func(_ centrifuge.ConnectingEvent) {
		log.Println("Connecting")
	})
	c.OnDisconnected(func(e centrifuge.DisconnectedEvent) {
		log.Println("Disconnected", e.Reason)
	})
	c.OnError(func(e centrifuge.ErrorEvent) {
		log.Println("Error", e.Error.Error())
	})

	if err := c.Connect(); err != nil {
		log.Fatalln(err)
	}

	channel := fmt.Sprintf("maintenance:wallet:%s", ownerWallet)
	sub, err := c.NewSubscription(channel, centrifuge.SubscriptionConfig{})
	if err != nil {
		log.Fatalln(err)
	}

	ctx := context.Background()

	sub.OnSubscribed(func(_ centrifuge.SubscribedEvent) {
		log.Printf("Successfully subscribed to private channel %s", sub.Channel)
		// After the subscription is successfully established, you can access publication history.
		// Run it in a goroutine so the event callback returns immediately.
		go printMostRecentHistoryPoint(ctx, sub)
	})
	sub.OnError(func(e centrifuge.SubscriptionErrorEvent) {
		log.Printf("Error subscribing to private channel %s: %v", sub.Channel, e.Error)
	})
	sub.OnUnsubscribed(func(e centrifuge.UnsubscribedEvent) {
		log.Printf("Unsubscribed from private channel %s: %s", sub.Channel, e.Reason)
	})
	sub.OnPublication(func(e centrifuge.PublicationEvent) {
		log.Printf("New message received from channel %s: %s", sub.Channel, string(e.Data))
	})

	// Subscribe on private channel; without a subscription there is nothing
	// to wait for, so treat a failure like the other setup errors.
	if err := sub.Subscribe(); err != nil {
		log.Fatalln(err)
	}

	// Run until CTRL+C
	select {}
}
// printMostRecentHistoryPoint requests the latest publication from the
// subscription's history and logs its payload.
//
// The request is bounded by a 5-second timeout derived from ctx, so cancelling
// ctx also aborts the request. A failed request is logged and the function
// returns; nothing is logged when the history is empty.
func printMostRecentHistoryPoint(ctx context.Context, sub *centrifuge.Subscription) {
	hCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	history, err := sub.History(hCtx,
		// The max history limit is 100. For demonstrational purposes,
		// the limit is set to 1 and "reverse" parameter is true
		// so the most recent history item will be returned.
		// Read more: https://centrifugal.dev/docs/server/history_and_recovery#history-iteration-api
		centrifuge.WithHistoryLimit(1),
		centrifuge.WithHistoryReverse(true),
	)
	if err != nil {
		// This runs in its own goroutine; a failed history request should not
		// terminate the whole subscriber, so log it and carry on.
		log.Printf("Error fetching history from private channel %s: %v", sub.Channel, err)
		return
	}
	if len(history.Publications) == 0 {
		return
	}

	// With the limit set to 1 there is at most one item: the most recent one.
	latest := history.Publications[0]
	log.Printf("The most recent history point from private channel %s: %s",
		sub.Channel, string(latest.Data))
}
import logging
import asyncio
import signal
from centrifuge import (
Client, ClientEventHandler, ConnectingContext, ConnectedContext,
DisconnectedContext, ErrorContext, ServerSubscribedContext,
ServerSubscribingContext, ServerUnsubscribedContext,
ServerPublicationContext, ServerJoinContext, ServerLeaveContext,
SubscriptionEventHandler, SubscribingContext, SubscribedContext,
UnsubscribedContext, PublicationContext, JoinContext, LeaveContext,
SubscriptionErrorContext, CentrifugeError
)
# Placeholders: substitute the wallet address and the generated API token.
OWNER_WALLET = "<OWNER WALLET>"
ARKIS_API_TOKEN = "<API TOKEN>"

# Root logging setup: INFO level with timestamp, logger name and level name.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Configure centrifuge-python logger
cf_logger = logging.getLogger("centrifuge")
# Change to logging.DEBUG for debug output
cf_logger.setLevel(logging.INFO)
class ClientEventLoggerHandler(ClientEventHandler):
    """Client-level event handler that logs every event it receives.

    See the comments on the ClientEventHandler methods for when each
    callback is invoked.
    """

    # --- Connection events ---

    async def on_connecting(self, ctx: ConnectingContext) -> None:
        """Log the connecting context at INFO level."""
        logging.info("connecting: %s", ctx)

    async def on_connected(self, ctx: ConnectedContext) -> None:
        """Log the connected context at INFO level."""
        logging.info("connected: %s", ctx)

    async def on_disconnected(self, ctx: DisconnectedContext) -> None:
        """Log the disconnected context at INFO level."""
        logging.info("disconnected: %s", ctx)

    async def on_error(self, ctx: ErrorContext) -> None:
        """Log the client error context at ERROR level."""
        logging.error("client error: %s", ctx)

    # --- Server-side subscription events ---

    async def on_subscribed(self, ctx: ServerSubscribedContext) -> None:
        """Log the server-side subscribed context at INFO level."""
        logging.info("subscribed server-side sub: %s", ctx)

    async def on_subscribing(self, ctx: ServerSubscribingContext) -> None:
        """Log the server-side subscribing context at INFO level."""
        logging.info("subscribing server-side sub: %s", ctx)

    async def on_unsubscribed(self, ctx: ServerUnsubscribedContext) -> None:
        """Log the server-side unsubscribed context at INFO level."""
        logging.info("unsubscribed from server-side sub: %s", ctx)

    async def on_publication(self, ctx: ServerPublicationContext) -> None:
        """Log the data of a server-side publication at INFO level."""
        logging.info("publication from server-side sub: %s", ctx.pub.data)

    async def on_join(self, ctx: ServerJoinContext) -> None:
        """Log the server-side join context at INFO level."""
        logging.info("join in server-side sub: %s", ctx)

    async def on_leave(self, ctx: ServerLeaveContext) -> None:
        """Log the server-side leave context at INFO level."""
        logging.info("leave in server-side sub: %s", ctx)
class SubscriptionEventLoggerHandler(SubscriptionEventHandler):
    """Subscription-level event handler that logs every event it receives.

    See the comments on the SubscriptionEventHandler methods for when each
    callback is invoked.
    """

    # --- Subscription state events ---

    async def on_subscribing(self, ctx: SubscribingContext) -> None:
        """Log the subscribing context at INFO level."""
        logging.info("subscribing: %s", ctx)

    async def on_subscribed(self, ctx: SubscribedContext) -> None:
        """Log the subscribed context at INFO level."""
        logging.info("subscribed: %s", ctx)

    async def on_unsubscribed(self, ctx: UnsubscribedContext) -> None:
        """Log the unsubscribed context at INFO level."""
        logging.info("unsubscribed: %s", ctx)

    # --- Channel events ---

    async def on_publication(self, ctx: PublicationContext) -> None:
        """Log the data of a received publication at INFO level."""
        logging.info("publication: %s", ctx.pub.data)

    async def on_join(self, ctx: JoinContext) -> None:
        """Log the join context at INFO level."""
        logging.info("join: %s", ctx)

    async def on_leave(self, ctx: LeaveContext) -> None:
        """Log the leave context at INFO level."""
        logging.info("leave: %s", ctx)

    async def on_error(self, ctx: SubscriptionErrorContext) -> None:
        """Log the subscription error context at ERROR level."""
        logging.error("subscription error: %s", ctx)
def run_example():
    """Connect to Arkis, subscribe to the wallet's maintenance channel and run until signalled.

    Builds the client and the subscription with the logging event handlers,
    schedules a coroutine that connects, subscribes and logs the most recent
    history item, then runs the event loop until SIGTERM or SIGINT triggers
    a shutdown.
    """
    # Create and install the event loop explicitly before anything else.
    # Relying on asyncio.get_event_loop() / asyncio.ensure_future() to create
    # one implicitly outside of a running loop is deprecated and fails on
    # newer Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    client = Client(
        "wss://api.arkis.xyz/e/v1/ws",
        events=ClientEventLoggerHandler(),
        token=ARKIS_API_TOKEN,
    )
    sub = client.new_subscription(
        f"maintenance:wallet:{OWNER_WALLET}",
        events=SubscriptionEventLoggerHandler(),
    )

    async def run():
        """Connect, subscribe, and log the most recent history item."""
        await client.connect()
        await sub.subscribe()
        try:
            # limit=1 with reverse=True requests only the most recent item.
            result = await sub.history(limit=1, reverse=True)
            logging.info(result)
        except CentrifugeError as e:
            logging.error("error history: %s", e)
        logging.info(
            "all done, client connection is still alive, press Ctrl+C to exit")

    async def shutdown(received_signal):
        """Disconnect the client, cancel every other task and stop the loop."""
        logging.info("received exit signal %s...", received_signal.name)
        await client.disconnect()

        # Cancel everything except this shutdown task itself.
        tasks = [
            t for t in asyncio.all_tasks() if t is not asyncio.current_task()
        ]
        for task in tasks:
            task.cancel()

        logging.info("Cancelling outstanding tasks")
        await asyncio.gather(*tasks, return_exceptions=True)
        loop.stop()

    # Schedule the main coroutine on the loop created above.
    loop.create_task(run())

    signals = (signal.SIGTERM, signal.SIGINT)
    for s in signals:
        # Bind the signal as a default argument so each handler keeps its own value.
        loop.add_signal_handler(
            s, lambda received_signal=s: asyncio.create_task(
                shutdown(received_signal))
        )

    try:
        loop.run_forever()
    finally:
        loop.close()
        logging.info("successfully completed service shutdown")


if __name__ == "__main__":
    run_example()