Skip to content

feat(examples): add listen logsubscribe for cpswap migrations #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions learning_examples_py/migrations_cpswap/listen_logsubscribe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""
Listens for Raydium Launchpad program logs. Get the transaction details using the signature.
Parse the token mint from the MigrateToCpswap instruction.

Note: Since we are listening to the Launchpad logs, it will consume a lot of credits/compute units.
This scripts finds the all tokens that are migrated to the CPSwap program, not limited to only bonkfun.

"""


import json
import asyncio
import os

from dotenv import load_dotenv
from solana.rpc.async_api import AsyncClient
from solders.pubkey import Pubkey
from solana.rpc.async_api import AsyncClient
from solders.signature import Signature
import websockets

load_dotenv()

RPC_ENDPOINT = os.environ.get("SOLANA_NODE_RPC_ENDPOINT")
WSS_ENDPOINT = os.environ.get("SOLANA_NODE_WSS_ENDPOINT")
RAYDIUM_LAUNCHPAD_PROGRAM_ID = Pubkey.from_string("LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj")


def is_transaction_successful(logs):
for log in logs:
if "AnchorError thrown" in log or "Error" in log:
print(f"[ERROR] Transaction failed: {log}")
return False
return True


async def process_transaction(signature: str):
client = AsyncClient(RPC_ENDPOINT)
signature = Signature.from_string(signature)

try:
resp = await client.get_transaction(
signature,
encoding="jsonParsed",
commitment="confirmed",
max_supported_transaction_version=0,
)
except Exception as e:
print(f"[ERROR] Failed to get transaction data time: {e}")
return

Comment on lines +36 to +50
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix resource leak - AsyncClient not properly closed.

The AsyncClient instance is created but never explicitly closed, which can lead to resource leaks and connection issues.

 async def process_transaction(signature: str):
-    client = AsyncClient(RPC_ENDPOINT)
+    async with AsyncClient(RPC_ENDPOINT) as client:
+        signature = Signature.from_string(signature)
+
+        try:
+            resp = await client.get_transaction(
+                signature,
+                encoding="jsonParsed",
+                commitment="confirmed",
+                max_supported_transaction_version=0,
+            )
+        except Exception as e:
+            print(f"[ERROR] Failed to get transaction data time: {e}")
+            return

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines 37
to 51, the AsyncClient instance is created but not closed, causing a resource
leak. Modify the code to ensure the AsyncClient is properly closed after use by
either using an async context manager (async with) when creating the client or
explicitly calling await client.close() in a finally block after the try-except
to guarantee cleanup regardless of success or failure.

# retrying if the node is not fully synced
if not resp.value:
await asyncio.sleep(5)
resp = await client.get_transaction(
signature,
encoding="jsonParsed",
commitment="confirmed",
max_supported_transaction_version=0,
)

if not resp.value:
print(f"[ERROR] Transaction not found: {signature}")
return

Comment on lines +51 to +64
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve retry logic and add transaction success validation.

The retry logic should also use the async context manager, and you should validate transaction success before processing.

-    # retrying if the node is not fully synced
-    if not resp.value:
-        await asyncio.sleep(5)
-        resp = await client.get_transaction(
-            signature,
-            encoding="jsonParsed",
-            commitment="confirmed",
-            max_supported_transaction_version=0,
-        )
-        
-        if not resp.value:
-            print(f"[ERROR] Transaction not found: {signature}")
-            return
+        # retrying if the node is not fully synced
+        if not resp.value:
+            await asyncio.sleep(5)
+            resp = await client.get_transaction(
+                signature,
+                encoding="jsonParsed",
+                commitment="confirmed",
+                max_supported_transaction_version=0,
+            )
+            
+            if not resp.value:
+                print(f"[ERROR] Transaction not found: {signature}")
+                return
+        
+        # Check if transaction was successful before processing
+        if not is_transaction_successful(resp.value.transaction.meta.log_messages or []):
+            return

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines 52
to 65, improve the retry logic by using an async context manager for the client
calls to ensure proper resource handling. Additionally, after retrying, validate
that the transaction was successful by checking the transaction status before
proceeding. Modify the code to use 'async with' for the client and add a check
for transaction success to avoid processing failed transactions.

instructions = resp.value.transaction.transaction.message.instructions

for instruction in instructions:
if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1":
if len(instruction.accounts) == 38:
token_mint = instruction.accounts[1]
print(f"[INFO] Token migrated to cpswap: {token_mint}")
# TODO : use the idl parser and get the more details for the pool and token
break


Comment on lines +65 to +75
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Address magic values and simplify nested conditions.

The hardcoded instruction data and account count make the code brittle. Consider using constants or configuration.

+    # Constants for migration instruction identification
+    MIGRATE_TO_CPSWAP_INSTRUCTION_DATA = "PotQtwz6wf1"
+    EXPECTED_ACCOUNT_COUNT = 38
+    
     instructions = resp.value.transaction.transaction.message.instructions
     
     for instruction in instructions:
-        if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1":
-            if len(instruction.accounts) == 38:
-                token_mint = instruction.accounts[1]
-                print(f"[INFO] Token migrated to cpswap: {token_mint}")
-                # TODO : use the idl parser and get the more details for the pool and token
-                break
+        if (instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and 
+            instruction.data == MIGRATE_TO_CPSWAP_INSTRUCTION_DATA and 
+            len(instruction.accounts) == EXPECTED_ACCOUNT_COUNT):
+            token_mint = instruction.accounts[1]
+            print(f"[INFO] Token migrated to cpswap: {token_mint}")
+            # TODO : use the idl parser and get the more details for the pool and token
+            break
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
instructions = resp.value.transaction.transaction.message.instructions
for instruction in instructions:
if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1":
if len(instruction.accounts) == 38:
token_mint = instruction.accounts[1]
print(f"[INFO] Token migrated to cpswap: {token_mint}")
# TODO : use the idl parser and get the more details for the pool and token
break
# Constants for migration instruction identification
MIGRATE_TO_CPSWAP_INSTRUCTION_DATA = "PotQtwz6wf1"
EXPECTED_ACCOUNT_COUNT = 38
instructions = resp.value.transaction.transaction.message.instructions
for instruction in instructions:
if (instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and
instruction.data == MIGRATE_TO_CPSWAP_INSTRUCTION_DATA and
len(instruction.accounts) == EXPECTED_ACCOUNT_COUNT):
token_mint = instruction.accounts[1]
print(f"[INFO] Token migrated to cpswap: {token_mint}")
# TODO: use the idl parser and get more details for the pool and token
break
🧰 Tools
🪛 Ruff (0.12.2)

69-70: Use a single if statement instead of nested if statements

(SIM102)

🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines 66
to 76, replace the hardcoded instruction data string "PotQtwz6wf1" and the
account count 38 with named constants defined at the top of the file or in a
configuration file. This will improve code readability and maintainability.
Also, simplify the nested if conditions by combining them into a single
conditional statement using these constants.

async def listen_for_migrations():
while True:
try:
print("\n[INFO] Connecting to WebSocket ...")
async with websockets.connect(WSS_ENDPOINT) as websocket:
subscription_message = json.dumps(
{
"jsonrpc": "2.0",
"id": 1,
"method": "logsSubscribe",
"params": [
{"mentions": [str(RAYDIUM_LAUNCHPAD_PROGRAM_ID)]},
{"commitment": "confirmed"},
],
}
)
await websocket.send(subscription_message)
print(
f"[INFO] Listening for migration instructions from program: {RAYDIUM_LAUNCHPAD_PROGRAM_ID}"
)

response = await websocket.recv()
print(f"[INFO] Subscription response: {response}")

while True:
try:
response = await asyncio.wait_for(websocket.recv(), timeout=60)
data = json.loads(response)
log_data = data["params"]["result"]["value"]
logs = log_data.get("logs", [])
signature = log_data.get("signature", "unknown")

is_migrated = any(
"Program log: Instruction: MigrateToCpswap" == log
for log in logs
)
if not is_migrated:
continue

asyncio.create_task(process_transaction(signature))

Comment on lines +108 to +116
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Fix Yoda condition and consider task management.

The condition comparison should follow normal convention, and consider tracking created tasks to prevent accumulation.

         is_migrated = any(
-            "Program log: Instruction: MigrateToCpswap" == log
+            log == "Program log: Instruction: MigrateToCpswap"
             for log in logs
         )
         if not is_migrated:
             continue

-        asyncio.create_task(process_transaction(signature))
+        task = asyncio.create_task(process_transaction(signature))
+        # Optional: Add task to a set and clean up completed tasks periodically
+        # to prevent memory accumulation in long-running processes
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
is_migrated = any(
"Program log: Instruction: MigrateToCpswap" == log
for log in logs
)
if not is_migrated:
continue
asyncio.create_task(process_transaction(signature))
is_migrated = any(
log == "Program log: Instruction: MigrateToCpswap"
for log in logs
)
if not is_migrated:
continue
task = asyncio.create_task(process_transaction(signature))
# Optional: Add task to a set and clean up completed tasks periodically
# to prevent memory accumulation in long-running processes
🧰 Tools
🪛 Ruff (0.12.2)

110-110: Yoda condition detected

Rewrite as log == "Program log: Instruction: MigrateToCpswap"

(SIM300)

🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines
109 to 117, the condition uses a Yoda style comparison which should be reversed
to follow normal convention by placing the variable on the left side.
Additionally, instead of just creating tasks with asyncio.create_task without
tracking, store the created tasks in a list or set to manage them properly and
avoid unbounded accumulation. Implement a mechanism to track and await or cancel
these tasks as needed.

except TimeoutError:
print("[INFO] No new messages received, continuing...")
except Exception as e:
print(f"[ERROR] Error receiving message: {e}")
break

except Exception as e:
print(f"[ERROR] Connection error: {e}")
print("[INFO] Retrying in 5 seconds...")
await asyncio.sleep(5)


if __name__ == "__main__":
asyncio.run(listen_for_migrations())