WebSocket API
Arkis provides a WebSocket API that delivers real-time maintenance updates on borrowings, including position details and the calculated risk factor. It uses the WebSocket protocol provided by the Centrifuge library. To receive real-time maintenance updates, use an official Centrifuge client. You will also need to generate an API key in the Arkis App settings and present it as a credential when connecting with the Centrifuge client. The sections below walk through each step and include development documentation.
Create a new API key
Go to "API Key Management" page.
Click on "Create API Key" button in the top right corner.
Enter a token name, select an expiration date from the predefined options or specify a custom date, and choose the desired permissions.
Click "Create API Key" to proceed.
The token is displayed only once. Make sure to copy it immediately and store it securely. Do not save the token in plain text files. If the token is lost or compromised, it must be manually revoked: Revoke an API key.
Integrate the API with the Centrifuge Client
Now that your token has been generated, you can use one of the official Centrifuge clients to subscribe to Arkis real-time maintenance updates.
Prerequisites
Connection info
URL wss://api.arkis.xyz/e/v1/ws
Channel maintenance:wallet:{owner_wallet}
Address of wallet must be checksummed: more info.
Tested client versions
If you're using NodeJS version earlier than v22.4.0 it does not include a built-in WebSocket implementation, so it must be provided manually: more info.
Examples
Below are examples in several popular programming languages. Replace <API TOKEN> with your generated token. Never hardcode or commit the token to your codebase. Replace <OWNER WALLET> with the checksummed address of wallet that was used to open the Margin Account.
import { Centrifuge } from 'centrifuge';
let centrifuge = new Centrifuge('wss://api.arkis.xyz/e/v1/ws', {
timeout: 5000, // Timeout for operations in milliseconds
debug: true, // Debug mode for observability while development is ongoing
token: '<API TOKEN>',
})
// Add event handlers for connection errors and disconnect
centrifuge.on('error', ctx => console.error("Connection error:", ctx))
centrifuge.on('disconnected', ctx => console.log("Disconnected: ", ctx))
// // Paste the address of the wallet used to open the Margin Account
const wallet = '<OWNER WALLET>'
// Create a new subcription for maintenance information
const subscription = centrifuge.newSubscription(`maintenance:wallet:${wallet}`)
// Assign handlers for the most important subscription events
// After the subscription is successfully established, you can access the publication history
subscription.on('subscribed', _ => {
// The max history limit is 100. For demonstrational purposes,
// the limit is set to 1 and "reverse" parameter is true
// so the most recent history item will be returned
// Read more: https://centrifugal.dev/docs/server/history_and_recovery#history-iteration-api
subscription.history({ limit: 1, reverse: true }).then(
// Print the most recent item from the history
ctx => console.log('History item:', ctx.publications[0]),
err => console.error('History error: ', err)
)
})
// The 'publication' event is triggered whenever Arkis Risk Management publishes new maintenance information
subscription.on('publication', ctx => console.dir(ctx.data, { depth: null }))
// Other useful handlers
subscription.on('error', ctx => console.error(ctx))
subscription.on('unsubscribed', ctx => console.log(ctx))
subscription.on('state', ctx => console.log(ctx))
subscription.subscribe()
centrifuge.connect()package main
import (
"context"
"fmt"
"log"
"time"
"github.com/centrifugal/centrifuge-go"
)
const (
wsURL = "wss://api.arkis.xyz/e/v1/ws"
ownerWallet = "<OWNER WALLET>"
apiToken = "<API TOKEN>"
)
func main() {
c := centrifuge.NewJsonClient(wsURL, centrifuge.Config{
Token: apiToken,
})
defer c.Close()
c.OnConnected(func(_ centrifuge.ConnectedEvent) {
log.Println("Connected")
})
c.OnConnecting(func(_ centrifuge.ConnectingEvent) {
log.Println("Connecting")
})
c.OnDisconnected(func(e centrifuge.DisconnectedEvent) {
log.Println("Disconnected", e.Reason)
})
c.OnError(func(e centrifuge.ErrorEvent) {
log.Println("Error", e.Error.Error())
})
err := c.Connect()
if err != nil {
log.Fatalln(err)
}
sub, err := c.NewSubscription(
"maintenance:wallet:"+ownerWallet,
centrifuge.SubscriptionConfig{},
)
if err != nil {
log.Fatalln(err)
}
ctx := context.Background()
sub.OnSubscribed(func(e centrifuge.SubscribedEvent) {
log.Println(
fmt.Sprintf("Successfully subscribed to private channel %s",
sub.Channel))
// After the subscription is successfully established, you can access publication history
go printMostRecentHistoryPoint(ctx, sub)
})
sub.OnError(func(e centrifuge.SubscriptionErrorEvent) {
log.Println(
fmt.Sprintf("Error subscribing to private channel %s: %v",
sub.Channel, e.Error))
})
sub.OnUnsubscribed(func(e centrifuge.UnsubscribedEvent) {
log.Println(
fmt.Sprintf("Unsubscribed from private channel %s: %s",
sub.Channel, e.Reason))
})
sub.OnPublication(func(e centrifuge.PublicationEvent) {
log.Println(
fmt.Sprintf("New message received from channel %s: %s",
sub.Channel, string(e.Data)))
})
// Subscribe on private channel
_ = sub.Subscribe()
// Run until CTRL+C
select {}
}
func printMostRecentHistoryPoint(ctx context.Context, sub *centrifuge.Subscription) {
hCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
history, err := sub.History(hCtx,
// The max history limit is 100. For demonstrational purposes,
// the limit is set to 1 and "reverse" parameter is true
// so the most recent history item will be returned.
// Read more: https://centrifugal.dev/docs/server/history_and_recovery#history-iteration-api
centrifuge.WithHistoryLimit(1),
centrifuge.WithHistoryReverse(true),
)
if err != nil {
log.Fatalln(err)
}
if len(history.Publications) > 0 {
log.Println(
fmt.Sprintf("The most recent history point from private channel %s: %s",
sub.Channel,
string(history.Publications[len(history.Publications)-1].Data)))
}
}
import logging
import asyncio
import signal
from centrifuge import (
Client, ClientEventHandler, ConnectingContext, ConnectedContext,
DisconnectedContext, ErrorContext, ServerSubscribedContext,
ServerSubscribingContext, ServerUnsubscribedContext,
ServerPublicationContext, ServerJoinContext, ServerLeaveContext,
SubscriptionEventHandler, SubscribingContext, SubscribedContext,
UnsubscribedContext, PublicationContext, JoinContext, LeaveContext,
SubscriptionErrorContext, CentrifugeError
)
ARKIS_API_TOKEN = "<API TOKEN>"
OWNER_WALLET = "<OWNER WALLET>"
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Configure centrifuge-python logger
cf_logger = logging.getLogger("centrifuge")
cf_logger.setLevel(logging.INFO) # Change to logging.DEBUG for debug output
class ClientEventLoggerHandler(ClientEventHandler):
"""Check out comments of ClientEventHandler methods to see when they are called."""
async def on_connecting(self, ctx: ConnectingContext) -> None:
logging.info("connecting: %s", ctx)
async def on_connected(self, ctx: ConnectedContext) -> None:
logging.info("connected: %s", ctx)
async def on_disconnected(self, ctx: DisconnectedContext) -> None:
logging.info("disconnected: %s", ctx)
async def on_error(self, ctx: ErrorContext) -> None:
logging.error("client error: %s", ctx)
async def on_subscribed(self, ctx: ServerSubscribedContext) -> None:
logging.info("subscribed server-side sub: %s", ctx)
async def on_subscribing(self, ctx: ServerSubscribingContext) -> None:
logging.info("subscribing server-side sub: %s", ctx)
async def on_unsubscribed(self, ctx: ServerUnsubscribedContext) -> None:
logging.info("unsubscribed from server-side sub: %s", ctx)
async def on_publication(self, ctx: ServerPublicationContext) -> None:
logging.info("publication from server-side sub: %s", ctx.pub.data)
async def on_join(self, ctx: ServerJoinContext) -> None:
logging.info("join in server-side sub: %s", ctx)
async def on_leave(self, ctx: ServerLeaveContext) -> None:
logging.info("leave in server-side sub: %s", ctx)
class SubscriptionEventLoggerHandler(SubscriptionEventHandler):
"""Check out comments of SubscriptionEventHandler methods to see when they are called."""
async def on_subscribing(self, ctx: SubscribingContext) -> None:
logging.info("subscribing: %s", ctx)
async def on_subscribed(self, ctx: SubscribedContext) -> None:
logging.info("subscribed: %s", ctx)
async def on_unsubscribed(self, ctx: UnsubscribedContext) -> None:
logging.info("unsubscribed: %s", ctx)
async def on_publication(self, ctx: PublicationContext) -> None:
logging.info("publication: %s", ctx.pub.data)
async def on_join(self, ctx: JoinContext) -> None:
logging.info("join: %s", ctx)
async def on_leave(self, ctx: LeaveContext) -> None:
logging.info("leave: %s", ctx)
async def on_error(self, ctx: SubscriptionErrorContext) -> None:
logging.error("subscription error: %s", ctx)
def run_example():
client = Client(
"wss://api.arkis.xyz/e/v1/ws",
events=ClientEventLoggerHandler(),
token=ARKIS_API_TOKEN
)
sub = client.new_subscription(
f"maintenance:wallet:{OWNER_WALLET}",
events=SubscriptionEventLoggerHandler(),
)
async def run():
await client.connect()
await sub.subscribe()
try:
result = await sub.history(limit=1, reverse=True)
logging.info(result)
except CentrifugeError as e:
logging.error("error history: %s", e)
logging.info(
"all done, client connection is still alive, press Ctrl+C to exit")
asyncio.ensure_future(run())
loop = asyncio.get_event_loop()
async def shutdown(received_signal):
logging.info("received exit signal %s...", received_signal.name)
await client.disconnect()
tasks = [t for t in asyncio.all_tasks(
) if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
logging.info("Cancelling outstanding tasks")
await asyncio.gather(*tasks, return_exceptions=True)
loop.stop()
signals = (signal.SIGTERM, signal.SIGINT)
for s in signals:
loop.add_signal_handler(
s, lambda received_signal=s: asyncio.create_task(
shutdown(received_signal))
)
try:
loop.run_forever()
finally:
loop.close()
logging.info("successfully completed service shutdown")
if __name__ == "__main__":
run_example()
Revoke an API key
Only an Organization Admin can revoke an API key
Go to "API Key Management" page.
Locate the key by its name or tail (the last six characters of the key).
Click "Revoke".
Last updated






