Compare commits

..

2 Commits

Author SHA1 Message Date
2cdd08928d added version information 2019-04-18 09:32:04 -04:00
65db8b071d changed the message to test deployment 2019-04-09 09:56:32 -04:00
16 changed files with 66 additions and 909 deletions

24
.bashrc
View File

@ -1,24 +0,0 @@
# .bashrc
# Source global definitions
if [ -f /etc/bashrc ]; then
. /etc/bashrc
fi
# Uncomment the following line if you don't like systemctl's auto-paging feature:
# export SYSTEMD_PAGER=
# User specific aliases and functions
parse_git_branch() {
repo_branch=$(git branch --show-current 2> /dev/null)
[ ${#repo_branch} != 0 ] && echo "://[${repo_branch}]"
}
#export PS1='\[\e[1;92m\][\u@\h \W]$\[\e[0m\] '
export PS1='\[\e[120;92m\]\u\[\e[120;90m\]@\[\e[120;93m\]\H\[\e[120;90m\] :: \d \t \[\e[0;32m\]\w$(parse_git_branch)\[\e[0m\]\n\[\e[1;92m\] $\[\e[0m\] '
# Path
#PATH=$PATH:$HOME/.local/bin:$HOME/bin:.
PATH=$PATH:~/.local/bin:.

12
.flake8
View File

@ -1,12 +0,0 @@
[flake8]
ignore = W391,W503,E126,E127,E402
exclude =
test/,
.scripts/,
libs/,
__pycache__,
.git/,
.cache
max-line-length: 150

View File

@ -2,11 +2,9 @@
Various tools for miscellaneous use.
### Python Web Server
I have included a simple Python web server to the toolset. In time, I would like to make it more customizable. For now, it will just print the familiar "Hello, World" message. It was designed to have an application I could use for stack development work.
```bash
```
$ python-web-server -h
usage: python-web-server [-h] [-i IP_ADDRESS] [-p PORT] [-d DEBUG]
@ -19,20 +17,3 @@ optional arguments:
Run in debug mode
```
### Fake Logs
This small script will generate log files to a filename provided. The delay is 1/*n*-th of a second to give it more of a real-time approach to logging, if timing is important.
```bash
$ fake-logs --help
usage: fake-logs [-h] [-d DELAY] file
positional arguments:
file Full path for log file
optional arguments:
-h, --help show this help message and exit
-d DELAY, --delay DELAY
Number to delay log entries (1/n)
```

View File

@ -1,16 +0,0 @@
# Build the binary
This will likely change into some sort of `Makefile` later, but for now what is needed to build and use this utility is to run the followning commands:
```bash
$ export GOPATH=/home/mock/.local/bin
$ go build .
$ convert-epoch -epoch 1631711695
Epoch time: 1631711695
Local time: Wed, 15 Sep 2021 09:14:55 -0400
Zulu time: Wed, 15 Sep 2021 13:14:55 +0000
```
I would like to remove the `-epoch` flag in favor of just assuming an epoch time value, but that will come later too. Gotta keep learning the language.

View File

@ -1,47 +0,0 @@
package main
import (
"flag"
"fmt"
"strconv"
"time"
)
// Color for extra touch
type Color string
const (
Black Color = "\u001b[30m"
Red = "\u001b[31m"
BoldRed = "\u001b[91m"
Green = "\u001b[32m"
BoldGreen = "\u001b[92m"
Yellow = "\u001b[33m"
BoldYellow = "\u001b[93m"
Blue = "\u001b[34m"
BoldBlue = "\u001b[94m"
Reset = "\u001b[0m"
)
// Simplify the color printing
func cprint(color Color, title string, time_value string) {
fmt.Println(string(Yellow), title, string(color), time_value, string(Reset))
}
func main() {
epoch_now := time.Now().Unix()
// cprint(Red, "Now time: ", time.Unix(epoch_now, 0).Format(time.RFC1123Z))
epoch_time := flag.Int64("epoch", epoch_now, "Supply epoch time value")
flag.Parse()
cprint(Red, "Epoch time: ", strconv.FormatInt(*epoch_time, 10))
local_time := time.Unix(*epoch_time, 0).Format(time.RFC1123Z)
cprint(BoldBlue, "Local time: ", local_time)
zulu_time := time.Unix(*epoch_time, 0).UTC().Format(time.RFC1123Z)
cprint(BoldGreen, "Zulu time: ", zulu_time)
}

View File

@ -1,3 +0,0 @@
module convert-epoch
go 1.16

65
python-web-server Executable file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python3
import argparse
import http.server
import socketserver
import logging
# setting up logging for this script
_level = logging.INFO
_format = "%(asctime)-15s [%(levelname)-8s] %(lineno)d : %(funcName)s : %(message)s"
logging.basicConfig(format=_format, level=_level)
log = logging.getLogger()
class SimpleHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
log.info(f"server version: {self.server_version}")
log.info(f"client address: {self.client_address}")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b"<html><body><h1>Automate all the things!</h1><p>1.1.0</p></body></html>")
return
class SockHandler(socketserver.BaseRequestHandler):
def handle(self):
self.data = self.request.recv(1024).strip()
log.info(f"client address: {self.client_address}")
log.info(f"received data:\n{self.data}")
self.request.sendall(b"Hello, World!\n")
def parse_args():
"""
Parse args
"""
argp = argparse.ArgumentParser()
argp.add_argument('-i', '--ip-address', default="", help="IP Address to bind the server")
argp.add_argument('-p', '--port', type=int, default=8080, help="Port on which to listen for traffic")
argp.add_argument('-d', '--debug', help="Run in debug mode")
return argp.parse_args()
if __name__ == "__main__":
args = parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
try:
#simple_handler = SimpleHandler() # http.server.SimpleHTTPRequestHandler
with http.server.HTTPServer((args.ip_address, args.port), SimpleHandler) as httpd:
log.info(f"Server started on {args.ip_address}:{args.port}")
httpd.serve_forever()
#with socketserver.TCPServer((args.ip_address, args.port), SockHandler) as tcpd:
# log.info(f"Server started on {args.ip_address}:{args.port}")
# tcpd.serve_forever()
except KeyboardInterrupt as control_c:
log.info("Server stopped")

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3
"""
Will break apart large files into smaller files
"""
import argparse
import re
import os
import glob
def parse_args():
argp = argparse.ArgumentParser(
description="Will break a large file into smaller files"
)
argp.add_argument(
'large_file',
help="Full path for large file"
)
argp.add_argument(
'--prefix', '-p',
default='small',
help="Prefix for the smaller files (default: %(default)s)"
)
argp.add_argument(
'--extension', '-x',
default='txt',
help="Extension for the smaller files (default: %(default)s)"
)
argp.add_argument(
'--size', '-s',
default='10K',
help="File size limits (5K, 10M, 2G)"
)
argp.add_argument(
'--old', '-o',
action='store_true',
help="Reuse existing small files; default behavior is to remove existing files for new ones"
)
# Future functionality:
# argp.add_argument(
# '--count', '-c',
# type=int,
# default=1,
# help="Number of files to generate (default: %(default)s)"
# )
return argp.parse_args()
def main():
args = parse_args()
file_complete = False
file_count = 1
# get the chunk file size
size_value = args.size
size_value_parts = re.match(r'(\d+)([BbKkMmGgTt]?)', size_value).groups()
if size_value_parts[1]:
factor = (
(size_value_parts[1].lower() == 'k' and 1) or
(size_value_parts[1].lower() == 'm' and 2) or
(size_value_parts[1].lower() == 'g' and 3) or
(size_value_parts[1].lower() == 't' and 4)
)
size_limit = int(size_value_parts[0]) * 1024**factor
else:
size_limit - int(size_value_parts[0])
if not args.old:
print("Removing previous files...")
previous_files = glob.glob(f"{args.prefix}-*.{args.extension}")
for previous_file in previous_files:
os.remove(previous_file)
print(f"Begin chunking large file {args.large_file}...")
with open(args.large_file) as large_file:
for line in large_file:
output_filename = f"{args.prefix}-{file_count:00005}.{args.extension}"
if not file_complete:
file_mode = 'a' if os.path.isfile(output_filename) else 'w'
with open(output_filename, file_mode) as small_file:
small_file.write(line)
with open(output_filename, 'r') as small_file:
small_file.seek(0, os.SEEK_END)
output_file_size = small_file.tell()
if output_file_size > size_limit:
file_complete = True
else:
file_complete = False
file_count += 1
print(f"Large file {args.large_file} chunked into {file_count} files no bigger than {args.size}")
if __name__ == "__main__":
main()

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python3
import argparse
import time
import color
def parse_args():
argp = argparse.ArgumentParser()
argp.add_argument('epoch_time', help="Epoch time value")
return argp.parse_args()
def convert_time(epoch):
local_time = time.strftime("%c", time.localtime(epoch))
zulu_time = time.strftime("%c", time.gmtime(epoch))
return (local_time, zulu_time)
if __name__ == "__main__":
args = parse_args()
local_time, zulu_time = convert_time(float(args.epoch_time))
print(f"\nEpoch time: {color.red(args.epoch_time)}")
print(f"Local time: {color.yellow(local_time)}")
print(f"Zulu time: {color.yellow(zulu_time)}\n")

View File

@ -1,164 +0,0 @@
#!/usr/bin/env python3
"""
Randomly writes from a set of messages simulating logging.
"""
import argparse
import logging
import random
import time
import os
import re
import hashlib
def parse_args():
argp = argparse.ArgumentParser(
description="Will generate fake log files specified by the 'file' argument. "
"The --delay argment controls at what speed the file is written. "
"File limits are controlled by the --size argument. The values are "
"'random:N and consistent:N, where N is value in bytes indicated "
"as 4096, 16384B, 10K, 250M, 10G, or 3T. Lowercase prefixes are "
"accepted. Random will generate a file up to the limit specified. "
"Consistent will make the file exactly the size detailed. "
"Count will determine how how many log files are created."
"The --count argument takes an integer. File past the first are "
"given a short hash suffix before the final extension."
)
argp.add_argument('file', help="Full path for log file")
argp.add_argument('-d', '--delay', type=int, default=5, help="Number to delay log entries (1/n)")
argp.add_argument(
'-s', '--size',
default='random:5K',
help="File size limits. Valid choices: 'random:N' (default) or 'consistent:N', where N is an upper limit"
)
argp.add_argument(
'-c', '--count',
type=int,
default=1,
help="Number of log files to generate (default: %(default)s)"
)
return argp.parse_args()
def random_message(log_level):
debug_messages = [
"Values = [10, 20, 33]",
"Cake ratio = 3.14",
"Setting flat mode",
"Back to normal mode",
"Voltage: 1.21 Gigawatts",
"Speed: 88 mph",
"Temperature: 32.4C"
]
info_messages = [
"My cat is blue",
"All your base are belong to us",
"I like turtles",
"Chocolate rain...",
"I hate prunes",
"The cake is a lie",
"I'm giving her all she's got!"
]
warning_messages = [
"Numbers reaching critical levels",
"Magnetic constrictors around the anti-matter containment fields are failing",
"Approaching thermal thresholds",
"Searching is taking too long",
"There are monkeyboys in the facility",
"Structural integrity approaching tolerance",
"Process taking too long"
]
error_messages = [
"Unable to process request",
"Connection lost with host",
"Buffer overflow",
"Warp engines are offline",
"No more coffee",
"Server not responding",
"Cannot divide by zero"
]
message_level = [
debug_messages,
info_messages,
warning_messages,
error_messages
]
which_message = random.randint(0, 6)
return message_level[log_level - 1][which_message]
def main():
args = parse_args()
_LEVEL = logging.NOTSET
_FORMAT = logging.Formatter("%(asctime)-15s [%(levelname)-8s] : %(lineno)d : %(name)s.%(funcName)s : %(message)s")
_file_handler = logging.FileHandler(args.file, mode='a')
_file_handler.setFormatter(_FORMAT)
_file_handler.setLevel(_LEVEL)
log = logging.Logger('fake-logs')
log.addHandler(_file_handler)
file_complete = False
file_count = 0
try:
while file_count < args.count:
log_path = os.path.dirname(args.file)
log_filename = os.path.basename(args.file)
new_filename, file_extension = os.path.splitext(log_filename)
suffix = hashlib.md5(time.strftime('%s').encode('utf-8')).hexdigest()[-4:]
log_filename = os.path.join(log_path, f"{new_filename}-{suffix}{file_extension}")
while not file_complete:
sleep_time = random.random() * float(f"0.{args.delay}")
logging_level = random.randint(0, 40) % 4
log.log(logging_level * 10, random_message(logging_level))
time.sleep(sleep_time)
# Determine file size if specified
if args.size:
size_type, size_value = args.size.split(':')
size_value_parts = re.match(r'(\d+)([BbKkMmGgTt]?)', size_value).groups()
if size_value_parts[1]:
factor = (
(size_value_parts[1].lower() == 'k' and 1) or
(size_value_parts[1].lower() == 'm' and 2) or
(size_value_parts[1].lower() == 'g' and 3) or
(size_value_parts[1].lower() == 't' and 4)
)
size_limit = int(size_value_parts[0]) * 1024**factor
else:
size_limit - int(size_value_parts[0])
log_file_stats = os.stat(args.file)
if log_file_stats.st_size > size_limit:
if size_type == 'consistent':
truncate_log_file(log_filename)
file_complete = True
file_count += 1
except KeyboardInterrupt:
time.sleep(0.25)
log.error(random_message(4))
if __name__ == "__main__":
main()

View File

@ -1,87 +0,0 @@
#!/usr/bin/env python3
import argparse
import json
import logging
_level = logging.INFO
_format = "%(asctime)-15s [%(levelname)-8s] %(lineno)d : %(funcName)s : %(message)s"
logging.basicConfig(format=_format, level=_level)
logger = logging.getLogger()
def parse_args():
argp = argparse.ArgumentParser()
argp.add_argument(
'--debug',
action='store_true',
help="Run in debug mode"
)
argp.add_argument(
'json_filename',
help="JSON filename to flatten"
)
argp.add_argument(
'--output-file', '-o',
help="Output filename for results"
)
return argp.parse_args()
def flatten(data_object, key_name=None, variable_list=[]):
"""Will flatten a JSON file by listing the variables in
a dot notation.
Args:
data_object: Data as dict, list, string, etc. to parse recursively
key_name: String of the key name to pass into in the next call
variable_list: List of the generated variables
Returns:
The list of the variables in dot notation.
"""
if type(data_object) == list:
for data_line in data_object:
flatten(data_line, key_name=key_name, variable_list=variable_list)
elif type(data_object) == dict:
for k, v in data_object.items():
key_name = f"{key_name}.{k}" if key_name else k
flatten(v, key_name=key_name, variable_list=variable_list)
key_name = ".".join(key_name.split('.')[:-1])
else:
variable_list.append(f"{key_name}: {data_object}" if key_name else None)
return variable_list
def main():
args = parse_args()
if args.debug:
logger.info("Running in debug mode")
logger.setLevel(logging.DEBUG)
with open(args.json_filename) as fp:
json_file = json.load(fp)
json_variables = flatten(json_file)
if args.output_file:
with open(args.output_file, 'w') as fp:
for variable in json_variables:
fp.write(f"{variable}\n")
else:
for variable in json_variables:
print(f"{variable}")
if __name__ == "__main__":
main()

View File

@ -1,35 +0,0 @@
#!/usr/bin/env python3
"""
Calculates the number of actual days since everyone
went into hiding (quarantine).
"""
import argparse
import datetime
import dateutil.parser
import color
def parse_args():
argp = argparse.ArgumentParser()
argp.add_argument('starting_date', help="Provide a starting date as a string")
argp.add_argument('--utc', '-u', action='store_true', help="From UTC perspective")
return argp.parse_args()
if __name__ == "__main__":
args = parse_args()
if args.utc:
right_now = datetime.datetime.utcnow()
else:
right_now = datetime.datetime.now()
delta = right_now - dateutil.parser.parse(args.starting_date)
total_days = delta.days
years = total_days / 365.25
weeks = (total_days % 365.25) / 7

View File

@ -1,130 +0,0 @@
#!/usr/bin/env python3
import argparse
import http.server
import socketserver
import logging
import pprint
#__name__ = "python-web-server"
__version__ = (1, 1, 1)
# setting up logging for this script
_level = logging.INFO
_format = "%(asctime)-15s [%(levelname)-8s] %(lineno)d : %(funcName)s : %(message)s"
logging.basicConfig(format=_format, level=_level)
log = logging.getLogger()
class SimpleHandler(http.server.BaseHTTPRequestHandler):
def do_HEAD(self):
log.info(f"server version: {self.server_version}")
log.info(f"client address: {self.client_address}")
log.info(f"path: {self.path}")
log.info(f"headers: {self.headers}")
log.info(f"request version: {self.request_version}")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
headers = "\n".join([f"{k}: {v}" for k, v in self.headers.items()])
if self.path != '/':
uri = f"{self.path[1:]} "
else:
uri = ""
message = f"""Hello, {uri}World!
App version: {__version__}
Server version: {self.server_version}
Client address: {self.client_address}
Path: {self.path}
Headers:
{pprint.pformat(self.headers)}
Request version: {self.request_version}
"""
self.wfile.write(bytes(message, encoding='utf-8'))
return
def do_GET(self):
log.info(f"server version: {self.server_version}")
log.info(f"client address: {self.client_address}")
log.info(f"path: {self.path}")
log.info(f"headers: {self.headers}")
log.info(f"request version: {self.request_version}")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
headers = "<br/>".join([f"{k}: {v}" for k, v in self.headers.items()])
if self.path != '/':
uri = f"{self.path[1:]} "
else:
uri = ""
message = f"""<html><body>
<h1>Hello, {uri}World!</h1>
<p>App version: <b>{__version__}</b></p>
<p>Server version: <b>{self.server_version}</b></p>
<p>Client address: <b>{self.client_address}</b></p>
<p>Path: <b>{self.path}</b></p>
<p>Headers:</p> <p><code>{pprint.pformat(self.headers)}</code></p>
<p>Request version: <b>{self.request_version}</b></p>
</body></html>"""
self.wfile.write(bytes(message, encoding='utf-8'))
return
class SockHandler(socketserver.BaseRequestHandler):
def handle(self):
self.data = self.request.recv(1024).strip()
log.info(f"client address: {self.client_address}")
log.info(f"received data:\n{self.data}")
self.request.sendall(b"Hello, World!\n")
def _version(ver=__version__):
"""
Stringify version value
"""
return f"{ver[0]}.{ver[1]}.{ver[2]}"
def parse_args():
"""
Parse args
"""
argp = argparse.ArgumentParser()
argp.add_argument('-i', '--ip-address', default="", help="IP Address to bind the server")
argp.add_argument('-p', '--port', type=int, default=8080, help="Port on which to listen for traffic")
argp.add_argument('-v', '--version', action='version', version=f"%(prog)s {_version()}", help="Print version")
argp.add_argument('-d', '--debug', help="Run in debug mode")
return argp.parse_args()
if __name__ == "__main__":
args = parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
try:
#simple_handler = SimpleHandler() # http.server.SimpleHTTPRequestHandler
with http.server.HTTPServer((args.ip_address, args.port), SimpleHandler) as httpd:
log.info(f"Server started on {args.ip_address}:{args.port}")
httpd.serve_forever()
#with socketserver.TCPServer((args.ip_address, args.port), SockHandler) as tcpd:
# log.info(f"Server started on {args.ip_address}:{args.port}")
# tcpd.serve_forever()
except KeyboardInterrupt as control_c:
log.info("Server stopped")

View File

@ -1,46 +0,0 @@
#!/usr/bin/env python3
"""
Calculates the number of actual days since everyone
went into hiding (quarantine).
"""
import argparse
import datetime
import dateutil.parser
try:
import color
except:
print("Unable to import the color library. No colors for you.")
quarantine_start_date = "13 Mar 2020"
def parse_args():
argp = argparse.ArgumentParser()
argp.add_argument(
'-s', '--start-date',
default=quarantine_start_date,
help="Different start date than the original (%(default)s)"
)
return argp.parse_args()
def quarantine_days(quarantine_start_date=quarantine_start_date):
q_count = datetime.datetime.now() - dateutil.parser.parse(quarantine_start_date)
return q_count.days
if __name__ == "__main__":
args = parse_args()
try:
start_date_message = f"\nQuarantine Start Date: {color.yellow(args.start_date)}"
quarantine_days_message = f"Quarantine Days: {color.red(quarantine_days(args.start_date))}"
except NameError:
start_date_message = f"\nQuarantine Start Date: {args.start_date}"
quarantine_days_message = f"Quarantine Days: {quarantine_days(args.start_date)}"
print(start_date_message)
print(quarantine_days_message)

View File

@ -1,74 +0,0 @@
#!/usr/bin/env python3
"""
Will break apart large files into smaller files
"""
import argparse
import time
def parse_args():
argp = argparse.ArgumentParser(
description="Will break a large file into smaller files"
)
argp.add_argument(
'large_file',
help="Full path for large file"
)
argp.add_argument(
'--prefix', '-p',
default='small',
help="Prefix for the smaller files (default: %(default)s)"
)
argp.add_argument(
'--extension', '-x',
default='txt',
help="Extension for the smaller files (default: %(default)s)"
)
argp.add_argument(
'-s', '--size',
default='10K',
help="File size limits (5K, 10M, 2G)"
)
return argp.parse_args()
def main():
args = parse_args()
file_count = 0
# get size for chunks
size_value = args.size
size_value_parts = re.match(r'(\d+)([BbKkMmGgTt]?)', size_value).groups()
if size_value_parts[1]:
factor = (
(size_value_parts[1].lower() == 'k' and 1) or
(size_value_parts[1].lower() == 'm' and 2) or
(size_value_parts[1].lower() == 'g' and 3) or
(size_value_parts[1].lower() == 't' and 4)
)
size_limit = int(size_value_parts[0]) * 1024**factor
else:
size_limit - int(size_value_parts[0])
with open(args.large_file) as large_file:
while chunk := large_file.read(size_limit):
file_count = file_count + 1
output_filename = f"{args.prefix}-{file_count}.{args.extension}"
with open(output_filename, 'w') as small_file:
small_file.write(chunk)
if __name__ == "__main__":
main()

View File

@ -1,112 +0,0 @@
#!/usr/bin/env python3
import argparse
import http.server
import socketserver
import os
import logging
# setting up logging for this script
_level = logging.INFO
_format = "%(asctime)-15s [%(levelname)-8s] %(lineno)d : %(funcName)s : %(message)s"
logging.basicConfig(format=_format, level=_level)
log = logging.getLogger()
# set up web directory
WEB_DIR = '/tmp/web'
class SimpleHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
log.info(f"server version: {self.server_version}")
log.info(f"client address: {self.client_address}")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b"<html><body><h1>Automate all the things!</h1><p>1.1.0</p></body></html>")
return
class SuperSimpleHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=WEB_DIR, **kwargs)
def do_GET(self):
log.info(f"server version: {self.server_version}")
log.info(f"client address: {self.client_address}")
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(b"<html><body><h1>Automate all the things!</h1><p>1.1.0</p></body></html>")
return
class SockHandler(socketserver.BaseRequestHandler):
def handle(self):
self.data = self.request.recv(1024).strip()
log.info(f"client address: {self.client_address}")
log.info(f"received data:\n{self.data}")
self.request.sendall(b"Hello, World!\n")
def parse_args():
"""
Parse args
"""
argp = argparse.ArgumentParser()
argp.add_argument('-i', '--ip-address', default="", help="IP Address to bind the server")
argp.add_argument('-p', '--port', type=int, default=8080, help="Port on which to listen for traffic")
argp.add_argument('-d', '--debug', action='store_true', help="Run in debug mode")
return argp.parse_args()
def create_file(size=1073741824, web_dir=WEB_DIR):
"""
Create 1G file for 1G download test
"""
with open(web_dir, 'wb') as fp:
fp.seek(size - 1)
fp.write(b'\0')
return
def remove_file(web_dir=WEB_DIR):
"""
Remove the 1G file since we no longer need it
"""
os.remove(web_dir)
return
if __name__ == "__main__":
args = parse_args()
if args.debug:
log.setLevel(logging.DEBUG)
try:
if not os.path.isdir(WEB_DIR):
os.mkdir(WEB_DIR)
log.info("Creating 1 gigabyte file...")
create_file(web_dir=os.path.join(WEB_DIR, 'speedtest'))
# TODO: This should be the client which writes a server listening for this
# client sending a file. A simple Python script like this originally was listening
# on a port for a specific code would allow the file to be written to the /tmp
# directory and then read back down, comparing contents and measuring speed.
# simple_handler = SuperSimpleHandler() # http.server.SimpleHTTPRequestHandler
#with socketserver.TCPServer((args.ip_address, args.port), SockHandler) as tcpd:
# log.info(f"Server started on {args.ip_address}:{args.port}")
# tcpd.serve_forever()
except KeyboardInterrupt as control_c:
log.info("Server stopped")
finally:
log.info("Deleting 1 gigabyte file...")
remove_file(web_dir=os.path.join(WEB_DIR, 'speedtest'))