use crate::{
	api::api_types::{
		Action, ActionOptions, Connection, DsnpKeys, ImportBundle, PrivacyType, Update,
	},
	dsnp::{
		dsnp_types::{DsnpGraphEdge, DsnpPublicKey, DsnpUserId},
		reader_writer::DsnpReader,
	},
	frequency::Frequency,
	graph::{
		key_manager::{UserKeyProvider, USER_KEY_MANAGER},
		shared_state_manager::{
			PriProvider, PublicKeyProvider, SharedStateManager, SHARED_STATE_MANAGER,
		},
		updates::UpdateEvent,
		user::UserGraph,
	},
	util::transactional_hashmap::{Transactional, TransactionalHashMap},
};
use dryoc::keypair::StackKeyPair;
use dsnp_graph_config::{
	errors::{DsnpGraphError, DsnpGraphResult},
	ConnectionType, Environment, GraphKeyType, InputValidation, SchemaId,
};
use log::Level;
use log_result_proc_macro::log_result_err;
use std::{
	collections::{hash_map::Entry, HashSet},
	sync::{Arc, RwLock},
};
use super::api_types::GraphKeyPair;
/// Root object of the graph state: owns every imported user graph together with the state
/// that is shared between those graphs.
#[derive(Debug)]
pub struct GraphState {
	/// Environment whose configuration maps schema ids to connection types and back.
	environment: Environment,
	/// State shared with every `UserGraph` created by this object (a clone of the `Arc` is
	/// handed to each new graph); it receives imported DSNP keys, newly added keys and PRIs.
	shared_state_manager: Arc<RwLock<SharedStateManager>>,
	/// User graphs keyed by DSNP user id, stored in a map that supports commit/rollback.
	user_map: TransactionalHashMap<DsnpUserId, UserGraph>,
}
/// Operations exposed by a graph state: importing user data, applying actions, exporting
/// the resulting updates and querying connections and keys.
pub trait GraphAPI {
	/// Returns `true` if a graph for `user_id` is currently held by the state.
	fn contains_user_graph(&self, user_id: &DsnpUserId) -> bool;

	/// Returns the number of user graphs currently held by the state.
	fn len(&self) -> usize;

	/// Removes the graph held for `user_id`; removing an unknown user leaves the state
	/// unchanged.
	fn remove_user_graph(&mut self, user_id: &DsnpUserId);

	/// Imports the raw data (keys, key pairs and graph pages) contained in `payloads`.
	///
	/// # Errors
	/// Returns an error if a bundle is invalid or cannot be imported.
	fn import_users_data(&mut self, payloads: &Vec<ImportBundle>) -> DsnpGraphResult<()>;

	/// Calculates the updates that have to be published for all imported users, including
	/// newly added keys.
	fn export_updates(&self) -> DsnpGraphResult<Vec<Update>>;

	/// Calculates the updates that have to be published for a single user.
	///
	/// # Errors
	/// Returns `DsnpGraphError::UserGraphNotImported` if no graph is held for `user_id`.
	fn export_user_graph_updates(&self, user_id: &DsnpUserId) -> DsnpGraphResult<Vec<Update>>;

	/// Applies `actions` (connect, disconnect, add graph key) to the graphs of their owners.
	///
	/// `options` controls whether redundant connects / disconnects of missing connections
	/// are ignored, and whether the automatic commit/rollback after the call is disabled.
	fn apply_actions(
		&mut self,
		actions: &[Action],
		options: &Option<ActionOptions>,
	) -> DsnpGraphResult<()>;

	/// Forces a recalculation of the graphs of `user_id` and returns the resulting updates.
	///
	/// # Errors
	/// Returns `DsnpGraphError::UserGraphNotImported` if no graph is held for `user_id`.
	fn force_recalculate_graphs(&self, user_id: &DsnpUserId) -> DsnpGraphResult<Vec<Update>>;

	/// Returns the connections of `user_id` stored under `schema_id`.
	///
	/// `include_pending` is forwarded to the graph lookup; presumably it selects whether
	/// changes that have not been exported yet are taken into account (confirm in `UserGraph`).
	///
	/// # Errors
	/// Returns `DsnpGraphError::UserGraphNotImported` if no graph is held for `user_id`.
	fn get_connections_for_user_graph(
		&self,
		user_id: &DsnpUserId,
		schema_id: &SchemaId,
		include_pending: bool,
	) -> DsnpGraphResult<Vec<DsnpGraphEdge>>;

