-
Notifications
You must be signed in to change notification settings - Fork 3
feat(examples): add listen logsubscribe for cpswap migrations #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat(examples): add listen logsubscribe for cpswap migrations #1
Conversation
WalkthroughA new Python script has been added to monitor and process migration events from the Raydium Launchpad program on the Solana blockchain. The script listens for specific log instructions via WebSocket, fetches transaction details for migrations to CPSwap, parses relevant data, and handles errors and retries. Changes
Sequence Diagram(s)sequenceDiagram
participant Script
participant SolanaWebSocket
participant SolanaRPC
Script->>SolanaWebSocket: Subscribe to Raydium Launchpad logs
SolanaWebSocket-->>Script: Emit log event
Script->>Script: Filter for "MigrateToCpswap" instruction
alt If relevant instruction found
Script->>SolanaRPC: Fetch transaction details by signature
SolanaRPC-->>Script: Return transaction details
Script->>Script: Parse and identify token mint
Script->>Script: Print migration info
else If not relevant
Script->>Script: Ignore event
end
Script->>SolanaWebSocket: Continue listening (loop)
Estimated code review effort🎯 2 (Simple) | ⏱️ ~7 minutes Poem
Note ⚡️ Unit Test Generation is now available in beta!Learn more here, or try it out under "Finishing Touches" below. 📜 Recent review detailsConfiguration used: CodeRabbit UI 📒 Files selected for processing (1)
🧰 Additional context used🪛 Ruff (0.12.2)learning_examples_py/migrations_cpswap/listen_logsubscribe.py68-69: Use a single (SIM102) 109-109: Yoda condition detected Rewrite as (SIM300) 🔇 Additional comments (3)
✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (1)
learning_examples_py/migrations_cpswap/listen_logsubscribe.py (1)
29-35
: Consider more specific error detection.The current error detection using generic "Error" string matching might be too broad and could cause false positives. Consider being more specific about the error patterns you're looking for.
def is_transaction_successful(logs): for log in logs: - if "AnchorError thrown" in log or "Error" in log: + if "AnchorError thrown" in log or "Program failed to complete" in log or "custom program error" in log: print(f"[ERROR] Transaction failed: {log}") return False return True
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
learning_examples_py/migrations_cpswap/listen_logsubscribe.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.12.2)
learning_examples_py/migrations_cpswap/listen_logsubscribe.py
18-18: Redefinition of unused AsyncClient
from line 16
Remove definition: AsyncClient
(F811)
69-70: Use a single if
statement instead of nested if
statements
(SIM102)
110-110: Yoda condition detected
Rewrite as log == "Program log: Instruction: MigrateToCpswap"
(SIM300)
🔇 Additional comments (1)
learning_examples_py/migrations_cpswap/listen_logsubscribe.py (1)
130-131
: LGTM!The entry point is correctly implemented for an async application.
async def process_transaction(signature: str): | ||
client = AsyncClient(RPC_ENDPOINT) | ||
signature = Signature.from_string(signature) | ||
|
||
try: | ||
resp = await client.get_transaction( | ||
signature, | ||
encoding="jsonParsed", | ||
commitment="confirmed", | ||
max_supported_transaction_version=0, | ||
) | ||
except Exception as e: | ||
print(f"[ERROR] Failed to get transaction data time: {e}") | ||
return | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix resource leak - AsyncClient not properly closed.
The AsyncClient
instance is created but never explicitly closed, which can lead to resource leaks and connection issues.
async def process_transaction(signature: str):
- client = AsyncClient(RPC_ENDPOINT)
+ async with AsyncClient(RPC_ENDPOINT) as client:
+ signature = Signature.from_string(signature)
+
+ try:
+ resp = await client.get_transaction(
+ signature,
+ encoding="jsonParsed",
+ commitment="confirmed",
+ max_supported_transaction_version=0,
+ )
+ except Exception as e:
+ print(f"[ERROR] Failed to get transaction data time: {e}")
+ return
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines 37
to 51, the AsyncClient instance is created but not closed, causing a resource
leak. Modify the code to ensure the AsyncClient is properly closed after use by
either using an async context manager (async with) when creating the client or
explicitly calling await client.close() in a finally block after the try-except
to guarantee cleanup regardless of success or failure.
# retrying if the node is not fully synced | ||
if not resp.value: | ||
await asyncio.sleep(5) | ||
resp = await client.get_transaction( | ||
signature, | ||
encoding="jsonParsed", | ||
commitment="confirmed", | ||
max_supported_transaction_version=0, | ||
) | ||
|
||
if not resp.value: | ||
print(f"[ERROR] Transaction not found: {signature}") | ||
return | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve retry logic and add transaction success validation.
The retry logic should also use the async context manager, and you should validate transaction success before processing.
- # retrying if the node is not fully synced
- if not resp.value:
- await asyncio.sleep(5)
- resp = await client.get_transaction(
- signature,
- encoding="jsonParsed",
- commitment="confirmed",
- max_supported_transaction_version=0,
- )
-
- if not resp.value:
- print(f"[ERROR] Transaction not found: {signature}")
- return
+ # retrying if the node is not fully synced
+ if not resp.value:
+ await asyncio.sleep(5)
+ resp = await client.get_transaction(
+ signature,
+ encoding="jsonParsed",
+ commitment="confirmed",
+ max_supported_transaction_version=0,
+ )
+
+ if not resp.value:
+ print(f"[ERROR] Transaction not found: {signature}")
+ return
+
+ # Check if transaction was successful before processing
+ if not is_transaction_successful(resp.value.transaction.meta.log_messages or []):
+ return
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines 52
to 65, improve the retry logic by using an async context manager for the client
calls to ensure proper resource handling. Additionally, after retrying, validate
that the transaction was successful by checking the transaction status before
proceeding. Modify the code to use 'async with' for the client and add a check
for transaction success to avoid processing failed transactions.
instructions = resp.value.transaction.transaction.message.instructions | ||
|
||
for instruction in instructions: | ||
if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1": | ||
if len(instruction.accounts) == 38: | ||
token_mint = instruction.accounts[1] | ||
print(f"[INFO] Token migrated to cpswap: {token_mint}") | ||
# TODO : use the idl parser and get the more details for the pool and token | ||
break | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Address magic values and simplify nested conditions.
The hardcoded instruction data and account count make the code brittle. Consider using constants or configuration.
+ # Constants for migration instruction identification
+ MIGRATE_TO_CPSWAP_INSTRUCTION_DATA = "PotQtwz6wf1"
+ EXPECTED_ACCOUNT_COUNT = 38
+
instructions = resp.value.transaction.transaction.message.instructions
for instruction in instructions:
- if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1":
- if len(instruction.accounts) == 38:
- token_mint = instruction.accounts[1]
- print(f"[INFO] Token migrated to cpswap: {token_mint}")
- # TODO : use the idl parser and get the more details for the pool and token
- break
+ if (instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and
+ instruction.data == MIGRATE_TO_CPSWAP_INSTRUCTION_DATA and
+ len(instruction.accounts) == EXPECTED_ACCOUNT_COUNT):
+ token_mint = instruction.accounts[1]
+ print(f"[INFO] Token migrated to cpswap: {token_mint}")
+ # TODO : use the idl parser and get the more details for the pool and token
+ break
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
instructions = resp.value.transaction.transaction.message.instructions | |
for instruction in instructions: | |
if instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and instruction.data == "PotQtwz6wf1": | |
if len(instruction.accounts) == 38: | |
token_mint = instruction.accounts[1] | |
print(f"[INFO] Token migrated to cpswap: {token_mint}") | |
# TODO : use the idl parser and get the more details for the pool and token | |
break | |
# Constants for migration instruction identification | |
MIGRATE_TO_CPSWAP_INSTRUCTION_DATA = "PotQtwz6wf1" | |
EXPECTED_ACCOUNT_COUNT = 38 | |
instructions = resp.value.transaction.transaction.message.instructions | |
for instruction in instructions: | |
if (instruction.program_id == RAYDIUM_LAUNCHPAD_PROGRAM_ID and | |
instruction.data == MIGRATE_TO_CPSWAP_INSTRUCTION_DATA and | |
len(instruction.accounts) == EXPECTED_ACCOUNT_COUNT): | |
token_mint = instruction.accounts[1] | |
print(f"[INFO] Token migrated to cpswap: {token_mint}") | |
# TODO: use the idl parser and get more details for the pool and token | |
break |
🧰 Tools
🪛 Ruff (0.12.2)
69-70: Use a single if
statement instead of nested if
statements
(SIM102)
🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines 66
to 76, replace the hardcoded instruction data string "PotQtwz6wf1" and the
account count 38 with named constants defined at the top of the file or in a
configuration file. This will improve code readability and maintainability.
Also, simplify the nested if conditions by combining them into a single
conditional statement using these constants.
is_migrated = any( | ||
"Program log: Instruction: MigrateToCpswap" == log | ||
for log in logs | ||
) | ||
if not is_migrated: | ||
continue | ||
|
||
asyncio.create_task(process_transaction(signature)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Fix Yoda condition and consider task management.
The condition comparison should follow normal convention, and consider tracking created tasks to prevent accumulation.
is_migrated = any(
- "Program log: Instruction: MigrateToCpswap" == log
+ log == "Program log: Instruction: MigrateToCpswap"
for log in logs
)
if not is_migrated:
continue
- asyncio.create_task(process_transaction(signature))
+ task = asyncio.create_task(process_transaction(signature))
+ # Optional: Add task to a set and clean up completed tasks periodically
+ # to prevent memory accumulation in long-running processes
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
is_migrated = any( | |
"Program log: Instruction: MigrateToCpswap" == log | |
for log in logs | |
) | |
if not is_migrated: | |
continue | |
asyncio.create_task(process_transaction(signature)) | |
is_migrated = any( | |
log == "Program log: Instruction: MigrateToCpswap" | |
for log in logs | |
) | |
if not is_migrated: | |
continue | |
task = asyncio.create_task(process_transaction(signature)) | |
# Optional: Add task to a set and clean up completed tasks periodically | |
# to prevent memory accumulation in long-running processes |
🧰 Tools
🪛 Ruff (0.12.2)
110-110: Yoda condition detected
Rewrite as log == "Program log: Instruction: MigrateToCpswap"
(SIM300)
🤖 Prompt for AI Agents
In learning_examples_py/migrations_cpswap/listen_logsubscribe.py around lines
109 to 117, the condition uses a Yoda style comparison which should be reversed
to follow normal convention by placing the variable on the left side.
Additionally, instead of just creating tasks with asyncio.create_task without
tracking, store the created tasks in a list or set to manage them properly and
avoid unbounded accumulation. Implement a mechanism to track and await or cancel
these tasks as needed.
Added a Logsubscribe listener for the cpswap migrations.
Summary by CodeRabbit