pyaver.event_queue
"""Helpers for loading and parsing on-chain event queue accounts."""
from typing import List, Tuple, Union, Container

from solana.publickey import PublicKey
from solana.rpc.async_api import AsyncClient

from .enums import Fill, Out, Side
from .layouts import EVENT_QUEUE_HEADER_LAYOUT, EVENT_QUEUE_HEADER_LEN, REGISTER_SIZE, EVENT_LAYOUT
from .utils import load_multiple_bytes_data


async def load_all_event_queues(conn: AsyncClient, event_queues: list[PublicKey]):
    """
    Loads onchain data for multiple Event Queues

    Args:
        conn (AsyncClient): Solana AsyncClient object
        event_queues (list[PublicKey]): List of EventQueue account pubkeys

    Returns:
        list[dict]: One parsed EventQueue per loaded account, each a dict with
        a "header" entry and a "nodes" entry (see read_event_queue_from_bytes)
    """
    data = await load_multiple_bytes_data(conn, event_queues)
    return [read_event_queue_from_bytes(d) for d in data]


def read_event_queue_from_bytes(
    buffer: bytes,
) -> dict[str, Union[Container, List[Union[Fill, Out]]]]:
    """
    Parses raw event queue data into Event objects

    Args:
        buffer (bytes): Raw bytes coming from onchain

    Returns:
        dict: Dictionary with two entries:
            "header": the parsed queue header
            "nodes": list of Fill / Out events, read in order starting from
            the header's head offset
    """
    header = EVENT_QUEUE_HEADER_LAYOUT.parse(buffer)

    # Both values are constant for the whole buffer, so compute them once.
    # Events live after the header plus REGISTER_SIZE bytes.
    events_start = EVENT_QUEUE_HEADER_LEN + REGISTER_SIZE
    ring_size = len(buffer) - events_start

    nodes: List[Union[Fill, Out]] = []
    for i in range(header.count):
        # The event region is read as a ring: offsets past its end wrap around.
        offset = events_start + (i * header.event_size + header.head) % ring_size
        event = EVENT_LAYOUT.parse(buffer[offset : offset + header.event_size])

        if event.tag == 0:  # FILL
            node = Fill(
                taker_side=Side(event.node.taker_side),
                maker_order_id=int.from_bytes(event.node.maker_order_id, "little"),
                quote_size=event.node.quote_size,
                base_size=event.node.base_size,
                maker_user_market=PublicKey(event.node.maker_callback_info.user_market),
                taker_user_market=PublicKey(event.node.taker_callback_info.user_market),
                maker_fee_tier=event.node.maker_callback_info.fee_tier,
                taker_fee_tier=event.node.taker_callback_info.fee_tier,
            )
        else:  # OUT (every non-zero tag is handled as an Out event)
            node = Out(
                side=Side(event.node.side),
                order_id=int.from_bytes(event.node.order_id, "little"),
                base_size=event.node.base_size,
                delete=bool(event.node.delete),
                user_market=PublicKey(event.node.callback_info.user_market),
                fee_tier=event.node.callback_info.fee_tier,
            )
        nodes.append(node)
    return {"header": header, "nodes": nodes}


def prepare_user_accounts_list(user_account: List[PublicKey]) -> List[PublicKey]:
    """
    Deduplicates a list of user accounts and sorts it by public key
    (alphabetically, using the string form of each key)

    Args:
        user_account (List[PublicKey]): List of User Account account pubkeys

    Returns:
        List[PublicKey]: Sorted list of unique User Account account pubkeys
    """
    str_list = [str(pk) for pk in user_account]
    deduped_list = list(set(str_list))
    # TODO: Not clear if this sort is doing the same thing as dex_v4 - they use .sort_unstable()
    sorted_list = sorted(deduped_list)
    pubkey_list = [PublicKey(stpk) for stpk in sorted_list]
    return pubkey_list