	/// Returns the users that appear as private-friendship connections of any held graph
	/// and for which the shared state reports no keys.
	fn get_connections_without_keys(&self) -> DsnpGraphResult<Vec<DsnpUserId>>;

	/// Returns the one-sided private friendship connections of `user_id`.
	///
	/// # Errors
	/// Returns `DsnpGraphError::UserGraphNotImported` if no graph is held for `user_id`.
	fn get_one_sided_private_friendship_connections(
		&self,
		user_id: &DsnpUserId,
	) -> DsnpGraphResult<Vec<DsnpGraphEdge>>;

	/// Returns the public keys known to the shared state for `user_id`.
	fn get_public_keys(&self, user_id: &DsnpUserId) -> DsnpGraphResult<Vec<DsnpPublicKey>>;

	/// Deserializes the public keys contained in `keys`; `None` yields an empty list.
	fn deserialize_dsnp_keys(keys: &Option<DsnpKeys>) -> DsnpGraphResult<Vec<DsnpPublicKey>>;

	/// Generates a fresh key pair of the requested `key_pair_type`.
	fn generate_keypair(key_pair_type: GraphKeyType) -> DsnpGraphResult<GraphKeyPair>;
}
impl Transactional for GraphState {
	/// Commits every user graph, then the user map itself, then the shared state.
	///
	/// Panics if the shared state lock is poisoned.
	fn commit(&mut self) {
		// Snapshot the ids first so the map is not borrowed while single graphs are mutated.
		let user_ids = self.user_map.inner().keys().copied().collect::<Vec<_>>();
		user_ids.iter().for_each(|user_id| {
			if let Some(user_graph) = self.user_map.get_mut(user_id) {
				user_graph.commit();
			}
		});
		self.user_map.commit();

		let mut shared_state = self.shared_state_manager.write().unwrap();
		shared_state.commit();
	}

	/// Rolls back the user map first, then every remaining user graph, then the shared state.
	///
	/// Panics if the shared state lock is poisoned.
	fn rollback(&mut self) {
		self.user_map.rollback();

		// Only the users that are still in the map after its rollback are visited.
		let user_ids = self.user_map.inner().keys().copied().collect::<Vec<_>>();
		user_ids.iter().for_each(|user_id| {
			if let Some(user_graph) = self.user_map.get_mut(user_id) {
				user_graph.rollback();
			}
		});

		let mut shared_state = self.shared_state_manager.write().unwrap();
		shared_state.rollback();
	}
}
impl GraphAPI for GraphState {
	/// Returns `true` when a graph for `user_id` is present in the user map.
	fn contains_user_graph(&self, user_id: &DsnpUserId) -> bool {
		self.user_map.inner().contains_key(user_id)
	}

	/// Returns the number of user graphs currently held.
	fn len(&self) -> usize {
		self.user_map.len()
	}

	/// Removes the graph stored for `user_id` and commits the user map immediately instead
	/// of waiting for the next `commit`.
	fn remove_user_graph(&mut self, user_id: &DsnpUserId) {
		self.user_map.remove(user_id);
		self.user_map.commit();
	}

	/// Imports all `payloads` as one unit: the state is committed when every bundle was
	/// imported and rolled back when any of them failed. The import result is returned.
	#[log_result_err(Level::Error)]
	fn import_users_data(&mut self, payloads: &Vec<ImportBundle>) -> DsnpGraphResult<()> {
		let result = self.do_import_users_data(payloads);
		match result {
			DsnpGraphResult::Ok(_) => self.commit(),
			DsnpGraphResult::Err(_) => self.rollback(),
		};
		result
	}

	/// Collects the new-key updates of the shared state followed by the calculated updates
	/// of every user graph.
	///
	/// # Errors
	/// Fails if the shared state lock cannot be read or any update calculation fails.
	#[log_result_err(Level::Error)]
	fn export_updates(&self) -> DsnpGraphResult<Vec<Update>> {
		let mut result = self
			.shared_state_manager
			.read()
			.map_err(|_| DsnpGraphError::FailedtoReadLock(SHARED_STATE_MANAGER.to_string()))?
			.export_new_key_updates()?;

		let imported_users: Vec<_> = self.user_map.inner().keys().copied().collect();
		for user_id in imported_users {
			let user_graph = self
				.user_map
				.get(&user_id)
				.ok_or(DsnpGraphError::UserGraphNotImported(user_id))?;
			result.extend(user_graph.calculate_updates()?);
		}
		Ok(result)
	}

