aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGalen Guyer <galen@galenguyer.com>2022-11-09 23:47:15 -0500
committerGalen Guyer <galen@galenguyer.com>2022-11-09 23:55:44 -0500
commit3094c184734c305d04c193371714e91170b6ac9f (patch)
tree9a2f5a1e142756039cbbebcbf1b030d845fca60c
parent7737ff350af0c10f42c58563f3533153282db230 (diff)
Batch inserts to make them much faster
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/load.rs560
3 files changed, 327 insertions, 235 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 0ceb564..24f4cb1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -45,6 +45,7 @@ dependencies = [
"csv",
"filetime",
"indicatif",
+ "itertools",
"regex",
"serde",
"sqlx",
diff --git a/Cargo.toml b/Cargo.toml
index a83d9bc..6e81bc6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ chrono = "0.4.22"
csv = "1.1.6"
filetime = "0.2.18"
indicatif = "0.17.2"
+itertools = "0.10.5"
regex = "1.7.0"
serde = { version = "1.0.147", features = ["derive"] }
sqlx = { version = "0.6.2", features = ["sqlite", "runtime-tokio-rustls", "chrono"] }
diff --git a/src/load.rs b/src/load.rs
index 6adb6d7..2fe04a6 100644
--- a/src/load.rs
+++ b/src/load.rs
@@ -1,18 +1,23 @@
use crate::types::*;
+use csv::StringRecord;
use indicatif::{ProgressBar, ProgressStyle};
-use sqlx::SqlitePool;
+use itertools::Itertools;
+use sqlx::{QueryBuilder, Sqlite, SqlitePool};
use std::fs::File;
use std::io::BufRead;
-const INSERT_AMATEUR_SQL: &str = r"INSERT INTO amateurs (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, operator_class, group_code, region_code, trustee_call_sign, trustee_indicator, physician_certification, ve_signature, systematic_call_sign_change, vanity_call_sign_change, vainty_relationship, previous_call_sign, previous_operator_class, trustee_name) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_COMMENT_SQL: &str = r"INSERT INTO comments (record_type, unique_system_identifier, uls_file_number, call_sign, comment_date, description, status_code, status_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_ENTITY_SQL: &str = r"INSERT INTO entities (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, entity_type, licensee_id, entity_name, first_name, mi, last_name, suffix, phone, fax, email, street_address, city, state, zip_code, po_box, attention_line, sgin, frn, applicant_type_code, applicant_type_other, status_code, status_date, lic_category_code, linked_license_id, linked_callsign) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_HEADER_SQL: &str = r"INSERT INTO headers (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, license_status, radio_service_code, grant_date, expired_date, cancellation_date, eligibility_rule_number, reserved, alien, alien_government, alien_corporation, alien_officer, alien_control, revoked, convicted, adjudged, reserved2, common_carrier, non_common_carrier, private_comm, fixed, mobile, radiolocation, satellite, developmental_or_sta, interconnected_service, certifier_first_name, certifier_mi, certifier_last_name, certifier_suffix, certifier_title, gender, african_american, native_american, hawaiian, asian, white, ethnicity, effective_date, last_action_date, auction_id, reg_stat_broad_serv, band_manager, type_serv_broad_serv, alien_ruling, licensee_name_change, whitespace_ind, additional_cert_choice, additional_cert_answer, discontinuation_ind, regulatory_compliance_ind, eligibility_cert_900, transition_plan_cert_900, return_spectrum_cert_900, payment_cert_900) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_HISTORY_SQL: &str = r"INSERT INTO history (record_type, unique_system_identifier, uls_file_number, call_sign, log_date, code) VALUES (?, ?, ?, ?, ?, ?)";
-const INSERT_LICENSE_ATTACHMENT_SQL: &str = r"INSERT INTO license_attachments (record_type, unique_system_identifier, call_sign, attachment_code, attachment_description, attachment_date, attachment_file_name, action_performed) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_SPECIAL_CONDITION_SQL: &str = r"INSERT INTO special_conditions (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, special_conditions_type, special_conditions_code, status_code, status_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_SPECIAL_CONDITION_FREE_FORM_SQL: &str = r"INSERT INTO special_conditions_free_form (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, license_free_form_type, unique_license_free_form_identifier, sequence_number, license_free_form_condition, status_code, status_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
-const INSERT_SPECIAL_CONDITION_CODES_SQL: &str = r"INSERT INTO special_condition_codes (code, service, description, unknown) VALUES (?, ?, ?, ?)";
+const INSERT_AMATEUR_SQL: &str = r"INSERT INTO amateurs (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, operator_class, group_code, region_code, trustee_call_sign, trustee_indicator, physician_certification, ve_signature, systematic_call_sign_change, vanity_call_sign_change, vainty_relationship, previous_call_sign, previous_operator_class, trustee_name) ";
+const INSERT_COMMENT_SQL: &str = r"INSERT INTO comments (record_type, unique_system_identifier, uls_file_number, call_sign, comment_date, description, status_code, status_date) ";
+const INSERT_ENTITY_SQL: &str = r"INSERT INTO entities (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, entity_type, licensee_id, entity_name, first_name, mi, last_name, suffix, phone, fax, email, street_address, city, state, zip_code, po_box, attention_line, sgin, frn, applicant_type_code, applicant_type_other, status_code, status_date, lic_category_code, linked_license_id, linked_callsign) ";
+const INSERT_HEADER_SQL: &str = r"INSERT INTO headers (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, license_status, radio_service_code, grant_date, expired_date, cancellation_date, eligibility_rule_number, reserved, alien, alien_government, alien_corporation, alien_officer, alien_control, revoked, convicted, adjudged, reserved2, common_carrier, non_common_carrier, private_comm, fixed, mobile, radiolocation, satellite, developmental_or_sta, interconnected_service, certifier_first_name, certifier_mi, certifier_last_name, certifier_suffix, certifier_title, gender, african_american, native_american, hawaiian, asian, white, ethnicity, effective_date, last_action_date, auction_id, reg_stat_broad_serv, band_manager, type_serv_broad_serv, alien_ruling, licensee_name_change, whitespace_ind, additional_cert_choice, additional_cert_answer, discontinuation_ind, regulatory_compliance_ind, eligibility_cert_900, transition_plan_cert_900, return_spectrum_cert_900, payment_cert_900) ";
+const INSERT_HISTORY_SQL: &str = r"INSERT INTO history (record_type, unique_system_identifier, uls_file_number, call_sign, log_date, code) ";
+const INSERT_LICENSE_ATTACHMENT_SQL: &str = r"INSERT INTO license_attachments (record_type, unique_system_identifier, call_sign, attachment_code, attachment_description, attachment_date, attachment_file_name, action_performed) ";
+const INSERT_SPECIAL_CONDITION_SQL: &str = r"INSERT INTO special_conditions (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, special_conditions_type, special_conditions_code, status_code, status_date) ";
+const INSERT_SPECIAL_CONDITION_FREE_FORM_SQL: &str = r"INSERT INTO special_conditions_free_form (record_type, unique_system_identifier, uls_file_number, ebf_number, call_sign, license_free_form_type, unique_license_free_form_identifier, sequence_number, license_free_form_condition, status_code, status_date) ";
+const INSERT_SPECIAL_CONDITION_CODES_SQL: &str =
+ r"INSERT INTO special_condition_codes (code, service, description, unknown) ";
+
+const BIND_LIMIT: usize = 32766;
pub async fn load_amateurs(db: &SqlitePool) {
let amateurs_file = File::open("AM.dat").expect("Error opening file");
@@ -38,33 +43,42 @@ pub async fn load_amateurs(db: &SqlitePool) {
);
progress_bar.set_message("AM.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let amateur: Amateur = line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_AMATEUR_SQL);
- statement
- .bind(amateur.RecordType)
- .bind(amateur.UniqueSystemIdentifier)
- .bind(amateur.UlsFileNumber)
- .bind(amateur.EBFNumber)
- .bind(amateur.CallSign)
- .bind(amateur.OperatorClass)
- .bind(amateur.GroupCode)
- .bind(amateur.RegionCode)
- .bind(amateur.TrusteeCallSign)
- .bind(amateur.TrusteeIndicator)
- .bind(amateur.PhysicianCertification)
- .bind(amateur.VESignature)
- .bind(amateur.SystematicCallSignChange)
- .bind(amateur.VanityCallSignChange)
- .bind(amateur.VanityRelationship)
- .bind(amateur.PreviousCallSign)
- .bind(amateur.PreviousOperatorClass)
- .bind(amateur.TrusteeName)
+ let chunk_size = BIND_LIMIT / 18;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(INSERT_AMATEUR_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let amateur: Amateur = entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(amateur.RecordType)
+ .push_bind(amateur.UniqueSystemIdentifier)
+ .push_bind(amateur.UlsFileNumber)
+ .push_bind(amateur.EBFNumber)
+ .push_bind(amateur.CallSign)
+ .push_bind(amateur.OperatorClass)
+ .push_bind(amateur.GroupCode)
+ .push_bind(amateur.RegionCode)
+ .push_bind(amateur.TrusteeCallSign)
+ .push_bind(amateur.TrusteeIndicator)
+ .push_bind(amateur.PhysicianCertification)
+ .push_bind(amateur.VESignature)
+ .push_bind(amateur.SystematicCallSignChange)
+ .push_bind(amateur.VanityCallSignChange)
+ .push_bind(amateur.VanityRelationship)
+ .push_bind(amateur.PreviousCallSign)
+ .push_bind(amateur.PreviousOperatorClass)
+ .push_bind(amateur.TrusteeName);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -98,23 +112,32 @@ pub async fn load_comments(db: &SqlitePool) {
);
progress_bar.set_message("CO.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let comment: Comment = line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_COMMENT_SQL);
- statement
- .bind(comment.RecordType)
- .bind(comment.UniqueSystemIdentifier)
- .bind(comment.UlsFileNumber)
- .bind(comment.CallSign)
- .bind(comment.CommentDate)
- .bind(comment.Description)
- .bind(comment.StatusCode)
- .bind(comment.StatusDate)
+ let chunk_size = BIND_LIMIT / 8;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(INSERT_COMMENT_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let comment: Comment = entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(comment.RecordType)
+ .push_bind(comment.UniqueSystemIdentifier)
+ .push_bind(comment.UlsFileNumber)
+ .push_bind(comment.CallSign)
+ .push_bind(comment.CommentDate)
+ .push_bind(comment.Description)
+ .push_bind(comment.StatusCode)
+ .push_bind(comment.StatusDate);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -149,45 +172,54 @@ pub async fn load_entities(db: &SqlitePool) {
);
progress_bar.set_message("EN.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let entity: Entity = line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_ENTITY_SQL);
- statement
- .bind(entity.RecordType)
- .bind(entity.UniqueSystemIdentifier)
- .bind(entity.UlsFileNumber)
- .bind(entity.EBFNumber)
- .bind(entity.CallSign)
- .bind(entity.EntityType)
- .bind(entity.LicenseeId)
- .bind(entity.EntityName)
- .bind(entity.FirstName)
- .bind(entity.MiddleInitial)
- .bind(entity.LastName)
- .bind(entity.Suffix)
- .bind(entity.Phone)
- .bind(entity.Fax)
- .bind(entity.Email)
- .bind(entity.StreetAddress)
- .bind(entity.City)
- .bind(entity.State)
- .bind(entity.ZipCode)
- .bind(entity.POBox)
- .bind(entity.AttentionLine)
- .bind(entity.SGIN)
- .bind(entity.FRN)
- .bind(entity.ApplicantTypeCode)
- .bind(entity.ApplicantTypeCodeOther)
- .bind(entity.StatusCode)
- .bind(entity.StatusDate)
- .bind(entity.ThreePointSevenGhzLicenseType)
- .bind(entity.LinkedUniqueSystemIdentifier)
- .bind(entity.LinkedCallsign)
+ let chunk_size = BIND_LIMIT / 30;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(INSERT_ENTITY_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let entity: Entity = entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(entity.RecordType)
+ .push_bind(entity.UniqueSystemIdentifier)
+ .push_bind(entity.UlsFileNumber)
+ .push_bind(entity.EBFNumber)
+ .push_bind(entity.CallSign)
+ .push_bind(entity.EntityType)
+ .push_bind(entity.LicenseeId)
+ .push_bind(entity.EntityName)
+ .push_bind(entity.FirstName)
+ .push_bind(entity.MiddleInitial)
+ .push_bind(entity.LastName)
+ .push_bind(entity.Suffix)
+ .push_bind(entity.Phone)
+ .push_bind(entity.Fax)
+ .push_bind(entity.Email)
+ .push_bind(entity.StreetAddress)
+ .push_bind(entity.City)
+ .push_bind(entity.State)
+ .push_bind(entity.ZipCode)
+ .push_bind(entity.POBox)
+ .push_bind(entity.AttentionLine)
+ .push_bind(entity.SGIN)
+ .push_bind(entity.FRN)
+ .push_bind(entity.ApplicantTypeCode)
+ .push_bind(entity.ApplicantTypeCodeOther)
+ .push_bind(entity.StatusCode)
+ .push_bind(entity.StatusDate)
+ .push_bind(entity.ThreePointSevenGhzLicenseType)
+ .push_bind(entity.LinkedUniqueSystemIdentifier)
+ .push_bind(entity.LinkedCallsign);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -221,74 +253,83 @@ pub async fn load_headers(db: &SqlitePool) {
);
progress_bar.set_message("HD.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let header: Header = line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_HEADER_SQL);
- statement
- .bind(header.RecordType)
- .bind(header.UniqueSystemIdentifier)
- .bind(header.UlsFileNumber)
- .bind(header.EBFNumber)
- .bind(header.CallSign)
- .bind(header.LicenseStatus)
- .bind(header.RadioServiceCode)
- .bind(header.GrantDate)
- .bind(header.ExpiredDate)
- .bind(header.CancellationDate)
- .bind(header.EligibilityRuleNumber)
- .bind(header.Reserved)
- .bind(header.Alien)
- .bind(header.AlienGovernment)
- .bind(header.AlienCorporation)
- .bind(header.AlienOfficers)
- .bind(header.AlienControl)
- .bind(header.Revoked)
- .bind(header.Convicted)
- .bind(header.Adjudged)
- .bind(header.Reserved2)
- .bind(header.CommonCarrier)
- .bind(header.NonCommonCarrier)
- .bind(header.PrivateComm)
- .bind(header.Fixed)
- .bind(header.Mobile)
- .bind(header.Radiolocation)
- .bind(header.Sattelite)
- .bind(header.DevelopmentalOrSta)
- .bind(header.InterconnectedService)
- .bind(header.CertifierFirstName)
- .bind(header.CertifierMiddleInitial)
- .bind(header.CertifierLastName)
- .bind(header.CertifierSuffix)
- .bind(header.CertifierTitle)
- .bind(header.Female)
- .bind(header.BlackOrAfricanAmerican)
- .bind(header.NativeAmerican)
- .bind(header.Hawaiian)
- .bind(header.Asian)
- .bind(header.White)
- .bind(header.Hispanic)
- .bind(header.EffectiveDate)
- .bind(header.LastActionDate)
- .bind(header.AuctionId)
- .bind(header.BroadcastServicesRegulatoryStatus)
- .bind(header.BandManagerRegulatoryStatus)
- .bind(header.BroadcastServicesTypeOfRadioService)
- .bind(header.AlienRuling)
- .bind(header.LicenseeNameChange)
- .bind(header.WhitespaceIndicator)
- .bind(header.OperationRequirementChoice)
- .bind(header.OperationRequirementAnswer)
- .bind(header.DiscontinuationOfService)
- .bind(header.RegulatoryCompliance)
- .bind(header.EligibilityCertification900Mhz)
- .bind(header.TransitionPlanCertification900Mhz)
- .bind(header.ReturnSpectrumCertification900Mhz)
- .bind(header.PaymentCertification900Mhz)
+ let chunk_size = BIND_LIMIT / 60;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(INSERT_HEADER_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let header: Header = entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(header.RecordType)
+ .push_bind(header.UniqueSystemIdentifier)
+ .push_bind(header.UlsFileNumber)
+ .push_bind(header.EBFNumber)
+ .push_bind(header.CallSign)
+ .push_bind(header.LicenseStatus)
+ .push_bind(header.RadioServiceCode)
+ .push_bind(header.GrantDate)
+ .push_bind(header.ExpiredDate)
+ .push_bind(header.CancellationDate)
+ .push_bind(header.EligibilityRuleNumber)
+ .push_bind(header.Reserved)
+ .push_bind(header.Alien)
+ .push_bind(header.AlienGovernment)
+ .push_bind(header.AlienCorporation)
+ .push_bind(header.AlienOfficers)
+ .push_bind(header.AlienControl)
+ .push_bind(header.Revoked)
+ .push_bind(header.Convicted)
+ .push_bind(header.Adjudged)
+ .push_bind(header.Reserved2)
+ .push_bind(header.CommonCarrier)
+ .push_bind(header.NonCommonCarrier)
+ .push_bind(header.PrivateComm)
+ .push_bind(header.Fixed)
+ .push_bind(header.Mobile)
+ .push_bind(header.Radiolocation)
+ .push_bind(header.Sattelite)
+ .push_bind(header.DevelopmentalOrSta)
+ .push_bind(header.InterconnectedService)
+ .push_bind(header.CertifierFirstName)
+ .push_bind(header.CertifierMiddleInitial)
+ .push_bind(header.CertifierLastName)
+ .push_bind(header.CertifierSuffix)
+ .push_bind(header.CertifierTitle)
+ .push_bind(header.Female)
+ .push_bind(header.BlackOrAfricanAmerican)
+ .push_bind(header.NativeAmerican)
+ .push_bind(header.Hawaiian)
+ .push_bind(header.Asian)
+ .push_bind(header.White)
+ .push_bind(header.Hispanic)
+ .push_bind(header.EffectiveDate)
+ .push_bind(header.LastActionDate)
+ .push_bind(header.AuctionId)
+ .push_bind(header.BroadcastServicesRegulatoryStatus)
+ .push_bind(header.BandManagerRegulatoryStatus)
+ .push_bind(header.BroadcastServicesTypeOfRadioService)
+ .push_bind(header.AlienRuling)
+ .push_bind(header.LicenseeNameChange)
+ .push_bind(header.WhitespaceIndicator)
+ .push_bind(header.OperationRequirementChoice)
+ .push_bind(header.OperationRequirementAnswer)
+ .push_bind(header.DiscontinuationOfService)
+ .push_bind(header.RegulatoryCompliance)
+ .push_bind(header.EligibilityCertification900Mhz)
+ .push_bind(header.TransitionPlanCertification900Mhz)
+ .push_bind(header.ReturnSpectrumCertification900Mhz)
+ .push_bind(header.PaymentCertification900Mhz);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -323,21 +364,30 @@ pub async fn load_history(db: &SqlitePool) {
);
progress_bar.set_message("HS.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let history: History = line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_HISTORY_SQL);
- statement
- .bind(history.RecordType)
- .bind(history.UniqueSystemIdentifier)
- .bind(history.UlsFileNumber)
- .bind(history.CallSign)
- .bind(history.LogDate)
- .bind(history.Code)
+ let chunk_size = BIND_LIMIT / 6;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> = QueryBuilder::new(INSERT_HISTORY_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let history: History = entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(history.RecordType)
+ .push_bind(history.UniqueSystemIdentifier)
+ .push_bind(history.UlsFileNumber)
+ .push_bind(history.CallSign)
+ .push_bind(history.LogDate)
+ .push_bind(history.Code);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -372,24 +422,34 @@ pub async fn load_license_attachments(db: &SqlitePool) {
);
progress_bar.set_message("LA.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let attachment: LicenseAttachment =
- line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_LICENSE_ATTACHMENT_SQL);
- statement
- .bind(attachment.RecordType)
- .bind(attachment.UniqueSystemIdentifier)
- .bind(attachment.CallSign)
- .bind(attachment.AttachmentCode)
- .bind(attachment.AttachmentDescription)
- .bind(attachment.AttachmentDate)
- .bind(attachment.AttachmentFileName)
- .bind(attachment.ActionPerformed)
+ let chunk_size = BIND_LIMIT / 8;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> =
+ QueryBuilder::new(INSERT_LICENSE_ATTACHMENT_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let attachment: LicenseAttachment =
+ entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(attachment.RecordType)
+ .push_bind(attachment.UniqueSystemIdentifier)
+ .push_bind(attachment.CallSign)
+ .push_bind(attachment.AttachmentCode)
+ .push_bind(attachment.AttachmentDescription)
+ .push_bind(attachment.AttachmentDate)
+ .push_bind(attachment.AttachmentFileName)
+ .push_bind(attachment.ActionPerformed);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -424,25 +484,35 @@ pub async fn load_special_conditions(db: &SqlitePool) {
);
progress_bar.set_message("SC.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let condition: SpecialCondition =
- line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_SPECIAL_CONDITION_SQL);
- statement
- .bind(condition.RecordType)
- .bind(condition.UniqueSystemIdentifier)
- .bind(condition.UlsFileNumber)
- .bind(condition.EBFNumber)
- .bind(condition.CallSign)
- .bind(condition.SpecialConditionType)
- .bind(condition.SpecialConditionCode)
- .bind(condition.StatusCode)
- .bind(condition.StatusDate)
+ let chunk_size = BIND_LIMIT / 9;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> =
+ QueryBuilder::new(INSERT_SPECIAL_CONDITION_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let condition: SpecialCondition =
+ entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(condition.RecordType)
+ .push_bind(condition.UniqueSystemIdentifier)
+ .push_bind(condition.UlsFileNumber)
+ .push_bind(condition.EBFNumber)
+ .push_bind(condition.CallSign)
+ .push_bind(condition.SpecialConditionType)
+ .push_bind(condition.SpecialConditionCode)
+ .push_bind(condition.StatusCode)
+ .push_bind(condition.StatusDate);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -477,27 +547,37 @@ pub async fn load_special_conditions_free_form(db: &SqlitePool) {
);
progress_bar.set_message("SF.dat");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let condition: SpecialConditionFreeForm =
- line.deserialize(None).expect("Error deserializing entry");
- let statement = sqlx::query(INSERT_SPECIAL_CONDITION_FREE_FORM_SQL);
- statement
- .bind(condition.RecordType)
- .bind(condition.UniqueSystemIdentifier)
- .bind(condition.UlsFileNumber)
- .bind(condition.EBFNumber)
- .bind(condition.CallSign)
- .bind(condition.LicenseFreeFormType)
- .bind(condition.UniqueLicenseFreeFormIdentifier)
- .bind(condition.SequenceNumber)
- .bind(condition.LicenseFreeFormCondition)
- .bind(condition.StatusCode)
- .bind(condition.StatusDate)
+ let chunk_size = BIND_LIMIT / 11;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> =
+ QueryBuilder::new(INSERT_SPECIAL_CONDITION_FREE_FORM_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ let condition: SpecialConditionFreeForm =
+ entry.deserialize(None).expect("Error deserializing entry");
+ builder
+ .push_bind(condition.RecordType)
+ .push_bind(condition.UniqueSystemIdentifier)
+ .push_bind(condition.UlsFileNumber)
+ .push_bind(condition.EBFNumber)
+ .push_bind(condition.CallSign)
+ .push_bind(condition.LicenseFreeFormType)
+ .push_bind(condition.UniqueLicenseFreeFormIdentifier)
+ .push_bind(condition.SequenceNumber)
+ .push_bind(condition.LicenseFreeFormCondition)
+ .push_bind(condition.StatusCode)
+ .push_bind(condition.StatusDate);
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction
@@ -532,25 +612,35 @@ pub async fn load_special_condition_codes(db: &SqlitePool) {
);
progress_bar.set_message("special_condition_codes.txt");
- for line in reader.records() {
- let line = line.expect("Error reading entry");
- let statement = sqlx::query(INSERT_SPECIAL_CONDITION_CODES_SQL);
- statement
- .bind(line.get(0))
- .bind(line.get(1))
- .bind(format!(
- "{} {} {} {} {}",
- line.get(2).unwrap_or_default(),
- line.get(3).unwrap_or_default(),
- line.get(4).unwrap_or_default(),
- line.get(5).unwrap_or_default(),
- line.get(6).unwrap_or_default()
- ))
- .bind(line.get(7))
+ let chunk_size = BIND_LIMIT / 4;
+ for chunk in &reader.records().chunks(chunk_size) {
+ let chunk = chunk.collect::<Result<Vec<StringRecord>, _>>().unwrap();
+ let chunk = chunk.iter();
+
+ let mut query_builder: QueryBuilder<Sqlite> =
+ QueryBuilder::new(INSERT_SPECIAL_CONDITION_CODES_SQL);
+
+ query_builder.push_values(chunk, |mut builder, entry| {
+ builder
+ .push_bind(entry.get(0))
+ .push_bind(entry.get(1))
+ .push_bind(format!(
+ "{} {} {} {} {}",
+ entry.get(2).unwrap_or_default(),
+ entry.get(3).unwrap_or_default(),
+ entry.get(4).unwrap_or_default(),
+ entry.get(5).unwrap_or_default(),
+ entry.get(6).unwrap_or_default()
+ ))
+ .push_bind(entry.get(7));
+ });
+
+ query_builder
+ .build()
.execute(&mut transaction)
.await
- .expect("Error executing statement");
- progress_bar.set_position(line.position().unwrap().line());
+ .expect("Error executing query");
+ progress_bar.set_position(progress_bar.position() + chunk_size as u64);
}
transaction