pyaver.event_queue

 1from .utils import load_multiple_bytes_data
 2from solana.publickey import PublicKey
 3from solana.rpc.async_api import AsyncClient
 4from typing import List, Tuple, Union, Container
 5from .enums import Fill, Out, Side
 6from solana.rpc.async_api import AsyncClient
 7from .layouts import EVENT_QUEUE_HEADER_LAYOUT, EVENT_QUEUE_HEADER_LEN, REGISTER_SIZE, EVENT_LAYOUT
 8
 9
10async def load_all_event_queues(conn: AsyncClient, event_queues: list[PublicKey]):
11    """
12    Loads onchain data for multiple Event Queues
13
14    Args:
15        conn (AsyncClient): Solana AsyncClient object
16        event_queues (list[PublicKey]): List of EventQueue account pubkeys
17
18    Returns:
19        list[Tuple[Container, List[Fill | Out]]]: List of EventQueues
20    """
21    data = await load_multiple_bytes_data(conn, event_queues)
22    return [read_event_queue_from_bytes(d) for d in data]
23
24def read_event_queue_from_bytes(buffer: bytes) -> Tuple[Container, List[Union[Fill, Out]]]:
25    """
26    Parses raw event queue data into Event objects
27
28    Args:
29        buffer (bytes): Raw bytes coming from onchain
30
31    Returns:
32        Tuple[Container, List[Union[Fill, Out]]]: _description_
33    """
34    header = EVENT_QUEUE_HEADER_LAYOUT.parse(buffer)
35    buffer_len = len(buffer)
36    nodes: List[Union[Fill, Out]] = []
37    for i in range(header.count):
38        header_offset = EVENT_QUEUE_HEADER_LEN + REGISTER_SIZE
39        offset = header_offset + ((i * header.event_size) + header.head) % (buffer_len - header_offset)
40        event = EVENT_LAYOUT.parse(buffer[offset : offset + header.event_size])
41
42        if event.tag == 0: # FILL
43            node = Fill(
44                taker_side = Side(event.node.taker_side),
45                maker_order_id = int.from_bytes(event.node.maker_order_id, "little"),
46                quote_size = event.node.quote_size,
47                base_size = event.node.base_size,
48                maker_user_market = PublicKey(event.node.maker_callback_info.user_market),
49                taker_user_market = PublicKey(event.node.taker_callback_info.user_market),
50                maker_fee_tier = event.node.maker_callback_info.fee_tier,
51                taker_fee_tier = event.node.taker_callback_info.fee_tier,
52            )
53        else:  # OUT
54            node = Out(
55                side = Side(event.node.side),
56                order_id = int.from_bytes(event.node.order_id, "little"),
57                base_size = event.node.base_size,
58                delete = bool(event.node.delete),
59                user_market =PublicKey(event.node.callback_info.user_market),
60                fee_tier = event.node.callback_info.fee_tier,
61            )
62        nodes.append(node)
63    return {"header": header, "nodes": nodes}
64
65def prepare_user_accounts_list(user_account: List[PublicKey]) -> List[PublicKey]:
66    """
67    Sorts list of user accounts by public key (alphabetically)
68
69    Args:
70        user_account (List[PublicKey]): List of User Account account pubkeys
71
72    Returns:
73        List[PublicKey]: Sorted list of User Account account pubkeys
74    """
75    str_list = [str(pk) for pk in user_account]
76    deduped_list = list(set(str_list))
77    # TODO: Not clear if this sort is doing the same thing as dex_v4 - they use .sort_unstable()
78    sorted_list = sorted(deduped_list)
79    pubkey_list = [PublicKey(stpk) for stpk in sorted_list]
80    return pubkey_list
async def load_all_event_queues( conn: solana.rpc.async_api.AsyncClient, event_queues: list[solana.publickey.PublicKey])
11async def load_all_event_queues(conn: AsyncClient, event_queues: list[PublicKey]):
12    """
13    Loads onchain data for multiple Event Queues
14
15    Args:
16        conn (AsyncClient): Solana AsyncClient object
17        event_queues (list[PublicKey]): List of EventQueue account pubkeys
18
19    Returns:
20        list[Tuple[Container, List[Fill | Out]]]: List of EventQueues
21    """
22    data = await load_multiple_bytes_data(conn, event_queues)
23    return [read_event_queue_from_bytes(d) for d in data]