	/// Collects the new-key updates of `user_id` followed by the calculated updates of that
	/// user's graph.
	///
	/// # Errors
	/// Fails with `UserGraphNotImported` if no graph is held for `user_id`, or if the shared
	/// state lock cannot be read or the update calculation fails.
	#[log_result_err(Level::Error)]
	fn export_user_graph_updates(&self, user_id: &DsnpUserId) -> DsnpGraphResult<Vec<Update>> {
		let mut result = self
			.shared_state_manager
			.read()
			.map_err(|_| DsnpGraphError::FailedtoReadLock(SHARED_STATE_MANAGER.to_string()))?
			.export_new_key_updates_for_user(user_id)?;

		let user_graph = self
			.user_map
			.get(user_id)
			.ok_or(DsnpGraphError::UserGraphNotImported(*user_id))?;
		result.extend(user_graph.calculate_updates()?);
		Ok(result)
	}

	/// Applies `actions` and, unless `options.disable_auto_commit` is set, commits the state
	/// on success or rolls it back on failure.
	///
	/// With auto-commit disabled neither commit nor rollback is performed here, regardless
	/// of the outcome; the caller decides what happens to the pending changes.
	#[log_result_err(Level::Error)]
	fn apply_actions(
		&mut self,
		actions: &[Action],
		options: &Option<ActionOptions>,
	) -> DsnpGraphResult<()> {
		let disable_auto_commit =
			options.as_ref().map_or(false, |opts| opts.disable_auto_commit);

		let result = self.do_apply_actions(actions, options);
		if !disable_auto_commit {
			match result {
				DsnpGraphResult::Ok(_) => self.commit(),
				DsnpGraphResult::Err(_) => self.rollback(),
			}
		}
		result
	}

	/// Delegates to `UserGraph::force_calculate_graphs` for the graph of `user_id`.
	///
	/// # Errors
	/// Fails with `UserGraphNotImported` if no graph is held for `user_id`.
	#[log_result_err(Level::Error)]
	fn force_recalculate_graphs(&self, user_id: &DsnpUserId) -> DsnpGraphResult<Vec<Update>> {
		let user_graph = self
			.user_map
			.get(user_id)
			.ok_or(DsnpGraphError::UserGraphNotImported(*user_id))?;
		user_graph.force_calculate_graphs()
	}

	/// Returns all connections of `user_id` stored under `schema_id`; `include_pending` is
	/// passed through to the graph lookup.
	///
	/// # Errors
	/// Fails with `UserGraphNotImported` if no graph is held for `user_id`.
	#[log_result_err(Level::Error)]
	fn get_connections_for_user_graph(
		&self,
		user_id: &DsnpUserId,
		schema_id: &SchemaId,
		include_pending: bool,
	) -> DsnpGraphResult<Vec<DsnpGraphEdge>> {
		let user_graph = self
			.user_map
			.get(user_id)
			.ok_or(DsnpGraphError::UserGraphNotImported(*user_id))?;
		Ok(user_graph.get_all_connections_of(*schema_id, include_pending))
	}

	/// Gathers the distinct targets of all private-friendship connections (pending ones
	/// included) over every held graph and asks the shared state which of them have no keys.
	///
	/// # Errors
	/// Fails with `InvalidPrivateSchemaId` if the environment defines no private friendship
	/// schema, or if the shared state lock cannot be read.
	#[log_result_err(Level::Error)]
	fn get_connections_without_keys(&self) -> DsnpGraphResult<Vec<DsnpUserId>> {
		let private_friendship_schema_id = self
			.environment
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Friendship(PrivacyType::Private))
			.ok_or(DsnpGraphError::InvalidPrivateSchemaId)?;

		// A set removes users that are connected to more than one imported graph.
		let all_connections: HashSet<_> = self
			.user_map
			.inner()
			.values()
			.flat_map(|user_graph| {
				user_graph.get_all_connections_of(private_friendship_schema_id, true)
			})
			.map(|edge| edge.user_id)
			.collect();