async def
load_all_event_queues( conn: solana.rpc.async_api.AsyncClient, event_queues: list[solana.publickey.PublicKey])
async def load_all_event_queues(conn: AsyncClient, event_queues: list[PublicKey]):
    """
    Fetches several Event Queue accounts and parses each one

    Args:
        conn (AsyncClient): Solana AsyncClient object
        event_queues (list[PublicKey]): List of EventQueue account pubkeys

    Returns:
        list[dict]: One parsed EventQueue per loaded account, each being the
        value returned by read_event_queue_from_bytes for that account's bytes
    """
    raw_accounts = await load_multiple_bytes_data(conn, event_queues)
    # Parse every raw account buffer, keeping the order the loader returned.
    return list(map(read_event_queue_from_bytes, raw_accounts))
Loads onchain data for multiple Event Queues
Args
- conn (AsyncClient): Solana AsyncClient object
- event_queues (list[PublicKey]): List of EventQueue account pubkeys
Returns
list[dict]: List of parsed EventQueues, each a dict with a "header" entry (the parsed queue header) and a "nodes" entry (List[Fill | Out])
def
read_event_queue_from_bytes( buffer: bytes) -> Tuple[Container, List[Union[pyaver.enums.Fill, pyaver.enums.Out]]]:
def read_event_queue_from_bytes(
    buffer: bytes,
) -> dict[str, Union[Container, List[Union[Fill, Out]]]]:
    """
    Parses raw event queue data into Event objects

    Args:
        buffer (bytes): Raw bytes coming from onchain

    Returns:
        dict: Dictionary with two entries:
            "header": the parsed queue header
            "nodes": list of Fill / Out events, read in order starting from
            the header's head offset
    """
    header = EVENT_QUEUE_HEADER_LAYOUT.parse(buffer)

    # Both values are constant for the whole buffer, so compute them once.
    # Events live after the header plus REGISTER_SIZE bytes.
    events_start = EVENT_QUEUE_HEADER_LEN + REGISTER_SIZE
    ring_size = len(buffer) - events_start

    nodes: List[Union[Fill, Out]] = []
    for i in range(header.count):
        # The event region is read as a ring: offsets past its end wrap around.
        offset = events_start + (i * header.event_size + header.head) % ring_size
        event = EVENT_LAYOUT.parse(buffer[offset : offset + header.event_size])

        if event.tag == 0:  # FILL
            node = Fill(
                taker_side=Side(event.node.taker_side),
                maker_order_id=int.from_bytes(event.node.maker_order_id, "little"),
                quote_size=event.node.quote_size,
                base_size=event.node.base_size,
                maker_user_market=PublicKey(event.node.maker_callback_info.user_market),
                taker_user_market=PublicKey(event.node.taker_callback_info.user_market),
                maker_fee_tier=event.node.maker_callback_info.fee_tier,
                taker_fee_tier=event.node.taker_callback_info.fee_tier,
            )
        else:  # OUT (every non-zero tag is handled as an Out event)
            node = Out(
                side=Side(event.node.side),
                order_id=int.from_bytes(event.node.order_id, "little"),
                base_size=event.node.base_size,
                delete=bool(event.node.delete),
                user_market=PublicKey(event.node.callback_info.user_market),
                fee_tier=event.node.callback_info.fee_tier,
            )
        nodes.append(node)
    return {"header": header, "nodes": nodes}
Parses raw event queue data into Event objects
Args
- buffer (bytes): Raw bytes coming from onchain
Returns
dict: Dictionary with a "header" entry (the parsed queue header) and a "nodes" entry (List[Union[Fill, Out]], the parsed events in queue order)
def
prepare_user_accounts_list( user_account: List[solana.publickey.PublicKey]) -> List[solana.publickey.PublicKey]:
def prepare_user_accounts_list(user_account: List[PublicKey]) -> List[PublicKey]:
    """
    Removes duplicate user accounts and orders the rest alphabetically by
    the string form of their public key

    Args:
        user_account (List[PublicKey]): List of User Account account pubkeys

    Returns:
        List[PublicKey]: Sorted list of unique User Account account pubkeys
    """
    # Going through a set of strings drops repeated pubkeys.
    unique_keys = {str(account) for account in user_account}
    # TODO: Not clear if this sort is doing the same thing as dex_v4 - they use .sort_unstable()
    return [PublicKey(key) for key in sorted(unique_keys)]
Deduplicates the list of user accounts and sorts it by public key (alphabetically, using each key's string form)
Args
- user_account (List[PublicKey]): List of User Account account pubkeys
Returns
List[PublicKey]: Sorted list of User Account account pubkeys