Loads onchain data for multiple Event Queues

Args
  • conn (AsyncClient): Solana AsyncClient object
  • event_queues (list[PublicKey]): List of EventQueue account pubkeys
Returns

list[Tuple[Container, List[Fill | Out]]]: List of EventQueues

def read_event_queue_from_bytes( buffer: bytes) -> Tuple[Container, List[Union[pyaver.enums.Fill, pyaver.enums.Out]]]:
25def read_event_queue_from_bytes(buffer: bytes) -> Tuple[Container, List[Union[Fill, Out]]]:
26    """
27    Parses raw event queue data into Event objects
28
29    Args:
30        buffer (bytes): Raw bytes coming from onchain
31
32    Returns:
33        Tuple[Container, List[Union[Fill, Out]]]: _description_
34    """
35    header = EVENT_QUEUE_HEADER_LAYOUT.parse(buffer)
36    buffer_len = len(buffer)
37    nodes: List[Union[Fill, Out]] = []
38    for i in range(header.count):
39        header_offset = EVENT_QUEUE_HEADER_LEN + REGISTER_SIZE
40        offset = header_offset + ((i * header.event_size) + header.head) % (buffer_len - header_offset)
41        event = EVENT_LAYOUT.parse(buffer[offset : offset + header.event_size])
42
43        if event.tag == 0: # FILL
44            node = Fill(
45                taker_side = Side(event.node.taker_side),
46                maker_order_id = int.from_bytes(event.node.maker_order_id, "little"),
47                quote_size = event.node.quote_size,
48                base_size = event.node.base_size,
49                maker_user_market = PublicKey(event.node.maker_callback_info.user_market),
50                taker_user_market = PublicKey(event.node.taker_callback_info.user_market),
51                maker_fee_tier = event.node.maker_callback_info.fee_tier,
52                taker_fee_tier = event.node.taker_callback_info.fee_tier,
53            )
54        else:  # OUT
55            node = Out(
56                side = Side(event.node.side),
57                order_id = int.from_bytes(event.node.order_id, "little"),
58                base_size = event.node.base_size,
59                delete = bool(event.node.delete),
60                user_market =PublicKey(event.node.callback_info.user_market),
61                fee_tier = event.node.callback_info.fee_tier,
62            )
63        nodes.append(node)
64    return {"header": header, "nodes": nodes}

Parses raw event queue data into Event objects

Args
  • buffer (bytes): Raw bytes coming from onchain
Returns

Tuple[Container, List[Union[Fill, Out]]]: _description_

def prepare_user_accounts_list( user_account: List[solana.publickey.PublicKey]) -> List[solana.publickey.PublicKey]:
66def prepare_user_accounts_list(user_account: List[PublicKey]) -> List[PublicKey]:
67    """
68    Sorts list of user accounts by public key (alphabetically)
69
70    Args:
71        user_account (List[PublicKey]): List of User Account account pubkeys
72
73    Returns:
74        List[PublicKey]: Sorted list of User Account account pubkeys
75    """
76    str_list = [str(pk) for pk in user_account]
77    deduped_list = list(set(str_list))
78    # TODO: Not clear if this sort is doing the same thing as dex_v4 - they use .sort_unstable()
79    sorted_list = sorted(deduped_list)
80    pubkey_list = [PublicKey(stpk) for stpk in sorted_list]
81    return pubkey_list

Sorts list of user accounts by public key (alphabetically)

Args
  • user_account (List[PublicKey]): List of User Account account pubkeys
Returns

List[PublicKey]: Sorted list of User Account account pubkeys