		Ok(self
			.shared_state_manager
			.read()
			.map_err(|_| DsnpGraphError::FailedtoReadLock(SHARED_STATE_MANAGER.to_string()))?
			.find_users_without_keys(all_connections.into_iter().collect()))
	}

	/// Returns the one-sided friendships reported by the private friendship graph of
	/// `user_id`.
	///
	/// # Errors
	/// Fails with `InvalidPrivateSchemaId` if the environment defines no private friendship
	/// schema, `UserGraphNotImported` if no graph is held for `user_id`, and
	/// `InvalidSchemaId` if the user graph has no graph for that schema.
	#[log_result_err(Level::Error)]
	fn get_one_sided_private_friendship_connections(
		&self,
		user_id: &DsnpUserId,
	) -> DsnpGraphResult<Vec<DsnpGraphEdge>> {
		let private_friendship_schema_id = self
			.environment
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Friendship(PrivacyType::Private))
			.ok_or(DsnpGraphError::InvalidPrivateSchemaId)?;
		let user_graph = self
			.user_map
			.get(user_id)
			.ok_or(DsnpGraphError::UserGraphNotImported(*user_id))?;
		let graph = user_graph
			.graph(&private_friendship_schema_id)
			.ok_or(DsnpGraphError::InvalidSchemaId(private_friendship_schema_id))?;
		graph.get_one_sided_friendships()
	}

	/// Returns the public keys the shared state holds for `user_id`.
	///
	/// # Errors
	/// Fails if the shared state lock cannot be read.
	fn get_public_keys(&self, user_id: &DsnpUserId) -> DsnpGraphResult<Vec<DsnpPublicKey>> {
		Ok(self
			.shared_state_manager
			.read()
			.map_err(|_| DsnpGraphError::FailedtoReadLock(SHARED_STATE_MANAGER.to_string()))?
			.get_public_keys(user_id))
	}

	/// Deserializes the keys in `keys` in sorted order, setting each key's `key_id` from the
	/// index of its source entry. `None` yields an empty list.
	///
	/// # Errors
	/// Fails if any key content cannot be read by `Frequency::read_public_key`.
	fn deserialize_dsnp_keys(keys: &Option<DsnpKeys>) -> DsnpGraphResult<Vec<DsnpPublicKey>> {
		// A single copy of the key list is enough; it is only needed so it can be sorted.
		let mut sorted_keys = keys.as_ref().map(|k| k.keys.to_vec()).unwrap_or_default();
		sorted_keys.sort();

		let mut dsnp_keys = Vec::with_capacity(sorted_keys.len());
		for key in sorted_keys {
			let mut k = Frequency::read_public_key(&key.content).map_err(DsnpGraphError::from)?;
			k.key_id = Some(key.index.into());
			dsnp_keys.push(k);
		}
		Ok(dsnp_keys)
	}

	/// Generates a new key pair of type `key_pair_type` and returns its raw secret and
	/// public key bytes.
	fn generate_keypair(key_pair_type: GraphKeyType) -> DsnpGraphResult<GraphKeyPair> {
		let key_pair = match key_pair_type {
			GraphKeyType::X25519 => StackKeyPair::gen(),
		};
		Ok(GraphKeyPair {
			secret_key: key_pair.secret_key.to_vec(),
			public_key: key_pair.public_key.to_vec(),
			key_type: key_pair_type,
		})
	}
}
impl GraphState {
	pub fn new(environment: Environment) -> Self {
		Self {
			environment,
			user_map: TransactionalHashMap::new(),
			shared_state_manager: Arc::new(RwLock::new(SharedStateManager::new())),
		}
	}
	fn get_or_create_user_graph(
		&mut self,
		dsnp_user_id: DsnpUserId,
	) -> DsnpGraphResult<&mut UserGraph> {
		match self.user_map.entry(dsnp_user_id) {
			Entry::Occupied(o) => Ok(o.into_mut()),
			Entry::Vacant(v) => Ok(v.insert(UserGraph::new(
				&dsnp_user_id,
				&self.environment,
				self.shared_state_manager.clone(),
			))),
		}
	}
	#[log_result_err(Level::Error)]
	fn do_import_users_data(&mut self, payloads: &Vec<ImportBundle>) -> DsnpGraphResult<()> {
		for bundle in payloads {
			bundle.validate()?;
		}
		for ImportBundle { schema_id, pages, dsnp_keys, dsnp_user_id, key_pairs } in payloads {
			let connection_type_option =
				self.environment.get_config().get_connection_type_from_schema_id(*schema_id);
			match dsnp_keys {
				Some(dsnp_keys) => {
					self.shared_state_manager
						.write()
						.map_err(|_| {
							DsnpGraphError::FailedtoWriteLock(SHARED_STATE_MANAGER.to_string())
						})?
						.import_dsnp_keys(&dsnp_keys)?;
				},
				None => (),
			};
			let user_graph = self.get_or_create_user_graph(*dsnp_user_id)?;
			let include_secret_keys = !key_pairs.is_empty();
			{
				let mut user_key_manager = user_graph
					.user_key_manager
					.write()
					.map_err(|_| DsnpGraphError::FailedtoWriteLock(USER_KEY_MANAGER.to_string()))?;
				user_key_manager.import_key_pairs(key_pairs.clone())?;
			};
			if pages.is_empty() {
				continue;
			}
			let dsnp_config = user_graph
				.get_dsnp_config(*schema_id)
				.ok_or(DsnpGraphError::InvalidSchemaId(*schema_id))?;
			let graph = user_graph
				.graph_mut(&schema_id)
				.ok_or(DsnpGraphError::InvalidSchemaId(*schema_id))?;
			graph.clear();
			let connection_type =
				connection_type_option.ok_or(DsnpGraphError::InvalidSchemaId(*schema_id))?;
			match connection_type.privacy_type() {
				PrivacyType::Public => {
					graph.import_public(connection_type, pages)?;
					user_graph.sync_updates(*schema_id);
				},
				PrivacyType::Private => {
					if include_secret_keys {
						graph.import_private(&dsnp_config, connection_type, pages)?;
						user_graph.sync_updates(*schema_id);
					}
					if connection_type == ConnectionType::Friendship(PrivacyType::Private) {
						self.shared_state_manager
							.write()
							.map_err(|_| {
								DsnpGraphError::FailedtoWriteLock(SHARED_STATE_MANAGER.to_string())
							})?
							.import_pri(*dsnp_user_id, pages)?;
					}
				},
			};
		}
		Ok(())
	}
	#[log_result_err(Level::Error)]
	fn do_apply_actions(
		&mut self,
		actions: &[Action],
		options: &Option<ActionOptions>,
	) -> DsnpGraphResult<()> {
		for action in actions {
			action.validate()?;
		}
		let (ignore_existing_connections, ignore_missing_connections) = match options {
			Some(options) =>
				(options.ignore_existing_connections, options.ignore_missing_connections),
			None => (false, false),
		};
		for action in actions {
			let owner_graph = self.get_or_create_user_graph(action.owner_dsnp_user_id())?;
			match action {
				Action::Connect {
					connection: Connection { ref dsnp_user_id, ref schema_id },
					dsnp_keys,
					..
				} => {
					if owner_graph.graph_has_connection(*schema_id, *dsnp_user_id, true) {
						if ignore_existing_connections {
							log::warn!(
								"Ignoring add redundant connection {} -> {}",
								action.owner_dsnp_user_id(),
								*dsnp_user_id
							);
							continue;
						}
						return Err(DsnpGraphError::ConnectionAlreadyExists(
							action.owner_dsnp_user_id(),
							*dsnp_user_id,
						));
					}
					owner_graph.update_tracker_mut().register_update(
						UpdateEvent::create_add(*dsnp_user_id, *schema_id),
						ignore_existing_connections,
					)?;
					if let Some(inner_keys) = dsnp_keys {
						self.shared_state_manager
							.write()
							.map_err(|_| {
								DsnpGraphError::FailedtoWriteLock(SHARED_STATE_MANAGER.to_string())
							})?
							.import_dsnp_keys(inner_keys)?;
					}
				},
				Action::Disconnect {
					connection: Connection { ref dsnp_user_id, ref schema_id },
					..
				} => {
					if !owner_graph.graph_has_connection(*schema_id, *dsnp_user_id, true) {
						if ignore_missing_connections {
							log::warn!(
								"Ignoring remove non-existent connection {} -> {}",
								action.owner_dsnp_user_id(),
								*dsnp_user_id
							);
							continue;
						}
						return Err(DsnpGraphError::ConnectionDoesNotExist(
							action.owner_dsnp_user_id(),
							*dsnp_user_id,
						));
					}
					owner_graph.update_tracker_mut().register_update(
						UpdateEvent::create_remove(*dsnp_user_id, *schema_id),
						ignore_missing_connections,
					)?;
				},
				Action::AddGraphKey { new_public_key, .. } => {
					self.shared_state_manager
						.write()
						.map_err(|_| {
							DsnpGraphError::FailedtoWriteLock(SHARED_STATE_MANAGER.to_string())
						})?
						.add_new_key(action.owner_dsnp_user_id(), new_public_key.clone())?;
				},
			}
		}
		Ok(())
	}
}
#[cfg(test)]
mod test {
	use super::*;
	use crate::{
		api::api_types::ResolvedKeyPair,
		dsnp::{dsnp_configs::KeyPairType, dsnp_types::DsnpPrid},
		util::builders::{ImportBundleBuilder, KeyDataBuilder},
	};
	use memory_stats::memory_stats;
	use ntest::*;

