Merge pull request #2 from VolumeGraphics/stream-bytes
Stream bytes
TheAdiWijaya authored Aug 21, 2024
2 parents 4a17d0e + d090cbd commit fb63fdb
Showing 7 changed files with 119 additions and 37 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -12,7 +12,7 @@ description = "A simple git lfs file pulling implementation in pure rust. Can on
[dependencies]
clap = { version = "4.1", features = ["derive", "env"] }
thiserror = "1"
reqwest = { version="0.11" , features = ["json"] }
reqwest = { version="0.11" , features = ["json", "stream"] }
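# note: the "stream" feature enables reqwest's Response::bytes_stream, used by the new streaming download path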
http = "0.2"
serde = {version ="1.0", features=['derive']}
serde_json = "1.0"
@@ -26,7 +26,8 @@ tracing = "0.1"
tracing-subscriber = "0.3"
vg_errortools = {version="0.1.0", features = ["tokio"]}
enable-ansi-support = "0.2"

futures-util = "0.3.30"
tempfile = "3.12"

[dev-dependencies]
cucumber = "0.19.1"
7 changes: 7 additions & 0 deletions README.md
@@ -22,9 +22,16 @@ The CLI is pretty straightforward.
- '-r / --recurse-pattern [PATTERN]' downloads everything that matches the pattern
- e.g. 'lfspull -r "*.tgz"' downloads all .tgz files in this folder
- e.g. 'lfspull -r "**/*.tgz"' downloads all .tgz files this folder and all subfolders
- '-b / --random-bytes [RANDOM_BYTES]' sets the number of random bytes used in the temp file name. See https://docs.rs/tempfile/latest/tempfile/struct.Builder.html#method.rand_bytes
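- e.g. 'lfspull -r "**/*.tgz" -b 5' uses 5 random bytes in each temp file name, which avoids name collisions when pulling in parallel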
- '-a / --access-token [TOKEN]' sets the token - can also be set via $ACCESS_TOKEN from env
- '-v' for verbose mode

## Library API guide

Please see our docs.rs page for example code, and the gherkin tests for how to check the origin of a file.
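
For example, a minimal sketch of pulling a single file with the 0.3.0 signatures from this change (the path and token are placeholders):

```rust
use lfspull::prelude::*;

#[tokio::main]
async fn main() -> Result<(), LFSError> {
    // Pull one LFS pointer file; Some(5) adds 5 random bytes to the temp file name.
    let origin = lfspull::pull_file("data/model.bin", Some("secret-token"), Some(5)).await?;
    println!("File origin: {:?}", origin);
    Ok(())
}
```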

## Changelog

### 0.3.0

- stream bytes via reqwest's `bytes_stream` to download objects directly into a temporary file, avoiding 'memory allocation of x bytes failed' errors (see the sketch below)
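
A standalone sketch of the pattern (assuming reqwest with its `stream` feature, plus `futures-util` and `tempfile`; the URL is a placeholder):

```rust
use futures_util::StreamExt;
use std::io::Write;
use tempfile::NamedTempFile;

async fn download_to_temp(url: &str) -> Result<NamedTempFile, Box<dyn std::error::Error>> {
    let temp = NamedTempFile::new()?;
    let response = reqwest::get(url).await?;
    // Write the body chunk by chunk so the whole object never has to fit in memory.
    let mut stream = response.bytes_stream();
    while let Some(chunk) = stream.next().await {
        temp.as_file().write_all(&chunk?)?;
    }
    temp.as_file().flush()?;
    Ok(temp)
}
```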
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -78,6 +78,9 @@ pub mod prelude {
/// Strange / malformed http response
#[error("Invalid HTTP response: {0}")]
InvalidResponse(String),
/// Something failed while creating the temp file
#[error("TempFile error: {0}")]
TempFile(String),
}
}
pub use prelude::FilePullMode;
10 changes: 8 additions & 2 deletions src/main.rs
@@ -18,6 +18,10 @@ struct Args {
#[clap(short, long)]
recurse_pattern: Option<String>,

/// Number of random bytes used to create a randomized temp file name. This can be important when pulling in parallel
#[clap(short = 'b', long)]
random_bytes: Option<usize>,

/// Print debug information
#[clap(short, long)]
verbose: bool,
@@ -44,12 +48,14 @@ pub async fn main() -> Result<(), LFSError> {
let access_token = args.access_token.as_deref();
if let Some(file) = args.file_to_pull {
info!("Single file mode: {}", file.to_string_lossy());
let result = lfspull::pull_file(file, access_token).await?;
let result = lfspull::pull_file(file, access_token, args.random_bytes).await?;
info!("Result: {}", result);
}
if let Some(recurse_pattern) = args.recurse_pattern {
info!("Glob-recurse mode: {}", &recurse_pattern);
let results = lfspull::glob_recurse_pull_directory(&recurse_pattern, access_token).await?;
let results =
lfspull::glob_recurse_pull_directory(&recurse_pattern, access_token, args.random_bytes)
.await?;
info!("Pulling finished! Listing files and sources: ");

results.into_iter().enumerate().for_each(|(id, (n, r))| {
41 changes: 27 additions & 14 deletions src/repo_tools/mod.rs
@@ -1,11 +1,11 @@
use crate::prelude::*;
mod primitives;

use futures_util::TryFutureExt;
use glob::glob;
use primitives::get_repo_root;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info};
use url::Url;
use vg_errortools::{fat_io_wrap_tokio, FatIOError};
@@ -34,7 +34,6 @@ async fn get_real_repo_root<P: AsRef<Path>>(repo_path: P) -> Result<PathBuf, LFSError
let worktree_file_contents = fat_io_wrap_tokio(git_path, tokio::fs::read_to_string).await?;
let worktree_path = worktree_file_contents
.split(':')
.into_iter()
.find(|c| c.contains(".git"))
.expect("Could not resolve original repo .git/config file from worktree .git file")
.trim();
@@ -100,28 +99,37 @@ async fn get_file_cached<P: AsRef<Path>>(
repo_root: P,
metadata: &primitives::MetaData,
access_token: Option<&str>,
randomizer_bytes: Option<usize>,
) -> Result<(PathBuf, FilePullMode), LFSError> {
let cache_dir = get_cache_dir(&repo_root, metadata).await?;
debug!("cache dir {:?}", &cache_dir);
let cache_file = cache_dir.join(&metadata.oid);
debug!("cache file {:?}", &cache_file);
let repo_url = remote_url_ssh_to_https(get_remote_url(&repo_root).await?)?;

if !cache_file.is_file() {
if cache_file.is_file() {
Ok((cache_file, FilePullMode::UsedLocalCache))
} else {
fat_io_wrap_tokio(cache_dir, fs::create_dir_all)
.await
.map_err(|_| {
LFSError::DirectoryTraversalError(
"Could not create lfs cache directory".to_string(),
)
})?;
let buf = primitives::download_file(metadata, &repo_url, access_token).await?;
fat_io_wrap_tokio(&cache_file, fs::File::create)
.await?
.write_all(&buf)
.await
.map_err(|e| FatIOError::from_std_io_err(e, cache_file.clone()))?;

let temp_file =
primitives::download_file(metadata, &repo_url, access_token, randomizer_bytes).await?;
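        // Move the completed temp file into the cache; note that fs::rename only works when both paths are on the same filesystem.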
fs::rename(&temp_file.path(), cache_file.as_path())
.map_err(|e| {
LFSError::FatFileIOError(FatIOError::from_std_io_err(
e,
temp_file.path().to_path_buf(),
))
})
.await?;

Ok((cache_file, FilePullMode::DownloadedFromRemote))
} else {
Ok((cache_file, FilePullMode::UsedLocalCache))
}
}

@@ -138,6 +146,7 @@ async fn get_file_cached<P: AsRef<Path>>(
pub async fn pull_file<P: AsRef<Path>>(
lfs_file: P,
access_token: Option<&str>,
randomizer_bytes: Option<usize>,
) -> Result<FilePullMode, LFSError> {
info!("Pulling file {}", lfs_file.as_ref().to_string_lossy());
if !primitives::is_lfs_node_file(&lfs_file).await? {
@@ -154,7 +163,8 @@
let repo_root = get_repo_root(&lfs_file).await.map_err(|e| {
LFSError::DirectoryTraversalError(format!("Could not find git repo root: {:?}", e))
})?;
let (file_name_cached, origin) = get_file_cached(&repo_root, &metadata, access_token).await?;
let (file_name_cached, origin) =
get_file_cached(&repo_root, &metadata, access_token, randomizer_bytes).await?;
info!(
"Found file (Origin: {:?}), linking to {}",
origin,
@@ -189,23 +199,26 @@ fn glob_recurse(wildcard_pattern: &str) -> Result<Vec<PathBuf>, LFSError> {
///
/// * `access_token` - the token for Bearer-Auth via HTTPS
///
* `randomizer_bytes` - the number of random bytes used to create a randomized temp file name
///
/// # Examples
///
/// Load all .jpg files from all subdirectories
/// ```no_run
/// let result = lfspull::glob_recurse_pull_directory("dir/to/pull/**/*.jpg", Some("secret-token"));
/// let result = lfspull::glob_recurse_pull_directory("dir/to/pull/**/*.jpg", Some("secret-token"), Some(5));
/// ```
///
pub async fn glob_recurse_pull_directory(
wildcard_pattern: &str,
access_token: Option<&str>,
randomizer_bytes: Option<usize>,
) -> Result<Vec<(String, FilePullMode)>, LFSError> {
let mut result_vec = Vec::new();
let files = glob_recurse(wildcard_pattern)?;
for path in files {
result_vec.push((
path.to_string_lossy().to_string(),
pull_file(&path, access_token).await?,
pull_file(&path, access_token, randomizer_bytes).await?,
));
}

86 changes: 69 additions & 17 deletions src/repo_tools/primitives.rs
@@ -1,17 +1,19 @@
use crate::prelude::*;
use hex::FromHexError;
use futures_util::stream::StreamExt;
use http::StatusCode;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::json;
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::convert::TryInto;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use tempfile::NamedTempFile;
use tokio::fs;
use tokio::io::AsyncReadExt;
use tracing::info;
use tracing::{debug, info};
use url::Url;
use vg_errortools::{fat_io_wrap_tokio, FatIOError};

@@ -117,11 +119,24 @@ fn url_with_auth(url: &str, access_token: Option<&str>) -> Result<Url, LFSError>
Ok(url)
}

// An alternative temp-file type that would avoid the tempfile crate (unused, kept for reference):
// pub(crate) struct TempFile {
// pub(crate) path: PathBuf,
// pub(crate) file: fs::File,
// }
//
// impl Drop for TempFile {
// fn drop(&mut self) {
// let _ = std::fs::remove_file(&self.path);
// }
// }

pub async fn download_file(
meta_data: &MetaData,
repo_remote_url: &str,
access_token: Option<&str>,
) -> Result<bytes::Bytes, LFSError> {
randomizer_bytes: Option<usize>,
) -> Result<NamedTempFile, LFSError> {
const MEDIA_TYPE: &str = "application/vnd.git-lfs+json";
let client = Client::builder().build()?;
assert_eq!(meta_data.hash, Some(Hash::SHA256));
@@ -183,28 +198,59 @@ pub async fn download_file(
);
return Err(LFSError::InvalidResponse(message));
}
let bytes = response.bytes().await?;
if !check_hash(&bytes, &object.oid)? {
return Err(LFSError::ChecksumMismatch);

debug!("creating temp file in current dir");

const TEMP_SUFFIX: &str = ".lfstmp";
const TEMP_FOLDER: &str = "./";
let tmp_path = PathBuf::from(TEMP_FOLDER).join(format!("{}{TEMP_SUFFIX}", &meta_data.oid));
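    // Without randomizer bytes the temp file name is deterministic (<oid>.lfstmp), so a stale file left by an earlier aborted run must be removed first.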
if randomizer_bytes.is_none() && tmp_path.exists() {
debug!("temp file exists. Deleting");
fat_io_wrap_tokio(&tmp_path, fs::remove_file).await?;
}
Ok(bytes)
}
let temp_file = tempfile::Builder::new()
.prefix(&meta_data.oid)
.suffix(TEMP_SUFFIX)
.rand_bytes(randomizer_bytes.unwrap_or_default())
.tempfile_in(TEMP_FOLDER)
.map_err(|e| LFSError::TempFile(e.to_string()))?;

fn check_hash(bytes: &bytes::Bytes, checksum: &str) -> Result<bool, FromHexError> {
let mut hasher = Sha256::new();
hasher.update(bytes);
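    // Stream the response body chunk by chunk: each chunk is written to the temp file and fed to the hasher, so the full object never sits in memory.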
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
temp_file.as_file().write_all(&chunk).map_err(|e| {
LFSError::FatFileIOError(FatIOError::from_std_io_err(
e,
temp_file.path().to_path_buf(),
))
})?;
hasher.update(chunk);
}
temp_file.as_file().flush().map_err(|e| {
LFSError::FatFileIOError(FatIOError::from_std_io_err(
e,
temp_file.path().to_path_buf(),
))
})?;

debug!("checking hash");

let result = hasher.finalize();
let hex_data = hex::decode(checksum.as_bytes())?;
Ok(result[..] == hex_data)
let hex_data = hex::decode(object.oid.as_bytes())?;
if result[..] == hex_data {
Ok(temp_file)
} else {
Err(LFSError::ChecksumMismatch)
}
}

pub async fn is_lfs_node_file<P: AsRef<Path>>(path: P) -> Result<bool, LFSError> {
if path.as_ref().is_dir() {
return Ok(false);
}
let mut reader = fat_io_wrap_tokio(&path, fs::File::open).await?;
let mut buf: Vec<u8> = Vec::new();
buf.resize(FILE_HEADER.len(), 0);
let mut buf: Vec<u8> = vec![0; FILE_HEADER.len()];
let read_result = reader.read_exact(buf.as_mut_slice()).await;
if let Err(e) = read_result {
match e.kind() {
@@ -272,13 +318,19 @@ size 665694"#;
);
assert_eq!(parsed.hash, Some(Hash::SHA256));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn try_pull_from_demo_repo() {
let parsed = parse_lfs_string(LFS_TEST_DATA).expect("Could not parse demo-string!");
let bytes = download_file(&parsed, URL, None)
let temp_file = download_file(&parsed, URL, None, None)
.await
.expect("could not read bytes");
assert_eq!(bytes.len(), parsed.size);
.expect("could not download file");
let temp_size = temp_file
.as_file()
.metadata()
.expect("could not get temp file size")
.len();
assert_eq!(temp_size as usize, parsed.size);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
Expand Down
4 changes: 2 additions & 2 deletions tests/lfspull.rs
@@ -55,7 +55,7 @@ async fn pull_file_step(world: &mut LFSWorld) {
.clone()
.join(TEST_LFS_FILE_NAME);
world.pull_result = Some(
lfspull::pull_file(file_path, None)
lfspull::pull_file(file_path, None, Some(5))
.await
.expect("Could not pull file"),
);
@@ -65,7 +65,7 @@
async fn pull_directory(world: &mut LFSWorld) {
let fake_repo = world.current_fake_repo.as_ref().unwrap().to_string_lossy();
let pattern = format!("{}/**/*", fake_repo);
let recurse_pull = lfspull::glob_recurse_pull_directory(&pattern, None)
let recurse_pull = lfspull::glob_recurse_pull_directory(&pattern, None, Some(5))
.await
.expect("Could not pull directory")
.into_iter()
