Skip to content

Commit f579aa6

Browse files
authored
Merge pull request #23 from tonlabs/0.27.1-rc
Version 0.27.1
2 parents f814027 + 44eb894 commit f579aa6

File tree

8 files changed

+56
-176
lines changed

8 files changed

+56
-176
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
# Release Notes
22
All notable changes to this project will be documented in this file.
33

4+
## 0.27.1 Apr 28, 2021
5+
### Fixed
6+
- Subscriptions for blocks, transactions and messages do not trigger multiple times any more.
7+
48
## 0.27.0 Apr 20, 2021
59
### New
610
- Support of blockchain config parameters.

ton-node-se/ton_node/src/node_engine/blocks_finality.rs

Lines changed: 20 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,7 @@ debug!("FINBLK {:?}", hashes);
120120
sb.block.block().clone(),
121121
sb.shard_state.clone(),
122122
is_sync,
123-
BlockProcessingStatus::Finalized,
124-
MessageProcessingStatus::Finalized,
125-
TransactionProcessingStatus::Finalized
126-
);
123+
);
127124

128125
if res.is_err() {
129126
warn!(target: "node", "reflect_block_in_db(Finalized) error: {:?}", res.unwrap_err());
@@ -350,9 +347,7 @@ debug!("FINBLK {:?}", hashes);
350347
block: Block,
351348
shard_state: Arc<ShardStateUnsplit>,
352349
is_sync: bool,
353-
block_status: BlockProcessingStatus,
354-
msg_status: MessageProcessingStatus,
355-
tr_status: TransactionProcessingStatus) -> NodeResult<()> {
350+
) -> NodeResult<()> {
356351

357352
if !is_sync {
358353
if let Some(db) = self.db.clone() {
@@ -369,36 +364,42 @@ debug!("FINBLK {:?}", hashes);
369364

370365
extra.read_in_msg_descr()?.iterate_objects(|in_msg| {
371366
let msg = in_msg.read_message()?;
372-
debug!(target: "node", "PUT-IN-MESSAGE-BLOCK {}", msg.hash()?.to_hex_string());
367+
debug!(target: "node", "PUT-IN-MESSAGE-BLOCK {}", msg.hash()?.to_hex_string());
373368
// msg.prepare_proof_for_json(&block_info_cells, &block_root)?;
374369
// msg.prepare_boc_for_json()?;
375370
let transaction_id = in_msg.transaction_cell().map(|cell| cell.repr_hash());
376371
let transaction_now = in_msg.read_transaction()?.map(|transaction| transaction.now());
377-
let res = db.put_message(msg, msg_status, transaction_id, transaction_now, Some(block_id.clone()));
378-
if res.is_err() {
379-
warn!(target: "node", "reflect message to DB(1). error: {}", res.unwrap_err());
380-
}
372+
db.put_message(
373+
msg,
374+
transaction_id,
375+
transaction_now,
376+
Some(block_id.clone())
377+
).map_err(|err| warn!(target: "node", "reflect message to DB(1). error: {}", err))
378+
.ok();
381379
Ok(true)
382380
})?;
383381

384382
debug!(target: "node", "in_msg_descr.iterate - success");
385383

386-
387384
extra.read_out_msg_descr()?.iterate_objects(|out_msg| {
388385
let msg = out_msg.read_message()?.unwrap();
389386
debug!(target: "node", "PUT-OUT-MESSAGE-BLOCK {:?}", msg);
390387
// msg1.prepare_proof_for_json(&block_info_cells, &block_root)?;
391388
// msg1.prepare_boc_for_json()?;
392389
let transaction_id = out_msg.transaction_cell().map(|cell| cell.repr_hash());
393-
let res = db.put_message(msg, msg_status, transaction_id, None, Some(block_id.clone()));
394-
if res.is_err() {
395-
warn!(target: "node", "reflect message to DB(2). error: {}", res.unwrap_err());
396-
}
390+
db.put_message(
391+
msg,
392+
transaction_id,
393+
None,
394+
Some(block_id.clone())
395+
).map_err(|err| warn!(target: "node", "reflect message to DB(2). error: {}", err))
396+
.ok();
397397
Ok(true)
398398
})?;
399399

400400
debug!(target: "node", "out_msg_descr.iterate - success");
401401

402+
402403
let mut changed_acc = HashSet::new();
403404

404405
extra.read_account_blocks()?.iterate_with_keys(|account_id, account_block| {
@@ -413,7 +414,7 @@ debug!(target: "node", "PUT-TRANSACTION-BLOCK {}", transaction.hash()?.to_hex_st
413414
orig_status = Some(transaction.orig_status.clone());
414415
}
415416
end_status = Some(transaction.end_status.clone());
416-
if let Err(err) = db.put_transaction(transaction, tr_status, Some(block_id.clone()), workchain_id) {
417+
if let Err(err) = db.put_transaction(transaction, Some(block_id.clone()), workchain_id) {
417418
warn!(target: "node", "reflect transaction to DB. error: {}", err);
418419
}
419420
Ok(true)
@@ -449,7 +450,7 @@ debug!(target: "node", "PUT-TRANSACTION-BLOCK {}", transaction.hash()?.to_hex_st
449450

450451
debug!(target: "node", "accounts.iterate - success");
451452

452-
db.put_block(block, block_status.clone())?;
453+
db.put_block(block)?;
453454
}
454455
}
455456
Ok(())
@@ -483,20 +484,6 @@ debug!(target: "node", "NO-BLOCK {:?}", finality_hash);
483484
info!(target: "node", "FINALITY: add block. hash: {:?}", block_hash);
484485
info!(target: "node", "FINALITY: block seq_no: {:?}", sblock.block().read_info()?.seq_no());
485486

486-
debug!(target: "node", "PUT-BLOCK {:?}", finality_hash);
487-
let res = self.reflect_block_in_db(
488-
sblock.block().clone(),
489-
shard_state.clone(),
490-
is_sync,
491-
BlockProcessingStatus::Proposed,
492-
MessageProcessingStatus::Proposed,
493-
TransactionProcessingStatus::Proposed
494-
);
495-
496-
if res.is_err() {
497-
warn!(target: "node", "reflect_block_in_db(Proposed) error: {:?}", res.unwrap_err());
498-
}
499-
500487
let sb = Box::new(ShardBlock::with_block_and_state(
501488
sblock,
502489
sblock_data,
@@ -617,28 +604,6 @@ debug!(target: "node", "PUT-BLOCK-HASH {:?}", sb.block_hash);
617604
/// reset block finality
618605
/// clean all maps, load last finalized data
619606
fn reset(&mut self) -> NodeResult<()> {
620-
{
621-
let mut prev_shard_state = Arc::new(ShardStateUnsplit::default());
622-
let prev_seq_no = self.current_block.seq_no - 1; // take previous seq_no, this func will not be called if current seq_no=0
623-
let prev_sb = self.blocks_by_no.get(&prev_seq_no); // get previous ShardState,
624-
if prev_sb.is_some() {
625-
prev_shard_state = self.stored_to_loaded(prev_sb.unwrap().clone())?.shard_state.clone();
626-
}
627-
628-
// update states in DB and restore previous state of accounts
629-
let res = self.reflect_block_in_db(
630-
self.current_block.block.block().clone(),
631-
prev_shard_state,
632-
false,
633-
BlockProcessingStatus::Refused,
634-
MessageProcessingStatus::Refused,
635-
TransactionProcessingStatus::Refused
636-
);
637-
638-
if res.is_err() {
639-
warn!(target: "node", "reflect_block_in_db(Refused) error: {:?}", res.unwrap_err());
640-
}
641-
}
642607
self.current_block = self.last_finalized_block.clone();
643608
// remove files from disk
644609
for (hash, _sb) in self.blocks_by_hash.iter() {

ton-node-se/ton_node/src/node_engine/messages.rs

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ pub struct MessagesProcessor<T> where
3232
tr_storage: Arc<T>,
3333
queue: Arc<InMessagesQueue>,
3434
shard_id: ShardIdent,
35-
db: Arc<Box<dyn DocumentsDb>>,
3635
blockchain_config: BlockchainConfig,
3736
executors: Arc<Mutex<HashMap<AccountId, Arc<Mutex<OrdinaryTransactionExecutor>>>>>,
3837
}
@@ -45,7 +44,6 @@ impl<T> MessagesProcessor<T> where
4544
tr_storage: Arc<T>,
4645
shard_id: ShardIdent,
4746
blockchain_config: BlockchainConfig,
48-
db: Arc<Box<dyn DocumentsDb>>,
4947
) -> Self {
5048
// make clone for changes
5149
//let shard_state_new = shard_state.lock().unwrap().clone();
@@ -54,7 +52,6 @@ impl<T> MessagesProcessor<T> where
5452
tr_storage,
5553
queue,
5654
shard_id,
57-
db,
5855
blockchain_config,
5956
executors: Arc::new(Mutex::new(HashMap::new())),
6057
}
@@ -378,12 +375,6 @@ let now = Instant::now();
378375
while start_time.elapsed() < timeout {
379376

380377
if let Some(msg) = self.queue.dequeue_first_unused() {
381-
382-
let res = self.db.put_message(msg.message().clone(), MessageProcessingStatus::Processing, None, None, None);
383-
if res.is_err() {
384-
warn!(target: "node", "generate_block_multi reflect to db failed. error: {}", res.unwrap_err());
385-
}
386-
387378
let acc_id = msg.message().int_dst_account_id().unwrap();
388379

389380
// lock account in queue
@@ -777,40 +768,23 @@ impl InMessagesQueue {
777768

778769
storage.insert(msg.clone());
779770
debug!(target: "node", "Queued message: {:?}", msg.message());
780-
781-
// write message into kafka with "queued" status
782-
if let Some(db) = self.db.as_ref() {
783-
let res = db.put_message(msg.message().clone(), MessageProcessingStatus::Queued, None, None, None);
784-
if res.is_err() {
785-
log::error!(target: "node", "failed reflect to db queue message to internal queue. error: {}", res.unwrap_err());
786-
}
787-
}
788771

789772
Ok(())
790773

791774
}
792775

793776
/// Include message into begin queue
794777
fn priority_queue(&self, msg: QueuedMessage) -> std::result::Result<(), QueuedMessage> {
795-
796778
if !self.is_message_to_current_node(msg.message()) {
797779
return self.route_message_to_other_node(msg);
798780
}
799781

800782
let mut storage = self.storage.lock();
801-
802-
// write message into kafka with "queued" status
803-
if let Some(db) = self.db.as_ref() {
804-
let res = db.put_message(msg.message().clone(), MessageProcessingStatus::Queued, None, None, None);
805-
if res.is_err() {
806-
log::error!(target: "node", "failed reflect to db queue message to internal priority queue. error: {}", res.unwrap_err());
807-
}
808-
}
809-
debug!(target: "node", "Priority queued message: {:?}", msg.message());
783+
let msg_str = format!("{:?}", msg.message());
810784
storage.insert(msg);
785+
debug!(target: "node", "Priority queued message: {}", msg_str);
811786

812787
Ok(())
813-
814788
}
815789

816790
/// Extract oldest message from queue.

ton-node-se/ton_node/src/node_engine/mod.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@ use std::thread::{JoinHandle};
1515
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1616
use ton_labs_assembler::compile_code;
1717
use ton_block::{
18-
Account, BlkPrevInfo, Block, BlockProcessingStatus, CommonMsgInfo,
18+
Account, BlkPrevInfo, Block, CommonMsgInfo,
1919
CurrencyCollection, ExtBlkRef, ExternalInboundMessageHeader, GetRepresentationHash,
20-
Grams, InternalMessageHeader, Message, MessageProcessingStatus,
20+
Grams, InternalMessageHeader, Message,
2121
MsgAddressExt, MsgAddressInt, Serializable, Deserializable, ShardStateUnsplit,
22-
ShardIdent, StateInit, Transaction, TransactionProcessingStatus, SignedBlock,
22+
ShardIdent, StateInit, Transaction, SignedBlock,
2323
};
2424
use ton_types::{ Cell, SliceData };
2525
use ton_types::types::{ UInt256, AccountId, ByteOrderRead };
@@ -464,12 +464,23 @@ pub trait MessagesReceiver: Send {
464464
pub trait DocumentsDb: Send + Sync {
465465
fn put_account(&self, acc: Account) -> NodeResult<()>;
466466
fn put_deleted_account(&self, workchain_id: i32, account_id: AccountId) -> NodeResult<()>;
467-
fn put_block(&self, block: Block, status: BlockProcessingStatus) -> NodeResult<()>;
468-
fn put_message(&self, msg: Message, status: MessageProcessingStatus,
469-
transaction_id: Option<UInt256>, transaction_now: Option<u32>,
470-
block_id: Option<UInt256>) -> NodeResult<()>;
471-
fn put_transaction(&self, tr: Transaction, status: TransactionProcessingStatus,
472-
block_id: Option<UInt256>, workchain_id: i32) -> NodeResult<()>;
467+
fn put_block(&self, block: Block) -> NodeResult<()>;
468+
469+
fn put_message(
470+
&self,
471+
msg: Message,
472+
transaction_id: Option<UInt256>,
473+
transaction_now: Option<u32>,
474+
block_id: Option<UInt256>
475+
) -> NodeResult<()>;
476+
477+
fn put_transaction(
478+
&self,
479+
tr: Transaction,
480+
block_id: Option<UInt256>,
481+
workchain_id: i32
482+
) -> NodeResult<()>;
483+
473484
fn has_delivery_problems(&self) -> bool;
474485
}
475486

@@ -483,14 +494,14 @@ impl DocumentsDb for DocumentsDbMock {
483494
Ok(())
484495
}
485496

486-
fn put_block(&self, _: Block, _: BlockProcessingStatus) -> NodeResult<()> {
497+
fn put_block(&self, _: Block) -> NodeResult<()> {
487498
Ok(())
488499
}
489500

490-
fn put_message(&self, _: Message, _: MessageProcessingStatus, _: Option<UInt256>, _: Option<u32>,
501+
fn put_message(&self, _: Message, _: Option<UInt256>, _: Option<u32>,
491502
_: Option<UInt256>) -> NodeResult<()> { Ok(()) }
492503

493-
fn put_transaction(&self, _: Transaction, _: TransactionProcessingStatus,
504+
fn put_transaction(&self, _: Transaction,
494505
_: Option<UInt256>, _: i32) -> NodeResult<()> {
495506
Ok(())
496507
}

ton-node-se/ton_node/src/node_engine/ton_node_engine.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,6 @@ impl TonNodeEngine {
260260
storage.clone(),
261261
shard.clone(),
262262
blockchain_config,
263-
documents_db.clone(),
264263
)
265264
)),
266265
finalizer: block_finality.clone(),

ton-node-se/ton_node/src/node_engine/ton_node_handlers.rs

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ pub fn init_ton_node_handlers(ton: &TonNodeEngine) {
3030

3131
let request = networkprotocol::SendMessageRequest::default();
3232
ton.register_response_callback(request.into_boxed(), TonNodeEngine::process_request_send_message);
33-
34-
let request = networkprotocol::ReflectToDbRequest::default();
35-
ton.register_response_callback(request.into_boxed(), TonNodeEngine::process_request_reflect_to_db);
3633
}
3734

3835
impl TonNodeEngine {
@@ -59,13 +56,6 @@ debug!("VALIDATOR SET {:?}", vals);
5956

6057
// Drop inbound external message if no validators
6158
if (vals.len() == 0) && msg.message().is_inbound_external() {
62-
if let Err(res) = self.db.put_message(
63-
msg.message().clone(),
64-
MessageProcessingStatus::Refused,
65-
None, None, None)
66-
{
67-
warn!(target: "node", "reflect message reject to DB(1). error: {}", res);
68-
}
6959
continue;
7060
}
7161

@@ -361,69 +351,6 @@ debug!(target: "node", "SEND ROUTING MSG {} -> {}", self.validator_index(), next
361351

362352
}
363353
}
364-
365-
pub fn reflect_transaction_to_db(
366-
db: Arc<Box<dyn DocumentsDb>>,
367-
transaction: Transaction,
368-
account: Option<Account>, workchain_id: i32) -> NodeResult<()> {
369-
370-
let transaction_id = transaction.hash().ok();
371-
if let Ok(Some(in_msg)) = transaction.read_in_msg() {
372-
let _res = db.put_message(
373-
in_msg.clone(),
374-
MessageProcessingStatus::Preliminary,
375-
transaction_id.clone(), Some(transaction.now()), None
376-
)?;
377-
}
378-
379-
transaction.iterate_out_msgs(&mut |msg| {
380-
db.put_message(
381-
msg,
382-
MessageProcessingStatus::Preliminary,
383-
transaction_id.clone(), None, None
384-
).map_err(|_| failure::format_err!("put_to_db error"))?;
385-
Ok(true)
386-
})?;
387-
388-
db.put_transaction(
389-
transaction,
390-
TransactionProcessingStatus::Preliminary,
391-
None,
392-
workchain_id
393-
)?;
394-
395-
if let Some(account) = account {
396-
db.put_account(account)?;
397-
}
398-
399-
Ok(())
400-
}
401-
402-
fn process_request_reflect_to_db(&self, _io: &dyn NetworkContext, _peer: &PeerId, request: NetworkProtocol) -> NodeResult<NetworkProtocol> {
403-
match request {
404-
NetworkProtocol::TonEngine_NetworkProtocol_ReflectToDbRequest(request) => {
405-
//info!(target: "node", "!!!! Send Reflect to DB Request !!!!");
406-
407-
let transaction = Transaction::construct_from_bytes(&request.transaction.0)?;
408-
let mut workchain_id = 0;
409-
let mut acc = None;
410-
if !request.account.0.is_empty() {
411-
let account = Account::construct_from_bytes(&request.account.0)?;
412-
workchain_id = account.get_addr().map(|addr| addr.get_workchain_id()).unwrap_or_default();
413-
acc = Some(account);
414-
}
415-
416-
// TODO write transaction to DB with prelimitary state
417-
Self::reflect_transaction_to_db(self.db.clone(), transaction, acc, workchain_id)?;
418-
}
419-
_ => return node_err!(NodeErrorKind::TlIncompatiblePacketType),
420-
421-
};
422-
Ok(networkprotocol::ReflectToDbResponse{
423-
id: 0,
424-
result: 0,
425-
}.into_boxed())
426-
}
427354
}
428355

429356

0 commit comments

Comments
 (0)