	/// A fresh state contains no user graph.
	#[test]
	fn graph_contains_false() {
		let state = GraphState::new(Environment::Mainnet);
		assert!(!state.contains_user_graph(&0));
	}

	/// A user graph is reported once it has been created.
	#[test]
	fn graph_contains_true() {
		let mut state = GraphState::new(Environment::Mainnet);
		let _ = state.get_or_create_user_graph(0);
		assert!(state.contains_user_graph(&0));
	}

	/// `len` follows the number of created user graphs.
	#[test]
	fn graph_len() {
		let mut state = GraphState::new(Environment::Mainnet);
		let _ = state.get_or_create_user_graph(0);
		assert_eq!(state.len(), 1);
		let _ = state.get_or_create_user_graph(1);
		assert_eq!(state.len(), 2);
	}

	/// Creating a user graph succeeds.
	#[test]
	fn add_user_success() {
		let mut state = GraphState::new(Environment::Mainnet);
		let res = state.get_or_create_user_graph(0);
		assert!(res.is_ok());
	}

	/// Removing a user only removes that user's graph.
	#[test]
	fn remove_user_success() {
		let mut state = GraphState::new(Environment::Mainnet);
		let _ = state.get_or_create_user_graph(0);
		let _ = state.get_or_create_user_graph(1);
		state.remove_user_graph(&0);
		assert_eq!(state.len(), 1);
		assert!(!state.contains_user_graph(&0));
		assert!(state.contains_user_graph(&1));
	}

	/// Removing an unknown user leaves the state untouched.
	#[test]
	fn remove_nonexistent_user_noop() {
		let mut state = GraphState::new(Environment::Mainnet);
		let _ = state.get_or_create_user_graph(0);
		let _ = state.get_or_create_user_graph(1);
		state.remove_user_graph(&99);
		assert_eq!(state.user_map.len(), 2);
	}

	/// Importing a public follow bundle makes both its keys and its connections available.
	#[test]
	fn import_user_data_should_import_keys_and_data_for_public_follow_graph() {
		let env = Environment::Mainnet;
		let schema_id = env
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Follow(PrivacyType::Public))
			.expect("should exist");
		let mut state = GraphState::new(env.clone());
		let key_pair_raw = StackKeyPair::gen();
		let keypair = GraphKeyPair {
			secret_key: key_pair_raw.secret_key.to_vec(),
			public_key: key_pair_raw.public_key.to_vec(),
			key_type: GraphKeyType::X25519,
		};
		let dsnp_user_id = 123;
		let connections = vec![(2, 0), (3, 0), (4, 0), (5, 0)];
		let input = ImportBundleBuilder::new(env, dsnp_user_id, schema_id)
			.with_key_pairs(&vec![keypair])
			.with_page(1, &connections, &vec![], 1000)
			.build();

		let res = state.import_users_data(&vec![input]);

		assert!(res.is_ok());
		let public_manager = state.shared_state_manager.read().unwrap();
		let keys = public_manager.get_imported_keys(dsnp_user_id);
		assert_eq!(keys.len(), 1);
		let res = state.get_connections_for_user_graph(&dsnp_user_id, &schema_id, false);
		assert!(res.is_ok());
		let res_set: HashSet<_> = res.unwrap().iter().copied().collect();
		let mapped: HashSet<_> = connections
			.into_iter()
			.map(|(c, s)| DsnpGraphEdge { user_id: c, since: s })
			.collect();
		assert_eq!(res_set, mapped);
	}

	/// Importing a private follow bundle with the matching key pair makes both its keys and
	/// its connections available.
	#[test]
	fn import_user_data_should_import_keys_and_data_for_private_follow_graph() {
		let env = Environment::Mainnet;
		let schema_id = env
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Follow(PrivacyType::Private))
			.expect("should exist");
		let mut state = GraphState::new(env.clone());
		let key_pair_raw = StackKeyPair::gen();
		let resolved_key =
			ResolvedKeyPair { key_pair: KeyPairType::Version1_0(key_pair_raw.clone()), key_id: 1 };
		let keypair = GraphKeyPair {
			secret_key: key_pair_raw.secret_key.to_vec(),
			public_key: key_pair_raw.public_key.to_vec(),
			key_type: GraphKeyType::X25519,
		};
		let dsnp_user_id = 123;
		let connections = vec![(2, 0), (3, 0), (4, 0), (5, 0)];
		let input = ImportBundleBuilder::new(env, dsnp_user_id, schema_id)
			.with_key_pairs(&vec![keypair])
			.with_encryption_key(resolved_key)
			.with_page(1, &connections, &vec![], 100)
			.build();

		let res = state.import_users_data(&vec![input]);

		assert!(res.is_ok());
		let public_manager = state.shared_state_manager.read().unwrap();
		let keys = public_manager.get_imported_keys(dsnp_user_id);
		assert_eq!(keys.len(), 1);
		let res = state.get_connections_for_user_graph(&dsnp_user_id, &schema_id, false);
		assert!(res.is_ok());
		let res_set: HashSet<_> = res.unwrap().iter().copied().collect();
		let mapped: HashSet<_> = connections
			.into_iter()
			.map(|(c, s)| DsnpGraphEdge { user_id: c, since: s })
			.collect();
		assert_eq!(res_set, mapped);
	}

	/// Connecting a large number of users, exporting the updates and re-importing them into a
	/// fresh state yields the same set of connections. Memory usage is printed along the way.
	#[test]
	#[timeout(100000)]
	fn add_large_number_of_follows_to_private_follow_graph_should_succeed() {
		let env = Environment::Mainnet;
		let schema_id = env
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Follow(PrivacyType::Private))
			.expect("should exist");
		let mut state = GraphState::new(env.clone());
		let key_pair_raw = StackKeyPair::gen();
		let resolved_key =
			ResolvedKeyPair { key_pair: KeyPairType::Version1_0(key_pair_raw.clone()), key_id: 1 };
		let keypair = GraphKeyPair {
			secret_key: key_pair_raw.secret_key.to_vec(),
			public_key: key_pair_raw.public_key.to_vec(),
			key_type: GraphKeyType::X25519,
		};
		let dsnp_user_id = 7002;
		let input = ImportBundleBuilder::new(env.clone(), dsnp_user_id, schema_id)
			.with_key_pairs(&vec![keypair.clone()])
			.with_encryption_key(resolved_key.clone())
			.build();

		let mem_usage = memory_stats().unwrap();
		println!("before data import physical mem: {}", mem_usage.physical_mem);
		let res = state.import_users_data(&vec![input]);
		let mem_usage = memory_stats().unwrap();
		println!("after data import physical mem: {}", mem_usage.physical_mem);
		assert!(res.is_ok());

		let actions: Vec<Action> = (1u64..7000u64)
			.map(|id| Action::Connect {
				owner_dsnp_user_id: dsnp_user_id,
				connection: Connection { dsnp_user_id: id, schema_id },
				dsnp_keys: None,
			})
			.collect();

		let mem_usage = memory_stats().unwrap();
		println!("before action import physical mem: {}", mem_usage.physical_mem);
		let res = state.apply_actions(
			&actions,
			&Some(ActionOptions {
				ignore_existing_connections: true,
				ignore_missing_connections: false,
				disable_auto_commit: false,
			}),
		);
		let mem_usage = memory_stats().unwrap();
		println!("after action import physical mem: {}", mem_usage.physical_mem);
		assert!(res.is_ok());

		let connections =
			state.get_connections_for_user_graph(&dsnp_user_id, &schema_id, true).unwrap();
		let before_export_set: HashSet<_> = connections.iter().map(|e| e.user_id).collect();

		let export = state.export_updates();
		assert!(export.is_ok());
		// Take a new sample so the printed value reflects memory after the export.
		let mem_usage = memory_stats().unwrap();
		println!("after export physical mem: {}", mem_usage.physical_mem);
		let updates = export.unwrap();

		let mut updated_state = GraphState::new(env.clone());
		let updated_input = ImportBundleBuilder::new(env.clone(), dsnp_user_id, schema_id)
			.with_key_pairs(&vec![keypair])
			.with_encryption_key(resolved_key.clone())
			.build();
		let new_import = ImportBundleBuilder::build_from(&updated_input, &updates);
		let res = updated_state.import_users_data(&vec![new_import]);
		assert!(res.is_ok());

		let connections = updated_state
			.get_connections_for_user_graph(&dsnp_user_id, &schema_id, false)
			.unwrap();
		let after_reimport_set: HashSet<_> = connections.iter().map(|e| e.user_id).collect();
		assert_eq!(before_export_set, after_reimport_set);
	}

	/// A private friendship bundle without key pairs still registers its PRIds in the shared
	/// state.
	#[test]
	fn import_user_data_without_private_keys_should_add_prids_for_private_friendship_graph() {
		let env = Environment::Mainnet;
		let schema_id = env
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Friendship(PrivacyType::Private))
			.expect("should exist");
		let mut state = GraphState::new(env.clone());
		let dsnp_user_id = 123;
		let connections = vec![(2, 0), (3, 0), (4, 0), (5, 0)];
		let prids = vec![
			DsnpPrid::new(&[1, 2, 3, 4, 5, 6, 7, 4]),
			DsnpPrid::new(&[10, 2, 3, 4, 5, 6, 7, 4]),
			DsnpPrid::new(&[8, 2, 0, 4, 5, 6, 7, 4]),
			DsnpPrid::new(&[3, 2, 3, 4, 4, 6, 1, 4]),
		];
		let input = ImportBundleBuilder::new(env, dsnp_user_id, schema_id)
			.with_page(1, &connections, &prids, 1000)
			.build();

		let res = state.import_users_data(&vec![input]);

		assert!(res.is_ok());
		let manager = state.shared_state_manager.read().unwrap();
		for p in prids {
			assert!(manager.contains(dsnp_user_id, p));
		}
	}

	/// A private follow bundle encrypted with one key but imported with another key pair
	/// fails, and neither keys nor a user graph remain afterwards.
	#[test]
	fn import_user_data_with_wrong_key_should_fail_for_private_follow_graph_and_rollback_everything(
	) {
		let env = Environment::Mainnet;
		let schema_id = env
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Follow(PrivacyType::Private))
			.expect("should exist");
		let mut state = GraphState::new(env.clone());
		let key_pair_raw = StackKeyPair::gen();
		let resolved_key =
			ResolvedKeyPair { key_pair: KeyPairType::Version1_0(key_pair_raw.clone()), key_id: 1 };
		let keypair = GraphKeyPair {
			secret_key: key_pair_raw.secret_key.to_vec(),
			public_key: key_pair_raw.public_key.to_vec(),
			key_type: GraphKeyType::X25519,
		};
		let dsnp_user_id = 123;
		let connections = vec![(2, 0), (3, 0), (4, 0), (5, 0)];
		let mut input = ImportBundleBuilder::new(env, dsnp_user_id, schema_id)
			.with_key_pairs(&vec![keypair])
			.with_encryption_key(resolved_key)
			.with_page(1, &connections, &vec![], 0)
			.build();
		// Swap in a key pair that does not match the one the page was built with.
		let wrong_key_pair = StackKeyPair::gen();
		input.key_pairs = vec![GraphKeyPair {
			secret_key: wrong_key_pair.secret_key.to_vec(),
			public_key: wrong_key_pair.public_key.to_vec(),
			key_type: GraphKeyType::X25519,
		}];

		let res = state.import_users_data(&vec![input]);

		assert!(res.is_err());
		assert_eq!(
			state.shared_state_manager.read().unwrap().get_imported_keys(dsnp_user_id).len(),
			0
		);
		assert!(state.get_connections_for_user_graph(&dsnp_user_id, &schema_id, true).is_err());
	}

	/// When one action of a batch fails (here: a duplicated connect), nothing from the batch
	/// is kept: no user graph and no key update.
	#[test]
	fn apply_actions_error_should_rollback_every_action() {
		let env = Environment::Mainnet;
		let schema_id = env
			.get_config()
			.get_schema_id_from_connection_type(ConnectionType::Follow(PrivacyType::Private))
			.expect("should exist");
		let key_pair_raw = StackKeyPair::gen();
		let keypair = GraphKeyPair {
			secret_key: key_pair_raw.secret_key.to_vec(),
			public_key: key_pair_raw.public_key.to_vec(),
			key_type: GraphKeyType::X25519,
		};
		let owner_dsnp_user_id: DsnpUserId = 0;
		let connect_action_1 = Action::Connect {
			owner_dsnp_user_id,
			connection: Connection { dsnp_user_id: 1, schema_id },
			dsnp_keys: Some(DsnpKeys {
				keys: KeyDataBuilder::new().with_key_pairs(&vec![keypair]).build(),
				keys_hash: 0,
				dsnp_user_id: owner_dsnp_user_id,
			}),
		};
		let connect_action_2 = Action::Connect {
			owner_dsnp_user_id,
			connection: Connection { dsnp_user_id: 2, schema_id },
			dsnp_keys: None,
		};
		let key_add_action = Action::AddGraphKey {
			owner_dsnp_user_id,
			new_public_key: b"27893788291911998228288282".to_vec(),
		};
		let mut state = GraphState::new(env);

		assert!(state
			.apply_actions(
				&vec![connect_action_1.clone(), connect_action_2, connect_action_1, key_add_action],
				&None
			)
			.is_err());

		assert_eq!(state.user_map.len(), 0);
		let updates = state.shared_state_manager.write().unwrap().export_new_key_updates();
		assert!(updates.is_ok());
		assert_eq!(updates.unwrap().len(), 0);
	}
}